feat(graylog): add deterministic scoring and subject grouping

This commit is contained in:
thibaud-lclr 2026-04-17 15:28:19 +02:00
parent aecc4689e7
commit ff09212177
4 changed files with 301 additions and 0 deletions

2
src-tauri/Cargo.lock generated
View file

@ -2686,6 +2686,7 @@ dependencies = [
"chrono", "chrono",
"dirs 5.0.1", "dirs 5.0.1",
"rand 0.8.5", "rand 0.8.5",
"regex",
"reqwest 0.12.28", "reqwest 0.12.28",
"rusqlite", "rusqlite",
"serde", "serde",
@ -2696,6 +2697,7 @@ dependencies = [
"tauri-plugin-notification", "tauri-plugin-notification",
"tempfile", "tempfile",
"tokio", "tokio",
"urlencoding",
"uuid", "uuid",
] ]

View file

@ -32,6 +32,8 @@ tokio = { version = "1", features = ["time", "sync", "macros", "process", "io-ut
aes-gcm = "0.10" aes-gcm = "0.10"
rand = "0.8" rand = "0.8"
base64 = "0.22" base64 = "0.22"
regex = "1"
urlencoding = "2"
[dev-dependencies] [dev-dependencies]
tempfile = "3" tempfile = "3"

View file

@ -0,0 +1,296 @@
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::OnceLock;
/// One log event in the shape consumed by [`group_subjects`].
///
/// All fields are kept as loosely typed text; interpretation (timestamp
/// parsing, level bucketing, message normalization) happens during grouping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraylogEvent {
    /// Event time as text. Grouping accepts RFC 3339 or
    /// `YYYY-MM-DD HH:MM:SS[.fraction]` (the latter is read as UTC).
    pub timestamp: String,
    /// Origin of the event; used as the grouping source when `service` is
    /// absent or blank.
    pub source: String,
    /// Optional service name; preferred over `source` for grouping when it
    /// contains non-whitespace characters.
    pub service: Option<String>,
    /// Severity label. Grouping matches it case-insensitively against the
    /// substrings `critical`, `error` and `warn`.
    pub level: String,
    /// Free-form log message; normalized before being used in the subject key.
    pub message: String,
    /// Original payload of the event, copied into the aggregate's samples.
    pub raw: serde_json::Value,
}
/// Per-severity event tallies for one subject.
///
/// `Default` yields all-zero counters. Events whose level matches none of the
/// three buckets are not represented here.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SeverityCounts {
    /// Number of events classified as critical.
    pub critical: i32,
    /// Number of events classified as error.
    pub error: i32,
    /// Number of events classified as warning.
    pub warning: i32,
}
/// Summary of every event that shares one subject (source + normalized message).
#[derive(Debug, Clone)]
pub struct SubjectAggregate {
    /// Grouping key, built as `"{source}|{normalized_message}"`.
    pub subject_key: String,
    /// Service name when present and non-blank, otherwise the event source.
    pub source: String,
    /// Message with dynamic values replaced by placeholders.
    pub normalized_message: String,
    /// Tallies per recognized severity bucket.
    pub counts: SeverityCounts,
    /// Number of events in the group, including those with an unrecognized level.
    pub total_count: i32,
    /// Age of the most recent event in whole minutes, never negative.
    pub last_seen_age_minutes: i64,
    /// Timestamp text of the event regarded as most recent.
    pub last_seen_at: String,
    /// Raw payloads of up to five events of the group, in encounter order.
    pub sample_events: Vec<serde_json::Value>,
    /// Priority score computed from severity, frequency and recency.
    pub score: i32,
}
/// Returns the lazily compiled matcher for canonical hyphenated UUIDs.
///
/// The pattern accepts any hex digit in the version and variant positions, so
/// UUIDv6/v7/v8 as well as the nil and max UUIDs are recognized in addition to
/// versions 1-5. A stricter pattern would leave such identifiers only partly
/// normalized, producing a distinct subject for every id.
///
/// Only lowercase hex digits are matched; `normalize_message` lowercases its
/// input before applying this pattern.
fn uuid_re() -> &'static Regex {
    static RE: OnceLock<Regex> = OnceLock::new();
    RE.get_or_init(|| {
        Regex::new(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b")
            .expect("valid uuid regex")
    })
}
/// Returns the lazily compiled matcher for dotted-quad sequences such as
/// `10.0.0.1` (four groups of one to three digits; octet ranges are not checked).
fn ip_re() -> &'static Regex {
    const PATTERN: &str = r"\b(?:\d{1,3}\.){3}\d{1,3}\b";
    static CELL: OnceLock<Regex> = OnceLock::new();
    CELL.get_or_init(|| Regex::new(PATTERN).expect("valid ip regex"))
}
/// Returns the lazily compiled matcher for date-time stamps of the form
/// `YYYY-MM-DD` + (`t` or space) + `HH:MM:SS`, with optional fractional
/// seconds and an optional trailing `z`.
///
/// The separator and suffix are lowercase because `normalize_message`
/// lowercases its input before matching.
fn ts_re() -> &'static Regex {
    const PATTERN: &str = r"\b\d{4}-\d{2}-\d{2}[t ]\d{2}:\d{2}:\d{2}(?:\.\d+)?z?\b";
    static CELL: OnceLock<Regex> = OnceLock::new();
    CELL.get_or_init(|| Regex::new(PATTERN).expect("valid timestamp regex"))
}
/// Returns the lazily compiled matcher for standalone runs of twelve or more
/// lowercase hexadecimal digits (digests, request ids and the like).
fn hash_re() -> &'static Regex {
    const PATTERN: &str = r"\b[0-9a-f]{12,}\b";
    static CELL: OnceLock<Regex> = OnceLock::new();
    CELL.get_or_init(|| Regex::new(PATTERN).expect("valid hash regex"))
}
/// Returns the lazily compiled matcher for standalone digit runs, i.e. digits
/// delimited by word boundaries on both sides.
fn num_re() -> &'static Regex {
    const PATTERN: &str = r"\b\d+\b";
    static CELL: OnceLock<Regex> = OnceLock::new();
    CELL.get_or_init(|| Regex::new(PATTERN).expect("valid number regex"))
}
/// Returns the lazily compiled matcher for one or more consecutive whitespace
/// characters.
fn ws_re() -> &'static Regex {
    const PATTERN: &str = r"\s+";
    static CELL: OnceLock<Regex> = OnceLock::new();
    CELL.get_or_init(|| Regex::new(PATTERN).expect("valid whitespace regex"))
}
pub fn normalize_message(input: &str) -> String {
let mut value = input.to_lowercase();
value = uuid_re().replace_all(&value, "<uuid>").to_string();
value = ip_re().replace_all(&value, "<ip>").to_string();
value = ts_re().replace_all(&value, "<ts>").to_string();
value = hash_re().replace_all(&value, "<hash>").to_string();
value = num_re().replace_all(&value, "<num>").to_string();
ws_re().replace_all(value.trim(), " ").to_string()
}
/// Computes a subject's priority score as the sum of three components.
///
/// * Severity — decided by the highest bucket with at least one event:
///   critical 50, error 35, warning 20, none 0.
/// * Frequency — from `total_count`: `<= 0` → 0, `1` → 5, `2..=3` → 12,
///   `4..=7` → 22, `8..=15` → 30, more → 35.
/// * Recency — from `last_seen_age_minutes`: `<= 2` → 15, `<= 10` → 12,
///   `<= 30` → 8, `<= 120` → 4, older → 0.
///
/// The result therefore lies in `0..=100`.
pub fn compute_score(counts: SeverityCounts, total_count: i32, last_seen_age_minutes: i64) -> i32 {
    let severity_points = match (counts.critical > 0, counts.error > 0, counts.warning > 0) {
        (true, _, _) => 50,
        (false, true, _) => 35,
        (false, false, true) => 20,
        (false, false, false) => 0,
    };

    let volume_points = if total_count <= 0 {
        0
    } else if total_count == 1 {
        5
    } else if total_count <= 3 {
        12
    } else if total_count <= 7 {
        22
    } else if total_count <= 15 {
        30
    } else {
        35
    };

    let freshness_points = match last_seen_age_minutes {
        i64::MIN..=2 => 15,
        3..=10 => 12,
        11..=30 => 8,
        31..=120 => 4,
        _ => 0,
    };

    severity_points + volume_points + freshness_points
}
/// Converts an event's timestamp text to a UTC instant.
///
/// RFC 3339 is tried first and converted to UTC. Failing that, the text is
/// read as `YYYY-MM-DD HH:MM:SS` with optional fractional seconds and treated
/// as already being in UTC. If neither format parses, `fallback` is returned.
fn parse_event_timestamp(
    timestamp: &str,
    fallback: chrono::DateTime<chrono::Utc>,
) -> chrono::DateTime<chrono::Utc> {
    chrono::DateTime::parse_from_rfc3339(timestamp)
        .map(|parsed| parsed.with_timezone(&chrono::Utc))
        .or_else(|_| {
            chrono::NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%d %H:%M:%S%.f").map(|naive| {
                chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive, chrono::Utc)
            })
        })
        .unwrap_or(fallback)
}
/// Groups events into subjects and returns them ordered by priority.
///
/// A subject is identified by `"{source}|{normalized_message}"`, where the
/// source is the event's `service` when present and non-blank, otherwise its
/// `source`. For every subject the function tallies severities (matching the
/// lowercased level against `critical`, `error`, `warn`), counts all events,
/// keeps the raw payload of the first five events encountered, and records
/// which event was seen most recently.
///
/// "Most recent" is decided on the exact parsed instant rather than on whole
/// minutes, so `last_seen_at` is the newest event even when several events of
/// a subject fall within the same minute. Timestamps that cannot be parsed
/// are treated as `now` (age 0). Ages are clamped to be non-negative, so
/// events dated after `now` also get age 0.
///
/// The result is sorted by descending score, then descending event count,
/// then ascending subject key, which makes the order fully deterministic.
pub fn group_subjects(
    events: &[GraylogEvent],
    now: chrono::DateTime<chrono::Utc>,
) -> Vec<SubjectAggregate> {
    // Each aggregate is paired with the exact instant of its newest event;
    // the aggregate itself only stores the age truncated to minutes.
    let mut map: HashMap<String, (SubjectAggregate, chrono::DateTime<chrono::Utc>)> =
        HashMap::new();

    for event in events {
        let source = event
            .service
            .as_deref()
            .filter(|value| !value.trim().is_empty())
            .unwrap_or(&event.source)
            .to_string();
        let normalized_message = normalize_message(&event.message);
        let subject_key = format!("{}|{}", source, normalized_message);
        let event_time = parse_event_timestamp(&event.timestamp, now);
        let age_minutes = now.signed_duration_since(event_time).num_minutes().max(0);

        let (entry, newest_time) = map.entry(subject_key).or_insert_with_key(|key| {
            (
                SubjectAggregate {
                    subject_key: key.clone(),
                    source,
                    normalized_message,
                    counts: SeverityCounts::default(),
                    total_count: 0,
                    last_seen_age_minutes: age_minutes,
                    last_seen_at: event.timestamp.clone(),
                    sample_events: Vec::new(),
                    score: 0,
                },
                event_time,
            )
        });

        entry.total_count += 1;

        // Levels matching none of the buckets still count towards total_count.
        let level = event.level.to_ascii_lowercase();
        if level.contains("critical") {
            entry.counts.critical += 1;
        } else if level.contains("error") {
            entry.counts.error += 1;
        } else if level.contains("warn") {
            entry.counts.warning += 1;
        }

        // Compare full-precision instants: minute-truncated ages tie for
        // events in the same minute and would keep a stale `last_seen_at`.
        if event_time > *newest_time {
            *newest_time = event_time;
            entry.last_seen_age_minutes = age_minutes;
            entry.last_seen_at = event.timestamp.clone();
        }

        if entry.sample_events.len() < 5 {
            entry.sample_events.push(event.raw.clone());
        }
    }

    let mut aggregates: Vec<SubjectAggregate> = map
        .into_values()
        .map(|(mut aggregate, _)| {
            aggregate.score = compute_score(
                aggregate.counts,
                aggregate.total_count,
                aggregate.last_seen_age_minutes,
            );
            aggregate
        })
        .collect();

    aggregates.sort_by(|a, b| {
        b.score
            .cmp(&a.score)
            .then_with(|| b.total_count.cmp(&a.total_count))
            .then_with(|| a.subject_key.cmp(&b.subject_key))
    });

    aggregates
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `GraylogEvent` whose `raw` payload mirrors its typed fields.
    fn event(
        timestamp: &str,
        source: &str,
        service: Option<&str>,
        level: &str,
        message: &str,
    ) -> GraylogEvent {
        let raw = serde_json::json!({
            "timestamp": timestamp,
            "source": source,
            "service": service,
            "level": level,
            "message": message,
        });

        GraylogEvent {
            timestamp: String::from(timestamp),
            source: String::from(source),
            service: service.map(String::from),
            level: String::from(level),
            message: String::from(message),
            raw,
        }
    }

    #[test]
    fn test_normalize_message_replaces_dynamic_values() {
        let normalized = normalize_message(
            "User 42 failed from 10.0.0.1 at 2026-04-17T10:00:00Z request=9f8b12ab34cd",
        );

        // Every kind of dynamic value in the input must leave its placeholder.
        for placeholder in ["<num>", "<ip>", "<ts>", "<hash>"] {
            assert!(normalized.contains(placeholder));
        }
    }

    #[test]
    fn test_compute_score_critical_recent_is_high() {
        let counts = SeverityCounts {
            critical: 1,
            error: 0,
            warning: 0,
        };

        let score = compute_score(counts, 2, 30);

        assert!(score >= 70);
    }

    #[test]
    fn test_group_subjects_groups_by_service_and_normalized_message() {
        let now = chrono::DateTime::parse_from_rfc3339("2026-04-17T10:30:00Z")
            .expect("timestamp should parse")
            .with_timezone(&chrono::Utc);

        // The two "api" events differ only in dynamic values and must merge;
        // the "worker" event forms its own, older and less severe subject.
        let api_error = event(
            "2026-04-17T10:29:00Z",
            "api-1",
            Some("api"),
            "error",
            "Timeout user 42 from 10.0.0.1",
        );
        let api_critical = event(
            "2026-04-17T10:28:00Z",
            "api-2",
            Some("api"),
            "critical",
            "Timeout user 91 from 10.0.0.2",
        );
        let worker_warning = event(
            "2026-04-17T09:40:00Z",
            "worker-1",
            Some("worker"),
            "warning",
            "Retry budget exhausted id=123",
        );
        let events = vec![api_error, api_critical, worker_warning];

        let groups = group_subjects(&events, now);

        assert_eq!(groups.len(), 2);
        let top = &groups[0];
        let runner_up = &groups[1];
        assert_eq!(top.source, "api");
        assert_eq!(top.counts.critical, 1);
        assert_eq!(top.counts.error, 1);
        assert_eq!(top.total_count, 2);
        assert!(top.score > runner_up.score);
    }
}

View file

@ -2,6 +2,7 @@ pub mod agent_runtime;
pub mod cli_process; pub mod cli_process;
pub mod crypto; pub mod crypto;
pub mod filter_engine; pub mod filter_engine;
pub mod graylog_scoring;
pub mod notifier; pub mod notifier;
pub mod orchestrator; pub mod orchestrator;
pub mod poller; pub mod poller;