tirea_agentos_server/service/
run.rs

1use std::sync::Arc;
2use tirea_agentos::contracts::storage::{ThreadReader, ThreadStore};
3use tirea_agentos::contracts::RunRequest;
4use tirea_agentos::contracts::ToolCallDecision;
5use tirea_agentos::runtime::{AgentOs, AgentOsRunError, ForwardedDecision, ResolvedRun};
6use tirea_contract::storage::RunRecord;
7use tirea_contract::RuntimeInput;
8use tokio::sync::mpsc;
9
10use crate::transport::runtime_endpoint::RunStarter;
11
12use super::mailbox_service::MailboxService;
13use super::ApiError;
14use super::EnqueueOptions;
15
16pub async fn current_run_id_for_thread(
17    os: &Arc<AgentOs>,
18    agent_id: &str,
19    thread_id: &str,
20    read_store: &dyn ThreadReader,
21) -> Result<Option<String>, ApiError> {
22    match os.current_run_id_for_thread(agent_id, thread_id).await {
23        Ok(found) => Ok(found),
24        Err(AgentOsRunError::AgentStateStoreNotConfigured) => {
25            let Some(record) = read_store
26                .active_run_for_thread(thread_id)
27                .await
28                .map_err(|e| ApiError::Internal(e.to_string()))?
29            else {
30                return Ok(None);
31            };
32            if !record.agent_id.is_empty() && record.agent_id != agent_id {
33                return Ok(None);
34            }
35            Ok(Some(record.run_id))
36        }
37        Err(other) => Err(ApiError::Internal(other.to_string())),
38    }
39}
40
41async fn try_forward_decisions_by_thread(
42    os: &Arc<AgentOs>,
43    agent_id: &str,
44    thread_id: &str,
45    decisions: &[ToolCallDecision],
46) -> Option<ForwardedDecision> {
47    os.forward_decisions_by_thread(agent_id, thread_id, decisions)
48        .await
49}
50
51pub async fn forward_dialog_decisions_by_thread(
52    os: &Arc<AgentOs>,
53    agent_id: &str,
54    thread_id: &str,
55    has_user_input: bool,
56    frontend_run_id: Option<&str>,
57    decisions: &[ToolCallDecision],
58) -> Result<Option<ForwardedDecision>, ApiError> {
59    if has_user_input || decisions.is_empty() {
60        return Ok(None);
61    }
62
63    if let Some(forwarded) =
64        try_forward_decisions_by_thread(os, agent_id, thread_id, decisions).await
65    {
66        return Ok(Some(forwarded));
67    }
68
69    Err(ApiError::BadRequest(format!(
70        "no active run found for thread '{thread_id}'{suffix}; cannot apply decisions",
71        suffix = frontend_run_id_suffix(frontend_run_id),
72    )))
73}
74
75fn frontend_run_id_suffix(frontend_run_id: Option<&str>) -> String {
76    frontend_run_id
77        .map(|run_id| format!(", runId: {run_id}"))
78        .unwrap_or_default()
79}
80
81pub async fn try_forward_decisions_to_active_run_by_id(
82    os: &Arc<AgentOs>,
83    read_store: &dyn ThreadReader,
84    run_id: &str,
85    decisions: Vec<ToolCallDecision>,
86) -> Result<ForwardedDecision, ApiError> {
87    if decisions.is_empty() {
88        return Err(ApiError::BadRequest(
89            "decisions cannot be empty".to_string(),
90        ));
91    }
92
93    if let Some(forwarded) = os.forward_decisions_by_run_id(run_id, &decisions).await {
94        return Ok(forwarded);
95    }
96
97    Err(match check_run_liveness(read_store, run_id).await? {
98        RunLookup::ExistsButInactive => ApiError::BadRequest("run is not active".to_string()),
99        RunLookup::NotFound => ApiError::RunNotFound(run_id.to_string()),
100    })
101}
102
103pub async fn try_cancel_active_run_by_id(
104    os: &Arc<AgentOs>,
105    run_id: &str,
106) -> Result<bool, ApiError> {
107    Ok(os.cancel_active_run_by_id(run_id).await)
108}
109
110pub fn require_agent_state_store(os: &Arc<AgentOs>) -> Result<Arc<dyn ThreadStore>, ApiError> {
111    os.agent_state_store()
112        .cloned()
113        .ok_or_else(|| ApiError::Internal("agent state store not configured".to_string()))
114}
115
116/// Shared HTTP run bootstrap result.
117///
118/// The run is already started via AgentOS lifecycle API; this payload only
119/// adapts it to transport wiring (`RunStarter` + ingress channel).
120pub struct PreparedHttpRun {
121    pub starter: RunStarter,
122    pub thread_id: String,
123    pub run_id: String,
124    pub ingress_rx: mpsc::UnboundedReceiver<RuntimeInput>,
125}
126
127pub async fn start_http_run(
128    os: &Arc<AgentOs>,
129    resolved: ResolvedRun,
130    run_request: RunRequest,
131    agent_id: &str,
132) -> Result<PreparedHttpRun, ApiError> {
133    start_http_run_with_persistence(os, resolved, run_request, agent_id, true).await
134}
135
136pub async fn start_http_dialog_run(
137    os: &Arc<AgentOs>,
138    resolved: ResolvedRun,
139    run_request: RunRequest,
140    agent_id: &str,
141) -> Result<PreparedHttpRun, ApiError> {
142    start_http_run_with_persistence(os, resolved, run_request, agent_id, false).await
143}
144
145async fn start_http_run_with_persistence(
146    os: &Arc<AgentOs>,
147    resolved: ResolvedRun,
148    run_request: RunRequest,
149    agent_id: &str,
150    persist_run: bool,
151) -> Result<PreparedHttpRun, ApiError> {
152    let run_request_for_ingress = run_request.clone();
153    let run = os
154        .start_active_run_with_persistence(
155            agent_id,
156            run_request,
157            resolved,
158            persist_run,
159            !persist_run,
160        )
161        .await
162        .map_err(ApiError::from)?;
163    let thread_id = run.thread_id.clone();
164    let run_id = run.run_id.clone();
165
166    let (ingress_tx, ingress_rx) = mpsc::unbounded_channel::<RuntimeInput>();
167    ingress_tx
168        .send(RuntimeInput::Run(run_request_for_ingress))
169        .expect("ingress channel just created");
170
171    let starter: RunStarter = Box::new(move |_request| Box::pin(async move { Ok(run) }));
172
173    Ok(PreparedHttpRun {
174        starter,
175        thread_id,
176        run_id,
177        ingress_rx,
178    })
179}
180
181/// Load the full [`RunRecord`] for a given run id.
182pub async fn load_run_record(
183    read_store: &dyn ThreadReader,
184    run_id: &str,
185) -> Result<Option<RunRecord>, ApiError> {
186    read_store
187        .load_run(run_id)
188        .await
189        .map_err(|e| ApiError::Internal(e.to_string()))
190}
191
192/// Resolve the thread id that a run belongs to.
193pub async fn resolve_thread_id_from_run(
194    read_store: &dyn ThreadReader,
195    run_id: &str,
196) -> Result<Option<String>, ApiError> {
197    Ok(read_store
198        .load_run(run_id)
199        .await
200        .map_err(|e| ApiError::Internal(e.to_string()))?
201        .map(|r| r.thread_id))
202}
203
204/// Result of checking whether a run is currently active.
205pub enum RunLookup {
206    ExistsButInactive,
207    NotFound,
208}
209
210/// After an active-run operation (cancel/forward) fails, check if the run
211/// exists in the persistent store. Returns [`RunLookup::ExistsButInactive`]
212/// or [`RunLookup::NotFound`].
213pub async fn check_run_liveness(
214    read_store: &dyn ThreadReader,
215    run_id: &str,
216) -> Result<RunLookup, ApiError> {
217    if read_store
218        .load_run(run_id)
219        .await
220        .map_err(|e| ApiError::Internal(e.to_string()))?
221        .is_some()
222    {
223        Ok(RunLookup::ExistsButInactive)
224    } else {
225        Ok(RunLookup::NotFound)
226    }
227}
228
229/// Returns `(thread_id, run_id, entry_id)`.
230pub async fn start_background_run(
231    mailbox_service: &Arc<MailboxService>,
232    agent_id: &str,
233    run_request: RunRequest,
234    options: EnqueueOptions,
235) -> Result<(String, String, String), ApiError> {
236    mailbox_service.submit(agent_id, run_request, options).await
237}
238
239/// Truncate a stored thread so it includes messages up to and including `message_id`.
240pub async fn truncate_thread_at_message(
241    os: &Arc<AgentOs>,
242    thread_id: &str,
243    message_id: &str,
244) -> Result<(), ApiError> {
245    let store = require_agent_state_store(os)?;
246    let mut thread = store
247        .load(thread_id)
248        .await
249        .map_err(|err| ApiError::Internal(err.to_string()))?
250        .ok_or_else(|| ApiError::BadRequest("thread not found for regenerate-message".to_string()))?
251        .thread;
252    let position = thread
253        .messages
254        .iter()
255        .position(|m| m.id.as_deref() == Some(message_id))
256        .ok_or_else(|| {
257            ApiError::BadRequest("messageId does not reference a stored message".to_string())
258        })?;
259    thread.messages.truncate(position + 1);
260    store
261        .save(&thread)
262        .await
263        .map_err(|err| ApiError::Internal(err.to_string()))
264}
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269
270    #[test]
271    fn task_mode_preserves_run_lineage() {
272        let request = RunRequest {
273            agent_id: "agent".to_string(),
274            thread_id: Some("thread-1".to_string()),
275            run_id: Some("run-1".to_string()),
276            parent_run_id: Some("parent-run-1".to_string()),
277            parent_thread_id: Some("parent-thread-1".to_string()),
278            resource_id: Some("resource-1".to_string()),
279            origin: Default::default(),
280            state: None,
281            messages: vec![],
282            initial_decisions: vec![],
283            source_mailbox_entry_id: None,
284        };
285
286        let preserved = request;
287
288        assert_eq!(preserved.run_id.as_deref(), Some("run-1"));
289        assert_eq!(preserved.parent_run_id.as_deref(), Some("parent-run-1"));
290        assert_eq!(
291            preserved.parent_thread_id.as_deref(),
292            Some("parent-thread-1")
293        );
294        assert_eq!(preserved.thread_id.as_deref(), Some("thread-1"));
295        assert_eq!(preserved.resource_id.as_deref(), Some("resource-1"));
296    }
297}