feat(graylog): add model layer and project module default

This commit is contained in:
thibaud-lclr 2026-04-17 15:21:12 +02:00
parent fc434fe560
commit aecc4689e7
3 changed files with 741 additions and 0 deletions

View file

@ -0,0 +1,699 @@
use rusqlite::{params, Connection, OptionalExtension, Result};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraylogCredentials {
pub id: String,
pub project_id: String,
pub base_url: String,
pub api_token_encrypted: String,
pub analyst_agent_id: String,
pub developer_agent_id: String,
pub stream_id: Option<String>,
pub query_filter: String,
pub polling_interval_minutes: i32,
pub lookback_minutes: i32,
pub score_threshold: i32,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraylogCredentialsSafe {
pub id: String,
pub project_id: String,
pub base_url: String,
pub analyst_agent_id: String,
pub developer_agent_id: String,
pub stream_id: Option<String>,
pub query_filter: String,
pub polling_interval_minutes: i32,
pub lookback_minutes: i32,
pub score_threshold: i32,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraylogSubject {
pub id: String,
pub project_id: String,
pub subject_key: String,
pub source: String,
pub normalized_message: String,
pub first_seen_at: String,
pub last_seen_at: String,
pub last_score: i32,
pub active_ticket_id: Option<String>,
pub status: String,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraylogDetection {
pub id: String,
pub subject_id: String,
pub window_start: String,
pub window_end: String,
pub critical_count: i32,
pub error_count: i32,
pub warning_count: i32,
pub total_count: i32,
pub last_seen_at: String,
pub score: i32,
pub triggered: bool,
pub triggered_ticket_id: Option<String>,
pub created_at: String,
}
fn timestamp_now() -> String {
chrono::Utc::now().to_rfc3339()
}
fn credentials_from_row(row: &rusqlite::Row) -> rusqlite::Result<GraylogCredentials> {
Ok(GraylogCredentials {
id: row.get(0)?,
project_id: row.get(1)?,
base_url: row.get(2)?,
api_token_encrypted: row.get(3)?,
analyst_agent_id: row.get(4)?,
developer_agent_id: row.get(5)?,
stream_id: row.get(6)?,
query_filter: row.get(7)?,
polling_interval_minutes: row.get(8)?,
lookback_minutes: row.get(9)?,
score_threshold: row.get(10)?,
created_at: row.get(11)?,
updated_at: row.get(12)?,
})
}
fn subject_from_row(row: &rusqlite::Row) -> rusqlite::Result<GraylogSubject> {
Ok(GraylogSubject {
id: row.get(0)?,
project_id: row.get(1)?,
subject_key: row.get(2)?,
source: row.get(3)?,
normalized_message: row.get(4)?,
first_seen_at: row.get(5)?,
last_seen_at: row.get(6)?,
last_score: row.get(7)?,
active_ticket_id: row.get(8)?,
status: row.get(9)?,
created_at: row.get(10)?,
updated_at: row.get(11)?,
})
}
fn detection_from_row(row: &rusqlite::Row) -> rusqlite::Result<GraylogDetection> {
let triggered_int: i32 = row.get(10)?;
Ok(GraylogDetection {
id: row.get(0)?,
subject_id: row.get(1)?,
window_start: row.get(2)?,
window_end: row.get(3)?,
critical_count: row.get(4)?,
error_count: row.get(5)?,
warning_count: row.get(6)?,
total_count: row.get(7)?,
last_seen_at: row.get(8)?,
score: row.get(9)?,
triggered: triggered_int != 0,
triggered_ticket_id: row.get(11)?,
created_at: row.get(12)?,
})
}
impl GraylogCredentials {
pub fn upsert_for_project(
conn: &Connection,
project_id: &str,
base_url: &str,
api_token_encrypted: &str,
analyst_agent_id: &str,
developer_agent_id: &str,
stream_id: Option<&str>,
query_filter: &str,
polling_interval_minutes: i32,
lookback_minutes: i32,
score_threshold: i32,
) -> Result<GraylogCredentials> {
let now = timestamp_now();
let id = Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO graylog_credentials (
id,
project_id,
base_url,
api_token_encrypted,
analyst_agent_id,
developer_agent_id,
stream_id,
query_filter,
polling_interval_minutes,
lookback_minutes,
score_threshold,
created_at,
updated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
ON CONFLICT(project_id) DO UPDATE SET
base_url = excluded.base_url,
api_token_encrypted = excluded.api_token_encrypted,
analyst_agent_id = excluded.analyst_agent_id,
developer_agent_id = excluded.developer_agent_id,
stream_id = excluded.stream_id,
query_filter = excluded.query_filter,
polling_interval_minutes = excluded.polling_interval_minutes,
lookback_minutes = excluded.lookback_minutes,
score_threshold = excluded.score_threshold,
updated_at = excluded.updated_at",
params![
id,
project_id,
base_url,
api_token_encrypted,
analyst_agent_id,
developer_agent_id,
stream_id,
query_filter,
polling_interval_minutes,
lookback_minutes,
score_threshold,
now,
now,
],
)?;
Self::get_by_project(conn, project_id)?.ok_or(rusqlite::Error::QueryReturnedNoRows)
}
pub fn get_by_project(
conn: &Connection,
project_id: &str,
) -> Result<Option<GraylogCredentials>> {
conn.query_row(
"SELECT
id,
project_id,
base_url,
api_token_encrypted,
analyst_agent_id,
developer_agent_id,
stream_id,
query_filter,
polling_interval_minutes,
lookback_minutes,
score_threshold,
created_at,
updated_at
FROM graylog_credentials
WHERE project_id = ?1
LIMIT 1",
params![project_id],
credentials_from_row,
)
.optional()
}
pub fn delete_for_project(conn: &Connection, project_id: &str) -> Result<()> {
conn.execute(
"DELETE FROM graylog_credentials WHERE project_id = ?1",
params![project_id],
)?;
Ok(())
}
pub fn to_safe(&self) -> GraylogCredentialsSafe {
GraylogCredentialsSafe {
id: self.id.clone(),
project_id: self.project_id.clone(),
base_url: self.base_url.clone(),
analyst_agent_id: self.analyst_agent_id.clone(),
developer_agent_id: self.developer_agent_id.clone(),
stream_id: self.stream_id.clone(),
query_filter: self.query_filter.clone(),
polling_interval_minutes: self.polling_interval_minutes,
lookback_minutes: self.lookback_minutes,
score_threshold: self.score_threshold,
created_at: self.created_at.clone(),
updated_at: self.updated_at.clone(),
}
}
}
impl GraylogSubject {
pub fn upsert_subject(
conn: &Connection,
project_id: &str,
subject_key: &str,
source: &str,
normalized_message: &str,
last_seen_at: &str,
last_score: i32,
) -> Result<GraylogSubject> {
let now = timestamp_now();
let id = Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO graylog_subjects (
id,
project_id,
subject_key,
source,
normalized_message,
first_seen_at,
last_seen_at,
last_score,
status,
created_at,
updated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6, ?7, 'idle', ?8, ?8)
ON CONFLICT(project_id, subject_key) DO UPDATE SET
source = excluded.source,
normalized_message = excluded.normalized_message,
last_seen_at = excluded.last_seen_at,
last_score = excluded.last_score,
updated_at = excluded.updated_at",
params![
id,
project_id,
subject_key,
source,
normalized_message,
last_seen_at,
last_score,
now,
],
)?;
Self::get_by_project_and_key(conn, project_id, subject_key)?
.ok_or(rusqlite::Error::QueryReturnedNoRows)
}
pub fn get_by_project_and_key(
conn: &Connection,
project_id: &str,
subject_key: &str,
) -> Result<Option<GraylogSubject>> {
conn.query_row(
"SELECT
id,
project_id,
subject_key,
source,
normalized_message,
first_seen_at,
last_seen_at,
last_score,
active_ticket_id,
status,
created_at,
updated_at
FROM graylog_subjects
WHERE project_id = ?1
AND subject_key = ?2
LIMIT 1",
params![project_id, subject_key],
subject_from_row,
)
.optional()
}
pub fn set_active_ticket(
conn: &Connection,
subject_id: &str,
active_ticket_id: Option<&str>,
status: &str,
) -> Result<()> {
let now = timestamp_now();
let affected = conn.execute(
"UPDATE graylog_subjects
SET active_ticket_id = ?1,
status = ?2,
updated_at = ?3
WHERE id = ?4",
params![active_ticket_id, status, now, subject_id],
)?;
if affected == 0 {
return Err(rusqlite::Error::QueryReturnedNoRows);
}
Ok(())
}
pub fn list_by_project(conn: &Connection, project_id: &str) -> Result<Vec<GraylogSubject>> {
let mut stmt = conn.prepare(
"SELECT
id,
project_id,
subject_key,
source,
normalized_message,
first_seen_at,
last_seen_at,
last_score,
active_ticket_id,
status,
created_at,
updated_at
FROM graylog_subjects
WHERE project_id = ?1
ORDER BY last_seen_at DESC, created_at DESC, id DESC",
)?;
let rows = stmt.query_map(params![project_id], subject_from_row)?;
rows.collect()
}
}
impl GraylogDetection {
#[allow(clippy::too_many_arguments)]
pub fn insert(
conn: &Connection,
subject_id: &str,
window_start: &str,
window_end: &str,
critical_count: i32,
error_count: i32,
warning_count: i32,
total_count: i32,
last_seen_at: &str,
score: i32,
triggered: bool,
triggered_ticket_id: Option<&str>,
) -> Result<GraylogDetection> {
let id = Uuid::new_v4().to_string();
let created_at = timestamp_now();
let triggered_int = if triggered { 1i32 } else { 0i32 };
conn.execute(
"INSERT INTO graylog_detections (
id,
subject_id,
window_start,
window_end,
critical_count,
error_count,
warning_count,
total_count,
last_seen_at,
score,
triggered,
triggered_ticket_id,
created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
params![
id,
subject_id,
window_start,
window_end,
critical_count,
error_count,
warning_count,
total_count,
last_seen_at,
score,
triggered_int,
triggered_ticket_id,
created_at,
],
)?;
conn.query_row(
"SELECT
id,
subject_id,
window_start,
window_end,
critical_count,
error_count,
warning_count,
total_count,
last_seen_at,
score,
triggered,
triggered_ticket_id,
created_at
FROM graylog_detections
WHERE id = ?1",
params![id],
detection_from_row,
)
}
pub fn list_by_project(
conn: &Connection,
project_id: &str,
subject_id: Option<&str>,
) -> Result<Vec<GraylogDetection>> {
let mut stmt = if subject_id.is_some() {
conn.prepare(
"SELECT
gd.id,
gd.subject_id,
gd.window_start,
gd.window_end,
gd.critical_count,
gd.error_count,
gd.warning_count,
gd.total_count,
gd.last_seen_at,
gd.score,
gd.triggered,
gd.triggered_ticket_id,
gd.created_at
FROM graylog_detections gd
JOIN graylog_subjects gs ON gs.id = gd.subject_id
WHERE gs.project_id = ?1
AND gd.subject_id = ?2
ORDER BY gd.created_at DESC, gd.id DESC",
)?
} else {
conn.prepare(
"SELECT
gd.id,
gd.subject_id,
gd.window_start,
gd.window_end,
gd.critical_count,
gd.error_count,
gd.warning_count,
gd.total_count,
gd.last_seen_at,
gd.score,
gd.triggered,
gd.triggered_ticket_id,
gd.created_at
FROM graylog_detections gd
JOIN graylog_subjects gs ON gs.id = gd.subject_id
WHERE gs.project_id = ?1
ORDER BY gd.created_at DESC, gd.id DESC",
)?
};
let rows = if let Some(subject_id) = subject_id {
stmt.query_map(params![project_id, subject_id], detection_from_row)?
} else {
stmt.query_map(params![project_id], detection_from_row)?
};
rows.collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db;
use crate::models::agent::{Agent, AgentRole, AgentTool};
use crate::models::project::Project;
use crate::models::ticket::ProcessedTicket;
use rusqlite::Connection;
fn setup() -> (Connection, Project, Agent, Agent) {
let conn = db::init_in_memory().expect("db init should succeed");
let project = Project::insert(&conn, "Graylog", "/tmp/graylog", None, "main")
.expect("project insert should succeed");
let analyst = Agent::insert(&conn, "Analyst", AgentRole::Analyst, AgentTool::Codex, "")
.expect("analyst insert should succeed");
let developer = Agent::insert(
&conn,
"Developer",
AgentRole::Developer,
AgentTool::ClaudeCode,
"",
)
.expect("developer insert should succeed");
(conn, project, analyst, developer)
}
#[test]
fn test_upsert_graylog_credentials_for_project() {
let (conn, project, analyst, developer) = setup();
let creds = GraylogCredentials::upsert_for_project(
&conn,
&project.id,
"https://graylog.local",
"enc-token",
&analyst.id,
&developer.id,
Some("stream-1"),
"level:(critical OR error)",
10,
30,
70,
)
.expect("upsert should succeed");
assert_eq!(creds.project_id, project.id);
assert_eq!(creds.base_url, "https://graylog.local");
assert_eq!(creds.api_token_encrypted, "enc-token");
assert_eq!(creds.stream_id.as_deref(), Some("stream-1"));
let stored = GraylogCredentials::get_by_project(&conn, &project.id)
.expect("get should succeed")
.expect("credentials should exist");
assert_eq!(stored.id, creds.id);
assert_eq!(stored.analyst_agent_id, analyst.id);
let safe = stored.to_safe();
assert_eq!(safe.project_id, project.id);
assert_eq!(safe.base_url, "https://graylog.local");
GraylogCredentials::delete_for_project(&conn, &project.id).expect("delete should succeed");
let deleted = GraylogCredentials::get_by_project(&conn, &project.id)
.expect("get after delete should succeed");
assert!(deleted.is_none());
}
#[test]
fn test_upsert_subject_and_get_by_project_and_key() {
let (conn, project, _, _) = setup();
let first_seen = "2026-04-17T08:00:00Z";
let second_seen = "2026-04-17T09:00:00Z";
let subject = GraylogSubject::upsert_subject(
&conn,
&project.id,
"api:timeout",
"api",
"timeout while calling payment gateway",
first_seen,
42,
)
.expect("initial subject upsert should succeed");
assert_eq!(subject.project_id, project.id);
assert_eq!(subject.subject_key, "api:timeout");
assert_eq!(subject.first_seen_at, first_seen);
assert_eq!(subject.last_seen_at, first_seen);
assert_eq!(subject.last_score, 42);
assert_eq!(subject.status, "idle");
assert!(subject.active_ticket_id.is_none());
let ticket = ProcessedTicket::insert_external(
&conn,
&project.id,
"graylog",
Some(&subject.id),
101,
"[Graylog] api - timeout",
"{\"subject_id\":\"api:timeout\"}",
)
.expect("ticket insert should succeed");
GraylogSubject::upsert_subject(
&conn,
&project.id,
"api:timeout",
"api",
"timeout while calling payment gateway",
second_seen,
73,
)
.expect("second subject upsert should succeed");
GraylogSubject::set_active_ticket(&conn, &subject.id, Some(&ticket.id), "queued")
.expect("set active ticket should succeed");
let stored = GraylogSubject::get_by_project_and_key(&conn, &project.id, "api:timeout")
.expect("get should succeed")
.expect("subject should exist");
assert_eq!(stored.id, subject.id);
assert_eq!(stored.first_seen_at, first_seen);
assert_eq!(stored.last_seen_at, second_seen);
assert_eq!(stored.last_score, 73);
assert_eq!(stored.active_ticket_id.as_deref(), Some(ticket.id.as_str()));
assert_eq!(stored.status, "queued");
let subjects =
GraylogSubject::list_by_project(&conn, &project.id).expect("list should succeed");
assert_eq!(subjects.len(), 1);
assert_eq!(subjects[0].id, subject.id);
}
#[test]
fn test_insert_detection_and_list_by_project_orders_latest_first() {
let (conn, project, _, _) = setup();
let subject = GraylogSubject::upsert_subject(
&conn,
&project.id,
"worker:panic",
"worker",
"panic in background worker",
"2026-04-17T08:00:00Z",
55,
)
.expect("subject upsert should succeed");
let first = GraylogDetection::insert(
&conn,
&subject.id,
"2026-04-17T08:00:00Z",
"2026-04-17T08:10:00Z",
1,
3,
0,
4,
"2026-04-17T08:09:00Z",
55,
false,
None,
)
.expect("first detection insert should succeed");
let second = GraylogDetection::insert(
&conn,
&subject.id,
"2026-04-17T09:00:00Z",
"2026-04-17T09:10:00Z",
2,
4,
1,
7,
"2026-04-17T09:09:00Z",
80,
true,
None,
)
.expect("second detection insert should succeed");
let detections = GraylogDetection::list_by_project(&conn, &project.id, None)
.expect("list should succeed");
assert_eq!(detections.len(), 2);
assert_eq!(detections[0].id, second.id);
assert_eq!(detections[1].id, first.id);
let subject_detections =
GraylogDetection::list_by_project(&conn, &project.id, Some(&subject.id))
.expect("subject list should succeed");
assert_eq!(subject_detections.len(), 2);
assert!(subject_detections[0].triggered);
assert_eq!(subject_detections[0].score, 80);
}
}

View file

@ -1,6 +1,7 @@
pub mod agent;
pub mod agent_task;
pub mod credential;
pub mod graylog;
pub mod live_agent;
pub mod module;
pub mod notification;

View file

@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub const MODULE_TULEAP_AUTO_RESOLVE: &str = "tuleap_polling_auto_resolve";
pub const MODULE_GRAYLOG_AUTO_RESOLVE: &str = "graylog_polling_auto_resolve";
pub const MODULE_AI_LIVE_CHAT: &str = "ai_live_chat";
pub const MODULE_AGENT_TASK_RUNNER: &str = "agent_task_runner";
@ -70,6 +71,13 @@ impl ProjectModule {
"Polling Tuleap + auto-resolve",
"Surveille Tuleap et lance le pipeline analyste/developpeur.",
)?;
insert_default(
conn,
project_id,
MODULE_GRAYLOG_AUTO_RESOLVE,
"Polling Graylog + auto-resolve",
"Surveille Graylog, score les sujets, et déclenche le pipeline analyste/developpeur.",
)?;
insert_default(
conn,
project_id,
@ -140,3 +148,36 @@ impl ProjectModule {
Ok(enabled_int != 0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db;
use crate::models::project::Project;
#[test]
fn test_ensure_defaults_for_project_includes_graylog_module() {
let conn = db::init_in_memory().expect("db init should succeed");
let project = Project::insert(&conn, "Modules", "/tmp/modules", None, "main")
.expect("project insert should succeed");
ProjectModule::ensure_defaults_for_project(&conn, &project.id)
.expect("ensure defaults should succeed");
let modules =
ProjectModule::list_by_project(&conn, &project.id).expect("list should succeed");
assert_eq!(modules.len(), 4);
let graylog = modules
.iter()
.find(|module| module.module_key == MODULE_GRAYLOG_AUTO_RESOLVE)
.expect("graylog module should be present");
assert_eq!(graylog.name, "Polling Graylog + auto-resolve");
assert_eq!(
graylog.description,
"Surveille Graylog, score les sujets, et déclenche le pipeline analyste/developpeur."
);
assert!(graylog.enabled);
}
}