tirea_agentos/runtime/
types.rs

1use std::collections::HashMap;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use futures::Stream;
6use genai::Client;
7
8use crate::composition::{
9    AgentCatalog, AgentRegistry, AgentToolsConfig, BehaviorRegistry, ModelRegistry,
10    ProviderRegistry, RegistrySet, StopPolicyRegistry, SystemWiring, ToolRegistry,
11};
12use crate::contracts::runtime::tool_call::Tool;
13use crate::contracts::runtime::RunIdentity;
14use crate::contracts::storage::{ThreadStore, VersionPrecondition};
15use crate::contracts::{AgentEvent, RunContext, ToolCallDecision};
16#[cfg(feature = "skills")]
17use crate::extensions::skills::SkillRegistry;
18use crate::runtime::loop_runner::{Agent, RunCancellationToken, StateCommitError, StateCommitter};
19
20use super::agent_tools::SubAgentHandleTable;
21use super::background_tasks::{BackgroundTaskManager, TaskStore};
22use super::thread_run;
23
24/// Result of [`AgentOs::run_stream`]: an event stream plus metadata.
25///
26/// Checkpoint persistence is handled internally in stream order — callers only
27/// consume the event stream and use the IDs for protocol encoding.
28///
29/// The final thread is **not** exposed here; storage is updated incrementally
30/// via `ThreadChangeSet` appends.
31pub struct RunStream {
32    /// Resolved thread ID (may have been auto-generated).
33    pub thread_id: String,
34    /// Resolved run ID (may have been auto-generated).
35    pub run_id: String,
36    /// Sender for runtime interaction decisions (approve/deny payloads).
37    ///
38    /// The receiver is owned by the running loop. Sending a decision while the
39    /// run is active allows mid-run resolution of suspended tool calls.
40    pub decision_tx: tokio::sync::mpsc::UnboundedSender<ToolCallDecision>,
41    /// The agent event stream.
42    pub events: Pin<Box<dyn Stream<Item = AgentEvent> + Send>>,
43}
44
45impl RunStream {
46    /// Submit one interaction decision to the active run.
47    pub fn submit_decision(
48        &self,
49        decision: ToolCallDecision,
50    ) -> Result<(), tokio::sync::mpsc::error::SendError<ToolCallDecision>> {
51        self.decision_tx.send(decision)
52    }
53}
54
55/// Fully prepared run payload ready for execution.
56///
57/// This separates request preprocessing from stream execution so preprocessing
58/// can be unit-tested deterministically.
59pub struct PreparedRun {
60    pub(crate) agent: Arc<dyn Agent>,
61    pub(crate) tools: HashMap<String, Arc<dyn Tool>>,
62    pub(crate) run_ctx: RunContext,
63    pub(crate) cancellation_token: Option<RunCancellationToken>,
64    pub(crate) state_committer: Option<Arc<dyn StateCommitter>>,
65    pub(crate) decision_tx: tokio::sync::mpsc::UnboundedSender<ToolCallDecision>,
66    pub(crate) decision_rx: tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>,
67}
68
69impl PreparedRun {
70    /// Resolved thread ID (may have been auto-generated).
71    pub fn thread_id(&self) -> &str {
72        self.run_ctx.thread_id()
73    }
74
75    /// Resolved run ID (may have been auto-generated).
76    pub fn run_id(&self) -> &str {
77        self.run_ctx
78            .run_identity()
79            .run_id_opt()
80            .expect("prepared runs always carry a run id")
81    }
82
83    /// Strongly typed identity for the prepared run.
84    pub fn run_identity(&self) -> &RunIdentity {
85        self.run_ctx.run_identity()
86    }
87
88    /// Attach a cooperative cancellation token for this prepared run.
89    ///
90    /// This keeps loop cancellation wiring outside protocol/UI layers:
91    /// transport code can own token lifecycle and inject it before execution.
92    #[must_use]
93    pub fn with_cancellation_token(mut self, token: RunCancellationToken) -> Self {
94        self.cancellation_token = Some(token);
95        self
96    }
97}
98
99#[derive(Clone)]
100pub struct AgentOs {
101    pub(crate) default_client: Client,
102    pub(crate) agents: Arc<dyn AgentRegistry>,
103    pub(crate) agent_catalog: Arc<dyn AgentCatalog>,
104    pub(crate) base_tools: Arc<dyn ToolRegistry>,
105    pub(crate) behaviors: Arc<dyn BehaviorRegistry>,
106    pub(crate) providers: Arc<dyn ProviderRegistry>,
107    pub(crate) models: Arc<dyn ModelRegistry>,
108    pub(crate) stop_policies: Arc<dyn StopPolicyRegistry>,
109    #[cfg(feature = "skills")]
110    pub(crate) skills_registry: Option<Arc<dyn SkillRegistry>>,
111    pub(crate) system_wirings: Vec<Arc<dyn SystemWiring>>,
112    pub(crate) sub_agent_handles: Arc<SubAgentHandleTable>,
113    pub(crate) background_tasks: Arc<BackgroundTaskManager>,
114    pub(crate) active_runs: Arc<thread_run::ActiveThreadRunRegistry>,
115    pub(crate) agent_tools: AgentToolsConfig,
116    pub(crate) agent_state_store: Option<Arc<dyn ThreadStore>>,
117}
118
119pub(crate) struct RuntimeServices {
120    pub default_client: Client,
121    pub system_wirings: Vec<Arc<dyn SystemWiring>>,
122    pub agent_tools: AgentToolsConfig,
123    pub agent_state_store: Option<Arc<dyn ThreadStore>>,
124    pub agent_catalog: Arc<dyn AgentCatalog>,
125}
126
127impl AgentOs {
128    pub(crate) fn from_registry_set(registries: RegistrySet, services: RuntimeServices) -> Self {
129        let background_task_store = services
130            .agent_state_store
131            .as_ref()
132            .map(|store| Arc::new(TaskStore::new(store.clone())));
133
134        Self {
135            default_client: services.default_client,
136            agents: registries.agents,
137            agent_catalog: services.agent_catalog,
138            base_tools: registries.tools,
139            behaviors: registries.behaviors,
140            providers: registries.providers,
141            models: registries.models,
142            stop_policies: registries.stop_policies,
143            #[cfg(feature = "skills")]
144            skills_registry: registries.skills,
145            system_wirings: services.system_wirings,
146            sub_agent_handles: Arc::new(SubAgentHandleTable::new()),
147            background_tasks: Arc::new(BackgroundTaskManager::with_task_store(
148                background_task_store,
149            )),
150            active_runs: Arc::new(thread_run::ActiveThreadRunRegistry::default()),
151            agent_tools: services.agent_tools,
152            agent_state_store: services.agent_state_store,
153        }
154    }
155}
156
157#[derive(Clone)]
158pub(crate) struct AgentStateStoreStateCommitter {
159    agent_state_store: Arc<dyn ThreadStore>,
160    persist_run_mapping: bool,
161}
162
163impl AgentStateStoreStateCommitter {
164    pub(crate) fn new(agent_state_store: Arc<dyn ThreadStore>, persist_run_mapping: bool) -> Self {
165        Self {
166            agent_state_store,
167            persist_run_mapping,
168        }
169    }
170}
171
172#[async_trait::async_trait]
173impl StateCommitter for AgentStateStoreStateCommitter {
174    async fn commit(
175        &self,
176        thread_id: &str,
177        mut changeset: crate::contracts::ThreadChangeSet,
178        precondition: VersionPrecondition,
179    ) -> Result<u64, StateCommitError> {
180        if !self.persist_run_mapping {
181            changeset.run_meta = None;
182        }
183        self.agent_state_store
184            .append(thread_id, &changeset, precondition)
185            .await
186            .map(|committed| committed.version)
187            .map_err(|e| StateCommitError::new(format!("checkpoint append failed: {e}")))
188    }
189}