1use std::collections::{HashMap, VecDeque};
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures::StreamExt;
8use tirea_agentos::contracts::storage::{
9 MailboxEntry, MailboxEntryStatus, MailboxQuery, MailboxStore,
10};
11use tirea_agentos::contracts::RunRequest;
12use tirea_agentos::runtime::{AgentOs, RunStream};
13use tirea_contract::storage::RunOrigin;
14
15use super::mailbox::{
16 ack_claimed_entry, build_parent_completion_notification_message, dead_letter_claimed_entry,
17 drain_background_run, is_generation_mismatch, mailbox_entry_from_request, mailbox_error,
18 normalize_background_run_request, now_unix_millis, parent_completion_notification_dedupe_key,
19 start_agent_run_for_entry, DEFAULT_MAILBOX_LEASE_MS, INLINE_MAILBOX_AVAILABLE_AT,
20};
21use super::mailbox::{EnqueueOptions, MailboxRunStartError};
22use super::ApiError;
23
/// Period between two lease extensions performed by the task created in
/// `spawn_lease_renewal`.
const LEASE_RENEWAL_INTERVAL: Duration = Duration::from_secs(10);
26
27fn spawn_lease_renewal(
32 store: Arc<dyn MailboxStore>,
33 entry_id: String,
34 claim_token: String,
35 lease_ms: u64,
36) -> tokio::task::JoinHandle<()> {
37 tokio::spawn(async move {
38 let mut interval = tokio::time::interval(LEASE_RENEWAL_INTERVAL);
39 interval.tick().await;
41 loop {
42 interval.tick().await;
43 let now = now_unix_millis();
44 match store
45 .extend_lease(&entry_id, &claim_token, lease_ms, now)
46 .await
47 {
48 Ok(true) => {}
49 Ok(false) => {
50 tracing::debug!(entry_id, "lease renewal: entry no longer claimed, stopping");
51 break;
52 }
53 Err(err) => {
54 tracing::warn!(entry_id, %err, "lease renewal failed");
55 break;
56 }
57 }
58 }
59 })
60}
61
/// Out-of-band signals accepted by `MailboxService::control`.
///
/// The enum carries no data, so it is `Copy` and comparable; callers can pass
/// it by value and test it with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControlSignal {
    /// Cancel the thread's active run, leaving the mailbox contents untouched.
    Cancel,
    /// Interrupt the mailbox in the store, cancel the thread's active run and
    /// clear the entries buffered in memory for that thread.
    Interrupt,
}
74
/// A mailbox entry parked in a thread's in-memory `pending` queue until the
/// thread can dispatch it.
#[derive(Debug, Clone)]
struct BufferedEntry {
    /// The entry to hand to `dispatch_entry` once it reaches the queue front.
    entry: MailboxEntry,
}
83
/// In-memory execution state of one thread as tracked by this service.
enum ThreadStatus {
    /// No entry is recorded as running for the thread.
    Idle,
    /// An entry was claimed and its run started; the identifiers are kept so
    /// the entry can be acked when the run completes.
    Running {
        /// Id of the claimed mailbox entry backing the run.
        entry_id: String,
        /// Claim token returned by the store when the entry was claimed.
        claim_token: String,
    },
}
91
/// Mutable per-thread state, guarded by the mutex in `ThreadMailbox`.
struct MailboxInner {
    /// Whether an entry is currently recorded as running for the thread.
    status: ThreadStatus,
    /// Highest mailbox generation this service has observed for the thread.
    generation: u64,
    /// Entries waiting for the thread to become idle, dispatched front first.
    pending: VecDeque<BufferedEntry>,
    /// Handle of the lease-renewal task for the running entry, if any.
    lease_renewal: Option<tokio::task::JoinHandle<()>>,
}
99
/// In-memory mailbox for a single thread.
struct ThreadMailbox {
    /// Id of the thread this mailbox belongs to.
    thread_id: String,
    /// Status, generation, pending queue and renewal handle; an async mutex is
    /// used because the guard is held across `.await` points.
    inner: tokio::sync::Mutex<MailboxInner>,
}
104
105impl ThreadMailbox {
106 fn new(thread_id: String, generation: u64) -> Self {
107 Self {
108 thread_id,
109 inner: tokio::sync::Mutex::new(MailboxInner {
110 status: ThreadStatus::Idle,
111 generation,
112 pending: VecDeque::new(),
113 lease_renewal: None,
114 }),
115 }
116 }
117}
118
/// Dispatches mailbox entries to agent runs, tracking per-thread state in
/// memory on top of a persistent `MailboxStore`.
pub struct MailboxService {
    /// Runtime used to resolve agents, start runs and cancel active runs.
    os: Arc<AgentOs>,
    /// Persistent store holding mailbox entries and mailbox state.
    mailbox_store: Arc<dyn MailboxStore>,
    /// Identifier passed to the store when this service claims entries.
    consumer_id: String,
    /// In-memory mailboxes keyed by thread id, created on first use.
    mailboxes: tokio::sync::RwLock<HashMap<String, Arc<ThreadMailbox>>>,
}
135
136impl MailboxService {
137 pub fn new(
138 os: Arc<AgentOs>,
139 mailbox_store: Arc<dyn MailboxStore>,
140 consumer_id: impl Into<String>,
141 ) -> Self {
142 Self {
143 os,
144 mailbox_store,
145 consumer_id: consumer_id.into(),
146 mailboxes: tokio::sync::RwLock::new(HashMap::new()),
147 }
148 }
149
    /// Returns the persistent mailbox store this service operates on.
    pub fn mailbox_store(&self) -> &Arc<dyn MailboxStore> {
        &self.mailbox_store
    }
154
155 async fn get_or_create_mailbox(
157 self: &Arc<Self>,
158 thread_id: &str,
159 generation: u64,
160 ) -> Arc<ThreadMailbox> {
161 {
163 let map = self.mailboxes.read().await;
164 if let Some(mb) = map.get(thread_id) {
165 return mb.clone();
166 }
167 }
168 let mut map = self.mailboxes.write().await;
170 map.entry(thread_id.to_string())
171 .or_insert_with(|| Arc::new(ThreadMailbox::new(thread_id.to_string(), generation)))
172 .clone()
173 }
174
175 async fn enqueue_to_store(
177 &self,
178 request: &RunRequest,
179 options: &EnqueueOptions,
180 available_at: u64,
181 ) -> Result<MailboxEntry, ApiError> {
182 let mailbox_id = request
183 .thread_id
184 .as_ref()
185 .expect("normalized request should have thread_id");
186
187 for _ in 0..2 {
188 let now = now_unix_millis();
189 let state = self
190 .mailbox_store
191 .ensure_mailbox_state(mailbox_id, now)
192 .await
193 .map_err(mailbox_error)?;
194 let entry = mailbox_entry_from_request(
195 request,
196 state.current_generation,
197 options,
198 available_at,
199 );
200 match self.mailbox_store.enqueue_mailbox_entry(&entry).await {
201 Ok(()) => return Ok(entry),
202 Err(tirea_agentos::contracts::storage::MailboxStoreError::AlreadyExists(_))
203 if options.dedupe_key.is_some() =>
204 {
205 let dedupe_key = options
206 .dedupe_key
207 .as_deref()
208 .expect("guarded by options.dedupe_key.is_some()");
209 if let Some(existing) = self
210 .find_mailbox_entry_by_dedupe_key(mailbox_id, dedupe_key)
211 .await?
212 {
213 return Ok(existing);
214 }
215 return Err(ApiError::Internal(format!(
216 "mailbox dedupe collision for '{mailbox_id}' and key '{dedupe_key}' but existing entry was not found"
217 )));
218 }
219 Err(err) if is_generation_mismatch(&err) => continue,
220 Err(err) => return Err(mailbox_error(err)),
221 }
222 }
223
224 Err(ApiError::Internal(format!(
225 "mailbox enqueue raced with interrupt for mailbox '{mailbox_id}'"
226 )))
227 }
228
229 async fn find_mailbox_entry_by_dedupe_key(
230 &self,
231 mailbox_id: &str,
232 dedupe_key: &str,
233 ) -> Result<Option<MailboxEntry>, ApiError> {
234 let mut offset = 0;
235 loop {
236 let page = self
237 .mailbox_store
238 .list_mailbox_entries(&MailboxQuery {
239 mailbox_id: Some(mailbox_id.to_string()),
240 offset,
241 limit: 200,
242 ..Default::default()
243 })
244 .await
245 .map_err(mailbox_error)?;
246 if let Some(entry) = page
247 .items
248 .into_iter()
249 .find(|entry| entry.dedupe_key.as_deref() == Some(dedupe_key))
250 {
251 return Ok(Some(entry));
252 }
253 if !page.has_more {
254 return Ok(None);
255 }
256 offset += 200;
257 }
258 }
259
260 pub async fn submit(
265 self: &Arc<Self>,
266 agent_id: &str,
267 request: RunRequest,
268 options: EnqueueOptions,
269 ) -> Result<(String, String, String), ApiError> {
270 self.os
271 .resolve(agent_id)
272 .map_err(|e| ApiError::from(tirea_agentos::runtime::AgentOsRunError::from(e)))?;
273
274 let request = normalize_background_run_request(agent_id, request);
275 let thread_id = request
276 .thread_id
277 .clone()
278 .expect("normalized request should have thread_id");
279 let run_id = request
280 .run_id
281 .clone()
282 .expect("normalized request should have run_id");
283
284 let entry = self
286 .enqueue_to_store(&request, &options, now_unix_millis())
287 .await?;
288 let entry_id = entry.entry_id.clone();
289 let generation = entry.generation;
290
291 let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
292 let mut inner = mailbox.inner.lock().await;
293
294 if generation > inner.generation {
296 inner.generation = generation;
297 }
298
299 match &inner.status {
300 ThreadStatus::Idle => {
301 drop(inner);
302 self.dispatch_entry(mailbox, entry).await;
303 }
304 ThreadStatus::Running { .. } => {
305 inner.pending.push_back(BufferedEntry { entry });
306 }
307 }
308
309 Ok((thread_id, run_id, entry_id))
310 }
311
312 pub async fn submit_streaming(
318 self: &Arc<Self>,
319 agent_id: &str,
320 request: RunRequest,
321 options: EnqueueOptions,
322 ) -> Result<RunStream, ApiError> {
323 self.os
324 .resolve(agent_id)
325 .map_err(|e| ApiError::from(tirea_agentos::runtime::AgentOsRunError::from(e)))?;
326
327 let request = normalize_background_run_request(agent_id, request);
328 let thread_id = request
329 .thread_id
330 .clone()
331 .expect("normalized request should have thread_id");
332
333 let entry = self
335 .enqueue_to_store(&request, &options, INLINE_MAILBOX_AVAILABLE_AT)
336 .await?;
337 let entry_id = entry.entry_id.clone();
338 let generation = entry.generation;
339
340 let Some(claimed) = self
342 .mailbox_store
343 .claim_mailbox_entry(
344 &entry_id,
345 &self.consumer_id,
346 now_unix_millis(),
347 DEFAULT_MAILBOX_LEASE_MS,
348 )
349 .await
350 .map_err(mailbox_error)?
351 else {
352 return Err(ApiError::Internal(format!(
353 "mailbox entry '{entry_id}' could not be claimed for streaming"
354 )));
355 };
356
357 let claim_token = claimed.claim_token.clone().ok_or_else(|| {
358 ApiError::Internal(format!(
359 "mailbox entry '{entry_id}' was claimed without claim_token"
360 ))
361 })?;
362
363 let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
365 let renewal = spawn_lease_renewal(
366 self.mailbox_store.clone(),
367 entry_id.clone(),
368 claim_token.clone(),
369 DEFAULT_MAILBOX_LEASE_MS,
370 );
371 {
372 let mut inner = mailbox.inner.lock().await;
373 if generation > inner.generation {
374 inner.generation = generation;
375 }
376 inner.status = ThreadStatus::Running {
377 entry_id: entry_id.clone(),
378 claim_token: claim_token.clone(),
379 };
380 inner.lease_renewal = Some(renewal);
381 }
382
383 match start_agent_run_for_entry(&self.os, &self.mailbox_store, &claimed, false).await {
384 Ok(run) => {
385 Ok(self.wrap_with_completion(thread_id, run))
387 }
388 Err(MailboxRunStartError::Superseded(error)) => {
389 let _ = self
390 .mailbox_store
391 .supersede_mailbox_entry(&entry_id, now_unix_millis(), &error)
392 .await;
393 let mut inner = mailbox.inner.lock().await;
395 inner.status = ThreadStatus::Idle;
396 Err(ApiError::BadRequest(error))
397 }
398 Err(MailboxRunStartError::Busy(error)) => {
399 self.mailbox_store
400 .cancel_mailbox_entry(&entry_id, now_unix_millis())
401 .await
402 .map_err(mailbox_error)?;
403 let mut inner = mailbox.inner.lock().await;
404 inner.status = ThreadStatus::Idle;
405 Err(ApiError::BadRequest(error))
406 }
407 Err(MailboxRunStartError::Permanent(error))
408 | Err(MailboxRunStartError::Retryable(error)) => {
409 dead_letter_claimed_entry(&self.mailbox_store, &entry_id, &claim_token, &error)
410 .await?;
411 let mut inner = mailbox.inner.lock().await;
412 inner.status = ThreadStatus::Idle;
413 Err(ApiError::Internal(error))
414 }
415 Err(MailboxRunStartError::Internal(error)) => {
416 dead_letter_claimed_entry(
417 &self.mailbox_store,
418 &entry_id,
419 &claim_token,
420 &error.to_string(),
421 )
422 .await?;
423 let mut inner = mailbox.inner.lock().await;
424 inner.status = ThreadStatus::Idle;
425 Err(error)
426 }
427 }
428 }
429
    /// Claims `entry` in the store and tries to start its run in the
    /// background.
    ///
    /// Outcomes, in order of evaluation:
    /// * The claim fails or yields nothing, or the claimed entry has no claim
    ///   token: log and return.
    /// * The stored mailbox generation differs from the entry's generation:
    ///   mark the entry superseded and return.
    /// * The run starts: record the thread as `Running`, start lease renewal
    ///   and spawn a task that drains the run and then calls
    ///   `on_run_complete`.
    /// * `Busy` / `Retryable`: nack the entry with a 250 ms delay and put it
    ///   back at the front of the pending queue.
    /// * `Superseded` / `Permanent` / `Internal`: supersede or dead-letter the
    ///   entry, then spawn `try_dispatch_next` for the thread.
    ///
    /// The future is boxed; NOTE(review): presumably because this method is
    /// reachable from itself through `on_run_complete` and `submit` — confirm.
    fn dispatch_entry(
        self: &Arc<Self>,
        mailbox: Arc<ThreadMailbox>,
        entry: MailboxEntry,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async move {
            let entry_id = entry.entry_id.clone();

            // Take ownership of the entry in the store before doing any work.
            let claimed = match self
                .mailbox_store
                .claim_mailbox_entry(
                    &entry_id,
                    &self.consumer_id,
                    now_unix_millis(),
                    DEFAULT_MAILBOX_LEASE_MS,
                )
                .await
            {
                Ok(Some(c)) => c,
                Ok(None) => {
                    tracing::debug!(entry_id, "entry already claimed/consumed, skipping");
                    return;
                }
                Err(err) => {
                    tracing::error!(entry_id, %err, "failed to claim mailbox entry");
                    return;
                }
            };

            let claim_token = match claimed.claim_token.as_deref() {
                Some(t) => t.to_string(),
                None => {
                    tracing::error!(entry_id, "claimed entry missing claim_token");
                    return;
                }
            };

            // A failed or empty state lookup is treated as "no mismatch".
            if self
                .mailbox_store
                .load_mailbox_state(&entry.mailbox_id)
                .await
                .ok()
                .flatten()
                .is_some_and(|state| state.current_generation != entry.generation)
            {
                let _ = self
                    .mailbox_store
                    .supersede_mailbox_entry(
                        &entry_id,
                        now_unix_millis(),
                        "superseded by interrupt",
                    )
                    .await;
                tracing::debug!(entry_id, "entry superseded by generation mismatch");
                return;
            }

            match start_agent_run_for_entry(&self.os, &self.mailbox_store, &claimed, true).await {
                Ok(run) => {
                    // Keep the claim alive for as long as the run is active.
                    let renewal = spawn_lease_renewal(
                        self.mailbox_store.clone(),
                        entry_id.clone(),
                        claim_token.clone(),
                        DEFAULT_MAILBOX_LEASE_MS,
                    );

                    let run_id = run.run_id.clone();
                    let completed_run_id = run_id.clone();
                    let thread_id = mailbox.thread_id.clone();

                    {
                        let mut inner = mailbox.inner.lock().await;
                        inner.status = ThreadStatus::Running {
                            entry_id: entry_id.clone(),
                            claim_token: claim_token.clone(),
                        };
                        inner.lease_renewal = Some(renewal);
                    }

                    // Drain the run off the caller's task, then report completion.
                    let svc = self.clone();
                    tokio::spawn(async move {
                        drain_background_run(run).await;
                        svc.on_run_complete(&thread_id, &completed_run_id).await;
                    });
                }
                Err(MailboxRunStartError::Busy(reason)) => {
                    let _ = super::mailbox::nack_claimed_entry(
                        &self.mailbox_store,
                        &entry_id,
                        &claim_token,
                        250,
                        &reason,
                    )
                    .await;
                    // Re-buffer at the front so the entry keeps its place in line.
                    let mut inner = mailbox.inner.lock().await;
                    inner.pending.push_front(BufferedEntry { entry: claimed });
                    tracing::debug!(entry_id, %reason, "dispatch busy, buffered entry");
                }
                Err(MailboxRunStartError::Superseded(error)) => {
                    let _ = self
                        .mailbox_store
                        .supersede_mailbox_entry(&entry_id, now_unix_millis(), &error)
                        .await;
                    tracing::debug!(entry_id, %error, "entry superseded during dispatch");
                    // This entry is finished; move on to the next buffered one.
                    let svc = self.clone();
                    let tid = mailbox.thread_id.clone();
                    tokio::spawn(async move {
                        svc.try_dispatch_next(&tid).await;
                    });
                }
                Err(MailboxRunStartError::Permanent(error)) => {
                    let _ = dead_letter_claimed_entry(
                        &self.mailbox_store,
                        &entry_id,
                        &claim_token,
                        &error,
                    )
                    .await;
                    tracing::warn!(entry_id, %error, "permanent dispatch error");
                    let svc = self.clone();
                    let tid = mailbox.thread_id.clone();
                    tokio::spawn(async move {
                        svc.try_dispatch_next(&tid).await;
                    });
                }
                Err(MailboxRunStartError::Retryable(error)) => {
                    let _ = super::mailbox::nack_claimed_entry(
                        &self.mailbox_store,
                        &entry_id,
                        &claim_token,
                        250,
                        &error,
                    )
                    .await;
                    tracing::warn!(entry_id, %error, "retryable dispatch error");
                    let mut inner = mailbox.inner.lock().await;
                    inner.pending.push_front(BufferedEntry { entry: claimed });
                }
                Err(MailboxRunStartError::Internal(error)) => {
                    let _ = dead_letter_claimed_entry(
                        &self.mailbox_store,
                        &entry_id,
                        &claim_token,
                        &error.to_string(),
                    )
                    .await;
                    tracing::error!(entry_id, %error, "internal dispatch error");
                    let svc = self.clone();
                    let tid = mailbox.thread_id.clone();
                    tokio::spawn(async move {
                        svc.try_dispatch_next(&tid).await;
                    });
                }
            }
        })
    }
599
600 fn wrap_with_completion(self: &Arc<Self>, thread_id: String, run: RunStream) -> RunStream {
602 let svc = self.clone();
603 let tid = thread_id;
604 let completed_run_id = run.run_id.clone();
605 let wrapped = futures::stream::unfold(
606 (run.events, Some(svc), Some(tid), Some(completed_run_id)),
607 |(mut events, svc, tid, run_id)| async move {
608 match events.next().await {
609 Some(event) => Some((event, (events, svc, tid, run_id))),
610 None => {
611 if let (Some(svc), Some(tid), Some(run_id)) = (svc, tid, run_id) {
613 svc.on_run_complete(&tid, &run_id).await;
614 }
615 None
616 }
617 }
618 },
619 );
620 RunStream {
621 thread_id: run.thread_id,
622 run_id: run.run_id,
623 decision_tx: run.decision_tx,
624 events: Box::pin(wrapped),
625 }
626 }
627
628 async fn on_run_complete(self: &Arc<Self>, thread_id: &str, run_id: &str) {
632 if let Err(err) = self.submit_parent_completion_notification(run_id).await {
633 tracing::warn!(thread_id, run_id, %err, "failed to submit parent completion notification");
634 }
635
636 let mailbox = {
637 let map = self.mailboxes.read().await;
638 match map.get(thread_id) {
639 Some(mb) => mb.clone(),
640 None => return,
641 }
642 };
643
644 let next = {
645 let mut inner = mailbox.inner.lock().await;
646
647 if let Some(handle) = inner.lease_renewal.take() {
649 handle.abort();
650 }
651 if let ThreadStatus::Running {
652 entry_id,
653 claim_token,
654 ..
655 } = &inner.status
656 {
657 if let Err(err) =
658 ack_claimed_entry(&self.mailbox_store, entry_id, claim_token).await
659 {
660 tracing::error!(entry_id, %err, "failed to ack entry on run completion");
661 }
662 }
663
664 inner.status = ThreadStatus::Idle;
665 inner.pending.pop_front()
666 };
667
668 if let Some(buffered) = next {
669 let svc = self.clone();
670 tokio::spawn(async move {
671 svc.dispatch_entry(mailbox, buffered.entry).await;
672 });
673 }
674 }
675
676 async fn submit_parent_completion_notification(
677 self: &Arc<Self>,
678 completed_run_id: &str,
679 ) -> Result<(), ApiError> {
680 let Some(store) = self.os.agent_state_store().cloned() else {
681 return Ok(());
682 };
683 let Some(child_record) = store
684 .load_run(completed_run_id)
685 .await
686 .map_err(|err| ApiError::Internal(err.to_string()))?
687 else {
688 return Ok(());
689 };
690 if child_record.source_mailbox_entry_id.is_none() {
691 return Ok(());
692 }
693 let Some(parent_run_id) = child_record.parent_run_id.clone() else {
694 return Ok(());
695 };
696 let Some(message) = build_parent_completion_notification_message(&child_record) else {
697 return Ok(());
698 };
699 let Some(parent_record) = store
700 .load_run(&parent_run_id)
701 .await
702 .map_err(|err| ApiError::Internal(err.to_string()))?
703 else {
704 return Err(ApiError::Internal(format!(
705 "parent run '{parent_run_id}' not found for completed child run '{completed_run_id}'"
706 )));
707 };
708 let agent_id = parent_record.agent_id.trim();
709 if agent_id.is_empty() {
710 return Err(ApiError::Internal(format!(
711 "parent run '{parent_run_id}' is missing agent_id"
712 )));
713 }
714
715 let request = RunRequest {
716 agent_id: agent_id.to_string(),
717 thread_id: Some(parent_record.thread_id.clone()),
718 run_id: None,
719 parent_run_id: None,
720 parent_thread_id: parent_record.parent_thread_id.clone(),
721 resource_id: None,
722 origin: RunOrigin::Internal,
723 state: None,
724 messages: vec![message],
725 initial_decisions: Vec::new(),
726 source_mailbox_entry_id: None,
727 };
728 let options = EnqueueOptions {
729 sender_id: Some(format!("run:{completed_run_id}")),
730 priority: 0,
731 dedupe_key: Some(parent_completion_notification_dedupe_key(
732 &parent_run_id,
733 completed_run_id,
734 )),
735 };
736 self.submit(agent_id, request, options).await?;
737 Ok(())
738 }
739
740 async fn try_dispatch_next(self: &Arc<Self>, thread_id: &str) {
742 let mailbox = {
743 let map = self.mailboxes.read().await;
744 match map.get(thread_id) {
745 Some(mb) => mb.clone(),
746 None => return,
747 }
748 };
749
750 let next = {
751 let mut inner = mailbox.inner.lock().await;
752 if !matches!(inner.status, ThreadStatus::Idle) {
753 return;
754 }
755 inner.pending.pop_front()
756 };
757
758 if let Some(buffered) = next {
759 let svc = self.clone();
760 tokio::spawn(async move {
761 svc.dispatch_entry(mailbox, buffered.entry).await;
762 });
763 }
764 }
765
766 pub async fn control(
768 self: &Arc<Self>,
769 thread_id: &str,
770 signal: ControlSignal,
771 ) -> Result<ControlResult, ApiError> {
772 match signal {
773 ControlSignal::Cancel => {
774 let cancelled_run_id = self.os.cancel_active_run_by_thread(thread_id).await;
775 Ok(ControlResult {
776 cancelled_run_id,
777 generation: None,
778 superseded_entries: vec![],
779 })
780 }
781 ControlSignal::Interrupt => {
782 let interrupted = self
784 .mailbox_store
785 .interrupt_mailbox(thread_id, now_unix_millis())
786 .await
787 .map_err(mailbox_error)?;
788
789 let cancelled_run_id = self.os.cancel_active_run_by_thread(thread_id).await;
791
792 let new_generation = interrupted.mailbox_state.current_generation;
793
794 let mailbox = {
796 let map = self.mailboxes.read().await;
797 map.get(thread_id).cloned()
798 };
799 if let Some(mb) = mailbox {
800 let mut inner = mb.inner.lock().await;
801 inner.pending.clear();
802 inner.generation = new_generation;
803 if cancelled_run_id.is_none() {
805 if let Some(handle) = inner.lease_renewal.take() {
806 handle.abort();
807 }
808 inner.status = ThreadStatus::Idle;
809 }
810 }
811
812 Ok(ControlResult {
813 cancelled_run_id,
814 generation: Some(new_generation),
815 superseded_entries: interrupted.superseded_entries,
816 })
817 }
818 }
819 }
820
821 pub async fn recover(self: &Arc<Self>) -> Result<usize, ApiError> {
825 let page = self
826 .mailbox_store
827 .list_mailbox_entries(&MailboxQuery {
828 status: Some(MailboxEntryStatus::Queued),
829 limit: 10_000,
830 ..Default::default()
831 })
832 .await
833 .map_err(mailbox_error)?;
834
835 let mut recovered = 0;
836 for entry in page.items {
837 let thread_id = entry.mailbox_id.clone();
838 let generation = entry.generation;
839
840 if entry.available_at == INLINE_MAILBOX_AVAILABLE_AT {
842 continue;
843 }
844
845 let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
846 let mut inner = mailbox.inner.lock().await;
847
848 if generation > inner.generation {
849 inner.generation = generation;
850 }
851
852 match &inner.status {
853 ThreadStatus::Idle if inner.pending.is_empty() => {
854 drop(inner);
856 self.dispatch_entry(mailbox, entry).await;
857 }
858 _ => {
859 inner.pending.push_back(BufferedEntry { entry });
860 }
861 }
862 recovered += 1;
863 }
864
865 if recovered > 0 {
866 tracing::info!(recovered, "recovered mailbox entries from store");
867 }
868 Ok(recovered)
869 }
870
871 pub async fn run_sweep_forever(self: Arc<Self>) {
876 let sweep_interval = Duration::from_secs(30);
877 let gc_interval = Duration::from_secs(60);
878 let gc_ttl_ms: u64 = 24 * 60 * 60 * 1000;
879 let mut last_gc = std::time::Instant::now();
880
881 loop {
882 tokio::time::sleep(sweep_interval).await;
883
884 match self
886 .mailbox_store
887 .claim_mailbox_entries(
888 None,
889 16,
890 &self.consumer_id,
891 now_unix_millis(),
892 DEFAULT_MAILBOX_LEASE_MS,
893 )
894 .await
895 {
896 Ok(claimed) if !claimed.is_empty() => {
897 tracing::info!(count = claimed.len(), "sweep picked up orphaned entries");
898 for entry in claimed {
899 let thread_id = entry.mailbox_id.clone();
900 let generation = entry.generation;
901 let mailbox = self.get_or_create_mailbox(&thread_id, generation).await;
902 let is_idle = {
903 let inner = mailbox.inner.lock().await;
904 matches!(inner.status, ThreadStatus::Idle)
905 };
906 if is_idle {
907 let claim_token = entry.claim_token.clone().unwrap_or_default();
909 match start_agent_run_for_entry(
910 &self.os,
911 &self.mailbox_store,
912 &entry,
913 true,
914 )
915 .await
916 {
917 Ok(run) => {
918 let renewal = spawn_lease_renewal(
919 self.mailbox_store.clone(),
920 entry.entry_id.clone(),
921 claim_token.clone(),
922 DEFAULT_MAILBOX_LEASE_MS,
923 );
924 let run_id = run.run_id.clone();
925 let completed_run_id = run_id.clone();
926 let entry_id = entry.entry_id.clone();
927 {
928 let mut inner = mailbox.inner.lock().await;
929 inner.status = ThreadStatus::Running {
930 entry_id,
931 claim_token,
932 };
933 inner.lease_renewal = Some(renewal);
934 }
935 let svc = self.clone();
936 let tid = thread_id.clone();
937 tokio::spawn(async move {
938 drain_background_run(run).await;
939 svc.on_run_complete(&tid, &completed_run_id).await;
940 });
941 }
942 Err(MailboxRunStartError::Busy(_)) => {
943 let _ = super::mailbox::nack_claimed_entry(
944 &self.mailbox_store,
945 &entry.entry_id,
946 &claim_token,
947 250,
948 "thread busy during sweep",
949 )
950 .await;
951 }
952 Err(MailboxRunStartError::Permanent(error)) => {
953 let _ = dead_letter_claimed_entry(
954 &self.mailbox_store,
955 &entry.entry_id,
956 &claim_token,
957 &error,
958 )
959 .await;
960 }
961 Err(_) => {
962 let _ = super::mailbox::nack_claimed_entry(
963 &self.mailbox_store,
964 &entry.entry_id,
965 &claim_token,
966 1000,
967 "sweep dispatch failed",
968 )
969 .await;
970 }
971 }
972 } else {
973 let claim_token = entry.claim_token.clone().unwrap_or_default();
975 let _ = super::mailbox::nack_claimed_entry(
976 &self.mailbox_store,
977 &entry.entry_id,
978 &claim_token,
979 1000,
980 "thread busy during sweep",
981 )
982 .await;
983 }
984 }
985 }
986 Ok(_) => {} Err(err) => {
988 tracing::error!(%err, "sweep failed to claim entries");
989 }
990 }
991
992 if last_gc.elapsed() >= gc_interval {
994 let cutoff = now_unix_millis().saturating_sub(gc_ttl_ms);
995 match self
996 .mailbox_store
997 .purge_terminal_mailbox_entries(cutoff)
998 .await
999 {
1000 Ok(0) => {}
1001 Ok(n) => tracing::debug!(purged = n, "mailbox GC purged terminal entries"),
1002 Err(err) => tracing::warn!(%err, "mailbox GC failed"),
1003 }
1004 last_gc = std::time::Instant::now();
1005 }
1006 }
1007 }
1008}
1009
1010pub struct ControlResult {
1012 pub cancelled_run_id: Option<String>,
1013 pub generation: Option<u64>,
1014 pub superseded_entries: Vec<MailboxEntry>,
1015}
1016
1017#[cfg(test)]
1018mod tests {
1019 use super::*;
1020 use async_trait::async_trait;
1021 use tirea_agentos::composition::{AgentDefinition, AgentDefinitionSpec, AgentOsBuilder};
1022 use tirea_agentos::contracts::runtime::behavior::ReadOnlyContext;
1023 use tirea_agentos::contracts::runtime::phase::{ActionSet, BeforeInferenceAction};
1024 use tirea_agentos::contracts::storage::{MailboxStore, ThreadReader, ThreadWriter};
1025 use tirea_agentos::contracts::{AgentBehavior, TerminationReason};
1026 use tirea_contract::storage::{
1027 MailboxWriter, RunOrigin, RunQuery, RunReader, RunStatus, RunWriter,
1028 };
1029 use tirea_store_adapters::MemoryStore;
1030
1031 struct TerminatePlugin;
1032
1033 #[async_trait]
1034 impl AgentBehavior for TerminatePlugin {
1035 fn id(&self) -> &str {
1036 "svc_terminate"
1037 }
1038
1039 async fn before_inference(
1040 &self,
1041 _ctx: &ReadOnlyContext<'_>,
1042 ) -> ActionSet<BeforeInferenceAction> {
1043 ActionSet::single(BeforeInferenceAction::Terminate(
1044 TerminationReason::BehaviorRequested,
1045 ))
1046 }
1047 }
1048
1049 struct DelayedTerminatePlugin {
1050 id: &'static str,
1051 delay_ms: u64,
1052 }
1053
1054 #[async_trait]
1055 impl AgentBehavior for DelayedTerminatePlugin {
1056 fn id(&self) -> &str {
1057 self.id
1058 }
1059
1060 async fn before_inference(
1061 &self,
1062 _ctx: &ReadOnlyContext<'_>,
1063 ) -> ActionSet<BeforeInferenceAction> {
1064 tokio::time::sleep(Duration::from_millis(self.delay_ms)).await;
1065 ActionSet::single(BeforeInferenceAction::Terminate(
1066 TerminationReason::BehaviorRequested,
1067 ))
1068 }
1069 }
1070
1071 fn make_os_with_agents(store: Arc<MemoryStore>, agent_ids: &[&str]) -> Arc<AgentOs> {
1072 let mut builder = AgentOsBuilder::new()
1073 .with_registered_behavior("svc_terminate", Arc::new(TerminatePlugin))
1074 .with_agent_state_store(store);
1075 for agent_id in agent_ids {
1076 builder = builder.with_agent_spec(AgentDefinitionSpec::local_with_id(
1077 *agent_id,
1078 AgentDefinition {
1079 id: (*agent_id).to_string(),
1080 behavior_ids: vec!["svc_terminate".to_string()],
1081 ..Default::default()
1082 },
1083 ));
1084 }
1085 Arc::new(builder.build().expect("build AgentOs"))
1086 }
1087
    /// Builds an `AgentOs` backed by `store` with a single agent named `test`.
    fn make_os(store: Arc<MemoryStore>) -> Arc<AgentOs> {
        make_os_with_agents(store, &["test"])
    }
1091
1092 fn make_request(thread_id: &str, run_id: &str) -> RunRequest {
1093 RunRequest {
1094 agent_id: "test".to_string(),
1095 thread_id: Some(thread_id.to_string()),
1096 run_id: Some(run_id.to_string()),
1097 parent_run_id: None,
1098 parent_thread_id: None,
1099 resource_id: None,
1100 origin: Default::default(),
1101 state: None,
1102 messages: vec![],
1103 initial_decisions: vec![],
1104 source_mailbox_entry_id: None,
1105 }
1106 }
1107
1108 fn completion_message_for_parent(
1109 messages: &[std::sync::Arc<tirea_agentos::contracts::thread::Message>],
1110 parent_run_id: &str,
1111 ) -> Option<serde_json::Value> {
1112 completion_messages_for_parent(messages, parent_run_id)
1113 .into_iter()
1114 .next()
1115 }
1116
1117 fn completion_messages_for_parent(
1118 messages: &[std::sync::Arc<tirea_agentos::contracts::thread::Message>],
1119 parent_run_id: &str,
1120 ) -> Vec<serde_json::Value> {
1121 messages
1122 .iter()
1123 .map(std::sync::Arc::as_ref)
1124 .filter_map(|message| {
1125 let parsed: serde_json::Value = serde_json::from_str(&message.content).ok()?;
1126 if parsed["type"].as_str() == Some("background_task_notification")
1127 && parsed["recipient_task_id"].as_str() == Some(parent_run_id)
1128 {
1129 Some(parsed)
1130 } else {
1131 None
1132 }
1133 })
1134 .collect()
1135 }
1136
1137 async fn seed_completed_run(
1138 store: &MemoryStore,
1139 run_id: &str,
1140 thread_id: &str,
1141 agent_id: &str,
1142 origin: RunOrigin,
1143 parent_thread_id: Option<&str>,
1144 ) {
1145 let now = now_unix_millis();
1146 let mut record = tirea_contract::storage::RunRecord::new(
1147 run_id.to_string(),
1148 thread_id.to_string(),
1149 agent_id.to_string(),
1150 origin,
1151 RunStatus::Done,
1152 now,
1153 );
1154 record.parent_thread_id = parent_thread_id.map(ToString::to_string);
1155 record.termination_code = Some("natural".to_string());
1156 record.updated_at = now;
1157 store.upsert_run(&record).await.expect("seed completed run");
1158 }
1159
    /// Stores a finished child run of `parent_run_id` with termination code
    /// `natural` and no termination detail.
    async fn seed_child_terminal_run(
        store: &MemoryStore,
        run_id: &str,
        thread_id: &str,
        agent_id: &str,
        parent_run_id: &str,
        source_mailbox_entry_id: Option<&str>,
    ) {
        seed_child_terminal_run_with_status(
            store,
            run_id,
            thread_id,
            agent_id,
            parent_run_id,
            source_mailbox_entry_id,
            Some("natural"),
            None,
        )
        .await;
    }
1180
1181 #[allow(clippy::too_many_arguments)]
1182 async fn seed_child_terminal_run_with_status(
1183 store: &MemoryStore,
1184 run_id: &str,
1185 thread_id: &str,
1186 agent_id: &str,
1187 parent_run_id: &str,
1188 source_mailbox_entry_id: Option<&str>,
1189 termination_code: Option<&str>,
1190 termination_detail: Option<&str>,
1191 ) {
1192 let now = now_unix_millis();
1193 let mut record = tirea_contract::storage::RunRecord::new(
1194 run_id.to_string(),
1195 thread_id.to_string(),
1196 agent_id.to_string(),
1197 RunOrigin::User,
1198 RunStatus::Done,
1199 now,
1200 );
1201 record.parent_run_id = Some(parent_run_id.to_string());
1202 record.source_mailbox_entry_id = source_mailbox_entry_id.map(ToString::to_string);
1203 record.termination_code = termination_code.map(ToString::to_string);
1204 record.termination_detail = termination_detail.map(ToString::to_string);
1205 record.updated_at = now;
1206 store.upsert_run(&record).await.expect("seed child run");
1207 }
1208
1209 async fn wait_for_completion_messages(
1210 store: &MemoryStore,
1211 thread_id: &str,
1212 parent_run_id: &str,
1213 expected: usize,
1214 ) -> Vec<serde_json::Value> {
1215 for _ in 0..40 {
1216 if let Some(thread) = store
1217 .load_thread(thread_id)
1218 .await
1219 .expect("load thread should succeed")
1220 {
1221 let messages = completion_messages_for_parent(&thread.messages, parent_run_id);
1222 if messages.len() >= expected {
1223 return messages;
1224 }
1225 }
1226 tokio::time::sleep(Duration::from_millis(25)).await;
1227 }
1228
1229 store
1230 .load_thread(thread_id)
1231 .await
1232 .expect("load thread should succeed")
1233 .map(|thread| completion_messages_for_parent(&thread.messages, parent_run_id))
1234 .unwrap_or_default()
1235 }
1236
1237 async fn wait_for_run_record(
1238 store: &MemoryStore,
1239 run_id: &str,
1240 ) -> tirea_contract::storage::RunRecord {
1241 for _ in 0..40 {
1242 if let Some(record) = RunReader::load_run(store, run_id)
1243 .await
1244 .expect("load run should succeed")
1245 {
1246 return record;
1247 }
1248 tokio::time::sleep(Duration::from_millis(25)).await;
1249 }
1250
1251 RunReader::load_run(store, run_id)
1252 .await
1253 .expect("load run should succeed")
1254 .unwrap_or_else(|| panic!("timed out waiting for run '{run_id}'"))
1255 }
1256
1257 #[tokio::test]
1258 async fn submit_dispatches_immediately_when_idle() {
1259 let store = Arc::new(MemoryStore::new());
1260 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1261 let os = make_os(store.clone());
1262
1263 let svc = Arc::new(MailboxService::new(
1264 os.clone(),
1265 mailbox_store.clone(),
1266 "test-svc",
1267 ));
1268
1269 let (thread_id, run_id, entry_id) = svc
1270 .submit(
1271 "test",
1272 make_request("svc-thread-1", "svc-run-1"),
1273 EnqueueOptions::default(),
1274 )
1275 .await
1276 .expect("submit");
1277
1278 assert_eq!(thread_id, "svc-thread-1");
1279 assert_eq!(run_id, "svc-run-1");
1280 assert!(!entry_id.is_empty());
1281
1282 tokio::time::sleep(Duration::from_millis(200)).await;
1284
1285 let loaded = mailbox_store
1287 .load_mailbox_entry(&entry_id)
1288 .await
1289 .unwrap()
1290 .expect("entry should exist");
1291 assert_eq!(loaded.status, MailboxEntryStatus::Accepted);
1292 }
1293
1294 #[tokio::test]
1295 async fn submit_buffers_when_busy_then_dispatches_on_completion() {
1296 let store = Arc::new(MemoryStore::new());
1297 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1298 let os = make_os(store.clone());
1299
1300 let svc = Arc::new(MailboxService::new(
1301 os.clone(),
1302 mailbox_store.clone(),
1303 "test-svc",
1304 ));
1305
1306 let (_t1, _r1, entry1) = svc
1308 .submit(
1309 "test",
1310 make_request("svc-thread-2", "svc-run-2a"),
1311 EnqueueOptions::default(),
1312 )
1313 .await
1314 .expect("submit 1");
1315
1316 let (_t2, _r2, entry2) = svc
1318 .submit(
1319 "test",
1320 make_request("svc-thread-2", "svc-run-2b"),
1321 EnqueueOptions::default(),
1322 )
1323 .await
1324 .expect("submit 2");
1325
1326 tokio::time::sleep(Duration::from_millis(500)).await;
1328
1329 let e1 = mailbox_store
1331 .load_mailbox_entry(&entry1)
1332 .await
1333 .unwrap()
1334 .expect("entry1");
1335 assert_eq!(e1.status, MailboxEntryStatus::Accepted);
1336
1337 let e2 = mailbox_store
1338 .load_mailbox_entry(&entry2)
1339 .await
1340 .unwrap()
1341 .expect("entry2");
1342 assert_eq!(e2.status, MailboxEntryStatus::Accepted);
1343 }
1344
1345 #[tokio::test]
1346 async fn control_interrupt_clears_pending() {
1347 let store = Arc::new(MemoryStore::new());
1348 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1349 let os = make_os(store.clone());
1350
1351 let svc = Arc::new(MailboxService::new(
1352 os.clone(),
1353 mailbox_store.clone(),
1354 "test-svc",
1355 ));
1356
1357 let _ = svc
1359 .submit(
1360 "test",
1361 make_request("svc-thread-3", "svc-run-3a"),
1362 EnqueueOptions::default(),
1363 )
1364 .await
1365 .expect("submit 1");
1366
1367 let (_, _, entry2) = svc
1369 .submit(
1370 "test",
1371 make_request("svc-thread-3", "svc-run-3b"),
1372 EnqueueOptions::default(),
1373 )
1374 .await
1375 .expect("submit 2");
1376
1377 let result = svc
1379 .control("svc-thread-3", ControlSignal::Interrupt)
1380 .await
1381 .expect("interrupt");
1382 assert!(result.generation.is_some());
1383
1384 tokio::time::sleep(Duration::from_millis(300)).await;
1386
1387 let e2 = mailbox_store
1389 .load_mailbox_entry(&entry2)
1390 .await
1391 .unwrap()
1392 .expect("entry2");
1393 assert_eq!(e2.status, MailboxEntryStatus::Superseded);
1394 }
1395
1396 #[tokio::test]
1397 async fn submit_rejects_unknown_agent() {
1398 let store = Arc::new(MemoryStore::new());
1399 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1400 let os = make_os(store.clone());
1401
1402 let svc = Arc::new(MailboxService::new(
1403 os.clone(),
1404 mailbox_store.clone(),
1405 "test-svc",
1406 ));
1407
1408 let result = svc
1409 .submit(
1410 "nonexistent",
1411 make_request("svc-thread-4", "svc-run-4"),
1412 EnqueueOptions::default(),
1413 )
1414 .await;
1415 assert!(result.is_err());
1416 }
1417
1418 #[tokio::test]
1419 async fn recover_loads_queued_entries() {
1420 let store = Arc::new(MemoryStore::new());
1421 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1422 let os = make_os(store.clone());
1423
1424 let now = now_unix_millis();
1426 store
1427 .ensure_mailbox_state("svc-thread-5", now)
1428 .await
1429 .unwrap();
1430 let entry = mailbox_entry_from_request(
1431 &make_request("svc-thread-5", "svc-run-5"),
1432 0,
1433 &EnqueueOptions::default(),
1434 now,
1435 );
1436 store.enqueue_mailbox_entry(&entry).await.unwrap();
1437
1438 let svc = Arc::new(MailboxService::new(
1439 os.clone(),
1440 mailbox_store.clone(),
1441 "test-svc",
1442 ));
1443
1444 let count = svc.recover().await.expect("recover");
1445 assert_eq!(count, 1);
1446
1447 tokio::time::sleep(Duration::from_millis(300)).await;
1449
1450 let loaded = mailbox_store
1452 .load_mailbox_entry(&entry.entry_id)
1453 .await
1454 .unwrap()
1455 .expect("entry");
1456 assert_eq!(loaded.status, MailboxEntryStatus::Accepted);
1457 }
1458
1459 #[tokio::test]
1460 async fn submit_streaming_returns_stream() {
1461 let store = Arc::new(MemoryStore::new());
1462 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1463 let os = make_os(store.clone());
1464
1465 let svc = Arc::new(MailboxService::new(
1466 os.clone(),
1467 mailbox_store.clone(),
1468 "test-svc",
1469 ));
1470
1471 let mut run = svc
1472 .submit_streaming(
1473 "test",
1474 make_request("svc-thread-6", "svc-run-6"),
1475 EnqueueOptions::default(),
1476 )
1477 .await
1478 .expect("submit_streaming");
1479
1480 while run.events.next().await.is_some() {}
1482
1483 tokio::time::sleep(Duration::from_millis(100)).await;
1485
1486 let map = svc.mailboxes.read().await;
1488 if let Some(mb) = map.get("svc-thread-6") {
1489 let inner = mb.inner.lock().await;
1490 assert!(
1491 matches!(inner.status, ThreadStatus::Idle),
1492 "expected idle after stream exhaustion"
1493 );
1494 }
1495 }
1496
1497 #[tokio::test]
1498 async fn completed_background_run_notifies_parent_task_on_same_thread() {
1499 let store = Arc::new(MemoryStore::new());
1500 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1501 let os = make_os(store.clone());
1502 let svc = Arc::new(MailboxService::new(
1503 os.clone(),
1504 mailbox_store.clone(),
1505 "test-svc",
1506 ));
1507
1508 let parent_run_id = "svc-parent-run-1";
1509 seed_completed_run(
1510 store.as_ref(),
1511 parent_run_id,
1512 "svc-thread-parent-1",
1513 "test",
1514 RunOrigin::User,
1515 None,
1516 )
1517 .await;
1518 let mut request = make_request("svc-thread-parent-1", "svc-child-run-1");
1519 request.parent_run_id = Some(parent_run_id.to_string());
1520
1521 let (_thread_id, run_id, entry_id) = svc
1522 .submit("test", request, EnqueueOptions::default())
1523 .await
1524 .expect("submit");
1525
1526 tokio::time::sleep(Duration::from_millis(400)).await;
1527
1528 let thread = store
1529 .load_thread("svc-thread-parent-1")
1530 .await
1531 .unwrap()
1532 .expect("thread should exist");
1533 let notification = completion_message_for_parent(&thread.messages, parent_run_id)
1534 .expect("expected parent completion notification");
1535 assert_eq!(
1536 notification["child_task_id"].as_str(),
1537 Some(entry_id.as_str())
1538 );
1539 assert_eq!(notification["child_run_id"].as_str(), Some(run_id.as_str()));
1540 assert_eq!(notification["status"].as_str(), Some("completed"));
1541
1542 let internal_runs = RunReader::list_runs(
1543 store.as_ref(),
1544 &RunQuery {
1545 thread_id: Some("svc-thread-parent-1".to_string()),
1546 origin: Some(RunOrigin::Internal),
1547 ..Default::default()
1548 },
1549 )
1550 .await
1551 .expect("list internal runs");
1552 assert!(
1553 !internal_runs.items.is_empty(),
1554 "expected completion notification to execute via internal run"
1555 );
1556 }
1557
1558 #[tokio::test]
1559 async fn completed_background_run_notifies_parent_task_on_parent_thread() {
1560 let store = Arc::new(MemoryStore::new());
1561 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1562 let os = make_os(store.clone());
1563 let svc = Arc::new(MailboxService::new(
1564 os.clone(),
1565 mailbox_store.clone(),
1566 "test-svc",
1567 ));
1568
1569 store
1570 .save(&tirea_agentos::contracts::thread::Thread::new(
1571 "svc-parent-thread-2",
1572 ))
1573 .await
1574 .expect("seed parent thread");
1575
1576 let parent_run_id = "svc-parent-run-2";
1577 seed_completed_run(
1578 store.as_ref(),
1579 parent_run_id,
1580 "svc-parent-thread-2",
1581 "test",
1582 RunOrigin::User,
1583 None,
1584 )
1585 .await;
1586 let mut request = make_request("svc-child-thread-2", "svc-child-run-2");
1587 request.parent_run_id = Some(parent_run_id.to_string());
1588 request.parent_thread_id = Some("svc-parent-thread-2".to_string());
1589
1590 let (_thread_id, run_id, _entry_id) = svc
1591 .submit("test", request, EnqueueOptions::default())
1592 .await
1593 .expect("submit");
1594
1595 tokio::time::sleep(Duration::from_millis(400)).await;
1596
1597 let parent_thread = store
1598 .load_thread("svc-parent-thread-2")
1599 .await
1600 .unwrap()
1601 .expect("parent thread should exist");
1602 let notification = completion_message_for_parent(&parent_thread.messages, parent_run_id)
1603 .expect("expected parent-thread completion notification");
1604 assert_eq!(
1605 notification["child_thread_id"].as_str(),
1606 Some("svc-child-thread-2")
1607 );
1608 assert_eq!(notification["child_run_id"].as_str(), Some(run_id.as_str()));
1609
1610 let child_thread = store
1611 .load_thread("svc-child-thread-2")
1612 .await
1613 .unwrap()
1614 .expect("child thread should exist");
1615 assert!(
1616 completion_message_for_parent(&child_thread.messages, parent_run_id).is_none(),
1617 "notification should not be appended to child thread when parent_thread_id is set"
1618 );
1619
1620 let internal_runs = RunReader::list_runs(
1621 store.as_ref(),
1622 &RunQuery {
1623 thread_id: Some("svc-parent-thread-2".to_string()),
1624 origin: Some(RunOrigin::Internal),
1625 ..Default::default()
1626 },
1627 )
1628 .await
1629 .expect("list internal runs");
1630 assert!(
1631 !internal_runs.items.is_empty(),
1632 "expected completion notification to execute on the parent thread"
1633 );
1634 }
1635
1636 #[tokio::test]
1637 async fn completed_foreground_child_run_does_not_notify_parent_task() {
1638 let store = Arc::new(MemoryStore::new());
1639 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1640 let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1641 let svc = Arc::new(MailboxService::new(
1642 os.clone(),
1643 mailbox_store.clone(),
1644 "test-svc",
1645 ));
1646
1647 store
1648 .save(&tirea_agentos::contracts::thread::Thread::new(
1649 "svc-parent-thread-foreground",
1650 ))
1651 .await
1652 .expect("seed parent thread");
1653 seed_completed_run(
1654 store.as_ref(),
1655 "svc-parent-run-foreground",
1656 "svc-parent-thread-foreground",
1657 "parent-agent",
1658 RunOrigin::User,
1659 None,
1660 )
1661 .await;
1662 seed_child_terminal_run(
1663 store.as_ref(),
1664 "svc-child-run-foreground",
1665 "svc-child-thread-foreground",
1666 "worker-agent",
1667 "svc-parent-run-foreground",
1668 None,
1669 )
1670 .await;
1671
1672 svc.submit_parent_completion_notification("svc-child-run-foreground")
1673 .await
1674 .expect("foreground notification path should no-op");
1675 tokio::time::sleep(Duration::from_millis(250)).await;
1676
1677 let parent_thread = store
1678 .load_thread("svc-parent-thread-foreground")
1679 .await
1680 .unwrap()
1681 .expect("parent thread should exist");
1682 assert!(
1683 completion_message_for_parent(&parent_thread.messages, "svc-parent-run-foreground")
1684 .is_none(),
1685 "foreground child run should not enqueue parent completion notification"
1686 );
1687
1688 let internal_runs = RunReader::list_runs(
1689 store.as_ref(),
1690 &RunQuery {
1691 thread_id: Some("svc-parent-thread-foreground".to_string()),
1692 origin: Some(RunOrigin::Internal),
1693 ..Default::default()
1694 },
1695 )
1696 .await
1697 .expect("list internal runs");
1698 assert!(
1699 internal_runs.items.is_empty(),
1700 "foreground child run should not spawn internal notification runs"
1701 );
1702 }
1703
1704 #[tokio::test]
1705 async fn multi_agent_background_notification_runs_under_parent_agent() {
1706 let store = Arc::new(MemoryStore::new());
1707 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1708 let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1709 let svc = Arc::new(MailboxService::new(
1710 os.clone(),
1711 mailbox_store.clone(),
1712 "test-svc",
1713 ));
1714
1715 store
1716 .save(&tirea_agentos::contracts::thread::Thread::new(
1717 "svc-parent-thread-multi-agent",
1718 ))
1719 .await
1720 .expect("seed parent thread");
1721 seed_completed_run(
1722 store.as_ref(),
1723 "svc-parent-run-multi-agent",
1724 "svc-parent-thread-multi-agent",
1725 "parent-agent",
1726 RunOrigin::User,
1727 None,
1728 )
1729 .await;
1730
1731 let mut request = RunRequest {
1732 agent_id: "worker-agent".to_string(),
1733 ..make_request("svc-child-thread-multi-agent", "svc-child-run-multi-agent")
1734 };
1735 request.parent_run_id = Some("svc-parent-run-multi-agent".to_string());
1736 request.parent_thread_id = Some("svc-parent-thread-multi-agent".to_string());
1737
1738 let (_thread_id, run_id, entry_id) = svc
1739 .submit("worker-agent", request, EnqueueOptions::default())
1740 .await
1741 .expect("submit child background run");
1742
1743 tokio::time::sleep(Duration::from_millis(450)).await;
1744
1745 let parent_thread = store
1746 .load_thread("svc-parent-thread-multi-agent")
1747 .await
1748 .unwrap()
1749 .expect("parent thread should exist");
1750 let notification =
1751 completion_message_for_parent(&parent_thread.messages, "svc-parent-run-multi-agent")
1752 .expect("expected multi-agent completion notification");
1753 assert_eq!(
1754 notification["child_task_id"].as_str(),
1755 Some(entry_id.as_str())
1756 );
1757 assert_eq!(notification["child_run_id"].as_str(), Some(run_id.as_str()));
1758 assert_eq!(notification["status"].as_str(), Some("completed"));
1759
1760 let internal_runs = RunReader::list_runs(
1761 store.as_ref(),
1762 &RunQuery {
1763 thread_id: Some("svc-parent-thread-multi-agent".to_string()),
1764 origin: Some(RunOrigin::Internal),
1765 ..Default::default()
1766 },
1767 )
1768 .await
1769 .expect("list internal runs");
1770 assert!(
1771 internal_runs
1772 .items
1773 .iter()
1774 .any(|record| record.agent_id == "parent-agent"),
1775 "expected notification run to execute under the parent agent"
1776 );
1777 assert!(
1778 internal_runs
1779 .items
1780 .iter()
1781 .all(|record| record.agent_id != "worker-agent"),
1782 "notification run should not execute under the child agent"
1783 );
1784 }
1785
1786 #[tokio::test]
1787 async fn parent_completion_notification_maps_terminal_statuses() {
1788 let store = Arc::new(MemoryStore::new());
1789 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1790 let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1791 let svc = Arc::new(MailboxService::new(
1792 os.clone(),
1793 mailbox_store.clone(),
1794 "test-svc",
1795 ));
1796
1797 let parent_run_id = "svc-parent-run-status-map";
1798 seed_completed_run(
1799 store.as_ref(),
1800 parent_run_id,
1801 "svc-parent-thread-status-map",
1802 "parent-agent",
1803 RunOrigin::User,
1804 None,
1805 )
1806 .await;
1807
1808 let cases = [
1809 (
1810 "svc-child-run-failed",
1811 Some("error"),
1812 Some("worker crashed"),
1813 "failed",
1814 ),
1815 (
1816 "svc-child-run-cancelled",
1817 Some("cancelled"),
1818 Some("user requested stop"),
1819 "cancelled",
1820 ),
1821 (
1822 "svc-child-run-stopped",
1823 Some("stopped:max_turns"),
1824 Some("max turns reached"),
1825 "stopped",
1826 ),
1827 ];
1828
1829 for (idx, (run_id, termination_code, termination_detail, _expected_status)) in
1830 cases.iter().enumerate()
1831 {
1832 seed_child_terminal_run_with_status(
1833 store.as_ref(),
1834 run_id,
1835 &format!("svc-child-thread-status-{idx}"),
1836 "worker-agent",
1837 parent_run_id,
1838 Some(&format!("svc-child-entry-status-{idx}")),
1839 *termination_code,
1840 *termination_detail,
1841 )
1842 .await;
1843
1844 svc.submit_parent_completion_notification(run_id)
1845 .await
1846 .expect("submit parent completion notification");
1847 }
1848
1849 let notifications = wait_for_completion_messages(
1850 store.as_ref(),
1851 "svc-parent-thread-status-map",
1852 parent_run_id,
1853 3,
1854 )
1855 .await;
1856 assert_eq!(
1857 notifications.len(),
1858 3,
1859 "expected three completion notifications"
1860 );
1861
1862 let by_run_id: std::collections::HashMap<_, _> = notifications
1863 .into_iter()
1864 .map(|notification| {
1865 (
1866 notification["child_run_id"]
1867 .as_str()
1868 .expect("child_run_id should be present")
1869 .to_string(),
1870 notification,
1871 )
1872 })
1873 .collect();
1874
1875 for (run_id, termination_code, termination_detail, expected_status) in cases {
1876 let notification = by_run_id
1877 .get(run_id)
1878 .unwrap_or_else(|| panic!("missing notification for {run_id}"));
1879 assert_eq!(notification["status"].as_str(), Some(expected_status));
1880 assert_eq!(notification["termination_code"].as_str(), termination_code,);
1881 assert_eq!(
1882 notification["termination_detail"].as_str(),
1883 termination_detail,
1884 );
1885 }
1886 }
1887
1888 #[tokio::test]
1889 async fn parent_completion_notification_is_idempotent_for_repeated_completion_callbacks() {
1890 let store = Arc::new(MemoryStore::new());
1891 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1892 let os = make_os_with_agents(store.clone(), &["parent-agent", "worker-agent"]);
1893 let svc = Arc::new(MailboxService::new(
1894 os.clone(),
1895 mailbox_store.clone(),
1896 "test-svc",
1897 ));
1898
1899 let parent_run_id = "svc-parent-run-idempotent";
1900 let child_run_id = "svc-child-run-idempotent";
1901 seed_completed_run(
1902 store.as_ref(),
1903 parent_run_id,
1904 "svc-parent-thread-idempotent",
1905 "parent-agent",
1906 RunOrigin::User,
1907 None,
1908 )
1909 .await;
1910 seed_child_terminal_run(
1911 store.as_ref(),
1912 child_run_id,
1913 "svc-child-thread-idempotent",
1914 "worker-agent",
1915 parent_run_id,
1916 Some("svc-child-entry-idempotent"),
1917 )
1918 .await;
1919
1920 svc.submit_parent_completion_notification(child_run_id)
1921 .await
1922 .expect("first completion callback");
1923 svc.submit_parent_completion_notification(child_run_id)
1924 .await
1925 .expect("duplicate completion callback should no-op");
1926 svc.on_run_complete("svc-child-thread-idempotent", child_run_id)
1927 .await;
1928
1929 let notifications = wait_for_completion_messages(
1930 store.as_ref(),
1931 "svc-parent-thread-idempotent",
1932 parent_run_id,
1933 1,
1934 )
1935 .await;
1936 assert_eq!(
1937 notifications.len(),
1938 1,
1939 "duplicate completion callbacks must not duplicate notifications"
1940 );
1941
1942 let internal_runs = RunReader::list_runs(
1943 store.as_ref(),
1944 &RunQuery {
1945 thread_id: Some("svc-parent-thread-idempotent".to_string()),
1946 origin: Some(RunOrigin::Internal),
1947 ..Default::default()
1948 },
1949 )
1950 .await
1951 .expect("list internal runs");
1952 assert_eq!(
1953 internal_runs.items.len(),
1954 1,
1955 "idempotent delivery should create exactly one internal notification run"
1956 );
1957 }
1958
    /// Checks that completion notifications from two children are withheld
    /// while the parent thread still has an active run, and are delivered
    /// afterwards with the fast child's notification ahead of the slow child's.
    ///
    /// The test is timing-sensitive: it relies on the configured `delay_ms`
    /// values (350 for the parent, 150 for the slow worker) and on a fixed
    /// 225 ms sleep before the "still busy" assertion.
    #[tokio::test]
    async fn parent_busy_thread_buffers_multiple_child_completion_notifications() {
        let store = Arc::new(MemoryStore::new());
        let mailbox_store: Arc<dyn MailboxStore> = store.clone();
        // Custom AgentOs with three agents, each bound to one behavior:
        // parent-agent -> svc_parent_busy (delay_ms 350),
        // worker-fast  -> svc_terminate,
        // worker-slow  -> svc_worker_slow (delay_ms 150).
        // NOTE(review): presumably `DelayedTerminatePlugin` holds the run open
        // for `delay_ms` before terminating — confirm against its definition.
        let os = Arc::new(
            AgentOsBuilder::new()
                .with_registered_behavior("svc_terminate", Arc::new(TerminatePlugin))
                .with_registered_behavior(
                    "svc_parent_busy",
                    Arc::new(DelayedTerminatePlugin {
                        id: "svc_parent_busy",
                        delay_ms: 350,
                    }),
                )
                .with_registered_behavior(
                    "svc_worker_slow",
                    Arc::new(DelayedTerminatePlugin {
                        id: "svc_worker_slow",
                        delay_ms: 150,
                    }),
                )
                .with_agent_state_store(store.clone())
                .with_agent_spec(AgentDefinitionSpec::local_with_id(
                    "parent-agent",
                    AgentDefinition {
                        id: "parent-agent".to_string(),
                        behavior_ids: vec!["svc_parent_busy".to_string()],
                        ..Default::default()
                    },
                ))
                .with_agent_spec(AgentDefinitionSpec::local_with_id(
                    "worker-fast",
                    AgentDefinition {
                        id: "worker-fast".to_string(),
                        behavior_ids: vec!["svc_terminate".to_string()],
                        ..Default::default()
                    },
                ))
                .with_agent_spec(AgentDefinitionSpec::local_with_id(
                    "worker-slow",
                    AgentDefinition {
                        id: "worker-slow".to_string(),
                        behavior_ids: vec!["svc_worker_slow".to_string()],
                        ..Default::default()
                    },
                ))
                .build()
                .expect("build AgentOs"),
        );
        let svc = Arc::new(MailboxService::new(
            os.clone(),
            mailbox_store.clone(),
            "test-svc",
        ));

        // Start the parent run first and wait until its run record exists, so
        // the children below have a parent run to refer to.
        let parent_thread_id = "svc-parent-thread-busy";
        let parent_run_id = "svc-parent-run-busy";
        let parent_request = RunRequest {
            agent_id: "parent-agent".to_string(),
            ..make_request(parent_thread_id, parent_run_id)
        };
        svc.submit("parent-agent", parent_request, EnqueueOptions::default())
            .await
            .expect("submit parent background run");
        let _parent_run = wait_for_run_record(store.as_ref(), parent_run_id).await;

        // Slow child is submitted before the fast one; both run on their own
        // threads and point back at the parent run and parent thread.
        let mut slow_request = RunRequest {
            agent_id: "worker-slow".to_string(),
            ..make_request("svc-child-thread-busy-slow", "svc-child-run-busy-slow")
        };
        slow_request.parent_run_id = Some(parent_run_id.to_string());
        slow_request.parent_thread_id = Some(parent_thread_id.to_string());
        svc.submit("worker-slow", slow_request, EnqueueOptions::default())
            .await
            .expect("submit slow child");

        let mut fast_request = RunRequest {
            agent_id: "worker-fast".to_string(),
            ..make_request("svc-child-thread-busy-fast", "svc-child-run-busy-fast")
        };
        fast_request.parent_run_id = Some(parent_run_id.to_string());
        fast_request.parent_thread_id = Some(parent_thread_id.to_string());
        svc.submit("worker-fast", fast_request, EnqueueOptions::default())
            .await
            .expect("submit fast child");

        // 225 ms in: longer than the slow worker's 150 ms delay but shorter
        // than the parent's 350 ms delay, so the parent thread must not have
        // received any completion message yet.
        tokio::time::sleep(Duration::from_millis(225)).await;
        let pending_while_busy = store
            .load_thread(parent_thread_id)
            .await
            .expect("load parent thread should succeed")
            .map(|thread| completion_messages_for_parent(&thread.messages, parent_run_id))
            .unwrap_or_default();
        assert!(
            pending_while_busy.is_empty(),
            "busy parent thread should not receive completion messages before its active run finishes"
        );

        // Now poll until both notifications have landed on the parent thread.
        let notifications =
            wait_for_completion_messages(store.as_ref(), parent_thread_id, parent_run_id, 2).await;
        assert_eq!(
            notifications.len(),
            2,
            "both child completions should be delivered after the parent thread becomes idle"
        );
        let child_run_ids: Vec<_> = notifications
            .iter()
            .map(|notification| {
                notification["child_run_id"]
                    .as_str()
                    .expect("child_run_id should be present")
                    .to_string()
            })
            .collect();
        // Order matters: the fast child's notification must precede the slow one's.
        assert_eq!(
            child_run_ids,
            vec![
                "svc-child-run-busy-fast".to_string(),
                "svc-child-run-busy-slow".to_string(),
            ],
            "completion notifications should follow child completion order while buffering on the parent thread"
        );
    }
2082
2083 #[tokio::test]
2084 async fn grandchild_completion_notifies_immediate_parent_not_root_parent() {
2085 let store = Arc::new(MemoryStore::new());
2086 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
2087 let os = make_os_with_agents(
2088 store.clone(),
2089 &["root-agent", "child-agent", "worker-agent"],
2090 );
2091 let svc = Arc::new(MailboxService::new(
2092 os.clone(),
2093 mailbox_store.clone(),
2094 "test-svc",
2095 ));
2096
2097 seed_completed_run(
2098 store.as_ref(),
2099 "svc-root-run-nested",
2100 "svc-root-thread-nested",
2101 "root-agent",
2102 RunOrigin::User,
2103 None,
2104 )
2105 .await;
2106 seed_child_terminal_run(
2107 store.as_ref(),
2108 "svc-child-run-nested",
2109 "svc-child-thread-nested",
2110 "child-agent",
2111 "svc-root-run-nested",
2112 Some("svc-child-entry-nested"),
2113 )
2114 .await;
2115 seed_child_terminal_run(
2116 store.as_ref(),
2117 "svc-grandchild-run-nested",
2118 "svc-grandchild-thread-nested",
2119 "worker-agent",
2120 "svc-child-run-nested",
2121 Some("svc-grandchild-entry-nested"),
2122 )
2123 .await;
2124
2125 svc.submit_parent_completion_notification("svc-grandchild-run-nested")
2126 .await
2127 .expect("submit grandchild completion notification");
2128
2129 let child_notifications = wait_for_completion_messages(
2130 store.as_ref(),
2131 "svc-child-thread-nested",
2132 "svc-child-run-nested",
2133 1,
2134 )
2135 .await;
2136 assert_eq!(child_notifications.len(), 1);
2137 assert_eq!(
2138 child_notifications[0]["child_run_id"].as_str(),
2139 Some("svc-grandchild-run-nested")
2140 );
2141
2142 let root_notifications = wait_for_completion_messages(
2143 store.as_ref(),
2144 "svc-root-thread-nested",
2145 "svc-root-run-nested",
2146 1,
2147 )
2148 .await;
2149 assert!(
2150 root_notifications.is_empty(),
2151 "grandchild completion should notify the immediate parent task, not the root parent"
2152 );
2153
2154 let child_internal_runs = RunReader::list_runs(
2155 store.as_ref(),
2156 &RunQuery {
2157 thread_id: Some("svc-child-thread-nested".to_string()),
2158 origin: Some(RunOrigin::Internal),
2159 ..Default::default()
2160 },
2161 )
2162 .await
2163 .expect("list child internal runs");
2164 assert!(
2165 child_internal_runs
2166 .items
2167 .iter()
2168 .any(|record| record.agent_id == "child-agent"),
2169 "nested completion notification should execute under the immediate parent agent"
2170 );
2171
2172 let root_internal_runs = RunReader::list_runs(
2173 store.as_ref(),
2174 &RunQuery {
2175 thread_id: Some("svc-root-thread-nested".to_string()),
2176 origin: Some(RunOrigin::Internal),
2177 ..Default::default()
2178 },
2179 )
2180 .await
2181 .expect("list root internal runs");
2182 assert!(
2183 root_internal_runs.items.is_empty(),
2184 "nested completion should not spawn a root-level notification run"
2185 );
2186 }
2187}