// tirea_agentos/runtime/run.rs

1use super::errors::{AgentOsResolveError, AgentOsRunError};
2use super::prepare::{
3    clear_tool_call_scope_state, request_has_user_input, run_lifecycle_running_patch,
4    run_scope_cleanup_patches, set_or_validate_parent_thread_id, ActiveRunCleanupGuard,
5};
6use super::types::{AgentOs, AgentStateStoreStateCommitter, PreparedRun, RunStream};
7use super::ResolvedRun;
8
9use crate::composition::AgentOsWiringError;
10use crate::contracts::runtime::RunIdentity;
11use crate::contracts::storage::{ThreadHead, ThreadStore, VersionPrecondition};
12use crate::contracts::thread::{CheckpointReason, Message, Thread};
13use crate::contracts::{AgentEvent, RunContext, RunRequest};
14use crate::runtime::loop_runner::{
15    run_loop_stream_with_context, AgentLoopError, RunCancellationToken, StateCommitter,
16};
17use futures::StreamExt;
18use std::sync::Arc;
19
20impl AgentOs {
21    pub fn agent_state_store(&self) -> Option<&Arc<dyn ThreadStore>> {
22        self.agent_state_store.as_ref()
23    }
24
25    fn require_agent_state_store(&self) -> Result<&Arc<dyn ThreadStore>, AgentOsRunError> {
26        self.agent_state_store
27            .as_ref()
28            .ok_or(AgentOsRunError::AgentStateStoreNotConfigured)
29    }
30
31    fn generate_id() -> String {
32        uuid::Uuid::now_v7().simple().to_string()
33    }
34
35    /// Load a thread from storage. Returns the thread and its version.
36    /// If the thread does not exist, returns `None`.
37    pub async fn load_thread(&self, id: &str) -> Result<Option<ThreadHead>, AgentOsRunError> {
38        let agent_state_store = self.require_agent_state_store()?;
39        Ok(agent_state_store.load(id).await?)
40    }
41
42    pub async fn current_run_id_for_thread(
43        &self,
44        agent_id: &str,
45        thread_id: &str,
46    ) -> Result<Option<String>, AgentOsRunError> {
47        if let Some(run_id) = self.active_run_id_for_thread(agent_id, thread_id).await {
48            return Ok(Some(run_id));
49        }
50        let store = self.require_agent_state_store()?;
51        let Some(record) = store.active_run_for_thread(thread_id).await? else {
52            return Ok(None);
53        };
54        if !record.agent_id.is_empty() && record.agent_id != agent_id {
55            return Ok(None);
56        }
57        Ok(Some(record.run_id))
58    }
59
60    async fn clear_suspended_calls_before_user_run_input(
61        &self,
62        run_request: &mut RunRequest,
63    ) -> Result<(), AgentOsRunError> {
64        let Some(thread_id) = run_request.thread_id.as_deref() else {
65            return Ok(());
66        };
67        if !request_has_user_input(&run_request.messages) {
68            return Ok(());
69        }
70
71        let store = self.require_agent_state_store()?;
72        let Some(head) = store.load(thread_id).await? else {
73            return Ok(());
74        };
75        if let Some(cleaned) = clear_tool_call_scope_state(&head.thread.state) {
76            run_request.state = Some(cleaned);
77        }
78        Ok(())
79    }
80
81    pub(crate) async fn prepare_active_run_with_persistence(
82        &self,
83        owner_agent_id: &str,
84        mut run_request: RunRequest,
85        resolved: ResolvedRun,
86        persist_run: bool,
87        strip_lineage: bool,
88    ) -> Result<(PreparedRun, String, String), AgentOsRunError> {
89        if strip_lineage {
90            run_request.run_id = None;
91            run_request.parent_run_id = None;
92            run_request.parent_thread_id = None;
93        }
94
95        let previous_run_id = if !run_request.messages.is_empty() {
96            if let Some(thread_id) = run_request.thread_id.as_deref() {
97                self.current_run_id_for_thread(owner_agent_id, thread_id)
98                    .await?
99            } else {
100                None
101            }
102        } else {
103            None
104        };
105
106        self.clear_suspended_calls_before_user_run_input(&mut run_request)
107            .await?;
108
109        let prepared = self
110            .prepare_run_with_persistence(run_request, resolved, persist_run)
111            .await?;
112        let thread_id = prepared.thread_id().to_string();
113        let run_id = prepared.run_id().to_string();
114
115        if let Some(previous_run_id) = previous_run_id.filter(|candidate| candidate != &run_id) {
116            self.cancel_active_run_by_id(&previous_run_id).await;
117        }
118
119        self.register_thread_run_handle(
120            run_id.clone(),
121            owner_agent_id,
122            &thread_id,
123            RunCancellationToken::new(),
124        )
125        .await;
126
127        Ok((prepared, thread_id, run_id))
128    }
129
130    pub(crate) async fn start_prepared_active_run(
131        &self,
132        run_id: &str,
133        prepared: PreparedRun,
134    ) -> Result<RunStream, AgentOsRunError> {
135        let token = self
136            .active_thread_run_by_run_id(run_id)
137            .await
138            .ok_or_else(|| {
139                AgentOsRunError::Loop(AgentLoopError::StateError(format!(
140                    "active run handle missing for run '{run_id}'",
141                )))
142            })?
143            .cancellation_token();
144        let run = Self::execute_prepared(prepared.with_cancellation_token(token))?;
145        if !self
146            .bind_thread_run_decision_tx(run_id, run.decision_tx.clone())
147            .await
148        {
149            self.remove_thread_run_handle(run_id).await;
150            return Err(AgentOsRunError::Loop(AgentLoopError::StateError(format!(
151                "active run handle missing for run '{run_id}'",
152            ))));
153        }
154        Ok(self.wrap_run_stream_with_active_handle_cleanup(run))
155    }
156
157    pub async fn start_active_run_with_persistence(
158        &self,
159        owner_agent_id: &str,
160        run_request: RunRequest,
161        resolved: ResolvedRun,
162        persist_run: bool,
163        strip_lineage: bool,
164    ) -> Result<RunStream, AgentOsRunError> {
165        let (prepared, _thread_id, run_id) = self
166            .prepare_active_run_with_persistence(
167                owner_agent_id,
168                run_request,
169                resolved,
170                persist_run,
171                strip_lineage,
172            )
173            .await?;
174        self.start_prepared_active_run(&run_id, prepared).await
175    }
176
177    fn wrap_run_stream_with_active_handle_cleanup(&self, run: RunStream) -> RunStream {
178        let RunStream {
179            thread_id,
180            run_id,
181            decision_tx,
182            events,
183        } = run;
184        let run_id_for_cleanup = run_id.clone();
185        let registry = self.active_runs.clone();
186        let events = Box::pin(futures::stream::unfold(
187            (
188                events,
189                Some(ActiveRunCleanupGuard::new(run_id_for_cleanup, registry)),
190            ),
191            |(mut inner, mut cleanup)| async move {
192                match inner.next().await {
193                    Some(event) => Some((event, (inner, cleanup))),
194                    None => {
195                        if let Some(mut cleanup) = cleanup.take() {
196                            cleanup.cleanup_now().await;
197                        }
198                        None
199                    }
200                }
201            },
202        ));
203        RunStream {
204            thread_id,
205            run_id,
206            decision_tx,
207            events,
208        }
209    }
210
211    /// Prepare a resolved run for execution.
212    ///
213    /// This handles all deterministic pre-run logic:
214    /// 1. Thread loading/creation from storage
215    /// 2. Message deduplication and appending
216    /// 3. Persisting pre-run state
217    /// 4. Run-context creation
218    ///
219    /// Callers resolve first, optionally customize, then prepare:
220    /// ```ignore
221    /// let mut resolved = os.resolve("my-agent")?;
222    /// resolved.tools.insert("extra".into(), tool);
223    /// let prepared = os.prepare_run(request, resolved).await?;
224    /// ```
225    pub async fn prepare_run(
226        &self,
227        request: RunRequest,
228        resolved: ResolvedRun,
229    ) -> Result<PreparedRun, AgentOsRunError> {
230        let owner_agent_id = request.agent_id.clone();
231        self.prepare_active_run_with_persistence(&owner_agent_id, request, resolved, true, false)
232            .await
233            .map(|(prepared, _thread_id, _run_id)| prepared)
234    }
235
236    /// Prepare a resolved run and control whether the run should be persisted.
237    ///
238    /// This powers dialog-style runs where short-lived execution state is needed
239    /// but we intentionally do not keep durable run records.
240    pub async fn prepare_run_with_persistence(
241        &self,
242        mut request: RunRequest,
243        resolved: ResolvedRun,
244        persist_run: bool,
245    ) -> Result<PreparedRun, AgentOsRunError> {
246        let agent_state_store = self.require_agent_state_store()?;
247
248        let thread_id = request.thread_id.unwrap_or_else(Self::generate_id);
249        let run_id = request.run_id.unwrap_or_else(Self::generate_id);
250        let parent_run_id = request.parent_run_id.clone();
251        let parent_thread_id = request.parent_thread_id.clone();
252        let initial_decisions = std::mem::take(&mut request.initial_decisions);
253
254        // 1. Load or create thread
255        //    If frontend sent a state snapshot, apply it:
256        //    - New thread: used as initial state
257        //    - Existing thread: replaces current state (persisted in UserMessage delta)
258        let frontend_state = request.state.take();
259        let mut state_snapshot_for_delta: Option<serde_json::Value> = None;
260        let (mut thread, mut version) = match agent_state_store.load(&thread_id).await? {
261            Some(head) => {
262                let mut t = head.thread;
263                if let Some(state) = frontend_state {
264                    t.state = state.clone();
265                    t.patches.clear();
266                    state_snapshot_for_delta = Some(state);
267                }
268                (t, head.version)
269            }
270            None => {
271                let thread = if let Some(state) = frontend_state {
272                    Thread::with_initial_state(thread_id.clone(), state)
273                } else {
274                    Thread::new(thread_id.clone())
275                };
276                let committed = agent_state_store.create(&thread).await?;
277                (thread, committed.version)
278            }
279        };
280        let parent_thread_id_updated =
281            set_or_validate_parent_thread_id(&mut thread, &thread_id, parent_thread_id.as_deref())?;
282        if parent_thread_id_updated {
283            agent_state_store.save(&thread).await?;
284            let refreshed = agent_state_store.load(&thread_id).await?.ok_or_else(|| {
285                AgentOsRunError::Loop(AgentLoopError::StateError(format!(
286                    "thread '{thread_id}' disappeared after parent_thread_id update",
287                )))
288            })?;
289            thread = refreshed.thread;
290            version = refreshed.version;
291        }
292
293        // 1a. Lazy context loading: trim pre-boundary messages.
294        crate::runtime::context::trim_thread_to_latest_boundary(&mut thread);
295
296        // 2. Set resource_id on thread if provided
297        if let Some(ref resource_id) = request.resource_id {
298            thread.resource_id = Some(resource_id.clone());
299        }
300
301        // 3. Deduplicate and append inbound messages
302        let mut deduped_messages = Self::dedup_messages(&thread, request.messages);
303        if !deduped_messages.is_empty() {
304            deduped_messages = Self::attach_run_metadata_to_messages(deduped_messages, &run_id);
305            thread = thread.with_messages(deduped_messages.clone());
306        }
307
308        // 4. Persist run-start changes (user messages + frontend state snapshot + run state)
309        let delta_messages: Vec<Arc<Message>> =
310            deduped_messages.into_iter().map(Arc::new).collect();
311        // 4a. Clean up stale Run-scoped state from any previous run.
312        let mut delta_patches =
313            run_scope_cleanup_patches(&thread.state, &resolved.agent.state_scope_registry);
314        // 4b. Apply cleanup patches to in-memory thread state so the lifecycle
315        //     patch reducer sees a clean base.
316        for cp in &delta_patches {
317            thread.state =
318                tirea_state::apply_patch(&thread.state, cp.patch()).map_err(|error| {
319                    AgentOsRunError::Loop(AgentLoopError::StateError(format!(
320                        "failed to apply run-scope cleanup patch for thread '{thread_id}': {error}"
321                    )))
322                })?;
323        }
324        delta_patches.push(run_lifecycle_running_patch(&thread.state, &run_id)?);
325        let mut changeset = crate::contracts::ThreadChangeSet::from_parts(
326            run_id.clone(),
327            parent_run_id.clone(),
328            CheckpointReason::UserMessage,
329            delta_messages,
330            delta_patches.clone(),
331            Vec::new(),
332            state_snapshot_for_delta,
333        );
334        if persist_run {
335            changeset = changeset.with_run_meta(crate::contracts::RunMeta {
336                agent_id: request.agent_id.clone(),
337                origin: request.origin,
338                status: crate::contracts::storage::RunStatus::Running,
339                parent_thread_id: parent_thread_id.clone(),
340                termination_code: None,
341                termination_detail: None,
342                source_mailbox_entry_id: request.source_mailbox_entry_id.clone(),
343            });
344        }
345        let committed = agent_state_store
346            .append(&thread_id, &changeset, VersionPrecondition::Exact(version))
347            .await?;
348        version = committed.version;
349        thread = thread.with_patches(delta_patches);
350        thread.metadata.version = Some(version);
351
352        let mut run_identity = RunIdentity::new(
353            thread_id.clone(),
354            parent_thread_id.clone(),
355            run_id.clone(),
356            parent_run_id.clone(),
357            request.agent_id.clone(),
358            request.origin,
359        );
360        if let Some(parent_tool_call_id) = resolved.parent_tool_call_id.clone() {
361            run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
362        }
363
364        // 6. Behavior uniqueness: wiring ensures base uniqueness, but callers
365        //    may mutate `resolved.agent.behavior` after resolve.
366        //    Validate the final composed behavior_ids for duplicates.
367        {
368            let ids = resolved.agent.behavior.behavior_ids();
369            let mut seen = std::collections::HashSet::with_capacity(ids.len());
370            for id in &ids {
371                if !seen.insert(*id) {
372                    return Err(AgentOsRunError::Resolve(AgentOsResolveError::Wiring(
373                        AgentOsWiringError::BehaviorAlreadyInstalled(id.to_string()),
374                    )));
375                }
376            }
377        }
378
379        let run_ctx = RunContext::from_thread_with_registry_and_identity(
380            &thread,
381            resolved.run_policy,
382            run_identity.clone(),
383            resolved.agent.lattice_registry.clone(),
384        )
385        .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
386        let (decision_tx, decision_rx) = tokio::sync::mpsc::unbounded_channel();
387        for decision in initial_decisions {
388            decision_tx
389                .send(decision)
390                .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
391        }
392
393        Ok(PreparedRun {
394            agent: Arc::new(resolved.agent),
395            tools: resolved.tools,
396            run_ctx,
397            cancellation_token: None,
398            state_committer: Some(Arc::new(AgentStateStoreStateCommitter::new(
399                agent_state_store.clone(),
400                persist_run,
401            ))),
402            decision_tx,
403            decision_rx,
404        })
405    }
406
407    /// Execute a previously prepared run.
408    pub fn execute_prepared(prepared: PreparedRun) -> Result<RunStream, AgentOsRunError> {
409        let thread_id = prepared.thread_id().to_string();
410        let run_id = prepared.run_id().to_string();
411        let run_identity = prepared.run_ctx.run_identity().clone();
412        let events = run_loop_stream_with_context(
413            prepared.agent,
414            prepared.tools,
415            prepared.run_ctx,
416            run_identity,
417            prepared.cancellation_token,
418            prepared.state_committer,
419            Some(prepared.decision_rx),
420        );
421        Ok(RunStream {
422            thread_id,
423            run_id,
424            decision_tx: prepared.decision_tx,
425            events,
426        })
427    }
428
429    /// Resolve, prepare, and execute an agent run.
430    ///
431    /// This is the primary entry point. Callers that need to customize
432    /// the resolved wiring should use [`resolve`] + mutation + [`prepare_run`]
433    /// + [`execute_prepared`] instead.
434    pub async fn run_stream(&self, request: RunRequest) -> Result<RunStream, AgentOsRunError> {
435        let resolved = self.resolve(&request.agent_id)?;
436        let prepared = self.prepare_run(request, resolved).await?;
437        Self::execute_prepared(prepared)
438    }
439
440    /// Deduplicate incoming messages against existing thread messages.
441    ///
442    /// Skips messages whose ID or tool_call_id already exists in the thread.
443    fn dedup_messages(thread: &Thread, incoming: Vec<Message>) -> Vec<Message> {
444        use std::collections::HashSet;
445
446        let existing_ids: HashSet<&str> = thread
447            .messages
448            .iter()
449            .filter_map(|m| m.id.as_deref())
450            .collect();
451        let existing_tool_call_ids: HashSet<&str> = thread
452            .messages
453            .iter()
454            .filter_map(|m| m.tool_call_id.as_deref())
455            .collect();
456
457        incoming
458            .into_iter()
459            .filter(|m| {
460                // Dedup tool messages by tool_call_id
461                if let Some(ref tc_id) = m.tool_call_id {
462                    if existing_tool_call_ids.contains(tc_id.as_str()) {
463                        return false;
464                    }
465                }
466                // Dedup by message id
467                if let Some(ref id) = m.id {
468                    if existing_ids.contains(id.as_str()) {
469                        return false;
470                    }
471                }
472                true
473            })
474            .collect()
475    }
476
477    fn attach_run_metadata_to_messages(mut messages: Vec<Message>, run_id: &str) -> Vec<Message> {
478        messages.iter_mut().for_each(|message| {
479            let mut metadata = message.metadata.clone().unwrap_or_default();
480            metadata.run_id = Some(run_id.to_string());
481            message.metadata = Some(metadata);
482        });
483        messages
484    }
485
486    // --- Internal low-level helper (legacy) ---
487
488    #[deprecated(note = "Use prepare_run + execute_prepared instead")]
489    #[allow(dead_code)]
490    pub(crate) fn run_stream_with_context(
491        &self,
492        agent_id: &str,
493        thread: Thread,
494        cancellation_token: Option<RunCancellationToken>,
495        state_committer: Option<Arc<dyn StateCommitter>>,
496    ) -> Result<impl futures::Stream<Item = AgentEvent> + Send, AgentOsRunError> {
497        let resolved = self.resolve(agent_id)?;
498        let run_identity = RunIdentity::new(
499            thread.id.clone(),
500            thread.parent_thread_id.clone(),
501            thread.id.clone(),
502            None,
503            agent_id.to_string(),
504            crate::contracts::storage::RunOrigin::Internal,
505        );
506        let run_ctx = RunContext::from_thread_with_registry_and_identity(
507            &thread,
508            resolved.run_policy,
509            run_identity.clone(),
510            resolved.agent.lattice_registry.clone(),
511        )
512        .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
513        Ok(run_loop_stream_with_context(
514            Arc::new(resolved.agent),
515            resolved.tools,
516            run_ctx,
517            run_identity,
518            cancellation_token,
519            state_committer,
520            None,
521        ))
522    }
523}