1use super::manager::BackgroundTaskManager;
14use super::types::*;
15use super::{new_task_id, NewTaskSpec, TaskStore, TaskStoreError};
16use crate::contracts::runtime::tool_call::{
17 Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult,
18};
19use crate::runtime::loop_runner::RunCancellationToken;
20use async_trait::async_trait;
21use serde_json::{json, Value};
22use std::sync::Arc;
23
/// Name of the boolean argument that the wrapper adds to a tool's parameter
/// schema and reads (then strips) to choose background vs. foreground execution.
const RUN_IN_BACKGROUND_PARAM: &str = "run_in_background";
25
/// Execution details handed to [`BackgroundExecutable::execute_background`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BackgroundExecutionContext {
    /// Thread that owns the task.
    pub owner_thread_id: String,
    /// Identifier recorded as the task's parent, when one is known.
    pub parent_task_id: Option<String>,
    /// `true` when this attempt resumes a previously recorded task.
    pub is_resume: bool,
}
33
34#[async_trait]
40pub trait BackgroundExecutable: Tool {
41 type PreparedBackground: Send + 'static;
43
44 fn task_type(&self) -> &str;
46
47 fn supports_resume(&self) -> bool {
49 false
50 }
51
52 fn task_metadata(&self, _args: &Value) -> Value {
54 json!({})
55 }
56
57 fn task_id_from_args(&self, _args: &Value) -> Option<String> {
60 None
61 }
62
63 fn set_task_id_in_args(&self, args: &mut Value, task_id: &str) {
66 if let Some(obj) = args.as_object_mut() {
67 obj.insert("task_id".to_string(), json!(task_id));
68 }
69 }
70
71 fn task_description(&self, args: &Value) -> String {
73 let tool_name = self.descriptor().name.clone();
74 format!(
75 "{} (background)",
76 args.get("description")
77 .or_else(|| args.get("command"))
78 .and_then(Value::as_str)
79 .unwrap_or(&tool_name)
80 )
81 }
82
83 #[allow(clippy::result_large_err)]
90 fn prepare_background(
91 &self,
92 _args: &mut Value,
93 _ctx: &ToolCallContext<'_>,
94 _is_resume: bool,
95 ) -> Result<Self::PreparedBackground, ToolResult>;
96
97 async fn execute_foreground(
103 &self,
104 args: Value,
105 ctx: &ToolCallContext<'_>,
106 _is_resume: bool,
107 ) -> Result<ToolResult, ToolError> {
108 self.execute(args, ctx).await
109 }
110
111 fn foreground_task_status(&self, result: &ToolResult) -> (TaskStatus, Option<String>) {
116 if result.is_error() {
117 (TaskStatus::Failed, result.message.clone())
118 } else {
119 (TaskStatus::Completed, None)
120 }
121 }
122
123 async fn execute_background(
129 &self,
130 task_id: &str,
131 args: Value,
132 prepared: Self::PreparedBackground,
133 execution: BackgroundExecutionContext,
134 cancel_token: RunCancellationToken,
135 ) -> TaskResult;
136}
137
/// Wrapper that adds an opt-in background execution mode to a
/// [`BackgroundExecutable`] tool.
pub struct BackgroundCapable<T: BackgroundExecutable> {
    /// The wrapped tool; shared so spawned background work holds its own handle.
    inner: Arc<T>,
    /// Manager used to spawn and look up live tasks.
    manager: Arc<BackgroundTaskManager>,
    /// Optional persistent store; when `None`, nothing is persisted.
    task_store: Option<Arc<TaskStore>>,
}
150
151impl<T: BackgroundExecutable> BackgroundCapable<T> {
152 pub fn new(inner: T, manager: Arc<BackgroundTaskManager>) -> Self {
153 Self {
154 inner: Arc::new(inner),
155 manager,
156 task_store: None,
157 }
158 }
159
160 pub fn from_arc(inner: Arc<T>, manager: Arc<BackgroundTaskManager>) -> Self {
161 Self {
162 inner,
163 manager,
164 task_store: None,
165 }
166 }
167
168 pub fn with_task_store(mut self, task_store: Option<Arc<TaskStore>>) -> Self {
169 self.task_store = task_store;
170 self
171 }
172}
173
/// Result of looking a task up in the manager or the task store.
struct TaskLookup {
    /// Summary of the task that was found.
    summary: TaskSummary,
    /// `true` when the summary came from the in-process manager, `false` when
    /// it was loaded from the task store.
    is_live: bool,
}
180
/// Borrowed parameters describing one execution attempt of a task.
struct ExecuteParams<'a> {
    /// Identifier of the task being started or resumed.
    task_id: &'a str,
    /// Thread that owns the task.
    owner_thread_id: &'a str,
    /// Run through the background manager instead of inline.
    background: bool,
    /// This attempt resumes an existing task rather than starting a new one.
    is_resume: bool,
    /// Identifier recorded as the task's parent, when one is known.
    parent_task_id: Option<&'a str>,
}
189
190impl<T: BackgroundExecutable + 'static> BackgroundCapable<T> {
191 async fn lookup_task(
194 &self,
195 owner_thread_id: &str,
196 task_id: &str,
197 ) -> Result<Option<TaskLookup>, TaskStoreError> {
198 let live = self.manager.get(owner_thread_id, task_id).await;
199 if let Some(summary) = live {
200 return Ok(Some(TaskLookup {
201 summary,
202 is_live: true,
203 }));
204 }
205
206 if let Some(store) = &self.task_store {
207 if let Some(task) = store.load_task_for_owner(owner_thread_id, task_id).await? {
208 return Ok(Some(TaskLookup {
209 summary: task.summary(),
210 is_live: false,
211 }));
212 }
213 }
214
215 Ok(None)
216 }
217
218 async fn persist_start(
220 &self,
221 task_id: &str,
222 owner_thread_id: &str,
223 is_resume: bool,
224 description: &str,
225 parent_task_id: Option<&str>,
226 metadata: &Value,
227 ) -> Result<(), ToolError> {
228 let Some(store) = &self.task_store else {
229 return Ok(());
230 };
231
232 if is_resume {
233 match store.load_task(task_id).await {
235 Ok(Some(task)) => {
236 if task.owner_thread_id != owner_thread_id {
237 return Err(ToolError::ExecutionFailed(format!(
238 "task '{}' belongs to a different owner",
239 task_id
240 )));
241 }
242 store.start_task_attempt(task_id).await.map_err(|e| {
243 ToolError::ExecutionFailed(format!("task persist failed: {e}"))
244 })?;
245 }
246 Ok(None) => {
247 self.create_task(
249 store,
250 task_id,
251 owner_thread_id,
252 description,
253 parent_task_id,
254 metadata,
255 )
256 .await?;
257 }
258 Err(e) => {
259 return Err(ToolError::ExecutionFailed(format!(
260 "task persist failed: {e}"
261 )));
262 }
263 }
264 } else {
265 self.create_task(
266 store,
267 task_id,
268 owner_thread_id,
269 description,
270 parent_task_id,
271 metadata,
272 )
273 .await?;
274 }
275
276 Ok(())
277 }
278
279 async fn create_task(
280 &self,
281 store: &TaskStore,
282 task_id: &str,
283 owner_thread_id: &str,
284 description: &str,
285 parent_task_id: Option<&str>,
286 metadata: &Value,
287 ) -> Result<(), ToolError> {
288 store
289 .create_task(NewTaskSpec {
290 task_id: task_id.to_string(),
291 owner_thread_id: owner_thread_id.to_string(),
292 task_type: self.inner.task_type().to_string(),
293 description: description.to_string(),
294 parent_task_id: parent_task_id.map(str::to_string),
295 supports_resume: self.inner.supports_resume(),
296 metadata: metadata.clone(),
297 })
298 .await
299 .map_err(|e| ToolError::ExecutionFailed(format!("task persist failed: {e}")))?;
300 Ok(())
301 }
302
303 async fn persist_result(
305 &self,
306 task_id: &str,
307 status: TaskStatus,
308 error: Option<String>,
309 ) -> Result<(), ToolError> {
310 let Some(store) = &self.task_store else {
311 return Ok(());
312 };
313 store
314 .persist_foreground_result(task_id, status, error, None)
315 .await
316 .map_err(|e| ToolError::ExecutionFailed(format!("task persist failed: {e}")))?;
317 Ok(())
318 }
319
320 async fn mark_orphan_stopped(&self, task_id: &str) -> Result<(), ToolError> {
322 let Some(store) = &self.task_store else {
323 return Ok(());
324 };
325 store
326 .persist_foreground_result(
327 task_id,
328 TaskStatus::Stopped,
329 Some("No live executor found in current process; marked stopped".to_string()),
330 None,
331 )
332 .await
333 .map_err(|e| {
334 ToolError::ExecutionFailed(format!("failed to mark orphan stopped: {e}"))
335 })?;
336 Ok(())
337 }
338
339 fn status_result(&self, task_id: &str, summary: &TaskSummary) -> ToolResult {
340 let tool_name = self.inner.descriptor().name.clone();
341 let agent_id = summary
342 .metadata
343 .get("agent_id")
344 .and_then(|v| v.as_str())
345 .unwrap_or("unknown");
346 ToolResult::success(
347 &tool_name,
348 json!({
349 "task_id": task_id,
350 "agent_id": agent_id,
351 "status": summary.status.as_str(),
352 "error": summary.error,
353 }),
354 )
355 }
356
357 fn extract_owner_thread_id(&self, ctx: &ToolCallContext<'_>) -> String {
358 ctx.caller_context()
359 .thread_id()
360 .unwrap_or(ctx.source())
361 .to_string()
362 }
363
364 fn enrich_args_for_resume(args: &mut Value, metadata: &Value) {
367 if let (Some(obj), Some(meta)) = (args.as_object_mut(), metadata.as_object()) {
368 for (k, v) in meta {
369 if !obj.contains_key(k) {
370 obj.insert(k.clone(), v.clone());
371 }
372 }
373 }
374 }
375
376 async fn handle_existing_task(
378 &self,
379 mut args: Value,
380 ctx: &ToolCallContext<'_>,
381 task_id: String,
382 owner_thread_id: &str,
383 background: bool,
384 parent_task_id: Option<&str>,
385 ) -> Result<ToolResult, ToolError> {
386 let lookup = self
387 .lookup_task(owner_thread_id, &task_id)
388 .await
389 .map_err(|e| ToolError::ExecutionFailed(format!("task lookup failed: {e}")))?;
390
391 let Some(lookup) = lookup else {
392 return Ok(ToolResult::error(
393 self.inner.descriptor().name,
394 format!("Unknown task: {task_id}"),
395 ));
396 };
397
398 match lookup.summary.status {
399 TaskStatus::Running
401 | TaskStatus::Completed
402 | TaskStatus::Failed
403 | TaskStatus::Cancelled => {
404 if lookup.summary.status == TaskStatus::Running && !lookup.is_live {
406 self.mark_orphan_stopped(&task_id).await?;
407 let mut stopped_summary = lookup.summary.clone();
409 stopped_summary.status = TaskStatus::Stopped;
410 stopped_summary.error = Some(
411 "No live executor found in current process; marked stopped".to_string(),
412 );
413 if self.inner.supports_resume() {
414 Self::enrich_args_for_resume(&mut args, &lookup.summary.metadata);
415 return self
416 .execute_task(
417 args,
418 ctx,
419 &ExecuteParams {
420 task_id: &task_id,
421 owner_thread_id,
422 background,
423 is_resume: true,
424 parent_task_id,
425 },
426 )
427 .await;
428 }
429 return Ok(self.status_result(&task_id, &stopped_summary));
430 }
431 Ok(self.status_result(&task_id, &lookup.summary))
432 }
433 TaskStatus::Stopped => {
434 if !self.inner.supports_resume() {
435 return Ok(self.status_result(&task_id, &lookup.summary));
436 }
437 Self::enrich_args_for_resume(&mut args, &lookup.summary.metadata);
439 self.execute_task(
440 args,
441 ctx,
442 &ExecuteParams {
443 task_id: &task_id,
444 owner_thread_id,
445 background,
446 is_resume: true,
447 parent_task_id,
448 },
449 )
450 .await
451 }
452 }
453 }
454
455 async fn execute_task(
457 &self,
458 mut args: Value,
459 ctx: &ToolCallContext<'_>,
460 params: &ExecuteParams<'_>,
461 ) -> Result<ToolResult, ToolError> {
462 let prepared = if params.background {
463 match self
464 .inner
465 .prepare_background(&mut args, ctx, params.is_resume)
466 {
467 Ok(prepared) => Some(prepared),
468 Err(result) => return Ok(result),
469 }
470 } else {
471 None
472 };
473
474 let description = self.inner.task_description(&args);
475 let metadata = self.inner.task_metadata(&args);
476
477 self.persist_start(
479 params.task_id,
480 params.owner_thread_id,
481 params.is_resume,
482 &description,
483 params.parent_task_id,
484 &metadata,
485 )
486 .await?;
487
488 if params.background {
489 let inner = self.inner.clone();
490 let task_id_owned = params.task_id.to_string();
491 let cancel_token = RunCancellationToken::new();
492 let spawn_token = cancel_token.clone();
493 let execution = BackgroundExecutionContext {
494 owner_thread_id: params.owner_thread_id.to_string(),
495 parent_task_id: params.parent_task_id.map(str::to_string),
496 is_resume: params.is_resume,
497 };
498 let prepared = prepared.expect("background preparation should exist");
499
500 self.manager
501 .spawn_with_id(
502 SpawnParams {
503 task_id: params.task_id.to_string(),
504 owner_thread_id: params.owner_thread_id.to_string(),
505 task_type: self.inner.task_type().to_string(),
506 description,
507 parent_task_id: params.parent_task_id.map(str::to_string),
508 metadata,
509 },
510 cancel_token,
511 move |_cancel| async move {
512 inner
513 .execute_background(
514 &task_id_owned,
515 args,
516 prepared,
517 execution,
518 spawn_token,
519 )
520 .await
521 },
522 )
523 .await;
524
525 let tool_name = self.inner.descriptor().name.clone();
526 Ok(ToolResult::success(
527 &tool_name,
528 json!({
529 "task_id": params.task_id,
530 "status": "running_in_background",
531 "message": format!(
532 "Task started in background. Use task_status tool with task_id '{}' to check progress.",
533 params.task_id
534 ),
535 }),
536 ))
537 } else {
538 let result = self
539 .inner
540 .execute_foreground(args, ctx, params.is_resume)
541 .await?;
542
543 let (status, error) = self.inner.foreground_task_status(&result);
545 self.persist_result(params.task_id, status, error).await?;
546
547 Ok(result)
548 }
549 }
550}
551
552#[async_trait]
553impl<T: BackgroundExecutable + 'static> Tool for BackgroundCapable<T> {
554 fn descriptor(&self) -> ToolDescriptor {
555 let mut desc = self.inner.descriptor();
556 inject_background_param(&mut desc.parameters);
557 desc
558 }
559
560 fn validate_args(&self, args: &Value) -> Result<(), ToolError> {
561 let mut stripped = args.clone();
562 if let Some(obj) = stripped.as_object_mut() {
563 obj.remove(RUN_IN_BACKGROUND_PARAM);
564 }
565 self.inner.validate_args(&stripped)
566 }
567
568 async fn execute(
569 &self,
570 args: Value,
571 ctx: &ToolCallContext<'_>,
572 ) -> Result<ToolResult, ToolError> {
573 let background = args
574 .get(RUN_IN_BACKGROUND_PARAM)
575 .and_then(Value::as_bool)
576 .unwrap_or(false);
577
578 let mut clean_args = strip_background_param(args);
579 let owner_thread_id = self.extract_owner_thread_id(ctx);
580
581 let existing_task_id = self.inner.task_id_from_args(&clean_args);
583
584 let parent_task_id: Option<String> = ctx.run_identity().run_id_opt().map(str::to_string);
586
587 if let Some(task_id) = existing_task_id {
588 return self
589 .handle_existing_task(
590 clean_args,
591 ctx,
592 task_id,
593 &owner_thread_id,
594 background,
595 parent_task_id.as_deref(),
596 )
597 .await;
598 }
599
600 let task_id = new_task_id();
602 self.inner.set_task_id_in_args(&mut clean_args, &task_id);
603
604 self.execute_task(
605 clean_args,
606 ctx,
607 &ExecuteParams {
608 task_id: &task_id,
609 owner_thread_id: &owner_thread_id,
610 background,
611 is_resume: false,
612 parent_task_id: parent_task_id.as_deref(),
613 },
614 )
615 .await
616 }
617}
618
619fn inject_background_param(schema: &mut Value) {
621 if let Some(properties) = schema
622 .as_object_mut()
623 .and_then(|obj| obj.get_mut("properties"))
624 .and_then(Value::as_object_mut)
625 {
626 properties.insert(
627 RUN_IN_BACKGROUND_PARAM.to_string(),
628 json!({
629 "type": "boolean",
630 "description": "If true, execute this tool in the background and return immediately with a task_id. Use task_status to check progress later."
631 }),
632 );
633 }
634}
635
636fn strip_background_param(mut args: Value) -> Value {
637 if let Some(obj) = args.as_object_mut() {
638 obj.remove(RUN_IN_BACKGROUND_PARAM);
639 }
640 args
641}
642
#[cfg(test)]
mod tests {
    use super::super::types::{TaskStatus, TaskSummary};
    use super::*;
    use crate::contracts::runtime::tool_call::CallerContext;
    use std::time::{Duration, Instant};

    /// Caller context for a tool call issued from `thread_id`.
    fn caller_on_thread(thread_id: &str) -> CallerContext {
        CallerContext::new(
            Some(thread_id.to_string()),
            Some("caller-run".to_string()),
            Some("caller".to_string()),
            vec![],
        )
    }

    /// Polls the manager until the task reaches `expected` and returns its
    /// summary. Panics if the task is unknown or does not get there within
    /// five seconds, so tests do not depend on a fixed sleep being long enough.
    async fn wait_for_status(
        mgr: &BackgroundTaskManager,
        owner_thread_id: &str,
        task_id: &str,
        expected: TaskStatus,
    ) -> TaskSummary {
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            let summary = mgr
                .get(owner_thread_id, task_id)
                .await
                .expect("task should be tracked by the manager");
            if summary.status == expected {
                return summary;
            }
            assert!(
                Instant::now() < deadline,
                "task {task_id} did not reach {expected:?}; last status was {:?}",
                summary.status
            );
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    /// Tool that echoes its `message` argument.
    struct EchoTool;

    #[async_trait]
    impl Tool for EchoTool {
        fn descriptor(&self) -> ToolDescriptor {
            ToolDescriptor::new("echo", "echo", "Echo back the input").with_parameters(json!({
                "type": "object",
                "properties": {
                    "message": { "type": "string" }
                },
                "required": ["message"]
            }))
        }

        async fn execute(
            &self,
            args: Value,
            _ctx: &ToolCallContext<'_>,
        ) -> Result<ToolResult, ToolError> {
            let msg = args
                .get("message")
                .and_then(Value::as_str)
                .unwrap_or("(empty)");
            Ok(ToolResult::success("echo", json!({ "echoed": msg })))
        }
    }

    #[async_trait]
    impl BackgroundExecutable for EchoTool {
        type PreparedBackground = ();

        fn task_type(&self) -> &str {
            "echo"
        }

        fn prepare_background(
            &self,
            _args: &mut Value,
            _ctx: &ToolCallContext<'_>,
            _is_resume: bool,
        ) -> Result<Self::PreparedBackground, ToolResult> {
            Ok(())
        }

        async fn execute_background(
            &self,
            _task_id: &str,
            args: Value,
            _prepared: Self::PreparedBackground,
            _execution: BackgroundExecutionContext,
            _cancel_token: RunCancellationToken,
        ) -> TaskResult {
            let msg = args
                .get("message")
                .and_then(Value::as_str)
                .unwrap_or("(empty)");
            TaskResult::Success(json!({ "echoed": msg }))
        }
    }

    #[test]
    fn descriptor_includes_background_param() {
        let mgr = Arc::new(BackgroundTaskManager::new());
        let wrapped = BackgroundCapable::new(EchoTool, mgr);
        let desc = wrapped.descriptor();
        let props = desc.parameters["properties"].as_object().unwrap();
        assert!(props.contains_key(RUN_IN_BACKGROUND_PARAM));
    }

    #[test]
    fn inject_background_param_preserves_existing_properties() {
        let mut schema = json!({
            "type": "object",
            "properties": {
                "x": { "type": "string" }
            }
        });
        inject_background_param(&mut schema);
        let props = schema["properties"].as_object().unwrap();
        assert!(props.contains_key("x"));
        assert!(props.contains_key(RUN_IN_BACKGROUND_PARAM));
    }

    #[test]
    fn strip_background_param_removes_it() {
        let args = json!({
            "message": "hello",
            "run_in_background": true
        });
        let cleaned = strip_background_param(args);
        assert!(cleaned.get("message").is_some());
        assert!(cleaned.get(RUN_IN_BACKGROUND_PARAM).is_none());
    }

    #[test]
    fn validate_args_strips_background_param_before_inner_check() {
        let mgr = Arc::new(BackgroundTaskManager::new());
        let wrapped = BackgroundCapable::new(EchoTool, mgr);
        let args = json!({
            "message": "hello",
            "run_in_background": true
        });
        assert!(wrapped.validate_args(&args).is_ok());
    }

    #[tokio::test]
    async fn foreground_execution_delegates_to_inner_tool() {
        let mgr = Arc::new(BackgroundTaskManager::new());
        let wrapped = BackgroundCapable::new(EchoTool, mgr);

        let fix = tirea_contract::testing::TestFixture::new();
        let result = wrapped
            .execute(
                json!({ "message": "hello", "run_in_background": false }),
                &fix.ctx(),
            )
            .await
            .unwrap();
        assert!(result.is_success());
        let content: Value = result.data.clone();
        assert_eq!(content["echoed"], "hello");
    }

    #[tokio::test]
    async fn foreground_execution_when_param_absent() {
        let mgr = Arc::new(BackgroundTaskManager::new());
        let wrapped = BackgroundCapable::new(EchoTool, mgr);

        let fix = tirea_contract::testing::TestFixture::new();
        let result = wrapped
            .execute(json!({ "message": "hi" }), &fix.ctx())
            .await
            .unwrap();
        assert!(result.is_success());
        let content: Value = result.data.clone();
        assert_eq!(content["echoed"], "hi");
    }

    #[tokio::test]
    async fn background_execution_returns_task_id() {
        let mgr = Arc::new(BackgroundTaskManager::new());
        let wrapped = BackgroundCapable::new(EchoTool, mgr.clone());

        let mut fix = tirea_contract::testing::TestFixture::new();
        fix.caller_context = caller_on_thread("thread-1");

        let result = wrapped
            .execute(
                json!({ "message": "bg-msg", "run_in_background": true }),
                &fix.ctx_with("call-1", "tool:echo"),
            )
            .await
            .unwrap();
        assert!(result.is_success());
        let content: Value = result.data.clone();
        let task_id = content["task_id"]
            .as_str()
            .expect("background start should report a task_id");
        assert_eq!(content["status"], "running_in_background");

        let summary = wait_for_status(&mgr, "thread-1", task_id, TaskStatus::Completed).await;
        assert_eq!(summary.result.unwrap()["echoed"], "bg-msg");
    }

    #[tokio::test]
    async fn background_execution_uses_description_from_args() {
        let mgr = Arc::new(BackgroundTaskManager::new());
        let wrapped = BackgroundCapable::new(EchoTool, mgr.clone());

        let mut fix = tirea_contract::testing::TestFixture::new();
        fix.caller_context = caller_on_thread("thread-1");

        let result = wrapped
            .execute(
                json!({
                    "message": "bg",
                    "command": "echo hello",
                    "run_in_background": true
                }),
                &fix.ctx_with("c-1", "tool:echo"),
            )
            .await
            .unwrap();
        let content: Value = result.data.clone();
        let task_id = content["task_id"].as_str().unwrap();

        // Waiting for completion also proves the task is still tracked.
        let summary = wait_for_status(&mgr, "thread-1", task_id, TaskStatus::Completed).await;
        assert!(
            summary.description.contains("echo hello"),
            "description should contain command: {}",
            summary.description
        );
    }

    /// Tool whose background run only finishes early when cancelled.
    struct SlowTool;

    #[async_trait]
    impl Tool for SlowTool {
        fn descriptor(&self) -> ToolDescriptor {
            ToolDescriptor::new("slow", "slow", "A slow tool").with_parameters(json!({
                "type": "object",
                "properties": {}
            }))
        }

        async fn execute(
            &self,
            _args: Value,
            _ctx: &ToolCallContext<'_>,
        ) -> Result<ToolResult, ToolError> {
            Ok(ToolResult::success("slow", json!("done")))
        }
    }

    #[async_trait]
    impl BackgroundExecutable for SlowTool {
        type PreparedBackground = ();

        fn task_type(&self) -> &str {
            "slow"
        }

        fn prepare_background(
            &self,
            _args: &mut Value,
            _ctx: &ToolCallContext<'_>,
            _is_resume: bool,
        ) -> Result<Self::PreparedBackground, ToolResult> {
            Ok(())
        }

        async fn execute_background(
            &self,
            _task_id: &str,
            _args: Value,
            _prepared: Self::PreparedBackground,
            _execution: BackgroundExecutionContext,
            cancel_token: RunCancellationToken,
        ) -> TaskResult {
            tokio::select! {
                _ = cancel_token.cancelled() => TaskResult::Cancelled,
                _ = tokio::time::sleep(Duration::from_secs(60)) => {
                    TaskResult::Success(json!("completed"))
                }
            }
        }
    }

    #[tokio::test]
    async fn background_cancellation_via_cancel_token() {
        let mgr = Arc::new(BackgroundTaskManager::new());
        let wrapped = BackgroundCapable::new(SlowTool, mgr.clone());

        let mut fix = tirea_contract::testing::TestFixture::new();
        fix.caller_context = caller_on_thread("thread-1");

        let result = wrapped
            .execute(
                json!({ "run_in_background": true }),
                &fix.ctx_with("c-1", "tool:slow"),
            )
            .await
            .unwrap();
        let content: Value = result.data.clone();
        let task_id = content["task_id"].as_str().unwrap();

        let summary = mgr.get("thread-1", task_id).await.unwrap();
        assert_eq!(summary.status, TaskStatus::Running);

        mgr.cancel("thread-1", task_id).await.unwrap();

        wait_for_status(&mgr, "thread-1", task_id, TaskStatus::Cancelled).await;
    }
}