diff --git a/src-tauri/src/commands/graylog.rs b/src-tauri/src/commands/graylog.rs index fd0c4b0..ab2c329 100644 --- a/src-tauri/src/commands/graylog.rs +++ b/src-tauri/src/commands/graylog.rs @@ -172,6 +172,7 @@ pub async fn manual_graylog_poll( &state.http_client, &app_handle, &project_id, + &state.activity_state, ) .await .map_err(AppError::from) diff --git a/src-tauri/src/commands/poller.rs b/src-tauri/src/commands/poller.rs index 3379d96..1ef2e29 100644 --- a/src-tauri/src/commands/poller.rs +++ b/src-tauri/src/commands/poller.rs @@ -6,8 +6,21 @@ use crate::models::tracker::WatchedTracker; use crate::services::tuleap_client::TuleapClient; use crate::services::{crypto, filter_engine, notifier}; use crate::AppState; +use serde::Serialize; +use std::collections::HashMap; use tauri::{Emitter, State}; +struct PollActivityGuard { + activity_state: crate::services::activity_state::ActivityState, + key: String, +} + +impl Drop for PollActivityGuard { + fn drop(&mut self) { + self.activity_state.finish_poll(&self.key); + } +} + #[tauri::command] pub async fn manual_poll( state: State<'_, AppState>, @@ -51,6 +64,15 @@ pub async fn manual_poll( (tracker, client) }; // lock dropped here + let poll_key = format!("tuleap:{}", tracker.id); + state + .activity_state + .start_poll(&poll_key, &tracker.project_id, &tracker.tracker_label); + let _poll_guard = PollActivityGuard { + activity_state: state.activity_state.clone(), + key: poll_key.clone(), + }; + let _ = app_handle.emit( "polling-started", serde_json::json!({ @@ -179,3 +201,98 @@ pub fn get_project_throughput( let stats = ProcessedTicket::get_project_throughput_stats(&db, &project_id)?; Ok(stats) } + +#[derive(Debug, Serialize)] +pub struct RuntimeActivePoll { + pub key: String, + pub label: String, +} + +#[derive(Debug, Serialize)] +pub struct RuntimeActiveAgent { + pub ticket_id: String, + pub artifact_id: i32, + pub step: String, +} + +#[derive(Debug, Serialize)] +pub struct RuntimeActivitySnapshot { + pub active_polls: Vec, + pub active_agents: Vec, +} + +fn step_from_status(status: &str) -> &'static str { + match status { + "Developing" => "developer", + "Analyzing" => "analyst", + _ => "processing", + } +} + +#[tauri::command] +pub fn get_runtime_activity( + state: State<'_, AppState>, + project_id: String, +) -> Result { + let mut active_polls = state + .activity_state + .list_project_polls(&project_id) + .into_iter() + .map(|(key, label)| RuntimeActivePoll { key, label }) + .collect::>(); + active_polls.sort_by(|a, b| a.label.to_lowercase().cmp(&b.label.to_lowercase())); + + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + let project_tickets = ProcessedTicket::list_by_project(&db, &project_id)?; + + let mut agents_by_ticket_id: HashMap = HashMap::new(); + for ticket in &project_tickets { + if ticket.status == "Analyzing" || ticket.status == "Developing" { + agents_by_ticket_id.insert( + ticket.id.clone(), + RuntimeActiveAgent { + ticket_id: ticket.id.clone(), + artifact_id: ticket.artifact_id, + step: step_from_status(&ticket.status).to_string(), + }, + ); + } + } + + for ticket_id in state.process_registry.list_ticket_ids() { + if agents_by_ticket_id.contains_key(&ticket_id) { + continue; + } + + let Ok(ticket) = ProcessedTicket::get_by_id(&db, &ticket_id) else { + continue; + }; + if ticket.project_id != project_id { + continue; + } + + agents_by_ticket_id.insert( + ticket.id.clone(), + RuntimeActiveAgent { + ticket_id: ticket.id.clone(), + artifact_id: ticket.artifact_id, + step: step_from_status(&ticket.status).to_string(), + }, + ); + } + + let mut active_agents = agents_by_ticket_id.into_values().collect::>(); + active_agents.sort_by(|a, b| { + a.artifact_id + .cmp(&b.artifact_id) + .then_with(|| a.ticket_id.cmp(&b.ticket_id)) + }); + + Ok(RuntimeActivitySnapshot { + active_polls, + active_agents, + }) +} diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 5dbf874..7517569 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -12,6 +12,7 @@ pub struct AppState { pub encryption_key: [u8; 32], pub http_client: reqwest::Client, pub process_registry: services::process_registry::ProcessRegistry, + pub activity_state: services::activity_state::ActivityState, } #[cfg_attr(mobile, tauri::mobile_entry_point)] @@ -31,6 +32,7 @@ pub fn run() { let http_client = reqwest::Client::new(); let process_registry = services::process_registry::ProcessRegistry::default(); + let activity_state = services::activity_state::ActivityState::default(); let db_arc = Arc::new(Mutex::new(conn)); app.manage(AppState { @@ -38,6 +40,7 @@ pub fn run() { encryption_key, http_client: http_client.clone(), process_registry: process_registry.clone(), + activity_state: activity_state.clone(), }); // Start background poller @@ -46,6 +49,7 @@ pub fn run() { encryption_key, http_client.clone(), app.handle().clone(), + activity_state.clone(), ); services::graylog_poller::start( @@ -53,6 +57,7 @@ pub fn run() { encryption_key, http_client, app.handle().clone(), + activity_state, ); // Start agent orchestrator @@ -99,6 +104,7 @@ pub fn run() { commands::poller::manual_poll, commands::poller::get_queue_status, commands::poller::get_project_throughput, + commands::poller::get_runtime_activity, commands::notification::list_notifications, commands::notification::mark_notification_read, commands::notification::mark_all_notifications_read, diff --git a/src-tauri/src/services/activity_state.rs b/src-tauri/src/services/activity_state.rs new file mode 100644 index 0000000..69f8d97 --- /dev/null +++ b/src-tauri/src/services/activity_state.rs @@ -0,0 +1,78 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +#[derive(Clone, Debug)] +struct ActivePollEntry { + project_id: String, + label: String, + count: usize, +} + +#[derive(Clone, Default)] +pub struct ActivityState { + active_polls: Arc>>, +} + +impl ActivityState { + pub fn start_poll(&self, key: &str, project_id: &str, label: &str) { + let mut polls = self + .active_polls + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + let entry = polls.entry(key.to_string()).or_insert_with(|| ActivePollEntry { + project_id: project_id.to_string(), + label: label.to_string(), + count: 0, + }); + entry.project_id = project_id.to_string(); + entry.label = label.to_string(); + entry.count += 1; + } + + pub fn finish_poll(&self, key: &str) { + let mut polls = self + .active_polls + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + if let Some(entry) = polls.get_mut(key) { + if entry.count > 1 { + entry.count -= 1; + } else { + polls.remove(key); + } + } + } + + pub fn list_project_polls(&self, project_id: &str) -> Vec<(String, String)> { + let polls = self + .active_polls + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + + polls + .iter() + .filter(|(_, entry)| entry.project_id == project_id) + .map(|(key, entry)| (key.clone(), entry.label.clone())) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::ActivityState; + + #[test] + fn test_activity_state_tracks_concurrent_polls() { + let state = ActivityState::default(); + + state.start_poll("tuleap:tracker-1", "project-1", "Tracker A"); + state.start_poll("tuleap:tracker-1", "project-1", "Tracker A"); + assert_eq!(state.list_project_polls("project-1").len(), 1); + + state.finish_poll("tuleap:tracker-1"); + assert_eq!(state.list_project_polls("project-1").len(), 1); + + state.finish_poll("tuleap:tracker-1"); + assert_eq!(state.list_project_polls("project-1").len(), 0); + } +} diff --git a/src-tauri/src/services/graylog_poller.rs b/src-tauri/src/services/graylog_poller.rs index 9bbdf77..cae97d1 100644 --- a/src-tauri/src/services/graylog_poller.rs +++ b/src-tauri/src/services/graylog_poller.rs @@ -1,6 +1,7 @@ use crate::models::graylog::{GraylogCredentials, GraylogDetection, GraylogSubject}; use crate::models::module::{ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE}; use crate::models::ticket::ProcessedTicket; +use crate::services::activity_state::ActivityState; use crate::services::crypto; use crate::services::graylog_client::GraylogClient; use crate::services::graylog_scoring::{group_subjects, SubjectAggregate}; @@ -99,11 +100,23 @@ fn emit_polling_error(app_handle: &AppHandle, project_id: &str, error: &str) { ); } +struct PollActivityGuard { + activity_state: ActivityState, + key: String, +} + +impl Drop for PollActivityGuard { + fn drop(&mut self) { + self.activity_state.finish_poll(&self.key); + } +} + pub fn start( db: Arc>, encryption_key: [u8; 32], http_client: reqwest::Client, app_handle: AppHandle, + activity_state: ActivityState, ) { tauri::async_runtime::spawn(async move { let mut tick = interval(Duration::from_secs(60)); @@ -156,6 +169,7 @@ pub fn start( &http_client, &app_handle, &project_id, + &activity_state, ) .await { @@ -175,6 +189,7 @@ pub async fn poll_project_once( http_client: &reqwest::Client, app_handle: &AppHandle, project_id: &str, + activity_state: &ActivityState, ) -> Result { let credentials = { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; @@ -206,6 +221,12 @@ pub async fn poll_project_once( "graylog-polling-started", serde_json::json!({ "project_id": project_id }), ); + let poll_key = format!("graylog:{}", project_id); + activity_state.start_poll(&poll_key, project_id, "Graylog"); + let _poll_guard = PollActivityGuard { + activity_state: activity_state.clone(), + key: poll_key, + }; let events = match client .search_relative(&query, credentials.stream_id.as_deref(), range_seconds) diff --git a/src-tauri/src/services/mod.rs b/src-tauri/src/services/mod.rs index 6cfe0f9..9303c1e 100644 --- a/src-tauri/src/services/mod.rs +++ b/src-tauri/src/services/mod.rs @@ -1,3 +1,4 @@ +pub mod activity_state; pub mod agent_runtime; pub mod cli_process; pub mod crypto; diff --git a/src-tauri/src/services/poller.rs b/src-tauri/src/services/poller.rs index d820906..3568916 100644 --- a/src-tauri/src/services/poller.rs +++ b/src-tauri/src/services/poller.rs @@ -1,6 +1,7 @@ use crate::models::credential::TuleapCredentials; use crate::models::ticket::ProcessedTicket; use crate::models::tracker::WatchedTracker; +use crate::services::activity_state::ActivityState; use crate::services::tuleap_client::TuleapClient; use crate::services::{crypto, filter_engine, notifier}; use rusqlite::Connection; @@ -13,12 +14,14 @@ pub fn start( encryption_key: [u8; 32], http_client: reqwest::Client, app_handle: AppHandle, + activity_state: ActivityState, ) { tauri::async_runtime::spawn(async move { let mut tick = interval(Duration::from_secs(60)); loop { tick.tick().await; - poll_all_trackers(&db, &encryption_key, &http_client, &app_handle).await; + poll_all_trackers(&db, &encryption_key, &http_client, &app_handle, &activity_state) + .await; } }); } @@ -28,6 +31,7 @@ async fn poll_all_trackers( encryption_key: &[u8; 32], http_client: &reqwest::Client, app_handle: &AppHandle, + activity_state: &ActivityState, ) { // 1. Read all enabled trackers from DB let trackers = { @@ -97,7 +101,7 @@ async fn poll_all_trackers( TuleapClient::new(http_client, &creds.tuleap_url, &creds.username, &password) }; - poll_single_tracker(db, &client, tracker, app_handle).await; + poll_single_tracker(db, &client, tracker, app_handle, activity_state).await; } } @@ -142,7 +146,11 @@ async fn poll_single_tracker( client: &TuleapClient, tracker: &WatchedTracker, app_handle: &AppHandle, + activity_state: &ActivityState, ) { + let poll_key = format!("tuleap:{}", tracker.id); + activity_state.start_poll(&poll_key, &tracker.project_id, &tracker.tracker_label); + let _ = app_handle.emit( "polling-started", serde_json::json!({ @@ -171,6 +179,7 @@ async fn poll_single_tracker( "error": e, }), ); + activity_state.finish_poll(&poll_key); return; } }; @@ -184,6 +193,17 @@ async fn poll_single_tracker( Ok(c) => c, Err(e) => { eprintln!("poller: failed to lock db for insert: {}", e); + let _ = app_handle.emit( + "polling-error", + serde_json::json!({ + "project_id": &tracker.project_id, + "tracker_id": &tracker.id, + "tracker_label": &tracker.tracker_label, + "source": "scheduled", + "error": format!("database lock failed while inserting: {}", e), + }), + ); + activity_state.finish_poll(&poll_key); return; } }; @@ -268,6 +288,7 @@ async fn poll_single_tracker( "new_tickets_count": new_tickets.len(), }), ); + activity_state.finish_poll(&poll_key); } #[cfg(test)] diff --git a/src-tauri/src/services/process_registry.rs b/src-tauri/src/services/process_registry.rs index ef74c03..9959db4 100644 --- a/src-tauri/src/services/process_registry.rs +++ b/src-tauri/src/services/process_registry.rs @@ -54,6 +54,14 @@ impl ProcessRegistry { self.cancel_process(&self.ticket_processes, ticket_id).await } + pub fn list_ticket_ids(&self) -> Vec { + let processes = self + .ticket_processes + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + processes.keys().cloned().collect() + } + pub fn register_task( &self, task_id: &str, diff --git a/src/components/projects/ProjectDashboard.tsx b/src/components/projects/ProjectDashboard.tsx index 404a035..49e2e87 100644 --- a/src/components/projects/ProjectDashboard.tsx +++ b/src/components/projects/ProjectDashboard.tsx @@ -7,6 +7,7 @@ import { listTrackers, listProcessedTickets, getProjectThroughput, + getRuntimeActivity, } from "../../lib/api"; import type { Project, @@ -95,10 +96,37 @@ export default function ProjectDashboard() { setThroughput(stats); }, [projectId]); + const syncRuntimeActivity = useCallback(async () => { + if (!projectId) return; + + try { + const snapshot = await getRuntimeActivity(projectId); + setActivePolls( + Object.fromEntries(snapshot.active_polls.map((poll) => [poll.key, poll.label])) + ); + setActiveAgents( + Object.fromEntries(snapshot.active_agents.map((agent) => [agent.ticket_id, agent.step])) + ); + } catch (error) { + console.error("Failed to sync runtime activity", error); + } + }, [projectId]); + useEffect(() => { void loadData(); }, [loadData]); + useEffect(() => { + if (!projectId) return; + void syncRuntimeActivity(); + + const intervalId = window.setInterval(() => { + void syncRuntimeActivity(); + }, 5_000); + + return () => window.clearInterval(intervalId); + }, [projectId, syncRuntimeActivity]); + useEffect(() => { if (!projectId) return; @@ -140,6 +168,7 @@ export default function ProjectDashboard() { ...prev, [payload.tracker_id]: payload.tracker_label, })); + void syncRuntimeActivity(); appendActivity( "info", `Polling ${payload.source === "manual" ? "manuel" : "auto"} lancé sur "${payload.tracker_label}".` @@ -154,6 +183,7 @@ export default function ProjectDashboard() { delete next[payload.tracker_id]; return next; }); + void syncRuntimeActivity(); appendActivity( "success", @@ -170,6 +200,7 @@ export default function ProjectDashboard() { delete next[payload.tracker_id]; return next; }); + void syncRuntimeActivity(); appendActivity( "error", @@ -193,6 +224,7 @@ export default function ProjectDashboard() { ...prev, [payload.ticket_id]: payload.step ?? "processing", })); + void syncRuntimeActivity(); appendActivity( "info", `Agent ${payload.step ?? "processing"} lancé pour le ticket #${payload.artifact_id}.` @@ -208,6 +240,7 @@ export default function ProjectDashboard() { delete next[payload.ticket_id]; return next; }); + void syncRuntimeActivity(); appendActivity( "success", `Pipeline agent terminé pour le ticket #${payload.artifact_id}.` @@ -223,6 +256,7 @@ export default function ProjectDashboard() { delete next[payload.ticket_id]; return next; }); + void syncRuntimeActivity(); appendActivity( "error", `Erreur agent sur le ticket #${payload.artifact_id}: ${payload.error ?? "erreur inconnue"}.` @@ -237,6 +271,7 @@ export default function ProjectDashboard() { ...prev, [graylogPollKey]: "Graylog", })); + void syncRuntimeActivity(); appendActivity("info", "Polling Graylog lancé."); }), listen("graylog-subject-triggered", (event) => { @@ -258,6 +293,7 @@ export default function ProjectDashboard() { delete next[graylogPollKey]; return next; }); + void syncRuntimeActivity(); appendActivity( "success", `Polling Graylog terminé (${payload.triggered_count ?? 0} sujet(s) déclenché(s)).` @@ -272,6 +308,7 @@ export default function ProjectDashboard() { delete next[graylogPollKey]; return next; }); + void syncRuntimeActivity(); appendActivity( "error", `Erreur Graylog: ${payload.error ?? "erreur inconnue"}.` @@ -323,7 +360,7 @@ export default function ProjectDashboard() { unlisten(); } }; - }, [projectId, loadData]); + }, [projectId, loadData, syncRuntimeActivity]); async function handleDelete() { if (!projectId) return; diff --git a/src/lib/api.ts b/src/lib/api.ts index f4e5de7..3477cea 100644 --- a/src/lib/api.ts +++ b/src/lib/api.ts @@ -12,6 +12,7 @@ import type { ProjectThroughputStats, Worktree, TicketResult, + RuntimeActivitySnapshot, OrchaiNotification, ProjectModule, LiveSession, @@ -232,6 +233,9 @@ export async function getQueueStatus(projectId: string): Promise { return invoke("get_project_throughput", { projectId }); } +export async function getRuntimeActivity(projectId: string): Promise { + return invoke("get_runtime_activity", { projectId }); +} // Orchestrator export async function getTicketResult(ticketId: string): Promise { diff --git a/src/lib/types.ts b/src/lib/types.ts index f39fa10..98529e3 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -136,6 +136,22 @@ export interface ProjectThroughputStats { avg_lead_time_seconds: number | null; } +export interface RuntimeActivePoll { + key: string; + label: string; +} + +export interface RuntimeActiveAgent { + ticket_id: string; + artifact_id: number; + step: string; +} + +export interface RuntimeActivitySnapshot { + active_polls: RuntimeActivePoll[]; + active_agents: RuntimeActiveAgent[]; +} + export interface Worktree { id: string; ticket_id: string;