fix(graylog): deduplicate subjects across sources and hashed paths

This commit is contained in:
thibaud-lclr 2026-04-21 14:34:47 +02:00
parent 904e02367b
commit 43078ebf3d
2 changed files with 73 additions and 37 deletions

View file

@ -57,17 +57,12 @@ fn escape_graylog_query_value(value: &str) -> String {
fn build_graylog_subject_permalink(
base_url: &str,
source: &str,
normalized_message: &str,
lookback_minutes: i32,
) -> String {
let source = source.trim();
let normalized_message = normalized_message.trim();
let mut query_terms: Vec<String> = Vec::new();
if !source.is_empty() {
query_terms.push(format!("source:\"{}\"", escape_graylog_query_value(source)));
}
if !normalized_message.is_empty() {
query_terms.push(format!(
"message:\"{}\"",
@ -144,8 +139,10 @@ pub fn start(
};
let now = chrono::Utc::now();
let known_projects: HashSet<String> =
credentials.iter().map(|item| item.project_id.clone()).collect();
let known_projects: HashSet<String> = credentials
.iter()
.map(|item| item.project_id.clone())
.collect();
last_polled_at.retain(|project_id, _| known_projects.contains(project_id));
for credential in credentials {
@ -154,7 +151,8 @@ pub fn start(
let due = match last_polled_at.get(&project_id) {
Some(last_at) => {
now.signed_duration_since(*last_at).num_minutes() >= polling_interval_minutes
now.signed_duration_since(*last_at).num_minutes()
>= polling_interval_minutes
}
None => true,
};
@ -194,7 +192,8 @@ pub async fn poll_project_once(
let credentials = {
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
let module_enabled = ProjectModule::is_enabled(&conn, project_id, MODULE_GRAYLOG_AUTO_RESOLVE)
let module_enabled =
ProjectModule::is_enabled(&conn, project_id, MODULE_GRAYLOG_AUTO_RESOLVE)
.map_err(|e| format!("module lookup failed: {}", e))?;
if !module_enabled {
@ -240,8 +239,8 @@ pub async fn poll_project_once(
};
let now = chrono::Utc::now();
let window_start = (now - chrono::Duration::minutes(credentials.lookback_minutes as i64))
.to_rfc3339();
let window_start =
(now - chrono::Duration::minutes(credentials.lookback_minutes as i64)).to_rfc3339();
let window_end = now.to_rfc3339();
let aggregates = group_subjects(&events, now);
@ -290,7 +289,6 @@ pub async fn poll_project_once(
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
let source_permalink = build_graylog_subject_permalink(
&credentials.base_url,
&aggregate.source,
&aggregate.normalized_message,
credentials.lookback_minutes,
);
@ -300,10 +298,7 @@ pub async fn poll_project_once(
"graylog",
Some(&source_permalink),
synthetic_artifact_id(&aggregate.subject_key),
&format!(
"[Graylog] {} - {}",
aggregate.source, aggregate.normalized_message
),
&format!("[Graylog] {}", aggregate.normalized_message),
&subject_payload(&aggregate),
)
.map_err(|e| format!("insert graylog ticket failed: {}", e))?;
@ -373,7 +368,6 @@ mod tests {
fn test_build_graylog_subject_permalink_uses_relative_search_query() {
let url = build_graylog_subject_permalink(
"https://graylog.example.com/",
"api",
"timeout user <num>",
30,
);
@ -381,6 +375,6 @@ mod tests {
assert!(url.starts_with("https://graylog.example.com/search?"));
assert!(url.contains("rangetype=relative"));
assert!(url.contains("relative=1800"));
assert!(url.contains("q=source%3A%22api%22%20AND%20message%3A%22timeout%20user%20%3Cnum%3E%22"));
assert!(url.contains("q=message%3A%22timeout%20user%20%3Cnum%3E%22"));
}
}

View file

@ -59,6 +59,18 @@ fn hash_re() -> &'static Regex {
RE.get_or_init(|| Regex::new(r"\b[0-9a-f]{12,}\b").expect("valid hash regex"))
}
/// Returns the lazily compiled regex that recognises "hashed" file paths.
///
/// A match consists of at least two `/`-terminated segments followed by a
/// final segment of 12 or more characters, where every segment is drawn from
/// `[a-z0-9._-]`. Only lowercase letters are accepted by the pattern.
/// `normalize_message` substitutes each match with the `<path>` placeholder.
///
/// The regex is compiled once on first use and shared for the lifetime of the
/// process.
fn hashed_path_re() -> &'static Regex {
    const PATTERN: &str = r"\b(?:[a-z0-9._-]+/){2,}[a-z0-9._-]{12,}\b";
    static COMPILED: OnceLock<Regex> = OnceLock::new();
    COMPILED.get_or_init(|| Regex::new(PATTERN).expect("valid hashed path regex"))
}
/// Returns the lazily compiled regex that recognises long opaque tokens.
///
/// A match is a run of 20 or more characters from `[a-z0-9]` delimited by
/// word boundaries. `normalize_message` substitutes each match with the
/// `<hash>` placeholder.
///
/// NOTE(review): the pattern does not require a digit, so a purely alphabetic
/// word of 20+ lowercase letters is matched as well.
fn long_token_re() -> &'static Regex {
    static LONG_TOKEN: OnceLock<Regex> = OnceLock::new();
    // Compilation happens only on the first call; later calls reuse the cell.
    let compile = || Regex::new(r"\b[a-z0-9]{20,}\b").expect("valid long token regex");
    LONG_TOKEN.get_or_init(compile)
}
fn num_re() -> &'static Regex {
static RE: OnceLock<Regex> = OnceLock::new();
RE.get_or_init(|| Regex::new(r"\b\d+\b").expect("valid number regex"))
@ -75,7 +87,9 @@ pub fn normalize_message(input: &str) -> String {
value = uuid_re().replace_all(&value, "<uuid>").to_string();
value = ip_re().replace_all(&value, "<ip>").to_string();
value = ts_re().replace_all(&value, "<ts>").to_string();
value = hashed_path_re().replace_all(&value, "<path>").to_string();
value = hash_re().replace_all(&value, "<hash>").to_string();
value = long_token_re().replace_all(&value, "<hash>").to_string();
value = num_re().replace_all(&value, "<num>").to_string();
ws_re().replace_all(value.trim(), " ").to_string()
@ -116,7 +130,10 @@ pub fn compute_score(counts: SeverityCounts, total_count: i32, last_seen_age_min
severity_score + frequency_score + recency_score
}
fn parse_event_timestamp(timestamp: &str, fallback: chrono::DateTime<chrono::Utc>) -> chrono::DateTime<chrono::Utc> {
fn parse_event_timestamp(
timestamp: &str,
fallback: chrono::DateTime<chrono::Utc>,
) -> chrono::DateTime<chrono::Utc> {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(timestamp) {
return dt.with_timezone(&chrono::Utc);
}
@ -142,12 +159,14 @@ pub fn group_subjects(
.unwrap_or(&event.source)
.to_string();
let normalized_message = normalize_message(&event.message);
let subject_key = format!("{}|{}", source, normalized_message);
let subject_key = normalized_message.clone();
let event_time = parse_event_timestamp(&event.timestamp, now);
let age_minutes = now.signed_duration_since(event_time).num_minutes().max(0);
let entry = map.entry(subject_key.clone()).or_insert_with(|| SubjectAggregate {
let entry = map
.entry(subject_key.clone())
.or_insert_with(|| SubjectAggregate {
subject_key: subject_key.clone(),
source: source.clone(),
normalized_message: normalized_message.clone(),
@ -159,6 +178,10 @@ pub fn group_subjects(
score: 0,
});
if entry.source != source && entry.source != "multi-source" {
entry.source = "multi-source".to_string();
}
entry.total_count += 1;
let level = event.level.to_ascii_lowercase();
@ -231,13 +254,24 @@ mod tests {
#[test]
fn test_normalize_message_replaces_dynamic_values() {
let message = "User 42 failed from 10.0.0.1 at 2026-04-17T10:00:00Z request=9f8b12ab34cd";
let message = "User 42 failed from 10.0.0.1 at 2026-04-17T10:00:00Z request=9f8b12ab34cd file ab/cd/zz19ff88qq77tt66";
let normalized = normalize_message(message);
assert!(normalized.contains("<num>"));
assert!(normalized.contains("<ip>"));
assert!(normalized.contains("<ts>"));
assert!(normalized.contains("<hash>"));
assert!(normalized.contains("<path>"));
}
#[test]
fn test_normalize_message_collapses_hashed_paths() {
    // The two inputs differ only in the hashed file path they mention.
    let numeric_path_message = "The mime type of file ab/cd/00112233445566778899 is invalid";
    let mixed_path_message = "The mime type of file ef/gh/zz11yy22xx33ww44vv55 is invalid";

    let first = normalize_message(numeric_path_message);
    let second = normalize_message(mixed_path_message);

    // Both must collapse to the same normalized text...
    assert_eq!(first, second);
    // ...and that text must carry the path placeholder.
    assert!(first.contains("<path>"));
}
#[test]
@ -255,7 +289,7 @@ mod tests {
}
#[test]
fn test_group_subjects_groups_by_service_and_normalized_message() {
fn test_group_subjects_groups_by_normalized_message_even_across_sources() {
let now = chrono::DateTime::parse_from_rfc3339("2026-04-17T10:30:00Z")
.expect("timestamp should parse")
.with_timezone(&chrono::Utc);
@ -275,6 +309,13 @@ mod tests {
"critical",
"Timeout user 91 from 10.0.0.2",
),
event(
"2026-04-17T10:27:00Z",
"api-3",
Some("billing"),
"error",
"Timeout user 73 from 10.0.0.3",
),
event(
"2026-04-17T09:40:00Z",
"worker-1",
@ -287,10 +328,11 @@ mod tests {
let groups = group_subjects(&events, now);
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].source, "api");
assert_eq!(groups[0].source, "multi-source");
assert_eq!(groups[0].counts.critical, 1);
assert_eq!(groups[0].counts.error, 1);
assert_eq!(groups[0].total_count, 2);
assert_eq!(groups[0].counts.error, 2);
assert_eq!(groups[0].total_count, 3);
assert!(!groups[0].subject_key.contains('|'));
assert!(groups[0].score > groups[1].score);
}
}