tirea_agentos/runtime/background_tasks/
wrapper.rs

1//! `BackgroundCapable<T>` — decorator that adds `run_in_background` support to any tool.
2//!
3//! Tools that support background execution must implement [`BackgroundExecutable`]
4//! in addition to [`Tool`]. The trait provides a context-free execution path
5//! that can safely cross `tokio::spawn` boundaries.
6//!
7//! The wrapper handles the full task lifecycle:
8//! - Persistence: create/resume tasks in [`TaskStore`]
9//! - Background spawning via [`BackgroundTaskManager`]
10//! - Resume: pass existing `task_id` to re-execute stopped tasks
11//! - Status queries: return current status for running/terminal tasks
12
13use super::manager::BackgroundTaskManager;
14use super::types::*;
15use super::{new_task_id, NewTaskSpec, TaskStore, TaskStoreError};
16use crate::contracts::runtime::tool_call::{
17    Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult,
18};
19use crate::runtime::loop_runner::RunCancellationToken;
20use async_trait::async_trait;
21use serde_json::{json, Value};
22use std::sync::Arc;
23
/// Name of the boolean argument that selects background execution.
///
/// The wrapper adds it to the wrapped tool's parameter schema and removes it
/// from the arguments before they are handed to the inner tool.
const RUN_IN_BACKGROUND_PARAM: &str = "run_in_background";
25
/// Typed runtime context passed into background executions after the spawn boundary.
///
/// The wrapper builds one value per background launch and moves it into the
/// spawned future, where it is handed to
/// [`BackgroundExecutable::execute_background`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BackgroundExecutionContext {
    /// Identifier of the thread that owns the task.
    pub owner_thread_id: String,
    /// Identifier of the parent task, when the launch has one.
    pub parent_task_id: Option<String>,
    /// `true` when this launch re-executes an existing task rather than a new one.
    pub is_resume: bool,
}
33
34/// Context-free background execution trait.
35///
36/// Tools implement this to opt into background execution support via
37/// [`BackgroundCapable`]. The wrapper handles persistence, resume, and
38/// background spawning — tools only implement execution logic.
39#[async_trait]
40pub trait BackgroundExecutable: Tool {
41    /// Background-only prepared input computed before crossing the spawn boundary.
42    type PreparedBackground: Send + 'static;
43
44    /// Task type label for persistence (e.g. `"agent_run"`, `"shell"`).
45    fn task_type(&self) -> &str;
46
47    /// Whether stopped tasks can be resumed with the same task_id.
48    fn supports_resume(&self) -> bool {
49        false
50    }
51
52    /// Metadata to persist alongside the task (e.g. agent_id, thread_id).
53    fn task_metadata(&self, _args: &Value) -> Value {
54        json!({})
55    }
56
57    /// Extract an existing task_id from args (for resume/status queries).
58    /// Return `None` for new tasks.
59    fn task_id_from_args(&self, _args: &Value) -> Option<String> {
60        None
61    }
62
63    /// Inject a generated task_id into the args before passing to execute.
64    /// Default implementation sets `args["task_id"]`.
65    fn set_task_id_in_args(&self, args: &mut Value, task_id: &str) {
66        if let Some(obj) = args.as_object_mut() {
67            obj.insert("task_id".to_string(), json!(task_id));
68        }
69    }
70
71    /// Human-readable task description for the background task listing.
72    fn task_description(&self, args: &Value) -> String {
73        let tool_name = self.descriptor().name.clone();
74        format!(
75            "{} (background)",
76            args.get("description")
77                .or_else(|| args.get("command"))
78                .and_then(Value::as_str)
79                .unwrap_or(&tool_name)
80        )
81    }
82
83    /// Validate and normalize arguments for background execution before any
84    /// task state is persisted or spawned.
85    ///
86    /// Tools can use this to perform the same semantic checks they enforce in
87    /// foreground mode and to inject hidden, precomputed execution inputs that
88    /// can safely cross the spawn boundary.
89    #[allow(clippy::result_large_err)]
90    fn prepare_background(
91        &self,
92        _args: &mut Value,
93        _ctx: &ToolCallContext<'_>,
94        _is_resume: bool,
95    ) -> Result<Self::PreparedBackground, ToolResult>;
96
97    /// Execute the tool logic in foreground mode.
98    ///
99    /// Tools can override this when foreground execution needs typed control
100    /// data (for example, resume state) that should not be smuggled through
101    /// hidden JSON fields.
102    async fn execute_foreground(
103        &self,
104        args: Value,
105        ctx: &ToolCallContext<'_>,
106        _is_resume: bool,
107    ) -> Result<ToolResult, ToolError> {
108        self.execute(args, ctx).await
109    }
110
111    /// Derive the durable terminal task status for a foreground execution.
112    ///
113    /// The default implementation treats tool-level errors as failed tasks and
114    /// every other result as completed.
115    fn foreground_task_status(&self, result: &ToolResult) -> (TaskStatus, Option<String>) {
116        if result.is_error() {
117            (TaskStatus::Failed, result.message.clone())
118        } else {
119            (TaskStatus::Completed, None)
120        }
121    }
122
123    /// Execute the tool logic in background mode.
124    ///
125    /// Receives a `task_id` (new or resumed) and a cancellation token but NO
126    /// `ToolCallContext`. Tools that need state access should capture what they
127    /// need from args before the spawn boundary.
128    async fn execute_background(
129        &self,
130        task_id: &str,
131        args: Value,
132        prepared: Self::PreparedBackground,
133        execution: BackgroundExecutionContext,
134        cancel_token: RunCancellationToken,
135    ) -> TaskResult;
136}
137
/// Wraps a tool that implements [`BackgroundExecutable`] to provide full task
/// lifecycle management:
///
/// - **New tasks**: generates `task_id`, persists, executes foreground or spawns background.
/// - **Resume**: if `task_id` is in args and the task is `Stopped`, re-executes.
/// - **Status query**: if `task_id` is in args and the task is running/terminal, returns status.
/// - **Orphan detection**: if a persisted `Running` task has no live handle, marks it `Stopped`.
pub struct BackgroundCapable<T: BackgroundExecutable> {
    /// The wrapped tool; held in an `Arc` so a clone can be moved into the spawned task.
    inner: Arc<T>,
    /// Manager used to spawn background tasks and to look up live ones.
    manager: Arc<BackgroundTaskManager>,
    /// Optional durable store; when `None`, all persistence steps are skipped.
    task_store: Option<Arc<TaskStore>>,
}
150
151impl<T: BackgroundExecutable> BackgroundCapable<T> {
152    pub fn new(inner: T, manager: Arc<BackgroundTaskManager>) -> Self {
153        Self {
154            inner: Arc::new(inner),
155            manager,
156            task_store: None,
157        }
158    }
159
160    pub fn from_arc(inner: Arc<T>, manager: Arc<BackgroundTaskManager>) -> Self {
161        Self {
162            inner,
163            manager,
164            task_store: None,
165        }
166    }
167
168    pub fn with_task_store(mut self, task_store: Option<Arc<TaskStore>>) -> Self {
169        self.task_store = task_store;
170        self
171    }
172}
173
/// Internal: result of looking up a task by id.
struct TaskLookup {
    /// Snapshot of the task as reported by whichever source answered the lookup.
    summary: TaskSummary,
    /// Whether this summary came from a live in-memory handle (vs durable store only).
    is_live: bool,
}
180
/// Bundled execution parameters for `execute_task`.
struct ExecuteParams<'a> {
    /// Id of the task being started or resumed.
    task_id: &'a str,
    /// Id of the thread that owns the task.
    owner_thread_id: &'a str,
    /// `true` to spawn the work in the background, `false` to run it inline.
    background: bool,
    /// `true` when an existing task is being re-executed.
    is_resume: bool,
    /// Id of the parent task, if any.
    parent_task_id: Option<&'a str>,
}
189
190impl<T: BackgroundExecutable + 'static> BackgroundCapable<T> {
191    /// Look up a task by id, merging live (in-memory) and durable (TaskStore) sources.
192    /// Live takes precedence when both exist.
193    async fn lookup_task(
194        &self,
195        owner_thread_id: &str,
196        task_id: &str,
197    ) -> Result<Option<TaskLookup>, TaskStoreError> {
198        let live = self.manager.get(owner_thread_id, task_id).await;
199        if let Some(summary) = live {
200            return Ok(Some(TaskLookup {
201                summary,
202                is_live: true,
203            }));
204        }
205
206        if let Some(store) = &self.task_store {
207            if let Some(task) = store.load_task_for_owner(owner_thread_id, task_id).await? {
208                return Ok(Some(TaskLookup {
209                    summary: task.summary(),
210                    is_live: false,
211                }));
212            }
213        }
214
215        Ok(None)
216    }
217
218    /// Persist task creation (new) or attempt increment (resume).
219    async fn persist_start(
220        &self,
221        task_id: &str,
222        owner_thread_id: &str,
223        is_resume: bool,
224        description: &str,
225        parent_task_id: Option<&str>,
226        metadata: &Value,
227    ) -> Result<(), ToolError> {
228        let Some(store) = &self.task_store else {
229            return Ok(());
230        };
231
232        if is_resume {
233            // Verify ownership and increment attempt.
234            match store.load_task(task_id).await {
235                Ok(Some(task)) => {
236                    if task.owner_thread_id != owner_thread_id {
237                        return Err(ToolError::ExecutionFailed(format!(
238                            "task '{}' belongs to a different owner",
239                            task_id
240                        )));
241                    }
242                    store.start_task_attempt(task_id).await.map_err(|e| {
243                        ToolError::ExecutionFailed(format!("task persist failed: {e}"))
244                    })?;
245                }
246                Ok(None) => {
247                    // Resuming a non-existent task — create it.
248                    self.create_task(
249                        store,
250                        task_id,
251                        owner_thread_id,
252                        description,
253                        parent_task_id,
254                        metadata,
255                    )
256                    .await?;
257                }
258                Err(e) => {
259                    return Err(ToolError::ExecutionFailed(format!(
260                        "task persist failed: {e}"
261                    )));
262                }
263            }
264        } else {
265            self.create_task(
266                store,
267                task_id,
268                owner_thread_id,
269                description,
270                parent_task_id,
271                metadata,
272            )
273            .await?;
274        }
275
276        Ok(())
277    }
278
279    async fn create_task(
280        &self,
281        store: &TaskStore,
282        task_id: &str,
283        owner_thread_id: &str,
284        description: &str,
285        parent_task_id: Option<&str>,
286        metadata: &Value,
287    ) -> Result<(), ToolError> {
288        store
289            .create_task(NewTaskSpec {
290                task_id: task_id.to_string(),
291                owner_thread_id: owner_thread_id.to_string(),
292                task_type: self.inner.task_type().to_string(),
293                description: description.to_string(),
294                parent_task_id: parent_task_id.map(str::to_string),
295                supports_resume: self.inner.supports_resume(),
296                metadata: metadata.clone(),
297            })
298            .await
299            .map_err(|e| ToolError::ExecutionFailed(format!("task persist failed: {e}")))?;
300        Ok(())
301    }
302
303    /// Persist terminal status after foreground completion.
304    async fn persist_result(
305        &self,
306        task_id: &str,
307        status: TaskStatus,
308        error: Option<String>,
309    ) -> Result<(), ToolError> {
310        let Some(store) = &self.task_store else {
311            return Ok(());
312        };
313        store
314            .persist_foreground_result(task_id, status, error, None)
315            .await
316            .map_err(|e| ToolError::ExecutionFailed(format!("task persist failed: {e}")))?;
317        Ok(())
318    }
319
320    /// Mark an orphaned running task as stopped.
321    async fn mark_orphan_stopped(&self, task_id: &str) -> Result<(), ToolError> {
322        let Some(store) = &self.task_store else {
323            return Ok(());
324        };
325        store
326            .persist_foreground_result(
327                task_id,
328                TaskStatus::Stopped,
329                Some("No live executor found in current process; marked stopped".to_string()),
330                None,
331            )
332            .await
333            .map_err(|e| {
334                ToolError::ExecutionFailed(format!("failed to mark orphan stopped: {e}"))
335            })?;
336        Ok(())
337    }
338
339    fn status_result(&self, task_id: &str, summary: &TaskSummary) -> ToolResult {
340        let tool_name = self.inner.descriptor().name.clone();
341        let agent_id = summary
342            .metadata
343            .get("agent_id")
344            .and_then(|v| v.as_str())
345            .unwrap_or("unknown");
346        ToolResult::success(
347            &tool_name,
348            json!({
349                "task_id": task_id,
350                "agent_id": agent_id,
351                "status": summary.status.as_str(),
352                "error": summary.error,
353            }),
354        )
355    }
356
357    fn extract_owner_thread_id(&self, ctx: &ToolCallContext<'_>) -> String {
358        ctx.caller_context()
359            .thread_id()
360            .unwrap_or(ctx.source())
361            .to_string()
362    }
363
364    /// Merge stored task metadata into args for resume (fills in agent_id etc.
365    /// that the caller may not have supplied).
366    fn enrich_args_for_resume(args: &mut Value, metadata: &Value) {
367        if let (Some(obj), Some(meta)) = (args.as_object_mut(), metadata.as_object()) {
368            for (k, v) in meta {
369                if !obj.contains_key(k) {
370                    obj.insert(k.clone(), v.clone());
371                }
372            }
373        }
374    }
375
376    /// Handle the resume/status query path when a task_id is found in args.
377    async fn handle_existing_task(
378        &self,
379        mut args: Value,
380        ctx: &ToolCallContext<'_>,
381        task_id: String,
382        owner_thread_id: &str,
383        background: bool,
384        parent_task_id: Option<&str>,
385    ) -> Result<ToolResult, ToolError> {
386        let lookup = self
387            .lookup_task(owner_thread_id, &task_id)
388            .await
389            .map_err(|e| ToolError::ExecutionFailed(format!("task lookup failed: {e}")))?;
390
391        let Some(lookup) = lookup else {
392            return Ok(ToolResult::error(
393                self.inner.descriptor().name,
394                format!("Unknown task: {task_id}"),
395            ));
396        };
397
398        match lookup.summary.status {
399            // Running or terminal: return current status without executing.
400            TaskStatus::Running
401            | TaskStatus::Completed
402            | TaskStatus::Failed
403            | TaskStatus::Cancelled => {
404                // Orphan detection: persisted Running but no live handle.
405                if lookup.summary.status == TaskStatus::Running && !lookup.is_live {
406                    self.mark_orphan_stopped(&task_id).await?;
407                    // Return stopped status with orphan message.
408                    let mut stopped_summary = lookup.summary.clone();
409                    stopped_summary.status = TaskStatus::Stopped;
410                    stopped_summary.error = Some(
411                        "No live executor found in current process; marked stopped".to_string(),
412                    );
413                    if self.inner.supports_resume() {
414                        Self::enrich_args_for_resume(&mut args, &lookup.summary.metadata);
415                        return self
416                            .execute_task(
417                                args,
418                                ctx,
419                                &ExecuteParams {
420                                    task_id: &task_id,
421                                    owner_thread_id,
422                                    background,
423                                    is_resume: true,
424                                    parent_task_id,
425                                },
426                            )
427                            .await;
428                    }
429                    return Ok(self.status_result(&task_id, &stopped_summary));
430                }
431                Ok(self.status_result(&task_id, &lookup.summary))
432            }
433            TaskStatus::Stopped => {
434                if !self.inner.supports_resume() {
435                    return Ok(self.status_result(&task_id, &lookup.summary));
436                }
437                // Resume: re-execute with same task_id, enriched with stored metadata.
438                Self::enrich_args_for_resume(&mut args, &lookup.summary.metadata);
439                self.execute_task(
440                    args,
441                    ctx,
442                    &ExecuteParams {
443                        task_id: &task_id,
444                        owner_thread_id,
445                        background,
446                        is_resume: true,
447                        parent_task_id,
448                    },
449                )
450                .await
451            }
452        }
453    }
454
455    /// Core execution path for both new and resumed tasks.
456    async fn execute_task(
457        &self,
458        mut args: Value,
459        ctx: &ToolCallContext<'_>,
460        params: &ExecuteParams<'_>,
461    ) -> Result<ToolResult, ToolError> {
462        let prepared = if params.background {
463            match self
464                .inner
465                .prepare_background(&mut args, ctx, params.is_resume)
466            {
467                Ok(prepared) => Some(prepared),
468                Err(result) => return Ok(result),
469            }
470        } else {
471            None
472        };
473
474        let description = self.inner.task_description(&args);
475        let metadata = self.inner.task_metadata(&args);
476
477        // Persist start (create or increment attempt).
478        self.persist_start(
479            params.task_id,
480            params.owner_thread_id,
481            params.is_resume,
482            &description,
483            params.parent_task_id,
484            &metadata,
485        )
486        .await?;
487
488        if params.background {
489            let inner = self.inner.clone();
490            let task_id_owned = params.task_id.to_string();
491            let cancel_token = RunCancellationToken::new();
492            let spawn_token = cancel_token.clone();
493            let execution = BackgroundExecutionContext {
494                owner_thread_id: params.owner_thread_id.to_string(),
495                parent_task_id: params.parent_task_id.map(str::to_string),
496                is_resume: params.is_resume,
497            };
498            let prepared = prepared.expect("background preparation should exist");
499
500            self.manager
501                .spawn_with_id(
502                    SpawnParams {
503                        task_id: params.task_id.to_string(),
504                        owner_thread_id: params.owner_thread_id.to_string(),
505                        task_type: self.inner.task_type().to_string(),
506                        description,
507                        parent_task_id: params.parent_task_id.map(str::to_string),
508                        metadata,
509                    },
510                    cancel_token,
511                    move |_cancel| async move {
512                        inner
513                            .execute_background(
514                                &task_id_owned,
515                                args,
516                                prepared,
517                                execution,
518                                spawn_token,
519                            )
520                            .await
521                    },
522                )
523                .await;
524
525            let tool_name = self.inner.descriptor().name.clone();
526            Ok(ToolResult::success(
527                &tool_name,
528                json!({
529                    "task_id": params.task_id,
530                    "status": "running_in_background",
531                    "message": format!(
532                        "Task started in background. Use task_status tool with task_id '{}' to check progress.",
533                        params.task_id
534                    ),
535                }),
536            ))
537        } else {
538            let result = self
539                .inner
540                .execute_foreground(args, ctx, params.is_resume)
541                .await?;
542
543            // Infer terminal status from ToolResult for persistence.
544            let (status, error) = self.inner.foreground_task_status(&result);
545            self.persist_result(params.task_id, status, error).await?;
546
547            Ok(result)
548        }
549    }
550}
551
552#[async_trait]
553impl<T: BackgroundExecutable + 'static> Tool for BackgroundCapable<T> {
554    fn descriptor(&self) -> ToolDescriptor {
555        let mut desc = self.inner.descriptor();
556        inject_background_param(&mut desc.parameters);
557        desc
558    }
559
560    fn validate_args(&self, args: &Value) -> Result<(), ToolError> {
561        let mut stripped = args.clone();
562        if let Some(obj) = stripped.as_object_mut() {
563            obj.remove(RUN_IN_BACKGROUND_PARAM);
564        }
565        self.inner.validate_args(&stripped)
566    }
567
568    async fn execute(
569        &self,
570        args: Value,
571        ctx: &ToolCallContext<'_>,
572    ) -> Result<ToolResult, ToolError> {
573        let background = args
574            .get(RUN_IN_BACKGROUND_PARAM)
575            .and_then(Value::as_bool)
576            .unwrap_or(false);
577
578        let mut clean_args = strip_background_param(args);
579        let owner_thread_id = self.extract_owner_thread_id(ctx);
580
581        // Check for existing task_id (resume/status query).
582        let existing_task_id = self.inner.task_id_from_args(&clean_args);
583
584        // Determine parent task_id from scope if available.
585        let parent_task_id: Option<String> = ctx.run_identity().run_id_opt().map(str::to_string);
586
587        if let Some(task_id) = existing_task_id {
588            return self
589                .handle_existing_task(
590                    clean_args,
591                    ctx,
592                    task_id,
593                    &owner_thread_id,
594                    background,
595                    parent_task_id.as_deref(),
596                )
597                .await;
598        }
599
600        // New task: generate task_id and inject into args.
601        let task_id = new_task_id();
602        self.inner.set_task_id_in_args(&mut clean_args, &task_id);
603
604        self.execute_task(
605            clean_args,
606            ctx,
607            &ExecuteParams {
608                task_id: &task_id,
609                owner_thread_id: &owner_thread_id,
610                background,
611                is_resume: false,
612                parent_task_id: parent_task_id.as_deref(),
613            },
614        )
615        .await
616    }
617}
618
619/// Inject `run_in_background` boolean parameter into a JSON Schema.
620fn inject_background_param(schema: &mut Value) {
621    if let Some(properties) = schema
622        .as_object_mut()
623        .and_then(|obj| obj.get_mut("properties"))
624        .and_then(Value::as_object_mut)
625    {
626        properties.insert(
627            RUN_IN_BACKGROUND_PARAM.to_string(),
628            json!({
629                "type": "boolean",
630                "description": "If true, execute this tool in the background and return immediately with a task_id. Use task_status to check progress later."
631            }),
632        );
633    }
634}
635
636fn strip_background_param(mut args: Value) -> Value {
637    if let Some(obj) = args.as_object_mut() {
638        obj.remove(RUN_IN_BACKGROUND_PARAM);
639    }
640    args
641}
642
#[cfg(test)]
mod tests {
    use super::*;
    use crate::contracts::runtime::tool_call::CallerContext;
    use std::time::Duration;

    /// Payload produced by both execution paths of [`EchoTool`]: the `message`
    /// argument, or `"(empty)"` when it is missing or not a string.
    fn echo_payload(args: &Value) -> Value {
        let text = args
            .get("message")
            .and_then(Value::as_str)
            .unwrap_or("(empty)");
        json!({ "echoed": text })
    }

    /// Caller context bound to thread `thread-1`, shared by the background tests.
    fn thread_one_caller() -> CallerContext {
        CallerContext::new(
            Some("thread-1".to_string()),
            Some("caller-run".to_string()),
            Some("caller".to_string()),
            vec![],
        )
    }

    /// Give a spawned background task a moment to make progress.
    async fn settle() {
        tokio::time::sleep(Duration::from_millis(50)).await;
    }

    /// Minimal tool that echoes its `message` argument.
    struct EchoTool;

    #[async_trait]
    impl Tool for EchoTool {
        fn descriptor(&self) -> ToolDescriptor {
            let schema = json!({
                "type": "object",
                "properties": {
                    "message": { "type": "string" }
                },
                "required": ["message"]
            });
            ToolDescriptor::new("echo", "echo", "Echo back the input").with_parameters(schema)
        }

        async fn execute(
            &self,
            args: Value,
            _ctx: &ToolCallContext<'_>,
        ) -> Result<ToolResult, ToolError> {
            Ok(ToolResult::success("echo", echo_payload(&args)))
        }
    }

    #[async_trait]
    impl BackgroundExecutable for EchoTool {
        type PreparedBackground = ();

        fn task_type(&self) -> &str {
            "echo"
        }

        fn prepare_background(
            &self,
            _args: &mut Value,
            _ctx: &ToolCallContext<'_>,
            _is_resume: bool,
        ) -> Result<Self::PreparedBackground, ToolResult> {
            Ok(())
        }

        async fn execute_background(
            &self,
            _task_id: &str,
            args: Value,
            _prepared: Self::PreparedBackground,
            _execution: BackgroundExecutionContext,
            _cancel_token: RunCancellationToken,
        ) -> TaskResult {
            TaskResult::Success(echo_payload(&args))
        }
    }

    #[test]
    fn descriptor_includes_background_param() {
        let manager = Arc::new(BackgroundTaskManager::new());
        let tool = BackgroundCapable::new(EchoTool, manager);

        let descriptor = tool.descriptor();
        let properties = descriptor.parameters["properties"].as_object().unwrap();
        assert!(properties.contains_key(RUN_IN_BACKGROUND_PARAM));
    }

    #[test]
    fn inject_background_param_preserves_existing_properties() {
        let mut schema = json!({
            "type": "object",
            "properties": {
                "x": { "type": "string" }
            }
        });

        inject_background_param(&mut schema);

        let properties = schema["properties"].as_object().unwrap();
        assert!(properties.contains_key("x"));
        assert!(properties.contains_key(RUN_IN_BACKGROUND_PARAM));
    }

    #[test]
    fn strip_background_param_removes_it() {
        let cleaned = strip_background_param(json!({
            "message": "hello",
            "run_in_background": true
        }));

        assert!(cleaned.get("message").is_some());
        assert!(cleaned.get(RUN_IN_BACKGROUND_PARAM).is_none());
    }

    #[test]
    fn validate_args_strips_background_param_before_inner_check() {
        let manager = Arc::new(BackgroundTaskManager::new());
        let tool = BackgroundCapable::new(EchoTool, manager);

        // The inner EchoTool has no notion of run_in_background.
        let args = json!({
            "message": "hello",
            "run_in_background": true
        });
        assert!(tool.validate_args(&args).is_ok());
    }

    #[tokio::test]
    async fn foreground_execution_delegates_to_inner_tool() {
        let manager = Arc::new(BackgroundTaskManager::new());
        let tool = BackgroundCapable::new(EchoTool, manager);
        let fix = tirea_contract::testing::TestFixture::new();

        let args = json!({ "message": "hello", "run_in_background": false });
        let result = tool.execute(args, &fix.ctx()).await.unwrap();

        assert!(result.is_success());
        let content: Value = result.data.clone();
        assert_eq!(content["echoed"], "hello");
    }

    #[tokio::test]
    async fn foreground_execution_when_param_absent() {
        let manager = Arc::new(BackgroundTaskManager::new());
        let tool = BackgroundCapable::new(EchoTool, manager);
        let fix = tirea_contract::testing::TestFixture::new();

        let result = tool
            .execute(json!({ "message": "hi" }), &fix.ctx())
            .await
            .unwrap();

        assert!(result.is_success());
        let content: Value = result.data.clone();
        assert_eq!(content["echoed"], "hi");
    }

    #[tokio::test]
    async fn background_execution_returns_task_id() {
        let manager = Arc::new(BackgroundTaskManager::new());
        let tool = BackgroundCapable::new(EchoTool, manager.clone());

        let mut fix = tirea_contract::testing::TestFixture::new();
        fix.caller_context = thread_one_caller();

        let result = tool
            .execute(
                json!({ "message": "bg-msg", "run_in_background": true }),
                &fix.ctx_with("call-1", "tool:echo"),
            )
            .await
            .unwrap();

        assert!(result.is_success());
        let content: Value = result.data.clone();
        assert!(content["task_id"].as_str().is_some());
        assert_eq!(content["status"], "running_in_background");

        // Let the background task finish, then check it through the manager.
        settle().await;
        let task_id = content["task_id"].as_str().unwrap();
        let summary = manager.get("thread-1", task_id).await.unwrap();
        assert_eq!(summary.status, TaskStatus::Completed);
        assert_eq!(summary.result.unwrap()["echoed"], "bg-msg");
    }

    #[tokio::test]
    async fn background_execution_uses_description_from_args() {
        let manager = Arc::new(BackgroundTaskManager::new());
        let tool = BackgroundCapable::new(EchoTool, manager.clone());

        let mut fix = tirea_contract::testing::TestFixture::new();
        fix.caller_context = thread_one_caller();

        let args = json!({
            "message": "bg",
            "command": "echo hello",
            "run_in_background": true
        });
        let result = tool
            .execute(args, &fix.ctx_with("c-1", "tool:echo"))
            .await
            .unwrap();
        let content: Value = result.data.clone();
        let task_id = content["task_id"].as_str().unwrap();

        settle().await;
        let summary = manager.get("thread-1", task_id).await.unwrap();
        assert!(
            summary.description.contains("echo hello"),
            "description should contain command: {}",
            summary.description
        );
    }

    /// Slow tool that respects cancellation.
    struct SlowTool;

    #[async_trait]
    impl Tool for SlowTool {
        fn descriptor(&self) -> ToolDescriptor {
            let schema = json!({
                "type": "object",
                "properties": {}
            });
            ToolDescriptor::new("slow", "slow", "A slow tool").with_parameters(schema)
        }

        async fn execute(
            &self,
            _args: Value,
            _ctx: &ToolCallContext<'_>,
        ) -> Result<ToolResult, ToolError> {
            Ok(ToolResult::success("slow", json!("done")))
        }
    }

    #[async_trait]
    impl BackgroundExecutable for SlowTool {
        type PreparedBackground = ();

        fn task_type(&self) -> &str {
            "slow"
        }

        fn prepare_background(
            &self,
            _args: &mut Value,
            _ctx: &ToolCallContext<'_>,
            _is_resume: bool,
        ) -> Result<Self::PreparedBackground, ToolResult> {
            Ok(())
        }

        async fn execute_background(
            &self,
            _task_id: &str,
            _args: Value,
            _prepared: Self::PreparedBackground,
            _execution: BackgroundExecutionContext,
            cancel_token: RunCancellationToken,
        ) -> TaskResult {
            // Whichever finishes first wins: cancellation or the 60 s sleep.
            tokio::select! {
                _ = cancel_token.cancelled() => TaskResult::Cancelled,
                _ = tokio::time::sleep(Duration::from_secs(60)) => {
                    TaskResult::Success(json!("completed"))
                }
            }
        }
    }

    #[tokio::test]
    async fn background_cancellation_via_cancel_token() {
        let manager = Arc::new(BackgroundTaskManager::new());
        let tool = BackgroundCapable::new(SlowTool, manager.clone());

        let mut fix = tirea_contract::testing::TestFixture::new();
        fix.caller_context = thread_one_caller();

        let result = tool
            .execute(
                json!({ "run_in_background": true }),
                &fix.ctx_with("c-1", "tool:slow"),
            )
            .await
            .unwrap();
        let content: Value = result.data.clone();
        let task_id = content["task_id"].as_str().unwrap();

        // Freshly spawned: should be running.
        let before = manager.get("thread-1", task_id).await.unwrap();
        assert_eq!(before.status, TaskStatus::Running);

        // Cancel via the manager and let the task observe it.
        manager.cancel("thread-1", task_id).await.unwrap();
        settle().await;

        let after = manager.get("thread-1", task_id).await.unwrap();
        assert_eq!(after.status, TaskStatus::Cancelled);
    }
}