From d561578e78046993e5794c863aff77314351d12f Mon Sep 17 00:00:00 2001 From: thibaud-lclr Date: Fri, 17 Apr 2026 15:35:57 +0200 Subject: [PATCH] feat(graylog): add client commands and source-aware poller flow --- src-tauri/src/commands/graylog.rs | 209 ++++++++++++++++ src-tauri/src/commands/mod.rs | 1 + src-tauri/src/lib.rs | 14 ++ src-tauri/src/models/graylog.rs | 25 ++ src-tauri/src/services/graylog_client.rs | 258 +++++++++++++++++++ src-tauri/src/services/graylog_poller.rs | 305 +++++++++++++++++++++++ src-tauri/src/services/mod.rs | 2 + src-tauri/src/services/orchestrator.rs | 194 ++++++++++---- 8 files changed, 958 insertions(+), 50 deletions(-) create mode 100644 src-tauri/src/commands/graylog.rs create mode 100644 src-tauri/src/services/graylog_client.rs create mode 100644 src-tauri/src/services/graylog_poller.rs diff --git a/src-tauri/src/commands/graylog.rs b/src-tauri/src/commands/graylog.rs new file mode 100644 index 0000000..fd0c4b0 --- /dev/null +++ b/src-tauri/src/commands/graylog.rs @@ -0,0 +1,209 @@ +use crate::error::AppError; +use crate::models::graylog::{ + GraylogCredentials, GraylogCredentialsSafe, GraylogDetection, GraylogSubject, +}; +use crate::services::crypto; +use crate::services::graylog_client::GraylogClient; +use crate::services::graylog_poller; +use crate::AppState; +use tauri::State; + +fn validate_input( + base_url: &str, + analyst_agent_id: &str, + developer_agent_id: &str, + polling_interval_minutes: i32, + lookback_minutes: i32, + score_threshold: i32, +) -> Result<(String, String, String), AppError> { + let base_url = base_url.trim().to_string(); + let analyst_agent_id = analyst_agent_id.trim().to_string(); + let developer_agent_id = developer_agent_id.trim().to_string(); + + if base_url.is_empty() { + return Err(AppError::from("Graylog URL is required".to_string())); + } + if analyst_agent_id.is_empty() { + return Err(AppError::from("Analyst agent is required".to_string())); + } + if developer_agent_id.is_empty() { + return Err(AppError::from("Developer agent is required".to_string())); + } + if polling_interval_minutes <= 0 { + return Err(AppError::from( + "Polling interval must be strictly positive".to_string(), + )); + } + if lookback_minutes <= 0 { + return Err(AppError::from( + "Lookback window must be strictly positive".to_string(), + )); + } + if !(1..=100).contains(&score_threshold) { + return Err(AppError::from( + "Score threshold must be between 1 and 100".to_string(), + )); + } + + Ok((base_url, analyst_agent_id, developer_agent_id)) +} + +#[tauri::command] +#[allow(clippy::too_many_arguments)] +pub fn set_graylog_credentials( + state: State<'_, AppState>, + project_id: String, + base_url: String, + api_token: String, + analyst_agent_id: String, + developer_agent_id: String, + stream_id: Option, + query_filter: String, + polling_interval_minutes: i32, + lookback_minutes: i32, + score_threshold: i32, +) -> Result { + let (base_url, analyst_agent_id, developer_agent_id) = validate_input( + &base_url, + &analyst_agent_id, + &developer_agent_id, + polling_interval_minutes, + lookback_minutes, + score_threshold, + )?; + + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + + let token_encrypted = if api_token.trim().is_empty() { + match GraylogCredentials::get_by_project(&db, &project_id)? { + Some(existing) => existing.api_token_encrypted, + None => { + return Err(AppError::from( + "API token is required for initial setup".to_string(), + )); + } + } + } else { + crypto::encrypt(&state.encryption_key, api_token.trim()).map_err(AppError::from)? + }; + + let creds = GraylogCredentials::upsert_for_project( + &db, + &project_id, + &base_url, + &token_encrypted, + &analyst_agent_id, + &developer_agent_id, + stream_id.as_deref().map(str::trim).filter(|v| !v.is_empty()), + query_filter.trim(), + polling_interval_minutes, + lookback_minutes, + score_threshold, + )?; + + Ok(creds.to_safe()) +} + +#[tauri::command] +pub fn get_graylog_credentials( + state: State<'_, AppState>, + project_id: String, +) -> Result, AppError> { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + + Ok(GraylogCredentials::get_by_project(&db, &project_id)?.map(|creds| creds.to_safe())) +} + +#[tauri::command] +pub fn delete_graylog_credentials( + state: State<'_, AppState>, + project_id: String, +) -> Result<(), AppError> { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + + GraylogCredentials::delete_for_project(&db, &project_id)?; + Ok(()) +} + +#[tauri::command] +pub async fn test_graylog_connection( + state: State<'_, AppState>, + project_id: String, +) -> Result { + let (base_url, token) = { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + + let creds = GraylogCredentials::get_by_project(&db, &project_id)? + .ok_or_else(|| AppError::from("No Graylog credentials configured".to_string()))?; + + let token = crypto::decrypt(&state.encryption_key, &creds.api_token_encrypted) + .map_err(AppError::from)?; + + (creds.base_url, token) + }; + + let client = GraylogClient::new(&state.http_client, &base_url, &token); + client.test_connection().await.map_err(AppError::from)?; + + Ok("Connection successful".to_string()) +} + +#[tauri::command] +pub async fn manual_graylog_poll( + state: State<'_, AppState>, + app_handle: tauri::AppHandle, + project_id: String, +) -> Result { + graylog_poller::poll_project_once( + &state.db, + &state.encryption_key, + &state.http_client, + &app_handle, + &project_id, + ) + .await + .map_err(AppError::from) +} + +#[tauri::command] +pub fn list_graylog_subjects( + state: State<'_, AppState>, + project_id: String, +) -> Result, AppError> { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + + Ok(GraylogSubject::list_by_project(&db, &project_id)?) +} + +#[tauri::command] +pub fn list_graylog_detections( + state: State<'_, AppState>, + project_id: String, + subject_id: Option, +) -> Result, AppError> { + let db = state + .db + .lock() + .map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?; + + Ok(GraylogDetection::list_by_project( + &db, + &project_id, + subject_id.as_deref(), + )?) +} diff --git a/src-tauri/src/commands/mod.rs b/src-tauri/src/commands/mod.rs index 24271a7..f1b0e85 100644 --- a/src-tauri/src/commands/mod.rs +++ b/src-tauri/src/commands/mod.rs @@ -1,5 +1,6 @@ pub mod agent; pub mod credential; +pub mod graylog; pub mod live_agent; pub mod module; pub mod notification; diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 20d0830..5dbf874 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -42,6 +42,13 @@ pub fn run() { // Start background poller services::poller::start( + db_arc.clone(), + encryption_key, + http_client.clone(), + app.handle().clone(), + ); + + services::graylog_poller::start( db_arc.clone(), encryption_key, http_client, @@ -76,6 +83,13 @@ pub fn run() { commands::credential::get_tuleap_credentials, commands::credential::delete_tuleap_credentials, commands::credential::test_tuleap_connection, + commands::graylog::set_graylog_credentials, + commands::graylog::get_graylog_credentials, + commands::graylog::delete_graylog_credentials, + commands::graylog::test_graylog_connection, + commands::graylog::manual_graylog_poll, + commands::graylog::list_graylog_subjects, + commands::graylog::list_graylog_detections, commands::tracker::add_tracker, commands::tracker::list_trackers, commands::tracker::update_tracker, diff --git a/src-tauri/src/models/graylog.rs b/src-tauri/src/models/graylog.rs index a8cc541..f29f4a3 100644 --- a/src-tauri/src/models/graylog.rs +++ b/src-tauri/src/models/graylog.rs @@ -128,6 +128,7 @@ fn detection_from_row(row: &rusqlite::Row) -> rusqlite::Result } impl GraylogCredentials { + #[allow(clippy::too_many_arguments)] pub fn upsert_for_project( conn: &Connection, project_id: &str, @@ -219,6 +220,30 @@ impl GraylogCredentials { .optional() } + pub fn list_all(conn: &Connection) -> Result> { + let mut stmt = conn.prepare( + "SELECT + id, + project_id, + base_url, + api_token_encrypted, + analyst_agent_id, + developer_agent_id, + stream_id, + query_filter, + polling_interval_minutes, + lookback_minutes, + score_threshold, + created_at, + updated_at + FROM graylog_credentials + ORDER BY updated_at DESC, id DESC", + )?; + + let rows = stmt.query_map([], credentials_from_row)?; + rows.collect() + } + pub fn delete_for_project(conn: &Connection, project_id: &str) -> Result<()> { conn.execute( "DELETE FROM graylog_credentials WHERE project_id = ?1", diff --git a/src-tauri/src/services/graylog_client.rs b/src-tauri/src/services/graylog_client.rs new file mode 100644 index 0000000..a65d4e7 --- /dev/null +++ b/src-tauri/src/services/graylog_client.rs @@ -0,0 +1,258 @@ +use crate::services::graylog_scoring::GraylogEvent; +use serde_json::Value; +use std::time::Instant; +use tokio::time::{sleep, Duration}; + +pub struct GraylogClient { + http: reqwest::Client, + base_url: String, + token: String, +} + +impl GraylogClient { + pub fn new(http: &reqwest::Client, base_url: &str, token: &str) -> Self { + Self { + http: http.clone(), + base_url: base_url.trim_end_matches('/').to_string(), + token: token.to_string(), + } + } + + async fn send_get(&self, url: &str) -> Result { + const MAX_ATTEMPTS: u32 = 3; + const BASE_DELAY_MS: u64 = 500; + + for attempt in 1..=MAX_ATTEMPTS { + let started_at = Instant::now(); + eprintln!( + "[graylog] -> GET {} (attempt {}/{})", + url, attempt, MAX_ATTEMPTS + ); + + let response = self + .http + .get(url) + .header("Authorization", format!("Bearer {}", self.token)) + .header("X-Requested-By", "orchai") + .send() + .await; + + match response { + Ok(resp) => { + let status = resp.status(); + eprintln!( + "[graylog] <- GET {} | status={} | {}ms", + url, + status, + started_at.elapsed().as_millis() + ); + + if (status == reqwest::StatusCode::TOO_MANY_REQUESTS + || status.is_server_error()) + && attempt < MAX_ATTEMPTS + { + let delay_ms = BASE_DELAY_MS * 2u64.pow(attempt - 1); + eprintln!( + "[graylog] ~~ retry GET {} in {}ms (status={})", + url, delay_ms, status + ); + sleep(Duration::from_millis(delay_ms)).await; + continue; + } + + return Ok(resp); + } + Err(err) => { + eprintln!( + "[graylog] xx GET {} | error={} | {}ms", + url, + err, + started_at.elapsed().as_millis() + ); + + if attempt < MAX_ATTEMPTS { + let delay_ms = BASE_DELAY_MS * 2u64.pow(attempt - 1); + eprintln!( + "[graylog] ~~ retry GET {} in {}ms (error={})", + url, delay_ms, err + ); + sleep(Duration::from_millis(delay_ms)).await; + continue; + } + + return Err(format!("graylog request failed: {}", err)); + } + } + } + + Err("graylog request failed after retries".to_string()) + } + + pub async fn test_connection(&self) -> Result<(), String> { + let url = format!("{}/api/system", self.base_url); + let resp = self.send_get(&url).await?; + + if resp.status().is_success() { + Ok(()) + } else { + Err(format!("graylog connection test failed: HTTP {}", resp.status())) + } + } + + pub async fn search_relative( + &self, + query: &str, + stream_id: Option<&str>, + range_seconds: i32, + ) -> Result, String> { + let normalized_range = range_seconds.max(60); + let mut url = format!( + "{}/api/search/universal/relative?query={}&range={}&limit=500", + self.base_url, + urlencoding::encode(query), + normalized_range + ); + + if let Some(stream) = stream_id.map(str::trim).filter(|value| !value.is_empty()) { + url.push_str("&streams="); + url.push_str(&urlencoding::encode(stream)); + } + + let resp = self.send_get(&url).await?; + if !resp.status().is_success() { + return Err(format!("graylog search failed: HTTP {}", resp.status())); + } + + let body: Value = resp + .json() + .await + .map_err(|e| format!("invalid graylog JSON: {}", e))?; + + Ok(parse_search_response(&body)) + } +} + +fn level_to_string(value: &Value) -> String { + match value { + Value::String(s) => s.to_string(), + Value::Number(n) => { + let level = n.as_i64().unwrap_or(6); + match level { + i64::MIN..=2 => "critical".to_string(), + 3 => "error".to_string(), + 4 => "warning".to_string(), + _ => "info".to_string(), + } + } + other => other.to_string(), + } +} + +pub fn parse_search_response(body: &Value) -> Vec { + let rows = body + .get("messages") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + + rows.into_iter() + .filter_map(|row| { + let message = row.get("message")?; + + let timestamp = message + .get("timestamp") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(); + let source = message + .get("source") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(); + let level = message + .get("level") + .map(level_to_string) + .unwrap_or_else(|| "".to_string()); + let msg = message + .get("message") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(); + let service = message + .get("service") + .and_then(Value::as_str) + .map(str::to_string); + + if timestamp.is_empty() || source.is_empty() || msg.is_empty() { + return None; + } + + Some(GraylogEvent { + timestamp, + source, + service, + level, + message: msg, + raw: message.clone(), + }) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_search_response_extracts_events() { + let payload = serde_json::json!({ + "messages": [ + { "message": { + "timestamp": "2026-04-17T10:00:00.000Z", + "source": "api-1", + "level": "error", + "message": "timeout id=42", + "service": "api" + } } + ] + }); + + let events = parse_search_response(&payload); + assert_eq!(events.len(), 1); + assert_eq!(events[0].source, "api-1"); + assert_eq!(events[0].service.as_deref(), Some("api")); + } + + #[test] + fn test_parse_search_response_maps_numeric_level() { + let payload = serde_json::json!({ + "messages": [ + { "message": { + "timestamp": "2026-04-17T10:00:00.000Z", + "source": "worker-1", + "level": 4, + "message": "queue lag" + } } + ] + }); + + let events = parse_search_response(&payload); + assert_eq!(events.len(), 1); + assert_eq!(events[0].level, "warning"); + } + + #[test] + fn test_parse_search_response_skips_incomplete_rows() { + let payload = serde_json::json!({ + "messages": [ + { "message": { + "timestamp": "2026-04-17T10:00:00.000Z", + "source": "api-1" + } } + ] + }); + + let events = parse_search_response(&payload); + assert!(events.is_empty()); + } +} diff --git a/src-tauri/src/services/graylog_poller.rs b/src-tauri/src/services/graylog_poller.rs new file mode 100644 index 0000000..076c230 --- /dev/null +++ b/src-tauri/src/services/graylog_poller.rs @@ -0,0 +1,305 @@ +use crate::models::graylog::{GraylogCredentials, GraylogDetection, GraylogSubject}; +use crate::models::module::{ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE}; +use crate::models::ticket::ProcessedTicket; +use crate::services::crypto; +use crate::services::graylog_client::GraylogClient; +use crate::services::graylog_scoring::{group_subjects, SubjectAggregate}; +use rusqlite::Connection; +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; +use tauri::{AppHandle, Emitter}; +use tokio::time::{interval, Duration}; + +fn is_ticket_active(status: &str) -> bool { + matches!(status, "Pending" | "Analyzing" | "Developing") +} + +fn should_trigger_subject(score: i32, threshold: i32, has_active_ticket: bool) -> bool { + score >= threshold && !has_active_ticket +} + +fn synthetic_artifact_id(subject_key: &str) -> i32 { + // Stable FNV-1a hash so the synthetic id remains deterministic across runs. + const FNV_OFFSET: u64 = 0xcbf29ce484222325; + const FNV_PRIME: u64 = 0x100000001b3; + + let mut hash = FNV_OFFSET; + for byte in subject_key.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(FNV_PRIME); + } + + let raw = (hash & 0x3fff_ffff) as i32; + -raw.max(1) +} + +fn subject_payload(aggregate: &SubjectAggregate) -> String { + serde_json::json!({ + "source": aggregate.source, + "normalized_message": aggregate.normalized_message, + "counts": { + "critical": aggregate.counts.critical, + "error": aggregate.counts.error, + "warning": aggregate.counts.warning, + "total": aggregate.total_count, + }, + "last_seen_at": aggregate.last_seen_at, + "score": aggregate.score, + "samples": aggregate.sample_events, + }) + .to_string() +} + +fn emit_polling_error(app_handle: &AppHandle, project_id: &str, error: &str) { + let _ = app_handle.emit( + "graylog-polling-error", + serde_json::json!({ + "project_id": project_id, + "error": error, + }), + ); +} + +pub fn start( + db: Arc>, + encryption_key: [u8; 32], + http_client: reqwest::Client, + app_handle: AppHandle, +) { + tauri::async_runtime::spawn(async move { + let mut tick = interval(Duration::from_secs(60)); + let mut last_polled_at: HashMap> = HashMap::new(); + + loop { + tick.tick().await; + + let credentials = { + let conn = match db.lock() { + Ok(conn) => conn, + Err(err) => { + eprintln!("graylog_poller: failed to lock db: {}", err); + continue; + } + }; + + match GraylogCredentials::list_all(&conn) { + Ok(values) => values, + Err(err) => { + eprintln!("graylog_poller: failed to load credentials: {}", err); + continue; + } + } + }; + + let now = chrono::Utc::now(); + let known_projects: HashSet = + credentials.iter().map(|item| item.project_id.clone()).collect(); + last_polled_at.retain(|project_id, _| known_projects.contains(project_id)); + + for credential in credentials { + let project_id = credential.project_id.clone(); + let polling_interval_minutes = credential.polling_interval_minutes.max(1) as i64; + + let due = match last_polled_at.get(&project_id) { + Some(last_at) => { + now.signed_duration_since(*last_at).num_minutes() >= polling_interval_minutes + } + None => true, + }; + + if !due { + continue; + } + + if let Err(err) = poll_project_once( + &db, + &encryption_key, + &http_client, + &app_handle, + &project_id, + ) + .await + { + eprintln!("graylog_poller: project {} failed: {}", project_id, err); + emit_polling_error(&app_handle, &project_id, &err); + } + + last_polled_at.insert(project_id, now); + } + } + }); +} + +pub async fn poll_project_once( + db: &Arc>, + encryption_key: &[u8; 32], + http_client: &reqwest::Client, + app_handle: &AppHandle, + project_id: &str, +) -> Result { + let credentials = { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + + let module_enabled = ProjectModule::is_enabled(&conn, project_id, MODULE_GRAYLOG_AUTO_RESOLVE) + .map_err(|e| format!("module lookup failed: {}", e))?; + + if !module_enabled { + return Ok(0); + } + + GraylogCredentials::get_by_project(&conn, project_id) + .map_err(|e| format!("credentials lookup failed: {}", e))? + .ok_or_else(|| "No Graylog credentials configured".to_string())? + }; + + let token = crypto::decrypt(encryption_key, &credentials.api_token_encrypted) + .map_err(|e| format!("token decrypt failed: {}", e))?; + + let client = GraylogClient::new(http_client, &credentials.base_url, &token); + let query = if credentials.query_filter.trim().is_empty() { + "level:(critical OR error OR warning)".to_string() + } else { + credentials.query_filter.clone() + }; + let range_seconds = credentials.lookback_minutes.max(1) * 60; + + let _ = app_handle.emit( + "graylog-polling-started", + serde_json::json!({ "project_id": project_id }), + ); + + let events = match client + .search_relative(&query, credentials.stream_id.as_deref(), range_seconds) + .await + { + Ok(events) => events, + Err(err) => { + emit_polling_error(app_handle, project_id, &err); + return Err(err); + } + }; + + let now = chrono::Utc::now(); + let window_start = (now - chrono::Duration::minutes(credentials.lookback_minutes as i64)) + .to_rfc3339(); + let window_end = now.to_rfc3339(); + + let aggregates = group_subjects(&events, now); + let mut triggered_count = 0i32; + + for aggregate in aggregates { + let (subject, has_active_ticket) = { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + + let mut subject = GraylogSubject::upsert_subject( + &conn, + project_id, + &aggregate.subject_key, + &aggregate.source, + &aggregate.normalized_message, + &aggregate.last_seen_at, + aggregate.score, + ) + .map_err(|e| format!("upsert subject failed: {}", e))?; + + let mut has_active_ticket = false; + if let Some(active_ticket_id) = subject.active_ticket_id.clone() { + match ProcessedTicket::get_by_id(&conn, &active_ticket_id) { + Ok(ticket) if is_ticket_active(&ticket.status) => { + has_active_ticket = true; + } + _ => { + GraylogSubject::set_active_ticket(&conn, &subject.id, None, "idle") + .map_err(|e| format!("clear stale subject ticket failed: {}", e))?; + subject.active_ticket_id = None; + subject.status = "idle".to_string(); + } + } + } + + (subject, has_active_ticket) + }; + + let should_trigger = should_trigger_subject( + aggregate.score, + credentials.score_threshold, + has_active_ticket, + ); + + let triggered_ticket_id = if should_trigger { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + let ticket = ProcessedTicket::insert_external( + &conn, + project_id, + "graylog", + Some(&subject.id), + synthetic_artifact_id(&aggregate.subject_key), + &format!( + "[Graylog] {} - {}", + aggregate.source, aggregate.normalized_message + ), + &subject_payload(&aggregate), + ) + .map_err(|e| format!("insert graylog ticket failed: {}", e))?; + + GraylogSubject::set_active_ticket(&conn, &subject.id, Some(&ticket.id), "queued") + .map_err(|e| format!("set active ticket failed: {}", e))?; + + triggered_count += 1; + let _ = app_handle.emit( + "graylog-subject-triggered", + serde_json::json!({ + "project_id": project_id, + "subject_id": subject.id, + "ticket_id": ticket.id, + "score": aggregate.score, + }), + ); + + Some(ticket.id) + } else { + None + }; + + { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + GraylogDetection::insert( + &conn, + &subject.id, + &window_start, + &window_end, + aggregate.counts.critical, + aggregate.counts.error, + aggregate.counts.warning, + aggregate.total_count, + &aggregate.last_seen_at, + aggregate.score, + should_trigger, + triggered_ticket_id.as_deref(), + ) + .map_err(|e| format!("insert detection failed: {}", e))?; + } + } + + let _ = app_handle.emit( + "graylog-polling-finished", + serde_json::json!({ + "project_id": project_id, + "triggered_count": triggered_count, + }), + ); + + Ok(triggered_count) +} + +#[cfg(test)] +mod tests { + use super::should_trigger_subject; + + #[test] + fn test_should_trigger_subject_respects_active_ticket() { + assert!(should_trigger_subject(82, 70, false)); + assert!(!should_trigger_subject(82, 70, true)); + assert!(!should_trigger_subject(60, 70, false)); + } +} diff --git a/src-tauri/src/services/mod.rs b/src-tauri/src/services/mod.rs index 9944d1e..6cfe0f9 100644 --- a/src-tauri/src/services/mod.rs +++ b/src-tauri/src/services/mod.rs @@ -2,6 +2,8 @@ pub mod agent_runtime; pub mod cli_process; pub mod crypto; pub mod filter_engine; +pub mod graylog_client; +pub mod graylog_poller; pub mod graylog_scoring; pub mod notifier; pub mod orchestrator; diff --git a/src-tauri/src/services/orchestrator.rs b/src-tauri/src/services/orchestrator.rs index 3b06a1f..b710d50 100644 --- a/src-tauri/src/services/orchestrator.rs +++ b/src-tauri/src/services/orchestrator.rs @@ -1,5 +1,8 @@ use crate::models::agent::{Agent, AgentRole}; -use crate::models::module::{ProjectModule, MODULE_TULEAP_AUTO_RESOLVE}; +use crate::models::graylog::GraylogCredentials; +use crate::models::module::{ + ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE, MODULE_TULEAP_AUTO_RESOLVE, +}; use crate::models::project::Project; use crate::models::ticket::ProcessedTicket; use crate::models::tracker::WatchedTracker; @@ -21,13 +24,16 @@ pub enum Verdict { } pub fn build_analyst_prompt(ticket: &ProcessedTicket, project: &Project) -> String { + let source_ref = ticket.source_ref.as_deref().unwrap_or("-"); format!( - r#"Tu es un analyste technique. Voici un ticket Tuleap a analyser. + r#"Tu es un analyste technique. Voici un ticket a analyser. ## Ticket - ID: {artifact_id} - Titre: {title} - Donnees: {data} +- Source: {source} +- Source ref: {source_ref} ## Contexte - Projet: {project_name} @@ -46,6 +52,8 @@ Termine ton rapport par un de ces verdicts sur une ligne separee: artifact_id = ticket.artifact_id, title = ticket.artifact_title, data = ticket.artifact_data, + source = ticket.source, + source_ref = source_ref, project_name = project.name, project_path = project.path, base_branch = project.base_branch, @@ -58,6 +66,7 @@ pub fn build_developer_prompt( analyst_report: &str, worktree_path: &str, ) -> String { + let source_ref = ticket.source_ref.as_deref().unwrap_or("-"); format!( r#"Tu es un developpeur. Tu dois corriger un bug ou implementer une fonctionnalite d'apres l'analyse suivante. @@ -67,6 +76,8 @@ pub fn build_developer_prompt( ## Ticket - ID: {artifact_id} - Titre: {title} +- Source: {source} +- Source ref: {source_ref} ## Contexte - Projet: {project_name} @@ -80,6 +91,8 @@ pub fn build_developer_prompt( analyst_report = analyst_report, artifact_id = ticket.artifact_id, title = ticket.artifact_title, + source = ticket.source, + source_ref = source_ref, project_name = project.name, worktree_path = worktree_path, base_branch = project.base_branch, @@ -254,28 +267,56 @@ async fn process_ticket( app_handle: &AppHandle, process_registry: &ProcessRegistry, ) -> Result { - let (ticket, tracker, project) = { + let (ticket, project, tracker) = { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; let pending = ProcessedTicket::list_pending(&conn) .map_err(|e| format!("list_pending failed: {}", e))?; - let mut selected: Option<(ProcessedTicket, WatchedTracker, Project)> = None; + let mut selected: Option<(ProcessedTicket, Project, Option)> = None; for ticket in pending { - let Some(tracker_id) = ticket.tracker_id.as_deref() else { - continue; - }; - - let tracker = WatchedTracker::get_by_id(&conn, tracker_id) - .map_err(|e| format!("get tracker failed: {}", e))?; - let project = Project::get_by_id(&conn, &tracker.project_id) + let project = Project::get_by_id(&conn, &ticket.project_id) .map_err(|e| format!("get project failed: {}", e))?; - let enabled = ProjectModule::is_enabled(&conn, &project.id, MODULE_TULEAP_AUTO_RESOLVE) - .map_err(|e| format!("module lookup failed: {}", e))?; - if enabled { - selected = Some((ticket, tracker, project)); - break; + match ticket.source.as_str() { + "tuleap" => { + let tracker = match ticket.tracker_id.as_deref() { + Some(tracker_id) => Some( + WatchedTracker::get_by_id(&conn, tracker_id) + .map_err(|e| format!("get tracker failed: {}", e))?, + ), + None => None, + }; + + let enabled = ProjectModule::is_enabled( + &conn, + &project.id, + MODULE_TULEAP_AUTO_RESOLVE, + ) + .map_err(|e| format!("module lookup failed: {}", e))?; + if enabled { + selected = Some((ticket, project, tracker)); + break; + } + } + "graylog" => { + let enabled = ProjectModule::is_enabled( + &conn, + &project.id, + MODULE_GRAYLOG_AUTO_RESOLVE, + ) + .map_err(|e| format!("module lookup failed: {}", e))?; + if enabled { + selected = Some((ticket, project, None)); + break; + } + } + _ => { + eprintln!( + "orchestrator: unsupported ticket source '{}' for ticket {}", + ticket.source, ticket.id + ); + } } } @@ -286,50 +327,103 @@ async fn process_ticket( }; let (analyst_agent, developer_agent) = { - if tracker.status != "valid" { + let (analyst_id, developer_id) = if ticket.source == "graylog" { + let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; + let config = match GraylogCredentials::get_by_project(&conn, &project.id) + .map_err(|e| format!("graylog credentials lookup failed: {}", e))? + { + Some(value) => value, + None => { + drop(conn); + record_ticket_error( + db, + app_handle, + &project.id, + &ticket.id, + ticket.artifact_id, + "Graylog credentials are missing.", + ); + return Ok(true); + } + }; + drop(conn); + ( + config.analyst_agent_id.to_string(), + config.developer_agent_id.to_string(), + ) + } else if ticket.source == "tuleap" { + let tracker = match &tracker { + Some(value) => value, + None => { + record_ticket_error( + db, + app_handle, + &project.id, + &ticket.id, + ticket.artifact_id, + "Missing tracker reference for Tuleap ticket.", + ); + return Ok(true); + } + }; + + if tracker.status != "valid" { + record_ticket_error( + db, + app_handle, + &project.id, + &ticket.id, + ticket.artifact_id, + "Tracker is invalid. Configure analyst and developer agents.", + ); + return Ok(true); + } + + let analyst_id = match tracker.analyst_agent_id.as_deref() { + Some(id) => id.to_string(), + None => { + record_ticket_error( + db, + app_handle, + &project.id, + &ticket.id, + ticket.artifact_id, + "Tracker has no analyst agent configured.", + ); + return Ok(true); + } + }; + + let developer_id = match tracker.developer_agent_id.as_deref() { + Some(id) => id.to_string(), + None => { + record_ticket_error( + db, + app_handle, + &project.id, + &ticket.id, + ticket.artifact_id, + "Tracker has no developer agent configured.", + ); + return Ok(true); + } + }; + + (analyst_id, developer_id) + } else { record_ticket_error( db, app_handle, &project.id, &ticket.id, ticket.artifact_id, - "Tracker is invalid. Configure analyst and developer agents.", + &format!("Unsupported ticket source '{}'.", ticket.source), ); return Ok(true); - } - - let analyst_id = match tracker.analyst_agent_id.as_deref() { - Some(id) => id, - None => { - record_ticket_error( - db, - app_handle, - &project.id, - &ticket.id, - ticket.artifact_id, - "Tracker has no analyst agent configured.", - ); - return Ok(true); - } - }; - - let developer_id = match tracker.developer_agent_id.as_deref() { - Some(id) => id, - None => { - record_ticket_error( - db, - app_handle, - &project.id, - &ticket.id, - ticket.artifact_id, - "Tracker has no developer agent configured.", - ); - return Ok(true); - } }; let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; - let analyst_agent = match Agent::get_by_id(&conn, analyst_id) { + let analyst_agent = match Agent::get_by_id(&conn, &analyst_id) { Ok(agent) => agent, Err(_) => { drop(conn); @@ -345,7 +439,7 @@ async fn process_ticket( } }; - let developer_agent = match Agent::get_by_id(&conn, developer_id) { + let developer_agent = match Agent::get_by_id(&conn, &developer_id) { Ok(agent) => agent, Err(_) => { drop(conn);