fix: rendre l’ingestion des tickets atomique

This commit is contained in:
thibaud-lclr 2026-04-16 16:00:06 +02:00
parent 41a953106d
commit 33439e11c8
4 changed files with 111 additions and 8 deletions

View file

@ -40,6 +40,9 @@ CREATE TABLE IF NOT EXISTS processed_tickets (
processed_at TEXT
);
-- One processed ticket per (tracker, artifact); backs the INSERT OR IGNORE
-- dedup path in ProcessedTicket::insert_if_new.
CREATE UNIQUE INDEX IF NOT EXISTS idx_processed_tickets_tracker_artifact_unique
ON processed_tickets(tracker_id, artifact_id);
CREATE TABLE IF NOT EXISTS worktrees (
id TEXT PRIMARY KEY,
ticket_id TEXT NOT NULL REFERENCES processed_tickets(id),

View file

@ -0,0 +1,47 @@
-- Migration 006: collapse duplicate processed_tickets rows, then enforce
-- uniqueness on (tracker_id, artifact_id) so future inserts can rely on it.
BEGIN;
-- Scratch table mapping each duplicate row's id to the id of the row kept.
-- The kept row is the earliest-inserted one (lowest rowid) per pair.
DROP TABLE IF EXISTS _processed_ticket_dedup_map;
CREATE TEMP TABLE _processed_ticket_dedup_map AS
SELECT dup.id AS duplicate_id,
(
SELECT keep.id
FROM processed_tickets keep
WHERE keep.tracker_id = dup.tracker_id
AND keep.artifact_id = dup.artifact_id
ORDER BY keep.rowid ASC
LIMIT 1
) AS keep_id
FROM processed_tickets dup
-- A row is a duplicate only if an earlier row (smaller rowid) exists for
-- the same (tracker_id, artifact_id) pair.
WHERE dup.rowid > (
SELECT MIN(base.rowid)
FROM processed_tickets base
WHERE base.tracker_id = dup.tracker_id
AND base.artifact_id = dup.artifact_id
);
-- Re-point dependent rows from duplicate ids to the kept ids BEFORE deleting
-- the duplicates, so no dangling ticket_id references are left behind.
UPDATE worktrees
SET ticket_id = (
SELECT keep_id
FROM _processed_ticket_dedup_map map
WHERE map.duplicate_id = worktrees.ticket_id
)
WHERE ticket_id IN (SELECT duplicate_id FROM _processed_ticket_dedup_map);
-- NOTE(review): assumes notifications.ticket_id also references
-- processed_tickets(id) — schema not visible in this diff; confirm.
UPDATE notifications
SET ticket_id = (
SELECT keep_id
FROM _processed_ticket_dedup_map map
WHERE map.duplicate_id = notifications.ticket_id
)
WHERE ticket_id IN (SELECT duplicate_id FROM _processed_ticket_dedup_map);
-- With references re-pointed, the duplicate rows can be removed.
DELETE FROM processed_tickets
WHERE id IN (SELECT duplicate_id FROM _processed_ticket_dedup_map);
DROP TABLE IF EXISTS _processed_ticket_dedup_map;
-- Duplicates are gone, so the unique index can now be created; it enforces
-- the invariant for all future inserts (the Rust side uses INSERT OR IGNORE
-- against it).
CREATE UNIQUE INDEX IF NOT EXISTS idx_processed_tickets_tracker_artifact_unique
ON processed_tickets(tracker_id, artifact_id);
COMMIT;

View file

@ -6,6 +6,7 @@ const MIGRATION_002: &str = include_str!("../migrations/002_add_last_polled.sql"
const MIGRATION_003: &str = include_str!("../migrations/003_add_agents.sql");
const MIGRATION_004: &str = include_str!("../migrations/004_default_agents.sql");
const MIGRATION_005: &str = include_str!("../migrations/005_orchestration_modules_chat_tasks.sql");
const MIGRATION_006: &str = include_str!("../migrations/006_processed_tickets_unique_index.sql");
pub fn init(db_path: &Path) -> Result<Connection> {
let conn = Connection::open(db_path)?;
@ -51,6 +52,10 @@ fn migrate(conn: &Connection) -> Result<()> {
conn.execute_batch(MIGRATION_005)?;
conn.pragma_update(None, "user_version", 5)?;
}
if version < 6 {
conn.execute_batch(MIGRATION_006)?;
conn.pragma_update(None, "user_version", 6)?;
}
Ok(())
}
@ -106,7 +111,7 @@ mod tests {
let version: i32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, 5);
assert_eq!(version, 6);
}
#[test]
@ -131,4 +136,20 @@ mod tests {
assert_eq!(analyst_defaults, 1);
assert_eq!(developer_defaults, 1);
}
#[test]
fn test_processed_tickets_unique_index_exists() {
    let conn = init_in_memory().expect("should initialize");
    // pragma_index_list exposes each index's "unique" flag; exactly one
    // matching unique index proves migration 006 was applied.
    let sql = "SELECT COUNT(*) FROM pragma_index_list('processed_tickets') WHERE name = 'idx_processed_tickets_tracker_artifact_unique' AND \"unique\" = 1";
    let matching_unique_indexes: i32 = conn
        .query_row(sql, [], |row| row.get(0))
        .unwrap();
    assert_eq!(matching_unique_indexes, 1);
}
}

View file

@ -40,7 +40,7 @@ const SELECT_ALL_COLS: &str = "SELECT id, tracker_id, artifact_id, artifact_titl
detected_at, processed_at FROM processed_tickets";
impl ProcessedTicket {
/// Insert a new ticket if one with the same (tracker_id, artifact_id) doesn't exist.
/// Atomically insert a new ticket keyed by (tracker_id, artifact_id).
/// Returns Some(ticket) if inserted, None if it was a duplicate.
pub fn insert_if_new(
conn: &Connection,
@ -49,15 +49,11 @@ impl ProcessedTicket {
artifact_title: &str,
artifact_data: &str,
) -> Result<Option<ProcessedTicket>> {
if Self::exists(conn, tracker_id, artifact_id)? {
return Ok(None);
}
let id = Uuid::new_v4().to_string();
let now = chrono::Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO processed_tickets \
let inserted_rows = conn.execute(
"INSERT OR IGNORE INTO processed_tickets \
(id, tracker_id, artifact_id, artifact_title, artifact_data, status, detected_at) \
VALUES (?1, ?2, ?3, ?4, ?5, 'Pending', ?6)",
params![
@ -70,6 +66,10 @@ impl ProcessedTicket {
],
)?;
if inserted_rows == 0 {
return Ok(None);
}
let ticket = ProcessedTicket {
id,
tracker_id: tracker_id.to_string(),
@ -89,6 +89,7 @@ impl ProcessedTicket {
}
/// Returns true if a ticket with (tracker_id, artifact_id) already exists.
#[cfg(test)]
pub fn exists(conn: &Connection, tracker_id: &str, artifact_id: i32) -> Result<bool> {
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM processed_tickets WHERE tracker_id = ?1 AND artifact_id = ?2",
@ -262,6 +263,37 @@ mod tests {
assert!(second.is_none());
}
#[test]
fn test_unique_constraint_blocks_manual_duplicate_insert() {
    let (conn, tracker_id) = setup();
    // Seed one row through the normal API.
    ProcessedTicket::insert_if_new(&conn, &tracker_id, 909, "Duplicate candidate", "{}")
        .expect("first insert should succeed");
    // Bypass insert_if_new's INSERT OR IGNORE and write to the table
    // directly: the unique index itself must reject the second row.
    let raw_insert = conn.execute(
        "INSERT INTO processed_tickets (id, tracker_id, artifact_id, artifact_title, artifact_data, status, detected_at) VALUES (?1, ?2, ?3, ?4, ?5, 'Pending', ?6)",
        rusqlite::params![
            Uuid::new_v4().to_string(),
            tracker_id,
            909,
            "Duplicate candidate",
            "{}",
            chrono::Utc::now().to_rfc3339()
        ],
    );
    assert!(raw_insert.is_err());
    // Only the originally seeded row may remain.
    let remaining: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM processed_tickets WHERE tracker_id = ?1 AND artifact_id = ?2",
            rusqlite::params![tracker_id, 909],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(remaining, 1);
}
#[test]
fn test_exists() {
let (conn, tracker_id) = setup();