1#![allow(dead_code)]
3
4use std::sync::Arc;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use async_trait::async_trait;
8use futures::stream::FuturesUnordered;
9use futures::StreamExt;
10use tirea_agentos::contracts::storage::{
11 MailboxEntry, MailboxEntryOrigin, MailboxEntryStatus, MailboxQuery, MailboxReader,
12 MailboxStore, MailboxStoreError, RunStatus, ThreadReader,
13};
14use tirea_agentos::contracts::thread::Message;
15use tirea_agentos::contracts::RunRequest;
16use tirea_agentos::{AgentOs, AgentOsRunError, RunStream};
17use tirea_contract::storage::{MailboxReceiver, ReceiveOutcome, RunRecord};
18
19use super::ApiError;
20
21const DEFAULT_MAILBOX_POLL_INTERVAL_MS: u64 = 100;
22pub(crate) const DEFAULT_MAILBOX_LEASE_MS: u64 = 30_000;
23const DEFAULT_MAILBOX_RETRY_MS: u64 = 250;
24const DEFAULT_MAILBOX_BATCH_SIZE: usize = 16;
25const DEFAULT_MAILBOX_MAX_ATTEMPTS: u32 = 10;
26const DEFAULT_MAILBOX_GC_INTERVAL_SECS: u64 = 60;
27const DEFAULT_MAILBOX_GC_TTL_MS: u64 = 24 * 60 * 60 * 1000; pub(crate) const INLINE_MAILBOX_AVAILABLE_AT: u64 = i64::MAX as u64;
29
30pub(crate) fn now_unix_millis() -> u64 {
31 SystemTime::now()
32 .duration_since(UNIX_EPOCH)
33 .unwrap_or_default()
34 .as_millis()
35 .min(u128::from(u64::MAX)) as u64
36}
37
38pub(crate) fn new_id() -> String {
39 uuid::Uuid::now_v7().simple().to_string()
40}
41
42pub(crate) fn normalize_background_run_request(
47 agent_id: &str,
48 mut request: RunRequest,
49) -> RunRequest {
50 request.agent_id = agent_id.to_string();
51 if request.thread_id.is_none() {
52 request.thread_id = Some(new_id());
53 }
54 if request.run_id.is_none() {
55 request.run_id = Some(new_id());
56 }
57 request
58}
59
60#[derive(Debug, Clone, Default)]
62pub struct EnqueueOptions {
63 pub sender_id: Option<String>,
65 pub priority: u8,
67 pub dedupe_key: Option<String>,
69}
70
71pub(crate) fn mailbox_entry_from_request(
72 request: &RunRequest,
73 generation: u64,
74 options: &EnqueueOptions,
75 available_at: u64,
76) -> MailboxEntry {
77 let now = now_unix_millis();
78 let mailbox_id = request
79 .thread_id
80 .clone()
81 .expect("background mailbox request should have thread_id");
82 let payload = serde_json::to_value(request).expect("RunRequest should be serializable");
83 MailboxEntry {
84 entry_id: new_id(),
85 mailbox_id,
86 origin: MailboxEntryOrigin::from_run_origin(request.origin),
87 sender_id: options.sender_id.clone(),
88 payload,
89 priority: options.priority,
90 dedupe_key: options.dedupe_key.clone(),
91 generation,
92 status: MailboxEntryStatus::Queued,
93 available_at,
94 attempt_count: 0,
95 last_error: None,
96 claim_token: None,
97 claimed_by: None,
98 lease_until: None,
99 created_at: now,
100 updated_at: now,
101 }
102}
103
104pub(crate) fn mailbox_error(err: MailboxStoreError) -> ApiError {
105 ApiError::Internal(err.to_string())
106}
107
108pub(crate) fn is_generation_mismatch(err: &MailboxStoreError) -> bool {
109 matches!(err, MailboxStoreError::GenerationMismatch { .. })
110}
111
112pub(crate) fn is_permanent_dispatch_error(err: &AgentOsRunError) -> bool {
113 matches!(err, AgentOsRunError::Resolve(_))
114}
115
116pub(crate) async fn drain_background_run(mut run: RunStream) {
117 while run.events.next().await.is_some() {}
118}
119
120pub(crate) fn parent_completion_notification_dedupe_key(
121 parent_run_id: &str,
122 child_run_id: &str,
123) -> String {
124 format!("parent-task-completion:{parent_run_id}:{child_run_id}")
125}
126
127fn parent_completion_notification_status(record: &RunRecord) -> Option<&'static str> {
128 if record.status != RunStatus::Done {
129 return None;
130 }
131
132 match record.termination_code.as_deref() {
133 Some("error") => Some("failed"),
134 Some("cancelled") => Some("cancelled"),
135 Some(code) if code.starts_with("stopped:") => Some("stopped"),
136 Some("behavior_requested" | "natural") | None => Some("completed"),
137 Some(_) => Some("completed"),
138 }
139}
140
141pub(crate) fn build_parent_completion_notification_message(record: &RunRecord) -> Option<Message> {
142 let recipient_task_id = record.parent_run_id.clone()?;
143 let status = parent_completion_notification_status(record)?;
144 let task_id = record
145 .source_mailbox_entry_id
146 .as_deref()
147 .unwrap_or(record.run_id.as_str());
148 let content = serde_json::json!({
149 "type": "background_task_notification",
150 "recipient_task_id": recipient_task_id,
151 "child_task_id": task_id,
152 "child_run_id": record.run_id,
153 "child_thread_id": record.thread_id,
154 "status": status,
155 "termination_code": record.termination_code,
156 "termination_detail": record.termination_detail,
157 "result_ref": {
158 "task_id": task_id,
159 "run_id": record.run_id,
160 "thread_id": record.thread_id,
161 },
162 });
163
164 Some(Message::internal_system(content.to_string()))
165}
166
167pub(crate) async fn ack_claimed_entry(
172 mailbox_store: &Arc<dyn MailboxStore>,
173 entry_id: &str,
174 claim_token: &str,
175) -> Result<(), ApiError> {
176 match mailbox_store
177 .ack_mailbox_entry(entry_id, claim_token, now_unix_millis())
178 .await
179 {
180 Ok(()) => Ok(()),
181 Err(MailboxStoreError::ClaimConflict(_)) => Ok(()),
182 Err(err) => Err(mailbox_error(err)),
183 }
184}
185
186pub(crate) async fn nack_claimed_entry(
187 mailbox_store: &Arc<dyn MailboxStore>,
188 entry_id: &str,
189 claim_token: &str,
190 retry_delay_ms: u64,
191 error: &str,
192) -> Result<(), ApiError> {
193 let now = now_unix_millis();
194 match mailbox_store
195 .nack_mailbox_entry(
196 entry_id,
197 claim_token,
198 now.saturating_add(retry_delay_ms),
199 error,
200 now,
201 )
202 .await
203 {
204 Ok(()) => Ok(()),
205 Err(MailboxStoreError::ClaimConflict(_)) => Ok(()),
206 Err(err) => Err(mailbox_error(err)),
207 }
208}
209
210pub(crate) async fn dead_letter_claimed_entry(
211 mailbox_store: &Arc<dyn MailboxStore>,
212 entry_id: &str,
213 claim_token: &str,
214 error: &str,
215) -> Result<(), ApiError> {
216 match mailbox_store
217 .dead_letter_mailbox_entry(entry_id, claim_token, error, now_unix_millis())
218 .await
219 {
220 Ok(()) => Ok(()),
221 Err(MailboxStoreError::ClaimConflict(_)) => Ok(()),
222 Err(err) => Err(mailbox_error(err)),
223 }
224}
225
226pub struct AgentReceiver {
231 os: Arc<AgentOs>,
232}
233
234impl AgentReceiver {
235 pub fn new(os: Arc<AgentOs>) -> Self {
236 Self { os }
237 }
238}
239
240#[async_trait]
241impl MailboxReceiver for AgentReceiver {
242 async fn receive(&self, entry: &MailboxEntry) -> ReceiveOutcome {
243 let mut request: RunRequest = match serde_json::from_value(entry.payload.clone()) {
244 Ok(r) => r,
245 Err(e) => return ReceiveOutcome::Reject(format!("invalid payload: {e}")),
246 };
247
248 request.source_mailbox_entry_id = Some(entry.entry_id.clone());
249
250 let agent_id = request.agent_id.clone();
251 let thread_id = request
252 .thread_id
253 .clone()
254 .unwrap_or_else(|| entry.mailbox_id.clone());
255
256 match self
257 .os
258 .current_run_id_for_thread(&agent_id, &thread_id)
259 .await
260 {
261 Ok(Some(_)) => return ReceiveOutcome::Retry("thread has active run".into()),
262 Ok(None) => {}
263 Err(e) => return ReceiveOutcome::Retry(e.to_string()),
264 }
265
266 let resolved = match self.os.resolve(&agent_id) {
267 Ok(r) => r,
268 Err(e) => return ReceiveOutcome::Reject(e.to_string()),
269 };
270
271 match self
272 .os
273 .start_active_run_with_persistence(&agent_id, request, resolved, true, false)
274 .await
275 {
276 Ok(run) => {
277 tokio::spawn(drain_background_run(run));
278 ReceiveOutcome::Accepted
279 }
280 Err(e) if is_permanent_dispatch_error(&e) => ReceiveOutcome::Reject(e.to_string()),
281 Err(e) => ReceiveOutcome::Retry(e.to_string()),
282 }
283 }
284}
285
286pub(crate) enum MailboxRunStartError {
291 Busy(String),
292 Superseded(String),
293 Permanent(String),
294 Retryable(String),
295 Internal(ApiError),
296}
297
298pub(crate) async fn start_agent_run_for_entry(
299 os: &Arc<AgentOs>,
300 mailbox_store: &Arc<dyn MailboxStore>,
301 entry: &MailboxEntry,
302 persist_run: bool,
303) -> Result<RunStream, MailboxRunStartError> {
304 if let Some(current) = mailbox_store
306 .load_mailbox_entry(&entry.entry_id)
307 .await
308 .map_err(mailbox_error)
309 .map_err(MailboxRunStartError::Internal)?
310 {
311 if current.status != MailboxEntryStatus::Claimed || current.claim_token != entry.claim_token
312 {
313 return Err(MailboxRunStartError::Superseded(
314 current
315 .last_error
316 .unwrap_or_else(|| "mailbox entry is no longer active".to_string()),
317 ));
318 }
319 }
320
321 if mailbox_store
323 .load_mailbox_state(&entry.mailbox_id)
324 .await
325 .map_err(mailbox_error)
326 .map_err(MailboxRunStartError::Internal)?
327 .is_some_and(|state| state.current_generation != entry.generation)
328 {
329 return Err(MailboxRunStartError::Superseded(
330 "mailbox entry superseded by interrupt".to_string(),
331 ));
332 }
333
334 let mut request: RunRequest = serde_json::from_value(entry.payload.clone())
336 .map_err(|e| MailboxRunStartError::Permanent(format!("invalid payload: {e}")))?;
337 request.source_mailbox_entry_id = Some(entry.entry_id.clone());
338
339 let agent_id = request.agent_id.clone();
340 let thread_id = request
341 .thread_id
342 .clone()
343 .unwrap_or_else(|| entry.mailbox_id.clone());
344
345 match os.current_run_id_for_thread(&agent_id, &thread_id).await {
347 Ok(Some(_)) => {
348 return Err(MailboxRunStartError::Busy(
349 "thread already has an active run".to_string(),
350 ));
351 }
352 Ok(None) => {}
353 Err(err) => return Err(MailboxRunStartError::Internal(ApiError::from(err))),
354 }
355
356 let resolved = os
357 .resolve(&agent_id)
358 .map_err(|err| MailboxRunStartError::Permanent(err.to_string()))?;
359
360 os.start_active_run_with_persistence(&agent_id, request, resolved, persist_run, !persist_run)
361 .await
362 .map_err(|err| {
363 if is_permanent_dispatch_error(&err) {
364 MailboxRunStartError::Permanent(err.to_string())
365 } else {
366 MailboxRunStartError::Retryable(err.to_string())
367 }
368 })
369}
370
371pub(crate) async fn enqueue_mailbox_run(
377 os: &Arc<AgentOs>,
378 mailbox_store: &Arc<dyn MailboxStore>,
379 agent_id: &str,
380 request: RunRequest,
381 available_at: u64,
382 options: EnqueueOptions,
383) -> Result<(String, String, String), ApiError> {
384 os.resolve(agent_id).map_err(AgentOsRunError::from)?;
385
386 let request = normalize_background_run_request(agent_id, request);
387 let mailbox_id = request
388 .thread_id
389 .clone()
390 .expect("normalized mailbox run request should have thread_id");
391 let run_id = request
392 .run_id
393 .clone()
394 .expect("normalized mailbox run request should have run_id");
395
396 for _ in 0..2 {
397 let now = now_unix_millis();
398 let state = mailbox_store
399 .ensure_mailbox_state(&mailbox_id, now)
400 .await
401 .map_err(mailbox_error)?;
402 let entry =
403 mailbox_entry_from_request(&request, state.current_generation, &options, available_at);
404 let entry_id = entry.entry_id.clone();
405
406 match mailbox_store.enqueue_mailbox_entry(&entry).await {
407 Ok(()) => return Ok((mailbox_id, entry_id, run_id)),
408 Err(err) if is_generation_mismatch(&err) => continue,
409 Err(err) => return Err(mailbox_error(err)),
410 }
411 }
412
413 Err(ApiError::Internal(format!(
414 "mailbox enqueue raced with interrupt for mailbox '{mailbox_id}'"
415 )))
416}
417
418pub async fn enqueue_background_run(
420 os: &Arc<AgentOs>,
421 mailbox_store: &Arc<dyn MailboxStore>,
422 agent_id: &str,
423 request: RunRequest,
424 options: EnqueueOptions,
425) -> Result<(String, String, String), ApiError> {
426 let (mailbox_id, entry_id, run_id) = enqueue_mailbox_run(
427 os,
428 mailbox_store,
429 agent_id,
430 request,
431 now_unix_millis(),
432 options,
433 )
434 .await?;
435 Ok((mailbox_id, run_id, entry_id))
436}
437
438pub async fn start_streaming_run_via_mailbox(
439 os: &Arc<AgentOs>,
440 mailbox_store: &Arc<dyn MailboxStore>,
441 agent_id: &str,
442 request: RunRequest,
443 consumer_id: &str,
444 options: EnqueueOptions,
445) -> Result<RunStream, ApiError> {
446 let (_mailbox_id, entry_id, _run_id) = enqueue_mailbox_run(
447 os,
448 mailbox_store,
449 agent_id,
450 request,
451 INLINE_MAILBOX_AVAILABLE_AT,
452 options,
453 )
454 .await?;
455
456 let Some(entry) = mailbox_store
457 .claim_mailbox_entry(
458 &entry_id,
459 consumer_id,
460 now_unix_millis(),
461 DEFAULT_MAILBOX_LEASE_MS,
462 )
463 .await
464 .map_err(mailbox_error)?
465 else {
466 let existing = mailbox_store
467 .load_mailbox_entry(&entry_id)
468 .await
469 .map_err(mailbox_error)?;
470 return Err(match existing {
471 Some(entry) if entry.status == MailboxEntryStatus::Accepted => {
472 ApiError::BadRequest("run has already been accepted".to_string())
473 }
474 Some(entry) if entry.status == MailboxEntryStatus::Superseded => {
475 ApiError::BadRequest("run has been superseded".to_string())
476 }
477 Some(entry) if entry.status == MailboxEntryStatus::Cancelled => {
478 ApiError::BadRequest("run has already been cancelled".to_string())
479 }
480 Some(entry) if entry.status == MailboxEntryStatus::DeadLetter => ApiError::Internal(
481 entry
482 .last_error
483 .unwrap_or_else(|| "mailbox entry moved to dead letter".to_string()),
484 ),
485 Some(_) => ApiError::BadRequest("entry is already claimed".to_string()),
486 None => ApiError::Internal(format!(
487 "mailbox entry '{entry_id}' disappeared before inline dispatch"
488 )),
489 });
490 };
491
492 let claim_token = entry.claim_token.clone().ok_or_else(|| {
493 ApiError::Internal(format!(
494 "mailbox entry '{}' was claimed without claim_token",
495 entry.entry_id
496 ))
497 })?;
498
499 match start_agent_run_for_entry(os, mailbox_store, &entry, false).await {
500 Ok(run) => {
501 ack_claimed_entry(mailbox_store, &entry.entry_id, &claim_token).await?;
502 Ok(run)
503 }
504 Err(MailboxRunStartError::Superseded(error)) => {
505 let _ = mailbox_store
506 .supersede_mailbox_entry(&entry.entry_id, now_unix_millis(), &error)
507 .await
508 .map_err(mailbox_error)?;
509 Err(ApiError::BadRequest(error))
510 }
511 Err(MailboxRunStartError::Busy(error)) => {
512 mailbox_store
513 .cancel_mailbox_entry(&entry.entry_id, now_unix_millis())
514 .await
515 .map_err(mailbox_error)?;
516 Err(ApiError::BadRequest(error))
517 }
518 Err(MailboxRunStartError::Permanent(error))
519 | Err(MailboxRunStartError::Retryable(error)) => {
520 dead_letter_claimed_entry(mailbox_store, &entry.entry_id, &claim_token, &error).await?;
521 Err(ApiError::Internal(error))
522 }
523 Err(MailboxRunStartError::Internal(error)) => {
524 dead_letter_claimed_entry(
525 mailbox_store,
526 &entry.entry_id,
527 &claim_token,
528 &error.to_string(),
529 )
530 .await?;
531 Err(error)
532 }
533 }
534}
535
536pub async fn cancel_pending_for_mailbox(
541 mailbox_store: &Arc<dyn MailboxStore>,
542 mailbox_id: &str,
543 exclude_entry_id: Option<&str>,
544) -> Result<Vec<MailboxEntry>, ApiError> {
545 mailbox_store
546 .cancel_pending_for_mailbox(mailbox_id, now_unix_millis(), exclude_entry_id)
547 .await
548 .map_err(mailbox_error)
549}
550
551pub struct ThreadInterruptResult {
552 pub cancelled_run_id: Option<String>,
553 pub generation: u64,
554 pub superseded_entries: Vec<MailboxEntry>,
555}
556
557pub async fn interrupt_thread(
558 os: &Arc<AgentOs>,
559 read_store: &dyn ThreadReader,
560 mailbox_store: &Arc<dyn MailboxStore>,
561 thread_id: &str,
562) -> Result<ThreadInterruptResult, ApiError> {
563 let interrupted = mailbox_store
564 .interrupt_mailbox(thread_id, now_unix_millis())
565 .await
566 .map_err(mailbox_error)?;
567 let cancelled_run_id = os.cancel_active_run_by_thread(thread_id).await;
568
569 if cancelled_run_id.is_none() && interrupted.superseded_entries.is_empty() {
570 let thread_exists = read_store
571 .load_thread(thread_id)
572 .await
573 .map_err(|err| ApiError::Internal(err.to_string()))?
574 .is_some();
575 if !thread_exists {
576 let mailbox_page = mailbox_store
577 .list_mailbox_entries(&MailboxQuery {
578 mailbox_id: Some(thread_id.to_string()),
579 limit: 1,
580 ..Default::default()
581 })
582 .await
583 .map_err(mailbox_error)?;
584 if mailbox_page.total == 0 {
585 return Err(ApiError::ThreadNotFound(thread_id.to_string()));
586 }
587 }
588 }
589
590 Ok(ThreadInterruptResult {
591 cancelled_run_id,
592 generation: interrupted.mailbox_state.current_generation,
593 superseded_entries: interrupted.superseded_entries,
594 })
595}
596
597#[derive(Debug)]
602pub enum BackgroundTaskLookup {
603 Run(RunRecord),
604 Mailbox(MailboxEntry),
605}
606
607pub async fn load_background_task(
612 read_store: &dyn ThreadReader,
613 mailbox_store: &dyn MailboxReader,
614 id: &str,
615) -> Result<Option<BackgroundTaskLookup>, ApiError> {
616 if let Some(record) = read_store
618 .load_run(id)
619 .await
620 .map_err(|err| ApiError::Internal(err.to_string()))?
621 {
622 return Ok(Some(BackgroundTaskLookup::Run(record)));
623 }
624
625 let Some(entry) = mailbox_store
627 .load_mailbox_entry(id)
628 .await
629 .map_err(mailbox_error)?
630 else {
631 return Ok(None);
632 };
633
634 if entry.status == MailboxEntryStatus::Accepted {
636 if let Some(run_id) = entry.payload.get("run_id").and_then(|v| v.as_str()) {
637 if let Some(record) = read_store
638 .load_run(run_id)
639 .await
640 .map_err(|err| ApiError::Internal(err.to_string()))?
641 {
642 return Ok(Some(BackgroundTaskLookup::Run(record)));
643 }
644 }
645 }
646
647 Ok(Some(BackgroundTaskLookup::Mailbox(entry)))
648}
649
650pub enum CancelBackgroundRunResult {
651 Active,
652 Pending,
653}
654
655pub async fn try_cancel_active_or_queued_run_by_id(
662 os: &Arc<AgentOs>,
663 mailbox_store: &Arc<dyn MailboxStore>,
664 id: &str,
665) -> Result<Option<CancelBackgroundRunResult>, ApiError> {
666 if os.cancel_active_run_by_id(id).await {
668 return Ok(Some(CancelBackgroundRunResult::Active));
669 }
670
671 let cancelled = mailbox_store
673 .cancel_mailbox_entry(id, now_unix_millis())
674 .await
675 .map_err(mailbox_error)?;
676 if cancelled
677 .as_ref()
678 .is_some_and(|entry| entry.status == MailboxEntryStatus::Cancelled)
679 {
680 return Ok(Some(CancelBackgroundRunResult::Pending));
681 }
682
683 let entry = match cancelled {
686 Some(e) => Some(e),
687 None => mailbox_store
688 .load_mailbox_entry(id)
689 .await
690 .map_err(mailbox_error)?,
691 };
692 if let Some(entry) = entry {
693 if entry.status == MailboxEntryStatus::Accepted {
694 if let Some(run_id) = entry
695 .payload
696 .get("run_id")
697 .and_then(|v| v.as_str())
698 .filter(|rid| *rid != id)
699 {
700 if os.cancel_active_run_by_id(run_id).await {
701 return Ok(Some(CancelBackgroundRunResult::Active));
702 }
703 }
704 }
705 }
706
707 Ok(None)
708}
709
710#[derive(Clone)]
715pub struct MailboxDispatcher {
716 mailbox_store: Arc<dyn MailboxStore>,
717 receiver: Arc<dyn MailboxReceiver>,
718 consumer_id: String,
719 poll_interval: Duration,
720 lease_duration_ms: u64,
721 retry_delay_ms: u64,
722 batch_size: usize,
723 max_attempts: u32,
724}
725
726impl MailboxDispatcher {
727 pub fn new(mailbox_store: Arc<dyn MailboxStore>, receiver: Arc<dyn MailboxReceiver>) -> Self {
728 Self {
729 mailbox_store,
730 receiver,
731 consumer_id: format!("mailbox-{}", new_id()),
732 poll_interval: Duration::from_millis(DEFAULT_MAILBOX_POLL_INTERVAL_MS),
733 lease_duration_ms: DEFAULT_MAILBOX_LEASE_MS,
734 retry_delay_ms: DEFAULT_MAILBOX_RETRY_MS,
735 batch_size: DEFAULT_MAILBOX_BATCH_SIZE,
736 max_attempts: DEFAULT_MAILBOX_MAX_ATTEMPTS,
737 }
738 }
739
740 #[must_use]
741 pub fn with_consumer_id(mut self, consumer_id: impl Into<String>) -> Self {
742 self.consumer_id = consumer_id.into();
743 self
744 }
745
746 async fn dispatch_claimed_entry(&self, entry: MailboxEntry) -> Result<(), ApiError> {
747 let claim_token = entry.claim_token.clone().ok_or_else(|| {
748 ApiError::Internal(format!(
749 "mailbox entry '{}' was claimed without claim_token",
750 entry.entry_id
751 ))
752 })?;
753
754 let entry_id = entry.entry_id.as_str();
755
756 if self
758 .mailbox_store
759 .load_mailbox_state(&entry.mailbox_id)
760 .await
761 .map_err(mailbox_error)?
762 .is_some_and(|state| state.current_generation != entry.generation)
763 {
764 let _ = self
765 .mailbox_store
766 .supersede_mailbox_entry(entry_id, now_unix_millis(), "superseded by interrupt")
767 .await;
768 tracing::debug!(entry_id, "mailbox entry superseded by generation mismatch");
769 return Ok(());
770 }
771
772 match self.receiver.receive(&entry).await {
773 ReceiveOutcome::Accepted => {
774 ack_claimed_entry(&self.mailbox_store, entry_id, &claim_token).await?;
775 tracing::debug!(entry_id, "mailbox entry accepted");
776 }
777 ReceiveOutcome::Retry(reason) => {
778 if entry.attempt_count >= self.max_attempts {
779 dead_letter_claimed_entry(
780 &self.mailbox_store,
781 entry_id,
782 &claim_token,
783 &format!("max attempts ({}) exceeded: {reason}", self.max_attempts),
784 )
785 .await?;
786 tracing::warn!(
787 entry_id,
788 attempts = entry.attempt_count,
789 "mailbox entry dead-lettered after max attempts"
790 );
791 } else {
792 nack_claimed_entry(
793 &self.mailbox_store,
794 entry_id,
795 &claim_token,
796 self.retry_delay_ms,
797 &reason,
798 )
799 .await?;
800 tracing::debug!(entry_id, attempts = entry.attempt_count, %reason, "mailbox entry nacked for retry");
801 }
802 }
803 ReceiveOutcome::Reject(reason) => {
804 dead_letter_claimed_entry(&self.mailbox_store, entry_id, &claim_token, &reason)
805 .await?;
806 tracing::warn!(entry_id, %reason, "mailbox entry rejected");
807 }
808 }
809 Ok(())
810 }
811
812 pub async fn dispatch_ready_once(&self) -> Result<usize, ApiError> {
813 let claimed = self
814 .mailbox_store
815 .claim_mailbox_entries(
816 None,
817 self.batch_size,
818 &self.consumer_id,
819 now_unix_millis(),
820 self.lease_duration_ms,
821 )
822 .await
823 .map_err(mailbox_error)?;
824
825 let count = claimed.len();
826 if count == 0 {
827 return Ok(0);
828 }
829
830 let mut futures: FuturesUnordered<_> = claimed
831 .into_iter()
832 .map(|entry| self.dispatch_claimed_entry(entry))
833 .collect();
834
835 let mut first_error: Option<ApiError> = None;
836 while let Some(result) = futures.next().await {
837 if let Err(err) = result {
838 if first_error.is_none() {
839 first_error = Some(err);
840 }
841 }
842 }
843
844 if let Some(err) = first_error {
845 return Err(err);
846 }
847 Ok(count)
848 }
849
850 pub async fn run_forever(self) {
851 let gc_interval = Duration::from_secs(DEFAULT_MAILBOX_GC_INTERVAL_SECS);
852 let mut last_gc = std::time::Instant::now();
853
854 loop {
855 if let Err(err) = self.dispatch_ready_once().await {
856 tracing::error!("mailbox dispatcher failed: {err}");
857 }
858
859 if last_gc.elapsed() >= gc_interval {
860 let cutoff = now_unix_millis().saturating_sub(DEFAULT_MAILBOX_GC_TTL_MS);
861 match self
862 .mailbox_store
863 .purge_terminal_mailbox_entries(cutoff)
864 .await
865 {
866 Ok(0) => {}
867 Ok(n) => tracing::debug!(purged = n, "mailbox GC purged terminal entries"),
868 Err(err) => tracing::warn!("mailbox GC failed: {err}"),
869 }
870 last_gc = std::time::Instant::now();
871 }
872
873 tokio::time::sleep(self.poll_interval).await;
874 }
875 }
876}
877
878#[cfg(test)]
883mod tests {
884 use super::*;
885 use std::sync::Arc;
886 use tirea_agentos::composition::{AgentDefinition, AgentDefinitionSpec, AgentOsBuilder};
887 use tirea_agentos::contracts::runtime::behavior::ReadOnlyContext;
888 use tirea_agentos::contracts::runtime::phase::{ActionSet, BeforeInferenceAction};
889 use tirea_agentos::contracts::{AgentBehavior, TerminationReason};
890 use tirea_contract::storage::{
891 MailboxEntryOrigin, MailboxReader, MailboxWriter, RunOrigin, RunReader, ThreadReader,
892 };
893 use tirea_contract::testing::MailboxEntryBuilder;
894 use tirea_store_adapters::MemoryStore;
895
896 struct TerminatePlugin;
897
898 #[async_trait]
899 impl AgentBehavior for TerminatePlugin {
900 fn id(&self) -> &str {
901 "mailbox_terminate"
902 }
903
904 async fn before_inference(
905 &self,
906 _ctx: &ReadOnlyContext<'_>,
907 ) -> ActionSet<BeforeInferenceAction> {
908 ActionSet::single(BeforeInferenceAction::Terminate(
909 TerminationReason::BehaviorRequested,
910 ))
911 }
912 }
913
914 fn make_os(store: Arc<MemoryStore>) -> Arc<AgentOs> {
915 Arc::new(
916 AgentOsBuilder::new()
917 .with_registered_behavior("mailbox_terminate", Arc::new(TerminatePlugin))
918 .with_agent_spec(AgentDefinitionSpec::local_with_id(
919 "test",
920 AgentDefinition {
921 id: "test".to_string(),
922 behavior_ids: vec!["mailbox_terminate".to_string()],
923 ..Default::default()
924 },
925 ))
926 .with_agent_state_store(store)
927 .build()
928 .expect("build AgentOs"),
929 )
930 }
931
932 #[test]
933 fn mailbox_entry_origin_follows_run_origin() {
934 let mut request = RunRequest {
935 agent_id: "test".to_string(),
936 thread_id: Some("mailbox-origin-thread".to_string()),
937 run_id: Some("mailbox-origin-run".to_string()),
938 parent_run_id: None,
939 parent_thread_id: None,
940 resource_id: None,
941 origin: RunOrigin::Internal,
942 state: None,
943 messages: vec![],
944 initial_decisions: vec![],
945 source_mailbox_entry_id: None,
946 };
947
948 let internal = mailbox_entry_from_request(&request, 0, &EnqueueOptions::default(), 1);
949 assert_eq!(internal.origin, MailboxEntryOrigin::Internal);
950
951 request.origin = RunOrigin::AgUi;
952 let external = mailbox_entry_from_request(&request, 0, &EnqueueOptions::default(), 1);
953 assert_eq!(external.origin, MailboxEntryOrigin::External);
954 }
955
956 #[tokio::test]
957 async fn dispatcher_accepts_enqueued_background_run() {
958 let store = Arc::new(MemoryStore::new());
959 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
960 let os = make_os(store.clone());
961
962 let (thread_id, run_id, _entry_id) = enqueue_background_run(
963 &os,
964 &mailbox_store,
965 "test",
966 RunRequest {
967 agent_id: "test".to_string(),
968 thread_id: Some("mailbox-thread".to_string()),
969 run_id: Some("mailbox-run".to_string()),
970 parent_run_id: None,
971 parent_thread_id: None,
972 resource_id: None,
973 origin: Default::default(),
974 state: None,
975 messages: vec![],
976 initial_decisions: vec![],
977 source_mailbox_entry_id: None,
978 },
979 EnqueueOptions::default(),
980 )
981 .await
982 .expect("enqueue background run");
983 assert_eq!(thread_id, "mailbox-thread");
984 assert_eq!(run_id, "mailbox-run");
985
986 let receiver: Arc<dyn MailboxReceiver> = Arc::new(AgentReceiver::new(os.clone()));
987 MailboxDispatcher::new(mailbox_store.clone(), receiver)
988 .with_consumer_id("test-dispatcher")
989 .dispatch_ready_once()
990 .await
991 .expect("dispatch mailbox run");
992
993 let page = mailbox_store
995 .list_mailbox_entries(&MailboxQuery {
996 mailbox_id: Some("mailbox-thread".to_string()),
997 status: Some(MailboxEntryStatus::Accepted),
998 ..Default::default()
999 })
1000 .await
1001 .expect("list mailbox entries");
1002 assert_eq!(page.items.len(), 1);
1003 assert_eq!(page.items[0].status, MailboxEntryStatus::Accepted);
1004
1005 let run_record = RunReader::load_run(store.as_ref(), &run_id)
1006 .await
1007 .expect("load run record")
1008 .expect("run record should be persisted");
1009 assert_eq!(run_record.thread_id, thread_id);
1010
1011 let thread = ThreadReader::load_thread(store.as_ref(), &thread_id)
1012 .await
1013 .expect("load thread")
1014 .expect("thread should exist");
1015 assert_eq!(thread.id, thread_id);
1016 }
1017
1018 #[tokio::test]
1019 async fn dispatcher_skips_claimed_entry_after_interrupt_supersedes_it() {
1020 let store = Arc::new(MemoryStore::new());
1021 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1022 let os = make_os(store.clone());
1023
1024 let (_thread_id, _run_id, _entry_id) = enqueue_background_run(
1025 &os,
1026 &mailbox_store,
1027 "test",
1028 RunRequest {
1029 agent_id: "test".to_string(),
1030 thread_id: Some("mailbox-supersede-thread".to_string()),
1031 run_id: Some("mailbox-supersede-run".to_string()),
1032 parent_run_id: None,
1033 parent_thread_id: None,
1034 resource_id: None,
1035 origin: Default::default(),
1036 state: None,
1037 messages: vec![],
1038 initial_decisions: vec![],
1039 source_mailbox_entry_id: None,
1040 },
1041 EnqueueOptions::default(),
1042 )
1043 .await
1044 .expect("enqueue background run");
1045
1046 let claimed = mailbox_store
1047 .claim_mailbox_entries(None, 1, "test-dispatcher", now_unix_millis(), 5_000)
1048 .await
1049 .expect("claim mailbox entry");
1050 assert_eq!(claimed.len(), 1);
1051
1052 mailbox_store
1053 .interrupt_mailbox("mailbox-supersede-thread", now_unix_millis())
1054 .await
1055 .expect("interrupt mailbox");
1056
1057 let receiver: Arc<dyn MailboxReceiver> = Arc::new(AgentReceiver::new(os.clone()));
1058 let dispatcher = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1059 .with_consumer_id("test-dispatcher");
1060 dispatcher
1061 .dispatch_claimed_entry(claimed.into_iter().next().expect("claimed entry"))
1062 .await
1063 .expect("dispatch after supersede");
1064
1065 let page = mailbox_store
1067 .list_mailbox_entries(&MailboxQuery {
1068 mailbox_id: Some("mailbox-supersede-thread".to_string()),
1069 ..Default::default()
1070 })
1071 .await
1072 .expect("list mailbox entries");
1073 assert_eq!(page.items.len(), 1);
1074 assert_eq!(page.items[0].status, MailboxEntryStatus::Superseded);
1075 }
1076
1077 #[tokio::test]
1082 async fn agent_receiver_rejects_invalid_payload() {
1083 let store = Arc::new(MemoryStore::new());
1084 let os = make_os(store.clone());
1085 let receiver = AgentReceiver::new(os);
1086
1087 let entry = MailboxEntryBuilder::queued("entry-bad-json", "mailbox-bad")
1088 .with_payload(serde_json::json!("not a valid RunRequest"))
1089 .claimed("token", "test", u64::MAX)
1090 .build();
1091
1092 match receiver.receive(&entry).await {
1093 ReceiveOutcome::Reject(reason) => {
1094 assert!(reason.contains("invalid payload"), "got: {reason}");
1095 }
1096 other => panic!("expected Reject, got {other:?}"),
1097 }
1098 }
1099
1100 #[tokio::test]
1101 async fn agent_receiver_rejects_unknown_agent() {
1102 let store = Arc::new(MemoryStore::new());
1103 let os = make_os(store.clone());
1104 let receiver = AgentReceiver::new(os);
1105
1106 let request = RunRequest {
1107 agent_id: "nonexistent-agent".to_string(),
1108 thread_id: Some("thread-unknown".to_string()),
1109 run_id: Some("run-unknown".to_string()),
1110 parent_run_id: None,
1111 parent_thread_id: None,
1112 resource_id: None,
1113 origin: Default::default(),
1114 state: None,
1115 messages: vec![],
1116 initial_decisions: vec![],
1117 source_mailbox_entry_id: None,
1118 };
1119
1120 let entry = MailboxEntryBuilder::queued("entry-unknown-agent", "mailbox-unknown")
1121 .with_payload(serde_json::to_value(&request).unwrap())
1122 .claimed("token", "test", u64::MAX)
1123 .build();
1124
1125 match receiver.receive(&entry).await {
1126 ReceiveOutcome::Reject(reason) => {
1127 assert!(
1128 reason.contains("nonexistent-agent")
1129 || reason.contains("not found")
1130 || reason.contains("resolve"),
1131 "expected resolve error, got: {reason}"
1132 );
1133 }
1134 other => panic!("expected Reject for unknown agent, got {other:?}"),
1135 }
1136 }
1137
1138 #[tokio::test]
1139 async fn agent_receiver_accepts_valid_request() {
1140 let store = Arc::new(MemoryStore::new());
1141 let os = make_os(store.clone());
1142 let receiver = AgentReceiver::new(os);
1143
1144 let request = RunRequest {
1145 agent_id: "test".to_string(),
1146 thread_id: Some("thread-valid".to_string()),
1147 run_id: Some("run-valid".to_string()),
1148 parent_run_id: None,
1149 parent_thread_id: None,
1150 resource_id: None,
1151 origin: Default::default(),
1152 state: None,
1153 messages: vec![],
1154 initial_decisions: vec![],
1155 source_mailbox_entry_id: None,
1156 };
1157
1158 let entry = MailboxEntryBuilder::queued("entry-valid", "thread-valid")
1159 .with_payload(serde_json::to_value(&request).unwrap())
1160 .claimed("token", "test", u64::MAX)
1161 .build();
1162
1163 match receiver.receive(&entry).await {
1164 ReceiveOutcome::Accepted => {} other => panic!("expected Accepted, got {other:?}"),
1166 }
1167
1168 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1170
1171 let run = RunReader::load_run(store.as_ref(), "run-valid")
1173 .await
1174 .expect("load run")
1175 .expect("run should be persisted");
1176 assert_eq!(run.thread_id, "thread-valid");
1177 }
1178
1179 struct MockReceiver {
1184 outcome: std::sync::Mutex<Option<ReceiveOutcome>>,
1185 }
1186
1187 impl MockReceiver {
1188 fn always(outcome: ReceiveOutcome) -> Self {
1189 Self {
1190 outcome: std::sync::Mutex::new(Some(outcome)),
1191 }
1192 }
1193 }
1194
1195 #[async_trait]
1196 impl MailboxReceiver for MockReceiver {
1197 async fn receive(&self, _entry: &MailboxEntry) -> ReceiveOutcome {
1198 self.outcome
1199 .lock()
1200 .unwrap()
1201 .clone()
1202 .unwrap_or(ReceiveOutcome::Accepted)
1203 }
1204 }
1205
1206 #[tokio::test]
1207 async fn dispatcher_reject_outcome_dead_letters_entry() {
1208 let store = Arc::new(MemoryStore::new());
1209 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1210
1211 store.ensure_mailbox_state("mbx-reject", 1).await.unwrap();
1212 let entry = MailboxEntryBuilder::queued("entry-reject", "mbx-reject")
1213 .with_payload(serde_json::json!({"test": true}))
1214 .build();
1215 store.enqueue_mailbox_entry(&entry).await.unwrap();
1216
1217 let receiver: Arc<dyn MailboxReceiver> = Arc::new(MockReceiver::always(
1218 ReceiveOutcome::Reject("bad message".to_string()),
1219 ));
1220 MailboxDispatcher::new(mailbox_store.clone(), receiver)
1221 .with_consumer_id("test-dispatcher")
1222 .dispatch_ready_once()
1223 .await
1224 .expect("dispatch should succeed");
1225
1226 let loaded = store
1227 .load_mailbox_entry("entry-reject")
1228 .await
1229 .unwrap()
1230 .unwrap();
1231 assert_eq!(loaded.status, MailboxEntryStatus::DeadLetter);
1232 assert_eq!(loaded.last_error.as_deref(), Some("bad message"));
1233 }
1234
1235 #[tokio::test]
1236 async fn dispatcher_retry_outcome_nacks_entry_for_retry() {
1237 let store = Arc::new(MemoryStore::new());
1238 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1239
1240 store.ensure_mailbox_state("mbx-retry", 1).await.unwrap();
1241 let entry = MailboxEntryBuilder::queued("entry-retry", "mbx-retry")
1242 .with_payload(serde_json::json!({"test": true}))
1243 .build();
1244 store.enqueue_mailbox_entry(&entry).await.unwrap();
1245
1246 let receiver: Arc<dyn MailboxReceiver> = Arc::new(MockReceiver::always(
1247 ReceiveOutcome::Retry("try later".to_string()),
1248 ));
1249 MailboxDispatcher::new(mailbox_store.clone(), receiver)
1250 .with_consumer_id("test-dispatcher")
1251 .dispatch_ready_once()
1252 .await
1253 .expect("dispatch should succeed");
1254
1255 let loaded = store
1256 .load_mailbox_entry("entry-retry")
1257 .await
1258 .unwrap()
1259 .unwrap();
1260 assert_eq!(loaded.status, MailboxEntryStatus::Queued);
1261 assert_eq!(loaded.last_error.as_deref(), Some("try later"));
1262 assert_eq!(loaded.attempt_count, 1);
1263 }
1264
1265 #[tokio::test]
1266 async fn dispatcher_retry_at_max_attempts_dead_letters() {
1267 let store = Arc::new(MemoryStore::new());
1268 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1269
1270 store.ensure_mailbox_state("mbx-max", 1).await.unwrap();
1271 let entry = MailboxEntryBuilder::queued("entry-max", "mbx-max")
1272 .with_payload(serde_json::json!({"test": true}))
1273 .with_attempt_count(10)
1274 .build(); store.enqueue_mailbox_entry(&entry).await.unwrap();
1276
1277 let receiver: Arc<dyn MailboxReceiver> = Arc::new(MockReceiver::always(
1278 ReceiveOutcome::Retry("still failing".to_string()),
1279 ));
1280 let dispatcher = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1281 .with_consumer_id("test-dispatcher");
1282 dispatcher
1283 .dispatch_ready_once()
1284 .await
1285 .expect("dispatch should succeed");
1286
1287 let loaded = store
1288 .load_mailbox_entry("entry-max")
1289 .await
1290 .unwrap()
1291 .unwrap();
1292 assert_eq!(loaded.status, MailboxEntryStatus::DeadLetter);
1293 assert!(
1294 loaded
1295 .last_error
1296 .as_deref()
1297 .unwrap()
1298 .contains("max attempts"),
1299 "error should mention max attempts: {:?}",
1300 loaded.last_error
1301 );
1302 }
1303
1304 #[tokio::test]
1305 async fn dispatcher_generation_mismatch_supersedes_entry() {
1306 let store = Arc::new(MemoryStore::new());
1307 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1308
1309 store.ensure_mailbox_state("mbx-genm", 1).await.unwrap();
1310 let entry = MailboxEntryBuilder::queued("entry-genm", "mbx-genm")
1311 .with_payload(serde_json::json!({"test": true}))
1312 .build();
1313 store.enqueue_mailbox_entry(&entry).await.unwrap();
1314
1315 let claimed = store
1317 .claim_mailbox_entries(None, 1, "test-dispatcher", now_unix_millis(), 30_000)
1318 .await
1319 .unwrap();
1320 assert_eq!(claimed.len(), 1);
1321 let claimed_entry = claimed.into_iter().next().unwrap();
1322
1323 store
1325 .interrupt_mailbox("mbx-genm", now_unix_millis())
1326 .await
1327 .unwrap();
1328
1329 let receiver: Arc<dyn MailboxReceiver> =
1331 Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1332 let dispatcher = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1333 .with_consumer_id("test-dispatcher");
1334 dispatcher
1335 .dispatch_claimed_entry(claimed_entry)
1336 .await
1337 .expect("dispatch should succeed");
1338
1339 let loaded = store
1340 .load_mailbox_entry("entry-genm")
1341 .await
1342 .unwrap()
1343 .unwrap();
1344 assert_eq!(loaded.status, MailboxEntryStatus::Superseded);
1345 }
1346
1347 #[tokio::test]
1348 async fn dispatcher_accept_outcome_acks_entry() {
1349 let store = Arc::new(MemoryStore::new());
1350 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1351
1352 store.ensure_mailbox_state("mbx-ack", 1).await.unwrap();
1353 let entry = MailboxEntryBuilder::queued("entry-ack", "mbx-ack")
1354 .with_payload(serde_json::json!({"test": true}))
1355 .build();
1356 store.enqueue_mailbox_entry(&entry).await.unwrap();
1357
1358 let receiver: Arc<dyn MailboxReceiver> =
1359 Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1360 MailboxDispatcher::new(mailbox_store.clone(), receiver)
1361 .with_consumer_id("test-dispatcher")
1362 .dispatch_ready_once()
1363 .await
1364 .expect("dispatch should succeed");
1365
1366 let loaded = store
1367 .load_mailbox_entry("entry-ack")
1368 .await
1369 .unwrap()
1370 .unwrap();
1371 assert_eq!(loaded.status, MailboxEntryStatus::Accepted);
1372 }
1373
1374 #[tokio::test]
1375 async fn dispatcher_empty_mailbox_returns_zero() {
1376 let store = Arc::new(MemoryStore::new());
1377 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1378
1379 let receiver: Arc<dyn MailboxReceiver> =
1380 Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1381 let count = MailboxDispatcher::new(mailbox_store, receiver)
1382 .with_consumer_id("test-dispatcher")
1383 .dispatch_ready_once()
1384 .await
1385 .expect("dispatch should succeed");
1386 assert_eq!(count, 0);
1387 }
1388
1389 #[tokio::test]
1390 async fn dispatcher_processes_multiple_entries_in_batch() {
1391 let store = Arc::new(MemoryStore::new());
1392 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1393
1394 for i in 0..5 {
1397 let mid = format!("mbx-batch-{i}");
1398 store.ensure_mailbox_state(&mid, 1).await.unwrap();
1399 let entry = MailboxEntryBuilder::queued(format!("entry-batch-{i}"), &mid)
1400 .with_payload(serde_json::json!({"test": true}))
1401 .build();
1402 store.enqueue_mailbox_entry(&entry).await.unwrap();
1403 }
1404
1405 let receiver: Arc<dyn MailboxReceiver> =
1406 Arc::new(MockReceiver::always(ReceiveOutcome::Accepted));
1407 let count = MailboxDispatcher::new(mailbox_store.clone(), receiver)
1408 .with_consumer_id("test-dispatcher")
1409 .dispatch_ready_once()
1410 .await
1411 .expect("dispatch should succeed");
1412 assert_eq!(count, 5);
1413
1414 let page = mailbox_store
1416 .list_mailbox_entries(&MailboxQuery {
1417 status: Some(MailboxEntryStatus::Accepted),
1418 limit: 100,
1419 ..Default::default()
1420 })
1421 .await
1422 .unwrap();
1423 assert_eq!(page.total, 5);
1424 }
1425
1426 #[tokio::test]
1431 async fn enqueue_background_run_returns_three_tuple() {
1432 let store = Arc::new(MemoryStore::new());
1433 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1434 let os = make_os(store.clone());
1435
1436 let (thread_id, run_id, entry_id) = enqueue_background_run(
1437 &os,
1438 &mailbox_store,
1439 "test",
1440 RunRequest {
1441 agent_id: "test".to_string(),
1442 thread_id: Some("thread-3tuple".to_string()),
1443 run_id: Some("run-3tuple".to_string()),
1444 parent_run_id: None,
1445 parent_thread_id: None,
1446 resource_id: None,
1447 origin: Default::default(),
1448 state: None,
1449 messages: vec![],
1450 initial_decisions: vec![],
1451 source_mailbox_entry_id: None,
1452 },
1453 EnqueueOptions::default(),
1454 )
1455 .await
1456 .expect("enqueue");
1457
1458 assert_eq!(thread_id, "thread-3tuple");
1459 assert_eq!(run_id, "run-3tuple");
1460 assert!(!entry_id.is_empty());
1461
1462 let entry = mailbox_store
1464 .load_mailbox_entry(&entry_id)
1465 .await
1466 .unwrap()
1467 .expect("entry should exist");
1468 assert_eq!(entry.mailbox_id, "thread-3tuple");
1469 assert_eq!(entry.status, MailboxEntryStatus::Queued);
1470
1471 assert_eq!(
1473 entry.payload.get("run_id").and_then(|v| v.as_str()),
1474 Some("run-3tuple")
1475 );
1476 }
1477
1478 #[tokio::test]
1479 async fn enqueue_background_run_rejects_unknown_agent() {
1480 let store = Arc::new(MemoryStore::new());
1481 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1482 let os = make_os(store.clone());
1483
1484 let result = enqueue_background_run(
1485 &os,
1486 &mailbox_store,
1487 "nonexistent",
1488 RunRequest {
1489 agent_id: "nonexistent".to_string(),
1490 thread_id: Some("thread-no-agent".to_string()),
1491 run_id: Some("run-no-agent".to_string()),
1492 parent_run_id: None,
1493 parent_thread_id: None,
1494 resource_id: None,
1495 origin: Default::default(),
1496 state: None,
1497 messages: vec![],
1498 initial_decisions: vec![],
1499 source_mailbox_entry_id: None,
1500 },
1501 EnqueueOptions::default(),
1502 )
1503 .await;
1504
1505 assert!(result.is_err());
1506 }
1507
1508 #[tokio::test]
1509 async fn load_background_task_finds_by_entry_id() {
1510 let store = Arc::new(MemoryStore::new());
1511 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1512 let os = make_os(store.clone());
1513
1514 let (_thread_id, _run_id, entry_id) = enqueue_background_run(
1515 &os,
1516 &mailbox_store,
1517 "test",
1518 RunRequest {
1519 agent_id: "test".to_string(),
1520 thread_id: Some("thread-lookup".to_string()),
1521 run_id: Some("run-lookup".to_string()),
1522 parent_run_id: None,
1523 parent_thread_id: None,
1524 resource_id: None,
1525 origin: Default::default(),
1526 state: None,
1527 messages: vec![],
1528 initial_decisions: vec![],
1529 source_mailbox_entry_id: None,
1530 },
1531 EnqueueOptions::default(),
1532 )
1533 .await
1534 .expect("enqueue");
1535
1536 let task = load_background_task(store.as_ref(), store.as_ref(), &entry_id)
1538 .await
1539 .expect("load background task");
1540 assert!(matches!(task, Some(BackgroundTaskLookup::Mailbox(_))));
1541 }
1542
1543 #[tokio::test]
1544 async fn load_background_task_cross_references_accepted_entry() {
1545 let store = Arc::new(MemoryStore::new());
1546 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1547 let os = make_os(store.clone());
1548
1549 let (thread_id, run_id, entry_id) = enqueue_background_run(
1550 &os,
1551 &mailbox_store,
1552 "test",
1553 RunRequest {
1554 agent_id: "test".to_string(),
1555 thread_id: Some("thread-xref".to_string()),
1556 run_id: Some("run-xref".to_string()),
1557 parent_run_id: None,
1558 parent_thread_id: None,
1559 resource_id: None,
1560 origin: Default::default(),
1561 state: None,
1562 messages: vec![],
1563 initial_decisions: vec![],
1564 source_mailbox_entry_id: None,
1565 },
1566 EnqueueOptions::default(),
1567 )
1568 .await
1569 .expect("enqueue");
1570
1571 let receiver: Arc<dyn MailboxReceiver> = Arc::new(AgentReceiver::new(os.clone()));
1573 MailboxDispatcher::new(mailbox_store.clone(), receiver)
1574 .with_consumer_id("test-xref")
1575 .dispatch_ready_once()
1576 .await
1577 .expect("dispatch");
1578
1579 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1581
1582 let task = load_background_task(store.as_ref(), store.as_ref(), &entry_id)
1584 .await
1585 .expect("load background task");
1586 match task {
1587 Some(BackgroundTaskLookup::Run(record)) => {
1588 assert_eq!(record.run_id, run_id);
1589 assert_eq!(record.thread_id, thread_id);
1590 }
1591 other => panic!("expected Run variant, got {other:?}"),
1592 }
1593 }
1594
1595 #[tokio::test]
1596 async fn load_background_task_returns_none_for_unknown_id() {
1597 let store = Arc::new(MemoryStore::new());
1598 let task = load_background_task(store.as_ref(), store.as_ref(), "nonexistent")
1599 .await
1600 .expect("load background task");
1601 assert!(task.is_none());
1602 }
1603
1604 #[tokio::test]
1605 async fn try_cancel_queued_entry_by_entry_id() {
1606 let store = Arc::new(MemoryStore::new());
1607 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1608 let os = make_os(store.clone());
1609
1610 let (_thread_id, _run_id, entry_id) = enqueue_background_run(
1611 &os,
1612 &mailbox_store,
1613 "test",
1614 RunRequest {
1615 agent_id: "test".to_string(),
1616 thread_id: Some("thread-cancel-q".to_string()),
1617 run_id: Some("run-cancel-q".to_string()),
1618 parent_run_id: None,
1619 parent_thread_id: None,
1620 resource_id: None,
1621 origin: Default::default(),
1622 state: None,
1623 messages: vec![],
1624 initial_decisions: vec![],
1625 source_mailbox_entry_id: None,
1626 },
1627 EnqueueOptions::default(),
1628 )
1629 .await
1630 .expect("enqueue");
1631
1632 let result = try_cancel_active_or_queued_run_by_id(&os, &mailbox_store, &entry_id).await;
1633 assert!(matches!(
1634 result,
1635 Ok(Some(CancelBackgroundRunResult::Pending))
1636 ));
1637
1638 let entry = store.load_mailbox_entry(&entry_id).await.unwrap().unwrap();
1639 assert_eq!(entry.status, MailboxEntryStatus::Cancelled);
1640 }
1641
1642 #[tokio::test]
1643 async fn try_cancel_returns_none_for_unknown_id() {
1644 let store = Arc::new(MemoryStore::new());
1645 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1646 let os = make_os(store.clone());
1647
1648 let result =
1649 try_cancel_active_or_queued_run_by_id(&os, &mailbox_store, "nonexistent").await;
1650 assert!(matches!(result, Ok(None)));
1651 }
1652
1653 #[tokio::test]
1654 async fn cancel_pending_for_mailbox_excludes_specified_entry() {
1655 let store = Arc::new(MemoryStore::new());
1656 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1657 let os = make_os(store.clone());
1658
1659 let (_t1, _r1, entry1) = enqueue_background_run(
1660 &os,
1661 &mailbox_store,
1662 "test",
1663 RunRequest {
1664 agent_id: "test".to_string(),
1665 thread_id: Some("thread-cancel-pen".to_string()),
1666 run_id: Some("run-cancel-pen-1".to_string()),
1667 parent_run_id: None,
1668 parent_thread_id: None,
1669 resource_id: None,
1670 origin: Default::default(),
1671 state: None,
1672 messages: vec![],
1673 initial_decisions: vec![],
1674 source_mailbox_entry_id: None,
1675 },
1676 EnqueueOptions::default(),
1677 )
1678 .await
1679 .expect("enqueue 1");
1680
1681 let (_t2, _r2, entry2) = enqueue_background_run(
1682 &os,
1683 &mailbox_store,
1684 "test",
1685 RunRequest {
1686 agent_id: "test".to_string(),
1687 thread_id: Some("thread-cancel-pen".to_string()),
1688 run_id: Some("run-cancel-pen-2".to_string()),
1689 parent_run_id: None,
1690 parent_thread_id: None,
1691 resource_id: None,
1692 origin: Default::default(),
1693 state: None,
1694 messages: vec![],
1695 initial_decisions: vec![],
1696 source_mailbox_entry_id: None,
1697 },
1698 EnqueueOptions::default(),
1699 )
1700 .await
1701 .expect("enqueue 2");
1702
1703 let cancelled =
1705 cancel_pending_for_mailbox(&mailbox_store, "thread-cancel-pen", Some(&entry1))
1706 .await
1707 .expect("cancel pending");
1708 assert_eq!(cancelled.len(), 1);
1709 assert_eq!(cancelled[0].entry_id, entry2);
1710
1711 let e1 = store.load_mailbox_entry(&entry1).await.unwrap().unwrap();
1713 assert_eq!(e1.status, MailboxEntryStatus::Queued);
1714
1715 let e2 = store.load_mailbox_entry(&entry2).await.unwrap().unwrap();
1717 assert_eq!(e2.status, MailboxEntryStatus::Cancelled);
1718 }
1719
1720 #[tokio::test]
1721 async fn enqueue_generates_ids_when_not_provided() {
1722 let store = Arc::new(MemoryStore::new());
1723 let mailbox_store: Arc<dyn MailboxStore> = store.clone();
1724 let os = make_os(store.clone());
1725
1726 let (thread_id, run_id, entry_id) = enqueue_background_run(
1727 &os,
1728 &mailbox_store,
1729 "test",
1730 RunRequest {
1731 agent_id: "test".to_string(),
1732 thread_id: None,
1733 run_id: None,
1734 parent_run_id: None,
1735 parent_thread_id: None,
1736 resource_id: None,
1737 origin: Default::default(),
1738 state: None,
1739 messages: vec![],
1740 initial_decisions: vec![],
1741 source_mailbox_entry_id: None,
1742 },
1743 EnqueueOptions::default(),
1744 )
1745 .await
1746 .expect("enqueue");
1747
1748 assert!(!thread_id.is_empty());
1749 assert!(!run_id.is_empty());
1750 assert!(!entry_id.is_empty());
1751 assert_ne!(thread_id, run_id);
1753 assert_ne!(run_id, entry_id);
1754 }
1755}