feat(graylog): add client commands and source-aware poller flow
This commit is contained in:
parent
ff09212177
commit
d561578e78
8 changed files with 958 additions and 50 deletions
209
src-tauri/src/commands/graylog.rs
Normal file
209
src-tauri/src/commands/graylog.rs
Normal file
|
|
@ -0,0 +1,209 @@
|
|||
use crate::error::AppError;
|
||||
use crate::models::graylog::{
|
||||
GraylogCredentials, GraylogCredentialsSafe, GraylogDetection, GraylogSubject,
|
||||
};
|
||||
use crate::services::crypto;
|
||||
use crate::services::graylog_client::GraylogClient;
|
||||
use crate::services::graylog_poller;
|
||||
use crate::AppState;
|
||||
use tauri::State;
|
||||
|
||||
fn validate_input(
|
||||
base_url: &str,
|
||||
analyst_agent_id: &str,
|
||||
developer_agent_id: &str,
|
||||
polling_interval_minutes: i32,
|
||||
lookback_minutes: i32,
|
||||
score_threshold: i32,
|
||||
) -> Result<(String, String, String), AppError> {
|
||||
let base_url = base_url.trim().to_string();
|
||||
let analyst_agent_id = analyst_agent_id.trim().to_string();
|
||||
let developer_agent_id = developer_agent_id.trim().to_string();
|
||||
|
||||
if base_url.is_empty() {
|
||||
return Err(AppError::from("Graylog URL is required".to_string()));
|
||||
}
|
||||
if analyst_agent_id.is_empty() {
|
||||
return Err(AppError::from("Analyst agent is required".to_string()));
|
||||
}
|
||||
if developer_agent_id.is_empty() {
|
||||
return Err(AppError::from("Developer agent is required".to_string()));
|
||||
}
|
||||
if polling_interval_minutes <= 0 {
|
||||
return Err(AppError::from(
|
||||
"Polling interval must be strictly positive".to_string(),
|
||||
));
|
||||
}
|
||||
if lookback_minutes <= 0 {
|
||||
return Err(AppError::from(
|
||||
"Lookback window must be strictly positive".to_string(),
|
||||
));
|
||||
}
|
||||
if !(1..=100).contains(&score_threshold) {
|
||||
return Err(AppError::from(
|
||||
"Score threshold must be between 1 and 100".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok((base_url, analyst_agent_id, developer_agent_id))
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn set_graylog_credentials(
|
||||
state: State<'_, AppState>,
|
||||
project_id: String,
|
||||
base_url: String,
|
||||
api_token: String,
|
||||
analyst_agent_id: String,
|
||||
developer_agent_id: String,
|
||||
stream_id: Option<String>,
|
||||
query_filter: String,
|
||||
polling_interval_minutes: i32,
|
||||
lookback_minutes: i32,
|
||||
score_threshold: i32,
|
||||
) -> Result<GraylogCredentialsSafe, AppError> {
|
||||
let (base_url, analyst_agent_id, developer_agent_id) = validate_input(
|
||||
&base_url,
|
||||
&analyst_agent_id,
|
||||
&developer_agent_id,
|
||||
polling_interval_minutes,
|
||||
lookback_minutes,
|
||||
score_threshold,
|
||||
)?;
|
||||
|
||||
let db = state
|
||||
.db
|
||||
.lock()
|
||||
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
|
||||
|
||||
let token_encrypted = if api_token.trim().is_empty() {
|
||||
match GraylogCredentials::get_by_project(&db, &project_id)? {
|
||||
Some(existing) => existing.api_token_encrypted,
|
||||
None => {
|
||||
return Err(AppError::from(
|
||||
"API token is required for initial setup".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
crypto::encrypt(&state.encryption_key, api_token.trim()).map_err(AppError::from)?
|
||||
};
|
||||
|
||||
let creds = GraylogCredentials::upsert_for_project(
|
||||
&db,
|
||||
&project_id,
|
||||
&base_url,
|
||||
&token_encrypted,
|
||||
&analyst_agent_id,
|
||||
&developer_agent_id,
|
||||
stream_id.as_deref().map(str::trim).filter(|v| !v.is_empty()),
|
||||
query_filter.trim(),
|
||||
polling_interval_minutes,
|
||||
lookback_minutes,
|
||||
score_threshold,
|
||||
)?;
|
||||
|
||||
Ok(creds.to_safe())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub fn get_graylog_credentials(
|
||||
state: State<'_, AppState>,
|
||||
project_id: String,
|
||||
) -> Result<Option<GraylogCredentialsSafe>, AppError> {
|
||||
let db = state
|
||||
.db
|
||||
.lock()
|
||||
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
|
||||
|
||||
Ok(GraylogCredentials::get_by_project(&db, &project_id)?.map(|creds| creds.to_safe()))
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub fn delete_graylog_credentials(
|
||||
state: State<'_, AppState>,
|
||||
project_id: String,
|
||||
) -> Result<(), AppError> {
|
||||
let db = state
|
||||
.db
|
||||
.lock()
|
||||
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
|
||||
|
||||
GraylogCredentials::delete_for_project(&db, &project_id)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn test_graylog_connection(
|
||||
state: State<'_, AppState>,
|
||||
project_id: String,
|
||||
) -> Result<String, AppError> {
|
||||
let (base_url, token) = {
|
||||
let db = state
|
||||
.db
|
||||
.lock()
|
||||
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
|
||||
|
||||
let creds = GraylogCredentials::get_by_project(&db, &project_id)?
|
||||
.ok_or_else(|| AppError::from("No Graylog credentials configured".to_string()))?;
|
||||
|
||||
let token = crypto::decrypt(&state.encryption_key, &creds.api_token_encrypted)
|
||||
.map_err(AppError::from)?;
|
||||
|
||||
(creds.base_url, token)
|
||||
};
|
||||
|
||||
let client = GraylogClient::new(&state.http_client, &base_url, &token);
|
||||
client.test_connection().await.map_err(AppError::from)?;
|
||||
|
||||
Ok("Connection successful".to_string())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn manual_graylog_poll(
|
||||
state: State<'_, AppState>,
|
||||
app_handle: tauri::AppHandle,
|
||||
project_id: String,
|
||||
) -> Result<i32, AppError> {
|
||||
graylog_poller::poll_project_once(
|
||||
&state.db,
|
||||
&state.encryption_key,
|
||||
&state.http_client,
|
||||
&app_handle,
|
||||
&project_id,
|
||||
)
|
||||
.await
|
||||
.map_err(AppError::from)
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub fn list_graylog_subjects(
|
||||
state: State<'_, AppState>,
|
||||
project_id: String,
|
||||
) -> Result<Vec<GraylogSubject>, AppError> {
|
||||
let db = state
|
||||
.db
|
||||
.lock()
|
||||
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
|
||||
|
||||
Ok(GraylogSubject::list_by_project(&db, &project_id)?)
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub fn list_graylog_detections(
|
||||
state: State<'_, AppState>,
|
||||
project_id: String,
|
||||
subject_id: Option<String>,
|
||||
) -> Result<Vec<GraylogDetection>, AppError> {
|
||||
let db = state
|
||||
.db
|
||||
.lock()
|
||||
.map_err(|e| AppError::from(format!("Database lock failed: {}", e)))?;
|
||||
|
||||
Ok(GraylogDetection::list_by_project(
|
||||
&db,
|
||||
&project_id,
|
||||
subject_id.as_deref(),
|
||||
)?)
|
||||
}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
pub mod agent;
|
||||
pub mod credential;
|
||||
pub mod graylog;
|
||||
pub mod live_agent;
|
||||
pub mod module;
|
||||
pub mod notification;
|
||||
|
|
|
|||
|
|
@ -42,6 +42,13 @@ pub fn run() {
|
|||
|
||||
// Start background poller
|
||||
services::poller::start(
|
||||
db_arc.clone(),
|
||||
encryption_key,
|
||||
http_client.clone(),
|
||||
app.handle().clone(),
|
||||
);
|
||||
|
||||
services::graylog_poller::start(
|
||||
db_arc.clone(),
|
||||
encryption_key,
|
||||
http_client,
|
||||
|
|
@ -76,6 +83,13 @@ pub fn run() {
|
|||
commands::credential::get_tuleap_credentials,
|
||||
commands::credential::delete_tuleap_credentials,
|
||||
commands::credential::test_tuleap_connection,
|
||||
commands::graylog::set_graylog_credentials,
|
||||
commands::graylog::get_graylog_credentials,
|
||||
commands::graylog::delete_graylog_credentials,
|
||||
commands::graylog::test_graylog_connection,
|
||||
commands::graylog::manual_graylog_poll,
|
||||
commands::graylog::list_graylog_subjects,
|
||||
commands::graylog::list_graylog_detections,
|
||||
commands::tracker::add_tracker,
|
||||
commands::tracker::list_trackers,
|
||||
commands::tracker::update_tracker,
|
||||
|
|
|
|||
|
|
@ -128,6 +128,7 @@ fn detection_from_row(row: &rusqlite::Row) -> rusqlite::Result<GraylogDetection>
|
|||
}
|
||||
|
||||
impl GraylogCredentials {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn upsert_for_project(
|
||||
conn: &Connection,
|
||||
project_id: &str,
|
||||
|
|
@ -219,6 +220,30 @@ impl GraylogCredentials {
|
|||
.optional()
|
||||
}
|
||||
|
||||
pub fn list_all(conn: &Connection) -> Result<Vec<GraylogCredentials>> {
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT
|
||||
id,
|
||||
project_id,
|
||||
base_url,
|
||||
api_token_encrypted,
|
||||
analyst_agent_id,
|
||||
developer_agent_id,
|
||||
stream_id,
|
||||
query_filter,
|
||||
polling_interval_minutes,
|
||||
lookback_minutes,
|
||||
score_threshold,
|
||||
created_at,
|
||||
updated_at
|
||||
FROM graylog_credentials
|
||||
ORDER BY updated_at DESC, id DESC",
|
||||
)?;
|
||||
|
||||
let rows = stmt.query_map([], credentials_from_row)?;
|
||||
rows.collect()
|
||||
}
|
||||
|
||||
pub fn delete_for_project(conn: &Connection, project_id: &str) -> Result<()> {
|
||||
conn.execute(
|
||||
"DELETE FROM graylog_credentials WHERE project_id = ?1",
|
||||
|
|
|
|||
258
src-tauri/src/services/graylog_client.rs
Normal file
258
src-tauri/src/services/graylog_client.rs
Normal file
|
|
@ -0,0 +1,258 @@
|
|||
use crate::services::graylog_scoring::GraylogEvent;
|
||||
use serde_json::Value;
|
||||
use std::time::Instant;
|
||||
use tokio::time::{sleep, Duration};
|
||||
|
||||
pub struct GraylogClient {
|
||||
http: reqwest::Client,
|
||||
base_url: String,
|
||||
token: String,
|
||||
}
|
||||
|
||||
impl GraylogClient {
|
||||
pub fn new(http: &reqwest::Client, base_url: &str, token: &str) -> Self {
|
||||
Self {
|
||||
http: http.clone(),
|
||||
base_url: base_url.trim_end_matches('/').to_string(),
|
||||
token: token.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_get(&self, url: &str) -> Result<reqwest::Response, String> {
|
||||
const MAX_ATTEMPTS: u32 = 3;
|
||||
const BASE_DELAY_MS: u64 = 500;
|
||||
|
||||
for attempt in 1..=MAX_ATTEMPTS {
|
||||
let started_at = Instant::now();
|
||||
eprintln!(
|
||||
"[graylog] -> GET {} (attempt {}/{})",
|
||||
url, attempt, MAX_ATTEMPTS
|
||||
);
|
||||
|
||||
let response = self
|
||||
.http
|
||||
.get(url)
|
||||
.header("Authorization", format!("Bearer {}", self.token))
|
||||
.header("X-Requested-By", "orchai")
|
||||
.send()
|
||||
.await;
|
||||
|
||||
match response {
|
||||
Ok(resp) => {
|
||||
let status = resp.status();
|
||||
eprintln!(
|
||||
"[graylog] <- GET {} | status={} | {}ms",
|
||||
url,
|
||||
status,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
if (status == reqwest::StatusCode::TOO_MANY_REQUESTS
|
||||
|| status.is_server_error())
|
||||
&& attempt < MAX_ATTEMPTS
|
||||
{
|
||||
let delay_ms = BASE_DELAY_MS * 2u64.pow(attempt - 1);
|
||||
eprintln!(
|
||||
"[graylog] ~~ retry GET {} in {}ms (status={})",
|
||||
url, delay_ms, status
|
||||
);
|
||||
sleep(Duration::from_millis(delay_ms)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(resp);
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!(
|
||||
"[graylog] xx GET {} | error={} | {}ms",
|
||||
url,
|
||||
err,
|
||||
started_at.elapsed().as_millis()
|
||||
);
|
||||
|
||||
if attempt < MAX_ATTEMPTS {
|
||||
let delay_ms = BASE_DELAY_MS * 2u64.pow(attempt - 1);
|
||||
eprintln!(
|
||||
"[graylog] ~~ retry GET {} in {}ms (error={})",
|
||||
url, delay_ms, err
|
||||
);
|
||||
sleep(Duration::from_millis(delay_ms)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
return Err(format!("graylog request failed: {}", err));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err("graylog request failed after retries".to_string())
|
||||
}
|
||||
|
||||
pub async fn test_connection(&self) -> Result<(), String> {
|
||||
let url = format!("{}/api/system", self.base_url);
|
||||
let resp = self.send_get(&url).await?;
|
||||
|
||||
if resp.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format!("graylog connection test failed: HTTP {}", resp.status()))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn search_relative(
|
||||
&self,
|
||||
query: &str,
|
||||
stream_id: Option<&str>,
|
||||
range_seconds: i32,
|
||||
) -> Result<Vec<GraylogEvent>, String> {
|
||||
let normalized_range = range_seconds.max(60);
|
||||
let mut url = format!(
|
||||
"{}/api/search/universal/relative?query={}&range={}&limit=500",
|
||||
self.base_url,
|
||||
urlencoding::encode(query),
|
||||
normalized_range
|
||||
);
|
||||
|
||||
if let Some(stream) = stream_id.map(str::trim).filter(|value| !value.is_empty()) {
|
||||
url.push_str("&streams=");
|
||||
url.push_str(&urlencoding::encode(stream));
|
||||
}
|
||||
|
||||
let resp = self.send_get(&url).await?;
|
||||
if !resp.status().is_success() {
|
||||
return Err(format!("graylog search failed: HTTP {}", resp.status()));
|
||||
}
|
||||
|
||||
let body: Value = resp
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| format!("invalid graylog JSON: {}", e))?;
|
||||
|
||||
Ok(parse_search_response(&body))
|
||||
}
|
||||
}
|
||||
|
||||
fn level_to_string(value: &Value) -> String {
|
||||
match value {
|
||||
Value::String(s) => s.to_string(),
|
||||
Value::Number(n) => {
|
||||
let level = n.as_i64().unwrap_or(6);
|
||||
match level {
|
||||
i64::MIN..=2 => "critical".to_string(),
|
||||
3 => "error".to_string(),
|
||||
4 => "warning".to_string(),
|
||||
_ => "info".to_string(),
|
||||
}
|
||||
}
|
||||
other => other.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_search_response(body: &Value) -> Vec<GraylogEvent> {
|
||||
let rows = body
|
||||
.get("messages")
|
||||
.and_then(Value::as_array)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
rows.into_iter()
|
||||
.filter_map(|row| {
|
||||
let message = row.get("message")?;
|
||||
|
||||
let timestamp = message
|
||||
.get("timestamp")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let source = message
|
||||
.get("source")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let level = message
|
||||
.get("level")
|
||||
.map(level_to_string)
|
||||
.unwrap_or_else(|| "".to_string());
|
||||
let msg = message
|
||||
.get("message")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let service = message
|
||||
.get("service")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string);
|
||||
|
||||
if timestamp.is_empty() || source.is_empty() || msg.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(GraylogEvent {
|
||||
timestamp,
|
||||
source,
|
||||
service,
|
||||
level,
|
||||
message: msg,
|
||||
raw: message.clone(),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_search_response_extracts_events() {
|
||||
let payload = serde_json::json!({
|
||||
"messages": [
|
||||
{ "message": {
|
||||
"timestamp": "2026-04-17T10:00:00.000Z",
|
||||
"source": "api-1",
|
||||
"level": "error",
|
||||
"message": "timeout id=42",
|
||||
"service": "api"
|
||||
} }
|
||||
]
|
||||
});
|
||||
|
||||
let events = parse_search_response(&payload);
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_eq!(events[0].source, "api-1");
|
||||
assert_eq!(events[0].service.as_deref(), Some("api"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_search_response_maps_numeric_level() {
|
||||
let payload = serde_json::json!({
|
||||
"messages": [
|
||||
{ "message": {
|
||||
"timestamp": "2026-04-17T10:00:00.000Z",
|
||||
"source": "worker-1",
|
||||
"level": 4,
|
||||
"message": "queue lag"
|
||||
} }
|
||||
]
|
||||
});
|
||||
|
||||
let events = parse_search_response(&payload);
|
||||
assert_eq!(events.len(), 1);
|
||||
assert_eq!(events[0].level, "warning");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_search_response_skips_incomplete_rows() {
|
||||
let payload = serde_json::json!({
|
||||
"messages": [
|
||||
{ "message": {
|
||||
"timestamp": "2026-04-17T10:00:00.000Z",
|
||||
"source": "api-1"
|
||||
} }
|
||||
]
|
||||
});
|
||||
|
||||
let events = parse_search_response(&payload);
|
||||
assert!(events.is_empty());
|
||||
}
|
||||
}
|
||||
305
src-tauri/src/services/graylog_poller.rs
Normal file
305
src-tauri/src/services/graylog_poller.rs
Normal file
|
|
@ -0,0 +1,305 @@
|
|||
use crate::models::graylog::{GraylogCredentials, GraylogDetection, GraylogSubject};
|
||||
use crate::models::module::{ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE};
|
||||
use crate::models::ticket::ProcessedTicket;
|
||||
use crate::services::crypto;
|
||||
use crate::services::graylog_client::GraylogClient;
|
||||
use crate::services::graylog_scoring::{group_subjects, SubjectAggregate};
|
||||
use rusqlite::Connection;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tauri::{AppHandle, Emitter};
|
||||
use tokio::time::{interval, Duration};
|
||||
|
||||
fn is_ticket_active(status: &str) -> bool {
|
||||
matches!(status, "Pending" | "Analyzing" | "Developing")
|
||||
}
|
||||
|
||||
fn should_trigger_subject(score: i32, threshold: i32, has_active_ticket: bool) -> bool {
|
||||
score >= threshold && !has_active_ticket
|
||||
}
|
||||
|
||||
fn synthetic_artifact_id(subject_key: &str) -> i32 {
|
||||
// Stable FNV-1a hash so the synthetic id remains deterministic across runs.
|
||||
const FNV_OFFSET: u64 = 0xcbf29ce484222325;
|
||||
const FNV_PRIME: u64 = 0x100000001b3;
|
||||
|
||||
let mut hash = FNV_OFFSET;
|
||||
for byte in subject_key.as_bytes() {
|
||||
hash ^= u64::from(*byte);
|
||||
hash = hash.wrapping_mul(FNV_PRIME);
|
||||
}
|
||||
|
||||
let raw = (hash & 0x3fff_ffff) as i32;
|
||||
-raw.max(1)
|
||||
}
|
||||
|
||||
fn subject_payload(aggregate: &SubjectAggregate) -> String {
|
||||
serde_json::json!({
|
||||
"source": aggregate.source,
|
||||
"normalized_message": aggregate.normalized_message,
|
||||
"counts": {
|
||||
"critical": aggregate.counts.critical,
|
||||
"error": aggregate.counts.error,
|
||||
"warning": aggregate.counts.warning,
|
||||
"total": aggregate.total_count,
|
||||
},
|
||||
"last_seen_at": aggregate.last_seen_at,
|
||||
"score": aggregate.score,
|
||||
"samples": aggregate.sample_events,
|
||||
})
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn emit_polling_error(app_handle: &AppHandle, project_id: &str, error: &str) {
|
||||
let _ = app_handle.emit(
|
||||
"graylog-polling-error",
|
||||
serde_json::json!({
|
||||
"project_id": project_id,
|
||||
"error": error,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn start(
|
||||
db: Arc<Mutex<Connection>>,
|
||||
encryption_key: [u8; 32],
|
||||
http_client: reqwest::Client,
|
||||
app_handle: AppHandle,
|
||||
) {
|
||||
tauri::async_runtime::spawn(async move {
|
||||
let mut tick = interval(Duration::from_secs(60));
|
||||
let mut last_polled_at: HashMap<String, chrono::DateTime<chrono::Utc>> = HashMap::new();
|
||||
|
||||
loop {
|
||||
tick.tick().await;
|
||||
|
||||
let credentials = {
|
||||
let conn = match db.lock() {
|
||||
Ok(conn) => conn,
|
||||
Err(err) => {
|
||||
eprintln!("graylog_poller: failed to lock db: {}", err);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match GraylogCredentials::list_all(&conn) {
|
||||
Ok(values) => values,
|
||||
Err(err) => {
|
||||
eprintln!("graylog_poller: failed to load credentials: {}", err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let now = chrono::Utc::now();
|
||||
let known_projects: HashSet<String> =
|
||||
credentials.iter().map(|item| item.project_id.clone()).collect();
|
||||
last_polled_at.retain(|project_id, _| known_projects.contains(project_id));
|
||||
|
||||
for credential in credentials {
|
||||
let project_id = credential.project_id.clone();
|
||||
let polling_interval_minutes = credential.polling_interval_minutes.max(1) as i64;
|
||||
|
||||
let due = match last_polled_at.get(&project_id) {
|
||||
Some(last_at) => {
|
||||
now.signed_duration_since(*last_at).num_minutes() >= polling_interval_minutes
|
||||
}
|
||||
None => true,
|
||||
};
|
||||
|
||||
if !due {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(err) = poll_project_once(
|
||||
&db,
|
||||
&encryption_key,
|
||||
&http_client,
|
||||
&app_handle,
|
||||
&project_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
eprintln!("graylog_poller: project {} failed: {}", project_id, err);
|
||||
emit_polling_error(&app_handle, &project_id, &err);
|
||||
}
|
||||
|
||||
last_polled_at.insert(project_id, now);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn poll_project_once(
|
||||
db: &Arc<Mutex<Connection>>,
|
||||
encryption_key: &[u8; 32],
|
||||
http_client: &reqwest::Client,
|
||||
app_handle: &AppHandle,
|
||||
project_id: &str,
|
||||
) -> Result<i32, String> {
|
||||
let credentials = {
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
|
||||
let module_enabled = ProjectModule::is_enabled(&conn, project_id, MODULE_GRAYLOG_AUTO_RESOLVE)
|
||||
.map_err(|e| format!("module lookup failed: {}", e))?;
|
||||
|
||||
if !module_enabled {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
GraylogCredentials::get_by_project(&conn, project_id)
|
||||
.map_err(|e| format!("credentials lookup failed: {}", e))?
|
||||
.ok_or_else(|| "No Graylog credentials configured".to_string())?
|
||||
};
|
||||
|
||||
let token = crypto::decrypt(encryption_key, &credentials.api_token_encrypted)
|
||||
.map_err(|e| format!("token decrypt failed: {}", e))?;
|
||||
|
||||
let client = GraylogClient::new(http_client, &credentials.base_url, &token);
|
||||
let query = if credentials.query_filter.trim().is_empty() {
|
||||
"level:(critical OR error OR warning)".to_string()
|
||||
} else {
|
||||
credentials.query_filter.clone()
|
||||
};
|
||||
let range_seconds = credentials.lookback_minutes.max(1) * 60;
|
||||
|
||||
let _ = app_handle.emit(
|
||||
"graylog-polling-started",
|
||||
serde_json::json!({ "project_id": project_id }),
|
||||
);
|
||||
|
||||
let events = match client
|
||||
.search_relative(&query, credentials.stream_id.as_deref(), range_seconds)
|
||||
.await
|
||||
{
|
||||
Ok(events) => events,
|
||||
Err(err) => {
|
||||
emit_polling_error(app_handle, project_id, &err);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
let now = chrono::Utc::now();
|
||||
let window_start = (now - chrono::Duration::minutes(credentials.lookback_minutes as i64))
|
||||
.to_rfc3339();
|
||||
let window_end = now.to_rfc3339();
|
||||
|
||||
let aggregates = group_subjects(&events, now);
|
||||
let mut triggered_count = 0i32;
|
||||
|
||||
for aggregate in aggregates {
|
||||
let (subject, has_active_ticket) = {
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
|
||||
let mut subject = GraylogSubject::upsert_subject(
|
||||
&conn,
|
||||
project_id,
|
||||
&aggregate.subject_key,
|
||||
&aggregate.source,
|
||||
&aggregate.normalized_message,
|
||||
&aggregate.last_seen_at,
|
||||
aggregate.score,
|
||||
)
|
||||
.map_err(|e| format!("upsert subject failed: {}", e))?;
|
||||
|
||||
let mut has_active_ticket = false;
|
||||
if let Some(active_ticket_id) = subject.active_ticket_id.clone() {
|
||||
match ProcessedTicket::get_by_id(&conn, &active_ticket_id) {
|
||||
Ok(ticket) if is_ticket_active(&ticket.status) => {
|
||||
has_active_ticket = true;
|
||||
}
|
||||
_ => {
|
||||
GraylogSubject::set_active_ticket(&conn, &subject.id, None, "idle")
|
||||
.map_err(|e| format!("clear stale subject ticket failed: {}", e))?;
|
||||
subject.active_ticket_id = None;
|
||||
subject.status = "idle".to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(subject, has_active_ticket)
|
||||
};
|
||||
|
||||
let should_trigger = should_trigger_subject(
|
||||
aggregate.score,
|
||||
credentials.score_threshold,
|
||||
has_active_ticket,
|
||||
);
|
||||
|
||||
let triggered_ticket_id = if should_trigger {
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
let ticket = ProcessedTicket::insert_external(
|
||||
&conn,
|
||||
project_id,
|
||||
"graylog",
|
||||
Some(&subject.id),
|
||||
synthetic_artifact_id(&aggregate.subject_key),
|
||||
&format!(
|
||||
"[Graylog] {} - {}",
|
||||
aggregate.source, aggregate.normalized_message
|
||||
),
|
||||
&subject_payload(&aggregate),
|
||||
)
|
||||
.map_err(|e| format!("insert graylog ticket failed: {}", e))?;
|
||||
|
||||
GraylogSubject::set_active_ticket(&conn, &subject.id, Some(&ticket.id), "queued")
|
||||
.map_err(|e| format!("set active ticket failed: {}", e))?;
|
||||
|
||||
triggered_count += 1;
|
||||
let _ = app_handle.emit(
|
||||
"graylog-subject-triggered",
|
||||
serde_json::json!({
|
||||
"project_id": project_id,
|
||||
"subject_id": subject.id,
|
||||
"ticket_id": ticket.id,
|
||||
"score": aggregate.score,
|
||||
}),
|
||||
);
|
||||
|
||||
Some(ticket.id)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
{
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
GraylogDetection::insert(
|
||||
&conn,
|
||||
&subject.id,
|
||||
&window_start,
|
||||
&window_end,
|
||||
aggregate.counts.critical,
|
||||
aggregate.counts.error,
|
||||
aggregate.counts.warning,
|
||||
aggregate.total_count,
|
||||
&aggregate.last_seen_at,
|
||||
aggregate.score,
|
||||
should_trigger,
|
||||
triggered_ticket_id.as_deref(),
|
||||
)
|
||||
.map_err(|e| format!("insert detection failed: {}", e))?;
|
||||
}
|
||||
}
|
||||
|
||||
let _ = app_handle.emit(
|
||||
"graylog-polling-finished",
|
||||
serde_json::json!({
|
||||
"project_id": project_id,
|
||||
"triggered_count": triggered_count,
|
||||
}),
|
||||
);
|
||||
|
||||
Ok(triggered_count)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::should_trigger_subject;
|
||||
|
||||
#[test]
|
||||
fn test_should_trigger_subject_respects_active_ticket() {
|
||||
assert!(should_trigger_subject(82, 70, false));
|
||||
assert!(!should_trigger_subject(82, 70, true));
|
||||
assert!(!should_trigger_subject(60, 70, false));
|
||||
}
|
||||
}
|
||||
|
|
@ -2,6 +2,8 @@ pub mod agent_runtime;
|
|||
pub mod cli_process;
|
||||
pub mod crypto;
|
||||
pub mod filter_engine;
|
||||
pub mod graylog_client;
|
||||
pub mod graylog_poller;
|
||||
pub mod graylog_scoring;
|
||||
pub mod notifier;
|
||||
pub mod orchestrator;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
use crate::models::agent::{Agent, AgentRole};
|
||||
use crate::models::module::{ProjectModule, MODULE_TULEAP_AUTO_RESOLVE};
|
||||
use crate::models::graylog::GraylogCredentials;
|
||||
use crate::models::module::{
|
||||
ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE, MODULE_TULEAP_AUTO_RESOLVE,
|
||||
};
|
||||
use crate::models::project::Project;
|
||||
use crate::models::ticket::ProcessedTicket;
|
||||
use crate::models::tracker::WatchedTracker;
|
||||
|
|
@ -21,13 +24,16 @@ pub enum Verdict {
|
|||
}
|
||||
|
||||
pub fn build_analyst_prompt(ticket: &ProcessedTicket, project: &Project) -> String {
|
||||
let source_ref = ticket.source_ref.as_deref().unwrap_or("-");
|
||||
format!(
|
||||
r#"Tu es un analyste technique. Voici un ticket Tuleap a analyser.
|
||||
r#"Tu es un analyste technique. Voici un ticket a analyser.
|
||||
|
||||
## Ticket
|
||||
- ID: {artifact_id}
|
||||
- Titre: {title}
|
||||
- Donnees: {data}
|
||||
- Source: {source}
|
||||
- Source ref: {source_ref}
|
||||
|
||||
## Contexte
|
||||
- Projet: {project_name}
|
||||
|
|
@ -46,6 +52,8 @@ Termine ton rapport par un de ces verdicts sur une ligne separee:
|
|||
artifact_id = ticket.artifact_id,
|
||||
title = ticket.artifact_title,
|
||||
data = ticket.artifact_data,
|
||||
source = ticket.source,
|
||||
source_ref = source_ref,
|
||||
project_name = project.name,
|
||||
project_path = project.path,
|
||||
base_branch = project.base_branch,
|
||||
|
|
@ -58,6 +66,7 @@ pub fn build_developer_prompt(
|
|||
analyst_report: &str,
|
||||
worktree_path: &str,
|
||||
) -> String {
|
||||
let source_ref = ticket.source_ref.as_deref().unwrap_or("-");
|
||||
format!(
|
||||
r#"Tu es un developpeur. Tu dois corriger un bug ou implementer une fonctionnalite d'apres l'analyse suivante.
|
||||
|
||||
|
|
@ -67,6 +76,8 @@ pub fn build_developer_prompt(
|
|||
## Ticket
|
||||
- ID: {artifact_id}
|
||||
- Titre: {title}
|
||||
- Source: {source}
|
||||
- Source ref: {source_ref}
|
||||
|
||||
## Contexte
|
||||
- Projet: {project_name}
|
||||
|
|
@ -80,6 +91,8 @@ pub fn build_developer_prompt(
|
|||
analyst_report = analyst_report,
|
||||
artifact_id = ticket.artifact_id,
|
||||
title = ticket.artifact_title,
|
||||
source = ticket.source,
|
||||
source_ref = source_ref,
|
||||
project_name = project.name,
|
||||
worktree_path = worktree_path,
|
||||
base_branch = project.base_branch,
|
||||
|
|
@ -254,28 +267,56 @@ async fn process_ticket(
|
|||
app_handle: &AppHandle,
|
||||
process_registry: &ProcessRegistry,
|
||||
) -> Result<bool, String> {
|
||||
let (ticket, tracker, project) = {
|
||||
let (ticket, project, tracker) = {
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
|
||||
let pending = ProcessedTicket::list_pending(&conn)
|
||||
.map_err(|e| format!("list_pending failed: {}", e))?;
|
||||
let mut selected: Option<(ProcessedTicket, WatchedTracker, Project)> = None;
|
||||
let mut selected: Option<(ProcessedTicket, Project, Option<WatchedTracker>)> = None;
|
||||
|
||||
for ticket in pending {
|
||||
let Some(tracker_id) = ticket.tracker_id.as_deref() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let tracker = WatchedTracker::get_by_id(&conn, tracker_id)
|
||||
.map_err(|e| format!("get tracker failed: {}", e))?;
|
||||
let project = Project::get_by_id(&conn, &tracker.project_id)
|
||||
let project = Project::get_by_id(&conn, &ticket.project_id)
|
||||
.map_err(|e| format!("get project failed: {}", e))?;
|
||||
|
||||
let enabled = ProjectModule::is_enabled(&conn, &project.id, MODULE_TULEAP_AUTO_RESOLVE)
|
||||
.map_err(|e| format!("module lookup failed: {}", e))?;
|
||||
if enabled {
|
||||
selected = Some((ticket, tracker, project));
|
||||
break;
|
||||
match ticket.source.as_str() {
|
||||
"tuleap" => {
|
||||
let tracker = match ticket.tracker_id.as_deref() {
|
||||
Some(tracker_id) => Some(
|
||||
WatchedTracker::get_by_id(&conn, tracker_id)
|
||||
.map_err(|e| format!("get tracker failed: {}", e))?,
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let enabled = ProjectModule::is_enabled(
|
||||
&conn,
|
||||
&project.id,
|
||||
MODULE_TULEAP_AUTO_RESOLVE,
|
||||
)
|
||||
.map_err(|e| format!("module lookup failed: {}", e))?;
|
||||
if enabled {
|
||||
selected = Some((ticket, project, tracker));
|
||||
break;
|
||||
}
|
||||
}
|
||||
"graylog" => {
|
||||
let enabled = ProjectModule::is_enabled(
|
||||
&conn,
|
||||
&project.id,
|
||||
MODULE_GRAYLOG_AUTO_RESOLVE,
|
||||
)
|
||||
.map_err(|e| format!("module lookup failed: {}", e))?;
|
||||
if enabled {
|
||||
selected = Some((ticket, project, None));
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
eprintln!(
|
||||
"orchestrator: unsupported ticket source '{}' for ticket {}",
|
||||
ticket.source, ticket.id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -286,50 +327,103 @@ async fn process_ticket(
|
|||
};
|
||||
|
||||
let (analyst_agent, developer_agent) = {
|
||||
if tracker.status != "valid" {
|
||||
let (analyst_id, developer_id) = if ticket.source == "graylog" {
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
let config = match GraylogCredentials::get_by_project(&conn, &project.id)
|
||||
.map_err(|e| format!("graylog credentials lookup failed: {}", e))?
|
||||
{
|
||||
Some(value) => value,
|
||||
None => {
|
||||
drop(conn);
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Graylog credentials are missing.",
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
};
|
||||
drop(conn);
|
||||
(
|
||||
config.analyst_agent_id.to_string(),
|
||||
config.developer_agent_id.to_string(),
|
||||
)
|
||||
} else if ticket.source == "tuleap" {
|
||||
let tracker = match &tracker {
|
||||
Some(value) => value,
|
||||
None => {
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Missing tracker reference for Tuleap ticket.",
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
};
|
||||
|
||||
if tracker.status != "valid" {
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Tracker is invalid. Configure analyst and developer agents.",
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let analyst_id = match tracker.analyst_agent_id.as_deref() {
|
||||
Some(id) => id.to_string(),
|
||||
None => {
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Tracker has no analyst agent configured.",
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
};
|
||||
|
||||
let developer_id = match tracker.developer_agent_id.as_deref() {
|
||||
Some(id) => id.to_string(),
|
||||
None => {
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Tracker has no developer agent configured.",
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
};
|
||||
|
||||
(analyst_id, developer_id)
|
||||
} else {
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Tracker is invalid. Configure analyst and developer agents.",
|
||||
&format!("Unsupported ticket source '{}'.", ticket.source),
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let analyst_id = match tracker.analyst_agent_id.as_deref() {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Tracker has no analyst agent configured.",
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
};
|
||||
|
||||
let developer_id = match tracker.developer_agent_id.as_deref() {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
record_ticket_error(
|
||||
db,
|
||||
app_handle,
|
||||
&project.id,
|
||||
&ticket.id,
|
||||
ticket.artifact_id,
|
||||
"Tracker has no developer agent configured.",
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
};
|
||||
|
||||
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
|
||||
let analyst_agent = match Agent::get_by_id(&conn, analyst_id) {
|
||||
let analyst_agent = match Agent::get_by_id(&conn, &analyst_id) {
|
||||
Ok(agent) => agent,
|
||||
Err(_) => {
|
||||
drop(conn);
|
||||
|
|
@ -345,7 +439,7 @@ async fn process_ticket(
|
|||
}
|
||||
};
|
||||
|
||||
let developer_agent = match Agent::get_by_id(&conn, developer_id) {
|
||||
let developer_agent = match Agent::get_by_id(&conn, &developer_id) {
|
||||
Ok(agent) => agent,
|
||||
Err(_) => {
|
||||
drop(conn);
|
||||
|
|
|
|||
Loading…
Reference in a new issue