orchai/src-tauri/src/services/orchestrator.rs

1556 lines
50 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use crate::models::agent::{Agent, AgentRole, AgentTool};
use crate::models::graylog::GraylogCredentials;
use crate::models::module::{
ProjectModule, MODULE_GRAYLOG_AUTO_RESOLVE, MODULE_TULEAP_AUTO_RESOLVE,
};
use crate::models::project::Project;
use crate::models::ticket::ProcessedTicket;
use crate::models::tracker::WatchedTracker;
use crate::models::worktree::Worktree;
use crate::services::process_registry::ProcessRegistry;
use crate::services::{cli_process, notifier, worktree_manager};
use rusqlite::Connection;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tauri::{AppHandle, Emitter};
use tokio::process::Command;
use tokio::sync::Mutex as AsyncMutex;
use tokio::time::{interval, timeout, Duration};
// Maximum bytes of buffered CLI output before a progress event is force-flushed.
const PROGRESS_BUFFER_MAX_BYTES: usize = 2048;
// Minimum delay between two progress events emitted to the frontend (milliseconds;
// u128 to compare directly against Duration::as_millis()).
const PROGRESS_EMIT_INTERVAL_MS: u128 = 250;

/// Outcome of the analyst step, extracted from the report's trailing
/// `[VERDICT: …]` marker by `parse_verdict`.
#[derive(Debug, Clone, PartialEq)]
pub enum Verdict {
    // A code change is required; also the default when no marker is found.
    FixNeeded,
    // No code change is required; the ticket is closed after analysis.
    NoFix,
}
/// Builds the (French) analyst prompt for a ticket: ticket data, project
/// context, the required markdown report structure, and the two terminal
/// verdict markers that `parse_verdict` later looks for.
pub fn build_analyst_prompt(ticket: &ProcessedTicket, project: &Project) -> String {
    // "-" stands in for a missing source reference.
    let source_ref = ticket.source_ref.as_deref().unwrap_or("-");
    // Bind everything the template needs so the format string can capture
    // the identifiers directly.
    let artifact_id = ticket.artifact_id;
    let title = &ticket.artifact_title;
    let data = &ticket.artifact_data;
    let source = &ticket.source;
    let project_name = &project.name;
    let project_path = &project.path;
    let base_branch = &project.base_branch;
    format!(
        r#"Tu es un analyste technique. Voici un ticket a analyser.
## Ticket
- ID: {artifact_id}
- Titre: {title}
- Donnees: {data}
- Source: {source}
- Source ref: {source_ref}
## Contexte
- Projet: {project_name}
- Repo: {project_path}
- Branche de base: {base_branch}
## Ta mission
1. Analyse le ticket et identifie les fichiers/fonctions concernes
2. Explique techniquement le probleme
3. Evalue si une correction de code est necessaire
4. Produis un rapport structure en markdown
## Format de sortie obligatoire
- Ecris un rapport en markdown avec des titres, des sous-titres et des listes.
- Laisse une ligne vide entre chaque section.
- Mets les labels importants en gras (ex: **Impact**, **Cause racine**).
- Evite les gros paragraphes: maximum 4 lignes par paragraphe.
- Respecte cette structure:
# Analyse ticket #{artifact_id} - {title}
## Resume executif
- **Constat:**
- **Impact:**
- **Urgence:**
## Diagnostic technique
### Cause racine
...
### Indices observables
- ...
## Zone de code probable
- `chemin/fichier.ext` - justification
## Plan de correction
1. ...
2. ...
## Risques et validations
- **Risques:**
- **Tests a executer:**
## Conclusion
- **Decision:** FIX_NEEDED ou NO_FIX
- **Rationale courte:** ...
Termine ton rapport par un de ces verdicts sur une ligne separee:
[VERDICT: FIX_NEEDED] si une correction de code est necessaire
[VERDICT: NO_FIX] si aucune correction n'est necessaire"#
    )
}
/// Builds the (French) developer prompt: the analyst report plus ticket and
/// worktree context, instructing the agent to implement the fix with atomic
/// commits and report back in markdown.
pub fn build_developer_prompt(
    ticket: &ProcessedTicket,
    project: &Project,
    analyst_report: &str,
    worktree_path: &str,
) -> String {
    // "-" stands in for a missing source reference.
    let source_ref = ticket.source_ref.as_deref().unwrap_or("-");
    // Locals for the format string's captured identifiers; analyst_report and
    // worktree_path are already in scope as parameters.
    let artifact_id = ticket.artifact_id;
    let title = &ticket.artifact_title;
    let source = &ticket.source;
    let project_name = &project.name;
    let base_branch = &project.base_branch;
    format!(
        r#"Tu es un developpeur. Tu dois corriger un bug ou implementer une fonctionnalite d'apres l'analyse suivante.
## Rapport d'analyse
{analyst_report}
## Ticket
- ID: {artifact_id}
- Titre: {title}
- Source: {source}
- Source ref: {source_ref}
## Contexte
- Projet: {project_name}
- Repo (worktree): {worktree_path}
- Branche de base: {base_branch}
## Ta mission
1. Implemente la correction dans le code
2. Fais des commits atomiques avec des messages clairs
3. Produis un rapport en markdown decrivant les changements effectues"#
    )
}
/// Builds the (French) reviewer prompt: ticket context, both upstream reports,
/// and the worktree/branch the fix lives on, asking for a structured markdown
/// review with a final verdict.
pub fn build_review_prompt(
    ticket: &ProcessedTicket,
    project: &Project,
    analyst_report: &str,
    developer_report: &str,
    worktree_path: &str,
    branch_name: &str,
) -> String {
    // "-" stands in for a missing source reference.
    let source_ref = ticket.source_ref.as_deref().unwrap_or("-");
    // Locals for the format string's captured identifiers; the report and
    // path/branch parameters are captured directly.
    let artifact_id = ticket.artifact_id;
    let title = &ticket.artifact_title;
    let source = &ticket.source;
    let project_name = &project.name;
    let base_branch = &project.base_branch;
    format!(
        r#"Tu es un reviewer technique. Tu dois valider la correction proposee par le developpeur.
## Ticket
- ID: {artifact_id}
- Titre: {title}
- Source: {source}
- Source ref: {source_ref}
## Contexte
- Projet: {project_name}
- Repo (worktree): {worktree_path}
- Branche de base: {base_branch}
- Branche de travail: {branch_name}
## Rapport analyste
{analyst_report}
## Rapport developpeur
{developer_report}
## Ta mission
1. Verifie la coherence entre l'analyse, le correctif et le ticket.
2. Verifie la qualite des changements (risques, dette, regressions possibles, tests manquants).
3. Produis un rapport markdown structuré avec:
- Synthese
- Points conformes
- Risques / points a corriger
- Verdict final"#
    )
}
/// Appends an agent's extra instructions to a base prompt under a dedicated
/// section header. A blank (empty/whitespace-only) custom prompt leaves the
/// base prompt untouched.
fn append_custom_prompt(base_prompt: String, custom_prompt: &str) -> String {
    match custom_prompt.trim() {
        "" => base_prompt,
        extra => format!("{base_prompt}\n\n## Instructions supplementaires (agent)\n{extra}"),
    }
}
/// Records a ticket failure everywhere it matters: on the ticket row, through
/// the user notifier, and as a `ticket-processing-error` event for the UI.
/// Every step is best effort — a poisoned DB lock or failed emit is ignored.
fn record_ticket_error(
    db: &Arc<Mutex<Connection>>,
    app_handle: &AppHandle,
    project_id: &str,
    ticket_id: &str,
    artifact_id: i32,
    error: &str,
) {
    // Persist the error when the DB is reachable; the guard is dropped before
    // notifying, since the notifier takes its own lock.
    match db.lock() {
        Ok(conn) => {
            let _ = ProcessedTicket::set_error(&conn, ticket_id, error);
        }
        Err(_) => {}
    }
    notifier::notify_error(db, app_handle, project_id, ticket_id, artifact_id, error);
    let payload = serde_json::json!({
        "project_id": project_id,
        "ticket_id": ticket_id,
        "artifact_id": artifact_id,
        "error": error
    });
    let _ = app_handle.emit("ticket-processing-error", payload);
}
/// Extracts the analyst verdict from a report by scanning lines from the end
/// (the verdict is expected on a trailing line). A missing marker defaults to
/// `FixNeeded` so an inconclusive report still gets a fix attempt.
pub fn parse_verdict(report: &str) -> Verdict {
    report
        .lines()
        .rev()
        .find_map(|line| {
            // NO_FIX is checked first, mirroring the per-line priority.
            if line.contains("[VERDICT: NO_FIX]") {
                Some(Verdict::NoFix)
            } else if line.contains("[VERDICT: FIX_NEEDED]") {
                Some(Verdict::FixNeeded)
            } else {
                None
            }
        })
        .unwrap_or(Verdict::FixNeeded)
}
/// Resolves a (possibly relative) path string against `working_dir`.
/// Returns `None` for blank input; absolute paths pass through unchanged,
/// relative ones are joined onto the working directory.
fn resolve_path_from_working_dir(working_dir: &Path, path: &str) -> Option<PathBuf> {
    let trimmed = path.trim();
    if trimmed.is_empty() {
        return None;
    }
    let candidate = Path::new(trimmed);
    Some(if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        working_dir.join(candidate)
    })
}
/// Lists the directories a sandboxed agent CLI must be allowed to write for
/// `working_dir`.
///
/// The (canonicalized when possible) working directory is always included.
/// When it is a git worktree, the `.git` metadata reported by
/// `git rev-parse --git-dir --git-common-dir` may live outside of it; such
/// locations are appended (deduplicated) so the agent can still commit.
fn codex_additional_writable_dirs(working_dir: &str) -> Vec<String> {
    let working_dir_path = Path::new(working_dir);
    // Canonicalize so the prefix comparison below is reliable; fall back to the
    // raw path when canonicalization fails (e.g. the dir vanished).
    let normalized_working_dir =
        std::fs::canonicalize(working_dir_path).unwrap_or_else(|_| working_dir_path.to_path_buf());
    let mut dirs = vec![normalized_working_dir.to_string_lossy().to_string()];
    let git_query = std::process::Command::new("git")
        .args(["rev-parse", "--git-dir", "--git-common-dir"])
        .current_dir(working_dir)
        .output();
    // Not a git checkout, or git unavailable: the working dir alone suffices.
    let Ok(output) = git_query else {
        return dirs;
    };
    if !output.status.success() {
        return dirs;
    }
    for line in String::from_utf8_lossy(&output.stdout).lines() {
        let Some(path) = resolve_path_from_working_dir(working_dir_path, line) else {
            continue;
        };
        let normalized = std::fs::canonicalize(&path).unwrap_or(path);
        // Anything under the working dir is already writable — skip it.
        if normalized.starts_with(&normalized_working_dir) {
            continue;
        }
        let dir = normalized.to_string_lossy().to_string();
        if !dirs.contains(&dir) {
            dirs.push(dir);
        }
    }
    dirs
}
/// Builds the CLI argument list for an agent run in `working_dir`.
/// Starts from the tool's non-interactive flags; for Codex and Claude Code a
/// `--add-dir` flag is appended for every directory the sandbox must write to
/// (worktree plus any out-of-tree git metadata).
fn build_agent_cli_args(agent: &Agent, working_dir: &str) -> Vec<String> {
    let mut args = agent.tool.to_non_interactive_args();
    let needs_writable_dirs = matches!(agent.tool, AgentTool::Codex | AgentTool::ClaudeCode);
    if needs_writable_dirs {
        args.extend(
            codex_additional_writable_dirs(working_dir)
                .into_iter()
                .flat_map(|dir| ["--add-dir".to_string(), dir]),
        );
    }
    args
}
/// Heuristic check: does the developer's report mention a filesystem or
/// permission failure? Matches a fixed list of French and English phrases,
/// case-insensitively.
fn developer_report_indicates_permission_block(report: &str) -> bool {
    const NEEDLES: [&str; 7] = [
        "refus d'autorisation",
        "permission d'edition",
        "permission dedition",
        "permission denied",
        "read-only file system",
        "operation not permitted",
        "cannot touch",
    ];
    let lowered = report.to_lowercase();
    NEEDLES.iter().any(|needle| lowered.contains(needle))
}

/// Decides whether a developer run actually produced a usable fix.
///
/// Fails when the report signals a write-permission block, when no commit was
/// made on the worktree branch, or when there is no diff against the base
/// branch; succeeds otherwise. The error message names the failed criterion.
fn evaluate_developer_completion(
    developer_report: &str,
    commit_count: usize,
    has_diff: bool,
) -> Result<(), String> {
    if developer_report_indicates_permission_block(developer_report) {
        Err("Developer agent reported a worktree write permission issue.".to_string())
    } else if commit_count == 0 {
        Err("Developer run completed without any commit in the worktree branch.".to_string())
    } else if !has_diff {
        Err("Developer run completed without any diff against the base branch.".to_string())
    } else {
        Ok(())
    }
}
/// Verifies against git that the developer run really produced a fix on
/// `branch_name`, then applies the completion criteria from
/// `evaluate_developer_completion`.
fn validate_developer_completion(
    project: &Project,
    branch_name: &str,
    developer_report: &str,
) -> Result<(), String> {
    // A self-reported permission block fails immediately — no need to query git.
    if developer_report_indicates_permission_block(developer_report) {
        return evaluate_developer_completion(developer_report, 0, false);
    }
    let commits = worktree_manager::list_commits(&project.path, &project.base_branch, branch_name)
        .map_err(|e| format!("Failed to verify developer commits: {}", e))?;
    let diff = worktree_manager::get_diff(&project.path, &project.base_branch, branch_name)
        .map_err(|e| format!("Failed to verify developer diff: {}", e))?;
    let has_diff = !diff.trim().is_empty();
    evaluate_developer_completion(developer_report, commits.len(), has_diff)
}
/// Decides whether buffered CLI progress output should be emitted now:
/// never for an empty buffer, otherwise when the buffer hit its size cap or
/// enough time has passed since the last emit.
fn should_flush_progress_buffer(
    buffer: &str,
    elapsed_since_last_emit: std::time::Duration,
) -> bool {
    !buffer.is_empty()
        && (buffer.len() >= PROGRESS_BUFFER_MAX_BYTES
            || elapsed_since_last_emit.as_millis() >= PROGRESS_EMIT_INTERVAL_MS)
}
/// Re-queues tickets left in an in-flight state by a previous app run.
///
/// For each in-flight ticket: best-effort removal of its still-"Active" git
/// worktree (so a retry can create a fresh one), deletion of the worktree row,
/// then a status reset back to Pending. Returns the number of tickets
/// recovered. Fails fast on DB lock or query errors.
fn recover_interrupted_tickets(db: &Arc<Mutex<Connection>>) -> Result<usize, String> {
    // Snapshot the in-flight tickets first so the lock is not held across the
    // (potentially slow) git cleanup below.
    let inflight = {
        let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
        ProcessedTicket::list_inflight(&conn).map_err(|e| format!("list_inflight failed: {}", e))?
    };
    let mut recovered = 0usize;
    for ticket in inflight {
        // Look up the ticket's worktree (if any) and its project path in one
        // short lock scope.
        let cleanup_target = {
            let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
            let worktree = Worktree::get_by_ticket_id(&conn, &ticket.id)
                .map_err(|e| format!("get worktree failed: {}", e))?;
            let project = Project::get_by_id(&conn, &ticket.project_id)
                .map_err(|e| format!("get project failed: {}", e))?;
            worktree.map(|wt| (wt, project.path))
        };
        if let Some((worktree, project_path)) = cleanup_target {
            if worktree.status == "Active" {
                // Best effort cleanup so a re-queued ticket can create a fresh worktree.
                let _ = worktree_manager::delete_worktree(
                    &project_path,
                    &worktree.path,
                    &worktree.branch_name,
                );
            }
            // The DB row is removed regardless of whether the git cleanup succeeded.
            let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
            let _ = Worktree::delete(&conn, &worktree.id);
        }
        let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
        ProcessedTicket::reset_for_retry(&conn, &ticket.id)
            .map_err(|e| format!("reset_for_retry failed: {}", e))?;
        recovered += 1;
    }
    Ok(recovered)
}
/// Per-ticket handles needed by `run_cli_command`: the Tauri app handle used to
/// emit progress events, the ticket id used as both registry key and event
/// payload, and the registry that tracks the spawned CLI child for cancellation.
pub struct TicketCliContext<'a> {
    pub app_handle: &'a AppHandle,
    pub ticket_id: &'a str,
    pub process_registry: &'a ProcessRegistry,
}
/// Runs an agent CLI tool (`command` + `args`) inside `working_dir`, writing
/// `prompt` to its stdin and streaming buffered output to the UI as
/// `ticket-processing-progress` events. Returns the collected output.
///
/// The child is registered under `ticket_id` in the process registry for the
/// whole run so the user can cancel it, and is unregistered on every exit path.
/// Errors on: spawn/stdin/pipe failure, a timeout after `timeout_secs` (the
/// child is killed), user cancellation, or a non-zero exit status (stderr is
/// appended to the message when available).
pub async fn run_cli_command(
    command: &str,
    args: &[String],
    prompt: &str,
    working_dir: &str,
    timeout_secs: u64,
    context: TicketCliContext<'_>,
) -> Result<String, String> {
    let TicketCliContext {
        app_handle,
        ticket_id,
        process_registry,
    } = context;
    let child = Command::new(command)
        .args(args)
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .current_dir(working_dir)
        .spawn()
        .map_err(|e| format!("Failed to spawn '{}': {}", command, e))?;
    // Shared handle + flag: the registry uses them to kill/flag the child when
    // the user cancels this ticket from the UI.
    let child = Arc::new(AsyncMutex::new(child));
    let cancellation_requested = Arc::new(AtomicBool::new(false));
    process_registry.register_ticket(ticket_id, child.clone(), cancellation_requested.clone());
    // Take all three pipes in one short lock scope.
    let (stdin, stdout, stderr) = {
        let mut child_guard = child.lock().await;
        let stdin = child_guard.stdin.take();
        let stdout = child_guard.stdout.take();
        let stderr = child_guard.stderr.take();
        (stdin, stdout, stderr)
    };
    // Write the prompt, then drop stdin so the child sees EOF.
    // NOTE(review): on a write failure the child is unregistered but not
    // killed here — presumably it exits on its own once stdin closes; confirm.
    if let Some(mut stdin) = stdin {
        use tokio::io::AsyncWriteExt;
        stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
            process_registry.unregister_ticket(ticket_id);
            format!("Failed to write to stdin: {}", e)
        })?;
    }
    let stdout = stdout.ok_or_else(|| {
        process_registry.unregister_ticket(ticket_id);
        "Failed to capture stdout".to_string()
    })?;
    let stderr = stderr.ok_or_else(|| {
        process_registry.unregister_ticket(ticket_id);
        "Failed to capture stderr".to_string()
    })?;
    // Output lines are batched and emitted to the UI when the buffer grows
    // large enough or enough time has elapsed (see should_flush_progress_buffer).
    let mut progress_buffer = String::new();
    let mut last_progress_emit = std::time::Instant::now();
    let read_future = cli_process::collect_process_output(child.clone(), stdout, stderr, |line| {
        if !line.is_empty() {
            if !progress_buffer.is_empty() {
                progress_buffer.push('\n');
            }
            progress_buffer.push_str(line);
        }
        if should_flush_progress_buffer(&progress_buffer, last_progress_emit.elapsed()) {
            let chunk = mem::take(&mut progress_buffer);
            let _ = app_handle.emit(
                "ticket-processing-progress",
                serde_json::json!({
                    "ticket_id": ticket_id,
                    "output_chunk": chunk,
                }),
            );
            last_progress_emit = std::time::Instant::now();
        }
        Ok(())
    });
    let (result, stderr_output, status) =
        match timeout(Duration::from_secs(timeout_secs), read_future).await {
            Ok(result) => match result {
                Ok(values) => values,
                Err(e) => {
                    process_registry.unregister_ticket(ticket_id);
                    return Err(e);
                }
            },
            Err(_) => {
                // Timeout: flag cancellation, kill the child, and bail out.
                cancellation_requested.store(true, Ordering::SeqCst);
                {
                    let mut child_guard = child.lock().await;
                    let _ = child_guard.start_kill();
                }
                process_registry.unregister_ticket(ticket_id);
                return Err(format!("CLI command timed out after {}s", timeout_secs));
            }
        };
    // Flush whatever is left in the progress buffer after the child exited.
    if !progress_buffer.is_empty() {
        let chunk = mem::take(&mut progress_buffer);
        let _ = app_handle.emit(
            "ticket-processing-progress",
            serde_json::json!({
                "ticket_id": ticket_id,
                "output_chunk": chunk,
            }),
        );
    }
    process_registry.unregister_ticket(ticket_id);
    // A user cancellation takes precedence over the exit-status check.
    if cancellation_requested.load(Ordering::SeqCst) {
        return Err("CLI command cancelled".to_string());
    }
    if !status.success() {
        let stderr = stderr_output.trim();
        let code = status.code().unwrap_or(-1);
        if stderr.is_empty() {
            return Err(format!("CLI command exited with code {}", code));
        }
        return Err(format!("CLI command exited with code {}: {}", code, stderr));
    }
    Ok(result)
}
/// Reads the ticket's current status from the DB and reports whether the user
/// has cancelled it. Errors on a poisoned lock or a failed lookup.
fn is_ticket_cancelled(db: &Arc<Mutex<Connection>>, ticket_id: &str) -> Result<bool, String> {
    let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
    ProcessedTicket::get_by_id(&conn, ticket_id)
        .map(|ticket| ticket.status == "Cancelled")
        .map_err(|e| format!("get_by_id: {}", e))
}
/// Picks at most one eligible pending ticket and drives it through the full
/// analyst -> developer -> reviewer pipeline.
///
/// Returns `Ok(true)` when a ticket was claimed — including tickets that ended
/// in a recorded error or were cancelled mid-flight — and `Ok(false)` when no
/// eligible pending ticket exists. Infrastructure failures (DB lock poisoning,
/// bookkeeping writes) bubble up as `Err` and are logged by the caller.
async fn process_ticket(
    db: &Arc<Mutex<Connection>>,
    app_handle: &AppHandle,
    process_registry: &ProcessRegistry,
) -> Result<bool, String> {
    // --- Phase 1: select the first pending ticket whose project has the
    // matching auto-resolve module enabled for its source.
    let (ticket, project, tracker) = {
        let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
        let pending = ProcessedTicket::list_pending(&conn)
            .map_err(|e| format!("list_pending failed: {}", e))?;
        let mut selected: Option<(ProcessedTicket, Project, Option<WatchedTracker>)> = None;
        for ticket in pending {
            let project = Project::get_by_id(&conn, &ticket.project_id)
                .map_err(|e| format!("get project failed: {}", e))?;
            match ticket.source.as_str() {
                "tuleap" => {
                    // Tuleap tickets may carry a tracker reference; resolve it now
                    // so the agent-config phase below can validate it.
                    let tracker = match ticket.tracker_id.as_deref() {
                        Some(tracker_id) => Some(
                            WatchedTracker::get_by_id(&conn, tracker_id)
                                .map_err(|e| format!("get tracker failed: {}", e))?,
                        ),
                        None => None,
                    };
                    let enabled =
                        ProjectModule::is_enabled(&conn, &project.id, MODULE_TULEAP_AUTO_RESOLVE)
                            .map_err(|e| format!("module lookup failed: {}", e))?;
                    if enabled {
                        selected = Some((ticket, project, tracker));
                        break;
                    }
                }
                "graylog" => {
                    let enabled =
                        ProjectModule::is_enabled(&conn, &project.id, MODULE_GRAYLOG_AUTO_RESOLVE)
                            .map_err(|e| format!("module lookup failed: {}", e))?;
                    if enabled {
                        selected = Some((ticket, project, None));
                        break;
                    }
                }
                _ => {
                    // Unknown sources are logged and skipped, not failed.
                    eprintln!(
                        "orchestrator: unsupported ticket source '{}' for ticket {}",
                        ticket.source, ticket.id
                    );
                }
            }
        }
        match selected {
            Some(item) => item,
            None => return Ok(false),
        }
    };
    // --- Phase 2: resolve the analyst/developer/reviewer agent trio for this
    // ticket's source. Any configuration problem records an error on the
    // ticket and returns Ok(true) (the ticket is consumed, not retried).
    let (analyst_agent, developer_agent, reviewer_agent) = {
        let (analyst_id, developer_id, reviewer_id) = if ticket.source == "graylog" {
            // Graylog: the agent trio comes from the project's credentials row.
            let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
            let config = match GraylogCredentials::get_by_project(&conn, &project.id)
                .map_err(|e| format!("graylog credentials lookup failed: {}", e))?
            {
                Some(value) => value,
                None => {
                    // Release the lock first: record_ticket_error re-locks the DB.
                    drop(conn);
                    record_ticket_error(
                        db,
                        app_handle,
                        &project.id,
                        &ticket.id,
                        ticket.artifact_id,
                        "Graylog credentials are missing.",
                    );
                    return Ok(true);
                }
            };
            drop(conn);
            (
                config.analyst_agent_id.to_string(),
                config.developer_agent_id.to_string(),
                config.reviewer_agent_id.to_string(),
            )
        } else if ticket.source == "tuleap" {
            // Tuleap: the agent trio comes from the watched tracker, which must
            // exist, be valid, and have all three agents configured.
            let tracker = match &tracker {
                Some(value) => value,
                None => {
                    record_ticket_error(
                        db,
                        app_handle,
                        &project.id,
                        &ticket.id,
                        ticket.artifact_id,
                        "Missing tracker reference for Tuleap ticket.",
                    );
                    return Ok(true);
                }
            };
            if tracker.status != "valid" {
                record_ticket_error(
                    db,
                    app_handle,
                    &project.id,
                    &ticket.id,
                    ticket.artifact_id,
                    "Tracker is invalid. Configure analyst, developer and reviewer agents.",
                );
                return Ok(true);
            }
            let analyst_id = match tracker.analyst_agent_id.as_deref() {
                Some(id) => id.to_string(),
                None => {
                    record_ticket_error(
                        db,
                        app_handle,
                        &project.id,
                        &ticket.id,
                        ticket.artifact_id,
                        "Tracker has no analyst agent configured.",
                    );
                    return Ok(true);
                }
            };
            let developer_id = match tracker.developer_agent_id.as_deref() {
                Some(id) => id.to_string(),
                None => {
                    record_ticket_error(
                        db,
                        app_handle,
                        &project.id,
                        &ticket.id,
                        ticket.artifact_id,
                        "Tracker has no developer agent configured.",
                    );
                    return Ok(true);
                }
            };
            let reviewer_id = match tracker.reviewer_agent_id.as_deref() {
                Some(id) => id.to_string(),
                None => {
                    record_ticket_error(
                        db,
                        app_handle,
                        &project.id,
                        &ticket.id,
                        ticket.artifact_id,
                        "Tracker has no reviewer agent configured.",
                    );
                    return Ok(true);
                }
            };
            (analyst_id, developer_id, reviewer_id)
        } else {
            record_ticket_error(
                db,
                app_handle,
                &project.id,
                &ticket.id,
                ticket.artifact_id,
                &format!("Unsupported ticket source '{}'.", ticket.source),
            );
            return Ok(true);
        };
        // Load the three agents and check each has the expected role.
        // Each error path drops the lock before record_ticket_error re-locks.
        let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
        let analyst_agent = match Agent::get_by_id(&conn, &analyst_id) {
            Ok(agent) => agent,
            Err(_) => {
                drop(conn);
                record_ticket_error(
                    db,
                    app_handle,
                    &project.id,
                    &ticket.id,
                    ticket.artifact_id,
                    "Configured analyst agent was not found.",
                );
                return Ok(true);
            }
        };
        let developer_agent = match Agent::get_by_id(&conn, &developer_id) {
            Ok(agent) => agent,
            Err(_) => {
                drop(conn);
                record_ticket_error(
                    db,
                    app_handle,
                    &project.id,
                    &ticket.id,
                    ticket.artifact_id,
                    "Configured developer agent was not found.",
                );
                return Ok(true);
            }
        };
        let reviewer_agent = match Agent::get_by_id(&conn, &reviewer_id) {
            Ok(agent) => agent,
            Err(_) => {
                drop(conn);
                record_ticket_error(
                    db,
                    app_handle,
                    &project.id,
                    &ticket.id,
                    ticket.artifact_id,
                    "Configured reviewer agent was not found.",
                );
                return Ok(true);
            }
        };
        if analyst_agent.role != AgentRole::Analyst {
            drop(conn);
            record_ticket_error(
                db,
                app_handle,
                &project.id,
                &ticket.id,
                ticket.artifact_id,
                "Configured analyst agent has an invalid role.",
            );
            return Ok(true);
        }
        if developer_agent.role != AgentRole::Developer {
            drop(conn);
            record_ticket_error(
                db,
                app_handle,
                &project.id,
                &ticket.id,
                ticket.artifact_id,
                "Configured developer agent has an invalid role.",
            );
            return Ok(true);
        }
        if reviewer_agent.role != AgentRole::Reviewer {
            drop(conn);
            record_ticket_error(
                db,
                app_handle,
                &project.id,
                &ticket.id,
                ticket.artifact_id,
                "Configured reviewer agent has an invalid role.",
            );
            return Ok(true);
        }
        (analyst_agent, developer_agent, reviewer_agent)
    };
    // --- Phase 3: analyst step. Mark the ticket, notify the UI, and run the
    // analyst CLI in the project directory (read-only analysis, no worktree yet).
    {
        let conn = db.lock().map_err(|e| format!("DB lock failed: {}", e))?;
        ProcessedTicket::update_status(&conn, &ticket.id, "Analyzing")
            .map_err(|e| format!("update_status failed: {}", e))?;
    }
    let _ = app_handle.emit(
        "ticket-processing-started",
        serde_json::json!({
            "project_id": &project.id,
            "ticket_id": &ticket.id,
            "artifact_id": ticket.artifact_id,
            "step": "analyst",
        }),
    );
    let analyst_prompt = append_custom_prompt(
        build_analyst_prompt(&ticket, &project),
        &analyst_agent.custom_prompt,
    );
    let analyst_args = build_agent_cli_args(&analyst_agent, &project.path);
    let analyst_result = run_cli_command(
        analyst_agent.tool.to_command(),
        &analyst_args,
        &analyst_prompt,
        &project.path,
        600,
        TicketCliContext {
            app_handle,
            ticket_id: &ticket.id,
            process_registry,
        },
    )
    .await;
    let analyst_report = match analyst_result {
        Ok(report) => report,
        Err(e) => {
            // A failure caused by user cancellation is not recorded as an error.
            if is_ticket_cancelled(db, &ticket.id)? {
                return Ok(true);
            }
            record_ticket_error(
                db,
                app_handle,
                &project.id,
                &ticket.id,
                ticket.artifact_id,
                &e,
            );
            return Ok(true);
        }
    };
    if is_ticket_cancelled(db, &ticket.id)? {
        return Ok(true);
    }
    {
        let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
        ProcessedTicket::set_analyst_report(&conn, &ticket.id, &analyst_report)
            .map_err(|e| format!("set_analyst_report: {}", e))?;
    }
    // --- Phase 4: verdict. NO_FIX closes the ticket right away — no worktree,
    // no developer or reviewer step.
    let verdict = parse_verdict(&analyst_report);
    if verdict == Verdict::NoFix {
        if is_ticket_cancelled(db, &ticket.id)? {
            return Ok(true);
        }
        {
            let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
            ProcessedTicket::update_status(&conn, &ticket.id, "Done")
                .map_err(|e| format!("update_status: {}", e))?;
        }
        let _ = app_handle.emit(
            "ticket-processing-done",
            serde_json::json!({
                "project_id": &project.id,
                "ticket_id": &ticket.id,
                "artifact_id": ticket.artifact_id,
            }),
        );
        notifier::notify_analysis_done(db, app_handle, &project.id, &ticket.id, ticket.artifact_id);
        return Ok(true);
    }
    // Last cancellation check before creating a worktree on disk.
    {
        let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
        let current = ProcessedTicket::get_by_id(&conn, &ticket.id)
            .map_err(|e| format!("get_by_id: {}", e))?;
        if current.status == "Cancelled" {
            return Ok(true);
        }
    }
    // --- Phase 5: create a dedicated worktree/branch for the fix. On failure
    // the error is recorded on the ticket AND propagated via `?`.
    let worktree_result =
        worktree_manager::create_worktree(&project.path, &project.base_branch, ticket.artifact_id);
    if let Err(e) = &worktree_result {
        record_ticket_error(
            db,
            app_handle,
            &project.id,
            &ticket.id,
            ticket.artifact_id,
            e,
        );
    }
    let (wt_path, branch_name) = worktree_result?;
    {
        let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
        ProcessedTicket::set_worktree_info(&conn, &ticket.id, &wt_path, &branch_name)
            .map_err(|e| format!("set_worktree_info: {}", e))?;
        Worktree::insert(&conn, &ticket.id, &wt_path, &branch_name)
            .map_err(|e| format!("insert worktree: {}", e))?;
        ProcessedTicket::update_status(&conn, &ticket.id, "Developing")
            .map_err(|e| format!("update_status: {}", e))?;
    }
    // --- Phase 6: developer step, run inside the worktree.
    let _ = app_handle.emit(
        "ticket-processing-started",
        serde_json::json!({
            "project_id": &project.id,
            "ticket_id": &ticket.id,
            "artifact_id": ticket.artifact_id,
            "step": "developer",
        }),
    );
    let developer_prompt = append_custom_prompt(
        build_developer_prompt(&ticket, &project, &analyst_report, &wt_path),
        &developer_agent.custom_prompt,
    );
    let developer_args = build_agent_cli_args(&developer_agent, &wt_path);
    let developer_result = run_cli_command(
        developer_agent.tool.to_command(),
        &developer_args,
        &developer_prompt,
        &wt_path,
        600,
        TicketCliContext {
            app_handle,
            ticket_id: &ticket.id,
            process_registry,
        },
    )
    .await;
    let developer_report = match developer_result {
        Ok(report) => report,
        Err(e) => {
            if is_ticket_cancelled(db, &ticket.id)? {
                return Ok(true);
            }
            record_ticket_error(
                db,
                app_handle,
                &project.id,
                &ticket.id,
                ticket.artifact_id,
                &e,
            );
            return Ok(true);
        }
    };
    if is_ticket_cancelled(db, &ticket.id)? {
        return Ok(true);
    }
    // Verify the developer actually committed a non-empty fix; the report is
    // persisted even when validation fails so the user can inspect it.
    if let Err(validation_error) =
        validate_developer_completion(&project, &branch_name, &developer_report)
    {
        {
            let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
            ProcessedTicket::set_developer_report(&conn, &ticket.id, &developer_report)
                .map_err(|e| format!("set_developer_report: {}", e))?;
        }
        record_ticket_error(
            db,
            app_handle,
            &project.id,
            &ticket.id,
            ticket.artifact_id,
            &validation_error,
        );
        return Ok(true);
    }
    {
        let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
        ProcessedTicket::set_developer_report(&conn, &ticket.id, &developer_report)
            .map_err(|e| format!("set_developer_report: {}", e))?;
        ProcessedTicket::update_status(&conn, &ticket.id, "Reviewing")
            .map_err(|e| format!("update_status: {}", e))?;
    }
    // --- Phase 7: reviewer step, also run inside the worktree.
    let _ = app_handle.emit(
        "ticket-processing-started",
        serde_json::json!({
            "project_id": &project.id,
            "ticket_id": &ticket.id,
            "artifact_id": ticket.artifact_id,
            "step": "review",
        }),
    );
    let review_prompt = append_custom_prompt(
        build_review_prompt(
            &ticket,
            &project,
            &analyst_report,
            &developer_report,
            &wt_path,
            &branch_name,
        ),
        &reviewer_agent.custom_prompt,
    );
    let review_args = build_agent_cli_args(&reviewer_agent, &wt_path);
    let review_result = run_cli_command(
        reviewer_agent.tool.to_command(),
        &review_args,
        &review_prompt,
        &wt_path,
        600,
        TicketCliContext {
            app_handle,
            ticket_id: &ticket.id,
            process_registry,
        },
    )
    .await;
    let review_report = match review_result {
        Ok(report) => report,
        Err(e) => {
            if is_ticket_cancelled(db, &ticket.id)? {
                return Ok(true);
            }
            record_ticket_error(
                db,
                app_handle,
                &project.id,
                &ticket.id,
                ticket.artifact_id,
                &e,
            );
            return Ok(true);
        }
    };
    if is_ticket_cancelled(db, &ticket.id)? {
        return Ok(true);
    }
    // --- Phase 8: persist the review, mark the ticket Done, notify the UI.
    {
        let conn = db.lock().map_err(|e| format!("DB lock: {}", e))?;
        ProcessedTicket::set_review_report(&conn, &ticket.id, &review_report)
            .map_err(|e| format!("set_review_report: {}", e))?;
        ProcessedTicket::update_status(&conn, &ticket.id, "Done")
            .map_err(|e| format!("update_status: {}", e))?;
    }
    let _ = app_handle.emit(
        "ticket-processing-done",
        serde_json::json!({
            "project_id": &project.id,
            "ticket_id": &ticket.id,
            "artifact_id": ticket.artifact_id,
        }),
    );
    notifier::notify_fix_ready(db, app_handle, &project.id, &ticket.id, ticket.artifact_id);
    Ok(true)
}
/// Boots the orchestrator: first recovers tickets interrupted by a previous
/// run, then spawns a background task that polls for a pending ticket every
/// 10 seconds and processes at most one per tick.
pub fn start(db: Arc<Mutex<Connection>>, app_handle: AppHandle, process_registry: ProcessRegistry) {
    match recover_interrupted_tickets(&db) {
        Ok(0) => {}
        Ok(count) => {
            eprintln!(
                "orchestrator: recovered {} interrupted ticket(s) to Pending state",
                count
            );
        }
        Err(e) => {
            eprintln!("orchestrator: failed to recover interrupted tickets: {}", e);
        }
    }
    tauri::async_runtime::spawn(async move {
        let mut tick = interval(Duration::from_secs(10));
        loop {
            tick.tick().await;
            // Whether a ticket was processed or the queue was empty, we simply
            // wait for the next tick; only errors are logged.
            if let Err(e) = process_ticket(&db, &app_handle, &process_registry).await {
                eprintln!("orchestrator: {}", e);
            }
        }
    });
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db;
use std::path::Path;
use std::process::Command;
#[test]
fn test_build_analyst_prompt_contains_ticket_info() {
let ticket = ProcessedTicket {
id: "t1".into(),
tracker_id: Some("tr1".into()),
project_id: "p1".into(),
source: "tuleap".into(),
source_ref: None,
artifact_id: 42,
artifact_title: "Login crash on empty password".into(),
artifact_data: r#"{"id":42,"title":"Login crash"}"#.into(),
status: "Pending".into(),
analyst_report: None,
developer_report: None,
review_report: None,
worktree_path: None,
branch_name: None,
detected_at: "2026-01-01T00:00:00Z".into(),
processed_at: None,
};
let project = Project {
id: "p1".into(),
name: "MyApp".into(),
path: "/home/user/myapp".into(),
cloned_from: None,
base_branch: "stable".into(),
created_at: "2026-01-01T00:00:00Z".into(),
};
let prompt = build_analyst_prompt(&ticket, &project);
assert!(prompt.contains("42"));
assert!(prompt.contains("Login crash on empty password"));
assert!(prompt.contains("MyApp"));
assert!(prompt.contains("/home/user/myapp"));
assert!(prompt.contains("stable"));
assert!(prompt.contains("[VERDICT: FIX_NEEDED]"));
assert!(prompt.contains("[VERDICT: NO_FIX]"));
}
#[test]
fn test_build_developer_prompt_contains_report() {
let ticket = ProcessedTicket {
id: "t1".into(),
tracker_id: Some("tr1".into()),
project_id: "p1".into(),
source: "tuleap".into(),
source_ref: None,
artifact_id: 42,
artifact_title: "Login crash".into(),
artifact_data: "{}".into(),
status: "Developing".into(),
analyst_report: None,
developer_report: None,
review_report: None,
worktree_path: None,
branch_name: None,
detected_at: "2026-01-01T00:00:00Z".into(),
processed_at: None,
};
let project = Project {
id: "p1".into(),
name: "MyApp".into(),
path: "/home/user/myapp".into(),
cloned_from: None,
base_branch: "main".into(),
created_at: "2026-01-01T00:00:00Z".into(),
};
let prompt =
build_developer_prompt(&ticket, &project, "## Bug found in auth.rs", "/tmp/wt");
assert!(prompt.contains("## Bug found in auth.rs"));
assert!(prompt.contains("42"));
assert!(prompt.contains("/tmp/wt"));
}
#[test]
fn test_parse_verdict_fix_needed() {
let report = "## Analysis\nBug found.\n[VERDICT: FIX_NEEDED]\n";
assert_eq!(parse_verdict(report), Verdict::FixNeeded);
}
#[test]
fn test_parse_verdict_no_fix() {
let report = "## Analysis\nThis is a feature request, not a bug.\n[VERDICT: NO_FIX]\n";
assert_eq!(parse_verdict(report), Verdict::NoFix);
}
#[test]
fn test_parse_verdict_missing_defaults_to_fix() {
let report = "## Analysis\nSomething is wrong but I forgot the verdict.";
assert_eq!(parse_verdict(report), Verdict::FixNeeded);
}
#[test]
fn test_parse_verdict_embedded_in_line() {
let report = "Verdict: [VERDICT: NO_FIX] - no code change needed.";
assert_eq!(parse_verdict(report), Verdict::NoFix);
}
#[test]
fn test_recover_interrupted_tickets_requeues_analyzing_status() {
let conn = db::init_in_memory().expect("db init should succeed");
let project =
Project::insert(&conn, "Test", "/tmp/orchai-test", None, "main").expect("project");
let ticket = ProcessedTicket::insert_external(
&conn,
&project.id,
"graylog",
Some("subject-1"),
-1,
"Interrupted ticket",
"{}",
)
.expect("ticket");
ProcessedTicket::update_status(&conn, &ticket.id, "Analyzing")
.expect("status update should succeed");
let shared_db = Arc::new(Mutex::new(conn));
let recovered = recover_interrupted_tickets(&shared_db)
.expect("recovery should succeed for analyzing tickets");
assert_eq!(recovered, 1);
let guard = shared_db.lock().expect("db lock should succeed");
let updated =
ProcessedTicket::get_by_id(&guard, &ticket.id).expect("ticket lookup should succeed");
assert_eq!(updated.status, "Pending");
}
#[test]
fn test_should_flush_progress_buffer_when_elapsed_interval_exceeded() {
let should_flush =
should_flush_progress_buffer("some progress", std::time::Duration::from_millis(300));
assert!(should_flush);
}
#[test]
fn test_should_flush_progress_buffer_when_buffer_reaches_size_limit() {
let payload = "x".repeat(2048);
let should_flush =
should_flush_progress_buffer(&payload, std::time::Duration::from_millis(10));
assert!(should_flush);
}
#[test]
fn test_should_not_flush_progress_buffer_when_empty() {
let should_flush = should_flush_progress_buffer("", std::time::Duration::from_secs(1));
assert!(!should_flush);
}
#[test]
fn test_evaluate_developer_completion_rejects_permission_block_report() {
let result = evaluate_developer_completion(
"Je suis bloque par un refus d'autorisation pour l'edition des fichiers.",
1,
true,
);
assert!(result.is_err());
assert!(result.unwrap_err().contains("permission"));
}
#[test]
fn test_evaluate_developer_completion_rejects_no_commit() {
let result = evaluate_developer_completion("Correction appliquee.", 0, true);
assert!(result.is_err());
assert!(result.unwrap_err().contains("commit"));
}
#[test]
fn test_evaluate_developer_completion_rejects_empty_diff() {
let result = evaluate_developer_completion("Correction appliquee.", 1, false);
assert!(result.is_err());
assert!(result.unwrap_err().contains("diff"));
}
#[test]
fn test_evaluate_developer_completion_accepts_valid_fix() {
let result = evaluate_developer_completion("Correction appliquee.", 1, true);
assert!(result.is_ok());
}
/// Creates a throwaway git repository containing one initial commit.
///
/// The repository lives inside a `tempfile::TempDir`; dropping the returned
/// guard deletes it. A local user identity is configured so `git commit`
/// works on machines without a global git config.
fn setup_test_repo() -> tempfile::TempDir {
    let dir = tempfile::tempdir().expect("temp dir");
    let path = dir.path().to_str().expect("utf8 path");
    // Runs a single git command inside the repo and asserts it succeeded,
    // replacing six copies of the same spawn/assert boilerplate.
    let run_git = |args: &[&str]| {
        let output = Command::new("git")
            .args(args)
            .current_dir(path)
            .output()
            .unwrap_or_else(|err| panic!("git {:?} failed to spawn: {err}", args));
        assert!(output.status.success(), "git {:?} should succeed", args);
    };
    run_git(&["init"]);
    run_git(&["config", "user.email", "orchai-test@example.com"]);
    run_git(&["config", "user.name", "Orchai Test"]);
    std::fs::write(dir.path().join("README.md"), "# Orchai test repo").expect("write readme");
    run_git(&["add", "."]);
    run_git(&["commit", "-m", "init"]);
    dir
}
/// Returns the name of the branch currently checked out at `path`.
fn current_branch(path: &str) -> String {
    let output = Command::new("git")
        .args(["rev-parse", "--abbrev-ref", "HEAD"])
        .current_dir(path)
        .output()
        .expect("current branch");
    assert!(output.status.success());
    let branch = String::from_utf8_lossy(&output.stdout);
    branch.trim().to_string()
}
/// Builds a minimal `Project` fixture rooted at `project_path` with the
/// given base branch; every other field is a fixed placeholder.
fn build_project(project_path: &str, base_branch: &str) -> Project {
    Project {
        id: String::from("p-test"),
        name: String::from("Test project"),
        path: project_path.to_owned(),
        cloned_from: None,
        base_branch: base_branch.to_owned(),
        created_at: String::from("2026-01-01T00:00:00Z"),
    }
}
/// Extracts every value passed via `--add-dir` from a CLI argument list.
///
/// Each `--add-dir` consumes the element that follows it (so that element is
/// never itself treated as a flag); a trailing `--add-dir` with no value is
/// ignored.
fn collect_add_dirs(args: &[String]) -> Vec<String> {
    let mut dirs = Vec::new();
    let mut cursor = args.iter();
    while let Some(arg) = cursor.next() {
        if arg == "--add-dir" {
            if let Some(value) = cursor.next() {
                dirs.push(value.clone());
            }
        }
    }
    dirs
}
fn build_test_agent(tool: crate::models::agent::AgentTool) -> Agent {
Agent {
id: "a-test".into(),
name: "Agent test".into(),
role: AgentRole::Developer,
tool,
custom_prompt: String::new(),
is_default: false,
created_at: "2026-01-01T00:00:00Z".into(),
updated_at: "2026-01-01T00:00:00Z".into(),
runtime_status: crate::models::agent::AgentRuntimeStatus::Available,
exhausted_until: None,
runtime_error: None,
}
}
#[test]
fn test_build_agent_cli_args_adds_git_metadata_dirs_for_codex_worktree() {
let repo = setup_test_repo();
let repo_path = repo.path().to_str().expect("utf8 path");
let base_branch = current_branch(repo_path);
let (worktree_path, _branch_name) =
worktree_manager::create_worktree(repo_path, &base_branch, 202)
.expect("worktree creation should succeed");
let agent = build_test_agent(crate::models::agent::AgentTool::Codex);
let args = build_agent_cli_args(&agent, &worktree_path);
let add_dirs = collect_add_dirs(&args);
let normalized_worktree = std::fs::canonicalize(&worktree_path)
.unwrap_or_else(|_| Path::new(&worktree_path).to_path_buf())
.to_string_lossy()
.to_string();
assert!(
add_dirs.contains(&normalized_worktree),
"Expected --add-dir to contain worktree '{}', got {:?}",
normalized_worktree,
add_dirs
);
let rev_parse = Command::new("git")
.args(["rev-parse", "--git-dir", "--git-common-dir"])
.current_dir(&worktree_path)
.output()
.expect("git rev-parse should succeed");
assert!(rev_parse.status.success(), "git rev-parse should succeed");
let expected_dirs: Vec<String> = String::from_utf8_lossy(&rev_parse.stdout)
.lines()
.map(|line| {
let path = Path::new(line);
let absolute = if path.is_absolute() {
path.to_path_buf()
} else {
Path::new(&worktree_path).join(path)
};
std::fs::canonicalize(&absolute)
.unwrap_or(absolute)
.to_string_lossy()
.to_string()
})
.collect();
for expected in expected_dirs {
assert!(
add_dirs.contains(&expected),
"Expected --add-dir to contain '{}', got {:?}",
expected,
add_dirs
);
}
}
#[test]
fn test_build_agent_cli_args_adds_git_metadata_dirs_for_claude_worktree() {
let repo = setup_test_repo();
let repo_path = repo.path().to_str().expect("utf8 path");
let base_branch = current_branch(repo_path);
let (worktree_path, _branch_name) =
worktree_manager::create_worktree(repo_path, &base_branch, 203)
.expect("worktree creation should succeed");
let agent = build_test_agent(crate::models::agent::AgentTool::ClaudeCode);
let args = build_agent_cli_args(&agent, &worktree_path);
let add_dirs = collect_add_dirs(&args);
let normalized_worktree = std::fs::canonicalize(&worktree_path)
.unwrap_or_else(|_| Path::new(&worktree_path).to_path_buf())
.to_string_lossy()
.to_string();
assert!(
add_dirs.contains(&normalized_worktree),
"Expected --add-dir to contain worktree '{}', got {:?}",
normalized_worktree,
add_dirs
);
let rev_parse = Command::new("git")
.args(["rev-parse", "--git-dir", "--git-common-dir"])
.current_dir(&worktree_path)
.output()
.expect("git rev-parse should succeed");
assert!(rev_parse.status.success(), "git rev-parse should succeed");
let expected_dirs: Vec<String> = String::from_utf8_lossy(&rev_parse.stdout)
.lines()
.map(|line| {
let path = Path::new(line);
let absolute = if path.is_absolute() {
path.to_path_buf()
} else {
Path::new(&worktree_path).join(path)
};
std::fs::canonicalize(&absolute)
.unwrap_or(absolute)
.to_string_lossy()
.to_string()
})
.collect();
for expected in expected_dirs {
assert!(
add_dirs.contains(&expected),
"Expected --add-dir to contain '{}', got {:?}",
expected,
add_dirs
);
}
}
#[test]
fn test_validate_developer_completion_rejects_branch_without_commit() {
    // A freshly created worktree branch carries no commits beyond the base,
    // so validation must refuse the completion and mention the missing commit.
    let repo = setup_test_repo();
    let repo_path = repo.path().to_str().expect("utf8 path");
    let base_branch = current_branch(repo_path);
    let (_wt_path, branch_name) =
        worktree_manager::create_worktree(repo_path, &base_branch, 100).expect("worktree");
    let project = build_project(repo_path, &base_branch);
    let outcome = validate_developer_completion(&project, &branch_name, "Correction appliquee.");
    let error = outcome.expect_err("branch without commit should fail validation");
    assert!(error.contains("commit"));
}
#[test]
fn test_validate_developer_completion_accepts_branch_with_commit_and_diff() {
    // A branch with a real committed change must pass validation.
    let repo = setup_test_repo();
    let repo_path = repo.path().to_str().expect("utf8 path");
    let base_branch = current_branch(repo_path);
    let (wt_path, branch_name) =
        worktree_manager::create_worktree(repo_path, &base_branch, 101).expect("worktree");
    std::fs::write(Path::new(&wt_path).join("fix.txt"), "critical fix").expect("write fix");
    // Stage and commit the change inside the worktree.
    for (git_args, label) in [
        (vec!["add", "."], "git add"),
        (vec!["commit", "-m", "fix: add critical fix"], "git commit"),
    ] {
        let output = Command::new("git")
            .args(&git_args)
            .current_dir(&wt_path)
            .output()
            .expect(label);
        assert!(output.status.success());
    }
    let project = build_project(repo_path, &base_branch);
    let outcome = validate_developer_completion(&project, &branch_name, "Correction appliquee.");
    assert!(outcome.is_ok());
}
}