From 467aebc0af3bf5a30dfd39dad3fe30c1c0faece4 Mon Sep 17 00:00:00 2001 From: thibaud-lclr Date: Thu, 16 Apr 2026 16:48:12 +0200 Subject: [PATCH] fix: rendre l'annulation des tickets et taches interruptive closes #2 --- src-tauri/src/commands/orchestrator.rs | 16 ++- src-tauri/src/commands/task.rs | 19 +++- src-tauri/src/lib.rs | 11 +- src-tauri/src/services/agent_runtime.rs | 126 +++++++++++++++++++++ src-tauri/src/services/mod.rs | 1 + src-tauri/src/services/orchestrator.rs | 92 ++++++++++++--- src-tauri/src/services/process_registry.rs | 104 +++++++++++++++++ src-tauri/src/services/task_runner.rs | 30 ++++- 8 files changed, 376 insertions(+), 23 deletions(-) create mode 100644 src-tauri/src/services/process_registry.rs diff --git a/src-tauri/src/commands/orchestrator.rs b/src-tauri/src/commands/orchestrator.rs index da14504..7b32c44 100644 --- a/src-tauri/src/commands/orchestrator.rs +++ b/src-tauri/src/commands/orchestrator.rs @@ -3,6 +3,7 @@ use crate::models::ticket::ProcessedTicket; use crate::models::worktree::Worktree; use crate::AppState; use serde::Serialize; +use tauri::async_runtime; use tauri::State; #[derive(Debug, Clone, Serialize)] @@ -63,9 +64,22 @@ pub fn retry_ticket(state: State<'_, AppState>, ticket_id: String) -> Result<(), #[tauri::command] pub fn cancel_ticket(state: State<'_, AppState>, ticket_id: String) -> Result<(), AppError> { + { + let conn = state.db.lock().map_err(|e| AppError::from(e.to_string()))?; + let ticket = ProcessedTicket::get_by_id(&conn, &ticket_id)?; + + if ticket.status == "Done" || ticket.status == "Cancelled" { + return Err(AppError::from(format!( + "Cannot cancel ticket with status '{}'", + ticket.status + ))); + } + } + + async_runtime::block_on(state.process_registry.cancel_ticket(&ticket_id)); + let conn = state.db.lock().map_err(|e| AppError::from(e.to_string()))?; let ticket = ProcessedTicket::get_by_id(&conn, &ticket_id)?; - if ticket.status == "Done" || ticket.status == "Cancelled" { return Err(AppError::from(format!( "Cannot cancel ticket with status '{}'", diff --git a/src-tauri/src/commands/task.rs b/src-tauri/src/commands/task.rs index e7ea954..198e05e 100644 --- a/src-tauri/src/commands/task.rs +++ b/src-tauri/src/commands/task.rs @@ -4,6 +4,7 @@ use crate::models::agent_task::AgentTask; use crate::models::module::{ProjectModule, MODULE_AGENT_TASK_RUNNER}; use crate::models::project::Project; use crate::AppState; +use tauri::async_runtime; use tauri::State; #[tauri::command] @@ -87,11 +88,27 @@ pub fn retry_agent_task(state: State<'_, AppState>, task_id: String) -> Result<( #[tauri::command] pub fn cancel_agent_task(state: State<'_, AppState>, task_id: String) -> Result<(), AppError> { + { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + + let task = AgentTask::get_by_id(&db, &task_id)?; + if task.status == "done" || task.status == "cancelled" { + return Err(AppError::from(format!( + "Impossible d'annuler une tâche avec le statut '{}'", + task.status + ))); + } + } + + async_runtime::block_on(state.process_registry.cancel_task(&task_id)); + let db = state .db .lock() .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; - let task = AgentTask::get_by_id(&db, &task_id)?; if task.status == "done" || task.status == "cancelled" { return Err(AppError::from(format!( diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 21e3184..f55c340 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -11,6 +11,7 @@ pub struct AppState { pub db: Arc>, pub encryption_key: [u8; 32], pub http_client: reqwest::Client, + pub process_registry: services::process_registry::ProcessRegistry, } #[cfg_attr(mobile, tauri::mobile_entry_point)] @@ -29,12 +30,14 @@ pub fn run() { let encryption_key = load_or_generate_key(&key_path)?; let http_client = reqwest::Client::new(); + let process_registry = services::process_registry::ProcessRegistry::default(); let db_arc = Arc::new(Mutex::new(conn)); app.manage(AppState { db: db_arc.clone(), encryption_key, http_client: http_client.clone(), + process_registry: process_registry.clone(), }); // Start background poller @@ -46,10 +49,14 @@ pub fn run() { ); // Start agent orchestrator - services::orchestrator::start(db_arc.clone(), app.handle().clone()); + services::orchestrator::start( + db_arc.clone(), + app.handle().clone(), + process_registry.clone(), + ); // Start agent task runner - services::task_runner::start(db_arc, app.handle().clone()); + services::task_runner::start(db_arc, app.handle().clone(), process_registry); Ok(()) }) diff --git a/src-tauri/src/services/agent_runtime.rs b/src-tauri/src/services/agent_runtime.rs index 63e6903..c2c4d6b 100644 --- a/src-tauri/src/services/agent_runtime.rs +++ b/src-tauri/src/services/agent_runtime.rs @@ -1,5 +1,9 @@ +use crate::services::process_registry::ProcessRegistry; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; +use tokio::sync::Mutex as AsyncMutex; use tokio::time::{timeout, Duration}; fn normalize_process_stderr(stderr: &str) -> String { @@ -60,6 +64,128 @@ pub async fn run_agent_command( Ok(stdout) } +pub async fn run_agent_command_for_task( + command: &str, + args: &[String], + prompt: &str, + working_dir: &str, + timeout_secs: u64, + process_registry: &ProcessRegistry, + task_id: &str, +) -> Result { + let child = Command::new(command) + .args(args) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .current_dir(working_dir) + .spawn() + .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?; + let child = Arc::new(AsyncMutex::new(child)); + let cancellation_requested = Arc::new(AtomicBool::new(false)); + process_registry.register_task(task_id, child.clone(), cancellation_requested.clone()); + + let (stdin, stdout, stderr) = { + let mut child_guard = child.lock().await; + let stdin = child_guard.stdin.take(); + let stdout = child_guard.stdout.take(); + let stderr = child_guard.stderr.take(); + (stdin, stdout, stderr) + }; + + if let Some(mut stdin) = stdin { + stdin.write_all(prompt.as_bytes()).await.map_err(|e| { + process_registry.unregister_task(task_id); + format!("Failed to write prompt to stdin: {}", e) + })?; + } + + let stdout = stdout.ok_or_else(|| { + process_registry.unregister_task(task_id); + "Failed to capture stdout".to_string() + })?; + let stderr = stderr.ok_or_else(|| { + process_registry.unregister_task(task_id); + "Failed to capture stderr".to_string() + })?; + + let stderr_task = tokio::spawn(async move { + let mut stderr_reader = BufReader::new(stderr); + let mut stderr_output = String::new(); + stderr_reader + .read_to_string(&mut stderr_output) + .await + .map_err(|e| format!("Failed to read stderr: {}", e))?; + Ok::(stderr_output) + }); + + let read_future = async { + let mut reader = BufReader::new(stdout).lines(); + let mut output = String::new(); + + while let Ok(Some(line)) = reader.next_line().await { + output.push_str(&line); + output.push('\n'); + } + + let status = { + let mut child_guard = child.lock().await; + child_guard + .wait() + .await + .map_err(|e| format!("Failed to wait for process: {}", e)) + }?; + + let stderr_output = stderr_task + .await + .map_err(|e| format!("Failed to join stderr reader: {}", e))??; + + Ok::<(String, String, std::process::ExitStatus), String>((output, stderr_output, status)) + }; + + let (output, stderr_output, status) = + match timeout(Duration::from_secs(timeout_secs), read_future).await { + Ok(result) => match result { + Ok(values) => values, + Err(e) => { + process_registry.unregister_task(task_id); + return Err(e); + } + }, + Err(_) => { + cancellation_requested.store(true, Ordering::SeqCst); + { + let mut child_guard = child.lock().await; + let _ = child_guard.start_kill(); + } + process_registry.unregister_task(task_id); + return Err(format!("CLI command timed out after {}s", timeout_secs)); + } + }; + + process_registry.unregister_task(task_id); + + if cancellation_requested.load(Ordering::SeqCst) { + return Err("CLI command cancelled".to_string()); + } + + if !status.success() { + let stderr = normalize_process_stderr(&stderr_output); + let code = status.code().unwrap_or(-1); + if stderr.is_empty() { + return Err(format!("CLI command exited with code {}", code)); + } + return Err(format!("CLI command exited with code {}: {}", code, stderr)); + } + + let stdout = output.trim().to_string(); + if stdout.is_empty() { + return Ok("(empty response)".to_string()); + } + + Ok(stdout) +} + pub async fn run_agent_command_streaming( command: &str, args: &[String], diff --git a/src-tauri/src/services/mod.rs b/src-tauri/src/services/mod.rs index ef59180..add191c 100644 --- a/src-tauri/src/services/mod.rs +++ b/src-tauri/src/services/mod.rs @@ -4,6 +4,7 @@ pub mod filter_engine; pub mod notifier; pub mod orchestrator; pub mod poller; +pub mod process_registry; pub mod task_runner; pub mod tuleap_client; pub mod worktree_manager; diff --git a/src-tauri/src/services/orchestrator.rs b/src-tauri/src/services/orchestrator.rs index 9cfeae0..3734873 100644 --- a/src-tauri/src/services/orchestrator.rs +++ b/src-tauri/src/services/orchestrator.rs @@ -4,12 +4,15 @@ use crate::models::project::Project; use crate::models::ticket::ProcessedTicket; use crate::models::tracker::WatchedTracker; use crate::models::worktree::Worktree; +use crate::services::process_registry::ProcessRegistry; use crate::services::{notifier, worktree_manager}; use rusqlite::Connection; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use tauri::{AppHandle, Emitter}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; +use tokio::sync::Mutex as AsyncMutex; use tokio::time::{interval, timeout, Duration}; #[derive(Debug, Clone, PartialEq)] @@ -142,8 +145,9 @@ pub async fn run_cli_command( timeout_secs: u64, app_handle: &AppHandle, ticket_id: &str, + process_registry: &ProcessRegistry, ) -> Result { - let mut child = Command::new(command) + let child = Command::new(command) .args(args) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) @@ -151,16 +155,29 @@ pub async fn run_cli_command( .current_dir(working_dir) .spawn() .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?; + let child = Arc::new(AsyncMutex::new(child)); + let cancellation_requested = Arc::new(AtomicBool::new(false)); + process_registry.register_ticket(ticket_id, child.clone(), cancellation_requested.clone()); - if let Some(mut stdin) = child.stdin.take() { + let (stdin, stdout) = { + let mut child_guard = child.lock().await; + let stdin = child_guard.stdin.take(); + let stdout = child_guard.stdout.take(); + (stdin, stdout) + }; + + if let Some(mut stdin) = stdin { use tokio::io::AsyncWriteExt; - stdin - .write_all(prompt.as_bytes()) - .await - .map_err(|e| format!("Failed to write to stdin: {}", e))?; + stdin.write_all(prompt.as_bytes()).await.map_err(|e| { + process_registry.unregister_ticket(ticket_id); + format!("Failed to write to stdin: {}", e) + })?; } - let stdout = child.stdout.take().ok_or("Failed to capture stdout")?; + let stdout = stdout.ok_or_else(|| { + process_registry.unregister_ticket(ticket_id); + "Failed to capture stdout".to_string() + })?; let mut reader = BufReader::new(stdout).lines(); let mut output = String::new(); @@ -179,14 +196,29 @@ pub async fn run_cli_command( output }; - let result = timeout(Duration::from_secs(timeout_secs), read_future) - .await - .map_err(|_| format!("CLI command timed out after {}s", timeout_secs))?; + let result = match timeout(Duration::from_secs(timeout_secs), read_future).await { + Ok(output) => output, + Err(_) => { + cancellation_requested.store(true, Ordering::SeqCst); + { + let mut child_guard = child.lock().await; + let _ = child_guard.start_kill(); + } + process_registry.unregister_ticket(ticket_id); + return Err(format!("CLI command timed out after {}s", timeout_secs)); + } + }; - let status = child - .wait() - .await - .map_err(|e| format!("Failed to wait for process: {}", e))?; + let wait_result = { + let mut child_guard = child.lock().await; + child_guard.wait().await + }; + process_registry.unregister_ticket(ticket_id); + let status = wait_result.map_err(|e| format!("Failed to wait for process: {}", e))?; + + if cancellation_requested.load(Ordering::SeqCst) { + return Err("CLI command cancelled".to_string()); + } if !status.success() { let code = status.code().unwrap_or(-1); @@ -196,9 +228,17 @@ pub async fn run_cli_command( Ok(result) } +fn is_ticket_cancelled(db: &Arc>, ticket_id: &str) -> Result { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + let current = + ProcessedTicket::get_by_id(&conn, ticket_id).map_err(|e| format!("get_by_id: {}", e))?; + Ok(current.status == "Cancelled") +} + async fn process_ticket( db: &Arc>, app_handle: &AppHandle, + process_registry: &ProcessRegistry, ) -> Result { let (ticket, tracker, project) = { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; @@ -361,12 +401,16 @@ async fn process_ticket( 600, app_handle, &ticket.id, + process_registry, ) .await; let analyst_report = match analyst_result { Ok(report) => report, Err(e) => { + if is_ticket_cancelled(db, &ticket.id)? { + return Ok(true); + } record_ticket_error( db, app_handle, @@ -379,6 +423,10 @@ async fn process_ticket( } }; + if is_ticket_cancelled(db, &ticket.id)? { + return Ok(true); + } + { let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; ProcessedTicket::set_analyst_report(&conn, &ticket.id, &analyst_report) @@ -387,6 +435,10 @@ async fn process_ticket( let verdict = parse_verdict(&analyst_report); if verdict == Verdict::NoFix { + if is_ticket_cancelled(db, &ticket.id)? { + return Ok(true); + } + let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; ProcessedTicket::update_status(&conn, &ticket.id, "Done") .map_err(|e| format!("update_status: {}", e))?; @@ -460,12 +512,16 @@ async fn process_ticket( 600, app_handle, &ticket.id, + process_registry, ) .await; let developer_report = match developer_result { Ok(report) => report, Err(e) => { + if is_ticket_cancelled(db, &ticket.id)? { + return Ok(true); + } record_ticket_error( db, app_handle, @@ -478,6 +534,10 @@ async fn process_ticket( } }; + if is_ticket_cancelled(db, &ticket.id)? { + return Ok(true); + } + { let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?; ProcessedTicket::set_developer_report(&conn, &ticket.id, &developer_report) @@ -499,12 +559,12 @@ async fn process_ticket( Ok(true) } -pub fn start(db: Arc>, app_handle: AppHandle) { +pub fn start(db: Arc>, app_handle: AppHandle, process_registry: ProcessRegistry) { tauri::async_runtime::spawn(async move { let mut tick = interval(Duration::from_secs(10)); loop { tick.tick().await; - match process_ticket(&db, &app_handle).await { + match process_ticket(&db, &app_handle, &process_registry).await { Ok(true) => { continue; } diff --git a/src-tauri/src/services/process_registry.rs b/src-tauri/src/services/process_registry.rs new file mode 100644 index 0000000..ef74c03 --- /dev/null +++ b/src-tauri/src/services/process_registry.rs @@ -0,0 +1,104 @@ +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use tokio::process::Child; +use tokio::sync::Mutex as AsyncMutex; + +#[derive(Clone)] +struct ActiveProcess { + child: Arc>, + cancellation_requested: Arc, +} + +impl ActiveProcess { + fn new(child: Arc>, cancellation_requested: Arc) -> Self { + Self { + child, + cancellation_requested, + } + } +} + +#[derive(Clone, Default)] +pub struct ProcessRegistry { + ticket_processes: Arc>>, + task_processes: Arc>>, +} + +impl ProcessRegistry { + pub fn register_ticket( + &self, + ticket_id: &str, + child: Arc>, + cancellation_requested: Arc, + ) { + let mut processes = self + .ticket_processes + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + processes.insert( + ticket_id.to_string(), + ActiveProcess::new(child, cancellation_requested), + ); + } + + pub fn unregister_ticket(&self, ticket_id: &str) { + let mut processes = self + .ticket_processes + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + processes.remove(ticket_id); + } + + pub async fn cancel_ticket(&self, ticket_id: &str) -> bool { + self.cancel_process(&self.ticket_processes, ticket_id).await + } + + pub fn register_task( + &self, + task_id: &str, + child: Arc>, + cancellation_requested: Arc, + ) { + let mut processes = self + .task_processes + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + processes.insert( + task_id.to_string(), + ActiveProcess::new(child, cancellation_requested), + ); + } + + pub fn unregister_task(&self, task_id: &str) { + let mut processes = self + .task_processes + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + processes.remove(task_id); + } + + pub async fn cancel_task(&self, task_id: &str) -> bool { + self.cancel_process(&self.task_processes, task_id).await + } + + async fn cancel_process( + &self, + map: &Arc>>, + entity_id: &str, + ) -> bool { + let process = { + let processes = map.lock().unwrap_or_else(|poison| poison.into_inner()); + processes.get(entity_id).cloned() + }; + + let Some(process) = process else { + return false; + }; + + process.cancellation_requested.store(true, Ordering::SeqCst); + let mut child = process.child.lock().await; + let _ = child.start_kill(); + true + } +} diff --git a/src-tauri/src/services/task_runner.rs b/src-tauri/src/services/task_runner.rs index 84f8e1c..85a3f15 100644 --- a/src-tauri/src/services/task_runner.rs +++ b/src-tauri/src/services/task_runner.rs @@ -3,6 +3,7 @@ use crate::models::agent_task::AgentTask; use crate::models::module::{ProjectModule, MODULE_AGENT_TASK_RUNNER}; use crate::models::project::Project; use crate::services::agent_runtime; +use crate::services::process_registry::ProcessRegistry; use rusqlite::Connection; use std::sync::{Arc, Mutex}; use tauri::{AppHandle, Emitter}; @@ -60,6 +61,7 @@ fn emit_status( async fn process_next_task( db: &Arc>, app_handle: &AppHandle, + process_registry: &ProcessRegistry, ) -> Result { let next_task = { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; @@ -102,23 +104,45 @@ async fn process_next_task( let prompt = build_task_prompt(&task, &project); let args = agent.tool.to_non_interactive_args(); - let result = agent_runtime::run_agent_command( + let result = agent_runtime::run_agent_command_for_task( agent.tool.to_command(), &args, &prompt, &project.path, 900, + process_registry, + &task.id, ) .await; match result { Ok(report) => { + let current = { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + AgentTask::get_by_id(&conn, &task.id) + .map_err(|e| format!("task lookup failed: {}", e))? + }; + if current.status == "cancelled" { + emit_status(app_handle, &task.project_id, &task.id, "cancelled", None); + return Ok(true); + } + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; AgentTask::mark_done(&conn, &task.id, &report) .map_err(|e| format!("mark task done failed: {}", e))?; emit_status(app_handle, &task.project_id, &task.id, "done", None); } Err(error) => { + let current = { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + AgentTask::get_by_id(&conn, &task.id) + .map_err(|e| format!("task lookup failed: {}", e))? + }; + if current.status == "cancelled" { + emit_status(app_handle, &task.project_id, &task.id, "cancelled", None); + return Ok(true); + } + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; AgentTask::mark_error(&conn, &task.id, &error) .map_err(|e| format!("mark task error failed: {}", e))?; @@ -135,12 +159,12 @@ async fn process_next_task( Ok(true) } -pub fn start(db: Arc>, app_handle: AppHandle) { +pub fn start(db: Arc>, app_handle: AppHandle, process_registry: ProcessRegistry) { tauri::async_runtime::spawn(async move { let mut tick = interval(Duration::from_secs(8)); loop { tick.tick().await; - match process_next_task(&db, &app_handle).await { + match process_next_task(&db, &app_handle, &process_registry).await { Ok(true) => continue, Ok(false) => {} Err(e) => {