fix(dashboard): resync live polling and agent counters

This commit is contained in:
thibaud-lclr 2026-04-20 10:57:15 +02:00
parent b5ba10d857
commit 1f146a3a95
11 changed files with 313 additions and 3 deletions

View file

@ -172,6 +172,7 @@ pub async fn manual_graylog_poll(
&state.http_client, &state.http_client,
&app_handle, &app_handle,
&project_id, &project_id,
&state.activity_state,
) )
.await .await
.map_err(AppError::from) .map_err(AppError::from)

View file

@ -6,8 +6,21 @@ use crate::models::tracker::WatchedTracker;
use crate::services::tuleap_client::TuleapClient; use crate::services::tuleap_client::TuleapClient;
use crate::services::{crypto, filter_engine, notifier}; use crate::services::{crypto, filter_engine, notifier};
use crate::AppState; use crate::AppState;
use serde::Serialize;
use std::collections::HashMap;
use tauri::{Emitter, State}; use tauri::{Emitter, State};
struct PollActivityGuard {
activity_state: crate::services::activity_state::ActivityState,
key: String,
}
impl Drop for PollActivityGuard {
fn drop(&mut self) {
self.activity_state.finish_poll(&self.key);
}
}
#[tauri::command] #[tauri::command]
pub async fn manual_poll( pub async fn manual_poll(
state: State<'_, AppState>, state: State<'_, AppState>,
@ -51,6 +64,15 @@ pub async fn manual_poll(
(tracker, client) (tracker, client)
}; // lock dropped here }; // lock dropped here
let poll_key = format!("tuleap:{}", tracker.id);
state
.activity_state
.start_poll(&poll_key, &tracker.project_id, &tracker.tracker_label);
let _poll_guard = PollActivityGuard {
activity_state: state.activity_state.clone(),
key: poll_key.clone(),
};
let _ = app_handle.emit( let _ = app_handle.emit(
"polling-started", "polling-started",
serde_json::json!({ serde_json::json!({
@ -179,3 +201,98 @@ pub fn get_project_throughput(
let stats = ProcessedTicket::get_project_throughput_stats(&db, &project_id)?; let stats = ProcessedTicket::get_project_throughput_stats(&db, &project_id)?;
Ok(stats) Ok(stats)
} }
#[derive(Debug, Serialize)]
pub struct RuntimeActivePoll {
pub key: String,
pub label: String,
}
#[derive(Debug, Serialize)]
pub struct RuntimeActiveAgent {
pub ticket_id: String,
pub artifact_id: i32,
pub step: String,
}
#[derive(Debug, Serialize)]
pub struct RuntimeActivitySnapshot {
pub active_polls: Vec<RuntimeActivePoll>,
pub active_agents: Vec<RuntimeActiveAgent>,
}
fn step_from_status(status: &str) -> &'static str {
match status {
"Developing" => "developer",
"Analyzing" => "analyst",
_ => "processing",
}
}
#[tauri::command]
pub fn get_runtime_activity(
state: State<'_, AppState>,
project_id: String,
) -> Result<RuntimeActivitySnapshot, AppError> {
let mut active_polls = state
.activity_state
.list_project_polls(&project_id)
.into_iter()
.map(|(key, label)| RuntimeActivePoll { key, label })
.collect::<Vec<_>>();
active_polls.sort_by(|a, b| a.label.to_lowercase().cmp(&b.label.to_lowercase()));
let db = state
.db
.lock()
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
let project_tickets = ProcessedTicket::list_by_project(&db, &project_id)?;
let mut agents_by_ticket_id: HashMap<String, RuntimeActiveAgent> = HashMap::new();
for ticket in &project_tickets {
if ticket.status == "Analyzing" || ticket.status == "Developing" {
agents_by_ticket_id.insert(
ticket.id.clone(),
RuntimeActiveAgent {
ticket_id: ticket.id.clone(),
artifact_id: ticket.artifact_id,
step: step_from_status(&ticket.status).to_string(),
},
);
}
}
for ticket_id in state.process_registry.list_ticket_ids() {
if agents_by_ticket_id.contains_key(&ticket_id) {
continue;
}
let Ok(ticket) = ProcessedTicket::get_by_id(&db, &ticket_id) else {
continue;
};
if ticket.project_id != project_id {
continue;
}
agents_by_ticket_id.insert(
ticket.id.clone(),
RuntimeActiveAgent {
ticket_id: ticket.id.clone(),
artifact_id: ticket.artifact_id,
step: step_from_status(&ticket.status).to_string(),
},
);
}
let mut active_agents = agents_by_ticket_id.into_values().collect::<Vec<_>>();
active_agents.sort_by(|a, b| {
a.artifact_id
.cmp(&b.artifact_id)
.then_with(|| a.ticket_id.cmp(&b.ticket_id))
});
Ok(RuntimeActivitySnapshot {
active_polls,
active_agents,
})
}

View file

@ -12,6 +12,7 @@ pub struct AppState {
pub encryption_key: [u8; 32], pub encryption_key: [u8; 32],
pub http_client: reqwest::Client, pub http_client: reqwest::Client,
pub process_registry: services::process_registry::ProcessRegistry, pub process_registry: services::process_registry::ProcessRegistry,
pub activity_state: services::activity_state::ActivityState,
} }
#[cfg_attr(mobile, tauri::mobile_entry_point)] #[cfg_attr(mobile, tauri::mobile_entry_point)]
@ -31,6 +32,7 @@ pub fn run() {
let http_client = reqwest::Client::new(); let http_client = reqwest::Client::new();
let process_registry = services::process_registry::ProcessRegistry::default(); let process_registry = services::process_registry::ProcessRegistry::default();
let activity_state = services::activity_state::ActivityState::default();
let db_arc = Arc::new(Mutex::new(conn)); let db_arc = Arc::new(Mutex::new(conn));
app.manage(AppState { app.manage(AppState {
@ -38,6 +40,7 @@ pub fn run() {
encryption_key, encryption_key,
http_client: http_client.clone(), http_client: http_client.clone(),
process_registry: process_registry.clone(), process_registry: process_registry.clone(),
activity_state: activity_state.clone(),
}); });
// Start background poller // Start background poller
@ -46,6 +49,7 @@ pub fn run() {
encryption_key, encryption_key,
http_client.clone(), http_client.clone(),
app.handle().clone(), app.handle().clone(),
activity_state.clone(),
); );
services::graylog_poller::start( services::graylog_poller::start(
@ -53,6 +57,7 @@ pub fn run() {
encryption_key, encryption_key,
http_client, http_client,
app.handle().clone(), app.handle().clone(),
activity_state,
); );
// Start agent orchestrator // Start agent orchestrator
@ -99,6 +104,7 @@ pub fn run() {
commands::poller::manual_poll, commands::poller::manual_poll,
commands::poller::get_queue_status, commands::poller::get_queue_status,
commands::poller::get_project_throughput, commands::poller::get_project_throughput,
commands::poller::get_runtime_activity,
commands::notification::list_notifications, commands::notification::list_notifications,
commands::notification::mark_notification_read, commands::notification::mark_notification_read,
commands::notification::mark_all_notifications_read, commands::notification::mark_all_notifications_read,

View file

@ -0,0 +1,78 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
struct ActivePollEntry {
project_id: String,
label: String,
count: usize,
}
#[derive(Clone, Default)]
pub struct ActivityState {
active_polls: Arc<Mutex<HashMap<String, ActivePollEntry>>>,
}
impl ActivityState {
pub fn start_poll(&self, key: &str, project_id: &str, label: &str) {
let mut polls = self
.active_polls
.lock()
.unwrap_or_else(|poison| poison.into_inner());
let entry = polls.entry(key.to_string()).or_insert_with(|| ActivePollEntry {
project_id: project_id.to_string(),
label: label.to_string(),
count: 0,
});
entry.project_id = project_id.to_string();
entry.label = label.to_string();
entry.count += 1;
}
pub fn finish_poll(&self, key: &str) {
let mut polls = self
.active_polls
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if let Some(entry) = polls.get_mut(key) {
if entry.count > 1 {
entry.count -= 1;
} else {
polls.remove(key);
}
}
}
pub fn list_project_polls(&self, project_id: &str) -> Vec<(String, String)> {
let polls = self
.active_polls
.lock()
.unwrap_or_else(|poison| poison.into_inner());
polls
.iter()
.filter(|(_, entry)| entry.project_id == project_id)
.map(|(key, entry)| (key.clone(), entry.label.clone()))
.collect()
}
}
#[cfg(test)]
mod tests {
use super::ActivityState;
#[test]
fn test_activity_state_tracks_concurrent_polls() {
let state = ActivityState::default();
state.start_poll("tuleap:tracker-1", "project-1", "Tracker A");
state.start_poll("tuleap:tracker-1", "project-1", "Tracker A");
assert_eq!(state.list_project_polls("project-1").len(), 1);
state.finish_poll("tuleap:tracker-1");
assert_eq!(state.list_project_polls("project-1").len(), 1);
state.finish_poll("tuleap:tracker-1");
assert_eq!(state.list_project_polls("project-1").len(), 0);
}
}

View file

@ -1,6 +1,7 @@
use crate::models::graylog::{GraylogCredentials, GraylogDetection, GraylogSubject}; use crate::models::graylog::{GraylogCredentials, GraylogDetection, GraylogSubject};
use crate::models::module::{ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE}; use crate::models::module::{ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE};
use crate::models::ticket::ProcessedTicket; use crate::models::ticket::ProcessedTicket;
use crate::services::activity_state::ActivityState;
use crate::services::crypto; use crate::services::crypto;
use crate::services::graylog_client::GraylogClient; use crate::services::graylog_client::GraylogClient;
use crate::services::graylog_scoring::{group_subjects, SubjectAggregate}; use crate::services::graylog_scoring::{group_subjects, SubjectAggregate};
@ -99,11 +100,23 @@ fn emit_polling_error(app_handle: &AppHandle, project_id: &str, error: &str) {
); );
} }
struct PollActivityGuard {
activity_state: ActivityState,
key: String,
}
impl Drop for PollActivityGuard {
fn drop(&mut self) {
self.activity_state.finish_poll(&self.key);
}
}
pub fn start( pub fn start(
db: Arc<Mutex<Connection>>, db: Arc<Mutex<Connection>>,
encryption_key: [u8; 32], encryption_key: [u8; 32],
http_client: reqwest::Client, http_client: reqwest::Client,
app_handle: AppHandle, app_handle: AppHandle,
activity_state: ActivityState,
) { ) {
tauri::async_runtime::spawn(async move { tauri::async_runtime::spawn(async move {
let mut tick = interval(Duration::from_secs(60)); let mut tick = interval(Duration::from_secs(60));
@ -156,6 +169,7 @@ pub fn start(
&http_client, &http_client,
&app_handle, &app_handle,
&project_id, &project_id,
&activity_state,
) )
.await .await
{ {
@ -175,6 +189,7 @@ pub async fn poll_project_once(
http_client: &reqwest::Client, http_client: &reqwest::Client,
app_handle: &AppHandle, app_handle: &AppHandle,
project_id: &str, project_id: &str,
activity_state: &ActivityState,
) -> Result<i32, String> { ) -> Result<i32, String> {
let credentials = { let credentials = {
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
@ -206,6 +221,12 @@ pub async fn poll_project_once(
"graylog-polling-started", "graylog-polling-started",
serde_json::json!({ "project_id": project_id }), serde_json::json!({ "project_id": project_id }),
); );
let poll_key = format!("graylog:{}", project_id);
activity_state.start_poll(&poll_key, project_id, "Graylog");
let _poll_guard = PollActivityGuard {
activity_state: activity_state.clone(),
key: poll_key,
};
let events = match client let events = match client
.search_relative(&query, credentials.stream_id.as_deref(), range_seconds) .search_relative(&query, credentials.stream_id.as_deref(), range_seconds)

View file

@ -1,3 +1,4 @@
pub mod activity_state;
pub mod agent_runtime; pub mod agent_runtime;
pub mod cli_process; pub mod cli_process;
pub mod crypto; pub mod crypto;

View file

@ -1,6 +1,7 @@
use crate::models::credential::TuleapCredentials; use crate::models::credential::TuleapCredentials;
use crate::models::ticket::ProcessedTicket; use crate::models::ticket::ProcessedTicket;
use crate::models::tracker::WatchedTracker; use crate::models::tracker::WatchedTracker;
use crate::services::activity_state::ActivityState;
use crate::services::tuleap_client::TuleapClient; use crate::services::tuleap_client::TuleapClient;
use crate::services::{crypto, filter_engine, notifier}; use crate::services::{crypto, filter_engine, notifier};
use rusqlite::Connection; use rusqlite::Connection;
@ -13,12 +14,14 @@ pub fn start(
encryption_key: [u8; 32], encryption_key: [u8; 32],
http_client: reqwest::Client, http_client: reqwest::Client,
app_handle: AppHandle, app_handle: AppHandle,
activity_state: ActivityState,
) { ) {
tauri::async_runtime::spawn(async move { tauri::async_runtime::spawn(async move {
let mut tick = interval(Duration::from_secs(60)); let mut tick = interval(Duration::from_secs(60));
loop { loop {
tick.tick().await; tick.tick().await;
poll_all_trackers(&db, &encryption_key, &http_client, &app_handle).await; poll_all_trackers(&db, &encryption_key, &http_client, &app_handle, &activity_state)
.await;
} }
}); });
} }
@ -28,6 +31,7 @@ async fn poll_all_trackers(
encryption_key: &[u8; 32], encryption_key: &[u8; 32],
http_client: &reqwest::Client, http_client: &reqwest::Client,
app_handle: &AppHandle, app_handle: &AppHandle,
activity_state: &ActivityState,
) { ) {
// 1. Read all enabled trackers from DB // 1. Read all enabled trackers from DB
let trackers = { let trackers = {
@ -97,7 +101,7 @@ async fn poll_all_trackers(
TuleapClient::new(http_client, &creds.tuleap_url, &creds.username, &password) TuleapClient::new(http_client, &creds.tuleap_url, &creds.username, &password)
}; };
poll_single_tracker(db, &client, tracker, app_handle).await; poll_single_tracker(db, &client, tracker, app_handle, activity_state).await;
} }
} }
@ -142,7 +146,11 @@ async fn poll_single_tracker(
client: &TuleapClient, client: &TuleapClient,
tracker: &WatchedTracker, tracker: &WatchedTracker,
app_handle: &AppHandle, app_handle: &AppHandle,
activity_state: &ActivityState,
) { ) {
let poll_key = format!("tuleap:{}", tracker.id);
activity_state.start_poll(&poll_key, &tracker.project_id, &tracker.tracker_label);
let _ = app_handle.emit( let _ = app_handle.emit(
"polling-started", "polling-started",
serde_json::json!({ serde_json::json!({
@ -171,6 +179,7 @@ async fn poll_single_tracker(
"error": e, "error": e,
}), }),
); );
activity_state.finish_poll(&poll_key);
return; return;
} }
}; };
@ -184,6 +193,17 @@ async fn poll_single_tracker(
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
eprintln!("poller: failed to lock db for insert: {}", e); eprintln!("poller: failed to lock db for insert: {}", e);
let _ = app_handle.emit(
"polling-error",
serde_json::json!({
"project_id": &tracker.project_id,
"tracker_id": &tracker.id,
"tracker_label": &tracker.tracker_label,
"source": "scheduled",
"error": format!("database lock failed while inserting: {}", e),
}),
);
activity_state.finish_poll(&poll_key);
return; return;
} }
}; };
@ -268,6 +288,7 @@ async fn poll_single_tracker(
"new_tickets_count": new_tickets.len(), "new_tickets_count": new_tickets.len(),
}), }),
); );
activity_state.finish_poll(&poll_key);
} }
#[cfg(test)] #[cfg(test)]

View file

@ -54,6 +54,14 @@ impl ProcessRegistry {
self.cancel_process(&self.ticket_processes, ticket_id).await self.cancel_process(&self.ticket_processes, ticket_id).await
} }
pub fn list_ticket_ids(&self) -> Vec<String> {
let processes = self
.ticket_processes
.lock()
.unwrap_or_else(|poison| poison.into_inner());
processes.keys().cloned().collect()
}
pub fn register_task( pub fn register_task(
&self, &self,
task_id: &str, task_id: &str,

View file

@ -7,6 +7,7 @@ import {
listTrackers, listTrackers,
listProcessedTickets, listProcessedTickets,
getProjectThroughput, getProjectThroughput,
getRuntimeActivity,
} from "../../lib/api"; } from "../../lib/api";
import type { import type {
Project, Project,
@ -95,10 +96,37 @@ export default function ProjectDashboard() {
setThroughput(stats); setThroughput(stats);
}, [projectId]); }, [projectId]);
const syncRuntimeActivity = useCallback(async () => {
if (!projectId) return;
try {
const snapshot = await getRuntimeActivity(projectId);
setActivePolls(
Object.fromEntries(snapshot.active_polls.map((poll) => [poll.key, poll.label]))
);
setActiveAgents(
Object.fromEntries(snapshot.active_agents.map((agent) => [agent.ticket_id, agent.step]))
);
} catch (error) {
console.error("Failed to sync runtime activity", error);
}
}, [projectId]);
useEffect(() => { useEffect(() => {
void loadData(); void loadData();
}, [loadData]); }, [loadData]);
useEffect(() => {
if (!projectId) return;
void syncRuntimeActivity();
const intervalId = window.setInterval(() => {
void syncRuntimeActivity();
}, 5_000);
return () => window.clearInterval(intervalId);
}, [projectId, syncRuntimeActivity]);
useEffect(() => { useEffect(() => {
if (!projectId) return; if (!projectId) return;
@ -140,6 +168,7 @@ export default function ProjectDashboard() {
...prev, ...prev,
[payload.tracker_id]: payload.tracker_label, [payload.tracker_id]: payload.tracker_label,
})); }));
void syncRuntimeActivity();
appendActivity( appendActivity(
"info", "info",
`Polling ${payload.source === "manual" ? "manuel" : "auto"} lancé sur "${payload.tracker_label}".` `Polling ${payload.source === "manual" ? "manuel" : "auto"} lancé sur "${payload.tracker_label}".`
@ -154,6 +183,7 @@ export default function ProjectDashboard() {
delete next[payload.tracker_id]; delete next[payload.tracker_id];
return next; return next;
}); });
void syncRuntimeActivity();
appendActivity( appendActivity(
"success", "success",
@ -170,6 +200,7 @@ export default function ProjectDashboard() {
delete next[payload.tracker_id]; delete next[payload.tracker_id];
return next; return next;
}); });
void syncRuntimeActivity();
appendActivity( appendActivity(
"error", "error",
@ -193,6 +224,7 @@ export default function ProjectDashboard() {
...prev, ...prev,
[payload.ticket_id]: payload.step ?? "processing", [payload.ticket_id]: payload.step ?? "processing",
})); }));
void syncRuntimeActivity();
appendActivity( appendActivity(
"info", "info",
`Agent ${payload.step ?? "processing"} lancé pour le ticket #${payload.artifact_id}.` `Agent ${payload.step ?? "processing"} lancé pour le ticket #${payload.artifact_id}.`
@ -208,6 +240,7 @@ export default function ProjectDashboard() {
delete next[payload.ticket_id]; delete next[payload.ticket_id];
return next; return next;
}); });
void syncRuntimeActivity();
appendActivity( appendActivity(
"success", "success",
`Pipeline agent terminé pour le ticket #${payload.artifact_id}.` `Pipeline agent terminé pour le ticket #${payload.artifact_id}.`
@ -223,6 +256,7 @@ export default function ProjectDashboard() {
delete next[payload.ticket_id]; delete next[payload.ticket_id];
return next; return next;
}); });
void syncRuntimeActivity();
appendActivity( appendActivity(
"error", "error",
`Erreur agent sur le ticket #${payload.artifact_id}: ${payload.error ?? "erreur inconnue"}.` `Erreur agent sur le ticket #${payload.artifact_id}: ${payload.error ?? "erreur inconnue"}.`
@ -237,6 +271,7 @@ export default function ProjectDashboard() {
...prev, ...prev,
[graylogPollKey]: "Graylog", [graylogPollKey]: "Graylog",
})); }));
void syncRuntimeActivity();
appendActivity("info", "Polling Graylog lancé."); appendActivity("info", "Polling Graylog lancé.");
}), }),
listen<GraylogSubjectTriggeredPayload>("graylog-subject-triggered", (event) => { listen<GraylogSubjectTriggeredPayload>("graylog-subject-triggered", (event) => {
@ -258,6 +293,7 @@ export default function ProjectDashboard() {
delete next[graylogPollKey]; delete next[graylogPollKey];
return next; return next;
}); });
void syncRuntimeActivity();
appendActivity( appendActivity(
"success", "success",
`Polling Graylog terminé (${payload.triggered_count ?? 0} sujet(s) déclenché(s)).` `Polling Graylog terminé (${payload.triggered_count ?? 0} sujet(s) déclenché(s)).`
@ -272,6 +308,7 @@ export default function ProjectDashboard() {
delete next[graylogPollKey]; delete next[graylogPollKey];
return next; return next;
}); });
void syncRuntimeActivity();
appendActivity( appendActivity(
"error", "error",
`Erreur Graylog: ${payload.error ?? "erreur inconnue"}.` `Erreur Graylog: ${payload.error ?? "erreur inconnue"}.`
@ -323,7 +360,7 @@ export default function ProjectDashboard() {
unlisten(); unlisten();
} }
}; };
}, [projectId, loadData]); }, [projectId, loadData, syncRuntimeActivity]);
async function handleDelete() { async function handleDelete() {
if (!projectId) return; if (!projectId) return;

View file

@ -12,6 +12,7 @@ import type {
ProjectThroughputStats, ProjectThroughputStats,
Worktree, Worktree,
TicketResult, TicketResult,
RuntimeActivitySnapshot,
OrchaiNotification, OrchaiNotification,
ProjectModule, ProjectModule,
LiveSession, LiveSession,
@ -232,6 +233,9 @@ export async function getQueueStatus(projectId: string): Promise<ProcessedTicket
export async function getProjectThroughput(projectId: string): Promise<ProjectThroughputStats> { export async function getProjectThroughput(projectId: string): Promise<ProjectThroughputStats> {
return invoke("get_project_throughput", { projectId }); return invoke("get_project_throughput", { projectId });
} }
export async function getRuntimeActivity(projectId: string): Promise<RuntimeActivitySnapshot> {
return invoke("get_runtime_activity", { projectId });
}
// Orchestrator // Orchestrator
export async function getTicketResult(ticketId: string): Promise<TicketResult> { export async function getTicketResult(ticketId: string): Promise<TicketResult> {

View file

@ -136,6 +136,22 @@ export interface ProjectThroughputStats {
avg_lead_time_seconds: number | null; avg_lead_time_seconds: number | null;
} }
export interface RuntimeActivePoll {
key: string;
label: string;
}
export interface RuntimeActiveAgent {
ticket_id: string;
artifact_id: number;
step: string;
}
export interface RuntimeActivitySnapshot {
active_polls: RuntimeActivePoll[];
active_agents: RuntimeActiveAgent[];
}
export interface Worktree { export interface Worktree {
id: string; id: string;
ticket_id: string; ticket_id: string;