From 33439e11c81523a0305042d4461d4c3ec2a648fa Mon Sep 17 00:00:00 2001 From: thibaud-lclr Date: Thu, 16 Apr 2026 16:00:06 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20rendre=20l=E2=80=99ingestion=20des=20tic?= =?UTF-8?q?kets=20atomique?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src-tauri/migrations/001_init.sql | 3 ++ .../006_processed_tickets_unique_index.sql | 47 +++++++++++++++++++ src-tauri/src/db.rs | 23 ++++++++- src-tauri/src/models/ticket.rs | 46 +++++++++++++++--- 4 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 src-tauri/migrations/006_processed_tickets_unique_index.sql diff --git a/src-tauri/migrations/001_init.sql b/src-tauri/migrations/001_init.sql index ea1a332..d4a2c0a 100644 --- a/src-tauri/migrations/001_init.sql +++ b/src-tauri/migrations/001_init.sql @@ -40,6 +40,9 @@ CREATE TABLE IF NOT EXISTS processed_tickets ( processed_at TEXT ); +CREATE UNIQUE INDEX IF NOT EXISTS idx_processed_tickets_tracker_artifact_unique +ON processed_tickets(tracker_id, artifact_id); + CREATE TABLE IF NOT EXISTS worktrees ( id TEXT PRIMARY KEY, ticket_id TEXT NOT NULL REFERENCES processed_tickets(id), diff --git a/src-tauri/migrations/006_processed_tickets_unique_index.sql b/src-tauri/migrations/006_processed_tickets_unique_index.sql new file mode 100644 index 0000000..1db519d --- /dev/null +++ b/src-tauri/migrations/006_processed_tickets_unique_index.sql @@ -0,0 +1,47 @@ +BEGIN; + +DROP TABLE IF EXISTS _processed_ticket_dedup_map; + +CREATE TEMP TABLE _processed_ticket_dedup_map AS +SELECT dup.id AS duplicate_id, + ( + SELECT keep.id + FROM processed_tickets keep + WHERE keep.tracker_id = dup.tracker_id + AND keep.artifact_id = dup.artifact_id + ORDER BY keep.rowid ASC + LIMIT 1 + ) AS keep_id +FROM processed_tickets dup +WHERE dup.rowid > ( + SELECT MIN(base.rowid) + FROM processed_tickets base + WHERE base.tracker_id = dup.tracker_id + AND base.artifact_id = dup.artifact_id +); + +UPDATE worktrees +SET ticket_id = ( + SELECT keep_id + FROM _processed_ticket_dedup_map map + WHERE map.duplicate_id = worktrees.ticket_id +) +WHERE ticket_id IN (SELECT duplicate_id FROM _processed_ticket_dedup_map); + +UPDATE notifications +SET ticket_id = ( + SELECT keep_id + FROM _processed_ticket_dedup_map map + WHERE map.duplicate_id = notifications.ticket_id +) +WHERE ticket_id IN (SELECT duplicate_id FROM _processed_ticket_dedup_map); + +DELETE FROM processed_tickets +WHERE id IN (SELECT duplicate_id FROM _processed_ticket_dedup_map); + +DROP TABLE IF EXISTS _processed_ticket_dedup_map; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_processed_tickets_tracker_artifact_unique +ON processed_tickets(tracker_id, artifact_id); + +COMMIT; diff --git a/src-tauri/src/db.rs b/src-tauri/src/db.rs index fcedc49..3fc8a4b 100644 --- a/src-tauri/src/db.rs +++ b/src-tauri/src/db.rs @@ -6,6 +6,7 @@ const MIGRATION_002: &str = include_str!("../migrations/002_add_last_polled.sql" const MIGRATION_003: &str = include_str!("../migrations/003_add_agents.sql"); const MIGRATION_004: &str = include_str!("../migrations/004_default_agents.sql"); const MIGRATION_005: &str = include_str!("../migrations/005_orchestration_modules_chat_tasks.sql"); +const MIGRATION_006: &str = include_str!("../migrations/006_processed_tickets_unique_index.sql"); pub fn init(db_path: &Path) -> Result { let conn = Connection::open(db_path)?; @@ -51,6 +52,10 @@ fn migrate(conn: &Connection) -> Result<()> { conn.execute_batch(MIGRATION_005)?; conn.pragma_update(None, "user_version", 5)?; } + if version < 6 { + conn.execute_batch(MIGRATION_006)?; + conn.pragma_update(None, "user_version", 6)?; + } Ok(()) } @@ -106,7 +111,7 @@ mod tests { let version: i32 = conn .pragma_query_value(None, "user_version", |row| row.get(0)) .unwrap(); - assert_eq!(version, 5); + assert_eq!(version, 6); } #[test] @@ -131,4 +136,20 @@ mod tests { assert_eq!(analyst_defaults, 1); assert_eq!(developer_defaults, 1); } + + #[test] + fn test_processed_tickets_unique_index_exists() { + let conn = init_in_memory().expect("should initialize"); + + let unique_idx_count: i32 = conn + .query_row( + "SELECT COUNT(*) FROM pragma_index_list('processed_tickets') \ + WHERE name = 'idx_processed_tickets_tracker_artifact_unique' AND \"unique\" = 1", + [], + |row| row.get(0), + ) + .unwrap(); + + assert_eq!(unique_idx_count, 1); + } } diff --git a/src-tauri/src/models/ticket.rs b/src-tauri/src/models/ticket.rs index 7a973e1..2ff2653 100644 --- a/src-tauri/src/models/ticket.rs +++ b/src-tauri/src/models/ticket.rs @@ -40,7 +40,7 @@ const SELECT_ALL_COLS: &str = "SELECT id, tracker_id, artifact_id, artifact_titl detected_at, processed_at FROM processed_tickets"; impl ProcessedTicket { - /// Insert a new ticket if one with the same (tracker_id, artifact_id) doesn't exist. + /// Atomically insert a new ticket keyed by (tracker_id, artifact_id). /// Returns Some(ticket) if inserted, None if it was a duplicate. pub fn insert_if_new( conn: &Connection, @@ -49,15 +49,11 @@ impl ProcessedTicket { artifact_title: &str, artifact_data: &str, ) -> Result> { - if Self::exists(conn, tracker_id, artifact_id)? { - return Ok(None); - } - let id = Uuid::new_v4().to_string(); let now = chrono::Utc::now().to_rfc3339(); - conn.execute( - "INSERT INTO processed_tickets \ + let inserted_rows = conn.execute( + "INSERT OR IGNORE INTO processed_tickets \ (id, tracker_id, artifact_id, artifact_title, artifact_data, status, detected_at) \ VALUES (?1, ?2, ?3, ?4, ?5, 'Pending', ?6)", params![ @@ -70,6 +66,10 @@ impl ProcessedTicket { ], )?; + if inserted_rows == 0 { + return Ok(None); + } + let ticket = ProcessedTicket { id, tracker_id: tracker_id.to_string(), @@ -89,6 +89,7 @@ impl ProcessedTicket { } /// Returns true if a ticket with (tracker_id, artifact_id) already exists. + #[cfg(test)] pub fn exists(conn: &Connection, tracker_id: &str, artifact_id: i32) -> Result { let count: i64 = conn.query_row( "SELECT COUNT(*) FROM processed_tickets WHERE tracker_id = ?1 AND artifact_id = ?2", @@ -262,6 +263,37 @@ mod tests { assert!(second.is_none()); } + #[test] + fn test_unique_constraint_blocks_manual_duplicate_insert() { + let (conn, tracker_id) = setup(); + ProcessedTicket::insert_if_new(&conn, &tracker_id, 909, "Duplicate candidate", "{}") + .expect("first insert should succeed"); + + let duplicate_insert = conn.execute( + "INSERT INTO processed_tickets \ + (id, tracker_id, artifact_id, artifact_title, artifact_data, status, detected_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, 'Pending', ?6)", + rusqlite::params![ + Uuid::new_v4().to_string(), + tracker_id, + 909, + "Duplicate candidate", + "{}", + chrono::Utc::now().to_rfc3339() + ], + ); + assert!(duplicate_insert.is_err()); + + let duplicates: i64 = conn + .query_row( + "SELECT COUNT(*) FROM processed_tickets WHERE tracker_id = ?1 AND artifact_id = ?2", + rusqlite::params![tracker_id, 909], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(duplicates, 1); + } + #[test] fn test_exists() { let (conn, tracker_id) = setup();