From ff0921217732468d4a2864d48dd27c0fd3a41e69 Mon Sep 17 00:00:00 2001
From: thibaud-lclr
Date: Fri, 17 Apr 2026 15:28:19 +0200
Subject: [PATCH] feat(graylog): add deterministic scoring and subject grouping

---
 src-tauri/Cargo.lock                      |   2 +
 src-tauri/Cargo.toml                      |   2 +
 src-tauri/src/services/graylog_scoring.rs | 362 ++++++++++++++++++++++
 src-tauri/src/services/mod.rs             |   1 +
 4 files changed, 367 insertions(+)
 create mode 100644 src-tauri/src/services/graylog_scoring.rs

diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock
index d8471b8..0aa7009 100644
--- a/src-tauri/Cargo.lock
+++ b/src-tauri/Cargo.lock
@@ -2686,6 +2686,7 @@ dependencies = [
  "chrono",
  "dirs 5.0.1",
  "rand 0.8.5",
+ "regex",
  "reqwest 0.12.28",
  "rusqlite",
  "serde",
@@ -2696,6 +2697,7 @@ dependencies = [
  "tauri-plugin-notification",
  "tempfile",
  "tokio",
+ "urlencoding",
  "uuid",
 ]
 
diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml
index f6a9c83..25fab7a 100644
--- a/src-tauri/Cargo.toml
+++ b/src-tauri/Cargo.toml
@@ -32,6 +32,8 @@ tokio = { version = "1", features = ["time", "sync", "macros", "process", "io-ut
 aes-gcm = "0.10"
 rand = "0.8"
 base64 = "0.22"
+regex = "1"
+urlencoding = "2"
 
 [dev-dependencies]
 tempfile = "3"
diff --git a/src-tauri/src/services/graylog_scoring.rs b/src-tauri/src/services/graylog_scoring.rs
new file mode 100644
index 0000000..c42002a
--- /dev/null
+++ b/src-tauri/src/services/graylog_scoring.rs
@@ -0,0 +1,362 @@
+//! Deterministic scoring and grouping of Graylog events into "subjects".
+//!
+//! Events sharing a service (or source) and a normalized message form one
+//! subject; each subject is scored from its severity, frequency and recency.
+
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::sync::OnceLock;
+
+/// A single log event fed into [`group_subjects`].
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GraylogEvent {
+    /// Event time; RFC 3339 and `YYYY-MM-DD HH:MM:SS[.fff]` (read as UTC) parse.
+    pub timestamp: String,
+    pub source: String,
+    /// Used instead of `source` for grouping when present and non-blank.
+    pub service: Option<String>,
+    /// Searched, ignoring ASCII case, for "critical", "error" or "warn".
+    pub level: String,
+    pub message: String,
+    /// Copied into [`SubjectAggregate::sample_events`].
+    pub raw: serde_json::Value,
+}
+
+/// Per-severity event counters for one subject.
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct SeverityCounts {
+    pub critical: i32,
+    pub error: i32,
+    pub warning: i32,
+}
+
+/// Summary of all events that share one subject key.
+#[derive(Debug, Clone)]
+pub struct SubjectAggregate {
+    /// `"{source}|{normalized_message}"`.
+    pub subject_key: String,
+    pub source: String,
+    pub normalized_message: String,
+    pub counts: SeverityCounts,
+    pub total_count: i32,
+    /// Age, in whole minutes, of the most recent event in the group.
+    pub last_seen_age_minutes: i64,
+    /// Raw timestamp string of that most recent event.
+    pub last_seen_at: String,
+    /// Raw payloads of the first five events encountered.
+    pub sample_events: Vec<serde_json::Value>,
+    pub score: i32,
+}
+
+/// Matches any 8-4-4-4-12 hexadecimal UUID in lowercased text.
+///
+/// The version and variant nibbles are not constrained, so UUIDv6/v7/v8 and
+/// the nil UUID are recognised as well as v1-v5.
+fn uuid_re() -> &'static Regex {
+    static RE: OnceLock<Regex> = OnceLock::new();
+    RE.get_or_init(|| {
+        Regex::new(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b")
+            .expect("valid uuid regex")
+    })
+}
+
+/// Matches a dotted-quad IPv4-shaped token (octet ranges are not validated).
+fn ip_re() -> &'static Regex {
+    static RE: OnceLock<Regex> = OnceLock::new();
+    RE.get_or_init(|| Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").expect("valid ip regex"))
+}
+
+/// Matches `YYYY-MM-DD`, then `t` or a space, then `HH:MM:SS[.fff][z]`.
+fn ts_re() -> &'static Regex {
+    static RE: OnceLock<Regex> = OnceLock::new();
+    RE.get_or_init(|| {
+        Regex::new(r"\b\d{4}-\d{2}-\d{2}[t ]\d{2}:\d{2}:\d{2}(?:\.\d+)?z?\b")
+            .expect("valid timestamp regex")
+    })
+}
+
+/// Matches a run of twelve or more lowercase hexadecimal digits.
+fn hash_re() -> &'static Regex {
+    static RE: OnceLock<Regex> = OnceLock::new();
+    RE.get_or_init(|| Regex::new(r"\b[0-9a-f]{12,}\b").expect("valid hash regex"))
+}
+
+/// Matches a standalone run of decimal digits.
+fn num_re() -> &'static Regex {
+    static RE: OnceLock<Regex> = OnceLock::new();
+    RE.get_or_init(|| Regex::new(r"\b\d+\b").expect("valid number regex"))
+}
+
+/// Matches one or more whitespace characters.
+fn ws_re() -> &'static Regex {
+    static RE: OnceLock<Regex> = OnceLock::new();
+    RE.get_or_init(|| Regex::new(r"\s+").expect("valid whitespace regex"))
+}
+
+/// Reduces a log message to a stable form suitable for grouping.
+///
+/// The message is lowercased, then UUIDs, IPv4 addresses, timestamps, long hex
+/// strings and bare numbers are substituted in that order; the number pattern
+/// runs last because it would otherwise consume digits inside the other tokens.
+/// Finally the text is trimmed and whitespace runs collapse to one space.
+pub fn normalize_message(input: &str) -> String {
+    let mut value = input.to_lowercase();
+
+    // NOTE(review): every replacement below is the empty string, so dynamic
+    // values are dropped rather than swapped for a marker - confirm that no
+    // placeholder token was intended.
+    value = uuid_re().replace_all(&value, "").to_string();
+    value = ip_re().replace_all(&value, "").to_string();
+    value = ts_re().replace_all(&value, "").to_string();
+    value = hash_re().replace_all(&value, "").to_string();
+    value = num_re().replace_all(&value, "").to_string();
+
+    ws_re().replace_all(value.trim(), " ").to_string()
+}
+
+/// Scores a subject from 0 to 100 as severity + frequency + recency.
+///
+/// * severity: 50 with any critical event, else 35 for error, 20 for warning, 0.
+/// * frequency: 0, 5, 12, 22, 30 or 35 for counts of <= 0, 1, 2-3, 4-7, 8-15, 16+.
+/// * recency: 15, 12, 8, 4 or 0 for ages of <= 2, <= 10, <= 30, <= 120, > 120 minutes.
+pub fn compute_score(counts: SeverityCounts, total_count: i32, last_seen_age_minutes: i64) -> i32 {
+    let severity_score = if counts.critical > 0 {
+        50
+    } else if counts.error > 0 {
+        35
+    } else if counts.warning > 0 {
+        20
+    } else {
+        0
+    };
+
+    let frequency_score = match total_count {
+        i32::MIN..=0 => 0,
+        1 => 5,
+        2..=3 => 12,
+        4..=7 => 22,
+        8..=15 => 30,
+        _ => 35,
+    };
+
+    let recency_score = if last_seen_age_minutes <= 2 {
+        15
+    } else if last_seen_age_minutes <= 10 {
+        12
+    } else if last_seen_age_minutes <= 30 {
+        8
+    } else if last_seen_age_minutes <= 120 {
+        4
+    } else {
+        0
+    };
+
+    severity_score + frequency_score + recency_score
+}
+
+/// Parses an event timestamp as UTC, returning `fallback` when it is unreadable.
+///
+/// RFC 3339 is tried first, then `%Y-%m-%d %H:%M:%S%.f`, which is taken as UTC.
+fn parse_event_timestamp(
+    timestamp: &str,
+    fallback: chrono::DateTime<chrono::Utc>,
+) -> chrono::DateTime<chrono::Utc> {
+    if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(timestamp) {
+        return dt.with_timezone(&chrono::Utc);
+    }
+
+    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%d %H:%M:%S%.f") {
+        return chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive, chrono::Utc);
+    }
+
+    fallback
+}
+
+/// Groups events into subjects and returns them ordered by priority.
+///
+/// The grouping key is the event's non-blank `service` (falling back to
+/// `source`) joined with its normalized message. Events whose timestamp
+/// cannot be parsed are treated as happening at `now`, and ages are clamped
+/// to zero so future-dated events never yield a negative age.
+///
+/// Results are sorted by descending score, then descending event count, then
+/// ascending subject key, which keeps the order deterministic.
+pub fn group_subjects(
+    events: &[GraylogEvent],
+    now: chrono::DateTime<chrono::Utc>,
+) -> Vec<SubjectAggregate> {
+    let mut map: HashMap<String, SubjectAggregate> = HashMap::new();
+
+    for event in events {
+        let source = event
+            .service
+            .as_deref()
+            .filter(|value| !value.trim().is_empty())
+            .unwrap_or(&event.source)
+            .to_string();
+        let normalized_message = normalize_message(&event.message);
+        let subject_key = format!("{}|{}", source, normalized_message);
+
+        let event_time = parse_event_timestamp(&event.timestamp, now);
+        let age_minutes = now.signed_duration_since(event_time).num_minutes().max(0);
+
+        let entry = map.entry(subject_key.clone()).or_insert_with(|| SubjectAggregate {
+            subject_key: subject_key.clone(),
+            source: source.clone(),
+            normalized_message: normalized_message.clone(),
+            counts: SeverityCounts::default(),
+            total_count: 0,
+            last_seen_age_minutes: age_minutes,
+            last_seen_at: event.timestamp.clone(),
+            sample_events: Vec::new(),
+            score: 0,
+        });
+
+        entry.total_count += 1;
+
+        // Levels matching none of the keywords still count toward total_count.
+        let level = event.level.to_ascii_lowercase();
+        if level.contains("critical") {
+            entry.counts.critical += 1;
+        } else if level.contains("error") {
+            entry.counts.error += 1;
+        } else if level.contains("warn") {
+            entry.counts.warning += 1;
+        }
+
+        // Track the youngest event seen so far for this subject.
+        if age_minutes < entry.last_seen_age_minutes {
+            entry.last_seen_age_minutes = age_minutes;
+            entry.last_seen_at = event.timestamp.clone();
+        }
+
+        if entry.sample_events.len() < 5 {
+            entry.sample_events.push(event.raw.clone());
+        }
+    }
+
+    let mut aggregates: Vec<SubjectAggregate> = map
+        .into_values()
+        .map(|mut aggregate| {
+            aggregate.score = compute_score(
+                aggregate.counts,
+                aggregate.total_count,
+                aggregate.last_seen_age_minutes,
+            );
+            aggregate
+        })
+        .collect();
+
+    aggregates.sort_by(|a, b| {
+        b.score
+            .cmp(&a.score)
+            .then_with(|| b.total_count.cmp(&a.total_count))
+            .then_with(|| a.subject_key.cmp(&b.subject_key))
+    });
+
+    aggregates
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn event(
+        timestamp: &str,
+        source: &str,
+        service: Option<&str>,
+        level: &str,
+        message: &str,
+    ) -> GraylogEvent {
+        GraylogEvent {
+            timestamp: timestamp.to_string(),
+            source: source.to_string(),
+            service: service.map(str::to_string),
+            level: level.to_string(),
+            message: message.to_string(),
+            raw: serde_json::json!({
+                "timestamp": timestamp,
+                "source": source,
+                "service": service,
+                "level": level,
+                "message": message,
+            }),
+        }
+    }
+
+    #[test]
+    fn test_normalize_message_replaces_dynamic_values() {
+        let message = "User 42 failed from 10.0.0.1 at 2026-04-17T10:00:00Z request=9f8b12ab34cd";
+        let normalized = normalize_message(message);
+
+        // `contains("")` is true for every string, so assert instead that each
+        // dynamic value has actually been removed from the message.
+        assert!(!normalized.contains("42"));
+        assert!(!normalized.contains("10.0.0.1"));
+        assert!(!normalized.contains("2026-04-17"));
+        assert!(!normalized.contains("9f8b12ab34cd"));
+    }
+
+    #[test]
+    fn test_normalize_message_handles_any_uuid_version() {
+        let v4 = normalize_message("job 3f2b8c1e-9d4a-4b6f-8a1c-5e7d9f0a1b2c done");
+        let v7 = normalize_message("job 018f3c2e-7b1a-7c3d-9e4f-1234567890ab done");
+
+        assert_eq!(v4, v7);
+    }
+
+    #[test]
+    fn test_compute_score_critical_recent_is_high() {
+        let score = compute_score(
+            SeverityCounts {
+                critical: 1,
+                error: 0,
+                warning: 0,
+            },
+            2,
+            30,
+        );
+        assert!(score >= 70);
+    }
+
+    #[test]
+    fn test_group_subjects_groups_by_service_and_normalized_message() {
+        let now = chrono::DateTime::parse_from_rfc3339("2026-04-17T10:30:00Z")
+            .expect("timestamp should parse")
+            .with_timezone(&chrono::Utc);
+
+        let events = vec![
+            event(
+                "2026-04-17T10:29:00Z",
+                "api-1",
+                Some("api"),
+                "error",
+                "Timeout user 42 from 10.0.0.1",
+            ),
+            event(
+                "2026-04-17T10:28:00Z",
+                "api-2",
+                Some("api"),
+                "critical",
+                "Timeout user 91 from 10.0.0.2",
+            ),
+            event(
+                "2026-04-17T09:40:00Z",
+                "worker-1",
+                Some("worker"),
+                "warning",
+                "Retry budget exhausted id=123",
+            ),
+        ];
+
+        let groups = group_subjects(&events, now);
+        assert_eq!(groups.len(), 2);
+
+        assert_eq!(groups[0].source, "api");
+        assert_eq!(groups[0].counts.critical, 1);
+        assert_eq!(groups[0].counts.error, 1);
+        assert_eq!(groups[0].total_count, 2);
+        assert!(groups[0].score > groups[1].score);
+    }
+}
diff --git a/src-tauri/src/services/mod.rs b/src-tauri/src/services/mod.rs
index dc26457..9944d1e 100644
--- a/src-tauri/src/services/mod.rs
+++ b/src-tauri/src/services/mod.rs
@@ -2,6 +2,7 @@ pub mod agent_runtime;
 pub mod cli_process;
 pub mod crypto;
 pub mod filter_engine;
+pub mod graylog_scoring;
 pub mod notifier;
 pub mod orchestrator;
 pub mod poller;