use crate::models::agent::{Agent, AgentRole}; use crate::models::graylog::GraylogCredentials; use crate::models::module::{ ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE, MODULE_TULEAP_AUTO_RESOLVE, }; use crate::models::project::Project; use crate::models::ticket::ProcessedTicket; use crate::models::tracker::WatchedTracker; use crate::models::worktree::Worktree; use crate::services::process_registry::ProcessRegistry; use crate::services::{cli_process, notifier, worktree_manager}; use rusqlite::Connection; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use tauri::{AppHandle, Emitter}; use tokio::process::Command; use tokio::sync::Mutex as AsyncMutex; use tokio::time::{interval, timeout, Duration}; #[derive(Debug, Clone, PartialEq)] pub enum Verdict { FixNeeded, NoFix, } pub fn build_analyst_prompt(ticket: &ProcessedTicket, project: &Project) -> String { let source_ref = ticket.source_ref.as_deref().unwrap_or("-"); format!( r#"Tu es un analyste technique. Voici un ticket a analyser. ## Ticket - ID: {artifact_id} - Titre: {title} - Donnees: {data} - Source: {source} - Source ref: {source_ref} ## Contexte - Projet: {project_name} - Repo: {project_path} - Branche de base: {base_branch} ## Ta mission 1. Analyse le ticket et identifie les fichiers/fonctions concernes 2. Explique techniquement le probleme 3. Evalue si une correction de code est necessaire 4. Produis un rapport structure en markdown Termine ton rapport par un de ces verdicts sur une ligne separee: [VERDICT: FIX_NEEDED] si une correction de code est necessaire [VERDICT: NO_FIX] si aucune correction n'est necessaire"#, artifact_id = ticket.artifact_id, title = ticket.artifact_title, data = ticket.artifact_data, source = ticket.source, source_ref = source_ref, project_name = project.name, project_path = project.path, base_branch = project.base_branch, ) } pub fn build_developer_prompt( ticket: &ProcessedTicket, project: &Project, analyst_report: &str, worktree_path: &str, ) -> String { let source_ref = ticket.source_ref.as_deref().unwrap_or("-"); format!( r#"Tu es un developpeur. Tu dois corriger un bug ou implementer une fonctionnalite d'apres l'analyse suivante. ## Rapport d'analyse {analyst_report} ## Ticket - ID: {artifact_id} - Titre: {title} - Source: {source} - Source ref: {source_ref} ## Contexte - Projet: {project_name} - Repo (worktree): {worktree_path} - Branche de base: {base_branch} ## Ta mission 1. Implemente la correction dans le code 2. Fais des commits atomiques avec des messages clairs 3. Produis un rapport en markdown decrivant les changements effectues"#, analyst_report = analyst_report, artifact_id = ticket.artifact_id, title = ticket.artifact_title, source = ticket.source, source_ref = source_ref, project_name = project.name, worktree_path = worktree_path, base_branch = project.base_branch, ) } fn append_custom_prompt(base_prompt: String, custom_prompt: &str) -> String { let extra = custom_prompt.trim(); if extra.is_empty() { return base_prompt; } format!( "{base_prompt}\n\n## Instructions supplementaires (agent)\n{extra}", base_prompt = base_prompt, extra = extra ) } fn record_ticket_error( db: &Arc>, app_handle: &AppHandle, project_id: &str, ticket_id: &str, artifact_id: i32, error: &str, ) { if let Ok(conn) = db.lock() { let _ = ProcessedTicket::set_error(&conn, ticket_id, error); } notifier::notify_error(db, app_handle, project_id, ticket_id, artifact_id, error); let _ = app_handle.emit( "ticket-processing-error", serde_json::json!({ "project_id": project_id, "ticket_id": ticket_id, "artifact_id": artifact_id, "error": error }), ); } pub fn parse_verdict(report: &str) -> Verdict { for line in report.lines().rev() { let trimmed = line.trim(); if trimmed.contains("[VERDICT: NO_FIX]") { return Verdict::NoFix; } if trimmed.contains("[VERDICT: FIX_NEEDED]") { return Verdict::FixNeeded; } } Verdict::FixNeeded } pub struct TicketCliContext<'a> { pub app_handle: &'a AppHandle, pub ticket_id: &'a str, pub process_registry: &'a ProcessRegistry, } pub async fn run_cli_command( command: &str, args: &[String], prompt: &str, working_dir: &str, timeout_secs: u64, context: TicketCliContext<'_>, ) -> Result { let TicketCliContext { app_handle, ticket_id, process_registry, } = context; let child = Command::new(command) .args(args) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .current_dir(working_dir) .spawn() .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?; let child = Arc::new(AsyncMutex::new(child)); let cancellation_requested = Arc::new(AtomicBool::new(false)); process_registry.register_ticket(ticket_id, child.clone(), cancellation_requested.clone()); let (stdin, stdout, stderr) = { let mut child_guard = child.lock().await; let stdin = child_guard.stdin.take(); let stdout = child_guard.stdout.take(); let stderr = child_guard.stderr.take(); (stdin, stdout, stderr) }; if let Some(mut stdin) = stdin { use tokio::io::AsyncWriteExt; stdin.write_all(prompt.as_bytes()).await.map_err(|e| { process_registry.unregister_ticket(ticket_id); format!("Failed to write to stdin: {}", e) })?; } let stdout = stdout.ok_or_else(|| { process_registry.unregister_ticket(ticket_id); "Failed to capture stdout".to_string() })?; let stderr = stderr.ok_or_else(|| { process_registry.unregister_ticket(ticket_id); "Failed to capture stderr".to_string() })?; let read_future = cli_process::collect_process_output(child.clone(), stdout, stderr, |line| { let _ = app_handle.emit( "ticket-processing-progress", serde_json::json!({ "ticket_id": ticket_id, "output_chunk": line, }), ); Ok(()) }); let (result, stderr_output, status) = match timeout(Duration::from_secs(timeout_secs), read_future).await { Ok(result) => match result { Ok(values) => values, Err(e) => { process_registry.unregister_ticket(ticket_id); return Err(e); } }, Err(_) => { cancellation_requested.store(true, Ordering::SeqCst); { let mut child_guard = child.lock().await; let _ = child_guard.start_kill(); } process_registry.unregister_ticket(ticket_id); return Err(format!("CLI command timed out after {}s", timeout_secs)); } }; process_registry.unregister_ticket(ticket_id); if cancellation_requested.load(Ordering::SeqCst) { return Err("CLI command cancelled".to_string()); } if !status.success() { let stderr = stderr_output.trim(); let code = status.code().unwrap_or(-1); if stderr.is_empty() { return Err(format!("CLI command exited with code {}", code)); } return Err(format!("CLI command exited with code {}: {}", code, stderr)); } Ok(result) } fn is_ticket_cancelled(db: &Arc>, ticket_id: &str) -> Result { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; let current = ProcessedTicket::get_by_id(&conn, ticket_id).map_err(|e| format!("get_by_id: {}", e))?; Ok(current.status == "Cancelled") } async fn process_ticket( db: &Arc>, app_handle: &AppHandle, process_registry: &ProcessRegistry, ) -> Result { let (ticket, project, tracker) = { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; let pending = ProcessedTicket::list_pending(&conn) .map_err(|e| format!("list_pending failed: {}", e))?; let mut selected: Option<(ProcessedTicket, Project, Option)> = None; for ticket in pending { let project = Project::get_by_id(&conn, &ticket.project_id) .map_err(|e| format!("get project failed: {}", e))?; match ticket.source.as_str() { "tuleap" => { let tracker = match ticket.tracker_id.as_deref() { Some(tracker_id) => Some( WatchedTracker::get_by_id(&conn, tracker_id) .map_err(|e| format!("get tracker failed: {}", e))?, ), None => None, }; let enabled = ProjectModule::is_enabled( &conn, &project.id, MODULE_TULEAP_AUTO_RESOLVE, ) .map_err(|e| format!("module lookup failed: {}", e))?; if enabled { selected = Some((ticket, project, tracker)); break; } } "graylog" => { let enabled = ProjectModule::is_enabled( &conn, &project.id, MODULE_GRAYLOG_AUTO_RESOLVE, ) .map_err(|e| format!("module lookup failed: {}", e))?; if enabled { selected = Some((ticket, project, None)); break; } } _ => { eprintln!( "orchestrator: unsupported ticket source '{}' for ticket {}", ticket.source, ticket.id ); } } } match selected { Some(item) => item, None => return Ok(false), } }; let (analyst_agent, developer_agent) = { let (analyst_id, developer_id) = if ticket.source == "graylog" { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; let config = match GraylogCredentials::get_by_project(&conn, &project.id) .map_err(|e| format!("graylog credentials lookup failed: {}", e))? { Some(value) => value, None => { drop(conn); record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Graylog credentials are missing.", ); return Ok(true); } }; drop(conn); ( config.analyst_agent_id.to_string(), config.developer_agent_id.to_string(), ) } else if ticket.source == "tuleap" { let tracker = match &tracker { Some(value) => value, None => { record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Missing tracker reference for Tuleap ticket.", ); return Ok(true); } }; if tracker.status != "valid" { record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Tracker is invalid. Configure analyst and developer agents.", ); return Ok(true); } let analyst_id = match tracker.analyst_agent_id.as_deref() { Some(id) => id.to_string(), None => { record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Tracker has no analyst agent configured.", ); return Ok(true); } }; let developer_id = match tracker.developer_agent_id.as_deref() { Some(id) => id.to_string(), None => { record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Tracker has no developer agent configured.", ); return Ok(true); } }; (analyst_id, developer_id) } else { record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, &format!("Unsupported ticket source '{}'.", ticket.source), ); return Ok(true); }; let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; let analyst_agent = match Agent::get_by_id(&conn, &analyst_id) { Ok(agent) => agent, Err(_) => { drop(conn); record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Configured analyst agent was not found.", ); return Ok(true); } }; let developer_agent = match Agent::get_by_id(&conn, &developer_id) { Ok(agent) => agent, Err(_) => { drop(conn); record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Configured developer agent was not found.", ); return Ok(true); } }; if analyst_agent.role != AgentRole::Analyst { drop(conn); record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Configured analyst agent has an invalid role.", ); return Ok(true); } if developer_agent.role != AgentRole::Developer { drop(conn); record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, "Configured developer agent has an invalid role.", ); return Ok(true); } (analyst_agent, developer_agent) }; { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; ProcessedTicket::update_status(&conn, &ticket.id, "Analyzing") .map_err(|e| format!("update_status failed: {}", e))?; } let _ = app_handle.emit( "ticket-processing-started", serde_json::json!({ "project_id": &project.id, "ticket_id": &ticket.id, "artifact_id": ticket.artifact_id, "step": "analyst", }), ); let analyst_prompt = append_custom_prompt( build_analyst_prompt(&ticket, &project), &analyst_agent.custom_prompt, ); let analyst_args = analyst_agent.tool.to_non_interactive_args(); let analyst_result = run_cli_command( analyst_agent.tool.to_command(), &analyst_args, &analyst_prompt, &project.path, 600, TicketCliContext { app_handle, ticket_id: &ticket.id, process_registry, }, ) .await; let analyst_report = match analyst_result { Ok(report) => report, Err(e) => { if is_ticket_cancelled(db, &ticket.id)? { return Ok(true); } record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, &e, ); return Ok(true); } }; if is_ticket_cancelled(db, &ticket.id)? { return Ok(true); } { let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; ProcessedTicket::set_analyst_report(&conn, &ticket.id, &analyst_report) .map_err(|e| format!("set_analyst_report: {}", e))?; } let verdict = parse_verdict(&analyst_report); if verdict == Verdict::NoFix { if is_ticket_cancelled(db, &ticket.id)? { return Ok(true); } let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; ProcessedTicket::update_status(&conn, &ticket.id, "Done") .map_err(|e| format!("update_status: {}", e))?; let _ = app_handle.emit( "ticket-processing-done", serde_json::json!({ "project_id": &project.id, "ticket_id": &ticket.id, "artifact_id": ticket.artifact_id, }), ); notifier::notify_analysis_done(db, app_handle, &project.id, &ticket.id, ticket.artifact_id); return Ok(true); } { let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; let current = ProcessedTicket::get_by_id(&conn, &ticket.id) .map_err(|e| format!("get_by_id: {}", e))?; if current.status == "Cancelled" { return Ok(true); } } let worktree_result = worktree_manager::create_worktree(&project.path, &project.base_branch, ticket.artifact_id); if let Err(e) = &worktree_result { record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, e, ); } let (wt_path, branch_name) = worktree_result?; { let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; ProcessedTicket::set_worktree_info(&conn, &ticket.id, &wt_path, &branch_name) .map_err(|e| format!("set_worktree_info: {}", e))?; Worktree::insert(&conn, &ticket.id, &wt_path, &branch_name) .map_err(|e| format!("insert worktree: {}", e))?; ProcessedTicket::update_status(&conn, &ticket.id, "Developing") .map_err(|e| format!("update_status: {}", e))?; } let _ = app_handle.emit( "ticket-processing-started", serde_json::json!({ "project_id": &project.id, "ticket_id": &ticket.id, "artifact_id": ticket.artifact_id, "step": "developer", }), ); let developer_prompt = append_custom_prompt( build_developer_prompt(&ticket, &project, &analyst_report, &wt_path), &developer_agent.custom_prompt, ); let developer_args = developer_agent.tool.to_non_interactive_args(); let developer_result = run_cli_command( developer_agent.tool.to_command(), &developer_args, &developer_prompt, &wt_path, 600, TicketCliContext { app_handle, ticket_id: &ticket.id, process_registry, }, ) .await; let developer_report = match developer_result { Ok(report) => report, Err(e) => { if is_ticket_cancelled(db, &ticket.id)? { return Ok(true); } record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, &e, ); return Ok(true); } }; if is_ticket_cancelled(db, &ticket.id)? { return Ok(true); } { let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; ProcessedTicket::set_developer_report(&conn, &ticket.id, &developer_report) .map_err(|e| format!("set_developer_report: {}", e))?; ProcessedTicket::update_status(&conn, &ticket.id, "Done") .map_err(|e| format!("update_status: {}", e))?; } let _ = app_handle.emit( "ticket-processing-done", serde_json::json!({ "project_id": &project.id, "ticket_id": &ticket.id, "artifact_id": ticket.artifact_id, }), ); notifier::notify_fix_ready(db, app_handle, &project.id, &ticket.id, ticket.artifact_id); Ok(true) } pub fn start(db: Arc>, app_handle: AppHandle, process_registry: ProcessRegistry) { tauri::async_runtime::spawn(async move { let mut tick = interval(Duration::from_secs(10)); loop { tick.tick().await; match process_ticket(&db, &app_handle, &process_registry).await { Ok(true) => { continue; } Ok(false) => {} Err(e) => { eprintln!("orchestrator: {}", e); } } } }); } #[cfg(test)] mod tests { use super::*; #[test] fn test_build_analyst_prompt_contains_ticket_info() { let ticket = ProcessedTicket { id: "t1".into(), tracker_id: Some("tr1".into()), project_id: "p1".into(), source: "tuleap".into(), source_ref: None, artifact_id: 42, artifact_title: "Login crash on empty password".into(), artifact_data: r#"{"id":42,"title":"Login crash"}"#.into(), status: "Pending".into(), analyst_report: None, developer_report: None, worktree_path: None, branch_name: None, detected_at: "2026-01-01T00:00:00Z".into(), processed_at: None, }; let project = Project { id: "p1".into(), name: "MyApp".into(), path: "/home/user/myapp".into(), cloned_from: None, base_branch: "stable".into(), created_at: "2026-01-01T00:00:00Z".into(), }; let prompt = build_analyst_prompt(&ticket, &project); assert!(prompt.contains("42")); assert!(prompt.contains("Login crash on empty password")); assert!(prompt.contains("MyApp")); assert!(prompt.contains("/home/user/myapp")); assert!(prompt.contains("stable")); assert!(prompt.contains("[VERDICT: FIX_NEEDED]")); assert!(prompt.contains("[VERDICT: NO_FIX]")); } #[test] fn test_build_developer_prompt_contains_report() { let ticket = ProcessedTicket { id: "t1".into(), tracker_id: Some("tr1".into()), project_id: "p1".into(), source: "tuleap".into(), source_ref: None, artifact_id: 42, artifact_title: "Login crash".into(), artifact_data: "{}".into(), status: "Developing".into(), analyst_report: None, developer_report: None, worktree_path: None, branch_name: None, detected_at: "2026-01-01T00:00:00Z".into(), processed_at: None, }; let project = Project { id: "p1".into(), name: "MyApp".into(), path: "/home/user/myapp".into(), cloned_from: None, base_branch: "main".into(), created_at: "2026-01-01T00:00:00Z".into(), }; let prompt = build_developer_prompt(&ticket, &project, "## Bug found in auth.rs", "/tmp/wt"); assert!(prompt.contains("## Bug found in auth.rs")); assert!(prompt.contains("42")); assert!(prompt.contains("/tmp/wt")); } #[test] fn test_parse_verdict_fix_needed() { let report = "## Analysis\nBug found.\n[VERDICT: FIX_NEEDED]\n"; assert_eq!(parse_verdict(report), Verdict::FixNeeded); } #[test] fn test_parse_verdict_no_fix() { let report = "## Analysis\nThis is a feature request, not a bug.\n[VERDICT: NO_FIX]\n"; assert_eq!(parse_verdict(report), Verdict::NoFix); } #[test] fn test_parse_verdict_missing_defaults_to_fix() { let report = "## Analysis\nSomething is wrong but I forgot the verdict."; assert_eq!(parse_verdict(report), Verdict::FixNeeded); } #[test] fn test_parse_verdict_embedded_in_line() { let report = "Verdict: [VERDICT: NO_FIX] - no code change needed."; assert_eq!(parse_verdict(report), Verdict::NoFix); } }