tirea_agentos_server/service/
run.rs1use std::sync::Arc;
2use tirea_agentos::contracts::storage::{ThreadReader, ThreadStore};
3use tirea_agentos::contracts::RunRequest;
4use tirea_agentos::contracts::ToolCallDecision;
5use tirea_agentos::runtime::{AgentOs, AgentOsRunError, ForwardedDecision, ResolvedRun};
6use tirea_contract::storage::RunRecord;
7use tirea_contract::RuntimeInput;
8use tokio::sync::mpsc;
9
10use crate::transport::runtime_endpoint::RunStarter;
11
12use super::mailbox_service::MailboxService;
13use super::ApiError;
14use super::EnqueueOptions;
15
16pub async fn current_run_id_for_thread(
17 os: &Arc<AgentOs>,
18 agent_id: &str,
19 thread_id: &str,
20 read_store: &dyn ThreadReader,
21) -> Result<Option<String>, ApiError> {
22 match os.current_run_id_for_thread(agent_id, thread_id).await {
23 Ok(found) => Ok(found),
24 Err(AgentOsRunError::AgentStateStoreNotConfigured) => {
25 let Some(record) = read_store
26 .active_run_for_thread(thread_id)
27 .await
28 .map_err(|e| ApiError::Internal(e.to_string()))?
29 else {
30 return Ok(None);
31 };
32 if !record.agent_id.is_empty() && record.agent_id != agent_id {
33 return Ok(None);
34 }
35 Ok(Some(record.run_id))
36 }
37 Err(other) => Err(ApiError::Internal(other.to_string())),
38 }
39}
40
41async fn try_forward_decisions_by_thread(
42 os: &Arc<AgentOs>,
43 agent_id: &str,
44 thread_id: &str,
45 decisions: &[ToolCallDecision],
46) -> Option<ForwardedDecision> {
47 os.forward_decisions_by_thread(agent_id, thread_id, decisions)
48 .await
49}
50
51pub async fn forward_dialog_decisions_by_thread(
52 os: &Arc<AgentOs>,
53 agent_id: &str,
54 thread_id: &str,
55 has_user_input: bool,
56 frontend_run_id: Option<&str>,
57 decisions: &[ToolCallDecision],
58) -> Result<Option<ForwardedDecision>, ApiError> {
59 if has_user_input || decisions.is_empty() {
60 return Ok(None);
61 }
62
63 if let Some(forwarded) =
64 try_forward_decisions_by_thread(os, agent_id, thread_id, decisions).await
65 {
66 return Ok(Some(forwarded));
67 }
68
69 Err(ApiError::BadRequest(format!(
70 "no active run found for thread '{thread_id}'{suffix}; cannot apply decisions",
71 suffix = frontend_run_id_suffix(frontend_run_id),
72 )))
73}
74
/// Builds the optional `", runId: <id>"` fragment appended to error messages.
///
/// Yields an empty string when no frontend run id is available.
fn frontend_run_id_suffix(frontend_run_id: Option<&str>) -> String {
    match frontend_run_id {
        Some(run_id) => format!(", runId: {run_id}"),
        None => String::new(),
    }
}
80
81pub async fn try_forward_decisions_to_active_run_by_id(
82 os: &Arc<AgentOs>,
83 read_store: &dyn ThreadReader,
84 run_id: &str,
85 decisions: Vec<ToolCallDecision>,
86) -> Result<ForwardedDecision, ApiError> {
87 if decisions.is_empty() {
88 return Err(ApiError::BadRequest(
89 "decisions cannot be empty".to_string(),
90 ));
91 }
92
93 if let Some(forwarded) = os.forward_decisions_by_run_id(run_id, &decisions).await {
94 return Ok(forwarded);
95 }
96
97 Err(match check_run_liveness(read_store, run_id).await? {
98 RunLookup::ExistsButInactive => ApiError::BadRequest("run is not active".to_string()),
99 RunLookup::NotFound => ApiError::RunNotFound(run_id.to_string()),
100 })
101}
102
103pub async fn try_cancel_active_run_by_id(
104 os: &Arc<AgentOs>,
105 run_id: &str,
106) -> Result<bool, ApiError> {
107 Ok(os.cancel_active_run_by_id(run_id).await)
108}
109
110pub fn require_agent_state_store(os: &Arc<AgentOs>) -> Result<Arc<dyn ThreadStore>, ApiError> {
111 os.agent_state_store()
112 .cloned()
113 .ok_or_else(|| ApiError::Internal("agent state store not configured".to_string()))
114}
115
116pub struct PreparedHttpRun {
121 pub starter: RunStarter,
122 pub thread_id: String,
123 pub run_id: String,
124 pub ingress_rx: mpsc::UnboundedReceiver<RuntimeInput>,
125}
126
127pub async fn start_http_run(
128 os: &Arc<AgentOs>,
129 resolved: ResolvedRun,
130 run_request: RunRequest,
131 agent_id: &str,
132) -> Result<PreparedHttpRun, ApiError> {
133 start_http_run_with_persistence(os, resolved, run_request, agent_id, true).await
134}
135
136pub async fn start_http_dialog_run(
137 os: &Arc<AgentOs>,
138 resolved: ResolvedRun,
139 run_request: RunRequest,
140 agent_id: &str,
141) -> Result<PreparedHttpRun, ApiError> {
142 start_http_run_with_persistence(os, resolved, run_request, agent_id, false).await
143}
144
145async fn start_http_run_with_persistence(
146 os: &Arc<AgentOs>,
147 resolved: ResolvedRun,
148 run_request: RunRequest,
149 agent_id: &str,
150 persist_run: bool,
151) -> Result<PreparedHttpRun, ApiError> {
152 let run_request_for_ingress = run_request.clone();
153 let run = os
154 .start_active_run_with_persistence(
155 agent_id,
156 run_request,
157 resolved,
158 persist_run,
159 !persist_run,
160 )
161 .await
162 .map_err(ApiError::from)?;
163 let thread_id = run.thread_id.clone();
164 let run_id = run.run_id.clone();
165
166 let (ingress_tx, ingress_rx) = mpsc::unbounded_channel::<RuntimeInput>();
167 ingress_tx
168 .send(RuntimeInput::Run(run_request_for_ingress))
169 .expect("ingress channel just created");
170
171 let starter: RunStarter = Box::new(move |_request| Box::pin(async move { Ok(run) }));
172
173 Ok(PreparedHttpRun {
174 starter,
175 thread_id,
176 run_id,
177 ingress_rx,
178 })
179}
180
181pub async fn load_run_record(
183 read_store: &dyn ThreadReader,
184 run_id: &str,
185) -> Result<Option<RunRecord>, ApiError> {
186 read_store
187 .load_run(run_id)
188 .await
189 .map_err(|e| ApiError::Internal(e.to_string()))
190}
191
192pub async fn resolve_thread_id_from_run(
194 read_store: &dyn ThreadReader,
195 run_id: &str,
196) -> Result<Option<String>, ApiError> {
197 Ok(read_store
198 .load_run(run_id)
199 .await
200 .map_err(|e| ApiError::Internal(e.to_string()))?
201 .map(|r| r.thread_id))
202}
203
/// Outcome of looking a run up in the read store, as produced by `check_run_liveness`.
///
/// The standard value traits are derived so the outcome can be logged with `{:?}`,
/// compared in assertions, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunLookup {
    /// The store holds a record for the run.
    ExistsButInactive,
    /// The store holds no record for the run.
    NotFound,
}
209
210pub async fn check_run_liveness(
214 read_store: &dyn ThreadReader,
215 run_id: &str,
216) -> Result<RunLookup, ApiError> {
217 if read_store
218 .load_run(run_id)
219 .await
220 .map_err(|e| ApiError::Internal(e.to_string()))?
221 .is_some()
222 {
223 Ok(RunLookup::ExistsButInactive)
224 } else {
225 Ok(RunLookup::NotFound)
226 }
227}
228
229pub async fn start_background_run(
231 mailbox_service: &Arc<MailboxService>,
232 agent_id: &str,
233 run_request: RunRequest,
234 options: EnqueueOptions,
235) -> Result<(String, String, String), ApiError> {
236 mailbox_service.submit(agent_id, run_request, options).await
237}
238
239pub async fn truncate_thread_at_message(
241 os: &Arc<AgentOs>,
242 thread_id: &str,
243 message_id: &str,
244) -> Result<(), ApiError> {
245 let store = require_agent_state_store(os)?;
246 let mut thread = store
247 .load(thread_id)
248 .await
249 .map_err(|err| ApiError::Internal(err.to_string()))?
250 .ok_or_else(|| ApiError::BadRequest("thread not found for regenerate-message".to_string()))?
251 .thread;
252 let position = thread
253 .messages
254 .iter()
255 .position(|m| m.id.as_deref() == Some(message_id))
256 .ok_or_else(|| {
257 ApiError::BadRequest("messageId does not reference a stored message".to_string())
258 })?;
259 thread.messages.truncate(position + 1);
260 store
261 .save(&thread)
262 .await
263 .map_err(|err| ApiError::Internal(err.to_string()))
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fully populated `RunRequest` and checks that each lineage field
    /// reads back with the value it was given.
    #[test]
    fn task_mode_preserves_run_lineage() {
        let preserved = RunRequest {
            agent_id: "agent".to_string(),
            thread_id: Some("thread-1".to_string()),
            run_id: Some("run-1".to_string()),
            parent_run_id: Some("parent-run-1".to_string()),
            parent_thread_id: Some("parent-thread-1".to_string()),
            resource_id: Some("resource-1".to_string()),
            origin: Default::default(),
            state: None,
            messages: vec![],
            initial_decisions: vec![],
            source_mailbox_entry_id: None,
        };

        // Own identity.
        assert_eq!(preserved.run_id.as_deref(), Some("run-1"));
        // Parent linkage.
        assert_eq!(preserved.parent_run_id.as_deref(), Some("parent-run-1"));
        assert_eq!(
            preserved.parent_thread_id.as_deref(),
            Some("parent-thread-1")
        );
        // Thread and resource scope.
        assert_eq!(preserved.thread_id.as_deref(), Some("thread-1"));
        assert_eq!(preserved.resource_id.as_deref(), Some("resource-1"));
    }
}