From 43078ebf3d1057eebcda186204145808f124b08e Mon Sep 17 00:00:00 2001 From: thibaud-lclr Date: Tue, 21 Apr 2026 14:34:47 +0200 Subject: [PATCH] fix(graylog): deduplicate subjects across sources and hashed paths --- src-tauri/src/services/graylog_poller.rs | 32 ++++------ src-tauri/src/services/graylog_scoring.rs | 78 +++++++++++++++++------ 2 files changed, 73 insertions(+), 37 deletions(-) diff --git a/src-tauri/src/services/graylog_poller.rs b/src-tauri/src/services/graylog_poller.rs index cae97d1..dd1144f 100644 --- a/src-tauri/src/services/graylog_poller.rs +++ b/src-tauri/src/services/graylog_poller.rs @@ -57,17 +57,12 @@ fn escape_graylog_query_value(value: &str) -> String { fn build_graylog_subject_permalink( base_url: &str, - source: &str, normalized_message: &str, lookback_minutes: i32, ) -> String { - let source = source.trim(); let normalized_message = normalized_message.trim(); let mut query_terms: Vec = Vec::new(); - if !source.is_empty() { - query_terms.push(format!("source:\"{}\"", escape_graylog_query_value(source))); - } if !normalized_message.is_empty() { query_terms.push(format!( "message:\"{}\"", @@ -144,8 +139,10 @@ pub fn start( }; let now = chrono::Utc::now(); - let known_projects: HashSet = - credentials.iter().map(|item| item.project_id.clone()).collect(); + let known_projects: HashSet = credentials + .iter() + .map(|item| item.project_id.clone()) + .collect(); last_polled_at.retain(|project_id, _| known_projects.contains(project_id)); for credential in credentials { @@ -154,7 +151,8 @@ pub fn start( let due = match last_polled_at.get(&project_id) { Some(last_at) => { - now.signed_duration_since(*last_at).num_minutes() >= polling_interval_minutes + now.signed_duration_since(*last_at).num_minutes() + >= polling_interval_minutes } None => true, }; @@ -194,8 +192,9 @@ pub async fn poll_project_once( let credentials = { let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; - let module_enabled = ProjectModule::is_enabled(&conn, project_id, MODULE_GRAYLOG_AUTO_RESOLVE) - .map_err(|e| format!("module lookup failed: {}", e))?; + let module_enabled = + ProjectModule::is_enabled(&conn, project_id, MODULE_GRAYLOG_AUTO_RESOLVE) + .map_err(|e| format!("module lookup failed: {}", e))?; if !module_enabled { return Ok(0); @@ -240,8 +239,8 @@ pub async fn poll_project_once( }; let now = chrono::Utc::now(); - let window_start = (now - chrono::Duration::minutes(credentials.lookback_minutes as i64)) - .to_rfc3339(); + let window_start = + (now - chrono::Duration::minutes(credentials.lookback_minutes as i64)).to_rfc3339(); let window_end = now.to_rfc3339(); let aggregates = group_subjects(&events, now); @@ -290,7 +289,6 @@ pub async fn poll_project_once( let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?; let source_permalink = build_graylog_subject_permalink( &credentials.base_url, - &aggregate.source, &aggregate.normalized_message, credentials.lookback_minutes, ); @@ -300,10 +298,7 @@ pub async fn poll_project_once( "graylog", Some(&source_permalink), synthetic_artifact_id(&aggregate.subject_key), - &format!( - "[Graylog] {} - {}", - aggregate.source, aggregate.normalized_message - ), + &format!("[Graylog] {}", aggregate.normalized_message), &subject_payload(&aggregate), ) .map_err(|e| format!("insert graylog ticket failed: {}", e))?; @@ -373,7 +368,6 @@ mod tests { fn test_build_graylog_subject_permalink_uses_relative_search_query() { let url = build_graylog_subject_permalink( "https://graylog.example.com/", - "api", "timeout user ", 30, ); @@ -381,6 +375,6 @@ mod tests { assert!(url.starts_with("https://graylog.example.com/search?")); assert!(url.contains("rangetype=relative")); assert!(url.contains("relative=1800")); - assert!(url.contains("q=source%3A%22api%22%20AND%20message%3A%22timeout%20user%20%3Cnum%3E%22")); + assert!(url.contains("q=message%3A%22timeout%20user%20%3Cnum%3E%22")); } } diff --git a/src-tauri/src/services/graylog_scoring.rs b/src-tauri/src/services/graylog_scoring.rs index c42002a..cc3ff40 100644 --- a/src-tauri/src/services/graylog_scoring.rs +++ b/src-tauri/src/services/graylog_scoring.rs @@ -59,6 +59,18 @@ fn hash_re() -> &'static Regex { RE.get_or_init(|| Regex::new(r"\b[0-9a-f]{12,}\b").expect("valid hash regex")) } +fn hashed_path_re() -> &'static Regex { + static RE: OnceLock = OnceLock::new(); + RE.get_or_init(|| { + Regex::new(r"\b(?:[a-z0-9._-]+/){2,}[a-z0-9._-]{12,}\b").expect("valid hashed path regex") + }) +} + +fn long_token_re() -> &'static Regex { + static RE: OnceLock = OnceLock::new(); + RE.get_or_init(|| Regex::new(r"\b[a-z0-9]{20,}\b").expect("valid long token regex")) +} + fn num_re() -> &'static Regex { static RE: OnceLock = OnceLock::new(); RE.get_or_init(|| Regex::new(r"\b\d+\b").expect("valid number regex")) @@ -75,7 +87,9 @@ pub fn normalize_message(input: &str) -> String { value = uuid_re().replace_all(&value, "").to_string(); value = ip_re().replace_all(&value, "").to_string(); value = ts_re().replace_all(&value, "").to_string(); + value = hashed_path_re().replace_all(&value, "").to_string(); value = hash_re().replace_all(&value, "").to_string(); + value = long_token_re().replace_all(&value, "").to_string(); value = num_re().replace_all(&value, "").to_string(); ws_re().replace_all(value.trim(), " ").to_string() @@ -116,7 +130,10 @@ pub fn compute_score(counts: SeverityCounts, total_count: i32, last_seen_age_min severity_score + frequency_score + recency_score } -fn parse_event_timestamp(timestamp: &str, fallback: chrono::DateTime) -> chrono::DateTime { +fn parse_event_timestamp( + timestamp: &str, + fallback: chrono::DateTime, +) -> chrono::DateTime { if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(timestamp) { return dt.with_timezone(&chrono::Utc); } @@ -142,22 +159,28 @@ pub fn group_subjects( .unwrap_or(&event.source) .to_string(); let normalized_message = normalize_message(&event.message); - let subject_key = format!("{}|{}", source, normalized_message); + let subject_key = normalized_message.clone(); let event_time = parse_event_timestamp(&event.timestamp, now); let age_minutes = now.signed_duration_since(event_time).num_minutes().max(0); - let entry = map.entry(subject_key.clone()).or_insert_with(|| SubjectAggregate { - subject_key: subject_key.clone(), - source: source.clone(), - normalized_message: normalized_message.clone(), - counts: SeverityCounts::default(), - total_count: 0, - last_seen_age_minutes: age_minutes, - last_seen_at: event.timestamp.clone(), - sample_events: Vec::new(), - score: 0, - }); + let entry = map + .entry(subject_key.clone()) + .or_insert_with(|| SubjectAggregate { + subject_key: subject_key.clone(), + source: source.clone(), + normalized_message: normalized_message.clone(), + counts: SeverityCounts::default(), + total_count: 0, + last_seen_age_minutes: age_minutes, + last_seen_at: event.timestamp.clone(), + sample_events: Vec::new(), + score: 0, + }); + + if entry.source != source && entry.source != "multi-source" { + entry.source = "multi-source".to_string(); + } entry.total_count += 1; @@ -231,13 +254,24 @@ mod tests { #[test] fn test_normalize_message_replaces_dynamic_values() { - let message = "User 42 failed from 10.0.0.1 at 2026-04-17T10:00:00Z request=9f8b12ab34cd"; + let message = "User 42 failed from 10.0.0.1 at 2026-04-17T10:00:00Z request=9f8b12ab34cd file ab/cd/zz19ff88qq77tt66"; let normalized = normalize_message(message); assert!(normalized.contains("")); assert!(normalized.contains("")); assert!(normalized.contains("")); assert!(normalized.contains("")); + assert!(normalized.contains("")); + } + + #[test] + fn test_normalize_message_collapses_hashed_paths() { + let left = normalize_message("The mime type of file ab/cd/00112233445566778899 is invalid"); + let right = + normalize_message("The mime type of file ef/gh/zz11yy22xx33ww44vv55 is invalid"); + + assert_eq!(left, right); + assert!(left.contains("")); } #[test] @@ -255,7 +289,7 @@ mod tests { } #[test] - fn test_group_subjects_groups_by_service_and_normalized_message() { + fn test_group_subjects_groups_by_normalized_message_even_across_sources() { let now = chrono::DateTime::parse_from_rfc3339("2026-04-17T10:30:00Z") .expect("timestamp should parse") .with_timezone(&chrono::Utc); @@ -275,6 +309,13 @@ mod tests { "critical", "Timeout user 91 from 10.0.0.2", ), + event( + "2026-04-17T10:27:00Z", + "api-3", + Some("billing"), + "error", + "Timeout user 73 from 10.0.0.3", + ), event( "2026-04-17T09:40:00Z", "worker-1", @@ -287,10 +328,11 @@ mod tests { let groups = group_subjects(&events, now); assert_eq!(groups.len(), 2); - assert_eq!(groups[0].source, "api"); + assert_eq!(groups[0].source, "multi-source"); assert_eq!(groups[0].counts.critical, 1); - assert_eq!(groups[0].counts.error, 1); - assert_eq!(groups[0].total_count, 2); + assert_eq!(groups[0].counts.error, 2); + assert_eq!(groups[0].total_count, 3); + assert!(!groups[0].subject_key.contains('|')); assert!(groups[0].score > groups[1].score); } }