tirea_agentos_server/service/
mailbox.rs

1// Some items (MailboxDispatcher, AgentReceiver, etc.) are only used by #[cfg(test)] tests.
2#![allow(dead_code)]
3
4use std::sync::Arc;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use async_trait::async_trait;
8use futures::stream::FuturesUnordered;
9use futures::StreamExt;
10use tirea_agentos::contracts::storage::{
11    MailboxEntry, MailboxEntryOrigin, MailboxEntryStatus, MailboxQuery, MailboxReader,
12    MailboxStore, MailboxStoreError, RunStatus, ThreadReader,
13};
14use tirea_agentos::contracts::thread::Message;
15use tirea_agentos::contracts::RunRequest;
16use tirea_agentos::{AgentOs, AgentOsRunError, RunStream};
17use tirea_contract::storage::{MailboxReceiver, ReceiveOutcome, RunRecord};
18
19use super::ApiError;
20
/// Pause between polling rounds of `MailboxDispatcher::run_forever`.
const DEFAULT_MAILBOX_POLL_INTERVAL_MS: u64 = 100;
/// Lease length (30 s) requested whenever a mailbox entry is claimed.
pub(crate) const DEFAULT_MAILBOX_LEASE_MS: u64 = 30_000;
/// Delay added to the current time when an entry is nacked for retry.
const DEFAULT_MAILBOX_RETRY_MS: u64 = 250;
/// Maximum number of entries the dispatcher claims per polling round.
const DEFAULT_MAILBOX_BATCH_SIZE: usize = 16;
/// Attempt count at which a retried entry is dead-lettered instead of nacked.
const DEFAULT_MAILBOX_MAX_ATTEMPTS: u32 = 10;
/// How often the dispatcher purges old terminal entries.
const DEFAULT_MAILBOX_GC_INTERVAL_SECS: u64 = 60;
/// Minimum age of a terminal entry before it is purged: 24 h expressed in milliseconds.
const DEFAULT_MAILBOX_GC_TTL_MS: u64 = 86_400_000;
/// `available_at` used for inline (streaming) entries that are claimed directly by the caller.
///
/// The value is the largest timestamp that still fits a signed 64-bit integer, presumably so
/// stores persisting timestamps as `i64` can hold it and the polling dispatcher never sees the
/// entry as ready — TODO confirm against the store implementations.
pub(crate) const INLINE_MAILBOX_AVAILABLE_AT: u64 = i64::MAX as u64;
29
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`; a millisecond count too large for `u64`
/// saturates at `u64::MAX`.
pub(crate) fn now_unix_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
37
38pub(crate) fn new_id() -> String {
39    uuid::Uuid::now_v7().simple().to_string()
40}
41
42// ---------------------------------------------------------------------------
43// Agent-specific helpers
44// ---------------------------------------------------------------------------
45
46pub(crate) fn normalize_background_run_request(
47    agent_id: &str,
48    mut request: RunRequest,
49) -> RunRequest {
50    request.agent_id = agent_id.to_string();
51    if request.thread_id.is_none() {
52        request.thread_id = Some(new_id());
53    }
54    if request.run_id.is_none() {
55        request.run_id = Some(new_id());
56    }
57    request
58}
59
/// Optional envelope fields for enqueued mailbox entries.
///
/// `Default` leaves every field unset (`None` / `0`). `mailbox_entry_from_request` copies
/// these values onto the [`MailboxEntry`] it builds.
#[derive(Debug, Clone, Default)]
pub struct EnqueueOptions {
    /// Identity of the sender for audit and reply routing.
    pub sender_id: Option<String>,
    /// Dispatch priority (higher = dispatched first). Default 0.
    /// NOTE(review): ordering is applied by the `MailboxStore` implementation, not here.
    pub priority: u8,
    /// Deduplication key — rejected if another entry with the same key exists in the mailbox.
    /// NOTE(review): enforcement is up to the `MailboxStore` implementation.
    pub dedupe_key: Option<String>,
}
70
71pub(crate) fn mailbox_entry_from_request(
72    request: &RunRequest,
73    generation: u64,
74    options: &EnqueueOptions,
75    available_at: u64,
76) -> MailboxEntry {
77    let now = now_unix_millis();
78    let mailbox_id = request
79        .thread_id
80        .clone()
81        .expect("background mailbox request should have thread_id");
82    let payload = serde_json::to_value(request).expect("RunRequest should be serializable");
83    MailboxEntry {
84        entry_id: new_id(),
85        mailbox_id,
86        origin: MailboxEntryOrigin::from_run_origin(request.origin),
87        sender_id: options.sender_id.clone(),
88        payload,
89        priority: options.priority,
90        dedupe_key: options.dedupe_key.clone(),
91        generation,
92        status: MailboxEntryStatus::Queued,
93        available_at,
94        attempt_count: 0,
95        last_error: None,
96        claim_token: None,
97        claimed_by: None,
98        lease_until: None,
99        created_at: now,
100        updated_at: now,
101    }
102}
103
104pub(crate) fn mailbox_error(err: MailboxStoreError) -> ApiError {
105    ApiError::Internal(err.to_string())
106}
107
/// Returns `true` if `err` is a [`MailboxStoreError::GenerationMismatch`].
///
/// `enqueue_mailbox_run` retries on this error, treating it as a race with a concurrent
/// interrupt.
pub(crate) fn is_generation_mismatch(err: &MailboxStoreError) -> bool {
    matches!(err, MailboxStoreError::GenerationMismatch { .. })
}
111
/// Returns `true` if a failure to start a run should not be retried.
///
/// Only agent-resolution failures ([`AgentOsRunError::Resolve`]) count as permanent. The
/// callers in this module treat every other run-start error as retryable.
pub(crate) fn is_permanent_dispatch_error(err: &AgentOsRunError) -> bool {
    matches!(err, AgentOsRunError::Resolve(_))
}
115
116pub(crate) async fn drain_background_run(mut run: RunStream) {
117    while run.events.next().await.is_some() {}
118}
119
/// Dedupe key for the notification a parent run receives when `child_run_id` finishes.
///
/// The format is `parent-task-completion:<parent_run_id>:<child_run_id>`.
pub(crate) fn parent_completion_notification_dedupe_key(
    parent_run_id: &str,
    child_run_id: &str,
) -> String {
    ["parent-task-completion", parent_run_id, child_run_id].join(":")
}
126
127fn parent_completion_notification_status(record: &RunRecord) -> Option<&'static str> {
128    if record.status != RunStatus::Done {
129        return None;
130    }
131
132    match record.termination_code.as_deref() {
133        Some("error") => Some("failed"),
134        Some("cancelled") => Some("cancelled"),
135        Some(code) if code.starts_with("stopped:") => Some("stopped"),
136        Some("behavior_requested" | "natural") | None => Some("completed"),
137        Some(_) => Some("completed"),
138    }
139}
140
141pub(crate) fn build_parent_completion_notification_message(record: &RunRecord) -> Option<Message> {
142    let recipient_task_id = record.parent_run_id.clone()?;
143    let status = parent_completion_notification_status(record)?;
144    let task_id = record
145        .source_mailbox_entry_id
146        .as_deref()
147        .unwrap_or(record.run_id.as_str());
148    let content = serde_json::json!({
149        "type": "background_task_notification",
150        "recipient_task_id": recipient_task_id,
151        "child_task_id": task_id,
152        "child_run_id": record.run_id,
153        "child_thread_id": record.thread_id,
154        "status": status,
155        "termination_code": record.termination_code,
156        "termination_detail": record.termination_detail,
157        "result_ref": {
158            "task_id": task_id,
159            "run_id": record.run_id,
160            "thread_id": record.thread_id,
161        },
162    });
163
164    Some(Message::internal_system(content.to_string()))
165}
166
167// ---------------------------------------------------------------------------
168// Entry lifecycle helpers
169// ---------------------------------------------------------------------------
170
171pub(crate) async fn ack_claimed_entry(
172    mailbox_store: &Arc<dyn MailboxStore>,
173    entry_id: &str,
174    claim_token: &str,
175) -> Result<(), ApiError> {
176    match mailbox_store
177        .ack_mailbox_entry(entry_id, claim_token, now_unix_millis())
178        .await
179    {
180        Ok(()) => Ok(()),
181        Err(MailboxStoreError::ClaimConflict(_)) => Ok(()),
182        Err(err) => Err(mailbox_error(err)),
183    }
184}
185
186pub(crate) async fn nack_claimed_entry(
187    mailbox_store: &Arc<dyn MailboxStore>,
188    entry_id: &str,
189    claim_token: &str,
190    retry_delay_ms: u64,
191    error: &str,
192) -> Result<(), ApiError> {
193    let now = now_unix_millis();
194    match mailbox_store
195        .nack_mailbox_entry(
196            entry_id,
197            claim_token,
198            now.saturating_add(retry_delay_ms),
199            error,
200            now,
201        )
202        .await
203    {
204        Ok(()) => Ok(()),
205        Err(MailboxStoreError::ClaimConflict(_)) => Ok(()),
206        Err(err) => Err(mailbox_error(err)),
207    }
208}
209
210pub(crate) async fn dead_letter_claimed_entry(
211    mailbox_store: &Arc<dyn MailboxStore>,
212    entry_id: &str,
213    claim_token: &str,
214    error: &str,
215) -> Result<(), ApiError> {
216    match mailbox_store
217        .dead_letter_mailbox_entry(entry_id, claim_token, error, now_unix_millis())
218        .await
219    {
220        Ok(()) => Ok(()),
221        Err(MailboxStoreError::ClaimConflict(_)) => Ok(()),
222        Err(err) => Err(mailbox_error(err)),
223    }
224}
225
226// ---------------------------------------------------------------------------
227// Agent receiver (implements MailboxReceiver for agent runs)
228// ---------------------------------------------------------------------------
229
230pub struct AgentReceiver {
231    os: Arc<AgentOs>,
232}
233
234impl AgentReceiver {
235    pub fn new(os: Arc<AgentOs>) -> Self {
236        Self { os }
237    }
238}
239
240#[async_trait]
241impl MailboxReceiver for AgentReceiver {
242    async fn receive(&self, entry: &MailboxEntry) -> ReceiveOutcome {
243        let mut request: RunRequest = match serde_json::from_value(entry.payload.clone()) {
244            Ok(r) => r,
245            Err(e) => return ReceiveOutcome::Reject(format!("invalid payload: {e}")),
246        };
247
248        request.source_mailbox_entry_id = Some(entry.entry_id.clone());
249
250        let agent_id = request.agent_id.clone();
251        let thread_id = request
252            .thread_id
253            .clone()
254            .unwrap_or_else(|| entry.mailbox_id.clone());
255
256        match self
257            .os
258            .current_run_id_for_thread(&agent_id, &thread_id)
259            .await
260        {
261            Ok(Some(_)) => return ReceiveOutcome::Retry("thread has active run".into()),
262            Ok(None) => {}
263            Err(e) => return ReceiveOutcome::Retry(e.to_string()),
264        }
265
266        let resolved = match self.os.resolve(&agent_id) {
267            Ok(r) => r,
268            Err(e) => return ReceiveOutcome::Reject(e.to_string()),
269        };
270
271        match self
272            .os
273            .start_active_run_with_persistence(&agent_id, request, resolved, true, false)
274            .await
275        {
276            Ok(run) => {
277                tokio::spawn(drain_background_run(run));
278                ReceiveOutcome::Accepted
279            }
280            Err(e) if is_permanent_dispatch_error(&e) => ReceiveOutcome::Reject(e.to_string()),
281            Err(e) => ReceiveOutcome::Retry(e.to_string()),
282        }
283    }
284}
285
286// ---------------------------------------------------------------------------
287// Agent-specific inline run start (for streaming/synchronous runs)
288// ---------------------------------------------------------------------------
289
/// Why `start_agent_run_for_entry` could not start a run for a claimed entry.
///
/// `start_streaming_run_via_mailbox` settles the entry differently for each variant:
/// - `Superseded` supersedes it.
/// - `Busy` cancels it.
/// - All other variants dead-letter it.
pub(crate) enum MailboxRunStartError {
    /// The target thread already has an active run.
    Busy(String),
    /// The entry is no longer claimed with our token, or its mailbox generation moved on.
    Superseded(String),
    /// The request cannot succeed: invalid payload or agent-resolution failure.
    Permanent(String),
    /// Starting the run failed with an error not classified as permanent.
    Retryable(String),
    /// A mailbox-store or active-run lookup failed.
    Internal(ApiError),
}
297
298pub(crate) async fn start_agent_run_for_entry(
299    os: &Arc<AgentOs>,
300    mailbox_store: &Arc<dyn MailboxStore>,
301    entry: &MailboxEntry,
302    persist_run: bool,
303) -> Result<RunStream, MailboxRunStartError> {
304    // Verify entry still claimed with our token
305    if let Some(current) = mailbox_store
306        .load_mailbox_entry(&entry.entry_id)
307        .await
308        .map_err(mailbox_error)
309        .map_err(MailboxRunStartError::Internal)?
310    {
311        if current.status != MailboxEntryStatus::Claimed || current.claim_token != entry.claim_token
312        {
313            return Err(MailboxRunStartError::Superseded(
314                current
315                    .last_error
316                    .unwrap_or_else(|| "mailbox entry is no longer active".to_string()),
317            ));
318        }
319    }
320
321    // Check generation
322    if mailbox_store
323        .load_mailbox_state(&entry.mailbox_id)
324        .await
325        .map_err(mailbox_error)
326        .map_err(MailboxRunStartError::Internal)?
327        .is_some_and(|state| state.current_generation != entry.generation)
328    {
329        return Err(MailboxRunStartError::Superseded(
330            "mailbox entry superseded by interrupt".to_string(),
331        ));
332    }
333
334    // Deserialize payload
335    let mut request: RunRequest = serde_json::from_value(entry.payload.clone())
336        .map_err(|e| MailboxRunStartError::Permanent(format!("invalid payload: {e}")))?;
337    request.source_mailbox_entry_id = Some(entry.entry_id.clone());
338
339    let agent_id = request.agent_id.clone();
340    let thread_id = request
341        .thread_id
342        .clone()
343        .unwrap_or_else(|| entry.mailbox_id.clone());
344
345    // Check for active run on thread
346    match os.current_run_id_for_thread(&agent_id, &thread_id).await {
347        Ok(Some(_)) => {
348            return Err(MailboxRunStartError::Busy(
349                "thread already has an active run".to_string(),
350            ));
351        }
352        Ok(None) => {}
353        Err(err) => return Err(MailboxRunStartError::Internal(ApiError::from(err))),
354    }
355
356    let resolved = os
357        .resolve(&agent_id)
358        .map_err(|err| MailboxRunStartError::Permanent(err.to_string()))?;
359
360    os.start_active_run_with_persistence(&agent_id, request, resolved, persist_run, !persist_run)
361        .await
362        .map_err(|err| {
363            if is_permanent_dispatch_error(&err) {
364                MailboxRunStartError::Permanent(err.to_string())
365            } else {
366                MailboxRunStartError::Retryable(err.to_string())
367            }
368        })
369}
370
371// ---------------------------------------------------------------------------
372// Enqueue / background run
373// ---------------------------------------------------------------------------
374
375/// Enqueue a mailbox entry from a RunRequest. Returns (mailbox_id, entry_id, run_id).
376pub(crate) async fn enqueue_mailbox_run(
377    os: &Arc<AgentOs>,
378    mailbox_store: &Arc<dyn MailboxStore>,
379    agent_id: &str,
380    request: RunRequest,
381    available_at: u64,
382    options: EnqueueOptions,
383) -> Result<(String, String, String), ApiError> {
384    os.resolve(agent_id).map_err(AgentOsRunError::from)?;
385
386    let request = normalize_background_run_request(agent_id, request);
387    let mailbox_id = request
388        .thread_id
389        .clone()
390        .expect("normalized mailbox run request should have thread_id");
391    let run_id = request
392        .run_id
393        .clone()
394        .expect("normalized mailbox run request should have run_id");
395
396    for _ in 0..2 {
397        let now = now_unix_millis();
398        let state = mailbox_store
399            .ensure_mailbox_state(&mailbox_id, now)
400            .await
401            .map_err(mailbox_error)?;
402        let entry =
403            mailbox_entry_from_request(&request, state.current_generation, &options, available_at);
404        let entry_id = entry.entry_id.clone();
405
406        match mailbox_store.enqueue_mailbox_entry(&entry).await {
407            Ok(()) => return Ok((mailbox_id, entry_id, run_id)),
408            Err(err) if is_generation_mismatch(&err) => continue,
409            Err(err) => return Err(mailbox_error(err)),
410        }
411    }
412
413    Err(ApiError::Internal(format!(
414        "mailbox enqueue raced with interrupt for mailbox '{mailbox_id}'"
415    )))
416}
417
418/// Enqueue a background run. Returns (thread_id, run_id, entry_id).
419pub async fn enqueue_background_run(
420    os: &Arc<AgentOs>,
421    mailbox_store: &Arc<dyn MailboxStore>,
422    agent_id: &str,
423    request: RunRequest,
424    options: EnqueueOptions,
425) -> Result<(String, String, String), ApiError> {
426    let (mailbox_id, entry_id, run_id) = enqueue_mailbox_run(
427        os,
428        mailbox_store,
429        agent_id,
430        request,
431        now_unix_millis(),
432        options,
433    )
434    .await?;
435    Ok((mailbox_id, run_id, entry_id))
436}
437
438pub async fn start_streaming_run_via_mailbox(
439    os: &Arc<AgentOs>,
440    mailbox_store: &Arc<dyn MailboxStore>,
441    agent_id: &str,
442    request: RunRequest,
443    consumer_id: &str,
444    options: EnqueueOptions,
445) -> Result<RunStream, ApiError> {
446    let (_mailbox_id, entry_id, _run_id) = enqueue_mailbox_run(
447        os,
448        mailbox_store,
449        agent_id,
450        request,
451        INLINE_MAILBOX_AVAILABLE_AT,
452        options,
453    )
454    .await?;
455
456    let Some(entry) = mailbox_store
457        .claim_mailbox_entry(
458            &entry_id,
459            consumer_id,
460            now_unix_millis(),
461            DEFAULT_MAILBOX_LEASE_MS,
462        )
463        .await
464        .map_err(mailbox_error)?
465    else {
466        let existing = mailbox_store
467            .load_mailbox_entry(&entry_id)
468            .await
469            .map_err(mailbox_error)?;
470        return Err(match existing {
471            Some(entry) if entry.status == MailboxEntryStatus::Accepted => {
472                ApiError::BadRequest("run has already been accepted".to_string())
473            }
474            Some(entry) if entry.status == MailboxEntryStatus::Superseded => {
475                ApiError::BadRequest("run has been superseded".to_string())
476            }
477            Some(entry) if entry.status == MailboxEntryStatus::Cancelled => {
478                ApiError::BadRequest("run has already been cancelled".to_string())
479            }
480            Some(entry) if entry.status == MailboxEntryStatus::DeadLetter => ApiError::Internal(
481                entry
482                    .last_error
483                    .unwrap_or_else(|| "mailbox entry moved to dead letter".to_string()),
484            ),
485            Some(_) => ApiError::BadRequest("entry is already claimed".to_string()),
486            None => ApiError::Internal(format!(
487                "mailbox entry '{entry_id}' disappeared before inline dispatch"
488            )),
489        });
490    };
491
492    let claim_token = entry.claim_token.clone().ok_or_else(|| {
493        ApiError::Internal(format!(
494            "mailbox entry '{}' was claimed without claim_token",
495            entry.entry_id
496        ))
497    })?;
498
499    match start_agent_run_for_entry(os, mailbox_store, &entry, false).await {
500        Ok(run) => {
501            ack_claimed_entry(mailbox_store, &entry.entry_id, &claim_token).await?;
502            Ok(run)
503        }
504        Err(MailboxRunStartError::Superseded(error)) => {
505            let _ = mailbox_store
506                .supersede_mailbox_entry(&entry.entry_id, now_unix_millis(), &error)
507                .await
508                .map_err(mailbox_error)?;
509            Err(ApiError::BadRequest(error))
510        }
511        Err(MailboxRunStartError::Busy(error)) => {
512            mailbox_store
513                .cancel_mailbox_entry(&entry.entry_id, now_unix_millis())
514                .await
515                .map_err(mailbox_error)?;
516            Err(ApiError::BadRequest(error))
517        }
518        Err(MailboxRunStartError::Permanent(error))
519        | Err(MailboxRunStartError::Retryable(error)) => {
520            dead_letter_claimed_entry(mailbox_store, &entry.entry_id, &claim_token, &error).await?;
521            Err(ApiError::Internal(error))
522        }
523        Err(MailboxRunStartError::Internal(error)) => {
524            dead_letter_claimed_entry(
525                mailbox_store,
526                &entry.entry_id,
527                &claim_token,
528                &error.to_string(),
529            )
530            .await?;
531            Err(error)
532        }
533    }
534}
535
536// ---------------------------------------------------------------------------
537// Thread / mailbox lifecycle
538// ---------------------------------------------------------------------------
539
540pub async fn cancel_pending_for_mailbox(
541    mailbox_store: &Arc<dyn MailboxStore>,
542    mailbox_id: &str,
543    exclude_entry_id: Option<&str>,
544) -> Result<Vec<MailboxEntry>, ApiError> {
545    mailbox_store
546        .cancel_pending_for_mailbox(mailbox_id, now_unix_millis(), exclude_entry_id)
547        .await
548        .map_err(mailbox_error)
549}
550
551pub struct ThreadInterruptResult {
552    pub cancelled_run_id: Option<String>,
553    pub generation: u64,
554    pub superseded_entries: Vec<MailboxEntry>,
555}
556
557pub async fn interrupt_thread(
558    os: &Arc<AgentOs>,
559    read_store: &dyn ThreadReader,
560    mailbox_store: &Arc<dyn MailboxStore>,
561    thread_id: &str,
562) -> Result<ThreadInterruptResult, ApiError> {
563    let interrupted = mailbox_store
564        .interrupt_mailbox(thread_id, now_unix_millis())
565        .await
566        .map_err(mailbox_error)?;
567    let cancelled_run_id = os.cancel_active_run_by_thread(thread_id).await;
568
569    if cancelled_run_id.is_none() && interrupted.superseded_entries.is_empty() {
570        let thread_exists = read_store
571            .load_thread(thread_id)
572            .await
573            .map_err(|err| ApiError::Internal(err.to_string()))?
574            .is_some();
575        if !thread_exists {
576            let mailbox_page = mailbox_store
577                .list_mailbox_entries(&MailboxQuery {
578                    mailbox_id: Some(thread_id.to_string()),
579                    limit: 1,
580                    ..Default::default()
581                })
582                .await
583                .map_err(mailbox_error)?;
584            if mailbox_page.total == 0 {
585                return Err(ApiError::ThreadNotFound(thread_id.to_string()));
586            }
587        }
588    }
589
590    Ok(ThreadInterruptResult {
591        cancelled_run_id,
592        generation: interrupted.mailbox_state.current_generation,
593        superseded_entries: interrupted.superseded_entries,
594    })
595}
596
597// ---------------------------------------------------------------------------
598// Background task lookup / cancel
599// ---------------------------------------------------------------------------
600
/// Result of [`load_background_task`]: either a run record or a mailbox entry.
#[derive(Debug)]
pub enum BackgroundTaskLookup {
    /// The id resolved to a run record, either directly as a run id or through the `run_id` in
    /// an accepted entry's payload.
    Run(RunRecord),
    /// The id resolved to a mailbox entry for which no run record was found.
    Mailbox(MailboxEntry),
}
606
607/// Look up a background task by run_id (RunStore) or entry_id (MailboxStore).
608///
609/// If the entry has been accepted, tries to find the corresponding RunRecord
610/// by extracting the run_id from the payload.
611pub async fn load_background_task(
612    read_store: &dyn ThreadReader,
613    mailbox_store: &dyn MailboxReader,
614    id: &str,
615) -> Result<Option<BackgroundTaskLookup>, ApiError> {
616    // Try run store first (id interpreted as run_id).
617    if let Some(record) = read_store
618        .load_run(id)
619        .await
620        .map_err(|err| ApiError::Internal(err.to_string()))?
621    {
622        return Ok(Some(BackgroundTaskLookup::Run(record)));
623    }
624
625    // Try mailbox store (id interpreted as entry_id).
626    let Some(entry) = mailbox_store
627        .load_mailbox_entry(id)
628        .await
629        .map_err(mailbox_error)?
630    else {
631        return Ok(None);
632    };
633
634    // If the entry was accepted, look up the RunRecord via run_id from payload.
635    if entry.status == MailboxEntryStatus::Accepted {
636        if let Some(run_id) = entry.payload.get("run_id").and_then(|v| v.as_str()) {
637            if let Some(record) = read_store
638                .load_run(run_id)
639                .await
640                .map_err(|err| ApiError::Internal(err.to_string()))?
641            {
642                return Ok(Some(BackgroundTaskLookup::Run(record)));
643            }
644        }
645    }
646
647    Ok(Some(BackgroundTaskLookup::Mailbox(entry)))
648}
649
/// What [`try_cancel_active_or_queued_run_by_id`] cancelled.
///
/// This is a plain unit enum, so it derives the standard value traits: callers can compare,
/// copy, and debug-print it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancelBackgroundRunResult {
    /// An active run was cancelled.
    Active,
    /// A queued mailbox entry was cancelled before it started.
    Pending,
}
654
655/// Cancel by run_id (active run) or entry_id (queued mailbox entry).
656///
657/// If `id` is a run_id, cancels the active run directly.
658/// If `id` is an entry_id for a queued entry, cancels the mailbox entry.
659/// If `id` is an entry_id for an accepted entry, extracts the run_id from
660/// the payload and cancels the active run.
661pub async fn try_cancel_active_or_queued_run_by_id(
662    os: &Arc<AgentOs>,
663    mailbox_store: &Arc<dyn MailboxStore>,
664    id: &str,
665) -> Result<Option<CancelBackgroundRunResult>, ApiError> {
666    // Try cancelling active run directly (id interpreted as run_id).
667    if os.cancel_active_run_by_id(id).await {
668        return Ok(Some(CancelBackgroundRunResult::Active));
669    }
670
671    // Try cancelling a queued mailbox entry (id interpreted as entry_id).
672    let cancelled = mailbox_store
673        .cancel_mailbox_entry(id, now_unix_millis())
674        .await
675        .map_err(mailbox_error)?;
676    if cancelled
677        .as_ref()
678        .is_some_and(|entry| entry.status == MailboxEntryStatus::Cancelled)
679    {
680        return Ok(Some(CancelBackgroundRunResult::Pending));
681    }
682
683    // If the entry exists but is already accepted, try extracting run_id from
684    // the payload to cancel the active run.
685    let entry = match cancelled {
686        Some(e) => Some(e),
687        None => mailbox_store
688            .load_mailbox_entry(id)
689            .await
690            .map_err(mailbox_error)?,
691    };
692    if let Some(entry) = entry {
693        if entry.status == MailboxEntryStatus::Accepted {
694            if let Some(run_id) = entry
695                .payload
696                .get("run_id")
697                .and_then(|v| v.as_str())
698                .filter(|rid| *rid != id)
699            {
700                if os.cancel_active_run_by_id(run_id).await {
701                    return Ok(Some(CancelBackgroundRunResult::Active));
702                }
703            }
704        }
705    }
706
707    Ok(None)
708}
709
710// ---------------------------------------------------------------------------
711// Generic mailbox dispatcher
712// ---------------------------------------------------------------------------
713
/// Polling dispatcher that claims ready mailbox entries and hands them to a [`MailboxReceiver`].
#[derive(Clone)]
pub struct MailboxDispatcher {
    /// Store that entries are claimed from and settled in.
    mailbox_store: Arc<dyn MailboxStore>,
    /// Decides whether each claimed entry is accepted, retried, or rejected.
    receiver: Arc<dyn MailboxReceiver>,
    /// Identity passed to the store as the claimer of entries.
    consumer_id: String,
    /// Sleep between polling rounds in `run_forever`.
    poll_interval: Duration,
    /// Lease length requested when claiming entries.
    lease_duration_ms: u64,
    /// Delay added to the current time when an entry is nacked for retry.
    retry_delay_ms: u64,
    /// Maximum number of entries claimed per round.
    batch_size: usize,
    /// Attempt count at which a retried entry is dead-lettered instead of nacked.
    max_attempts: u32,
}
725
726impl MailboxDispatcher {
727    pub fn new(mailbox_store: Arc<dyn MailboxStore>, receiver: Arc<dyn MailboxReceiver>) -> Self {
728        Self {
729            mailbox_store,
730            receiver,
731            consumer_id: format!("mailbox-{}", new_id()),
732            poll_interval: Duration::from_millis(DEFAULT_MAILBOX_POLL_INTERVAL_MS),
733            lease_duration_ms: DEFAULT_MAILBOX_LEASE_MS,
734            retry_delay_ms: DEFAULT_MAILBOX_RETRY_MS,
735            batch_size: DEFAULT_MAILBOX_BATCH_SIZE,
736            max_attempts: DEFAULT_MAILBOX_MAX_ATTEMPTS,
737        }
738    }
739
740    #[must_use]
741    pub fn with_consumer_id(mut self, consumer_id: impl Into<String>) -> Self {
742        self.consumer_id = consumer_id.into();
743        self
744    }
745
746    async fn dispatch_claimed_entry(&self, entry: MailboxEntry) -> Result<(), ApiError> {
747        let claim_token = entry.claim_token.clone().ok_or_else(|| {
748            ApiError::Internal(format!(
749                "mailbox entry '{}' was claimed without claim_token",
750                entry.entry_id
751            ))
752        })?;
753
754        let entry_id = entry.entry_id.as_str();
755
756        // Check if superseded since claim (generation mismatch)
757        if self
758            .mailbox_store
759            .load_mailbox_state(&entry.mailbox_id)
760            .await
761            .map_err(mailbox_error)?
762            .is_some_and(|state| state.current_generation != entry.generation)
763        {
764            let _ = self
765                .mailbox_store
766                .supersede_mailbox_entry(entry_id, now_unix_millis(), "superseded by interrupt")
767                .await;
768            tracing::debug!(entry_id, "mailbox entry superseded by generation mismatch");
769            return Ok(());
770        }
771
772        match self.receiver.receive(&entry).await {
773            ReceiveOutcome::Accepted => {
774                ack_claimed_entry(&self.mailbox_store, entry_id, &claim_token).await?;
775                tracing::debug!(entry_id, "mailbox entry accepted");
776            }
777            ReceiveOutcome::Retry(reason) => {
778                if entry.attempt_count >= self.max_attempts {
779                    dead_letter_claimed_entry(
780                        &self.mailbox_store,
781                        entry_id,
782                        &claim_token,
783                        &format!("max attempts ({}) exceeded: {reason}", self.max_attempts),
784                    )
785                    .await?;
786                    tracing::warn!(
787                        entry_id,
788                        attempts = entry.attempt_count,
789                        "mailbox entry dead-lettered after max attempts"
790                    );
791                } else {
792                    nack_claimed_entry(
793                        &self.mailbox_store,
794                        entry_id,
795                        &claim_token,
796                        self.retry_delay_ms,
797                        &reason,
798                    )
799                    .await?;
800                    tracing::debug!(entry_id, attempts = entry.attempt_count, %reason, "mailbox entry nacked for retry");
801                }
802            }
803            ReceiveOutcome::Reject(reason) => {
804                dead_letter_claimed_entry(&self.mailbox_store, entry_id, &claim_token, &reason)
805                    .await?;
806                tracing::warn!(entry_id, %reason, "mailbox entry rejected");
807            }
808        }
809        Ok(())
810    }
811
812    pub async fn dispatch_ready_once(&self) -> Result<usize, ApiError> {
813        let claimed = self
814            .mailbox_store
815            .claim_mailbox_entries(
816                None,
817                self.batch_size,
818                &self.consumer_id,
819                now_unix_millis(),
820                self.lease_duration_ms,
821            )
822            .await
823            .map_err(mailbox_error)?;
824
825        let count = claimed.len();
826        if count == 0 {
827            return Ok(0);
828        }
829
830        let mut futures: FuturesUnordered<_> = claimed
831            .into_iter()
832            .map(|entry| self.dispatch_claimed_entry(entry))
833            .collect();
834
835        let mut first_error: Option<ApiError> = None;
836        while let Some(result) = futures.next().await {
837            if let Err(err) = result {
838                if first_error.is_none() {
839                    first_error = Some(err);
840                }
841            }
842        }
843
844        if let Some(err) = first_error {
845            return Err(err);
846        }
847        Ok(count)
848    }
849
850    pub async fn run_forever(self) {
851        let gc_interval = Duration::from_secs(DEFAULT_MAILBOX_GC_INTERVAL_SECS);
852        let mut last_gc = std::time::Instant::now();
853
854        loop {
855            if let Err(err) = self.dispatch_ready_once().await {
856                tracing::error!("mailbox dispatcher failed: {err}");
857            }
858
859            if last_gc.elapsed() >= gc_interval {
860                let cutoff = now_unix_millis().saturating_sub(DEFAULT_MAILBOX_GC_TTL_MS);
861                match self
862                    .mailbox_store
863                    .purge_terminal_mailbox_entries(cutoff)
864                    .await
865                {
866                    Ok(0) => {}
867                    Ok(n) => tracing::debug!(purged = n, "mailbox GC purged terminal entries"),
868                    Err(err) => tracing::warn!("mailbox GC failed: {err}"),
869                }
870                last_gc = std::time::Instant::now();
871            }
872
873            tokio::time::sleep(self.poll_interval).await;
874        }
875    }
876}
877
878// ---------------------------------------------------------------------------
879// Tests
880// ---------------------------------------------------------------------------
881
882#[cfg(test)]
883mod tests {
884    use super::*;
885    use std::sync::Arc;
886    use tirea_agentos::composition::{AgentDefinition, AgentDefinitionSpec, AgentOsBuilder};
887    use tirea_agentos::contracts::runtime::behavior::ReadOnlyContext;
888    use tirea_agentos::contracts::runtime::phase::{ActionSet, BeforeInferenceAction};
889    use tirea_agentos::contracts::{AgentBehavior, TerminationReason};
890    use tirea_contract::storage::{
891        MailboxEntryOrigin, MailboxReader, MailboxWriter, RunOrigin, RunReader, ThreadReader,
892    };
893    use tirea_contract::testing::MailboxEntryBuilder;
894    use tirea_store_adapters::MemoryStore;
895
    /// Test behavior that stops every run before inference by requesting
    /// `TerminationReason::BehaviorRequested` (see its `AgentBehavior` impl below).
    struct TerminatePlugin;
897
898    #[async_trait]
899    impl AgentBehavior for TerminatePlugin {
900        fn id(&self) -> &str {
901            "mailbox_terminate"
902        }
903
904        async fn before_inference(
905            &self,
906            _ctx: &ReadOnlyContext<'_>,
907        ) -> ActionSet<BeforeInferenceAction> {
908            ActionSet::single(BeforeInferenceAction::Terminate(
909                TerminationReason::BehaviorRequested,
910            ))
911        }
912    }
913
914    fn make_os(store: Arc<MemoryStore>) -> Arc<AgentOs> {
915        Arc::new(
916            AgentOsBuilder::new()
917                .with_registered_behavior("mailbox_terminate", Arc::new(TerminatePlugin))
918                .with_agent_spec(AgentDefinitionSpec::local_with_id(
919                    "test",
920                    AgentDefinition {
921                        id: "test".to_string(),
922                        behavior_ids: vec!["mailbox_terminate".to_string()],
923                        ..Default::default()
924                    },
925                ))
926                .with_agent_state_store(store)
927                .build()
928                .expect("build AgentOs"),
929        )
930    }
931
932    #[test]
933    fn mailbox_entry_origin_follows_run_origin() {
934        let mut request = RunRequest {
935            agent_id: "test".to_string(),
936            thread_id: Some("mailbox-origin-thread".to_string()),
937            run_id: Some("mailbox-origin-run".to_string()),
938            parent_run_id: None,
939            parent_thread_id: None,
940            resource_id: None,
941            origin: RunOrigin::Internal,
942            state: None,
943            messages: vec![],
944            initial_decisions: vec![],
945            source_mailbox_entry_id: None,
946        };
947
948        let internal = mailbox_entry_from_request(&request, 0, &EnqueueOptions::default(), 1);
949        assert_eq!(internal.origin, MailboxEntryOrigin::Internal);
950
951        request.origin = RunOrigin::AgUi;
952        let external = mailbox_entry_from_request(&request, 0, &EnqueueOptions::default(), 1);
953        assert_eq!(external.origin, MailboxEntryOrigin::External);
954    }
955
956    #[tokio::test]
957    async fn dispatcher_accepts_enqueued_background_run() {
958        let store = Arc::new(MemoryStore::new());
959        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
960        let os = make_os(store.clone());
961
962        let (thread_id, run_id, _entry_id) = enqueue_background_run(
963            &os,
964            &mailbox_store,
965            "test",
966            RunRequest {
967                agent_id: "test".to_string(),
968                thread_id: Some("mailbox-thread".to_string()),
969                run_id: Some("mailbox-run".to_string()),
970                parent_run_id: None,
971                parent_thread_id: None,
972                resource_id: None,
973                origin: Default::default(),
974                state: None,
975                messages: vec![],
976                initial_decisions: vec![],
977                source_mailbox_entry_id: None,
978            },
979            EnqueueOptions::default(),
980        )
981        .await
982        .expect("enqueue background run");
983        assert_eq!(thread_id, "mailbox-thread");
984        assert_eq!(run_id, "mailbox-run");
985
986        let receiver: Arc<dyn MailboxReceiver> = Arc::new(AgentReceiver::new(os.clone()));
987        MailboxDispatcher::new(mailbox_store.clone(), receiver)
988            .with_consumer_id("test-dispatcher")
989            .dispatch_ready_once()
990            .await
991            .expect("dispatch mailbox run");
992
993        // The entry should be accepted
994        let page = mailbox_store
995            .list_mailbox_entries(&MailboxQuery {
996                mailbox_id: Some("mailbox-thread".to_string()),
997                status: Some(MailboxEntryStatus::Accepted),
998                ..Default::default()
999            })
1000            .await
1001            .expect("list mailbox entries");
1002        assert_eq!(page.items.len(), 1);
1003        assert_eq!(page.items[0].status, MailboxEntryStatus::Accepted);
1004
1005        let run_record = RunReader::load_run(store.as_ref(), &run_id)
1006            .await
1007            .expect("load run record")
1008            .expect("run record should be persisted");
1009        assert_eq!(run_record.thread_id, thread_id);
1010
1011        let thread = ThreadReader::load_thread(store.as_ref(), &thread_id)
1012            .await
1013            .expect("load thread")
1014            .expect("thread should exist");
1015        assert_eq!(thread.id, thread_id);
1016    }
1017
1018    #[tokio::test]
1019    async fn dispatcher_skips_claimed_entry_after_interrupt_supersedes_it() {
1020        let store = Arc::new(MemoryStore::new());
1021        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1022        let os = make_os(store.clone());
1023
1024        let (_thread_id, _run_id, _entry_id) = enqueue_background_run(
1025            &os,
1026            &mailbox_store,
1027            "test",
1028            RunRequest {
1029                agent_id: "test".to_string(),
1030                thread_id: Some("mailbox-supersede-thread".to_string()),
1031                run_id: Some("mailbox-supersede-run".to_string()),
1032                parent_run_id: None,
1033                parent_thread_id: None,
1034                resource_id: None,
1035                origin: Default::default(),
1036                state: None,
1037                messages: vec![],
1038                initial_decisions: vec![],
1039                source_mailbox_entry_id: None,
1040            },
1041            EnqueueOptions::default(),
1042        )
1043        .await
1044        .expect("enqueue background run");
1045
1046        let claimed = mailbox_store
1047            .claim_mailbox_entries(None, 1, "test-dispatcher", now_unix_millis(), 5_000)
1048            .await
1049            .expect("claim mailbox entry");
1050        assert_eq!(claimed.len(), 1);
1051
1052        mailbox_store
1053            .interrupt_mailbox("mailbox-supersede-thread", now_unix_millis())
1054            .await
1055            .expect("interrupt mailbox");
1056
1057        let receiver: Arc<dyn MailboxReceiver> = Arc::new(AgentReceiver::new(os.clone()));
1058        let dispatcher = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1059            .with_consumer_id("test-dispatcher");
1060        dispatcher
1061            .dispatch_claimed_entry(claimed.into_iter().next().expect("claimed entry"))
1062            .await
1063            .expect("dispatch after supersede");
1064
1065        // The entry should be superseded
1066        let page = mailbox_store
1067            .list_mailbox_entries(&MailboxQuery {
1068                mailbox_id: Some("mailbox-supersede-thread".to_string()),
1069                ..Default::default()
1070            })
1071            .await
1072            .expect("list mailbox entries");
1073        assert_eq!(page.items.len(), 1);
1074        assert_eq!(page.items[0].status, MailboxEntryStatus::Superseded);
1075    }
1076
1077    // -----------------------------------------------------------------------
1078    // AgentReceiver tests
1079    // -----------------------------------------------------------------------
1080
1081    #[tokio::test]
1082    async fn agent_receiver_rejects_invalid_payload() {
1083        let store = Arc::new(MemoryStore::new());
1084        let os = make_os(store.clone());
1085        let receiver = AgentReceiver::new(os);
1086
1087        let entry = MailboxEntryBuilder::queued("entry-bad-json", "mailbox-bad")
1088            .with_payload(serde_json::json!("not a valid RunRequest"))
1089            .claimed("token", "test", u64::MAX)
1090            .build();
1091
1092        match receiver.receive(&entry).await {
1093            ReceiveOutcome::Reject(reason) => {
1094                assert!(reason.contains("invalid payload"), "got: {reason}");
1095            }
1096            other => panic!("expected Reject, got {other:?}"),
1097        }
1098    }
1099
1100    #[tokio::test]
1101    async fn agent_receiver_rejects_unknown_agent() {
1102        let store = Arc::new(MemoryStore::new());
1103        let os = make_os(store.clone());
1104        let receiver = AgentReceiver::new(os);
1105
1106        let request = RunRequest {
1107            agent_id: "nonexistent-agent".to_string(),
1108            thread_id: Some("thread-unknown".to_string()),
1109            run_id: Some("run-unknown".to_string()),
1110            parent_run_id: None,
1111            parent_thread_id: None,
1112            resource_id: None,
1113            origin: Default::default(),
1114            state: None,
1115            messages: vec![],
1116            initial_decisions: vec![],
1117            source_mailbox_entry_id: None,
1118        };
1119
1120        let entry = MailboxEntryBuilder::queued("entry-unknown-agent", "mailbox-unknown")
1121            .with_payload(serde_json::to_value(&request).unwrap())
1122            .claimed("token", "test", u64::MAX)
1123            .build();
1124
1125        match receiver.receive(&entry).await {
1126            ReceiveOutcome::Reject(reason) => {
1127                assert!(
1128                    reason.contains("nonexistent-agent")
1129                        || reason.contains("not found")
1130                        || reason.contains("resolve"),
1131                    "expected resolve error, got: {reason}"
1132                );
1133            }
1134            other => panic!("expected Reject for unknown agent, got {other:?}"),
1135        }
1136    }
1137
1138    #[tokio::test]
1139    async fn agent_receiver_accepts_valid_request() {
1140        let store = Arc::new(MemoryStore::new());
1141        let os = make_os(store.clone());
1142        let receiver = AgentReceiver::new(os);
1143
1144        let request = RunRequest {
1145            agent_id: "test".to_string(),
1146            thread_id: Some("thread-valid".to_string()),
1147            run_id: Some("run-valid".to_string()),
1148            parent_run_id: None,
1149            parent_thread_id: None,
1150            resource_id: None,
1151            origin: Default::default(),
1152            state: None,
1153            messages: vec![],
1154            initial_decisions: vec![],
1155            source_mailbox_entry_id: None,
1156        };
1157
1158        let entry = MailboxEntryBuilder::queued("entry-valid", "thread-valid")
1159            .with_payload(serde_json::to_value(&request).unwrap())
1160            .claimed("token", "test", u64::MAX)
1161            .build();
1162
1163        match receiver.receive(&entry).await {
1164            ReceiveOutcome::Accepted => {} // success
1165            other => panic!("expected Accepted, got {other:?}"),
1166        }
1167
1168        // Give the spawned drain task a moment to finish
1169        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1170
1171        // Run should be persisted
1172        let run = RunReader::load_run(store.as_ref(), "run-valid")
1173            .await
1174            .expect("load run")
1175            .expect("run should be persisted");
1176        assert_eq!(run.thread_id, "thread-valid");
1177    }
1178
1179    // -----------------------------------------------------------------------
1180    // MailboxDispatcher with mock receiver
1181    // -----------------------------------------------------------------------
1182
    /// Test receiver whose `receive` ignores the entry and returns a clone of the
    /// stored outcome, falling back to `ReceiveOutcome::Accepted` when none is stored.
    struct MockReceiver {
        // NOTE(review): `Mutex<Option<_>>` presumably allows swapping the outcome at
        // runtime; the tests visible here only construct it via `MockReceiver::always`.
        outcome: std::sync::Mutex<Option<ReceiveOutcome>>,
    }
1186
1187    impl MockReceiver {
1188        fn always(outcome: ReceiveOutcome) -> Self {
1189            Self {
1190                outcome: std::sync::Mutex::new(Some(outcome)),
1191            }
1192        }
1193    }
1194
1195    #[async_trait]
1196    impl MailboxReceiver for MockReceiver {
1197        async fn receive(&self, _entry: &MailboxEntry) -> ReceiveOutcome {
1198            self.outcome
1199                .lock()
1200                .unwrap()
1201                .clone()
1202                .unwrap_or(ReceiveOutcome::Accepted)
1203        }
1204    }
1205
1206    #[tokio::test]
1207    async fn dispatcher_reject_outcome_dead_letters_entry() {
1208        let store = Arc::new(MemoryStore::new());
1209        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1210
1211        store.ensure_mailbox_state("mbx-reject", 1).await.unwrap();
1212        let entry = MailboxEntryBuilder::queued("entry-reject", "mbx-reject")
1213            .with_payload(serde_json::json!({"test": true}))
1214            .build();
1215        store.enqueue_mailbox_entry(&entry).await.unwrap();
1216
1217        let receiver: Arc<dyn MailboxReceiver> = Arc::new(MockReceiver::always(
1218            ReceiveOutcome::Reject("bad message".to_string()),
1219        ));
1220        MailboxDispatcher::new(mailbox_store.clone(), receiver)
1221            .with_consumer_id("test-dispatcher")
1222            .dispatch_ready_once()
1223            .await
1224            .expect("dispatch should succeed");
1225
1226        let loaded = store
1227            .load_mailbox_entry("entry-reject")
1228            .await
1229            .unwrap()
1230            .unwrap();
1231        assert_eq!(loaded.status, MailboxEntryStatus::DeadLetter);
1232        assert_eq!(loaded.last_error.as_deref(), Some("bad message"));
1233    }
1234
1235    #[tokio::test]
1236    async fn dispatcher_retry_outcome_nacks_entry_for_retry() {
1237        let store = Arc::new(MemoryStore::new());
1238        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1239
1240        store.ensure_mailbox_state("mbx-retry", 1).await.unwrap();
1241        let entry = MailboxEntryBuilder::queued("entry-retry", "mbx-retry")
1242            .with_payload(serde_json::json!({"test": true}))
1243            .build();
1244        store.enqueue_mailbox_entry(&entry).await.unwrap();
1245
1246        let receiver: Arc<dyn MailboxReceiver> = Arc::new(MockReceiver::always(
1247            ReceiveOutcome::Retry("try later".to_string()),
1248        ));
1249        MailboxDispatcher::new(mailbox_store.clone(), receiver)
1250            .with_consumer_id("test-dispatcher")
1251            .dispatch_ready_once()
1252            .await
1253            .expect("dispatch should succeed");
1254
1255        let loaded = store
1256            .load_mailbox_entry("entry-retry")
1257            .await
1258            .unwrap()
1259            .unwrap();
1260        assert_eq!(loaded.status, MailboxEntryStatus::Queued);
1261        assert_eq!(loaded.last_error.as_deref(), Some("try later"));
1262        assert_eq!(loaded.attempt_count, 1);
1263    }
1264
1265    #[tokio::test]
1266    async fn dispatcher_retry_at_max_attempts_dead_letters() {
1267        let store = Arc::new(MemoryStore::new());
1268        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1269
1270        store.ensure_mailbox_state("mbx-max", 1).await.unwrap();
1271        let entry = MailboxEntryBuilder::queued("entry-max", "mbx-max")
1272            .with_payload(serde_json::json!({"test": true}))
1273            .with_attempt_count(10)
1274            .build(); // Already at max (DEFAULT_MAILBOX_MAX_ATTEMPTS = 10)
1275        store.enqueue_mailbox_entry(&entry).await.unwrap();
1276
1277        let receiver: Arc<dyn MailboxReceiver> = Arc::new(MockReceiver::always(
1278            ReceiveOutcome::Retry("still failing".to_string()),
1279        ));
1280        let dispatcher = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1281            .with_consumer_id("test-dispatcher");
1282        dispatcher
1283            .dispatch_ready_once()
1284            .await
1285            .expect("dispatch should succeed");
1286
1287        let loaded = store
1288            .load_mailbox_entry("entry-max")
1289            .await
1290            .unwrap()
1291            .unwrap();
1292        assert_eq!(loaded.status, MailboxEntryStatus::DeadLetter);
1293        assert!(
1294            loaded
1295                .last_error
1296                .as_deref()
1297                .unwrap()
1298                .contains("max attempts"),
1299            "error should mention max attempts: {:?}",
1300            loaded.last_error
1301        );
1302    }
1303
1304    #[tokio::test]
1305    async fn dispatcher_generation_mismatch_supersedes_entry() {
1306        let store = Arc::new(MemoryStore::new());
1307        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1308
1309        store.ensure_mailbox_state("mbx-genm", 1).await.unwrap();
1310        let entry = MailboxEntryBuilder::queued("entry-genm", "mbx-genm")
1311            .with_payload(serde_json::json!({"test": true}))
1312            .build();
1313        store.enqueue_mailbox_entry(&entry).await.unwrap();
1314
1315        // Claim the entry
1316        let claimed = store
1317            .claim_mailbox_entries(None, 1, "test-dispatcher", now_unix_millis(), 30_000)
1318            .await
1319            .unwrap();
1320        assert_eq!(claimed.len(), 1);
1321        let claimed_entry = claimed.into_iter().next().unwrap();
1322
1323        // Interrupt the mailbox to bump generation
1324        store
1325            .interrupt_mailbox("mbx-genm", now_unix_millis())
1326            .await
1327            .unwrap();
1328
1329        // Dispatch the claimed entry — dispatcher should detect generation mismatch
1330        let receiver: Arc<dyn MailboxReceiver> =
1331            Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1332        let dispatcher = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1333            .with_consumer_id("test-dispatcher");
1334        dispatcher
1335            .dispatch_claimed_entry(claimed_entry)
1336            .await
1337            .expect("dispatch should succeed");
1338
1339        let loaded = store
1340            .load_mailbox_entry("entry-genm")
1341            .await
1342            .unwrap()
1343            .unwrap();
1344        assert_eq!(loaded.status, MailboxEntryStatus::Superseded);
1345    }
1346
1347    #[tokio::test]
1348    async fn dispatcher_accept_outcome_acks_entry() {
1349        let store = Arc::new(MemoryStore::new());
1350        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1351
1352        store.ensure_mailbox_state("mbx-ack", 1).await.unwrap();
1353        let entry = MailboxEntryBuilder::queued("entry-ack", "mbx-ack")
1354            .with_payload(serde_json::json!({"test": true}))
1355            .build();
1356        store.enqueue_mailbox_entry(&entry).await.unwrap();
1357
1358        let receiver: Arc<dyn MailboxReceiver> =
1359            Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1360        MailboxDispatcher::new(mailbox_store.clone(), receiver)
1361            .with_consumer_id("test-dispatcher")
1362            .dispatch_ready_once()
1363            .await
1364            .expect("dispatch should succeed");
1365
1366        let loaded = store
1367            .load_mailbox_entry("entry-ack")
1368            .await
1369            .unwrap()
1370            .unwrap();
1371        assert_eq!(loaded.status, MailboxEntryStatus::Accepted);
1372    }
1373
1374    #[tokio::test]
1375    async fn dispatcher_empty_mailbox_returns_zero() {
1376        let store = Arc::new(MemoryStore::new());
1377        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1378
1379        let receiver: Arc<dyn MailboxReceiver> =
1380            Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1381        let count = MailboxDispatcher::new(mailbox_store, receiver)
1382            .with_consumer_id("test-dispatcher")
1383            .dispatch_ready_once()
1384            .await
1385            .expect("dispatch should succeed");
1386        assert_eq!(count, 0);
1387    }
1388
1389    #[tokio::test]
1390    async fn dispatcher_processes_multiple_entries_in_batch() {
1391        let store = Arc::new(MemoryStore::new());
1392        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1393
1394        // Use distinct mailboxes so each entry can be claimed concurrently
1395        // (exclusive claim limits one active claim per mailbox).
1396        for i in 0..5 {
1397            let mid = format!("mbx-batch-{i}");
1398            store.ensure_mailbox_state(&mid, 1).await.unwrap();
1399            let entry = MailboxEntryBuilder::queued(format!("entry-batch-{i}"), &mid)
1400                .with_payload(serde_json::json!({"test": true}))
1401                .build();
1402            store.enqueue_mailbox_entry(&entry).await.unwrap();
1403        }
1404
1405        let receiver: Arc<dyn MailboxReceiver> =
1406            Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1407        let count = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1408            .with_consumer_id("test-dispatcher")
1409            .dispatch_ready_once()
1410            .await
1411            .expect("dispatch should succeed");
1412        assert_eq!(count, 5);
1413
1414        // All entries should be accepted
1415        let page = mailbox_store
1416            .list_mailbox_entries(&MailboxQuery {
1417                status: Some(MailboxEntryStatus::Accepted),
1418                limit: 100,
1419                ..Default::default()
1420            })
1421            .await
1422            .unwrap();
1423        assert_eq!(page.total, 5);
1424    }
1425
1426    // -----------------------------------------------------------------------
1427    // Service function tests
1428    // -----------------------------------------------------------------------
1429
1430    #[tokio::test]
1431    async fn enqueue_background_run_returns_three_tuple() {
1432        let store = Arc::new(MemoryStore::new());
1433        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1434        let os = make_os(store.clone());
1435
1436        let (thread_id, run_id, entry_id) = enqueue_background_run(
1437            &os,
1438            &mailbox_store,
1439            "test",
1440            RunRequest {
1441                agent_id: "test".to_string(),
1442                thread_id: Some("thread-3tuple".to_string()),
1443                run_id: Some("run-3tuple".to_string()),
1444                parent_run_id: None,
1445                parent_thread_id: None,
1446                resource_id: None,
1447                origin: Default::default(),
1448                state: None,
1449                messages: vec![],
1450                initial_decisions: vec![],
1451                source_mailbox_entry_id: None,
1452            },
1453            EnqueueOptions::default(),
1454        )
1455        .await
1456        .expect("enqueue");
1457
1458        assert_eq!(thread_id, "thread-3tuple");
1459        assert_eq!(run_id, "run-3tuple");
1460        assert!(!entry_id.is_empty());
1461
1462        // Verify the entry exists in the store
1463        let entry = mailbox_store
1464            .load_mailbox_entry(&entry_id)
1465            .await
1466            .unwrap()
1467            .expect("entry should exist");
1468        assert_eq!(entry.mailbox_id, "thread-3tuple");
1469        assert_eq!(entry.status, MailboxEntryStatus::Queued);
1470
1471        // Verify payload contains run_id
1472        assert_eq!(
1473            entry.payload.get("run_id").and_then(|v| v.as_str()),
1474            Some("run-3tuple")
1475        );
1476    }
1477
1478    #[tokio::test]
1479    async fn enqueue_background_run_rejects_unknown_agent() {
1480        let store = Arc::new(MemoryStore::new());
1481        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1482        let os = make_os(store.clone());
1483
1484        let result = enqueue_background_run(
1485            &os,
1486            &mailbox_store,
1487            "nonexistent",
1488            RunRequest {
1489                agent_id: "nonexistent".to_string(),
1490                thread_id: Some("thread-no-agent".to_string()),
1491                run_id: Some("run-no-agent".to_string()),
1492                parent_run_id: None,
1493                parent_thread_id: None,
1494                resource_id: None,
1495                origin: Default::default(),
1496                state: None,
1497                messages: vec![],
1498                initial_decisions: vec![],
1499                source_mailbox_entry_id: None,
1500            },
1501            EnqueueOptions::default(),
1502        )
1503        .await;
1504
1505        assert!(result.is_err());
1506    }
1507
1508    #[tokio::test]
1509    async fn load_background_task_finds_by_entry_id() {
1510        let store = Arc::new(MemoryStore::new());
1511        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1512        let os = make_os(store.clone());
1513
1514        let (_thread_id, _run_id, entry_id) = enqueue_background_run(
1515            &os,
1516            &mailbox_store,
1517            "test",
1518            RunRequest {
1519                agent_id: "test".to_string(),
1520                thread_id: Some("thread-lookup".to_string()),
1521                run_id: Some("run-lookup".to_string()),
1522                parent_run_id: None,
1523                parent_thread_id: None,
1524                resource_id: None,
1525                origin: Default::default(),
1526                state: None,
1527                messages: vec![],
1528                initial_decisions: vec![],
1529                source_mailbox_entry_id: None,
1530            },
1531            EnqueueOptions::default(),
1532        )
1533        .await
1534        .expect("enqueue");
1535
1536        // Lookup by entry_id returns mailbox entry (queued, no run record yet)
1537        let task = load_background_task(store.as_ref(), store.as_ref(), &entry_id)
1538            .await
1539            .expect("load background task");
1540        assert!(matches!(task, Some(BackgroundTaskLookup::Mailbox(_))));
1541    }
1542
1543    #[tokio::test]
1544    async fn load_background_task_cross_references_accepted_entry() {
1545        let store = Arc::new(MemoryStore::new());
1546        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1547        let os = make_os(store.clone());
1548
1549        let (thread_id, run_id, entry_id) = enqueue_background_run(
1550            &os,
1551            &mailbox_store,
1552            "test",
1553            RunRequest {
1554                agent_id: "test".to_string(),
1555                thread_id: Some("thread-xref".to_string()),
1556                run_id: Some("run-xref".to_string()),
1557                parent_run_id: None,
1558                parent_thread_id: None,
1559                resource_id: None,
1560                origin: Default::default(),
1561                state: None,
1562                messages: vec![],
1563                initial_decisions: vec![],
1564                source_mailbox_entry_id: None,
1565            },
1566            EnqueueOptions::default(),
1567        )
1568        .await
1569        .expect("enqueue");
1570
1571        // Dispatch to accept and create run record
1572        let receiver: Arc<dyn MailboxReceiver> = Arc::new(AgentReceiver::new(os.clone()));
1573        MailboxDispatcher::new(mailbox_store.clone(), receiver)
1574            .with_consumer_id("test-xref")
1575            .dispatch_ready_once()
1576            .await
1577            .expect("dispatch");
1578
1579        // Give drain task time to finish
1580        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1581
1582        // Look up by entry_id — should cross-reference to RunRecord
1583        let task = load_background_task(store.as_ref(), store.as_ref(), &entry_id)
1584            .await
1585            .expect("load background task");
1586        match task {
1587            Some(BackgroundTaskLookup::Run(record)) => {
1588                assert_eq!(record.run_id, run_id);
1589                assert_eq!(record.thread_id, thread_id);
1590            }
1591            other => panic!("expected Run variant, got {other:?}"),
1592        }
1593    }
1594
1595    #[tokio::test]
1596    async fn load_background_task_returns_none_for_unknown_id() {
1597        let store = Arc::new(MemoryStore::new());
1598        let task = load_background_task(store.as_ref(), store.as_ref(), "nonexistent")
1599            .await
1600            .expect("load background task");
1601        assert!(task.is_none());
1602    }
1603
1604    #[tokio::test]
1605    async fn try_cancel_queued_entry_by_entry_id() {
1606        let store = Arc::new(MemoryStore::new());
1607        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1608        let os = make_os(store.clone());
1609
1610        let (_thread_id, _run_id, entry_id) = enqueue_background_run(
1611            &os,
1612            &mailbox_store,
1613            "test",
1614            RunRequest {
1615                agent_id: "test".to_string(),
1616                thread_id: Some("thread-cancel-q".to_string()),
1617                run_id: Some("run-cancel-q".to_string()),
1618                parent_run_id: None,
1619                parent_thread_id: None,
1620                resource_id: None,
1621                origin: Default::default(),
1622                state: None,
1623                messages: vec![],
1624                initial_decisions: vec![],
1625                source_mailbox_entry_id: None,
1626            },
1627            EnqueueOptions::default(),
1628        )
1629        .await
1630        .expect("enqueue");
1631
1632        let result = try_cancel_active_or_queued_run_by_id(&os, &mailbox_store, &entry_id).await;
1633        assert!(matches!(
1634            result,
1635            Ok(Some(CancelBackgroundRunResult::Pending))
1636        ));
1637
1638        let entry = store.load_mailbox_entry(&entry_id).await.unwrap().unwrap();
1639        assert_eq!(entry.status, MailboxEntryStatus::Cancelled);
1640    }
1641
1642    #[tokio::test]
1643    async fn try_cancel_returns_none_for_unknown_id() {
1644        let store = Arc::new(MemoryStore::new());
1645        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1646        let os = make_os(store.clone());
1647
1648        let result =
1649            try_cancel_active_or_queued_run_by_id(&os, &mailbox_store, "nonexistent").await;
1650        assert!(matches!(result, Ok(None)));
1651    }
1652
1653    #[tokio::test]
1654    async fn cancel_pending_for_mailbox_excludes_specified_entry() {
1655        let store = Arc::new(MemoryStore::new());
1656        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1657        let os = make_os(store.clone());
1658
1659        let (_t1, _r1, entry1) = enqueue_background_run(
1660            &os,
1661            &mailbox_store,
1662            "test",
1663            RunRequest {
1664                agent_id: "test".to_string(),
1665                thread_id: Some("thread-cancel-pen".to_string()),
1666                run_id: Some("run-cancel-pen-1".to_string()),
1667                parent_run_id: None,
1668                parent_thread_id: None,
1669                resource_id: None,
1670                origin: Default::default(),
1671                state: None,
1672                messages: vec![],
1673                initial_decisions: vec![],
1674                source_mailbox_entry_id: None,
1675            },
1676            EnqueueOptions::default(),
1677        )
1678        .await
1679        .expect("enqueue 1");
1680
1681        let (_t2, _r2, entry2) = enqueue_background_run(
1682            &os,
1683            &mailbox_store,
1684            "test",
1685            RunRequest {
1686                agent_id: "test".to_string(),
1687                thread_id: Some("thread-cancel-pen".to_string()),
1688                run_id: Some("run-cancel-pen-2".to_string()),
1689                parent_run_id: None,
1690                parent_thread_id: None,
1691                resource_id: None,
1692                origin: Default::default(),
1693                state: None,
1694                messages: vec![],
1695                initial_decisions: vec![],
1696                source_mailbox_entry_id: None,
1697            },
1698            EnqueueOptions::default(),
1699        )
1700        .await
1701        .expect("enqueue 2");
1702
1703        // Cancel all pending except entry1
1704        let cancelled =
1705            cancel_pending_for_mailbox(&mailbox_store, "thread-cancel-pen", Some(&entry1))
1706                .await
1707                .expect("cancel pending");
1708        assert_eq!(cancelled.len(), 1);
1709        assert_eq!(cancelled[0].entry_id, entry2);
1710
1711        // entry1 still queued
1712        let e1 = store.load_mailbox_entry(&entry1).await.unwrap().unwrap();
1713        assert_eq!(e1.status, MailboxEntryStatus::Queued);
1714
1715        // entry2 cancelled
1716        let e2 = store.load_mailbox_entry(&entry2).await.unwrap().unwrap();
1717        assert_eq!(e2.status, MailboxEntryStatus::Cancelled);
1718    }
1719
1720    #[tokio::test]
1721    async fn enqueue_generates_ids_when_not_provided() {
1722        let store = Arc::new(MemoryStore::new());
1723        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1724        let os = make_os(store.clone());
1725
1726        let (thread_id, run_id, entry_id) = enqueue_background_run(
1727            &os,
1728            &mailbox_store,
1729            "test",
1730            RunRequest {
1731                agent_id: "test".to_string(),
1732                thread_id: None,
1733                run_id: None,
1734                parent_run_id: None,
1735                parent_thread_id: None,
1736                resource_id: None,
1737                origin: Default::default(),
1738                state: None,
1739                messages: vec![],
1740                initial_decisions: vec![],
1741                source_mailbox_entry_id: None,
1742            },
1743            EnqueueOptions::default(),
1744        )
1745        .await
1746        .expect("enqueue");
1747
1748        assert!(!thread_id.is_empty());
1749        assert!(!run_id.is_empty());
1750        assert!(!entry_id.is_empty());
1751        // All three should be distinct
1752        assert_ne!(thread_id, run_id);
1753        assert_ne!(run_id, entry_id);
1754    }
1755}