tirea_agentos_server/
http.rs

1use axum::extract::{Path, Query, State};
2use axum::http::StatusCode;
3use axum::response::{IntoResponse, Response};
4use axum::routing::{get, patch, post};
5use axum::{Json, Router};
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use serde_json::Value;
10use tirea_agentos::contracts::storage::{
11    MailboxEntryOrigin, MailboxEntryStatus, MailboxQuery, ThreadListPage, ThreadListQuery,
12};
13use tirea_agentos::contracts::thread::{Message, Visibility};
14use tirea_agentos::contracts::{RunRequest, ToolCallDecision};
15use tirea_agentos::runtime::AgentOsRunError;
16use tirea_contract::storage::{RunOrigin, RunPage, RunQuery, RunRecord, RunStatus};
17use tirea_contract::{AgentEvent, Identity};
18
19use crate::service::{
20    check_run_liveness, load_run_record, normalize_optional_id, parse_message_query,
21    require_agent_state_store, start_background_run, start_http_run,
22    try_cancel_active_or_queued_run_by_id, try_forward_decisions_to_active_run_by_id, ApiError,
23    EnqueueOptions, MessageQueryParams, RunLookup,
24};
25use crate::transport::http_run::{wire_http_sse_relay, HttpSseRelayConfig};
26use crate::transport::http_sse::{sse_body_stream, sse_response};
27
28pub use crate::service::AppState;
29
30const HEALTH_PATH: &str = "/health";
31const THREADS_PATH: &str = "/v1/threads";
32const THREAD_SUMMARIES_PATH: &str = "/v1/threads/summaries";
33const THREAD_PATH: &str = "/v1/threads/:id";
34const THREAD_INTERRUPT_PATH: &str = "/v1/threads/:id/interrupt";
35const THREAD_METADATA_PATH: &str = "/v1/threads/:id/metadata";
36const THREAD_MESSAGES_PATH: &str = "/v1/threads/:id/messages";
37const THREAD_MAILBOX_PATH: &str = "/v1/threads/:id/mailbox";
38const RUNS_PATH: &str = "/v1/runs";
39const RUN_PATH: &str = "/v1/runs/:id";
40const RUN_INPUTS_PATH: &str = "/v1/runs/:id/inputs";
41const RUN_CANCEL_PATH: &str = "/v1/runs/:id/cancel";
42
43/// Build health routes.
44pub fn health_routes() -> Router<AppState> {
45    Router::new().route(HEALTH_PATH, get(health))
46}
47
48/// Build canonical thread query routes.
49pub fn thread_routes() -> Router<AppState> {
50    Router::new()
51        .route(THREADS_PATH, get(list_threads))
52        // Register /summaries before /:id to avoid `:id` capturing "summaries"
53        .route(THREAD_SUMMARIES_PATH, get(get_thread_summaries))
54        .route(THREAD_PATH, get(get_thread).delete(delete_thread))
55        .route(THREAD_INTERRUPT_PATH, post(interrupt_thread))
56        .route(THREAD_METADATA_PATH, patch(patch_thread_metadata))
57        .route(THREAD_MESSAGES_PATH, get(get_thread_messages))
58        .route(THREAD_MAILBOX_PATH, get(list_thread_mailbox))
59}
60
61/// Build run projection query routes (opt-in, not included in the default public router).
62pub fn run_routes() -> Router<AppState> {
63    Router::new()
64        .route(RUNS_PATH, get(list_runs).post(start_run))
65        .route(RUN_PATH, get(get_run))
66        .route(RUN_INPUTS_PATH, post(push_run_inputs))
67        .route(RUN_CANCEL_PATH, post(cancel_run))
68}
69
70async fn health() -> impl IntoResponse {
71    StatusCode::OK
72}
73
74fn default_thread_limit() -> usize {
75    50
76}
77
78#[derive(Debug, Deserialize)]
79struct ThreadListParams {
80    #[serde(default)]
81    offset: Option<usize>,
82    #[serde(default = "default_thread_limit")]
83    limit: usize,
84    #[serde(default)]
85    parent_thread_id: Option<String>,
86}
87
88async fn list_threads(
89    State(st): State<AppState>,
90    Query(params): Query<ThreadListParams>,
91) -> Result<Json<ThreadListPage>, ApiError> {
92    let query = ThreadListQuery {
93        offset: params.offset.unwrap_or(0),
94        limit: params.limit.clamp(1, 200),
95        resource_id: None,
96        parent_thread_id: params.parent_thread_id,
97    };
98    st.read_store
99        .list_paginated(&query)
100        .await
101        .map(Json)
102        .map_err(|e| ApiError::Internal(e.to_string()))
103}
104
105async fn get_thread(
106    State(st): State<AppState>,
107    Path(id): Path<String>,
108) -> Result<Json<Value>, ApiError> {
109    let Some(thread) = st
110        .read_store
111        .load_thread(&id)
112        .await
113        .map_err(|e| ApiError::Internal(e.to_string()))?
114    else {
115        return Err(ApiError::ThreadNotFound(id));
116    };
117    let value = serde_json::to_value(thread).map_err(|e| ApiError::Internal(e.to_string()))?;
118    Ok(Json(sanitize_public_thread_value(value)))
119}
120
121async fn get_thread_messages(
122    State(st): State<AppState>,
123    Path(id): Path<String>,
124    Query(params): Query<MessageQueryParams>,
125) -> Result<Json<Value>, ApiError> {
126    let query = parse_message_query(&params);
127    let page = st
128        .read_store
129        .load_messages(&id, &query)
130        .await
131        .map_err(|e| match e {
132            tirea_agentos::contracts::storage::ThreadStoreError::NotFound(_) => {
133                ApiError::ThreadNotFound(id)
134            }
135            other => ApiError::Internal(other.to_string()),
136        })?;
137    let value = serde_json::to_value(page).map_err(|e| ApiError::Internal(e.to_string()))?;
138    Ok(Json(sanitize_public_message_page_value(value)))
139}
140
141fn sanitize_public_thread_value(mut value: Value) -> Value {
142    if let Some(object) = value.as_object_mut() {
143        object.remove("patches");
144        if let Some(state) = object.get_mut("state") {
145            strip_public_run_state(state);
146        }
147        if let Some(messages) = object.get_mut("messages").and_then(Value::as_array_mut) {
148            for message in messages {
149                strip_public_message_run_metadata(message);
150            }
151        }
152    }
153    value
154}
155
156fn sanitize_public_message_page_value(mut value: Value) -> Value {
157    if let Some(messages) = value.get_mut("messages").and_then(Value::as_array_mut) {
158        for message in messages {
159            strip_public_message_run_metadata(message);
160        }
161    }
162    value
163}
164
165fn sanitize_public_mailbox_page_value(mut value: Value, visibility: Option<Visibility>) -> Value {
166    if let Some(items) = value.get_mut("items").and_then(Value::as_array_mut) {
167        for entry in items {
168            strip_public_mailbox_payload_messages(entry, visibility);
169        }
170    }
171    value
172}
173
174fn strip_public_run_state(state: &mut Value) {
175    if let Some(run_state) = state.get_mut("__run").and_then(Value::as_object_mut) {
176        run_state.remove("id");
177    }
178}
179
180fn strip_public_message_run_metadata(message: &mut Value) {
181    if let Some(object) = message.as_object_mut() {
182        let mut remove_metadata = false;
183        if let Some(metadata) = object.get_mut("metadata").and_then(Value::as_object_mut) {
184            metadata.remove("run_id");
185            remove_metadata = metadata.is_empty();
186        }
187        if remove_metadata {
188            object.remove("metadata");
189        }
190    }
191}
192
193fn strip_public_mailbox_payload_messages(entry: &mut Value, visibility: Option<Visibility>) {
194    let Some(payload) = entry.get_mut("payload").and_then(Value::as_object_mut) else {
195        return;
196    };
197    let Some(messages) = payload.get_mut("messages").and_then(Value::as_array_mut) else {
198        return;
199    };
200
201    messages.retain(|message| match visibility {
202        Some(Visibility::All) => mailbox_message_visibility(message) != Visibility::Internal,
203        Some(Visibility::Internal) => mailbox_message_visibility(message) == Visibility::Internal,
204        None => true,
205    });
206
207    for message in messages {
208        strip_public_message_run_metadata(message);
209    }
210}
211
212fn mailbox_message_visibility(message: &Value) -> Visibility {
213    match message.get("visibility").and_then(Value::as_str) {
214        Some("internal") => Visibility::Internal,
215        _ => Visibility::All,
216    }
217}
218
219// ---------------------------------------------------------------------------
220// Thread CRUD + summaries
221// ---------------------------------------------------------------------------
222
223async fn delete_thread(
224    State(st): State<AppState>,
225    Path(id): Path<String>,
226) -> Result<StatusCode, ApiError> {
227    let store = require_agent_state_store(&st.os)?;
228    store
229        .delete(&id)
230        .await
231        .map_err(|e| ApiError::Internal(e.to_string()))?;
232    Ok(StatusCode::NO_CONTENT)
233}
234
235#[derive(Debug, Deserialize)]
236struct PatchMetadataPayload {
237    title: Option<String>,
238}
239
240async fn patch_thread_metadata(
241    State(st): State<AppState>,
242    Path(id): Path<String>,
243    Json(payload): Json<PatchMetadataPayload>,
244) -> Result<Json<Value>, ApiError> {
245    let store = require_agent_state_store(&st.os)?;
246    let mut thread = store
247        .load_thread(&id)
248        .await
249        .map_err(|e| ApiError::Internal(e.to_string()))?
250        .ok_or(ApiError::ThreadNotFound(id))?;
251
252    if let Some(title) = payload.title {
253        thread
254            .metadata
255            .extra
256            .insert("title".to_string(), Value::String(title));
257    }
258
259    store
260        .save(&thread)
261        .await
262        .map_err(|e| ApiError::Internal(e.to_string()))?;
263
264    Ok(Json(
265        serde_json::to_value(&thread.metadata).unwrap_or_default(),
266    ))
267}
268
269async fn interrupt_thread(
270    State(st): State<AppState>,
271    Path(id): Path<String>,
272) -> Result<Response, ApiError> {
273    let result = st
274        .mailbox_service
275        .control(&id, crate::service::ControlSignal::Interrupt)
276        .await?;
277
278    let superseded_entry_ids: Vec<String> = result
279        .superseded_entries
280        .iter()
281        .map(|entry| entry.entry_id.clone())
282        .collect();
283    Ok((
284        StatusCode::ACCEPTED,
285        Json(json!({
286            "status": "interrupt_requested",
287            "thread_id": id,
288            "generation": result.generation.unwrap_or(0),
289            "cancelled_run_id": result.cancelled_run_id,
290            "superseded_pending_count": result.superseded_entries.len(),
291            "superseded_pending_entry_ids": superseded_entry_ids,
292        })),
293    )
294        .into_response())
295}
296
297fn default_mailbox_limit() -> usize {
298    50
299}
300
301#[derive(Debug, Deserialize)]
302struct MailboxListParams {
303    #[serde(default)]
304    offset: Option<usize>,
305    #[serde(default = "default_mailbox_limit")]
306    limit: usize,
307    #[serde(default)]
308    status: Option<String>,
309    #[serde(default)]
310    origin: Option<String>,
311    #[serde(default)]
312    visibility: Option<String>,
313}
314
315fn parse_mailbox_status(raw: &str) -> Option<MailboxEntryStatus> {
316    match raw.trim().to_ascii_lowercase().as_str() {
317        "queued" => Some(MailboxEntryStatus::Queued),
318        "claimed" => Some(MailboxEntryStatus::Claimed),
319        "accepted" => Some(MailboxEntryStatus::Accepted),
320        "superseded" => Some(MailboxEntryStatus::Superseded),
321        "cancelled" => Some(MailboxEntryStatus::Cancelled),
322        "dead_letter" | "deadletter" => Some(MailboxEntryStatus::DeadLetter),
323        _ => None,
324    }
325}
326
327fn parse_mailbox_origin(raw: &str) -> Option<Option<MailboxEntryOrigin>> {
328    match raw.trim().to_ascii_lowercase().as_str() {
329        "external" => Some(Some(MailboxEntryOrigin::External)),
330        "internal" => Some(Some(MailboxEntryOrigin::Internal)),
331        "none" => Some(None),
332        _ => None,
333    }
334}
335
336fn parse_mailbox_visibility(raw: Option<&str>) -> Option<Visibility> {
337    match raw.map(str::trim).map(str::to_ascii_lowercase).as_deref() {
338        Some("internal") => Some(Visibility::Internal),
339        Some("none") => None,
340        _ => Some(Visibility::All),
341    }
342}
343
344async fn list_thread_mailbox(
345    State(st): State<AppState>,
346    Path(id): Path<String>,
347    Query(params): Query<MailboxListParams>,
348) -> Result<Json<Value>, ApiError> {
349    let origin = parse_mailbox_origin(params.origin.as_deref().unwrap_or("external"))
350        .unwrap_or(Some(MailboxEntryOrigin::External));
351    let visibility = parse_mailbox_visibility(params.visibility.as_deref());
352    let query = MailboxQuery {
353        mailbox_id: Some(id),
354        origin,
355        status: params.status.as_deref().and_then(parse_mailbox_status),
356        offset: params.offset.unwrap_or(0),
357        limit: params.limit.clamp(1, 200),
358    };
359    let page = st
360        .mailbox_store()
361        .list_mailbox_entries(&query)
362        .await
363        .map_err(|e| ApiError::Internal(e.to_string()))?;
364    let value = serde_json::to_value(page).map_err(|e| ApiError::Internal(e.to_string()))?;
365    Ok(Json(sanitize_public_mailbox_page_value(value, visibility)))
366}
367
368#[derive(Debug, Serialize)]
369struct ThreadSummary {
370    id: String,
371    title: Option<String>,
372    updated_at: Option<u64>,
373    created_at: Option<u64>,
374    message_count: usize,
375}
376
377async fn get_thread_summaries(
378    State(st): State<AppState>,
379) -> Result<Json<Vec<ThreadSummary>>, ApiError> {
380    let query = ThreadListQuery {
381        offset: 0,
382        limit: 200,
383        resource_id: None,
384        parent_thread_id: None,
385    };
386    let page = st
387        .read_store
388        .list_paginated(&query)
389        .await
390        .map_err(|e| ApiError::Internal(e.to_string()))?;
391
392    let mut summaries = Vec::with_capacity(page.items.len());
393    for id in &page.items {
394        if let Some(head) = st
395            .read_store
396            .load(id)
397            .await
398            .map_err(|e| ApiError::Internal(e.to_string()))?
399        {
400            // Skip sub-agent threads (they have a parent_thread_id)
401            if head.thread.parent_thread_id.is_some() {
402                continue;
403            }
404            let title = head
405                .thread
406                .metadata
407                .extra
408                .get("title")
409                .and_then(|v| v.as_str())
410                .map(String::from);
411            summaries.push(ThreadSummary {
412                id: id.clone(),
413                title,
414                updated_at: head.thread.metadata.updated_at,
415                created_at: head.thread.metadata.created_at,
416                message_count: head.thread.messages.len(),
417            });
418        }
419    }
420
421    // Sort by updated_at descending (None treated as 0 → sorts last)
422    summaries.sort_by(|a, b| b.updated_at.unwrap_or(0).cmp(&a.updated_at.unwrap_or(0)));
423
424    Ok(Json(summaries))
425}
426
427#[derive(Debug, Deserialize)]
428struct RunListParams {
429    #[serde(default)]
430    offset: Option<usize>,
431    #[serde(default = "default_thread_limit")]
432    limit: usize,
433    #[serde(default)]
434    thread_id: Option<String>,
435    #[serde(default)]
436    parent_run_id: Option<String>,
437    #[serde(default)]
438    status: Option<String>,
439    #[serde(rename = "terminationCode", alias = "termination_code", default)]
440    termination_code: Option<String>,
441    #[serde(default)]
442    origin: Option<String>,
443    #[serde(default)]
444    created_at_from: Option<u64>,
445    #[serde(default)]
446    created_at_to: Option<u64>,
447    #[serde(default)]
448    updated_at_from: Option<u64>,
449    #[serde(default)]
450    updated_at_to: Option<u64>,
451}
452
453fn parse_run_status(raw: &str) -> Option<RunStatus> {
454    match raw.trim().to_ascii_lowercase().as_str() {
455        "running" => Some(RunStatus::Running),
456        "waiting" => Some(RunStatus::Waiting),
457        "done" => Some(RunStatus::Done),
458        _ => None,
459    }
460}
461
462fn parse_run_origin(raw: &str) -> Option<RunOrigin> {
463    match raw.trim().to_ascii_lowercase().as_str() {
464        "user" => Some(RunOrigin::User),
465        "subagent" => Some(RunOrigin::Subagent),
466        "ag_ui" | "ag-ui" | "agui" => Some(RunOrigin::AgUi),
467        "ai_sdk" | "ai-sdk" | "aisdk" => Some(RunOrigin::AiSdk),
468        "a2a" => Some(RunOrigin::A2a),
469        "internal" => Some(RunOrigin::Internal),
470        _ => None,
471    }
472}
473
474fn normalize_termination_code(value: Option<String>) -> Option<String> {
475    value.and_then(|raw| {
476        let trimmed = raw.trim();
477        if trimmed.is_empty() {
478            None
479        } else {
480            Some(trimmed.to_ascii_lowercase())
481        }
482    })
483}
484
485#[derive(Debug, Deserialize)]
486struct CreateRunPayload {
487    #[serde(rename = "agentId", alias = "agent_id")]
488    agent_id: String,
489    #[serde(
490        rename = "threadId",
491        alias = "thread_id",
492        alias = "context_id",
493        alias = "contextId",
494        default
495    )]
496    thread_id: Option<String>,
497    #[serde(
498        rename = "runId",
499        alias = "run_id",
500        alias = "task_id",
501        alias = "taskId",
502        default
503    )]
504    run_id: Option<String>,
505    #[serde(rename = "parentRunId", alias = "parent_run_id", default)]
506    parent_run_id: Option<String>,
507    #[serde(
508        rename = "parentThreadId",
509        alias = "parent_thread_id",
510        alias = "parentContextId",
511        alias = "parent_context_id",
512        default
513    )]
514    parent_thread_id: Option<String>,
515    #[serde(rename = "resourceId", alias = "resource_id", default)]
516    resource_id: Option<String>,
517    #[serde(default)]
518    state: Option<Value>,
519    #[serde(default)]
520    messages: Vec<Message>,
521    #[serde(
522        rename = "initialDecisions",
523        alias = "initial_decisions",
524        alias = "decisions",
525        default
526    )]
527    initial_decisions: Vec<ToolCallDecision>,
528}
529
530impl CreateRunPayload {
531    fn into_run_request(self) -> Result<(String, RunRequest), ApiError> {
532        let agent_id = self.agent_id.trim().to_string();
533        if agent_id.is_empty() {
534            return Err(ApiError::BadRequest("agent_id cannot be empty".to_string()));
535        }
536
537        Ok((
538            agent_id.clone(),
539            RunRequest {
540                agent_id,
541                thread_id: normalize_optional_id(self.thread_id),
542                run_id: normalize_optional_id(self.run_id),
543                parent_run_id: normalize_optional_id(self.parent_run_id),
544                parent_thread_id: normalize_optional_id(self.parent_thread_id),
545                resource_id: normalize_optional_id(self.resource_id),
546                origin: RunOrigin::default(),
547                state: self.state,
548                messages: self.messages,
549                initial_decisions: self.initial_decisions,
550                source_mailbox_entry_id: None,
551            },
552        ))
553    }
554}
555
556#[derive(Debug, Deserialize)]
557struct RunInputPayload {
558    #[serde(rename = "agentId", alias = "agent_id", default)]
559    agent_id: Option<String>,
560    #[serde(default)]
561    messages: Vec<Message>,
562    #[serde(rename = "state", default)]
563    state: Option<Value>,
564    #[serde(rename = "resourceId", alias = "resource_id", default)]
565    resource_id: Option<String>,
566    #[serde(rename = "runId", alias = "run_id", default)]
567    run_id: Option<String>,
568    #[serde(
569        rename = "decisions",
570        alias = "initialDecisions",
571        alias = "initial_decisions",
572        default
573    )]
574    decisions: Vec<ToolCallDecision>,
575}
576
577async fn start_run(
578    State(st): State<AppState>,
579    Json(payload): Json<CreateRunPayload>,
580) -> Result<Response, ApiError> {
581    let (agent_id, run_request) = payload.into_run_request()?;
582
583    let resolved = st.os.resolve(&agent_id).map_err(AgentOsRunError::from)?;
584    let prepared = start_http_run(&st.os, resolved, run_request, &agent_id).await?;
585
586    let encoder = Identity::<AgentEvent>::default();
587    let sse_rx = wire_http_sse_relay(
588        prepared.starter,
589        encoder,
590        prepared.ingress_rx,
591        HttpSseRelayConfig {
592            thread_id: prepared.thread_id,
593            fanout: None,
594            resumable_downstream: false,
595            protocol_label: "run-api",
596            on_relay_done: move |_sse_tx| async move {},
597            error_formatter: |msg| {
598                let error = json!({
599                    "type": "error",
600                    "message": msg,
601                    "code": "RELAY_ERROR",
602                });
603                let payload = serde_json::to_string(&error).unwrap_or_else(|_| {
604                    "{\"type\":\"error\",\"message\":\"relay error\",\"code\":\"RELAY_ERROR\"}"
605                        .to_string()
606                });
607                Bytes::from(format!("data: {payload}\n\n"))
608            },
609        },
610    );
611
612    Ok(sse_response(sse_body_stream(sse_rx)))
613}
614
615async fn push_run_inputs(
616    State(st): State<AppState>,
617    Path(id): Path<String>,
618    Json(mut payload): Json<RunInputPayload>,
619) -> Result<Response, ApiError> {
620    payload.resource_id = normalize_optional_id(payload.resource_id);
621    payload.run_id = normalize_optional_id(payload.run_id);
622    let decisions = payload.decisions;
623
624    if payload.messages.is_empty() && decisions.is_empty() {
625        return Err(ApiError::BadRequest(
626            "messages and decisions cannot both be empty".to_string(),
627        ));
628    }
629
630    if payload.messages.is_empty() {
631        let forwarded = try_forward_decisions_to_active_run_by_id(
632            &st.os,
633            st.read_store.as_ref(),
634            &id,
635            decisions,
636        )
637        .await?;
638
639        return Ok((
640            StatusCode::ACCEPTED,
641            Json(json!({
642                "status": "decision_forwarded",
643                "run_id": id,
644                "thread_id": forwarded.thread_id,
645            })),
646        )
647            .into_response());
648    }
649
650    let Some(parent_run) = load_run_record(st.read_store.as_ref(), &id).await? else {
651        return Err(ApiError::RunNotFound(id));
652    };
653    let agent_id = payload
654        .agent_id
655        .as_deref()
656        .map(str::trim)
657        .filter(|value| !value.is_empty())
658        .ok_or_else(|| {
659            ApiError::BadRequest("agent_id is required when messages are provided".to_string())
660        })?
661        .to_string();
662    let run_request = RunRequest {
663        agent_id: agent_id.clone(),
664        thread_id: Some(parent_run.thread_id.clone()),
665        run_id: payload.run_id,
666        parent_run_id: Some(id.clone()),
667        parent_thread_id: parent_run.parent_thread_id,
668        resource_id: payload.resource_id,
669        origin: RunOrigin::default(),
670        state: payload.state,
671        messages: payload.messages,
672        initial_decisions: decisions,
673        source_mailbox_entry_id: None,
674    };
675
676    let (thread_id, _run_id, _entry_id) = start_background_run(
677        &st.mailbox_service,
678        &agent_id,
679        run_request,
680        EnqueueOptions::default(),
681    )
682    .await?;
683    Ok((
684        StatusCode::ACCEPTED,
685        Json(json!({
686            "status": "continuation_started",
687            "parent_run_id": id,
688            "thread_id": thread_id,
689        })),
690    )
691        .into_response())
692}
693
694async fn cancel_run(
695    State(st): State<AppState>,
696    Path(id): Path<String>,
697) -> Result<Response, ApiError> {
698    if try_cancel_active_or_queued_run_by_id(&st.os, st.mailbox_store(), &id)
699        .await?
700        .is_some()
701    {
702        return Ok((
703            StatusCode::ACCEPTED,
704            Json(json!({
705                "status": "cancel_requested",
706                "run_id": id,
707            })),
708        )
709            .into_response());
710    }
711
712    Err(
713        match check_run_liveness(st.read_store.as_ref(), &id).await? {
714            RunLookup::ExistsButInactive => ApiError::BadRequest("run is not active".to_string()),
715            RunLookup::NotFound => ApiError::RunNotFound(id),
716        },
717    )
718}
719
720async fn get_run(
721    State(st): State<AppState>,
722    Path(id): Path<String>,
723) -> Result<Json<RunRecord>, ApiError> {
724    let Some(record) = st
725        .read_store
726        .load_run(&id)
727        .await
728        .map_err(|e| ApiError::Internal(e.to_string()))?
729    else {
730        return Err(ApiError::RunNotFound(id));
731    };
732    Ok(Json(record))
733}
734
735async fn list_runs(
736    State(st): State<AppState>,
737    Query(params): Query<RunListParams>,
738) -> Result<Json<RunPage>, ApiError> {
739    let query = RunQuery {
740        offset: params.offset.unwrap_or(0),
741        limit: params.limit.clamp(1, 200),
742        thread_id: params.thread_id,
743        parent_run_id: params.parent_run_id,
744        status: params.status.as_deref().and_then(parse_run_status),
745        termination_code: normalize_termination_code(params.termination_code),
746        origin: params.origin.as_deref().and_then(parse_run_origin),
747        created_at_from: params.created_at_from,
748        created_at_to: params.created_at_to,
749        updated_at_from: params.updated_at_from,
750        updated_at_to: params.updated_at_to,
751    };
752    let page = st
753        .read_store
754        .list_runs(&query)
755        .await
756        .map_err(|e| ApiError::Internal(e.to_string()))?;
757    Ok(Json(page))
758}
759
760#[cfg(test)]
761mod tests {
762    use super::*;
763
764    #[test]
765    fn parse_run_filters() {
766        assert_eq!(parse_run_status("running"), Some(RunStatus::Running));
767        assert_eq!(parse_run_status("waiting"), Some(RunStatus::Waiting));
768        assert_eq!(parse_run_status("done"), Some(RunStatus::Done));
769        assert_eq!(parse_run_status("unknown"), None);
770        assert_eq!(
771            normalize_termination_code(Some(" Cancelled ".to_string())),
772            Some("cancelled".to_string())
773        );
774
775        assert_eq!(parse_run_origin("a2a"), Some(RunOrigin::A2a));
776        assert_eq!(parse_run_origin("ag-ui"), Some(RunOrigin::AgUi));
777        assert_eq!(parse_run_origin("x"), None);
778    }
779
780    #[test]
781    fn parse_mailbox_filters_default_to_external_public_view() {
782        assert_eq!(
783            parse_mailbox_origin("external"),
784            Some(Some(MailboxEntryOrigin::External))
785        );
786        assert_eq!(
787            parse_mailbox_origin("internal"),
788            Some(Some(MailboxEntryOrigin::Internal))
789        );
790        assert_eq!(parse_mailbox_origin("none"), Some(None));
791        assert_eq!(parse_mailbox_origin("x"), None);
792
793        assert_eq!(parse_mailbox_visibility(None), Some(Visibility::All));
794        assert_eq!(
795            parse_mailbox_visibility(Some("internal")),
796            Some(Visibility::Internal)
797        );
798        assert_eq!(parse_mailbox_visibility(Some("none")), None);
799    }
800}