fix(orchestrator): recover interrupted inflight tickets on startup
This commit is contained in:
parent
1b751ac16d
commit
5b795c00b3
2 changed files with 129 additions and 0 deletions
|
|
@ -286,6 +286,27 @@ impl ProcessedTicket {
|
|||
rows.collect()
|
||||
}
|
||||
|
||||
pub fn list_inflight(conn: &Connection) -> Result<Vec<ProcessedTicket>> {
|
||||
let sql = format!(
|
||||
"{} WHERE status IN ('Analyzing', 'Developing') ORDER BY detected_at ASC",
|
||||
SELECT_ALL_COLS
|
||||
);
|
||||
let mut stmt = conn.prepare(&sql)?;
|
||||
let rows = stmt.query_map([], from_row)?;
|
||||
rows.collect()
|
||||
}
|
||||
|
||||
pub fn reset_for_retry(conn: &Connection, id: &str) -> Result<()> {
|
||||
conn.execute(
|
||||
"UPDATE processed_tickets \
|
||||
SET status = 'Pending', analyst_report = NULL, developer_report = NULL, \
|
||||
worktree_path = NULL, branch_name = NULL, processed_at = NULL \
|
||||
WHERE id = ?1",
|
||||
params![id],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn set_error(conn: &Connection, id: &str, error_message: &str) -> Result<()> {
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
conn.execute(
|
||||
|
|
@ -622,6 +643,29 @@ mod tests {
|
|||
assert_eq!(pending2[0].artifact_id, 2);
|
||||
}
|
||||
|
||||
#[test]
fn test_reset_for_retry_clears_inflight_fields() {
    let (conn, project_id, tracker_id) = setup();

    // Drive a fresh ticket into a fully in-flight state: status, both
    // reports, and worktree/branch info all populated.
    let ticket = ProcessedTicket::insert_if_new(&conn, &project_id, &tracker_id, 1, "T1", "{}")
        .unwrap()
        .unwrap();
    ProcessedTicket::update_status(&conn, &ticket.id, "Developing").unwrap();
    ProcessedTicket::set_analyst_report(&conn, &ticket.id, "analysis").unwrap();
    ProcessedTicket::set_developer_report(&conn, &ticket.id, "dev report").unwrap();
    ProcessedTicket::set_worktree_info(&conn, &ticket.id, "/tmp/wt", "orchai/1").unwrap();

    ProcessedTicket::reset_for_retry(&conn, &ticket.id).unwrap();

    // Status is back to Pending and every in-flight field is cleared.
    let after = ProcessedTicket::get_by_id(&conn, &ticket.id).unwrap();
    assert_eq!(after.status, "Pending");
    assert!(after.analyst_report.is_none());
    assert!(after.developer_report.is_none());
    assert!(after.worktree_path.is_none());
    assert!(after.branch_name.is_none());
    assert!(after.processed_at.is_none());
}
|
||||
|
||||
#[test]
|
||||
fn test_set_error() {
|
||||
let (conn, project_id, tracker_id) = setup();
|
||||
|
|
|
|||
|
|
@ -149,6 +149,47 @@ pub fn parse_verdict(report: &str) -> Verdict {
|
|||
Verdict::FixNeeded
|
||||
}
|
||||
|
||||
fn recover_interrupted_tickets(db: &Arc<Mutex<Connection>>) -> Result<usize, String> {
|
||||
let inflight = {
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
ProcessedTicket::list_inflight(&conn).map_err(|e| format!("list_inflight failed: {}", e))?
|
||||
};
|
||||
|
||||
let mut recovered = 0usize;
|
||||
|
||||
for ticket in inflight {
|
||||
let cleanup_target = {
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
let worktree = Worktree::get_by_ticket_id(&conn, &ticket.id)
|
||||
.map_err(|e| format!("get worktree failed: {}", e))?;
|
||||
let project = Project::get_by_id(&conn, &ticket.project_id)
|
||||
.map_err(|e| format!("get project failed: {}", e))?;
|
||||
worktree.map(|wt| (wt, project.path))
|
||||
};
|
||||
|
||||
if let Some((worktree, project_path)) = cleanup_target {
|
||||
if worktree.status == "Active" {
|
||||
// Best effort cleanup so a re-queued ticket can create a fresh worktree.
|
||||
let _ = worktree_manager::delete_worktree(
|
||||
&project_path,
|
||||
&worktree.path,
|
||||
&worktree.branch_name,
|
||||
);
|
||||
}
|
||||
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
let _ = Worktree::delete(&conn, &worktree.id);
|
||||
}
|
||||
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
ProcessedTicket::reset_for_retry(&conn, &ticket.id)
|
||||
.map_err(|e| format!("reset_for_retry failed: {}", e))?;
|
||||
recovered += 1;
|
||||
}
|
||||
|
||||
Ok(recovered)
|
||||
}
|
||||
|
||||
pub struct TicketCliContext<'a> {
|
||||
pub app_handle: &'a AppHandle,
|
||||
pub ticket_id: &'a str,
|
||||
|
|
@ -676,6 +717,19 @@ async fn process_ticket(
|
|||
}
|
||||
|
||||
pub fn start(db: Arc<Mutex<Connection>>, app_handle: AppHandle, process_registry: ProcessRegistry) {
|
||||
match recover_interrupted_tickets(&db) {
|
||||
Ok(count) if count > 0 => {
|
||||
eprintln!(
|
||||
"orchestrator: recovered {} interrupted ticket(s) to Pending state",
|
||||
count
|
||||
);
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
eprintln!("orchestrator: failed to recover interrupted tickets: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
tauri::async_runtime::spawn(async move {
|
||||
let mut tick = interval(Duration::from_secs(10));
|
||||
loop {
|
||||
|
|
@ -696,6 +750,7 @@ pub fn start(db: Arc<Mutex<Connection>>, app_handle: AppHandle, process_registry
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::db;
|
||||
|
||||
#[test]
|
||||
fn test_build_analyst_prompt_contains_ticket_info() {
|
||||
|
|
@ -793,4 +848,34 @@ mod tests {
|
|||
let report = "Verdict: [VERDICT: NO_FIX] - no code change needed.";
|
||||
assert_eq!(parse_verdict(report), Verdict::NoFix);
|
||||
}
|
||||
|
||||
#[test]
fn test_recover_interrupted_tickets_requeues_analyzing_status() {
    let conn = db::init_in_memory().expect("db init should succeed");
    let project =
        Project::insert(&conn, "Test", "/tmp/orchai-test", None, "main").expect("project");

    // Simulate an external (artifact-less, id -1) ticket that a previous
    // run left stuck mid-analysis.
    let ticket = ProcessedTicket::insert_external(
        &conn,
        &project.id,
        "graylog",
        Some("subject-1"),
        -1,
        "Interrupted ticket",
        "{}",
    )
    .expect("ticket");
    ProcessedTicket::update_status(&conn, &ticket.id, "Analyzing")
        .expect("status update should succeed");

    let db = Arc::new(Mutex::new(conn));
    let count = recover_interrupted_tickets(&db)
        .expect("recovery should succeed for analyzing tickets");
    assert_eq!(count, 1);

    // The ticket must be back in the Pending queue.
    let conn = db.lock().expect("db lock should succeed");
    let requeued =
        ProcessedTicket::get_by_id(&conn, &ticket.id).expect("ticket lookup should succeed");
    assert_eq!(requeued.status, "Pending");
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue