tirea_agentos_server/service/
mailbox_service.rs

1use std::collections::{HashMap, VecDeque};
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::StreamExt;
8use tirea_agentos::contracts::storage::{
9    MailboxEntry, MailboxEntryStatus, MailboxQuery, MailboxStore,
10};
11use tirea_agentos::contracts::RunRequest;
12use tirea_agentos::runtime::{AgentOs, RunStream};
13use tirea_contract::storage::RunOrigin;
14
15use super::mailbox::{
16    ack_claimed_entry, build_parent_completion_notification_message, dead_letter_claimed_entry,
17    drain_background_run, is_generation_mismatch, mailbox_entry_from_request, mailbox_error,
18    normalize_background_run_request, now_unix_millis, parent_completion_notification_dedupe_key,
19    start_agent_run_for_entry, DEFAULT_MAILBOX_LEASE_MS, INLINE_MAILBOX_AVAILABLE_AT,
20};
21use super::mailbox::{EnqueueOptions, MailboxRunStartError};
22use super::ApiError;
23
/// How often an active run's mailbox lease is extended (ten seconds).
///
/// Heartbeats must land before the lease lapses, so this should stay shorter
/// than the lease duration handed to `spawn_lease_renewal`.
const LEASE_RENEWAL_INTERVAL: Duration = Duration::from_millis(10_000);
26
27/// Spawn a background task that periodically extends a mailbox entry's lease.
28///
29/// Returns a [`tokio::task::JoinHandle`] that should be aborted when the run
30/// completes (the handle is cancel-safe).
31fn spawn_lease_renewal(
32    store: Arc<dyn MailboxStore>,
33    entry_id: String,
34    claim_token: String,
35    lease_ms: u64,
36) -> tokio::task::JoinHandle<()> {
37    tokio::spawn(async move {
38        let mut interval = tokio::time::interval(LEASE_RENEWAL_INTERVAL);
39        // Skip the first immediate tick.
40        interval.tick().await;
41        loop {
42            interval.tick().await;
43            let now = now_unix_millis();
44            match store
45                .extend_lease(&entry_id, &claim_token, lease_ms, now)
46                .await
47            {
48                Ok(true) => {}
49                Ok(false) => {
50                    tracing::debug!(entry_id, "lease renewal: entry no longer claimed, stopping");
51                    break;
52                }
53                Err(err) => {
54                    tracing::warn!(entry_id, %err, "lease renewal failed");
55                    break;
56                }
57            }
58        }
59    })
60}
61
62// ---------------------------------------------------------------------------
63// Control signals
64// ---------------------------------------------------------------------------
65
/// Control signal for a thread's mailbox.
///
/// A fieldless value type, so it is `Copy` and comparable for callers that
/// route or log signals.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlSignal {
    /// Cancel the active run but keep pending entries.
    Cancel,
    /// Cancel the active run, clear all pending entries, and bump generation.
    Interrupt,
}
74
75// ---------------------------------------------------------------------------
76// Per-thread state machine
77// ---------------------------------------------------------------------------
78
79#[derive(Debug, Clone)]
80struct BufferedEntry {
81    entry: MailboxEntry,
82}
83
/// Whether a thread currently has a run in flight that this service started.
enum ThreadStatus {
    /// No tracked run; a new submission can be dispatched right away.
    Idle,
    /// A run is active for a claimed mailbox entry.
    Running {
        /// Entry whose run is in flight; acked when the run completes.
        entry_id: String,
        /// Token returned by the claim, needed to ack that entry.
        claim_token: String,
    },
}
91
92struct MailboxInner {
93    status: ThreadStatus,
94    generation: u64,
95    pending: VecDeque<BufferedEntry>,
96    /// Background task that extends the mailbox entry lease while a run is active.
97    lease_renewal: Option<tokio::task::JoinHandle<()>>,
98}
99
100struct ThreadMailbox {
101    thread_id: String,
102    inner: tokio::sync::Mutex<MailboxInner>,
103}
104
105impl ThreadMailbox {
106    fn new(thread_id: String, generation: u64) -> Self {
107        Self {
108            thread_id,
109            inner: tokio::sync::Mutex::new(MailboxInner {
110                status: ThreadStatus::Idle,
111                generation,
112                pending: VecDeque::new(),
113                lease_renewal: None,
114            }),
115        }
116    }
117}
118
119// ---------------------------------------------------------------------------
120// MailboxService
121// ---------------------------------------------------------------------------
122
123/// Event-driven mailbox service that replaces polling-based dispatch.
124///
125/// Each thread gets a `ThreadMailbox` that tracks whether it's idle or running.
126/// When a message arrives and the thread is idle, dispatch happens immediately
127/// (zero-latency). When the thread is busy, messages are buffered and dispatched
128/// automatically on run completion.
129pub struct MailboxService {
130    os: Arc<AgentOs>,
131    mailbox_store: Arc<dyn MailboxStore>,
132    consumer_id: String,
133    mailboxes: tokio::sync::RwLock<HashMap<String, Arc<ThreadMailbox>>>,
134}
135
136impl MailboxService {
137    pub fn new(
138        os: Arc<AgentOs>,
139        mailbox_store: Arc<dyn MailboxStore>,
140        consumer_id: impl Into<String>,
141    ) -> Self {
142        Self {
143            os,
144            mailbox_store,
145            consumer_id: consumer_id.into(),
146            mailboxes: tokio::sync::RwLock::new(HashMap::new()),
147        }
148    }
149
150    /// Returns a reference to the underlying mailbox store.
151    pub fn mailbox_store(&self) -> &Arc<dyn MailboxStore> {
152        &self.mailbox_store
153    }
154
155    /// Get or create the per-thread mailbox.
156    async fn get_or_create_mailbox(
157        self: &Arc<Self>,
158        thread_id: &str,
159        generation: u64,
160    ) -> Arc<ThreadMailbox> {
161        // Fast path: read lock
162        {
163            let map = self.mailboxes.read().await;
164            if let Some(mb) = map.get(thread_id) {
165                return mb.clone();
166            }
167        }
168        // Slow path: write lock
169        let mut map = self.mailboxes.write().await;
170        map.entry(thread_id.to_string())
171            .or_insert_with(|| Arc::new(ThreadMailbox::new(thread_id.to_string(), generation)))
172            .clone()
173    }
174
175    /// Persist a mailbox entry to the store, handling generation retries.
176    async fn enqueue_to_store(
177        &self,
178        request: &RunRequest,
179        options: &EnqueueOptions,
180        available_at: u64,
181    ) -> Result<MailboxEntry, ApiError> {
182        let mailbox_id = request
183            .thread_id
184            .as_ref()
185            .expect("normalized request should have thread_id");
186
187        for _ in 0..2 {
188            let now = now_unix_millis();
189            let state = self
190                .mailbox_store
191                .ensure_mailbox_state(mailbox_id, now)
192                .await
193                .map_err(mailbox_error)?;
194            let entry = mailbox_entry_from_request(
195                request,
196                state.current_generation,
197                options,
198                available_at,
199            );
200            match self.mailbox_store.enqueue_mailbox_entry(&entry).await {
201                Ok(()) => return Ok(entry),
202                Err(tirea_agentos::contracts::storage::MailboxStoreError::AlreadyExists(_))
203                    if options.dedupe_key.is_some() =>
204                {
205                    let dedupe_key = options
206                        .dedupe_key
207                        .as_deref()
208                        .expect("guarded by options.dedupe_key.is_some()");
209                    if let Some(existing) = self
210                        .find_mailbox_entry_by_dedupe_key(mailbox_id, dedupe_key)
211                        .await?
212                    {
213                        return Ok(existing);
214                    }
215                    return Err(ApiError::Internal(format!(
216                        "mailbox dedupe collision for '{mailbox_id}' and key '{dedupe_key}' but existing entry was not found"
217                    )));
218                }
219                Err(err) if is_generation_mismatch(&err) => continue,
220                Err(err) => return Err(mailbox_error(err)),
221            }
222        }
223
224        Err(ApiError::Internal(format!(
225            "mailbox enqueue raced with interrupt for mailbox '{mailbox_id}'"
226        )))
227    }
228
229    async fn find_mailbox_entry_by_dedupe_key(
230        &self,
231        mailbox_id: &str,
232        dedupe_key: &str,
233    ) -> Result<Option<MailboxEntry>, ApiError> {
234        let mut offset = 0;
235        loop {
236            let page = self
237                .mailbox_store
238                .list_mailbox_entries(&MailboxQuery {
239                    mailbox_id: Some(mailbox_id.to_string()),
240                    offset,
241                    limit: 200,
242                    ..Default::default()
243                })
244                .await
245                .map_err(mailbox_error)?;
246            if let Some(entry) = page
247                .items
248                .into_iter()
249                .find(|entry| entry.dedupe_key.as_deref() == Some(dedupe_key))
250            {
251                return Ok(Some(entry));
252            }
253            if !page.has_more {
254                return Ok(None);
255            }
256            offset += 200;
257        }
258    }
259
260    /// Submit a background run request. Returns (thread_id, run_id, entry_id).
261    ///
262    /// If the thread is idle, the run is dispatched immediately.
263    /// If the thread is busy, the entry is buffered for later dispatch.
264    pub async fn submit(
265        self: &Arc<Self>,
266        agent_id: &str,
267        request: RunRequest,
268        options: EnqueueOptions,
269    ) -> Result<(String, String, String), ApiError> {
270        self.os
271            .resolve(agent_id)
272            .map_err(|e| ApiError::from(tirea_agentos::runtime::AgentOsRunError::from(e)))?;
273
274        let request = normalize_background_run_request(agent_id, request);
275        let thread_id = request
276            .thread_id
277            .clone()
278            .expect("normalized request should have thread_id");
279        let run_id = request
280            .run_id
281            .clone()
282            .expect("normalized request should have run_id");
283
284        // WAL: persist to store first
285        let entry = self
286            .enqueue_to_store(&request, &options, now_unix_millis())
287            .await?;
288        let entry_id = entry.entry_id.clone();
289        let generation = entry.generation;
290
291        let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
292        let mut inner = mailbox.inner.lock().await;
293
294        // Update generation if store is ahead
295        if generation > inner.generation {
296            inner.generation = generation;
297        }
298
299        match &inner.status {
300            ThreadStatus::Idle => {
301                drop(inner);
302                self.dispatch_entry(mailbox, entry).await;
303            }
304            ThreadStatus::Running { .. } => {
305                inner.pending.push_back(BufferedEntry { entry });
306            }
307        }
308
309        Ok((thread_id, run_id, entry_id))
310    }
311
312    /// Submit a streaming run request. Returns a RunStream for SSE consumption.
313    ///
314    /// This bypasses the buffering path — the entry is claimed inline and
315    /// the run stream is returned directly. The stream is wrapped so that
316    /// on_run_complete fires when it exhausts.
317    pub async fn submit_streaming(
318        self: &Arc<Self>,
319        agent_id: &str,
320        request: RunRequest,
321        options: EnqueueOptions,
322    ) -> Result<RunStream, ApiError> {
323        self.os
324            .resolve(agent_id)
325            .map_err(|e| ApiError::from(tirea_agentos::runtime::AgentOsRunError::from(e)))?;
326
327        let request = normalize_background_run_request(agent_id, request);
328        let thread_id = request
329            .thread_id
330            .clone()
331            .expect("normalized request should have thread_id");
332
333        // WAL: persist with sentinel available_at so the sweep doesn't grab it
334        let entry = self
335            .enqueue_to_store(&request, &options, INLINE_MAILBOX_AVAILABLE_AT)
336            .await?;
337        let entry_id = entry.entry_id.clone();
338        let generation = entry.generation;
339
340        // Claim the entry inline
341        let Some(claimed) = self
342            .mailbox_store
343            .claim_mailbox_entry(
344                &entry_id,
345                &self.consumer_id,
346                now_unix_millis(),
347                DEFAULT_MAILBOX_LEASE_MS,
348            )
349            .await
350            .map_err(mailbox_error)?
351        else {
352            return Err(ApiError::Internal(format!(
353                "mailbox entry '{entry_id}' could not be claimed for streaming"
354            )));
355        };
356
357        let claim_token = claimed.claim_token.clone().ok_or_else(|| {
358            ApiError::Internal(format!(
359                "mailbox entry '{entry_id}' was claimed without claim_token"
360            ))
361        })?;
362
363        // Register the thread mailbox as Running before starting
364        let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
365        let renewal = spawn_lease_renewal(
366            self.mailbox_store.clone(),
367            entry_id.clone(),
368            claim_token.clone(),
369            DEFAULT_MAILBOX_LEASE_MS,
370        );
371        {
372            let mut inner = mailbox.inner.lock().await;
373            if generation > inner.generation {
374                inner.generation = generation;
375            }
376            inner.status = ThreadStatus::Running {
377                entry_id: entry_id.clone(),
378                claim_token: claim_token.clone(),
379            };
380            inner.lease_renewal = Some(renewal);
381        }
382
383        match start_agent_run_for_entry(&self.os, &self.mailbox_store, &claimed, false).await {
384            Ok(run) => {
385                // Defer ack until stream exhausts via wrap_with_completion.
386                Ok(self.wrap_with_completion(thread_id, run))
387            }
388            Err(MailboxRunStartError::Superseded(error)) => {
389                let _ = self
390                    .mailbox_store
391                    .supersede_mailbox_entry(&entry_id, now_unix_millis(), &error)
392                    .await;
393                // Reset to idle
394                let mut inner = mailbox.inner.lock().await;
395                inner.status = ThreadStatus::Idle;
396                Err(ApiError::BadRequest(error))
397            }
398            Err(MailboxRunStartError::Busy(error)) => {
399                self.mailbox_store
400                    .cancel_mailbox_entry(&entry_id, now_unix_millis())
401                    .await
402                    .map_err(mailbox_error)?;
403                let mut inner = mailbox.inner.lock().await;
404                inner.status = ThreadStatus::Idle;
405                Err(ApiError::BadRequest(error))
406            }
407            Err(MailboxRunStartError::Permanent(error))
408            | Err(MailboxRunStartError::Retryable(error)) => {
409                dead_letter_claimed_entry(&self.mailbox_store, &entry_id, &claim_token, &error)
410                    .await?;
411                let mut inner = mailbox.inner.lock().await;
412                inner.status = ThreadStatus::Idle;
413                Err(ApiError::Internal(error))
414            }
415            Err(MailboxRunStartError::Internal(error)) => {
416                dead_letter_claimed_entry(
417                    &self.mailbox_store,
418                    &entry_id,
419                    &claim_token,
420                    &error.to_string(),
421                )
422                .await?;
423                let mut inner = mailbox.inner.lock().await;
424                inner.status = ThreadStatus::Idle;
425                Err(error)
426            }
427        }
428    }
429
430    /// Dispatch a single entry on a thread. Claims, starts the run, and
431    /// transitions the mailbox to Running.
432    ///
433    /// Returns a boxed future to break the recursive async type cycle
434    /// (dispatch_entry → on_run_complete → dispatch_entry).
435    fn dispatch_entry(
436        self: &Arc<Self>,
437        mailbox: Arc<ThreadMailbox>,
438        entry: MailboxEntry,
439    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
440        Box::pin(async move {
441            let entry_id = entry.entry_id.clone();
442
443            let claimed = match self
444                .mailbox_store
445                .claim_mailbox_entry(
446                    &entry_id,
447                    &self.consumer_id,
448                    now_unix_millis(),
449                    DEFAULT_MAILBOX_LEASE_MS,
450                )
451                .await
452            {
453                Ok(Some(c)) => c,
454                Ok(None) => {
455                    tracing::debug!(entry_id, "entry already claimed/consumed, skipping");
456                    return;
457                }
458                Err(err) => {
459                    tracing::error!(entry_id, %err, "failed to claim mailbox entry");
460                    return;
461                }
462            };
463
464            let claim_token = match claimed.claim_token.as_deref() {
465                Some(t) => t.to_string(),
466                None => {
467                    tracing::error!(entry_id, "claimed entry missing claim_token");
468                    return;
469                }
470            };
471
472            // Check generation mismatch
473            if self
474                .mailbox_store
475                .load_mailbox_state(&entry.mailbox_id)
476                .await
477                .ok()
478                .flatten()
479                .is_some_and(|state| state.current_generation != entry.generation)
480            {
481                let _ = self
482                    .mailbox_store
483                    .supersede_mailbox_entry(
484                        &entry_id,
485                        now_unix_millis(),
486                        "superseded by interrupt",
487                    )
488                    .await;
489                tracing::debug!(entry_id, "entry superseded by generation mismatch");
490                return;
491            }
492
493            match start_agent_run_for_entry(&self.os, &self.mailbox_store, &claimed, true).await {
494                Ok(run) => {
495                    // Defer ack until run completes — the Claimed status with
496                    // active lease serves as a distributed lock preventing
497                    // concurrent runs on the same thread across nodes.
498                    let renewal = spawn_lease_renewal(
499                        self.mailbox_store.clone(),
500                        entry_id.clone(),
501                        claim_token.clone(),
502                        DEFAULT_MAILBOX_LEASE_MS,
503                    );
504
505                    let run_id = run.run_id.clone();
506                    let completed_run_id = run_id.clone();
507                    let thread_id = mailbox.thread_id.clone();
508
509                    {
510                        let mut inner = mailbox.inner.lock().await;
511                        inner.status = ThreadStatus::Running {
512                            entry_id: entry_id.clone(),
513                            claim_token: claim_token.clone(),
514                        };
515                        inner.lease_renewal = Some(renewal);
516                    }
517
518                    // Spawn background drain with completion callback
519                    let svc = self.clone();
520                    tokio::spawn(async move {
521                        drain_background_run(run).await;
522                        svc.on_run_complete(&thread_id, &completed_run_id).await;
523                    });
524                }
525                Err(MailboxRunStartError::Busy(reason)) => {
526                    // Thread is busy (rare race). Buffer the entry for later.
527                    let _ = super::mailbox::nack_claimed_entry(
528                        &self.mailbox_store,
529                        &entry_id,
530                        &claim_token,
531                        250,
532                        &reason,
533                    )
534                    .await;
535                    let mut inner = mailbox.inner.lock().await;
536                    inner.pending.push_front(BufferedEntry { entry: claimed });
537                    tracing::debug!(entry_id, %reason, "dispatch busy, buffered entry");
538                }
539                Err(MailboxRunStartError::Superseded(error)) => {
540                    let _ = self
541                        .mailbox_store
542                        .supersede_mailbox_entry(&entry_id, now_unix_millis(), &error)
543                        .await;
544                    tracing::debug!(entry_id, %error, "entry superseded during dispatch");
545                    // Try next pending
546                    let svc = self.clone();
547                    let tid = mailbox.thread_id.clone();
548                    tokio::spawn(async move {
549                        svc.try_dispatch_next(&tid).await;
550                    });
551                }
552                Err(MailboxRunStartError::Permanent(error)) => {
553                    let _ = dead_letter_claimed_entry(
554                        &self.mailbox_store,
555                        &entry_id,
556                        &claim_token,
557                        &error,
558                    )
559                    .await;
560                    tracing::warn!(entry_id, %error, "permanent dispatch error");
561                    let svc = self.clone();
562                    let tid = mailbox.thread_id.clone();
563                    tokio::spawn(async move {
564                        svc.try_dispatch_next(&tid).await;
565                    });
566                }
567                Err(MailboxRunStartError::Retryable(error)) => {
568                    let _ = super::mailbox::nack_claimed_entry(
569                        &self.mailbox_store,
570                        &entry_id,
571                        &claim_token,
572                        250,
573                        &error,
574                    )
575                    .await;
576                    tracing::warn!(entry_id, %error, "retryable dispatch error");
577                    // Re-buffer at front for next attempt
578                    let mut inner = mailbox.inner.lock().await;
579                    inner.pending.push_front(BufferedEntry { entry: claimed });
580                }
581                Err(MailboxRunStartError::Internal(error)) => {
582                    let _ = dead_letter_claimed_entry(
583                        &self.mailbox_store,
584                        &entry_id,
585                        &claim_token,
586                        &error.to_string(),
587                    )
588                    .await;
589                    tracing::error!(entry_id, %error, "internal dispatch error");
590                    let svc = self.clone();
591                    let tid = mailbox.thread_id.clone();
592                    tokio::spawn(async move {
593                        svc.try_dispatch_next(&tid).await;
594                    });
595                }
596            }
597        })
598    }
599
600    /// Wrap a RunStream so that `on_run_complete` fires when the stream exhausts.
601    fn wrap_with_completion(self: &Arc<Self>, thread_id: String, run: RunStream) -> RunStream {
602        let svc = self.clone();
603        let tid = thread_id;
604        let completed_run_id = run.run_id.clone();
605        let wrapped = futures::stream::unfold(
606            (run.events, Some(svc), Some(tid), Some(completed_run_id)),
607            |(mut events, svc, tid, run_id)| async move {
608                match events.next().await {
609                    Some(event) => Some((event, (events, svc, tid, run_id))),
610                    None => {
611                        // Stream exhausted — fire completion
612                        if let (Some(svc), Some(tid), Some(run_id)) = (svc, tid, run_id) {
613                            svc.on_run_complete(&tid, &run_id).await;
614                        }
615                        None
616                    }
617                }
618            },
619        );
620        RunStream {
621            thread_id: run.thread_id,
622            run_id: run.run_id,
623            decision_tx: run.decision_tx,
624            events: Box::pin(wrapped),
625        }
626    }
627
628    /// Called when a run completes on a thread. Acks the mailbox entry,
629    /// stops lease renewal, transitions to Idle, and dispatches the next
630    /// pending entry.
631    async fn on_run_complete(self: &Arc<Self>, thread_id: &str, run_id: &str) {
632        if let Err(err) = self.submit_parent_completion_notification(run_id).await {
633            tracing::warn!(thread_id, run_id, %err, "failed to submit parent completion notification");
634        }
635
636        let mailbox = {
637            let map = self.mailboxes.read().await;
638            match map.get(thread_id) {
639                Some(mb) => mb.clone(),
640                None => return,
641            }
642        };
643
644        let next = {
645            let mut inner = mailbox.inner.lock().await;
646
647            // Abort lease renewal and ack the completed entry.
648            if let Some(handle) = inner.lease_renewal.take() {
649                handle.abort();
650            }
651            if let ThreadStatus::Running {
652                entry_id,
653                claim_token,
654                ..
655            } = &inner.status
656            {
657                if let Err(err) =
658                    ack_claimed_entry(&self.mailbox_store, entry_id, claim_token).await
659                {
660                    tracing::error!(entry_id, %err, "failed to ack entry on run completion");
661                }
662            }
663
664            inner.status = ThreadStatus::Idle;
665            inner.pending.pop_front()
666        };
667
668        if let Some(buffered) = next {
669            let svc = self.clone();
670            tokio::spawn(async move {
671                svc.dispatch_entry(mailbox, buffered.entry).await;
672            });
673        }
674    }
675
676    async fn submit_parent_completion_notification(
677        self: &Arc<Self>,
678        completed_run_id: &str,
679    ) -> Result<(), ApiError> {
680        let Some(store) = self.os.agent_state_store().cloned() else {
681            return Ok(());
682        };
683        let Some(child_record) = store
684            .load_run(completed_run_id)
685            .await
686            .map_err(|err| ApiError::Internal(err.to_string()))?
687        else {
688            return Ok(());
689        };
690        if child_record.source_mailbox_entry_id.is_none() {
691            return Ok(());
692        }
693        let Some(parent_run_id) = child_record.parent_run_id.clone() else {
694            return Ok(());
695        };
696        let Some(message) = build_parent_completion_notification_message(&child_record) else {
697            return Ok(());
698        };
699        let Some(parent_record) = store
700            .load_run(&parent_run_id)
701            .await
702            .map_err(|err| ApiError::Internal(err.to_string()))?
703        else {
704            return Err(ApiError::Internal(format!(
705                "parent run '{parent_run_id}' not found for completed child run '{completed_run_id}'"
706            )));
707        };
708        let agent_id = parent_record.agent_id.trim();
709        if agent_id.is_empty() {
710            return Err(ApiError::Internal(format!(
711                "parent run '{parent_run_id}' is missing agent_id"
712            )));
713        }
714
715        let request = RunRequest {
716            agent_id: agent_id.to_string(),
717            thread_id: Some(parent_record.thread_id.clone()),
718            run_id: None,
719            parent_run_id: None,
720            parent_thread_id: parent_record.parent_thread_id.clone(),
721            resource_id: None,
722            origin: RunOrigin::Internal,
723            state: None,
724            messages: vec![message],
725            initial_decisions: Vec::new(),
726            source_mailbox_entry_id: None,
727        };
728        let options = EnqueueOptions {
729            sender_id: Some(format!("run:{completed_run_id}")),
730            priority: 0,
731            dedupe_key: Some(parent_completion_notification_dedupe_key(
732                &parent_run_id,
733                completed_run_id,
734            )),
735        };
736        self.submit(agent_id, request, options).await?;
737        Ok(())
738    }
739
740    /// Try to dispatch the next pending entry for a thread (if idle).
741    async fn try_dispatch_next(self: &Arc<Self>, thread_id: &str) {
742        let mailbox = {
743            let map = self.mailboxes.read().await;
744            match map.get(thread_id) {
745                Some(mb) => mb.clone(),
746                None => return,
747            }
748        };
749
750        let next = {
751            let mut inner = mailbox.inner.lock().await;
752            if !matches!(inner.status, ThreadStatus::Idle) {
753                return;
754            }
755            inner.pending.pop_front()
756        };
757
758        if let Some(buffered) = next {
759            let svc = self.clone();
760            tokio::spawn(async move {
761                svc.dispatch_entry(mailbox, buffered.entry).await;
762            });
763        }
764    }
765
766    /// Send a control signal to a thread's mailbox.
767    pub async fn control(
768        self: &Arc<Self>,
769        thread_id: &str,
770        signal: ControlSignal,
771    ) -> Result<ControlResult, ApiError> {
772        match signal {
773            ControlSignal::Cancel => {
774                let cancelled_run_id = self.os.cancel_active_run_by_thread(thread_id).await;
775                Ok(ControlResult {
776                    cancelled_run_id,
777                    generation: None,
778                    superseded_entries: vec![],
779                })
780            }
781            ControlSignal::Interrupt => {
782                // Interrupt the store (bumps generation, supersedes pending)
783                let interrupted = self
784                    .mailbox_store
785                    .interrupt_mailbox(thread_id, now_unix_millis())
786                    .await
787                    .map_err(mailbox_error)?;
788
789                // Cancel active run
790                let cancelled_run_id = self.os.cancel_active_run_by_thread(thread_id).await;
791
792                let new_generation = interrupted.mailbox_state.current_generation;
793
794                // Clear in-memory pending and update generation
795                let mailbox = {
796                    let map = self.mailboxes.read().await;
797                    map.get(thread_id).cloned()
798                };
799                if let Some(mb) = mailbox {
800                    let mut inner = mb.inner.lock().await;
801                    inner.pending.clear();
802                    inner.generation = new_generation;
803                    // If there's no active run to complete, mark idle
804                    if cancelled_run_id.is_none() {
805                        if let Some(handle) = inner.lease_renewal.take() {
806                            handle.abort();
807                        }
808                        inner.status = ThreadStatus::Idle;
809                    }
810                }
811
812                Ok(ControlResult {
813                    cancelled_run_id,
814                    generation: Some(new_generation),
815                    superseded_entries: interrupted.superseded_entries,
816                })
817            }
818        }
819    }
820
821    /// Recover mailbox state from the persistent store on startup.
822    ///
823    /// Loads all queued entries and buffers them in the appropriate ThreadMailbox.
824    pub async fn recover(self: &Arc<Self>) -> Result<usize, ApiError> {
825        let page = self
826            .mailbox_store
827            .list_mailbox_entries(&MailboxQuery {
828                status: Some(MailboxEntryStatus::Queued),
829                limit: 10_000,
830                ..Default::default()
831            })
832            .await
833            .map_err(mailbox_error)?;
834
835        let mut recovered = 0;
836        for entry in page.items {
837            let thread_id = entry.mailbox_id.clone();
838            let generation = entry.generation;
839
840            // Skip entries with sentinel available_at (inline streaming claims)
841            if entry.available_at == INLINE_MAILBOX_AVAILABLE_AT {
842                continue;
843            }
844
845            let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
846            let mut inner = mailbox.inner.lock().await;
847
848            if generation > inner.generation {
849                inner.generation = generation;
850            }
851
852            match &inner.status {
853                ThreadStatus::Idle if inner.pending.is_empty() => {
854                    // Dispatch immediately
855                    drop(inner);
856                    self.dispatch_entry(mailbox, entry).await;
857                }
858                _ => {
859                    inner.pending.push_back(BufferedEntry { entry });
860                }
861            }
862            recovered += 1;
863        }
864
865        if recovered > 0 {
866            tracing::info!(recovered, "recovered mailbox entries from store");
867        }
868        Ok(recovered)
869    }
870
871    /// Background sweep that picks up orphaned entries.
872    ///
873    /// Runs at a low frequency (30s) as a safety net. Most dispatch happens
874    /// via the event-driven path.
875    pub async fn run_sweep_forever(self: Arc<Self>) {
876        let sweep_interval = Duration::from_secs(30);
877        let gc_interval = Duration::from_secs(60);
878        let gc_ttl_ms: u64 = 24 * 60 * 60 * 1000;
879        let mut last_gc = std::time::Instant::now();
880
881        loop {
882            tokio::time::sleep(sweep_interval).await;
883
884            // Sweep: claim and dispatch any orphaned queued entries
885            match self
886                .mailbox_store
887                .claim_mailbox_entries(
888                    None,
889                    16,
890                    &self.consumer_id,
891                    now_unix_millis(),
892                    DEFAULT_MAILBOX_LEASE_MS,
893                )
894                .await
895            {
896                Ok(claimed) if !claimed.is_empty() => {
897                    tracing::info!(count = claimed.len(), "sweep picked up orphaned entries");
898                    for entry in claimed {
899                        let thread_id = entry.mailbox_id.clone();
900                        let generation = entry.generation;
901                        let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
902                        let is_idle = {
903                            let inner = mailbox.inner.lock().await;
904                            matches!(inner.status, ThreadStatus::Idle)
905                        };
906                        if is_idle {
907                            // Already claimed, dispatch directly
908                            let claim_token = entry.claim_token.clone().unwrap_or_default();
909                            match start_agent_run_for_entry(
910                                &self.os,
911                                &self.mailbox_store,
912                                &entry,
913                                true,
914                            )
915                            .await
916                            {
917                                Ok(run) => {
918                                    let renewal = spawn_lease_renewal(
919                                        self.mailbox_store.clone(),
920                                        entry.entry_id.clone(),
921                                        claim_token.clone(),
922                                        DEFAULT_MAILBOX_LEASE_MS,
923                                    );
924                                    let run_id = run.run_id.clone();
925                                    let completed_run_id = run_id.clone();
926                                    let entry_id = entry.entry_id.clone();
927                                    {
928                                        let mut inner = mailbox.inner.lock().await;
929                                        inner.status = ThreadStatus::Running {
930                                            entry_id,
931                                            claim_token,
932                                        };
933                                        inner.lease_renewal = Some(renewal);
934                                    }
935                                    let svc = self.clone();
936                                    let tid = thread_id.clone();
937                                    tokio::spawn(async move {
938                                        drain_background_run(run).await;
939                                        svc.on_run_complete(&tid, &completed_run_id).await;
940                                    });
941                                }
942                                Err(MailboxRunStartError::Busy(_)) => {
943                                    let _ = super::mailbox::nack_claimed_entry(
944                                        &self.mailbox_store,
945                                        &entry.entry_id,
946                                        &claim_token,
947                                        250,
948                                        "thread busy during sweep",
949                                    )
950                                    .await;
951                                }
952                                Err(MailboxRunStartError::Permanent(error)) => {
953                                    let _ = dead_letter_claimed_entry(
954                                        &self.mailbox_store,
955                                        &entry.entry_id,
956                                        &claim_token,
957                                        &error,
958                                    )
959                                    .await;
960                                }
961                                Err(_) => {
962                                    let _ = super::mailbox::nack_claimed_entry(
963                                        &self.mailbox_store,
964                                        &entry.entry_id,
965                                        &claim_token,
966                                        1000,
967                                        "sweep dispatch failed",
968                                    )
969                                    .await;
970                                }
971                            }
972                        } else {
973                            // Thread is busy, nack for later
974                            let claim_token = entry.claim_token.clone().unwrap_or_default();
975                            let _ = super::mailbox::nack_claimed_entry(
976                                &self.mailbox_store,
977                                &entry.entry_id,
978                                &claim_token,
979                                1000,
980                                "thread busy during sweep",
981                            )
982                            .await;
983                        }
984                    }
985                }
986                Ok(_) => {} // nothing to sweep
987                Err(err) => {
988                    tracing::error!(%err, "sweep failed to claim entries");
989                }
990            }
991
992            // Periodic GC of terminal entries
993            if last_gc.elapsed() >= gc_interval {
994                let cutoff = now_unix_millis().saturating_sub(gc_ttl_ms);
995                match self
996                    .mailbox_store
997                    .purge_terminal_mailbox_entries(cutoff)
998                    .await
999                {
1000                    Ok(0) => {}
1001                    Ok(n) => tracing::debug!(purged = n, "mailbox GC purged terminal entries"),
1002                    Err(err) => tracing::warn!(%err, "mailbox GC failed"),
1003                }
1004                last_gc = std::time::Instant::now();
1005            }
1006        }
1007    }
1008}
1009
1010/// Result of a control signal.
1011pub struct ControlResult {
1012    pub cancelled_run_id: Option<String>,
1013    pub generation: Option<u64>,
1014    pub superseded_entries: Vec<MailboxEntry>,
1015}
1016
1017#[cfg(test)]
1018mod tests {
1019    use super::*;
1020    use async_trait::async_trait;
1021    use tirea_agentos::composition::{AgentDefinition, AgentDefinitionSpec, AgentOsBuilder};
1022    use tirea_agentos::contracts::runtime::behavior::ReadOnlyContext;
1023    use tirea_agentos::contracts::runtime::phase::{ActionSet, BeforeInferenceAction};
1024    use tirea_agentos::contracts::storage::{MailboxStore, ThreadReader, ThreadWriter};
1025    use tirea_agentos::contracts::{AgentBehavior, TerminationReason};
1026    use tirea_contract::storage::{
1027        MailboxWriter, RunOrigin, RunQuery, RunReader, RunStatus, RunWriter,
1028    };
1029    use tirea_store_adapters::MemoryStore;
1030
1031    struct TerminatePlugin;
1032
1033    #[async_trait]
1034    impl AgentBehavior for TerminatePlugin {
1035        fn id(&self) -> &str {
1036            "svc_terminate"
1037        }
1038
1039        async fn before_inference(
1040            &self,
1041            _ctx: &ReadOnlyContext<'_>,
1042        ) -> ActionSet<BeforeInferenceAction> {
1043            ActionSet::single(BeforeInferenceAction::Terminate(
1044                TerminationReason::BehaviorRequested,
1045            ))
1046        }
1047    }
1048
1049    struct DelayedTerminatePlugin {
1050        id: &'static str,
1051        delay_ms: u64,
1052    }
1053
1054    #[async_trait]
1055    impl AgentBehavior for DelayedTerminatePlugin {
1056        fn id(&self) -> &str {
1057            self.id
1058        }
1059
1060        async fn before_inference(
1061            &self,
1062            _ctx: &ReadOnlyContext<'_>,
1063        ) -> ActionSet<BeforeInferenceAction> {
1064            tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
1065            ActionSet::single(BeforeInferenceAction::Terminate(
1066                TerminationReason::BehaviorRequested,
1067            ))
1068        }
1069    }
1070
1071    fn make_os_with_agents(store: Arc<MemoryStore>, agent_ids: &[&str]) -> Arc<AgentOs> {
1072        let mut builder = AgentOsBuilder::new()
1073            .with_registered_behavior("svc_terminate", Arc::new(TerminatePlugin))
1074            .with_agent_state_store(store);
1075        for agent_id in agent_ids {
1076            builder = builder.with_agent_spec(AgentDefinitionSpec::local_with_id(
1077                *agent_id,
1078                AgentDefinition {
1079                    id: (*agent_id).to_string(),
1080                    behavior_ids: vec!["svc_terminate".to_string()],
1081                    ..Default::default()
1082                },
1083            ));
1084        }
1085        Arc::new(builder.build().expect("build AgentOs"))
1086    }
1087
1088    fn make_os(store: Arc<MemoryStore>) -> Arc<AgentOs> {
1089        make_os_with_agents(store, &["test"])
1090    }
1091
1092    fn make_request(thread_id: &str, run_id: &str) -> RunRequest {
1093        RunRequest {
1094            agent_id: "test".to_string(),
1095            thread_id: Some(thread_id.to_string()),
1096            run_id: Some(run_id.to_string()),
1097            parent_run_id: None,
1098            parent_thread_id: None,
1099            resource_id: None,
1100            origin: Default::default(),
1101            state: None,
1102            messages: vec![],
1103            initial_decisions: vec![],
1104            source_mailbox_entry_id: None,
1105        }
1106    }
1107
1108    fn completion_message_for_parent(
1109        messages: &[std::sync::Arc<tirea_agentos::contracts::thread::Message>],
1110        parent_run_id: &str,
1111    ) -> Option<serde_json::Value> {
1112        completion_messages_for_parent(messages, parent_run_id)
1113            .into_iter()
1114            .next()
1115    }
1116
1117    fn completion_messages_for_parent(
1118        messages: &[std::sync::Arc<tirea_agentos::contracts::thread::Message>],
1119        parent_run_id: &str,
1120    ) -> Vec<serde_json::Value> {
1121        messages
1122            .iter()
1123            .map(std::sync::Arc::as_ref)
1124            .filter_map(|message| {
1125                let parsed: serde_json::Value = serde_json::from_str(&message.content).ok()?;
1126                if parsed["type"].as_str() == Some("background_task_notification")
1127                    && parsed["recipient_task_id"].as_str() == Some(parent_run_id)
1128                {
1129                    Some(parsed)
1130                } else {
1131                    None
1132                }
1133            })
1134            .collect()
1135    }
1136
1137    async fn seed_completed_run(
1138        store: &MemoryStore,
1139        run_id: &str,
1140        thread_id: &str,
1141        agent_id: &str,
1142        origin: RunOrigin,
1143        parent_thread_id: Option<&str>,
1144    ) {
1145        let now = now_unix_millis();
1146        let mut record = tirea_contract::storage::RunRecord::new(
1147            run_id.to_string(),
1148            thread_id.to_string(),
1149            agent_id.to_string(),
1150            origin,
1151            RunStatus::Done,
1152            now,
1153        );
1154        record.parent_thread_id = parent_thread_id.map(ToString::to_string);
1155        record.termination_code = Some("natural".to_string());
1156        record.updated_at = now;
1157        store.upsert_run(&record).await.expect("seed completed run");
1158    }
1159
1160    async fn seed_child_terminal_run(
1161        store: &MemoryStore,
1162        run_id: &str,
1163        thread_id: &str,
1164        agent_id: &str,
1165        parent_run_id: &str,
1166        source_mailbox_entry_id: Option<&str>,
1167    ) {
1168        seed_child_terminal_run_with_status(
1169            store,
1170            run_id,
1171            thread_id,
1172            agent_id,
1173            parent_run_id,
1174            source_mailbox_entry_id,
1175            Some("natural"),
1176            None,
1177        )
1178        .await;
1179    }
1180
1181    #[allow(clippy::too_many_arguments)]
1182    async fn seed_child_terminal_run_with_status(
1183        store: &MemoryStore,
1184        run_id: &str,
1185        thread_id: &str,
1186        agent_id: &str,
1187        parent_run_id: &str,
1188        source_mailbox_entry_id: Option<&str>,
1189        termination_code: Option<&str>,
1190        termination_detail: Option<&str>,
1191    ) {
1192        let now = now_unix_millis();
1193        let mut record = tirea_contract::storage::RunRecord::new(
1194            run_id.to_string(),
1195            thread_id.to_string(),
1196            agent_id.to_string(),
1197            RunOrigin::User,
1198            RunStatus::Done,
1199            now,
1200        );
1201        record.parent_run_id = Some(parent_run_id.to_string());
1202        record.source_mailbox_entry_id = source_mailbox_entry_id.map(ToString::to_string);
1203        record.termination_code = termination_code.map(ToString::to_string);
1204        record.termination_detail = termination_detail.map(ToString::to_string);
1205        record.updated_at = now;
1206        store.upsert_run(&record).await.expect("seed child run");
1207    }
1208
1209    async fn wait_for_completion_messages(
1210        store: &MemoryStore,
1211        thread_id: &str,
1212        parent_run_id: &str,
1213        expected: usize,
1214    ) -> Vec<serde_json::Value> {
1215        for _ in 0..40 {
1216            if let Some(thread) = store
1217                .load_thread(thread_id)
1218                .await
1219                .expect("load thread should succeed")
1220            {
1221                let messages = completion_messages_for_parent(&thread.messages, parent_run_id);
1222                if messages.len() >= expected {
1223                    return messages;
1224                }
1225            }
1226            tokio::time::sleep(Duration::from_millis(25)).await;
1227        }
1228
1229        store
1230            .load_thread(thread_id)
1231            .await
1232            .expect("load thread should succeed")
1233            .map(|thread| completion_messages_for_parent(&thread.messages, parent_run_id))
1234            .unwrap_or_default()
1235    }
1236
1237    async fn wait_for_run_record(
1238        store: &MemoryStore,
1239        run_id: &str,
1240    ) -> tirea_contract::storage::RunRecord {
1241        for _ in 0..40 {
1242            if let Some(record) = RunReader::load_run(store, run_id)
1243                .await
1244                .expect("load run should succeed")
1245            {
1246                return record;
1247            }
1248            tokio::time::sleep(Duration::from_millis(25)).await;
1249        }
1250
1251        RunReader::load_run(store, run_id)
1252            .await
1253            .expect("load run should succeed")
1254            .unwrap_or_else(|| panic!("timed out waiting for run '{run_id}'"))
1255    }
1256
1257    #[tokio::test]
1258    async fn submit_dispatches_immediately_when_idle() {
1259        let store = Arc::new(MemoryStore::new());
1260        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1261        let os = make_os(store.clone());
1262
1263        let svc = Arc::new(MailboxService::new(
1264            os.clone(),
1265            mailbox_store.clone(),
1266            "test-svc",
1267        ));
1268
1269        let (thread_id, run_id, entry_id) = svc
1270            .submit(
1271                "test",
1272                make_request("svc-thread-1", "svc-run-1"),
1273                EnqueueOptions::default(),
1274            )
1275            .await
1276            .expect("submit");
1277
1278        assert_eq!(thread_id, "svc-thread-1");
1279        assert_eq!(run_id, "svc-run-1");
1280        assert!(!entry_id.is_empty());
1281
1282        // Give the spawned drain task time to finish
1283        tokio::time::sleep(Duration::from_millis(200)).await;
1284
1285        // Entry should be accepted
1286        let loaded = mailbox_store
1287            .load_mailbox_entry(&entry_id)
1288            .await
1289            .unwrap()
1290            .expect("entry should exist");
1291        assert_eq!(loaded.status, MailboxEntryStatus::Accepted);
1292    }
1293
1294    #[tokio::test]
1295    async fn submit_buffers_when_busy_then_dispatches_on_completion() {
1296        let store = Arc::new(MemoryStore::new());
1297        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1298        let os = make_os(store.clone());
1299
1300        let svc = Arc::new(MailboxService::new(
1301            os.clone(),
1302            mailbox_store.clone(),
1303            "test-svc",
1304        ));
1305
1306        // Submit first run
1307        let (_t1, _r1, entry1) = svc
1308            .submit(
1309                "test",
1310                make_request("svc-thread-2", "svc-run-2a"),
1311                EnqueueOptions::default(),
1312            )
1313            .await
1314            .expect("submit 1");
1315
1316        // Submit second run while first is running (should buffer)
1317        let (_t2, _r2, entry2) = svc
1318            .submit(
1319                "test",
1320                make_request("svc-thread-2", "svc-run-2b"),
1321                EnqueueOptions::default(),
1322            )
1323            .await
1324            .expect("submit 2");
1325
1326        // Give time for both runs to complete
1327        tokio::time::sleep(Duration::from_millis(500)).await;
1328
1329        // Both entries should be accepted
1330        let e1 = mailbox_store
1331            .load_mailbox_entry(&entry1)
1332            .await
1333            .unwrap()
1334            .expect("entry1");
1335        assert_eq!(e1.status, MailboxEntryStatus::Accepted);
1336
1337        let e2 = mailbox_store
1338            .load_mailbox_entry(&entry2)
1339            .await
1340            .unwrap()
1341            .expect("entry2");
1342        assert_eq!(e2.status, MailboxEntryStatus::Accepted);
1343    }
1344
1345    #[tokio::test]
1346    async fn control_interrupt_clears_pending() {
1347        let store = Arc::new(MemoryStore::new());
1348        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1349        let os = make_os(store.clone());
1350
1351        let svc = Arc::new(MailboxService::new(
1352            os.clone(),
1353            mailbox_store.clone(),
1354            "test-svc",
1355        ));
1356
1357        // Submit first (will run)
1358        let _ = svc
1359            .submit(
1360                "test",
1361                make_request("svc-thread-3", "svc-run-3a"),
1362                EnqueueOptions::default(),
1363            )
1364            .await
1365            .expect("submit 1");
1366
1367        // Submit second (will buffer)
1368        let (_, _, entry2) = svc
1369            .submit(
1370                "test",
1371                make_request("svc-thread-3", "svc-run-3b"),
1372                EnqueueOptions::default(),
1373            )
1374            .await
1375            .expect("submit 2");
1376
1377        // Interrupt
1378        let result = svc
1379            .control("svc-thread-3", ControlSignal::Interrupt)
1380            .await
1381            .expect("interrupt");
1382        assert!(result.generation.is_some());
1383
1384        // Give time for completion
1385        tokio::time::sleep(Duration::from_millis(300)).await;
1386
1387        // The second entry should be superseded (interrupt supersedes queued entries)
1388        let e2 = mailbox_store
1389            .load_mailbox_entry(&entry2)
1390            .await
1391            .unwrap()
1392            .expect("entry2");
1393        assert_eq!(e2.status, MailboxEntryStatus::Superseded);
1394    }
1395
1396    #[tokio::test]
1397    async fn submit_rejects_unknown_agent() {
1398        let store = Arc::new(MemoryStore::new());
1399        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1400        let os = make_os(store.clone());
1401
1402        let svc = Arc::new(MailboxService::new(
1403            os.clone(),
1404            mailbox_store.clone(),
1405            "test-svc",
1406        ));
1407
1408        let result = svc
1409            .submit(
1410                "nonexistent",
1411                make_request("svc-thread-4", "svc-run-4"),
1412                EnqueueOptions::default(),
1413            )
1414            .await;
1415        assert!(result.is_err());
1416    }
1417
1418    #[tokio::test]
1419    async fn recover_loads_queued_entries() {
1420        let store = Arc::new(MemoryStore::new());
1421        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1422        let os = make_os(store.clone());
1423
1424        // Pre-populate store with a queued entry
1425        let now = now_unix_millis();
1426        store
1427            .ensure_mailbox_state("svc-thread-5", now)
1428            .await
1429            .unwrap();
1430        let entry = mailbox_entry_from_request(
1431            &make_request("svc-thread-5", "svc-run-5"),
1432            0,
1433            &EnqueueOptions::default(),
1434            now,
1435        );
1436        store.enqueue_mailbox_entry(&entry).await.unwrap();
1437
1438        let svc = Arc::new(MailboxService::new(
1439            os.clone(),
1440            mailbox_store.clone(),
1441            "test-svc",
1442        ));
1443
1444        let count = svc.recover().await.expect("recover");
1445        assert_eq!(count, 1);
1446
1447        // Give time for dispatch
1448        tokio::time::sleep(Duration::from_millis(300)).await;
1449
1450        // Entry should be accepted
1451        let loaded = mailbox_store
1452            .load_mailbox_entry(&entry.entry_id)
1453            .await
1454            .unwrap()
1455            .expect("entry");
1456        assert_eq!(loaded.status, MailboxEntryStatus::Accepted);
1457    }
1458
1459    #[tokio::test]
1460    async fn submit_streaming_returns_stream() {
1461        let store = Arc::new(MemoryStore::new());
1462        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1463        let os = make_os(store.clone());
1464
1465        let svc = Arc::new(MailboxService::new(
1466            os.clone(),
1467            mailbox_store.clone(),
1468            "test-svc",
1469        ));
1470
1471        let mut run = svc
1472            .submit_streaming(
1473                "test",
1474                make_request("svc-thread-6", "svc-run-6"),
1475                EnqueueOptions::default(),
1476            )
1477            .await
1478            .expect("submit_streaming");
1479
1480        // Drain the stream
1481        while run.events.next().await.is_some() {}
1482
1483        // Give completion callback time
1484        tokio::time::sleep(Duration::from_millis(100)).await;
1485
1486        // Check mailbox state is idle after stream exhaustion
1487        let map = svc.mailboxes.read().await;
1488        if let Some(mb) = map.get("svc-thread-6") {
1489            let inner = mb.inner.lock().await;
1490            assert!(
1491                matches!(inner.status, ThreadStatus::Idle),
1492                "expected idle after stream exhaustion"
1493            );
1494        }
1495    }
1496
1497    #[tokio::test]
1498    async fn completed_background_run_notifies_parent_task_on_same_thread() {
1499        let store = Arc::new(MemoryStore::new());
1500        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1501        let os = make_os(store.clone());
1502        let svc = Arc::new(MailboxService::new(
1503            os.clone(),
1504            mailbox_store.clone(),
1505            "test-svc",
1506        ));
1507
1508        let parent_run_id = "svc-parent-run-1";
1509        seed_completed_run(
1510            store.as_ref(),
1511            parent_run_id,
1512            "svc-thread-parent-1",
1513            "test",
1514            RunOrigin::User,
1515            None,
1516        )
1517        .await;
1518        let mut request = make_request("svc-thread-parent-1", "svc-child-run-1");
1519        request.parent_run_id = Some(parent_run_id.to_string());
1520
1521        let (_thread_id, run_id, entry_id) = svc
1522            .submit("test", request, EnqueueOptions::default())
1523            .await
1524            .expect("submit");
1525
1526        tokio::time::sleep(Duration::from_millis(400)).await;
1527
1528        let thread = store
1529            .load_thread("svc-thread-parent-1")
1530            .await
1531            .unwrap()
1532            .expect("thread should exist");
1533        let notification = completion_message_for_parent(&thread.messages, parent_run_id)
1534            .expect("expected parent completion notification");
1535        assert_eq!(
1536            notification["child_task_id"].as_str(),
1537            Some(entry_id.as_str())
1538        );
1539        assert_eq!(notification["child_run_id"].as_str(), Some(run_id.as_str()));
1540        assert_eq!(notification["status"].as_str(), Some("completed"));
1541
1542        let internal_runs = RunReader::list_runs(
1543            store.as_ref(),
1544            &RunQuery {
1545                thread_id: Some("svc-thread-parent-1".to_string()),
1546                origin: Some(RunOrigin::Internal),
1547                ..Default::default()
1548            },
1549        )
1550        .await
1551        .expect("list internal runs");
1552        assert!(
1553            !internal_runs.items.is_empty(),
1554            "expected completion notification to execute via internal run"
1555        );
1556    }
1557
1558    #[tokio::test]
1559    async fn completed_background_run_notifies_parent_task_on_parent_thread() {
1560        let store = Arc::new(MemoryStore::new());
1561        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1562        let os = make_os(store.clone());
1563        let svc = Arc::new(MailboxService::new(
1564            os.clone(),
1565            mailbox_store.clone(),
1566            "test-svc",
1567        ));
1568
1569        store
1570            .save(&tirea_agentos::contracts::thread::Thread::new(
1571                "svc-parent-thread-2",
1572            ))
1573            .await
1574            .expect("seed parent thread");
1575
1576        let parent_run_id = "svc-parent-run-2";
1577        seed_completed_run(
1578            store.as_ref(),
1579            parent_run_id,
1580            "svc-parent-thread-2",
1581            "test",
1582            RunOrigin::User,
1583            None,
1584        )
1585        .await;
1586        let mut request = make_request("svc-child-thread-2", "svc-child-run-2");
1587        request.parent_run_id = Some(parent_run_id.to_string());
1588        request.parent_thread_id = Some("svc-parent-thread-2".to_string());
1589
1590        let (_thread_id, run_id, _entry_id) = svc
1591            .submit("test", request, EnqueueOptions::default())
1592            .await
1593            .expect("submit");
1594
1595        tokio::time::sleep(Duration::from_millis(400)).await;
1596
1597        let parent_thread = store
1598            .load_thread("svc-parent-thread-2")
1599            .await
1600            .unwrap()
1601            .expect("parent thread should exist");
1602        let notification = completion_message_for_parent(&parent_thread.messages, parent_run_id)
1603            .expect("expected parent-thread completion notification");
1604        assert_eq!(
1605            notification["child_thread_id"].as_str(),
1606            Some("svc-child-thread-2")
1607        );
1608        assert_eq!(notification["child_run_id"].as_str(), Some(run_id.as_str()));
1609
1610        let child_thread = store
1611            .load_thread("svc-child-thread-2")
1612            .await
1613            .unwrap()
1614            .expect("child thread should exist");
1615        assert!(
1616            completion_message_for_parent(&child_thread.messages, parent_run_id).is_none(),
1617            "notification should not be appended to child thread when parent_thread_id is set"
1618        );
1619
1620        let internal_runs = RunReader::list_runs(
1621            store.as_ref(),
1622            &RunQuery {
1623                thread_id: Some("svc-parent-thread-2".to_string()),
1624                origin: Some(RunOrigin::Internal),
1625                ..Default::default()
1626            },
1627        )
1628        .await
1629        .expect("list internal runs");
1630        assert!(
1631            !internal_runs.items.is_empty(),
1632            "expected completion notification to execute on the parent thread"
1633        );
1634    }
1635
1636    #[tokio::test]
1637    async fn completed_foreground_child_run_does_not_notify_parent_task() {
1638        let store = Arc::new(MemoryStore::new());
1639        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1640        let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1641        let svc = Arc::new(MailboxService::new(
1642            os.clone(),
1643            mailbox_store.clone(),
1644            "test-svc",
1645        ));
1646
1647        store
1648            .save(&tirea_agentos::contracts::thread::Thread::new(
1649                "svc-parent-thread-foreground",
1650            ))
1651            .await
1652            .expect("seed parent thread");
1653        seed_completed_run(
1654            store.as_ref(),
1655            "svc-parent-run-foreground",
1656            "svc-parent-thread-foreground",
1657            "parent-agent",
1658            RunOrigin::User,
1659            None,
1660        )
1661        .await;
1662        seed_child_terminal_run(
1663            store.as_ref(),
1664            "svc-child-run-foreground",
1665            "svc-child-thread-foreground",
1666            "worker-agent",
1667            "svc-parent-run-foreground",
1668            None,
1669        )
1670        .await;
1671
1672        svc.submit_parent_completion_notification("svc-child-run-foreground")
1673            .await
1674            .expect("foreground notification path should no-op");
1675        tokio::time::sleep(Duration::from_millis(250)).await;
1676
1677        let parent_thread = store
1678            .load_thread("svc-parent-thread-foreground")
1679            .await
1680            .unwrap()
1681            .expect("parent thread should exist");
1682        assert!(
1683            completion_message_for_parent(&parent_thread.messages, "svc-parent-run-foreground")
1684                .is_none(),
1685            "foreground child run should not enqueue parent completion notification"
1686        );
1687
1688        let internal_runs = RunReader::list_runs(
1689            store.as_ref(),
1690            &RunQuery {
1691                thread_id: Some("svc-parent-thread-foreground".to_string()),
1692                origin: Some(RunOrigin::Internal),
1693                ..Default::default()
1694            },
1695        )
1696        .await
1697        .expect("list internal runs");
1698        assert!(
1699            internal_runs.items.is_empty(),
1700            "foreground child run should not spawn internal notification runs"
1701        );
1702    }
1703
1704    #[tokio::test]
1705    async fn multi_agent_background_notification_runs_under_parent_agent() {
1706        let store = Arc::new(MemoryStore::new());
1707        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1708        let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1709        let svc = Arc::new(MailboxService::new(
1710            os.clone(),
1711            mailbox_store.clone(),
1712            "test-svc",
1713        ));
1714
1715        store
1716            .save(&tirea_agentos::contracts::thread::Thread::new(
1717                "svc-parent-thread-multi-agent",
1718            ))
1719            .await
1720            .expect("seed parent thread");
1721        seed_completed_run(
1722            store.as_ref(),
1723            "svc-parent-run-multi-agent",
1724            "svc-parent-thread-multi-agent",
1725            "parent-agent",
1726            RunOrigin::User,
1727            None,
1728        )
1729        .await;
1730
1731        let mut request = RunRequest {
1732            agent_id: "worker-agent".to_string(),
1733            ..make_request("svc-child-thread-multi-agent", "svc-child-run-multi-agent")
1734        };
1735        request.parent_run_id = Some("svc-parent-run-multi-agent".to_string());
1736        request.parent_thread_id = Some("svc-parent-thread-multi-agent".to_string());
1737
1738        let (_thread_id, run_id, entry_id) = svc
1739            .submit("worker-agent", request, EnqueueOptions::default())
1740            .await
1741            .expect("submit child background run");
1742
1743        tokio::time::sleep(Duration::from_millis(450)).await;
1744
1745        let parent_thread = store
1746            .load_thread("svc-parent-thread-multi-agent")
1747            .await
1748            .unwrap()
1749            .expect("parent thread should exist");
1750        let notification =
1751            completion_message_for_parent(&parent_thread.messages, "svc-parent-run-multi-agent")
1752                .expect("expected multi-agent completion notification");
1753        assert_eq!(
1754            notification["child_task_id"].as_str(),
1755            Some(entry_id.as_str())
1756        );
1757        assert_eq!(notification["child_run_id"].as_str(), Some(run_id.as_str()));
1758        assert_eq!(notification["status"].as_str(), Some("completed"));
1759
1760        let internal_runs = RunReader::list_runs(
1761            store.as_ref(),
1762            &RunQuery {
1763                thread_id: Some("svc-parent-thread-multi-agent".to_string()),
1764                origin: Some(RunOrigin::Internal),
1765                ..Default::default()
1766            },
1767        )
1768        .await
1769        .expect("list internal runs");
1770        assert!(
1771            internal_runs
1772                .items
1773                .iter()
1774                .any(|record| record.agent_id == "parent-agent"),
1775            "expected notification run to execute under the parent agent"
1776        );
1777        assert!(
1778            internal_runs
1779                .items
1780                .iter()
1781                .all(|record| record.agent_id != "worker-agent"),
1782            "notification run should not execute under the child agent"
1783        );
1784    }
1785
1786    #[tokio::test]
1787    async fn parent_completion_notification_maps_terminal_statuses() {
1788        let store = Arc::new(MemoryStore::new());
1789        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1790        let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1791        let svc = Arc::new(MailboxService::new(
1792            os.clone(),
1793            mailbox_store.clone(),
1794            "test-svc",
1795        ));
1796
1797        let parent_run_id = "svc-parent-run-status-map";
1798        seed_completed_run(
1799            store.as_ref(),
1800            parent_run_id,
1801            "svc-parent-thread-status-map",
1802            "parent-agent",
1803            RunOrigin::User,
1804            None,
1805        )
1806        .await;
1807
1808        let cases = [
1809            (
1810                "svc-child-run-failed",
1811                Some("error"),
1812                Some("worker crashed"),
1813                "failed",
1814            ),
1815            (
1816                "svc-child-run-cancelled",
1817                Some("cancelled"),
1818                Some("user requested stop"),
1819                "cancelled",
1820            ),
1821            (
1822                "svc-child-run-stopped",
1823                Some("stopped:max_turns"),
1824                Some("max turns reached"),
1825                "stopped",
1826            ),
1827        ];
1828
1829        for (idx, (run_id, termination_code, termination_detail, _expected_status)) in
1830            cases.iter().enumerate()
1831        {
1832            seed_child_terminal_run_with_status(
1833                store.as_ref(),
1834                run_id,
1835                &format!("svc-child-thread-status-{idx}"),
1836                "worker-agent",
1837                parent_run_id,
1838                Some(&format!("svc-child-entry-status-{idx}")),
1839                *termination_code,
1840                *termination_detail,
1841            )
1842            .await;
1843
1844            svc.submit_parent_completion_notification(run_id)
1845                .await
1846                .expect("submit parent completion notification");
1847        }
1848
1849        let notifications = wait_for_completion_messages(
1850            store.as_ref(),
1851            "svc-parent-thread-status-map",
1852            parent_run_id,
1853            3,
1854        )
1855        .await;
1856        assert_eq!(
1857            notifications.len(),
1858            3,
1859            "expected three completion notifications"
1860        );
1861
1862        let by_run_id: std::collections::HashMap<_, _> = notifications
1863            .into_iter()
1864            .map(|notification| {
1865                (
1866                    notification["child_run_id"]
1867                        .as_str()
1868                        .expect("child_run_id should be present")
1869                        .to_string(),
1870                    notification,
1871                )
1872            })
1873            .collect();
1874
1875        for (run_id, termination_code, termination_detail, expected_status) in cases {
1876            let notification = by_run_id
1877                .get(run_id)
1878                .unwrap_or_else(|| panic!("missing notification for {run_id}"));
1879            assert_eq!(notification["status"].as_str(), Some(expected_status));
1880            assert_eq!(notification["termination_code"].as_str(), termination_code,);
1881            assert_eq!(
1882                notification["termination_detail"].as_str(),
1883                termination_detail,
1884            );
1885        }
1886    }
1887
1888    #[tokio::test]
1889    async fn parent_completion_notification_is_idempotent_for_repeated_completion_callbacks() {
1890        let store = Arc::new(MemoryStore::new());
1891        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1892        let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1893        let svc = Arc::new(MailboxService::new(
1894            os.clone(),
1895            mailbox_store.clone(),
1896            "test-svc",
1897        ));
1898
1899        let parent_run_id = "svc-parent-run-idempotent";
1900        let child_run_id = "svc-child-run-idempotent";
1901        seed_completed_run(
1902            store.as_ref(),
1903            parent_run_id,
1904            "svc-parent-thread-idempotent",
1905            "parent-agent",
1906            RunOrigin::User,
1907            None,
1908        )
1909        .await;
1910        seed_child_terminal_run(
1911            store.as_ref(),
1912            child_run_id,
1913            "svc-child-thread-idempotent",
1914            "worker-agent",
1915            parent_run_id,
1916            Some("svc-child-entry-idempotent"),
1917        )
1918        .await;
1919
1920        svc.submit_parent_completion_notification(child_run_id)
1921            .await
1922            .expect("first completion callback");
1923        svc.submit_parent_completion_notification(child_run_id)
1924            .await
1925            .expect("duplicate completion callback should no-op");
1926        svc.on_run_complete("svc-child-thread-idempotent", child_run_id)
1927            .await;
1928
1929        let notifications = wait_for_completion_messages(
1930            store.as_ref(),
1931            "svc-parent-thread-idempotent",
1932            parent_run_id,
1933            1,
1934        )
1935        .await;
1936        assert_eq!(
1937            notifications.len(),
1938            1,
1939            "duplicate completion callbacks must not duplicate notifications"
1940        );
1941
1942        let internal_runs = RunReader::list_runs(
1943            store.as_ref(),
1944            &RunQuery {
1945                thread_id: Some("svc-parent-thread-idempotent".to_string()),
1946                origin: Some(RunOrigin::Internal),
1947                ..Default::default()
1948            },
1949        )
1950        .await
1951        .expect("list internal runs");
1952        assert_eq!(
1953            internal_runs.items.len(),
1954            1,
1955            "idempotent delivery should create exactly one internal notification run"
1956        );
1957    }
1958
1959    #[tokio::test]
1960    async fn parent_busy_thread_buffers_multiple_child_completion_notifications() {
1961        let store = Arc::new(MemoryStore::new());
1962        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1963        let os = Arc::new(
1964            AgentOsBuilder::new()
1965                .with_registered_behavior("svc_terminate", Arc::new(TerminatePlugin))
1966                .with_registered_behavior(
1967                    "svc_parent_busy",
1968                    Arc::new(DelayedTerminatePlugin {
1969                        id: "svc_parent_busy",
1970                        delay_ms: 350,
1971                    }),
1972                )
1973                .with_registered_behavior(
1974                    "svc_worker_slow",
1975                    Arc::new(DelayedTerminatePlugin {
1976                        id: "svc_worker_slow",
1977                        delay_ms: 150,
1978                    }),
1979                )
1980                .with_agent_state_store(store.clone())
1981                .with_agent_spec(AgentDefinitionSpec::local_with_id(
1982                    "parent-agent",
1983                    AgentDefinition {
1984                        id: "parent-agent".to_string(),
1985                        behavior_ids: vec!["svc_parent_busy".to_string()],
1986                        ..Default::default()
1987                    },
1988                ))
1989                .with_agent_spec(AgentDefinitionSpec::local_with_id(
1990                    "worker-fast",
1991                    AgentDefinition {
1992                        id: "worker-fast".to_string(),
1993                        behavior_ids: vec!["svc_terminate".to_string()],
1994                        ..Default::default()
1995                    },
1996                ))
1997                .with_agent_spec(AgentDefinitionSpec::local_with_id(
1998                    "worker-slow",
1999                    AgentDefinition {
2000                        id: "worker-slow".to_string(),
2001                        behavior_ids: vec!["svc_worker_slow".to_string()],
2002                        ..Default::default()
2003                    },
2004                ))
2005                .build()
2006                .expect("build AgentOs"),
2007        );
2008        let svc = Arc::new(MailboxService::new(
2009            os.clone(),
2010            mailbox_store.clone(),
2011            "test-svc",
2012        ));
2013
2014        let parent_thread_id = "svc-parent-thread-busy";
2015        let parent_run_id = "svc-parent-run-busy";
2016        let parent_request = RunRequest {
2017            agent_id: "parent-agent".to_string(),
2018            ..make_request(parent_thread_id, parent_run_id)
2019        };
2020        svc.submit("parent-agent", parent_request, EnqueueOptions::default())
2021            .await
2022            .expect("submit parent background run");
2023        let _parent_run = wait_for_run_record(store.as_ref(), parent_run_id).await;
2024
2025        let mut slow_request = RunRequest {
2026            agent_id: "worker-slow".to_string(),
2027            ..make_request("svc-child-thread-busy-slow", "svc-child-run-busy-slow")
2028        };
2029        slow_request.parent_run_id = Some(parent_run_id.to_string());
2030        slow_request.parent_thread_id = Some(parent_thread_id.to_string());
2031        svc.submit("worker-slow", slow_request, EnqueueOptions::default())
2032            .await
2033            .expect("submit slow child");
2034
2035        let mut fast_request = RunRequest {
2036            agent_id: "worker-fast".to_string(),
2037            ..make_request("svc-child-thread-busy-fast", "svc-child-run-busy-fast")
2038        };
2039        fast_request.parent_run_id = Some(parent_run_id.to_string());
2040        fast_request.parent_thread_id = Some(parent_thread_id.to_string());
2041        svc.submit("worker-fast", fast_request, EnqueueOptions::default())
2042            .await
2043            .expect("submit fast child");
2044
2045        tokio::time::sleep(Duration::from_millis(225)).await;
2046        let pending_while_busy = store
2047            .load_thread(parent_thread_id)
2048            .await
2049            .expect("load parent thread should succeed")
2050            .map(|thread| completion_messages_for_parent(&thread.messages, parent_run_id))
2051            .unwrap_or_default();
2052        assert!(
2053            pending_while_busy.is_empty(),
2054            "busy parent thread should not receive completion messages before its active run finishes"
2055        );
2056
2057        let notifications =
2058            wait_for_completion_messages(store.as_ref(), parent_thread_id, parent_run_id, 2).await;
2059        assert_eq!(
2060            notifications.len(),
2061            2,
2062            "both child completions should be delivered after the parent thread becomes idle"
2063        );
2064        let child_run_ids: Vec<_> = notifications
2065            .iter()
2066            .map(|notification| {
2067                notification["child_run_id"]
2068                    .as_str()
2069                    .expect("child_run_id should be present")
2070                    .to_string()
2071            })
2072            .collect();
2073        assert_eq!(
2074            child_run_ids,
2075            vec![
2076                "svc-child-run-busy-fast".to_string(),
2077                "svc-child-run-busy-slow".to_string(),
2078            ],
2079            "completion notifications should follow child completion order while buffering on the parent thread"
2080        );
2081    }
2082
2083    #[tokio::test]
2084    async fn grandchild_completion_notifies_immediate_parent_not_root_parent() {
2085        let store = Arc::new(MemoryStore::new());
2086        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
2087        let os = make_os_with_agents(
2088            store.clone(),
2089            &["root-agent", "child-agent", "worker-agent"],
2090        );
2091        let svc = Arc::new(MailboxService::new(
2092            os.clone(),
2093            mailbox_store.clone(),
2094            "test-svc",
2095        ));
2096
2097        seed_completed_run(
2098            store.as_ref(),
2099            "svc-root-run-nested",
2100            "svc-root-thread-nested",
2101            "root-agent",
2102            RunOrigin::User,
2103            None,
2104        )
2105        .await;
2106        seed_child_terminal_run(
2107            store.as_ref(),
2108            "svc-child-run-nested",
2109            "svc-child-thread-nested",
2110            "child-agent",
2111            "svc-root-run-nested",
2112            Some("svc-child-entry-nested"),
2113        )
2114        .await;
2115        seed_child_terminal_run(
2116            store.as_ref(),
2117            "svc-grandchild-run-nested",
2118            "svc-grandchild-thread-nested",
2119            "worker-agent",
2120            "svc-child-run-nested",
2121            Some("svc-grandchild-entry-nested"),
2122        )
2123        .await;
2124
2125        svc.submit_parent_completion_notification("svc-grandchild-run-nested")
2126            .await
2127            .expect("submit grandchild completion notification");
2128
2129        let child_notifications = wait_for_completion_messages(
2130            store.as_ref(),
2131            "svc-child-thread-nested",
2132            "svc-child-run-nested",
2133            1,
2134        )
2135        .await;
2136        assert_eq!(child_notifications.len(), 1);
2137        assert_eq!(
2138            child_notifications[0]["child_run_id"].as_str(),
2139            Some("svc-grandchild-run-nested")
2140        );
2141
2142        let root_notifications = wait_for_completion_messages(
2143            store.as_ref(),
2144            "svc-root-thread-nested",
2145            "svc-root-run-nested",
2146            1,
2147        )
2148        .await;
2149        assert!(
2150            root_notifications.is_empty(),
2151            "grandchild completion should notify the immediate parent task, not the root parent"
2152        );
2153
2154        let child_internal_runs = RunReader::list_runs(
2155            store.as_ref(),
2156            &RunQuery {
2157                thread_id: Some("svc-child-thread-nested".to_string()),
2158                origin: Some(RunOrigin::Internal),
2159                ..Default::default()
2160            },
2161        )
2162        .await
2163        .expect("list child internal runs");
2164        assert!(
2165            child_internal_runs
2166                .items
2167                .iter()
2168                .any(|record| record.agent_id == "child-agent"),
2169            "nested completion notification should execute under the immediate parent agent"
2170        );
2171
2172        let root_internal_runs = RunReader::list_runs(
2173            store.as_ref(),
2174            &RunQuery {
2175                thread_id: Some("svc-root-thread-nested".to_string()),
2176                origin: Some(RunOrigin::Internal),
2177                ..Default::default()
2178            },
2179        )
2180        .await
2181        .expect("list root internal runs");
2182        assert!(
2183            root_internal_runs.items.is_empty(),
2184            "nested completion should not spawn a root-level notification run"
2185        );
2186    }
2187}