From 8d0b3457519657aab64239f3047b4503be64fc1c Mon Sep 17 00:00:00 2001 From: thibaud-lclr Date: Thu, 16 Apr 2026 17:01:06 +0200 Subject: [PATCH] fix(orchestrator): capture stderr for cli execution closes #3 --- src-tauri/src/services/agent_runtime.rs | 36 +---------- src-tauri/src/services/cli_process.rs | 84 +++++++++++++++++++++++++ src-tauri/src/services/mod.rs | 1 + src-tauri/src/services/orchestrator.rs | 78 ++++++++++++----------- 4 files changed, 129 insertions(+), 70 deletions(-) create mode 100644 src-tauri/src/services/cli_process.rs diff --git a/src-tauri/src/services/agent_runtime.rs b/src-tauri/src/services/agent_runtime.rs index c2c4d6b..4aee7de 100644 --- a/src-tauri/src/services/agent_runtime.rs +++ b/src-tauri/src/services/agent_runtime.rs @@ -1,3 +1,4 @@ +use crate::services::cli_process; use crate::services::process_registry::ProcessRegistry; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -109,39 +110,8 @@ pub async fn run_agent_command_for_task( "Failed to capture stderr".to_string() })?; - let stderr_task = tokio::spawn(async move { - let mut stderr_reader = BufReader::new(stderr); - let mut stderr_output = String::new(); - stderr_reader - .read_to_string(&mut stderr_output) - .await - .map_err(|e| format!("Failed to read stderr: {}", e))?; - Ok::(stderr_output) - }); - - let read_future = async { - let mut reader = BufReader::new(stdout).lines(); - let mut output = String::new(); - - while let Ok(Some(line)) = reader.next_line().await { - output.push_str(&line); - output.push('\n'); - } - - let status = { - let mut child_guard = child.lock().await; - child_guard - .wait() - .await - .map_err(|e| format!("Failed to wait for process: {}", e)) - }?; - - let stderr_output = stderr_task - .await - .map_err(|e| format!("Failed to join stderr reader: {}", e))??; - - Ok::<(String, String, std::process::ExitStatus), String>((output, stderr_output, status)) - }; + let read_future = + cli_process::collect_process_output(child.clone(), stdout, stderr, |_| Ok(())); let (output, stderr_output, status) = match timeout(Duration::from_secs(timeout_secs), read_future).await { diff --git a/src-tauri/src/services/cli_process.rs b/src-tauri/src/services/cli_process.rs new file mode 100644 index 0000000..e363308 --- /dev/null +++ b/src-tauri/src/services/cli_process.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; +use tokio::process::{Child, ChildStderr, ChildStdout}; +use tokio::sync::Mutex as AsyncMutex; + +pub async fn collect_process_output( + child: Arc>, + stdout: ChildStdout, + stderr: ChildStderr, + mut on_stdout_line: F, +) -> Result<(String, String, std::process::ExitStatus), String> +where + F: FnMut(&str) -> Result<(), String>, +{ + let stderr_task = tokio::spawn(async move { + let mut stderr_reader = BufReader::new(stderr); + let mut stderr_output = String::new(); + stderr_reader + .read_to_string(&mut stderr_output) + .await + .map_err(|e| format!("Failed to read stderr: {}", e))?; + Ok::(stderr_output) + }); + + let mut reader = BufReader::new(stdout).lines(); + let mut output = String::new(); + while let Ok(Some(line)) = reader.next_line().await { + on_stdout_line(&line)?; + output.push_str(&line); + output.push('\n'); + } + + let status = { + let mut child_guard = child.lock().await; + child_guard + .wait() + .await + .map_err(|e| format!("Failed to wait for process: {}", e))? + }; + + let stderr_output = stderr_task + .await + .map_err(|e| format!("Failed to join stderr reader: {}", e))??; + + Ok((output, stderr_output, status)) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::process::Stdio; + use tokio::process::Command; + + #[tokio::test] + async fn test_collect_process_output_reads_stdout_and_stderr() { + let mut child = Command::new("sh") + .args([ + "-c", + "printf 'out-1\\n'; printf 'err-1\\n' >&2; printf 'out-2\\n'; printf 'err-2\\n' >&2", + ]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("process should spawn"); + + let stdout = child.stdout.take().expect("stdout must be captured"); + let stderr = child.stderr.take().expect("stderr must be captured"); + let child = Arc::new(AsyncMutex::new(child)); + + let mut lines = Vec::new(); + let (stdout_output, stderr_output, status) = + collect_process_output(child, stdout, stderr, |line| { + lines.push(line.to_string()); + Ok(()) + }) + .await + .expect("collection should succeed"); + + assert!(status.success()); + assert_eq!(lines, vec!["out-1".to_string(), "out-2".to_string()]); + assert_eq!(stdout_output, "out-1\nout-2\n"); + assert_eq!(stderr_output, "err-1\nerr-2\n"); + } +} diff --git a/src-tauri/src/services/mod.rs b/src-tauri/src/services/mod.rs index add191c..dc26457 100644 --- a/src-tauri/src/services/mod.rs +++ b/src-tauri/src/services/mod.rs @@ -1,4 +1,5 @@ pub mod agent_runtime; +pub mod cli_process; pub mod crypto; pub mod filter_engine; pub mod notifier; diff --git a/src-tauri/src/services/orchestrator.rs b/src-tauri/src/services/orchestrator.rs index 3734873..fcd580e 100644 --- a/src-tauri/src/services/orchestrator.rs +++ b/src-tauri/src/services/orchestrator.rs @@ -5,12 +5,11 @@ use crate::models::ticket::ProcessedTicket; use crate::models::tracker::WatchedTracker; use crate::models::worktree::Worktree; use crate::services::process_registry::ProcessRegistry; -use crate::services::{notifier, worktree_manager}; +use crate::services::{cli_process, notifier, worktree_manager}; use rusqlite::Connection; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use tauri::{AppHandle, Emitter}; -use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use tokio::sync::Mutex as AsyncMutex; use tokio::time::{interval, timeout, Duration}; @@ -159,11 +158,12 @@ pub async fn run_cli_command( let cancellation_requested = Arc::new(AtomicBool::new(false)); process_registry.register_ticket(ticket_id, child.clone(), cancellation_requested.clone()); - let (stdin, stdout) = { + let (stdin, stdout, stderr) = { let mut child_guard = child.lock().await; let stdin = child_guard.stdin.take(); let stdout = child_guard.stdout.take(); - (stdin, stdout) + let stderr = child_guard.stderr.take(); + (stdin, stdout, stderr) }; if let Some(mut stdin) = stdin { @@ -178,51 +178,55 @@ pub async fn run_cli_command( process_registry.unregister_ticket(ticket_id); "Failed to capture stdout".to_string() })?; - let mut reader = BufReader::new(stdout).lines(); - let mut output = String::new(); + let stderr = stderr.ok_or_else(|| { + process_registry.unregister_ticket(ticket_id); + "Failed to capture stderr".to_string() + })?; - let read_future = async { - while let Ok(Some(line)) = reader.next_line().await { - let _ = app_handle.emit( - "ticket-processing-progress", - serde_json::json!({ - "ticket_id": ticket_id, - "output_chunk": line, - }), - ); - output.push_str(&line); - output.push('\n'); - } - output - }; + let read_future = cli_process::collect_process_output(child.clone(), stdout, stderr, |line| { + let _ = app_handle.emit( + "ticket-processing-progress", + serde_json::json!({ + "ticket_id": ticket_id, + "output_chunk": line, + }), + ); + Ok(()) + }); - let result = match timeout(Duration::from_secs(timeout_secs), read_future).await { - Ok(output) => output, - Err(_) => { - cancellation_requested.store(true, Ordering::SeqCst); - { - let mut child_guard = child.lock().await; - let _ = child_guard.start_kill(); + let (result, stderr_output, status) = + match timeout(Duration::from_secs(timeout_secs), read_future).await { + Ok(result) => match result { + Ok(values) => values, + Err(e) => { + process_registry.unregister_ticket(ticket_id); + return Err(e); + } + }, + Err(_) => { + cancellation_requested.store(true, Ordering::SeqCst); + { + let mut child_guard = child.lock().await; + let _ = child_guard.start_kill(); + } + process_registry.unregister_ticket(ticket_id); + return Err(format!("CLI command timed out after {}s", timeout_secs)); } - process_registry.unregister_ticket(ticket_id); - return Err(format!("CLI command timed out after {}s", timeout_secs)); - } - }; + }; - let wait_result = { - let mut child_guard = child.lock().await; - child_guard.wait().await - }; process_registry.unregister_ticket(ticket_id); - let status = wait_result.map_err(|e| format!("Failed to wait for process: {}", e))?; if cancellation_requested.load(Ordering::SeqCst) { return Err("CLI command cancelled".to_string()); } if !status.success() { + let stderr = stderr_output.trim(); let code = status.code().unwrap_or(-1); - return Err(format!("CLI command exited with code {}", code)); + if stderr.is_empty() { + return Err(format!("CLI command exited with code {}", code)); + } + return Err(format!("CLI command exited with code {}: {}", code, stderr)); } Ok(result)