fix: rendre l'annulation des tickets et tâches interruptive

closes #2
This commit is contained in:
thibaud-lclr 2026-04-16 16:48:12 +02:00
parent 906c44ef22
commit 467aebc0af
8 changed files with 376 additions and 23 deletions

View file

@ -3,6 +3,7 @@ use crate::models::ticket::ProcessedTicket;
use crate::models::worktree::Worktree;
use crate::AppState;
use serde::Serialize;
use tauri::async_runtime;
use tauri::State;
#[derive(Debug, Clone, Serialize)]
@ -63,9 +64,22 @@ pub fn retry_ticket(state: State<'_, AppState>, ticket_id: String) -> Result<(),
#[tauri::command]
pub fn cancel_ticket(state: State<'_, AppState>, ticket_id: String) -> Result<(), AppError> {
{
let conn = state.db.lock().map_err(|e| AppError::from(e.to_string()))?;
let ticket = ProcessedTicket::get_by_id(&conn, &ticket_id)?;
if ticket.status == "Done" || ticket.status == "Cancelled" {
return Err(AppError::from(format!(
"Cannot cancel ticket with status '{}'",
ticket.status
)));
}
}
async_runtime::block_on(state.process_registry.cancel_ticket(&ticket_id));
let conn = state.db.lock().map_err(|e| AppError::from(e.to_string()))?;
let ticket = ProcessedTicket::get_by_id(&conn, &ticket_id)?;
if ticket.status == "Done" || ticket.status == "Cancelled" {
return Err(AppError::from(format!(
"Cannot cancel ticket with status '{}'",

View file

@ -4,6 +4,7 @@ use crate::models::agent_task::AgentTask;
use crate::models::module::{ProjectModule, MODULE_AGENT_TASK_RUNNER};
use crate::models::project::Project;
use crate::AppState;
use tauri::async_runtime;
use tauri::State;
#[tauri::command]
@ -87,11 +88,27 @@ pub fn retry_agent_task(state: State<'_, AppState>, task_id: String) -> Result<(
#[tauri::command]
pub fn cancel_agent_task(state: State<'_, AppState>, task_id: String) -> Result<(), AppError> {
{
let db = state
.db
.lock()
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
let task = AgentTask::get_by_id(&db, &task_id)?;
if task.status == "done" || task.status == "cancelled" {
return Err(AppError::from(format!(
"Impossible d'annuler une tâche avec le statut '{}'",
task.status
)));
}
}
async_runtime::block_on(state.process_registry.cancel_task(&task_id));
let db = state
.db
.lock()
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
let task = AgentTask::get_by_id(&db, &task_id)?;
if task.status == "done" || task.status == "cancelled" {
return Err(AppError::from(format!(

View file

@ -11,6 +11,7 @@ pub struct AppState {
pub db: Arc<Mutex<rusqlite::Connection>>,
pub encryption_key: [u8; 32],
pub http_client: reqwest::Client,
pub process_registry: services::process_registry::ProcessRegistry,
}
#[cfg_attr(mobile, tauri::mobile_entry_point)]
@ -29,12 +30,14 @@ pub fn run() {
let encryption_key = load_or_generate_key(&key_path)?;
let http_client = reqwest::Client::new();
let process_registry = services::process_registry::ProcessRegistry::default();
let db_arc = Arc::new(Mutex::new(conn));
app.manage(AppState {
db: db_arc.clone(),
encryption_key,
http_client: http_client.clone(),
process_registry: process_registry.clone(),
});
// Start background poller
@ -46,10 +49,14 @@ pub fn run() {
);
// Start agent orchestrator
services::orchestrator::start(db_arc.clone(), app.handle().clone());
services::orchestrator::start(
db_arc.clone(),
app.handle().clone(),
process_registry.clone(),
);
// Start agent task runner
services::task_runner::start(db_arc, app.handle().clone());
services::task_runner::start(db_arc, app.handle().clone(), process_registry);
Ok(())
})

View file

@ -1,5 +1,9 @@
use crate::services::process_registry::ProcessRegistry;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::{timeout, Duration};
fn normalize_process_stderr(stderr: &str) -> String {
@ -60,6 +64,128 @@ pub async fn run_agent_command(
Ok(stdout)
}
/// Runs an agent CLI command for an agent task, registering the spawned
/// process in `process_registry` under `task_id` so it can be cancelled
/// (killed) from another task while it is running.
///
/// The `prompt` is written to the child's stdin, stdout is collected line by
/// line, stderr is drained on a separate task, and the whole exchange is
/// bounded by `timeout_secs`.
///
/// Returns the trimmed stdout on success (`"(empty response)"` when the
/// child printed nothing). Errors are `String`s covering spawn/IO failures,
/// timeout, cancellation ("CLI command cancelled"), and non-zero exit status
/// (with normalized stderr appended when available).
pub async fn run_agent_command_for_task(
    command: &str,
    args: &[String],
    prompt: &str,
    working_dir: &str,
    timeout_secs: u64,
    process_registry: &ProcessRegistry,
    task_id: &str,
) -> Result<String, String> {
    let child = Command::new(command)
        .args(args)
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .current_dir(working_dir)
        .spawn()
        .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?;

    // Share the child behind an async mutex so the registry's cancel path can
    // lock and kill it while this function is awaiting its output.
    let child = Arc::new(AsyncMutex::new(child));
    let cancellation_requested = Arc::new(AtomicBool::new(false));
    process_registry.register_task(task_id, child.clone(), cancellation_requested.clone());

    // Take all three stdio handles in one short lock scope so the lock is not
    // held while doing pipe I/O.
    let (stdin, stdout, stderr) = {
        let mut child_guard = child.lock().await;
        let stdin = child_guard.stdin.take();
        let stdout = child_guard.stdout.take();
        let stderr = child_guard.stderr.take();
        (stdin, stdout, stderr)
    };

    // Dropping `stdin` at the end of this block closes the pipe, signalling
    // end-of-input to the child.
    // NOTE(review): on these early error paths the task is unregistered but
    // the child is not killed — confirm the process is reaped elsewhere.
    if let Some(mut stdin) = stdin {
        stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
            process_registry.unregister_task(task_id);
            format!("Failed to write prompt to stdin: {}", e)
        })?;
    }
    let stdout = stdout.ok_or_else(|| {
        process_registry.unregister_task(task_id);
        "Failed to capture stdout".to_string()
    })?;
    let stderr = stderr.ok_or_else(|| {
        process_registry.unregister_task(task_id);
        "Failed to capture stderr".to_string()
    })?;

    // Drain stderr concurrently with the stdout read below, so neither pipe
    // backs up while the other is being consumed.
    let stderr_task = tokio::spawn(async move {
        let mut stderr_reader = BufReader::new(stderr);
        let mut stderr_output = String::new();
        stderr_reader
            .read_to_string(&mut stderr_output)
            .await
            .map_err(|e| format!("Failed to read stderr: {}", e))?;
        Ok::<String, String>(stderr_output)
    });

    // Read stdout to EOF, then collect the exit status and the drained stderr.
    let read_future = async {
        let mut reader = BufReader::new(stdout).lines();
        let mut output = String::new();
        while let Ok(Some(line)) = reader.next_line().await {
            output.push_str(&line);
            output.push('\n');
        }
        let status = {
            let mut child_guard = child.lock().await;
            child_guard
                .wait()
                .await
                .map_err(|e| format!("Failed to wait for process: {}", e))
        }?;
        let stderr_output = stderr_task
            .await
            .map_err(|e| format!("Failed to join stderr reader: {}", e))??;
        Ok::<(String, String, std::process::ExitStatus), String>((output, stderr_output, status))
    };

    let (output, stderr_output, status) =
        match timeout(Duration::from_secs(timeout_secs), read_future).await {
            Ok(result) => match result {
                Ok(values) => values,
                Err(e) => {
                    process_registry.unregister_task(task_id);
                    return Err(e);
                }
            },
            Err(_) => {
                // Timeout: flag the child as intentionally terminated, then
                // issue a non-blocking kill and report the timeout.
                cancellation_requested.store(true, Ordering::SeqCst);
                {
                    let mut child_guard = child.lock().await;
                    let _ = child_guard.start_kill();
                }
                process_registry.unregister_task(task_id);
                return Err(format!("CLI command timed out after {}s", timeout_secs));
            }
        };
    process_registry.unregister_task(task_id);

    // The registry's cancel path sets this flag before killing the process,
    // so a kill-induced exit is reported as a cancellation rather than as an
    // ordinary command failure.
    if cancellation_requested.load(Ordering::SeqCst) {
        return Err("CLI command cancelled".to_string());
    }
    if !status.success() {
        let stderr = normalize_process_stderr(&stderr_output);
        let code = status.code().unwrap_or(-1);
        if stderr.is_empty() {
            return Err(format!("CLI command exited with code {}", code));
        }
        return Err(format!("CLI command exited with code {}: {}", code, stderr));
    }
    let stdout = output.trim().to_string();
    if stdout.is_empty() {
        // Non-empty sentinel so callers always have something to store/show.
        return Ok("(empty response)".to_string());
    }
    Ok(stdout)
}
pub async fn run_agent_command_streaming<F>(
command: &str,
args: &[String],

View file

@ -4,6 +4,7 @@ pub mod filter_engine;
pub mod notifier;
pub mod orchestrator;
pub mod poller;
pub mod process_registry;
pub mod task_runner;
pub mod tuleap_client;
pub mod worktree_manager;

View file

@ -4,12 +4,15 @@ use crate::models::project::Project;
use crate::models::ticket::ProcessedTicket;
use crate::models::tracker::WatchedTracker;
use crate::models::worktree::Worktree;
use crate::services::process_registry::ProcessRegistry;
use crate::services::{notifier, worktree_manager};
use rusqlite::Connection;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tauri::{AppHandle, Emitter};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::{interval, timeout, Duration};
#[derive(Debug, Clone, PartialEq)]
@ -142,8 +145,9 @@ pub async fn run_cli_command(
timeout_secs: u64,
app_handle: &AppHandle,
ticket_id: &str,
process_registry: &ProcessRegistry,
) -> Result<String, String> {
let mut child = Command::new(command)
let child = Command::new(command)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
@ -151,16 +155,29 @@ pub async fn run_cli_command(
.current_dir(working_dir)
.spawn()
.map_err(|e| format!("Failed to spawn '{}': {}", command, e))?;
let child = Arc::new(AsyncMutex::new(child));
let cancellation_requested = Arc::new(AtomicBool::new(false));
process_registry.register_ticket(ticket_id, child.clone(), cancellation_requested.clone());
if let Some(mut stdin) = child.stdin.take() {
let (stdin, stdout) = {
let mut child_guard = child.lock().await;
let stdin = child_guard.stdin.take();
let stdout = child_guard.stdout.take();
(stdin, stdout)
};
if let Some(mut stdin) = stdin {
use tokio::io::AsyncWriteExt;
stdin
.write_all(prompt.as_bytes())
.await
.map_err(|e| format!("Failed to write to stdin: {}", e))?;
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
process_registry.unregister_ticket(ticket_id);
format!("Failed to write to stdin: {}", e)
})?;
}
let stdout = child.stdout.take().ok_or("Failed to capture stdout")?;
let stdout = stdout.ok_or_else(|| {
process_registry.unregister_ticket(ticket_id);
"Failed to capture stdout".to_string()
})?;
let mut reader = BufReader::new(stdout).lines();
let mut output = String::new();
@ -179,14 +196,29 @@ pub async fn run_cli_command(
output
};
let result = timeout(Duration::from_secs(timeout_secs), read_future)
.await
.map_err(|_| format!("CLI command timed out after {}s", timeout_secs))?;
let result = match timeout(Duration::from_secs(timeout_secs), read_future).await {
Ok(output) => output,
Err(_) => {
cancellation_requested.store(true, Ordering::SeqCst);
{
let mut child_guard = child.lock().await;
let _ = child_guard.start_kill();
}
process_registry.unregister_ticket(ticket_id);
return Err(format!("CLI command timed out after {}s", timeout_secs));
}
};
let status = child
.wait()
.await
.map_err(|e| format!("Failed to wait for process: {}", e))?;
let wait_result = {
let mut child_guard = child.lock().await;
child_guard.wait().await
};
process_registry.unregister_ticket(ticket_id);
let status = wait_result.map_err(|e| format!("Failed to wait for process: {}", e))?;
if cancellation_requested.load(Ordering::SeqCst) {
return Err("CLI command cancelled".to_string());
}
if !status.success() {
let code = status.code().unwrap_or(-1);
@ -196,9 +228,17 @@ pub async fn run_cli_command(
Ok(result)
}
/// Reports whether the ticket's current status in the database is "Cancelled".
///
/// Returns an `Err` string when the DB lock is poisoned or the ticket lookup
/// fails.
fn is_ticket_cancelled(db: &Arc<Mutex<Connection>>, ticket_id: &str) -> Result<bool, String> {
    let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
    ProcessedTicket::get_by_id(&conn, ticket_id)
        .map(|ticket| ticket.status == "Cancelled")
        .map_err(|e| format!("get_by_id: {}", e))
}
async fn process_ticket(
db: &Arc<Mutex<Connection>>,
app_handle: &AppHandle,
process_registry: &ProcessRegistry,
) -> Result<bool, String> {
let (ticket, tracker, project) = {
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
@ -361,12 +401,16 @@ async fn process_ticket(
600,
app_handle,
&ticket.id,
process_registry,
)
.await;
let analyst_report = match analyst_result {
Ok(report) => report,
Err(e) => {
if is_ticket_cancelled(db, &ticket.id)? {
return Ok(true);
}
record_ticket_error(
db,
app_handle,
@ -379,6 +423,10 @@ async fn process_ticket(
}
};
if is_ticket_cancelled(db, &ticket.id)? {
return Ok(true);
}
{
let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
ProcessedTicket::set_analyst_report(&conn, &ticket.id, &analyst_report)
@ -387,6 +435,10 @@ async fn process_ticket(
let verdict = parse_verdict(&analyst_report);
if verdict == Verdict::NoFix {
if is_ticket_cancelled(db, &ticket.id)? {
return Ok(true);
}
let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
ProcessedTicket::update_status(&conn, &ticket.id, "Done")
.map_err(|e| format!("update_status: {}", e))?;
@ -460,12 +512,16 @@ async fn process_ticket(
600,
app_handle,
&ticket.id,
process_registry,
)
.await;
let developer_report = match developer_result {
Ok(report) => report,
Err(e) => {
if is_ticket_cancelled(db, &ticket.id)? {
return Ok(true);
}
record_ticket_error(
db,
app_handle,
@ -478,6 +534,10 @@ async fn process_ticket(
}
};
if is_ticket_cancelled(db, &ticket.id)? {
return Ok(true);
}
{
let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
ProcessedTicket::set_developer_report(&conn, &ticket.id, &developer_report)
@ -499,12 +559,12 @@ async fn process_ticket(
Ok(true)
}
pub fn start(db: Arc<Mutex<Connection>>, app_handle: AppHandle) {
pub fn start(db: Arc<Mutex<Connection>>, app_handle: AppHandle, process_registry: ProcessRegistry) {
tauri::async_runtime::spawn(async move {
let mut tick = interval(Duration::from_secs(10));
loop {
tick.tick().await;
match process_ticket(&db, &app_handle).await {
match process_ticket(&db, &app_handle, &process_registry).await {
Ok(true) => {
continue;
}

View file

@ -0,0 +1,104 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::process::Child;
use tokio::sync::Mutex as AsyncMutex;
/// A running child process tracked by the registry, paired with the flag
/// that records whether cancellation has been requested for it.
#[derive(Clone)]
struct ActiveProcess {
    child: Arc<AsyncMutex<Child>>,
    cancellation_requested: Arc<AtomicBool>,
}

impl ActiveProcess {
    /// Bundles a process handle with its cancellation flag.
    fn new(child: Arc<AsyncMutex<Child>>, cancellation_requested: Arc<AtomicBool>) -> Self {
        ActiveProcess {
            child,
            cancellation_requested,
        }
    }
}
/// Shared, lockable map from an entity id to its currently running process.
type ProcessMap = Arc<Mutex<HashMap<String, ActiveProcess>>>;

/// Tracks the child processes spawned for tickets and for agent tasks so
/// they can be cancelled (killed) while they run.
///
/// Cloning is cheap and all clones observe the same state: both maps are
/// behind `Arc`s.
#[derive(Clone, Default)]
pub struct ProcessRegistry {
    /// Active processes keyed by ticket id.
    ticket_processes: ProcessMap,
    /// Active processes keyed by agent-task id.
    task_processes: ProcessMap,
}

impl ProcessRegistry {
    /// Inserts (or replaces) the process registered under `entity_id`.
    /// Shared by the ticket and task registration paths.
    fn register(
        map: &ProcessMap,
        entity_id: &str,
        child: Arc<AsyncMutex<Child>>,
        cancellation_requested: Arc<AtomicBool>,
    ) {
        // Recover from a poisoned lock: the map stays usable even if a
        // holder panicked.
        let mut processes = map.lock().unwrap_or_else(|poison| poison.into_inner());
        processes.insert(
            entity_id.to_string(),
            ActiveProcess::new(child, cancellation_requested),
        );
    }

    /// Removes the process registered under `entity_id`, if any.
    fn unregister(map: &ProcessMap, entity_id: &str) {
        let mut processes = map.lock().unwrap_or_else(|poison| poison.into_inner());
        processes.remove(entity_id);
    }

    /// Registers the running process for a ticket.
    pub fn register_ticket(
        &self,
        ticket_id: &str,
        child: Arc<AsyncMutex<Child>>,
        cancellation_requested: Arc<AtomicBool>,
    ) {
        Self::register(&self.ticket_processes, ticket_id, child, cancellation_requested);
    }

    /// Forgets the process registered for `ticket_id`, if any.
    pub fn unregister_ticket(&self, ticket_id: &str) {
        Self::unregister(&self.ticket_processes, ticket_id);
    }

    /// Requests cancellation of the process running for `ticket_id`.
    /// Returns `true` when a registered process was found and a kill issued.
    pub async fn cancel_ticket(&self, ticket_id: &str) -> bool {
        self.cancel_process(&self.ticket_processes, ticket_id).await
    }

    /// Registers the running process for an agent task.
    pub fn register_task(
        &self,
        task_id: &str,
        child: Arc<AsyncMutex<Child>>,
        cancellation_requested: Arc<AtomicBool>,
    ) {
        Self::register(&self.task_processes, task_id, child, cancellation_requested);
    }

    /// Forgets the process registered for `task_id`, if any.
    pub fn unregister_task(&self, task_id: &str) {
        Self::unregister(&self.task_processes, task_id);
    }

    /// Requests cancellation of the process running for `task_id`.
    /// Returns `true` when a registered process was found and a kill issued.
    pub async fn cancel_task(&self, task_id: &str) -> bool {
        self.cancel_process(&self.task_processes, task_id).await
    }

    /// Marks the process as cancellation-requested, then kills it.
    async fn cancel_process(&self, map: &ProcessMap, entity_id: &str) -> bool {
        // Clone the entry out inside a short scope so the std mutex guard is
        // dropped before any await point.
        let process = {
            let processes = map.lock().unwrap_or_else(|poison| poison.into_inner());
            processes.get(entity_id).cloned()
        };
        let Some(process) = process else {
            return false;
        };
        // Set the flag before killing so the runner reports the exit as a
        // cancellation instead of an ordinary failure.
        process.cancellation_requested.store(true, Ordering::SeqCst);
        let mut child = process.child.lock().await;
        let _ = child.start_kill();
        true
    }
}

View file

@ -3,6 +3,7 @@ use crate::models::agent_task::AgentTask;
use crate::models::module::{ProjectModule, MODULE_AGENT_TASK_RUNNER};
use crate::models::project::Project;
use crate::services::agent_runtime;
use crate::services::process_registry::ProcessRegistry;
use rusqlite::Connection;
use std::sync::{Arc, Mutex};
use tauri::{AppHandle, Emitter};
@ -60,6 +61,7 @@ fn emit_status(
async fn process_next_task(
db: &Arc<Mutex<Connection>>,
app_handle: &AppHandle,
process_registry: &ProcessRegistry,
) -> Result<bool, String> {
let next_task = {
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
@ -102,23 +104,45 @@ async fn process_next_task(
let prompt = build_task_prompt(&task, &project);
let args = agent.tool.to_non_interactive_args();
let result = agent_runtime::run_agent_command(
let result = agent_runtime::run_agent_command_for_task(
agent.tool.to_command(),
&args,
&prompt,
&project.path,
900,
process_registry,
&task.id,
)
.await;
match result {
Ok(report) => {
let current = {
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
AgentTask::get_by_id(&conn, &task.id)
.map_err(|e| format!("task lookup failed: {}", e))?
};
if current.status == "cancelled" {
emit_status(app_handle, &task.project_id, &task.id, "cancelled", None);
return Ok(true);
}
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
AgentTask::mark_done(&conn, &task.id, &report)
.map_err(|e| format!("mark task done failed: {}", e))?;
emit_status(app_handle, &task.project_id, &task.id, "done", None);
}
Err(error) => {
let current = {
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
AgentTask::get_by_id(&conn, &task.id)
.map_err(|e| format!("task lookup failed: {}", e))?
};
if current.status == "cancelled" {
emit_status(app_handle, &task.project_id, &task.id, "cancelled", None);
return Ok(true);
}
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
AgentTask::mark_error(&conn, &task.id, &error)
.map_err(|e| format!("mark task error failed: {}", e))?;
@ -135,12 +159,12 @@ async fn process_next_task(
Ok(true)
}
pub fn start(db: Arc<Mutex<Connection>>, app_handle: AppHandle) {
pub fn start(db: Arc<Mutex<Connection>>, app_handle: AppHandle, process_registry: ProcessRegistry) {
tauri::async_runtime::spawn(async move {
let mut tick = interval(Duration::from_secs(8));
loop {
tick.tick().await;
match process_next_task(&db, &app_handle).await {
match process_next_task(&db, &app_handle, &process_registry).await {
Ok(true) => continue,
Ok(false) => {}
Err(e) => {