fix(orchestrator): capture stderr for cli execution

closes #3
This commit is contained in:
thibaud-lclr 2026-04-16 17:01:06 +02:00
parent 467aebc0af
commit 8d0b345751
4 changed files with 129 additions and 70 deletions

View file

@ -1,3 +1,4 @@
use crate::services::cli_process;
use crate::services::process_registry::ProcessRegistry;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@ -109,39 +110,8 @@ pub async fn run_agent_command_for_task(
"Failed to capture stderr".to_string()
})?;
let stderr_task = tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr);
let mut stderr_output = String::new();
stderr_reader
.read_to_string(&mut stderr_output)
.await
.map_err(|e| format!("Failed to read stderr: {}", e))?;
Ok::<String, String>(stderr_output)
});
let read_future = async {
let mut reader = BufReader::new(stdout).lines();
let mut output = String::new();
while let Ok(Some(line)) = reader.next_line().await {
output.push_str(&line);
output.push('\n');
}
let status = {
let mut child_guard = child.lock().await;
child_guard
.wait()
.await
.map_err(|e| format!("Failed to wait for process: {}", e))
}?;
let stderr_output = stderr_task
.await
.map_err(|e| format!("Failed to join stderr reader: {}", e))??;
Ok::<(String, String, std::process::ExitStatus), String>((output, stderr_output, status))
};
let read_future =
cli_process::collect_process_output(child.clone(), stdout, stderr, |_| Ok(()));
let (output, stderr_output, status) =
match timeout(Duration::from_secs(timeout_secs), read_future).await {

View file

@ -0,0 +1,84 @@
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::process::{Child, ChildStderr, ChildStdout};
use tokio::sync::Mutex as AsyncMutex;
pub async fn collect_process_output<F>(
child: Arc<AsyncMutex<Child>>,
stdout: ChildStdout,
stderr: ChildStderr,
mut on_stdout_line: F,
) -> Result<(String, String, std::process::ExitStatus), String>
where
F: FnMut(&str) -> Result<(), String>,
{
let stderr_task = tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr);
let mut stderr_output = String::new();
stderr_reader
.read_to_string(&mut stderr_output)
.await
.map_err(|e| format!("Failed to read stderr: {}", e))?;
Ok::<String, String>(stderr_output)
});
let mut reader = BufReader::new(stdout).lines();
let mut output = String::new();
while let Ok(Some(line)) = reader.next_line().await {
on_stdout_line(&line)?;
output.push_str(&line);
output.push('\n');
}
let status = {
let mut child_guard = child.lock().await;
child_guard
.wait()
.await
.map_err(|e| format!("Failed to wait for process: {}", e))?
};
let stderr_output = stderr_task
.await
.map_err(|e| format!("Failed to join stderr reader: {}", e))??;
Ok((output, stderr_output, status))
}
#[cfg(test)]
mod tests {
use super::*;
use std::process::Stdio;
use tokio::process::Command;
#[tokio::test]
async fn test_collect_process_output_reads_stdout_and_stderr() {
let mut child = Command::new("sh")
.args([
"-c",
"printf 'out-1\\n'; printf 'err-1\\n' >&2; printf 'out-2\\n'; printf 'err-2\\n' >&2",
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("process should spawn");
let stdout = child.stdout.take().expect("stdout must be captured");
let stderr = child.stderr.take().expect("stderr must be captured");
let child = Arc::new(AsyncMutex::new(child));
let mut lines = Vec::new();
let (stdout_output, stderr_output, status) =
collect_process_output(child, stdout, stderr, |line| {
lines.push(line.to_string());
Ok(())
})
.await
.expect("collection should succeed");
assert!(status.success());
assert_eq!(lines, vec!["out-1".to_string(), "out-2".to_string()]);
assert_eq!(stdout_output, "out-1\nout-2\n");
assert_eq!(stderr_output, "err-1\nerr-2\n");
}
}

View file

@ -1,4 +1,5 @@
pub mod agent_runtime;
pub mod cli_process;
pub mod crypto;
pub mod filter_engine;
pub mod notifier;

View file

@ -5,12 +5,11 @@ use crate::models::ticket::ProcessedTicket;
use crate::models::tracker::WatchedTracker;
use crate::models::worktree::Worktree;
use crate::services::process_registry::ProcessRegistry;
use crate::services::{notifier, worktree_manager};
use crate::services::{cli_process, notifier, worktree_manager};
use rusqlite::Connection;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tauri::{AppHandle, Emitter};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::{interval, timeout, Duration};
@ -159,11 +158,12 @@ pub async fn run_cli_command(
let cancellation_requested = Arc::new(AtomicBool::new(false));
process_registry.register_ticket(ticket_id, child.clone(), cancellation_requested.clone());
let (stdin, stdout) = {
let (stdin, stdout, stderr) = {
let mut child_guard = child.lock().await;
let stdin = child_guard.stdin.take();
let stdout = child_guard.stdout.take();
(stdin, stdout)
let stderr = child_guard.stderr.take();
(stdin, stdout, stderr)
};
if let Some(mut stdin) = stdin {
@ -178,51 +178,55 @@ pub async fn run_cli_command(
process_registry.unregister_ticket(ticket_id);
"Failed to capture stdout".to_string()
})?;
let mut reader = BufReader::new(stdout).lines();
let mut output = String::new();
let stderr = stderr.ok_or_else(|| {
process_registry.unregister_ticket(ticket_id);
"Failed to capture stderr".to_string()
})?;
let read_future = async {
while let Ok(Some(line)) = reader.next_line().await {
let _ = app_handle.emit(
"ticket-processing-progress",
serde_json::json!({
"ticket_id": ticket_id,
"output_chunk": line,
}),
);
output.push_str(&line);
output.push('\n');
}
output
};
let read_future = cli_process::collect_process_output(child.clone(), stdout, stderr, |line| {
let _ = app_handle.emit(
"ticket-processing-progress",
serde_json::json!({
"ticket_id": ticket_id,
"output_chunk": line,
}),
);
Ok(())
});
let result = match timeout(Duration::from_secs(timeout_secs), read_future).await {
Ok(output) => output,
Err(_) => {
cancellation_requested.store(true, Ordering::SeqCst);
{
let mut child_guard = child.lock().await;
let _ = child_guard.start_kill();
let (result, stderr_output, status) =
match timeout(Duration::from_secs(timeout_secs), read_future).await {
Ok(result) => match result {
Ok(values) => values,
Err(e) => {
process_registry.unregister_ticket(ticket_id);
return Err(e);
}
},
Err(_) => {
cancellation_requested.store(true, Ordering::SeqCst);
{
let mut child_guard = child.lock().await;
let _ = child_guard.start_kill();
}
process_registry.unregister_ticket(ticket_id);
return Err(format!("CLI command timed out after {}s", timeout_secs));
}
process_registry.unregister_ticket(ticket_id);
return Err(format!("CLI command timed out after {}s", timeout_secs));
}
};
};
let wait_result = {
let mut child_guard = child.lock().await;
child_guard.wait().await
};
process_registry.unregister_ticket(ticket_id);
let status = wait_result.map_err(|e| format!("Failed to wait for process: {}", e))?;
if cancellation_requested.load(Ordering::SeqCst) {
return Err("CLI command cancelled".to_string());
}
if !status.success() {
let stderr = stderr_output.trim();
let code = status.code().unwrap_or(-1);
return Err(format!("CLI command exited with code {}", code));
if stderr.is_empty() {
return Err(format!("CLI command exited with code {}", code));
}
return Err(format!("CLI command exited with code {}: {}", code, stderr));
}
Ok(result)