From 8de5a328a1db92809dd75bf46daa84d4cf5de7ef Mon Sep 17 00:00:00 2001 From: thibaud-lclr Date: Wed, 15 Apr 2026 17:42:45 +0200 Subject: [PATCH] feat: stream live agent responses in chat --- src-tauri/src/commands/live_agent.rs | 96 +++++++++++++++- src-tauri/src/models/live_agent.rs | 84 ++++++++++++++ src-tauri/src/services/agent_runtime.rs | 110 ++++++++++++++++++- src/components/projects/ProjectLiveAgent.tsx | 104 +++++++++++++++--- 4 files changed, 373 insertions(+), 21 deletions(-) diff --git a/src-tauri/src/commands/live_agent.rs b/src-tauri/src/commands/live_agent.rs index 792c143..0f2f28e 100644 --- a/src-tauri/src/commands/live_agent.rs +++ b/src-tauri/src/commands/live_agent.rs @@ -14,6 +14,20 @@ pub struct LiveAgentExchange { pub agent_message: LiveMessage, } +#[derive(Debug, Clone, Serialize)] +pub struct LiveAgentStreamChunkPayload { + pub project_id: String, + pub session_id: String, + pub chunk: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct LiveAgentStreamStatusPayload { + pub project_id: String, + pub session_id: String, + pub error: Option, +} + fn build_live_prompt(project: &Project, history: &[LiveMessage], user_message: &str) -> String { let mut prompt = format!( "Tu es un agent assistant pour un projet logiciel.\\n\\n## Projet\\n- Nom: {}\\n- Repo: {}\\n- Branche de base: {}\\n\\n## Consignes\\n- Réponds de manière actionnable et concise.\\n- Si tu proposes du code ou des commandes, explique brièvement pourquoi.\\n- Réponds en français.\\n\\n## Historique récent\\n", @@ -143,22 +157,87 @@ pub async fn send_live_message( let prompt = build_live_prompt(&project, &history, content); let args: Vec = Vec::new(); - let response = agent_runtime::run_agent_command( + let placeholder_agent_message = { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + LiveMessage::insert(&db, &session_id, "agent", "")? + }; + + let _ = app_handle.emit( + "live-agent-stream-started", + LiveAgentStreamStatusPayload { + project_id: session.project_id.clone(), + session_id: session.id.clone(), + error: None, + }, + ); + + let _ = app_handle.emit( + "live-agent-message", + serde_json::json!({ + "project_id": &session.project_id, + "session_id": &session.id, + "message": &placeholder_agent_message, + }), + ); + + let stream_project_id = session.project_id.clone(); + let stream_session_id = session.id.clone(); + let response = agent_runtime::run_agent_command_streaming( agent.tool.to_command(), &args, &prompt, &project.path, 600, + |chunk| { + app_handle + .emit( + "live-agent-stream-chunk", + LiveAgentStreamChunkPayload { + project_id: stream_project_id.clone(), + session_id: stream_session_id.clone(), + chunk: chunk.to_string(), + }, + ) + .map_err(|e| e.to_string()) + }, ) - .await - .map_err(AppError::from)?; + .await; + + let response = match response { + Ok(response) => response, + Err(error) => { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + LiveMessage::delete(&db, &placeholder_agent_message.id)?; + + let _ = app_handle.emit( + "live-agent-stream-error", + LiveAgentStreamStatusPayload { + project_id: session.project_id.clone(), + session_id: session.id.clone(), + error: Some(error.clone()), + }, + ); + + return Err(AppError::from(error)); + } + }; let agent_message = { let db = state .db .lock() .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; - LiveMessage::insert(&db, &session_id, "agent", &response)? + LiveMessage::update_content(&db, &placeholder_agent_message.id, &response)?; + LiveMessage { + content: response.clone(), + ..placeholder_agent_message.clone() + } }; let _ = app_handle.emit( @@ -170,6 +249,15 @@ pub async fn send_live_message( }), ); + let _ = app_handle.emit( + "live-agent-stream-finished", + LiveAgentStreamStatusPayload { + project_id: session.project_id.clone(), + session_id: session.id.clone(), + error: None, + }, + ); + Ok(LiveAgentExchange { user_message, agent_message, diff --git a/src-tauri/src/models/live_agent.rs b/src-tauri/src/models/live_agent.rs index fa3ae46..48ec800 100644 --- a/src-tauri/src/models/live_agent.rs +++ b/src-tauri/src/models/live_agent.rs @@ -158,4 +158,88 @@ impl LiveMessage { messages.reverse(); Ok(messages) } + + pub fn update_content(conn: &Connection, id: &str, content: &str) -> Result<()> { + conn.execute( + "UPDATE project_live_messages SET content = ?1 WHERE id = ?2", + params![content, id], + )?; + Ok(()) + } + + pub fn delete(conn: &Connection, id: &str) -> Result<()> { + conn.execute( + "DELETE FROM project_live_messages WHERE id = ?1", + params![id], + )?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db; + use crate::models::agent::{Agent, AgentRole, AgentTool}; + use crate::models::project::Project; + + fn setup() -> (Connection, String, String) { + let conn = db::init_in_memory().expect("db init should succeed"); + let project = Project::insert(&conn, "Live Project", "/tmp/live-project", None, "main") + .expect("project insert should succeed"); + let agent = Agent::insert( + &conn, + "Live Agent", + AgentRole::Analyst, + AgentTool::Codex, + "", + ) + .expect("agent insert should succeed"); + + (conn, project.id, agent.id) + } + + #[test] + fn test_create_session_and_list_messages() { + let (conn, project_id, agent_id) = setup(); + let session = LiveSession::create(&conn, &project_id, &agent_id, "Session 1") + .expect("session create"); + + let user_message = + LiveMessage::insert(&conn, &session.id, "user", "Bonjour").expect("message insert"); + let agent_message = + LiveMessage::insert(&conn, &session.id, "agent", "Salut").expect("message insert"); + + let sessions = + LiveSession::list_by_project(&conn, &project_id).expect("session list should work"); + let messages = + LiveMessage::list_by_session(&conn, &session.id).expect("message list should work"); + + assert_eq!(sessions.len(), 1); + assert_eq!(messages.len(), 2); + assert_eq!(messages[0].id, user_message.id); + assert_eq!(messages[1].id, agent_message.id); + } + + #[test] + fn test_update_and_delete_message() { + let (conn, project_id, agent_id) = setup(); + let session = LiveSession::create(&conn, &project_id, &agent_id, "Session 2") + .expect("session create"); + let message = LiveMessage::insert(&conn, &session.id, "agent", "") + .expect("message insert should work"); + + LiveMessage::update_content(&conn, &message.id, "Streaming done") + .expect("message update should work"); + + let messages = + LiveMessage::list_by_session(&conn, &session.id).expect("message list should work"); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].content, "Streaming done"); + + LiveMessage::delete(&conn, &message.id).expect("message delete should work"); + let messages_after_delete = + LiveMessage::list_by_session(&conn, &session.id).expect("message list should work"); + assert!(messages_after_delete.is_empty()); + } } diff --git a/src-tauri/src/services/agent_runtime.rs b/src-tauri/src/services/agent_runtime.rs index 7b5d464..b4dc58c 100644 --- a/src-tauri/src/services/agent_runtime.rs +++ b/src-tauri/src/services/agent_runtime.rs @@ -1,4 +1,4 @@ -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use tokio::time::{timeout, Duration}; @@ -46,3 +46,111 @@ pub async fn run_agent_command( Ok(stdout) } + +pub async fn run_agent_command_streaming( + command: &str, + args: &[String], + prompt: &str, + working_dir: &str, + timeout_secs: u64, + mut on_chunk: F, +) -> Result +where + F: FnMut(&str) -> Result<(), String>, +{ + let mut child = Command::new(command) + .args(args) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .current_dir(working_dir) + .spawn() + .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?; + + if let Some(mut stdin) = child.stdin.take() { + stdin + .write_all(prompt.as_bytes()) + .await + .map_err(|e| format!("Failed to write prompt to stdin: {}", e))?; + } + + let stdout = child.stdout.take().ok_or("Failed to capture stdout")?; + let stderr = child.stderr.take().ok_or("Failed to capture stderr")?; + + let stderr_task = tokio::spawn(async move { + let mut stderr_reader = BufReader::new(stderr); + let mut stderr_output = String::new(); + stderr_reader + .read_to_string(&mut stderr_output) + .await + .map_err(|e| format!("Failed to read stderr: {}", e))?; + Ok::(stderr_output) + }); + + let read_future = async { + let mut reader = BufReader::new(stdout).lines(); + let mut output = String::new(); + + while let Ok(Some(line)) = reader.next_line().await { + let chunk = if output.is_empty() { + line + } else { + format!("\n{}", line) + }; + on_chunk(&chunk)?; + output.push_str(&chunk); + } + + let status = child + .wait() + .await + .map_err(|e| format!("Failed to wait for process: {}", e))?; + + let stderr_output = stderr_task + .await + .map_err(|e| format!("Failed to join stderr reader: {}", e))??; + + Ok::<(String, String, std::process::ExitStatus), String>((output, stderr_output, status)) + }; + + let (output, stderr_output, status) = timeout(Duration::from_secs(timeout_secs), read_future) + .await + .map_err(|_| format!("CLI command timed out after {}s", timeout_secs))??; + + if !status.success() { + let stderr = stderr_output.trim().to_string(); + let code = status.code().unwrap_or(-1); + if stderr.is_empty() { + return Err(format!("CLI command exited with code {}", code)); + } + return Err(format!("CLI command exited with code {}: {}", code, stderr)); + } + + let stdout = output.trim().to_string(); + if stdout.is_empty() { + return Ok("(empty response)".to_string()); + } + + Ok(stdout) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_run_agent_command_streaming_collects_chunks() { + let args = vec!["-c".to_string(), "cat".to_string()]; + let mut chunks = Vec::new(); + + let output = run_agent_command_streaming("sh", &args, "hello\nworld\n", ".", 5, |chunk| { + chunks.push(chunk.to_string()); + Ok(()) + }) + .await + .expect("streaming command should succeed"); + + assert_eq!(output, "hello\nworld"); + assert_eq!(chunks, vec!["hello".to_string(), "\nworld".to_string()]); + } +} diff --git a/src/components/projects/ProjectLiveAgent.tsx b/src/components/projects/ProjectLiveAgent.tsx index 9f83bc6..ec469c3 100644 --- a/src/components/projects/ProjectLiveAgent.tsx +++ b/src/components/projects/ProjectLiveAgent.tsx @@ -18,6 +18,18 @@ interface LiveEventPayload { message: LiveMessage; } +interface LiveStreamChunkPayload { + project_id: string; + session_id: string; + chunk: string; +} + +interface LiveStreamStatusPayload { + project_id: string; + session_id: string; + error?: string | null; +} + export default function ProjectLiveAgent() { const { projectId } = useParams<{ projectId: string }>(); const [agents, setAgents] = useState([]); @@ -30,6 +42,7 @@ export default function ProjectLiveAgent() { const [sending, setSending] = useState(false); const [creatingSession, setCreatingSession] = useState(false); const [moduleEnabled, setModuleEnabled] = useState(true); + const [streamingAgentResponse, setStreamingAgentResponse] = useState(null); const [error, setError] = useState(null); const usableAgents = useMemo( @@ -97,20 +110,67 @@ export default function ProjectLiveAgent() { let stop: (() => void) | null = null; void (async () => { - const unlisten = await listen("live-agent-message", (event) => { - const payload = event.payload; - if (payload.project_id !== projectId) return; - if (payload.session_id !== selectedSessionId) return; + const [unlistenMessage, unlistenStarted, unlistenChunk, unlistenFinished, unlistenError] = + await Promise.all([ + listen("live-agent-message", (event) => { + const payload = event.payload; + if (payload.project_id !== projectId) return; + if (payload.session_id !== selectedSessionId) return; - setMessages((prev) => { - if (prev.some((msg) => msg.id === payload.message.id)) { - return prev; - } - return [...prev, payload.message]; - }); - }); + setMessages((prev) => { + const existingIndex = prev.findIndex((msg) => msg.id === payload.message.id); + if (existingIndex === -1) { + return [...prev, payload.message]; + } - stop = unlisten; + const next = [...prev]; + next[existingIndex] = payload.message; + return next; + }); + + if (payload.message.sender === "agent") { + setStreamingAgentResponse(null); + } + }), + listen("live-agent-stream-started", (event) => { + const payload = event.payload; + if (payload.project_id !== projectId) return; + if (payload.session_id !== selectedSessionId) return; + setStreamingAgentResponse(""); + }), + listen("live-agent-stream-chunk", (event) => { + const payload = event.payload; + if (payload.project_id !== projectId) return; + if (payload.session_id !== selectedSessionId) return; + setStreamingAgentResponse((prev) => `${prev ?? ""}${payload.chunk}`); + }), + listen("live-agent-stream-finished", (event) => { + const payload = event.payload; + if (payload.project_id !== projectId) return; + if (payload.session_id !== selectedSessionId) return; + setStreamingAgentResponse(null); + }), + listen("live-agent-stream-error", (event) => { + const payload = event.payload; + if (payload.project_id !== projectId) return; + if (payload.session_id !== selectedSessionId) return; + setStreamingAgentResponse(null); + setMessages((prev) => + prev.filter((msg) => !(msg.sender === "agent" && msg.content.trim() === "")) + ); + if (payload.error) { + setError(payload.error); + } + }), + ]); + + stop = () => { + unlistenMessage(); + unlistenStarted(); + unlistenChunk(); + unlistenFinished(); + unlistenError(); + }; })(); return () => { @@ -154,10 +214,9 @@ export default function ProjectLiveAgent() { try { await sendLiveMessage(selectedSessionId, content); - const updated = await listLiveMessages(selectedSessionId); - setMessages(updated); } catch (err: unknown) { setDraft(content); + setStreamingAgentResponse(null); setError(getErrorMessage(err)); } finally { setSending(false); @@ -166,6 +225,7 @@ export default function ProjectLiveAgent() { async function handleSessionChange(sessionId: string) { setSelectedSessionId(sessionId); + setStreamingAgentResponse(null); if (!sessionId) { setMessages([]); return; @@ -258,7 +318,9 @@ export default function ProjectLiveAgent() {
Discussion
- {messages.map((msg) => ( + {messages + .filter((msg) => !(msg.sender === "agent" && msg.content.trim() === "")) + .map((msg) => (
{msg.content}
))} - {messages.length === 0 && ( + {streamingAgentResponse !== null && ( +
+
+ agent +
+
+ {streamingAgentResponse || "En train de repondre..."} +
+
+ )} + {messages.length === 0 && streamingAgentResponse === null && (
Pas encore de message.
)}