tirea_agentos/runtime/
types.rs1use std::collections::HashMap;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use futures::Stream;
6use genai::Client;
7
8use crate::composition::{
9 AgentCatalog, AgentRegistry, AgentToolsConfig, BehaviorRegistry, ModelRegistry,
10 ProviderRegistry, RegistrySet, StopPolicyRegistry, SystemWiring, ToolRegistry,
11};
12use crate::contracts::runtime::tool_call::Tool;
13use crate::contracts::runtime::RunIdentity;
14use crate::contracts::storage::{ThreadStore, VersionPrecondition};
15use crate::contracts::{AgentEvent, RunContext, ToolCallDecision};
16#[cfg(feature = "skills")]
17use crate::extensions::skills::SkillRegistry;
18use crate::runtime::loop_runner::{Agent, RunCancellationToken, StateCommitError, StateCommitter};
19
20use super::agent_tools::SubAgentHandleTable;
21use super::background_tasks::{BackgroundTaskManager, TaskStore};
22use super::thread_run;
23
24pub struct RunStream {
32 pub thread_id: String,
34 pub run_id: String,
36 pub decision_tx: tokio::sync::mpsc::UnboundedSender<ToolCallDecision>,
41 pub events: Pin<Box<dyn Stream<Item = AgentEvent> + Send>>,
43}
44
45impl RunStream {
46 pub fn submit_decision(
48 &self,
49 decision: ToolCallDecision,
50 ) -> Result<(), tokio::sync::mpsc::error::SendError<ToolCallDecision>> {
51 self.decision_tx.send(decision)
52 }
53}
54
55pub struct PreparedRun {
60 pub(crate) agent: Arc<dyn Agent>,
61 pub(crate) tools: HashMap<String, Arc<dyn Tool>>,
62 pub(crate) run_ctx: RunContext,
63 pub(crate) cancellation_token: Option<RunCancellationToken>,
64 pub(crate) state_committer: Option<Arc<dyn StateCommitter>>,
65 pub(crate) decision_tx: tokio::sync::mpsc::UnboundedSender<ToolCallDecision>,
66 pub(crate) decision_rx: tokio::sync::mpsc::UnboundedReceiver<ToolCallDecision>,
67}
68
69impl PreparedRun {
70 pub fn thread_id(&self) -> &str {
72 self.run_ctx.thread_id()
73 }
74
75 pub fn run_id(&self) -> &str {
77 self.run_ctx
78 .run_identity()
79 .run_id_opt()
80 .expect("prepared runs always carry a run id")
81 }
82
83 pub fn run_identity(&self) -> &RunIdentity {
85 self.run_ctx.run_identity()
86 }
87
88 #[must_use]
93 pub fn with_cancellation_token(mut self, token: RunCancellationToken) -> Self {
94 self.cancellation_token = Some(token);
95 self
96 }
97}
98
99#[derive(Clone)]
100pub struct AgentOs {
101 pub(crate) default_client: Client,
102 pub(crate) agents: Arc<dyn AgentRegistry>,
103 pub(crate) agent_catalog: Arc<dyn AgentCatalog>,
104 pub(crate) base_tools: Arc<dyn ToolRegistry>,
105 pub(crate) behaviors: Arc<dyn BehaviorRegistry>,
106 pub(crate) providers: Arc<dyn ProviderRegistry>,
107 pub(crate) models: Arc<dyn ModelRegistry>,
108 pub(crate) stop_policies: Arc<dyn StopPolicyRegistry>,
109 #[cfg(feature = "skills")]
110 pub(crate) skills_registry: Option<Arc<dyn SkillRegistry>>,
111 pub(crate) system_wirings: Vec<Arc<dyn SystemWiring>>,
112 pub(crate) sub_agent_handles: Arc<SubAgentHandleTable>,
113 pub(crate) background_tasks: Arc<BackgroundTaskManager>,
114 pub(crate) active_runs: Arc<thread_run::ActiveThreadRunRegistry>,
115 pub(crate) agent_tools: AgentToolsConfig,
116 pub(crate) agent_state_store: Option<Arc<dyn ThreadStore>>,
117}
118
119pub(crate) struct RuntimeServices {
120 pub default_client: Client,
121 pub system_wirings: Vec<Arc<dyn SystemWiring>>,
122 pub agent_tools: AgentToolsConfig,
123 pub agent_state_store: Option<Arc<dyn ThreadStore>>,
124 pub agent_catalog: Arc<dyn AgentCatalog>,
125}
126
127impl AgentOs {
128 pub(crate) fn from_registry_set(registries: RegistrySet, services: RuntimeServices) -> Self {
129 let background_task_store = services
130 .agent_state_store
131 .as_ref()
132 .map(|store| Arc::new(TaskStore::new(store.clone())));
133
134 Self {
135 default_client: services.default_client,
136 agents: registries.agents,
137 agent_catalog: services.agent_catalog,
138 base_tools: registries.tools,
139 behaviors: registries.behaviors,
140 providers: registries.providers,
141 models: registries.models,
142 stop_policies: registries.stop_policies,
143 #[cfg(feature = "skills")]
144 skills_registry: registries.skills,
145 system_wirings: services.system_wirings,
146 sub_agent_handles: Arc::new(SubAgentHandleTable::new()),
147 background_tasks: Arc::new(BackgroundTaskManager::with_task_store(
148 background_task_store,
149 )),
150 active_runs: Arc::new(thread_run::ActiveThreadRunRegistry::default()),
151 agent_tools: services.agent_tools,
152 agent_state_store: services.agent_state_store,
153 }
154 }
155}
156
157#[derive(Clone)]
158pub(crate) struct AgentStateStoreStateCommitter {
159 agent_state_store: Arc<dyn ThreadStore>,
160 persist_run_mapping: bool,
161}
162
163impl AgentStateStoreStateCommitter {
164 pub(crate) fn new(agent_state_store: Arc<dyn ThreadStore>, persist_run_mapping: bool) -> Self {
165 Self {
166 agent_state_store,
167 persist_run_mapping,
168 }
169 }
170}
171
172#[async_trait::async_trait]
173impl StateCommitter for AgentStateStoreStateCommitter {
174 async fn commit(
175 &self,
176 thread_id: &str,
177 mut changeset: crate::contracts::ThreadChangeSet,
178 precondition: VersionPrecondition,
179 ) -> Result<u64, StateCommitError> {
180 if !self.persist_run_mapping {
181 changeset.run_meta = None;
182 }
183 self.agent_state_store
184 .append(thread_id, &changeset, precondition)
185 .await
186 .map(|committed| committed.version)
187 .map_err(|e| StateCommitError::new(format!("checkpoint append failed: {e}")))
188 }
189}