feat: mark exhausted agents and requeue token-limited tasks

This commit is contained in:
thibaud-lclr 2026-04-21 14:18:43 +02:00
parent ce37ce9ea0
commit 904e02367b
10 changed files with 393 additions and 19 deletions

View file

@ -0,0 +1,7 @@
ALTER TABLE agents ADD COLUMN runtime_status TEXT NOT NULL DEFAULT 'available';
ALTER TABLE agents ADD COLUMN exhausted_until TEXT;
ALTER TABLE agents ADD COLUMN runtime_error TEXT;
UPDATE agents
SET runtime_status = 'available'
WHERE runtime_status IS NULL OR trim(runtime_status) = '';

View file

@ -10,6 +10,7 @@ const MIGRATION_006: &str = include_str!("../migrations/006_processed_tickets_un
const MIGRATION_007: &str = include_str!("../migrations/007_normalize_timestamps_rfc3339.sql");
const MIGRATION_008: &str = include_str!("../migrations/008_project_scoped_tuleap_credentials.sql");
const MIGRATION_009: &str = include_str!("../migrations/009_graylog_auto_resolve.sql");
const MIGRATION_010: &str = include_str!("../migrations/010_agent_runtime_status.sql");
pub fn init(db_path: &Path) -> Result<Connection> {
let conn = Connection::open(db_path)?;
@ -71,6 +72,10 @@ fn migrate(conn: &Connection) -> Result<()> {
conn.execute_batch(MIGRATION_009)?;
conn.pragma_update(None, "user_version", 9)?;
}
if version < 10 {
conn.execute_batch(MIGRATION_010)?;
conn.pragma_update(None, "user_version", 10)?;
}
Ok(())
}
@ -129,7 +134,7 @@ mod tests {
let version: i32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, 9);
assert_eq!(version, 10);
}
#[test]

View file

@ -79,6 +79,26 @@ impl AgentTool {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AgentRuntimeStatus {
Available,
Exhausted,
}
impl AgentRuntimeStatus {
pub fn from_str(value: &str) -> Result<Self> {
match value {
"available" => Ok(AgentRuntimeStatus::Available),
"exhausted" => Ok(AgentRuntimeStatus::Exhausted),
_ => Err(rusqlite::Error::InvalidParameterName(format!(
"Invalid agent runtime status: {}",
value
))),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Agent {
pub id: String,
@ -89,12 +109,16 @@ pub struct Agent {
pub is_default: bool,
pub created_at: String,
pub updated_at: String,
pub runtime_status: AgentRuntimeStatus,
pub exhausted_until: Option<String>,
pub runtime_error: Option<String>,
}
fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Agent> {
let role_raw: String = row.get(2)?;
let tool_raw: String = row.get(3)?;
let is_default_int: i32 = row.get(5)?;
let runtime_status_raw: String = row.get(8)?;
Ok(Agent {
id: row.get(0)?,
@ -105,10 +129,33 @@ fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Agent> {
is_default: is_default_int != 0,
created_at: row.get(6)?,
updated_at: row.get(7)?,
runtime_status: AgentRuntimeStatus::from_str(&runtime_status_raw)?,
exhausted_until: row.get(9)?,
runtime_error: row.get(10)?,
})
}
impl Agent {
pub fn is_runtime_exhausted(&self) -> bool {
self.runtime_status == AgentRuntimeStatus::Exhausted
}
pub fn exhaustion_has_expired(&self) -> bool {
if !self.is_runtime_exhausted() {
return false;
}
let Some(until_raw) = self.exhausted_until.as_deref() else {
return false;
};
let Ok(parsed) = chrono::DateTime::parse_from_rfc3339(until_raw) else {
return false;
};
parsed.with_timezone(&chrono::Utc) <= chrono::Utc::now()
}
pub fn insert(
conn: &Connection,
name: &str,
@ -120,7 +167,9 @@ impl Agent {
let now = chrono::Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO agents (id, name, role, tool, custom_prompt, is_default, created_at, updated_at) VALUES (?1, ?2, ?3, ?4, ?5, 0, ?6, ?7)",
"INSERT INTO agents \
(id, name, role, tool, custom_prompt, is_default, created_at, updated_at, runtime_status, exhausted_until, runtime_error) \
VALUES (?1, ?2, ?3, ?4, ?5, 0, ?6, ?7, 'available', NULL, NULL)",
params![id, name, role.as_str(), tool.as_str(), custom_prompt, now, now],
)?;
@ -133,12 +182,15 @@ impl Agent {
is_default: false,
created_at: now.clone(),
updated_at: now,
runtime_status: AgentRuntimeStatus::Available,
exhausted_until: None,
runtime_error: None,
})
}
pub fn list(conn: &Connection) -> Result<Vec<Agent>> {
let mut stmt = conn.prepare(
"SELECT id, name, role, tool, custom_prompt, is_default, created_at, updated_at
"SELECT id, name, role, tool, custom_prompt, is_default, created_at, updated_at, runtime_status, exhausted_until, runtime_error
FROM agents
ORDER BY role ASC, is_default DESC, created_at DESC",
)?;
@ -148,7 +200,9 @@ impl Agent {
pub fn get_by_id(conn: &Connection, id: &str) -> Result<Agent> {
conn.query_row(
"SELECT id, name, role, tool, custom_prompt, is_default, created_at, updated_at FROM agents WHERE id = ?1",
"SELECT id, name, role, tool, custom_prompt, is_default, created_at, updated_at, runtime_status, exhausted_until, runtime_error
FROM agents
WHERE id = ?1",
params![id],
from_row,
)
@ -161,7 +215,7 @@ impl Agent {
};
conn.query_row(
"SELECT id, name, role, tool, custom_prompt, is_default, created_at, updated_at
"SELECT id, name, role, tool, custom_prompt, is_default, created_at, updated_at, runtime_status, exhausted_until, runtime_error
FROM agents
WHERE id = ?1 AND role = ?2 AND is_default = 1
LIMIT 1",
@ -207,6 +261,49 @@ impl Agent {
Ok(())
}
pub fn mark_exhausted(
conn: &Connection,
id: &str,
reason: &str,
exhausted_until: Option<&str>,
) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
let affected = conn.execute(
"UPDATE agents
SET runtime_status = 'exhausted',
exhausted_until = ?1,
runtime_error = ?2,
updated_at = ?3
WHERE id = ?4",
params![exhausted_until, reason, now, id],
)?;
if affected == 0 {
return Err(rusqlite::Error::QueryReturnedNoRows);
}
Ok(())
}
pub fn mark_available(conn: &Connection, id: &str) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
let affected = conn.execute(
"UPDATE agents
SET runtime_status = 'available',
exhausted_until = NULL,
runtime_error = NULL,
updated_at = ?1
WHERE id = ?2",
params![now, id],
)?;
if affected == 0 {
return Err(rusqlite::Error::QueryReturnedNoRows);
}
Ok(())
}
pub fn delete(conn: &Connection, id: &str) -> Result<()> {
let agent = Self::get_by_id(conn, id)?;
@ -299,6 +396,9 @@ mod tests {
assert_eq!(found.tool, AgentTool::Codex);
assert_eq!(found.custom_prompt, "Focus on root cause.");
assert!(!found.is_default);
assert_eq!(found.runtime_status, AgentRuntimeStatus::Available);
assert!(found.exhausted_until.is_none());
assert!(found.runtime_error.is_none());
}
#[test]
@ -352,6 +452,53 @@ mod tests {
assert_eq!(updated.custom_prompt, "Prompt override");
}
#[test]
fn test_mark_exhausted_and_mark_available() {
let conn = setup();
let created = Agent::insert(
&conn,
"Developer Claude",
AgentRole::Developer,
AgentTool::ClaudeCode,
"",
)
.unwrap();
let until = (chrono::Utc::now() + chrono::Duration::minutes(45)).to_rfc3339();
Agent::mark_exhausted(&conn, &created.id, "quota reached", Some(&until)).unwrap();
let exhausted = Agent::get_by_id(&conn, &created.id).unwrap();
assert_eq!(exhausted.runtime_status, AgentRuntimeStatus::Exhausted);
assert_eq!(exhausted.exhausted_until.as_deref(), Some(until.as_str()));
assert_eq!(exhausted.runtime_error.as_deref(), Some("quota reached"));
assert!(!exhausted.exhaustion_has_expired());
Agent::mark_available(&conn, &created.id).unwrap();
let available = Agent::get_by_id(&conn, &created.id).unwrap();
assert_eq!(available.runtime_status, AgentRuntimeStatus::Available);
assert!(available.exhausted_until.is_none());
assert!(available.runtime_error.is_none());
}
#[test]
fn test_exhaustion_has_expired_when_until_is_in_the_past() {
let conn = setup();
let created = Agent::insert(
&conn,
"Developer Claude",
AgentRole::Developer,
AgentTool::ClaudeCode,
"",
)
.unwrap();
let past = (chrono::Utc::now() - chrono::Duration::minutes(1)).to_rfc3339();
Agent::mark_exhausted(&conn, &created.id, "rate limit", Some(&past)).unwrap();
let exhausted = Agent::get_by_id(&conn, &created.id).unwrap();
assert!(exhausted.exhaustion_has_expired());
}
#[test]
fn test_delete_default_agent_is_rejected() {
let conn = setup();

View file

@ -139,20 +139,24 @@ impl AgentTask {
Ok(())
}
pub fn retry(conn: &Connection, id: &str) -> Result<()> {
pub fn mark_pending(conn: &Connection, id: &str, note: Option<&str>) -> Result<()> {
conn.execute(
"UPDATE project_agent_tasks
SET status = 'pending',
result = NULL,
error = NULL,
error = ?1,
started_at = NULL,
finished_at = NULL
WHERE id = ?1",
params![id],
WHERE id = ?2",
params![note, id],
)?;
Ok(())
}
pub fn retry(conn: &Connection, id: &str) -> Result<()> {
Self::mark_pending(conn, id, None)
}
pub fn cancel(conn: &Connection, id: &str) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
conn.execute(

View file

@ -1,5 +1,7 @@
use crate::services::cli_process;
use crate::services::process_registry::ProcessRegistry;
use chrono::Duration as ChronoDuration;
use regex::Regex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
@ -7,6 +9,21 @@ use tokio::process::Command;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::{timeout, Duration};
const DEFAULT_EXHAUSTION_RETRY_AFTER_SECS: i64 = 60 * 60;
const MAX_EXHAUSTION_RETRY_AFTER_SECS: i64 = 7 * 24 * 60 * 60;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AgentExhaustionInfo {
pub reason: String,
pub retry_after: ChronoDuration,
}
impl AgentExhaustionInfo {
pub fn exhausted_until_rfc3339(&self) -> String {
(chrono::Utc::now() + self.retry_after).to_rfc3339()
}
}
fn normalize_process_stderr(stderr: &str) -> String {
let trimmed = stderr.trim();
if trimmed.is_empty() {
@ -20,6 +37,105 @@ fn normalize_process_stderr(stderr: &str) -> String {
trimmed.to_string()
}
fn parse_retry_after_seconds(message: &str) -> Option<i64> {
let unit_patterns = [
(
Regex::new(r"(?i)(\d+)\s*(?:d|day|days|jour|jours)").expect("valid day regex"),
86_400i64,
),
(
Regex::new(r"(?i)(\d+)\s*(?:h|hr|hrs|hour|hours|heure|heures)")
.expect("valid hour regex"),
3_600i64,
),
(
Regex::new(r"(?i)(\d+)\s*(?:m|min|mins|minute|minutes)").expect("valid minute regex"),
60i64,
),
(
Regex::new(r"(?i)(\d+)\s*(?:s|sec|secs|second|seconds|seconde|secondes)")
.expect("valid second regex"),
1i64,
),
];
let mut total_secs = 0i64;
let mut matched_any_unit = false;
for (regex, multiplier) in &unit_patterns {
for caps in regex.captures_iter(message) {
let Some(raw) = caps.get(1) else {
continue;
};
let Ok(value) = raw.as_str().parse::<i64>() else {
continue;
};
total_secs = total_secs.saturating_add(value.saturating_mul(*multiplier));
matched_any_unit = true;
}
}
if matched_any_unit && total_secs > 0 {
return Some(total_secs.min(MAX_EXHAUSTION_RETRY_AFTER_SECS));
}
let explicit_retry_after_seconds = Regex::new(
r"(?i)(?:retry|try again|next reset|available again|reessayez|réessayez|attendez)[^0-9]{0,24}(\d{1,6})\s*(?:s|sec|secs|second|seconds|seconde|secondes)",
)
.expect("valid explicit retry-after regex");
let secs = explicit_retry_after_seconds
.captures(message)
.and_then(|caps| caps.get(1))
.and_then(|raw| raw.as_str().parse::<i64>().ok())?;
if secs <= 0 {
return None;
}
Some(secs.min(MAX_EXHAUSTION_RETRY_AFTER_SECS))
}
pub fn detect_agent_exhaustion(error: &str) -> Option<AgentExhaustionInfo> {
let reason = error.trim();
if reason.is_empty() {
return None;
}
let lowered = reason.to_lowercase();
let exhaustion_markers = [
"insufficient_quota",
"quota exceeded",
"exceeded your current quota",
"usage limit",
"token limit",
"out of credits",
"not enough credits",
"credit balance",
"rate limit",
"too many requests",
"status code: 429",
"http 429",
"plus de token",
"tokens are exhausted",
"limite de quota",
"limite de taux",
];
if !exhaustion_markers
.iter()
.any(|marker| lowered.contains(marker))
{
return None;
}
let retry_after_seconds =
parse_retry_after_seconds(&lowered).unwrap_or(DEFAULT_EXHAUSTION_RETRY_AFTER_SECS);
Some(AgentExhaustionInfo {
reason: reason.to_string(),
retry_after: ChronoDuration::seconds(retry_after_seconds),
})
}
pub async fn run_agent_command(
command: &str,
args: &[String],
@ -260,6 +376,30 @@ mod tests {
assert_eq!(normalize_process_stderr(raw), "some other stderr");
}
#[test]
fn test_detect_agent_exhaustion_returns_none_for_unrelated_error() {
let err = "CLI command exited with code 1: repository has local changes";
assert!(detect_agent_exhaustion(err).is_none());
}
#[test]
fn test_detect_agent_exhaustion_detects_quota_and_default_retry() {
let err = "CLI command exited with code 1: insufficient_quota for this account";
let exhaustion = detect_agent_exhaustion(err).expect("must detect exhaustion");
assert_eq!(exhaustion.reason, err);
assert_eq!(
exhaustion.retry_after,
ChronoDuration::seconds(DEFAULT_EXHAUSTION_RETRY_AFTER_SECS)
);
}
#[test]
fn test_detect_agent_exhaustion_parses_retry_window() {
let err = "CLI command exited with code 1: Rate limit reached. Try again in 2h 30m 10s";
let exhaustion = detect_agent_exhaustion(err).expect("must detect exhaustion");
assert_eq!(exhaustion.retry_after, ChronoDuration::seconds(9_010));
}
#[tokio::test]
async fn test_run_agent_command_streaming_collects_chunks() {
let args = vec!["-c".to_string(), "cat".to_string()];

View file

@ -1193,6 +1193,9 @@ mod tests {
is_default: false,
created_at: "2026-01-01T00:00:00Z".into(),
updated_at: "2026-01-01T00:00:00Z".into(),
runtime_status: crate::models::agent::AgentRuntimeStatus::Available,
exhausted_until: None,
runtime_error: None,
}
}

View file

@ -73,31 +73,43 @@ async fn process_next_task(
let enabled =
ProjectModule::is_enabled(&conn, &task.project_id, MODULE_AGENT_TASK_RUNNER)
.map_err(|e| format!("task module lookup failed: {}", e))?;
if enabled {
selected = Some(task);
break;
if !enabled {
continue;
}
let agent = Agent::get_by_id(&conn, &task.agent_id)
.map_err(|e| format!("agent lookup failed: {}", e))?;
if agent.is_runtime_exhausted() {
if agent.exhaustion_has_expired() {
Agent::mark_available(&conn, &agent.id)
.map_err(|e| format!("agent availability reset failed: {}", e))?;
} else {
continue;
}
}
selected = Some((task, agent));
break;
}
selected
};
let task = match next_task {
Some(task) => task,
let (task, agent) = match next_task {
Some(payload) => payload,
None => return Ok(false),
};
let (project, agent) = {
let project = {
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
AgentTask::mark_running(&conn, &task.id)
.map_err(|e| format!("mark task running failed: {}", e))?;
let project = Project::get_by_id(&conn, &task.project_id)
.map_err(|e| format!("project lookup failed: {}", e))?;
let agent = Agent::get_by_id(&conn, &task.agent_id)
.map_err(|e| format!("agent lookup failed: {}", e))?;
(project, agent)
project
};
emit_status(app_handle, &task.project_id, &task.id, "running", None);
@ -143,6 +155,28 @@ async fn process_next_task(
return Ok(true);
}
if let Some(exhaustion) = agent_runtime::detect_agent_exhaustion(&error) {
let exhausted_until = exhaustion.exhausted_until_rfc3339();
let waiting_note = format!(
"Agent '{}' est épuisé (quota/token). Tâche remise en attente jusqu'à {}.",
agent.name, exhausted_until
);
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
Agent::mark_exhausted(&conn, &agent.id, &exhaustion.reason, Some(&exhausted_until))
.map_err(|e| format!("mark agent exhausted failed: {}", e))?;
AgentTask::mark_pending(&conn, &task.id, Some(&waiting_note))
.map_err(|e| format!("requeue task failed: {}", e))?;
emit_status(
app_handle,
&task.project_id,
&task.id,
"pending",
Some(&waiting_note),
);
return Ok(true);
}
let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
AgentTask::mark_error(&conn, &task.id, &error)
.map_err(|e| format!("mark task error failed: {}", e))?;

View file

@ -58,6 +58,16 @@ export default function AgentList() {
return tool === "codex" ? "Codex" : "Claude Code";
}
function runtimeLabel(agent: Agent): string {
if (agent.runtime_status === "exhausted") {
if (agent.exhausted_until) {
return `Épuisé jusqu'au ${new Date(agent.exhausted_until).toLocaleString()}`;
}
return "Épuisé";
}
return "Disponible";
}
return (
<div className={pageClass}>
<div className="mb-6 flex items-center justify-between gap-3">
@ -106,6 +116,15 @@ export default function AgentList() {
<span className="rounded-full bg-blue-50 px-2 py-0.5 text-xs text-blue-700">
{toolLabel(agent.tool)}
</span>
<span
className={`rounded-full px-2 py-0.5 text-xs ${
agent.runtime_status === "exhausted"
? "bg-orange-100 text-orange-800"
: "bg-emerald-100 text-emerald-700"
}`}
>
{runtimeLabel(agent)}
</span>
</div>
{agent.custom_prompt.trim() ? (
<p className="mt-2 line-clamp-3 text-xs text-gray-500">

View file

@ -46,6 +46,10 @@ export default function ProjectTasks() {
() => agents.filter((agent) => agent.role === "analyst" || agent.role === "developer"),
[agents]
);
const selectedAgent = useMemo(
() => usableAgents.find((agent) => agent.id === agentId) ?? null,
[usableAgents, agentId]
);
async function refresh() {
if (!projectId) return;
@ -195,6 +199,11 @@ export default function ProjectTasks() {
Le module est désactivé. Les tâches existantes restent visibles, mais la création et la relance sont bloquées.
</div>
)}
{selectedAgent?.runtime_status === "exhausted" && (
<div className={noticeClass("warning", true)}>
L&apos;agent sélectionné est épuisé. La tâche restera en attente jusqu&apos;à reprise de son quota.
</div>
)}
<form onSubmit={handleCreateTask} className={cardContentClass}>
<h3 className="mb-3 text-sm font-semibold text-gray-800">Créer une tâche</h3>
@ -206,7 +215,9 @@ export default function ProjectTasks() {
>
{usableAgents.map((agent) => (
<option key={agent.id} value={agent.id}>
{agent.name} ({agent.tool})
{agent.name} ({agent.tool}
{agent.runtime_status === "exhausted" ? ", épuisé" : ""}
)
</option>
))}
</select>

View file

@ -16,6 +16,7 @@ export interface TuleapCredentialsSafe {
export type AgentRole = "analyst" | "developer";
export type AgentTool = "codex" | "claude_code";
export type AgentRuntimeStatus = "available" | "exhausted";
export interface Agent {
id: string;
@ -26,6 +27,9 @@ export interface Agent {
is_default: boolean;
created_at: string;
updated_at: string;
runtime_status: AgentRuntimeStatus;
exhausted_until: string | null;
runtime_error: string | null;
}
export interface Filter {