diff --git a/src-tauri/src/services/orchestrator.rs b/src-tauri/src/services/orchestrator.rs index 40f20be..a73b982 100644 --- a/src-tauri/src/services/orchestrator.rs +++ b/src-tauri/src/services/orchestrator.rs @@ -10,6 +10,7 @@ use crate::models::worktree::Worktree; use crate::services::process_registry::ProcessRegistry; use crate::services::{cli_process, notifier, worktree_manager}; use rusqlite::Connection; +use std::mem; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use tauri::{AppHandle, Emitter}; @@ -17,6 +18,9 @@ use tokio::process::Command; use tokio::sync::Mutex as AsyncMutex; use tokio::time::{interval, timeout, Duration}; +const PROGRESS_BUFFER_MAX_BYTES: usize = 2048; +const PROGRESS_EMIT_INTERVAL_MS: u128 = 250; + #[derive(Debug, Clone, PartialEq)] pub enum Verdict { FixNeeded, @@ -207,6 +211,15 @@ fn validate_developer_completion( evaluate_developer_completion(developer_report, commits.len(), !diff.trim().is_empty()) } +fn should_flush_progress_buffer(buffer: &str, elapsed_since_last_emit: std::time::Duration) -> bool { + if buffer.is_empty() { + return false; + } + + buffer.len() >= PROGRESS_BUFFER_MAX_BYTES + || elapsed_since_last_emit.as_millis() >= PROGRESS_EMIT_INTERVAL_MS +} + fn recover_interrupted_tickets(db: &Arc>) -> Result { let inflight = { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; @@ -305,14 +318,27 @@ pub async fn run_cli_command( "Failed to capture stderr".to_string() })?; + let mut progress_buffer = String::new(); + let mut last_progress_emit = std::time::Instant::now(); let read_future = cli_process::collect_process_output(child.clone(), stdout, stderr, |line| { - let _ = app_handle.emit( - "ticket-processing-progress", - serde_json::json!({ - "ticket_id": ticket_id, - "output_chunk": line, - }), - ); + if !line.is_empty() { + if !progress_buffer.is_empty() { + progress_buffer.push('\n'); + } + progress_buffer.push_str(line); + } + + if should_flush_progress_buffer(&progress_buffer, last_progress_emit.elapsed()) { + let chunk = mem::take(&mut progress_buffer); + let _ = app_handle.emit( + "ticket-processing-progress", + serde_json::json!({ + "ticket_id": ticket_id, + "output_chunk": chunk, + }), + ); + last_progress_emit = std::time::Instant::now(); + } Ok(()) }); @@ -336,6 +362,17 @@ pub async fn run_cli_command( } }; + if !progress_buffer.is_empty() { + let chunk = mem::take(&mut progress_buffer); + let _ = app_handle.emit( + "ticket-processing-progress", + serde_json::json!({ + "ticket_id": ticket_id, + "output_chunk": chunk, + }), + ); + } + process_registry.unregister_ticket(ticket_id); if cancellation_requested.load(Ordering::SeqCst) { @@ -959,6 +996,31 @@ mod tests { assert_eq!(updated.status, "Pending"); } + #[test] + fn test_should_flush_progress_buffer_when_elapsed_interval_exceeded() { + let should_flush = should_flush_progress_buffer( + "some progress", + std::time::Duration::from_millis(300), + ); + assert!(should_flush); + } + + #[test] + fn test_should_flush_progress_buffer_when_buffer_reaches_size_limit() { + let payload = "x".repeat(2048); + let should_flush = should_flush_progress_buffer( + &payload, + std::time::Duration::from_millis(10), + ); + assert!(should_flush); + } + + #[test] + fn test_should_not_flush_progress_buffer_when_empty() { + let should_flush = should_flush_progress_buffer("", std::time::Duration::from_secs(1)); + assert!(!should_flush); + } + #[test] fn test_evaluate_developer_completion_rejects_permission_block_report() { let result = evaluate_developer_completion(