1use super::errors::{AgentOsResolveError, AgentOsRunError};
2use super::prepare::{
3 clear_tool_call_scope_state, request_has_user_input, run_lifecycle_running_patch,
4 run_scope_cleanup_patches, set_or_validate_parent_thread_id, ActiveRunCleanupGuard,
5};
6use super::types::{AgentOs, AgentStateStoreStateCommitter, PreparedRun, RunStream};
7use super::ResolvedRun;
8
9use crate::composition::AgentOsWiringError;
10use crate::contracts::runtime::RunIdentity;
11use crate::contracts::storage::{ThreadHead, ThreadStore, VersionPrecondition};
12use crate::contracts::thread::{CheckpointReason, Message, Thread};
13use crate::contracts::{AgentEvent, RunContext, RunRequest};
14use crate::runtime::loop_runner::{
15 run_loop_stream_with_context, AgentLoopError, RunCancellationToken, StateCommitter,
16};
17use futures::StreamExt;
18use std::sync::Arc;
19
20impl AgentOs {
21 pub fn agent_state_store(&self) -> Option<&Arc<dyn ThreadStore>> {
22 self.agent_state_store.as_ref()
23 }
24
25 fn require_agent_state_store(&self) -> Result<&Arc<dyn ThreadStore>, AgentOsRunError> {
26 self.agent_state_store
27 .as_ref()
28 .ok_or(AgentOsRunError::AgentStateStoreNotConfigured)
29 }
30
31 fn generate_id() -> String {
32 uuid::Uuid::now_v7().simple().to_string()
33 }
34
35 pub async fn load_thread(&self, id: &str) -> Result<Option<ThreadHead>, AgentOsRunError> {
38 let agent_state_store = self.require_agent_state_store()?;
39 Ok(agent_state_store.load(id).await?)
40 }
41
42 pub async fn current_run_id_for_thread(
43 &self,
44 agent_id: &str,
45 thread_id: &str,
46 ) -> Result<Option<String>, AgentOsRunError> {
47 if let Some(run_id) = self.active_run_id_for_thread(agent_id, thread_id).await {
48 return Ok(Some(run_id));
49 }
50 let store = self.require_agent_state_store()?;
51 let Some(record) = store.active_run_for_thread(thread_id).await? else {
52 return Ok(None);
53 };
54 if !record.agent_id.is_empty() && record.agent_id != agent_id {
55 return Ok(None);
56 }
57 Ok(Some(record.run_id))
58 }
59
60 async fn clear_suspended_calls_before_user_run_input(
61 &self,
62 run_request: &mut RunRequest,
63 ) -> Result<(), AgentOsRunError> {
64 let Some(thread_id) = run_request.thread_id.as_deref() else {
65 return Ok(());
66 };
67 if !request_has_user_input(&run_request.messages) {
68 return Ok(());
69 }
70
71 let store = self.require_agent_state_store()?;
72 let Some(head) = store.load(thread_id).await? else {
73 return Ok(());
74 };
75 if let Some(cleaned) = clear_tool_call_scope_state(&head.thread.state) {
76 run_request.state = Some(cleaned);
77 }
78 Ok(())
79 }
80
81 pub(crate) async fn prepare_active_run_with_persistence(
82 &self,
83 owner_agent_id: &str,
84 mut run_request: RunRequest,
85 resolved: ResolvedRun,
86 persist_run: bool,
87 strip_lineage: bool,
88 ) -> Result<(PreparedRun, String, String), AgentOsRunError> {
89 if strip_lineage {
90 run_request.run_id = None;
91 run_request.parent_run_id = None;
92 run_request.parent_thread_id = None;
93 }
94
95 let previous_run_id = if !run_request.messages.is_empty() {
96 if let Some(thread_id) = run_request.thread_id.as_deref() {
97 self.current_run_id_for_thread(owner_agent_id, thread_id)
98 .await?
99 } else {
100 None
101 }
102 } else {
103 None
104 };
105
106 self.clear_suspended_calls_before_user_run_input(&mut run_request)
107 .await?;
108
109 let prepared = self
110 .prepare_run_with_persistence(run_request, resolved, persist_run)
111 .await?;
112 let thread_id = prepared.thread_id().to_string();
113 let run_id = prepared.run_id().to_string();
114
115 if let Some(previous_run_id) = previous_run_id.filter(|candidate| candidate != &run_id) {
116 self.cancel_active_run_by_id(&previous_run_id).await;
117 }
118
119 self.register_thread_run_handle(
120 run_id.clone(),
121 owner_agent_id,
122 &thread_id,
123 RunCancellationToken::new(),
124 )
125 .await;
126
127 Ok((prepared, thread_id, run_id))
128 }
129
130 pub(crate) async fn start_prepared_active_run(
131 &self,
132 run_id: &str,
133 prepared: PreparedRun,
134 ) -> Result<RunStream, AgentOsRunError> {
135 let token = self
136 .active_thread_run_by_run_id(run_id)
137 .await
138 .ok_or_else(|| {
139 AgentOsRunError::Loop(AgentLoopError::StateError(format!(
140 "active run handle missing for run '{run_id}'",
141 )))
142 })?
143 .cancellation_token();
144 let run = Self::execute_prepared(prepared.with_cancellation_token(token))?;
145 if !self
146 .bind_thread_run_decision_tx(run_id, run.decision_tx.clone())
147 .await
148 {
149 self.remove_thread_run_handle(run_id).await;
150 return Err(AgentOsRunError::Loop(AgentLoopError::StateError(format!(
151 "active run handle missing for run '{run_id}'",
152 ))));
153 }
154 Ok(self.wrap_run_stream_with_active_handle_cleanup(run))
155 }
156
157 pub async fn start_active_run_with_persistence(
158 &self,
159 owner_agent_id: &str,
160 run_request: RunRequest,
161 resolved: ResolvedRun,
162 persist_run: bool,
163 strip_lineage: bool,
164 ) -> Result<RunStream, AgentOsRunError> {
165 let (prepared, _thread_id, run_id) = self
166 .prepare_active_run_with_persistence(
167 owner_agent_id,
168 run_request,
169 resolved,
170 persist_run,
171 strip_lineage,
172 )
173 .await?;
174 self.start_prepared_active_run(&run_id, prepared).await
175 }
176
177 fn wrap_run_stream_with_active_handle_cleanup(&self, run: RunStream) -> RunStream {
178 let RunStream {
179 thread_id,
180 run_id,
181 decision_tx,
182 events,
183 } = run;
184 let run_id_for_cleanup = run_id.clone();
185 let registry = self.active_runs.clone();
186 let events = Box::pin(futures::stream::unfold(
187 (
188 events,
189 Some(ActiveRunCleanupGuard::new(run_id_for_cleanup, registry)),
190 ),
191 |(mut inner, mut cleanup)| async move {
192 match inner.next().await {
193 Some(event) => Some((event, (inner, cleanup))),
194 None => {
195 if let Some(mut cleanup) = cleanup.take() {
196 cleanup.cleanup_now().await;
197 }
198 None
199 }
200 }
201 },
202 ));
203 RunStream {
204 thread_id,
205 run_id,
206 decision_tx,
207 events,
208 }
209 }
210
211 pub async fn prepare_run(
226 &self,
227 request: RunRequest,
228 resolved: ResolvedRun,
229 ) -> Result<PreparedRun, AgentOsRunError> {
230 let owner_agent_id = request.agent_id.clone();
231 self.prepare_active_run_with_persistence(&owner_agent_id, request, resolved, true, false)
232 .await
233 .map(|(prepared, _thread_id, _run_id)| prepared)
234 }
235
236 pub async fn prepare_run_with_persistence(
241 &self,
242 mut request: RunRequest,
243 resolved: ResolvedRun,
244 persist_run: bool,
245 ) -> Result<PreparedRun, AgentOsRunError> {
246 let agent_state_store = self.require_agent_state_store()?;
247
248 let thread_id = request.thread_id.unwrap_or_else(Self::generate_id);
249 let run_id = request.run_id.unwrap_or_else(Self::generate_id);
250 let parent_run_id = request.parent_run_id.clone();
251 let parent_thread_id = request.parent_thread_id.clone();
252 let initial_decisions = std::mem::take(&mut request.initial_decisions);
253
254 let frontend_state = request.state.take();
259 let mut state_snapshot_for_delta: Option<serde_json::Value> = None;
260 let (mut thread, mut version) = match agent_state_store.load(&thread_id).await? {
261 Some(head) => {
262 let mut t = head.thread;
263 if let Some(state) = frontend_state {
264 t.state = state.clone();
265 t.patches.clear();
266 state_snapshot_for_delta = Some(state);
267 }
268 (t, head.version)
269 }
270 None => {
271 let thread = if let Some(state) = frontend_state {
272 Thread::with_initial_state(thread_id.clone(), state)
273 } else {
274 Thread::new(thread_id.clone())
275 };
276 let committed = agent_state_store.create(&thread).await?;
277 (thread, committed.version)
278 }
279 };
280 let parent_thread_id_updated =
281 set_or_validate_parent_thread_id(&mut thread, &thread_id, parent_thread_id.as_deref())?;
282 if parent_thread_id_updated {
283 agent_state_store.save(&thread).await?;
284 let refreshed = agent_state_store.load(&thread_id).await?.ok_or_else(|| {
285 AgentOsRunError::Loop(AgentLoopError::StateError(format!(
286 "thread '{thread_id}' disappeared after parent_thread_id update",
287 )))
288 })?;
289 thread = refreshed.thread;
290 version = refreshed.version;
291 }
292
293 crate::runtime::context::trim_thread_to_latest_boundary(&mut thread);
295
296 if let Some(ref resource_id) = request.resource_id {
298 thread.resource_id = Some(resource_id.clone());
299 }
300
301 let mut deduped_messages = Self::dedup_messages(&thread, request.messages);
303 if !deduped_messages.is_empty() {
304 deduped_messages = Self::attach_run_metadata_to_messages(deduped_messages, &run_id);
305 thread = thread.with_messages(deduped_messages.clone());
306 }
307
308 let delta_messages: Vec<Arc<Message>> =
310 deduped_messages.into_iter().map(Arc::new).collect();
311 let mut delta_patches =
313 run_scope_cleanup_patches(&thread.state, &resolved.agent.state_scope_registry);
314 for cp in &delta_patches {
317 thread.state =
318 tirea_state::apply_patch(&thread.state, cp.patch()).map_err(|error| {
319 AgentOsRunError::Loop(AgentLoopError::StateError(format!(
320 "failed to apply run-scope cleanup patch for thread '{thread_id}': {error}"
321 )))
322 })?;
323 }
324 delta_patches.push(run_lifecycle_running_patch(&thread.state, &run_id)?);
325 let mut changeset = crate::contracts::ThreadChangeSet::from_parts(
326 run_id.clone(),
327 parent_run_id.clone(),
328 CheckpointReason::UserMessage,
329 delta_messages,
330 delta_patches.clone(),
331 Vec::new(),
332 state_snapshot_for_delta,
333 );
334 if persist_run {
335 changeset = changeset.with_run_meta(crate::contracts::RunMeta {
336 agent_id: request.agent_id.clone(),
337 origin: request.origin,
338 status: crate::contracts::storage::RunStatus::Running,
339 parent_thread_id: parent_thread_id.clone(),
340 termination_code: None,
341 termination_detail: None,
342 source_mailbox_entry_id: request.source_mailbox_entry_id.clone(),
343 });
344 }
345 let committed = agent_state_store
346 .append(&thread_id, &changeset, VersionPrecondition::Exact(version))
347 .await?;
348 version = committed.version;
349 thread = thread.with_patches(delta_patches);
350 thread.metadata.version = Some(version);
351
352 let mut run_identity = RunIdentity::new(
353 thread_id.clone(),
354 parent_thread_id.clone(),
355 run_id.clone(),
356 parent_run_id.clone(),
357 request.agent_id.clone(),
358 request.origin,
359 );
360 if let Some(parent_tool_call_id) = resolved.parent_tool_call_id.clone() {
361 run_identity = run_identity.with_parent_tool_call_id(parent_tool_call_id);
362 }
363
364 {
368 let ids = resolved.agent.behavior.behavior_ids();
369 let mut seen = std::collections::HashSet::with_capacity(ids.len());
370 for id in &ids {
371 if !seen.insert(*id) {
372 return Err(AgentOsRunError::Resolve(AgentOsResolveError::Wiring(
373 AgentOsWiringError::BehaviorAlreadyInstalled(id.to_string()),
374 )));
375 }
376 }
377 }
378
379 let run_ctx = RunContext::from_thread_with_registry_and_identity(
380 &thread,
381 resolved.run_policy,
382 run_identity.clone(),
383 resolved.agent.lattice_registry.clone(),
384 )
385 .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
386 let (decision_tx, decision_rx) = tokio::sync::mpsc::unbounded_channel();
387 for decision in initial_decisions {
388 decision_tx
389 .send(decision)
390 .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
391 }
392
393 Ok(PreparedRun {
394 agent: Arc::new(resolved.agent),
395 tools: resolved.tools,
396 run_ctx,
397 cancellation_token: None,
398 state_committer: Some(Arc::new(AgentStateStoreStateCommitter::new(
399 agent_state_store.clone(),
400 persist_run,
401 ))),
402 decision_tx,
403 decision_rx,
404 })
405 }
406
407 pub fn execute_prepared(prepared: PreparedRun) -> Result<RunStream, AgentOsRunError> {
409 let thread_id = prepared.thread_id().to_string();
410 let run_id = prepared.run_id().to_string();
411 let run_identity = prepared.run_ctx.run_identity().clone();
412 let events = run_loop_stream_with_context(
413 prepared.agent,
414 prepared.tools,
415 prepared.run_ctx,
416 run_identity,
417 prepared.cancellation_token,
418 prepared.state_committer,
419 Some(prepared.decision_rx),
420 );
421 Ok(RunStream {
422 thread_id,
423 run_id,
424 decision_tx: prepared.decision_tx,
425 events,
426 })
427 }
428
429 pub async fn run_stream(&self, request: RunRequest) -> Result<RunStream, AgentOsRunError> {
435 let resolved = self.resolve(&request.agent_id)?;
436 let prepared = self.prepare_run(request, resolved).await?;
437 Self::execute_prepared(prepared)
438 }
439
440 fn dedup_messages(thread: &Thread, incoming: Vec<Message>) -> Vec<Message> {
444 use std::collections::HashSet;
445
446 let existing_ids: HashSet<&str> = thread
447 .messages
448 .iter()
449 .filter_map(|m| m.id.as_deref())
450 .collect();
451 let existing_tool_call_ids: HashSet<&str> = thread
452 .messages
453 .iter()
454 .filter_map(|m| m.tool_call_id.as_deref())
455 .collect();
456
457 incoming
458 .into_iter()
459 .filter(|m| {
460 if let Some(ref tc_id) = m.tool_call_id {
462 if existing_tool_call_ids.contains(tc_id.as_str()) {
463 return false;
464 }
465 }
466 if let Some(ref id) = m.id {
468 if existing_ids.contains(id.as_str()) {
469 return false;
470 }
471 }
472 true
473 })
474 .collect()
475 }
476
477 fn attach_run_metadata_to_messages(mut messages: Vec<Message>, run_id: &str) -> Vec<Message> {
478 messages.iter_mut().for_each(|message| {
479 let mut metadata = message.metadata.clone().unwrap_or_default();
480 metadata.run_id = Some(run_id.to_string());
481 message.metadata = Some(metadata);
482 });
483 messages
484 }
485
486 #[deprecated(note = "Use prepare_run + execute_prepared instead")]
489 #[allow(dead_code)]
490 pub(crate) fn run_stream_with_context(
491 &self,
492 agent_id: &str,
493 thread: Thread,
494 cancellation_token: Option<RunCancellationToken>,
495 state_committer: Option<Arc<dyn StateCommitter>>,
496 ) -> Result<impl futures::Stream<Item = AgentEvent> + Send, AgentOsRunError> {
497 let resolved = self.resolve(agent_id)?;
498 let run_identity = RunIdentity::new(
499 thread.id.clone(),
500 thread.parent_thread_id.clone(),
501 thread.id.clone(),
502 None,
503 agent_id.to_string(),
504 crate::contracts::storage::RunOrigin::Internal,
505 );
506 let run_ctx = RunContext::from_thread_with_registry_and_identity(
507 &thread,
508 resolved.run_policy,
509 run_identity.clone(),
510 resolved.agent.lattice_registry.clone(),
511 )
512 .map_err(|e| AgentOsRunError::Loop(AgentLoopError::StateError(e.to_string())))?;
513 Ok(run_loop_stream_with_context(
514 Arc::new(resolved.agent),
515 resolved.tools,
516 run_ctx,
517 run_identity,
518 cancellation_token,
519 state_committer,
520 None,
521 ))
522 }
523}