feat: stream live agent responses in chat

This commit is contained in:
thibaud-lclr 2026-04-15 17:42:45 +02:00
parent 1952a139ae
commit 8de5a328a1
4 changed files with 373 additions and 21 deletions

View file

@ -14,6 +14,20 @@ pub struct LiveAgentExchange {
pub agent_message: LiveMessage,
}
#[derive(Debug, Clone, Serialize)]
pub struct LiveAgentStreamChunkPayload {
pub project_id: String,
pub session_id: String,
pub chunk: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct LiveAgentStreamStatusPayload {
pub project_id: String,
pub session_id: String,
pub error: Option<String>,
}
fn build_live_prompt(project: &Project, history: &[LiveMessage], user_message: &str) -> String {
let mut prompt = format!(
"Tu es un agent assistant pour un projet logiciel.\\n\\n## Projet\\n- Nom: {}\\n- Repo: {}\\n- Branche de base: {}\\n\\n## Consignes\\n- Réponds de manière actionnable et concise.\\n- Si tu proposes du code ou des commandes, explique brièvement pourquoi.\\n- Réponds en français.\\n\\n## Historique récent\\n",
@ -143,22 +157,87 @@ pub async fn send_live_message(
let prompt = build_live_prompt(&project, &history, content);
let args: Vec<String> = Vec::new();
let response = agent_runtime::run_agent_command(
let placeholder_agent_message = {
let db = state
.db
.lock()
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
LiveMessage::insert(&db, &session_id, "agent", "")?
};
let _ = app_handle.emit(
"live-agent-stream-started",
LiveAgentStreamStatusPayload {
project_id: session.project_id.clone(),
session_id: session.id.clone(),
error: None,
},
);
let _ = app_handle.emit(
"live-agent-message",
serde_json::json!({
"project_id": &session.project_id,
"session_id": &session.id,
"message": &placeholder_agent_message,
}),
);
let stream_project_id = session.project_id.clone();
let stream_session_id = session.id.clone();
let response = agent_runtime::run_agent_command_streaming(
agent.tool.to_command(),
&args,
&prompt,
&project.path,
600,
|chunk| {
app_handle
.emit(
"live-agent-stream-chunk",
LiveAgentStreamChunkPayload {
project_id: stream_project_id.clone(),
session_id: stream_session_id.clone(),
chunk: chunk.to_string(),
},
)
.await
.map_err(AppError::from)?;
.map_err(|e| e.to_string())
},
)
.await;
let response = match response {
Ok(response) => response,
Err(error) => {
let db = state
.db
.lock()
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
LiveMessage::delete(&db, &placeholder_agent_message.id)?;
let _ = app_handle.emit(
"live-agent-stream-error",
LiveAgentStreamStatusPayload {
project_id: session.project_id.clone(),
session_id: session.id.clone(),
error: Some(error.clone()),
},
);
return Err(AppError::from(error));
}
};
let agent_message = {
let db = state
.db
.lock()
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
LiveMessage::insert(&db, &session_id, "agent", &response)?
LiveMessage::update_content(&db, &placeholder_agent_message.id, &response)?;
LiveMessage {
content: response.clone(),
..placeholder_agent_message.clone()
}
};
let _ = app_handle.emit(
@ -170,6 +249,15 @@ pub async fn send_live_message(
}),
);
let _ = app_handle.emit(
"live-agent-stream-finished",
LiveAgentStreamStatusPayload {
project_id: session.project_id.clone(),
session_id: session.id.clone(),
error: None,
},
);
Ok(LiveAgentExchange {
user_message,
agent_message,

View file

@ -158,4 +158,88 @@ impl LiveMessage {
messages.reverse();
Ok(messages)
}
pub fn update_content(conn: &Connection, id: &str, content: &str) -> Result<()> {
conn.execute(
"UPDATE project_live_messages SET content = ?1 WHERE id = ?2",
params![content, id],
)?;
Ok(())
}
pub fn delete(conn: &Connection, id: &str) -> Result<()> {
conn.execute(
"DELETE FROM project_live_messages WHERE id = ?1",
params![id],
)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db;
use crate::models::agent::{Agent, AgentRole, AgentTool};
use crate::models::project::Project;
fn setup() -> (Connection, String, String) {
let conn = db::init_in_memory().expect("db init should succeed");
let project = Project::insert(&conn, "Live Project", "/tmp/live-project", None, "main")
.expect("project insert should succeed");
let agent = Agent::insert(
&conn,
"Live Agent",
AgentRole::Analyst,
AgentTool::Codex,
"",
)
.expect("agent insert should succeed");
(conn, project.id, agent.id)
}
#[test]
fn test_create_session_and_list_messages() {
let (conn, project_id, agent_id) = setup();
let session = LiveSession::create(&conn, &project_id, &agent_id, "Session 1")
.expect("session create");
let user_message =
LiveMessage::insert(&conn, &session.id, "user", "Bonjour").expect("message insert");
let agent_message =
LiveMessage::insert(&conn, &session.id, "agent", "Salut").expect("message insert");
let sessions =
LiveSession::list_by_project(&conn, &project_id).expect("session list should work");
let messages =
LiveMessage::list_by_session(&conn, &session.id).expect("message list should work");
assert_eq!(sessions.len(), 1);
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].id, user_message.id);
assert_eq!(messages[1].id, agent_message.id);
}
#[test]
fn test_update_and_delete_message() {
let (conn, project_id, agent_id) = setup();
let session = LiveSession::create(&conn, &project_id, &agent_id, "Session 2")
.expect("session create");
let message = LiveMessage::insert(&conn, &session.id, "agent", "")
.expect("message insert should work");
LiveMessage::update_content(&conn, &message.id, "Streaming done")
.expect("message update should work");
let messages =
LiveMessage::list_by_session(&conn, &session.id).expect("message list should work");
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].content, "Streaming done");
LiveMessage::delete(&conn, &message.id).expect("message delete should work");
let messages_after_delete =
LiveMessage::list_by_session(&conn, &session.id).expect("message list should work");
assert!(messages_after_delete.is_empty());
}
}

View file

@ -1,4 +1,4 @@
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::time::{timeout, Duration};
@ -46,3 +46,111 @@ pub async fn run_agent_command(
Ok(stdout)
}
pub async fn run_agent_command_streaming<F>(
command: &str,
args: &[String],
prompt: &str,
working_dir: &str,
timeout_secs: u64,
mut on_chunk: F,
) -> Result<String, String>
where
F: FnMut(&str) -> Result<(), String>,
{
let mut child = Command::new(command)
.args(args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(working_dir)
.spawn()
.map_err(|e| format!("Failed to spawn '{}': {}", command, e))?;
if let Some(mut stdin) = child.stdin.take() {
stdin
.write_all(prompt.as_bytes())
.await
.map_err(|e| format!("Failed to write prompt to stdin: {}", e))?;
}
let stdout = child.stdout.take().ok_or("Failed to capture stdout")?;
let stderr = child.stderr.take().ok_or("Failed to capture stderr")?;
let stderr_task = tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr);
let mut stderr_output = String::new();
stderr_reader
.read_to_string(&mut stderr_output)
.await
.map_err(|e| format!("Failed to read stderr: {}", e))?;
Ok::<String, String>(stderr_output)
});
let read_future = async {
let mut reader = BufReader::new(stdout).lines();
let mut output = String::new();
while let Ok(Some(line)) = reader.next_line().await {
let chunk = if output.is_empty() {
line
} else {
format!("\n{}", line)
};
on_chunk(&chunk)?;
output.push_str(&chunk);
}
let status = child
.wait()
.await
.map_err(|e| format!("Failed to wait for process: {}", e))?;
let stderr_output = stderr_task
.await
.map_err(|e| format!("Failed to join stderr reader: {}", e))??;
Ok::<(String, String, std::process::ExitStatus), String>((output, stderr_output, status))
};
let (output, stderr_output, status) = timeout(Duration::from_secs(timeout_secs), read_future)
.await
.map_err(|_| format!("CLI command timed out after {}s", timeout_secs))??;
if !status.success() {
let stderr = stderr_output.trim().to_string();
let code = status.code().unwrap_or(-1);
if stderr.is_empty() {
return Err(format!("CLI command exited with code {}", code));
}
return Err(format!("CLI command exited with code {}: {}", code, stderr));
}
let stdout = output.trim().to_string();
if stdout.is_empty() {
return Ok("(empty response)".to_string());
}
Ok(stdout)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_run_agent_command_streaming_collects_chunks() {
let args = vec!["-c".to_string(), "cat".to_string()];
let mut chunks = Vec::new();
let output = run_agent_command_streaming("sh", &args, "hello\nworld\n", ".", 5, |chunk| {
chunks.push(chunk.to_string());
Ok(())
})
.await
.expect("streaming command should succeed");
assert_eq!(output, "hello\nworld");
assert_eq!(chunks, vec!["hello".to_string(), "\nworld".to_string()]);
}
}

View file

@ -18,6 +18,18 @@ interface LiveEventPayload {
message: LiveMessage;
}
interface LiveStreamChunkPayload {
project_id: string;
session_id: string;
chunk: string;
}
interface LiveStreamStatusPayload {
project_id: string;
session_id: string;
error?: string | null;
}
export default function ProjectLiveAgent() {
const { projectId } = useParams<{ projectId: string }>();
const [agents, setAgents] = useState<Agent[]>([]);
@ -30,6 +42,7 @@ export default function ProjectLiveAgent() {
const [sending, setSending] = useState(false);
const [creatingSession, setCreatingSession] = useState(false);
const [moduleEnabled, setModuleEnabled] = useState(true);
const [streamingAgentResponse, setStreamingAgentResponse] = useState<string | null>(null);
const [error, setError] = useState<string | null>(null);
const usableAgents = useMemo(
@ -97,20 +110,67 @@ export default function ProjectLiveAgent() {
let stop: (() => void) | null = null;
void (async () => {
const unlisten = await listen<LiveEventPayload>("live-agent-message", (event) => {
const [unlistenMessage, unlistenStarted, unlistenChunk, unlistenFinished, unlistenError] =
await Promise.all([
listen<LiveEventPayload>("live-agent-message", (event) => {
const payload = event.payload;
if (payload.project_id !== projectId) return;
if (payload.session_id !== selectedSessionId) return;
setMessages((prev) => {
if (prev.some((msg) => msg.id === payload.message.id)) {
return prev;
}
const existingIndex = prev.findIndex((msg) => msg.id === payload.message.id);
if (existingIndex === -1) {
return [...prev, payload.message];
});
}
const next = [...prev];
next[existingIndex] = payload.message;
return next;
});
stop = unlisten;
if (payload.message.sender === "agent") {
setStreamingAgentResponse(null);
}
}),
listen<LiveStreamStatusPayload>("live-agent-stream-started", (event) => {
const payload = event.payload;
if (payload.project_id !== projectId) return;
if (payload.session_id !== selectedSessionId) return;
setStreamingAgentResponse("");
}),
listen<LiveStreamChunkPayload>("live-agent-stream-chunk", (event) => {
const payload = event.payload;
if (payload.project_id !== projectId) return;
if (payload.session_id !== selectedSessionId) return;
setStreamingAgentResponse((prev) => `${prev ?? ""}${payload.chunk}`);
}),
listen<LiveStreamStatusPayload>("live-agent-stream-finished", (event) => {
const payload = event.payload;
if (payload.project_id !== projectId) return;
if (payload.session_id !== selectedSessionId) return;
setStreamingAgentResponse(null);
}),
listen<LiveStreamStatusPayload>("live-agent-stream-error", (event) => {
const payload = event.payload;
if (payload.project_id !== projectId) return;
if (payload.session_id !== selectedSessionId) return;
setStreamingAgentResponse(null);
setMessages((prev) =>
prev.filter((msg) => !(msg.sender === "agent" && msg.content.trim() === ""))
);
if (payload.error) {
setError(payload.error);
}
}),
]);
stop = () => {
unlistenMessage();
unlistenStarted();
unlistenChunk();
unlistenFinished();
unlistenError();
};
})();
return () => {
@ -154,10 +214,9 @@ export default function ProjectLiveAgent() {
try {
await sendLiveMessage(selectedSessionId, content);
const updated = await listLiveMessages(selectedSessionId);
setMessages(updated);
} catch (err: unknown) {
setDraft(content);
setStreamingAgentResponse(null);
setError(getErrorMessage(err));
} finally {
setSending(false);
@ -166,6 +225,7 @@ export default function ProjectLiveAgent() {
async function handleSessionChange(sessionId: string) {
setSelectedSessionId(sessionId);
setStreamingAgentResponse(null);
if (!sessionId) {
setMessages([]);
return;
@ -258,7 +318,9 @@ export default function ProjectLiveAgent() {
<div className="rounded-lg border border-gray-200 bg-white p-4">
<div className="mb-3 text-sm font-semibold text-gray-800">Discussion</div>
<div className="max-h-[360px] space-y-2 overflow-y-auto rounded border border-gray-100 bg-gray-50 p-3">
{messages.map((msg) => (
{messages
.filter((msg) => !(msg.sender === "agent" && msg.content.trim() === ""))
.map((msg) => (
<div
key={msg.id}
className={`rounded px-3 py-2 text-sm ${
@ -275,7 +337,17 @@ export default function ProjectLiveAgent() {
<div className="whitespace-pre-wrap">{msg.content}</div>
</div>
))}
{messages.length === 0 && (
{streamingAgentResponse !== null && (
<div className="rounded bg-blue-100 px-3 py-2 text-sm text-blue-900">
<div className="mb-1 text-[11px] font-semibold uppercase tracking-wide opacity-70">
agent
</div>
<div className="whitespace-pre-wrap">
{streamingAgentResponse || "En train de repondre..."}
</div>
</div>
)}
{messages.length === 0 && streamingAgentResponse === null && (
<div className="text-sm text-gray-400">Pas encore de message.</div>
)}
</div>