1use axum::extract::{Path, Query, State};
2use axum::http::StatusCode;
3use axum::response::{IntoResponse, Response};
4use axum::routing::{get, patch, post};
5use axum::{Json, Router};
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use serde_json::Value;
10use tirea_agentos::contracts::storage::{
11 MailboxEntryOrigin, MailboxEntryStatus, MailboxQuery, ThreadListPage, ThreadListQuery,
12};
13use tirea_agentos::contracts::thread::{Message, Visibility};
14use tirea_agentos::contracts::{RunRequest, ToolCallDecision};
15use tirea_agentos::runtime::AgentOsRunError;
16use tirea_contract::storage::{RunOrigin, RunPage, RunQuery, RunRecord, RunStatus};
17use tirea_contract::{AgentEvent, Identity};
18
19use crate::service::{
20 check_run_liveness, load_run_record, normalize_optional_id, parse_message_query,
21 require_agent_state_store, start_background_run, start_http_run,
22 try_cancel_active_or_queued_run_by_id, try_forward_decisions_to_active_run_by_id, ApiError,
23 EnqueueOptions, MessageQueryParams, RunLookup,
24};
25use crate::transport::http_run::{wire_http_sse_relay, HttpSseRelayConfig};
26use crate::transport::http_sse::{sse_body_stream, sse_response};
27
28pub use crate::service::AppState;
29
// Route path templates used by the router builders in this file
// (`health_routes`, `thread_routes`, `run_routes`). Handlers on paths with an
// `:id` segment read that segment through a `Path<String>` extractor.

/// Health endpoint.
const HEALTH_PATH: &str = "/health";
/// Thread collection (paginated listing).
const THREADS_PATH: &str = "/v1/threads";
/// Thread summaries listing.
const THREAD_SUMMARIES_PATH: &str = "/v1/threads/summaries";
/// A single thread (fetch / delete).
const THREAD_PATH: &str = "/v1/threads/:id";
/// Interrupt request for a thread.
const THREAD_INTERRUPT_PATH: &str = "/v1/threads/:id/interrupt";
/// Metadata patch for a thread.
const THREAD_METADATA_PATH: &str = "/v1/threads/:id/metadata";
/// Messages of a thread.
const THREAD_MESSAGES_PATH: &str = "/v1/threads/:id/messages";
/// Mailbox entries of a thread.
const THREAD_MAILBOX_PATH: &str = "/v1/threads/:id/mailbox";
/// Run collection (listing / starting a run).
const RUNS_PATH: &str = "/v1/runs";
/// A single run record.
const RUN_PATH: &str = "/v1/runs/:id";
/// Additional inputs (messages or decisions) for a run.
const RUN_INPUTS_PATH: &str = "/v1/runs/:id/inputs";
/// Cancellation request for a run.
const RUN_CANCEL_PATH: &str = "/v1/runs/:id/cancel";
42
43pub fn health_routes() -> Router<AppState> {
45 Router::new().route(HEALTH_PATH, get(health))
46}
47
48pub fn thread_routes() -> Router<AppState> {
50 Router::new()
51 .route(THREADS_PATH, get(list_threads))
52 .route(THREAD_SUMMARIES_PATH, get(get_thread_summaries))
54 .route(THREAD_PATH, get(get_thread).delete(delete_thread))
55 .route(THREAD_INTERRUPT_PATH, post(interrupt_thread))
56 .route(THREAD_METADATA_PATH, patch(patch_thread_metadata))
57 .route(THREAD_MESSAGES_PATH, get(get_thread_messages))
58 .route(THREAD_MAILBOX_PATH, get(list_thread_mailbox))
59}
60
61pub fn run_routes() -> Router<AppState> {
63 Router::new()
64 .route(RUNS_PATH, get(list_runs).post(start_run))
65 .route(RUN_PATH, get(get_run))
66 .route(RUN_INPUTS_PATH, post(push_run_inputs))
67 .route(RUN_CANCEL_PATH, post(cancel_run))
68}
69
/// Health handler: unconditionally answers with `StatusCode::OK`.
///
/// It takes no state and performs no checks of its own.
async fn health() -> impl IntoResponse {
    StatusCode::OK
}
73
/// Serde default for the `limit` query parameter of the thread and run listings.
///
/// Referenced via `#[serde(default = "default_thread_limit")]`, so it must stay
/// a zero-argument function returning `usize`.
fn default_thread_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 50;
    DEFAULT_PAGE_SIZE
}
77
/// Query-string parameters accepted by the thread listing endpoint.
#[derive(Debug, Deserialize)]
struct ThreadListParams {
    /// Number of threads to skip; the handler treats a missing value as 0.
    #[serde(default)]
    offset: Option<usize>,
    /// Requested page size; defaults to `default_thread_limit()` and is clamped
    /// to `1..=200` by the handler.
    #[serde(default = "default_thread_limit")]
    limit: usize,
    /// Optional parent-thread filter, passed through to the store query.
    #[serde(default)]
    parent_thread_id: Option<String>,
}
87
88async fn list_threads(
89 State(st): State<AppState>,
90 Query(params): Query<ThreadListParams>,
91) -> Result<Json<ThreadListPage>, ApiError> {
92 let query = ThreadListQuery {
93 offset: params.offset.unwrap_or(0),
94 limit: params.limit.clamp(1, 200),
95 resource_id: None,
96 parent_thread_id: params.parent_thread_id,
97 };
98 st.read_store
99 .list_paginated(&query)
100 .await
101 .map(Json)
102 .map_err(|e| ApiError::Internal(e.to_string()))
103}
104
105async fn get_thread(
106 State(st): State<AppState>,
107 Path(id): Path<String>,
108) -> Result<Json<Value>, ApiError> {
109 let Some(thread) = st
110 .read_store
111 .load_thread(&id)
112 .await
113 .map_err(|e| ApiError::Internal(e.to_string()))?
114 else {
115 return Err(ApiError::ThreadNotFound(id));
116 };
117 let value = serde_json::to_value(thread).map_err(|e| ApiError::Internal(e.to_string()))?;
118 Ok(Json(sanitize_public_thread_value(value)))
119}
120
121async fn get_thread_messages(
122 State(st): State<AppState>,
123 Path(id): Path<String>,
124 Query(params): Query<MessageQueryParams>,
125) -> Result<Json<Value>, ApiError> {
126 let query = parse_message_query(¶ms);
127 let page = st
128 .read_store
129 .load_messages(&id, &query)
130 .await
131 .map_err(|e| match e {
132 tirea_agentos::contracts::storage::ThreadStoreError::NotFound(_) => {
133 ApiError::ThreadNotFound(id)
134 }
135 other => ApiError::Internal(other.to_string()),
136 })?;
137 let value = serde_json::to_value(page).map_err(|e| ApiError::Internal(e.to_string()))?;
138 Ok(Json(sanitize_public_message_page_value(value)))
139}
140
141fn sanitize_public_thread_value(mut value: Value) -> Value {
142 if let Some(object) = value.as_object_mut() {
143 object.remove("patches");
144 if let Some(state) = object.get_mut("state") {
145 strip_public_run_state(state);
146 }
147 if let Some(messages) = object.get_mut("messages").and_then(Value::as_array_mut) {
148 for message in messages {
149 strip_public_message_run_metadata(message);
150 }
151 }
152 }
153 value
154}
155
156fn sanitize_public_message_page_value(mut value: Value) -> Value {
157 if let Some(messages) = value.get_mut("messages").and_then(Value::as_array_mut) {
158 for message in messages {
159 strip_public_message_run_metadata(message);
160 }
161 }
162 value
163}
164
165fn sanitize_public_mailbox_page_value(mut value: Value, visibility: Option<Visibility>) -> Value {
166 if let Some(items) = value.get_mut("items").and_then(Value::as_array_mut) {
167 for entry in items {
168 strip_public_mailbox_payload_messages(entry, visibility);
169 }
170 }
171 value
172}
173
174fn strip_public_run_state(state: &mut Value) {
175 if let Some(run_state) = state.get_mut("__run").and_then(Value::as_object_mut) {
176 run_state.remove("id");
177 }
178}
179
180fn strip_public_message_run_metadata(message: &mut Value) {
181 if let Some(object) = message.as_object_mut() {
182 let mut remove_metadata = false;
183 if let Some(metadata) = object.get_mut("metadata").and_then(Value::as_object_mut) {
184 metadata.remove("run_id");
185 remove_metadata = metadata.is_empty();
186 }
187 if remove_metadata {
188 object.remove("metadata");
189 }
190 }
191}
192
193fn strip_public_mailbox_payload_messages(entry: &mut Value, visibility: Option<Visibility>) {
194 let Some(payload) = entry.get_mut("payload").and_then(Value::as_object_mut) else {
195 return;
196 };
197 let Some(messages) = payload.get_mut("messages").and_then(Value::as_array_mut) else {
198 return;
199 };
200
201 messages.retain(|message| match visibility {
202 Some(Visibility::All) => mailbox_message_visibility(message) != Visibility::Internal,
203 Some(Visibility::Internal) => mailbox_message_visibility(message) == Visibility::Internal,
204 None => true,
205 });
206
207 for message in messages {
208 strip_public_message_run_metadata(message);
209 }
210}
211
212fn mailbox_message_visibility(message: &Value) -> Visibility {
213 match message.get("visibility").and_then(Value::as_str) {
214 Some("internal") => Visibility::Internal,
215 _ => Visibility::All,
216 }
217}
218
219async fn delete_thread(
224 State(st): State<AppState>,
225 Path(id): Path<String>,
226) -> Result<StatusCode, ApiError> {
227 let store = require_agent_state_store(&st.os)?;
228 store
229 .delete(&id)
230 .await
231 .map_err(|e| ApiError::Internal(e.to_string()))?;
232 Ok(StatusCode::NO_CONTENT)
233}
234
/// JSON body accepted by the thread metadata patch endpoint.
#[derive(Debug, Deserialize)]
struct PatchMetadataPayload {
    /// New thread title; when present the handler stores it under the `title`
    /// key of the thread metadata's `extra` map.
    title: Option<String>,
}
239
240async fn patch_thread_metadata(
241 State(st): State<AppState>,
242 Path(id): Path<String>,
243 Json(payload): Json<PatchMetadataPayload>,
244) -> Result<Json<Value>, ApiError> {
245 let store = require_agent_state_store(&st.os)?;
246 let mut thread = store
247 .load_thread(&id)
248 .await
249 .map_err(|e| ApiError::Internal(e.to_string()))?
250 .ok_or(ApiError::ThreadNotFound(id))?;
251
252 if let Some(title) = payload.title {
253 thread
254 .metadata
255 .extra
256 .insert("title".to_string(), Value::String(title));
257 }
258
259 store
260 .save(&thread)
261 .await
262 .map_err(|e| ApiError::Internal(e.to_string()))?;
263
264 Ok(Json(
265 serde_json::to_value(&thread.metadata).unwrap_or_default(),
266 ))
267}
268
269async fn interrupt_thread(
270 State(st): State<AppState>,
271 Path(id): Path<String>,
272) -> Result<Response, ApiError> {
273 let result = st
274 .mailbox_service
275 .control(&id, crate::service::ControlSignal::Interrupt)
276 .await?;
277
278 let superseded_entry_ids: Vec<String> = result
279 .superseded_entries
280 .iter()
281 .map(|entry| entry.entry_id.clone())
282 .collect();
283 Ok((
284 StatusCode::ACCEPTED,
285 Json(json!({
286 "status": "interrupt_requested",
287 "thread_id": id,
288 "generation": result.generation.unwrap_or(0),
289 "cancelled_run_id": result.cancelled_run_id,
290 "superseded_pending_count": result.superseded_entries.len(),
291 "superseded_pending_entry_ids": superseded_entry_ids,
292 })),
293 )
294 .into_response())
295}
296
/// Serde default for the `limit` query parameter of the mailbox listing.
///
/// Referenced via `#[serde(default = "default_mailbox_limit")]`, so it must
/// stay a zero-argument function returning `usize`.
fn default_mailbox_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 50;
    DEFAULT_PAGE_SIZE
}
300
/// Query-string parameters accepted by the thread mailbox listing endpoint.
#[derive(Debug, Deserialize)]
struct MailboxListParams {
    /// Number of entries to skip; the handler treats a missing value as 0.
    #[serde(default)]
    offset: Option<usize>,
    /// Requested page size; defaults to `default_mailbox_limit()` and is
    /// clamped to `1..=200` by the handler.
    #[serde(default = "default_mailbox_limit")]
    limit: usize,
    /// Raw status filter, interpreted by `parse_mailbox_status`.
    #[serde(default)]
    status: Option<String>,
    /// Raw origin filter, interpreted by `parse_mailbox_origin`.
    #[serde(default)]
    origin: Option<String>,
    /// Raw message-visibility filter, interpreted by `parse_mailbox_visibility`.
    #[serde(default)]
    visibility: Option<String>,
}
314
315fn parse_mailbox_status(raw: &str) -> Option<MailboxEntryStatus> {
316 match raw.trim().to_ascii_lowercase().as_str() {
317 "queued" => Some(MailboxEntryStatus::Queued),
318 "claimed" => Some(MailboxEntryStatus::Claimed),
319 "accepted" => Some(MailboxEntryStatus::Accepted),
320 "superseded" => Some(MailboxEntryStatus::Superseded),
321 "cancelled" => Some(MailboxEntryStatus::Cancelled),
322 "dead_letter" | "deadletter" => Some(MailboxEntryStatus::DeadLetter),
323 _ => None,
324 }
325}
326
327fn parse_mailbox_origin(raw: &str) -> Option<Option<MailboxEntryOrigin>> {
328 match raw.trim().to_ascii_lowercase().as_str() {
329 "external" => Some(Some(MailboxEntryOrigin::External)),
330 "internal" => Some(Some(MailboxEntryOrigin::Internal)),
331 "none" => Some(None),
332 _ => None,
333 }
334}
335
336fn parse_mailbox_visibility(raw: Option<&str>) -> Option<Visibility> {
337 match raw.map(str::trim).map(str::to_ascii_lowercase).as_deref() {
338 Some("internal") => Some(Visibility::Internal),
339 Some("none") => None,
340 _ => Some(Visibility::All),
341 }
342}
343
344async fn list_thread_mailbox(
345 State(st): State<AppState>,
346 Path(id): Path<String>,
347 Query(params): Query<MailboxListParams>,
348) -> Result<Json<Value>, ApiError> {
349 let origin = parse_mailbox_origin(params.origin.as_deref().unwrap_or("external"))
350 .unwrap_or(Some(MailboxEntryOrigin::External));
351 let visibility = parse_mailbox_visibility(params.visibility.as_deref());
352 let query = MailboxQuery {
353 mailbox_id: Some(id),
354 origin,
355 status: params.status.as_deref().and_then(parse_mailbox_status),
356 offset: params.offset.unwrap_or(0),
357 limit: params.limit.clamp(1, 200),
358 };
359 let page = st
360 .mailbox_store()
361 .list_mailbox_entries(&query)
362 .await
363 .map_err(|e| ApiError::Internal(e.to_string()))?;
364 let value = serde_json::to_value(page).map_err(|e| ApiError::Internal(e.to_string()))?;
365 Ok(Json(sanitize_public_mailbox_page_value(value, visibility)))
366}
367
/// One element of the thread summaries response.
#[derive(Debug, Serialize)]
struct ThreadSummary {
    /// Thread id, as returned by the paginated listing.
    id: String,
    /// Title read from the `title` key of the thread metadata's `extra` map,
    /// when it is a string.
    title: Option<String>,
    /// Copied from the thread metadata's `updated_at`; also the sort key of the response.
    updated_at: Option<u64>,
    /// Copied from the thread metadata's `created_at`.
    created_at: Option<u64>,
    /// Number of messages stored on the thread.
    message_count: usize,
}
376
377async fn get_thread_summaries(
378 State(st): State<AppState>,
379) -> Result<Json<Vec<ThreadSummary>>, ApiError> {
380 let query = ThreadListQuery {
381 offset: 0,
382 limit: 200,
383 resource_id: None,
384 parent_thread_id: None,
385 };
386 let page = st
387 .read_store
388 .list_paginated(&query)
389 .await
390 .map_err(|e| ApiError::Internal(e.to_string()))?;
391
392 let mut summaries = Vec::with_capacity(page.items.len());
393 for id in &page.items {
394 if let Some(head) = st
395 .read_store
396 .load(id)
397 .await
398 .map_err(|e| ApiError::Internal(e.to_string()))?
399 {
400 if head.thread.parent_thread_id.is_some() {
402 continue;
403 }
404 let title = head
405 .thread
406 .metadata
407 .extra
408 .get("title")
409 .and_then(|v| v.as_str())
410 .map(String::from);
411 summaries.push(ThreadSummary {
412 id: id.clone(),
413 title,
414 updated_at: head.thread.metadata.updated_at,
415 created_at: head.thread.metadata.created_at,
416 message_count: head.thread.messages.len(),
417 });
418 }
419 }
420
421 summaries.sort_by(|a, b| b.updated_at.unwrap_or(0).cmp(&a.updated_at.unwrap_or(0)));
423
424 Ok(Json(summaries))
425}
426
/// Query-string parameters accepted by the run listing endpoint.
#[derive(Debug, Deserialize)]
struct RunListParams {
    /// Number of runs to skip; the handler treats a missing value as 0.
    #[serde(default)]
    offset: Option<usize>,
    /// Requested page size; shares `default_thread_limit()` with the thread
    /// listing and is clamped to `1..=200` by the handler.
    #[serde(default = "default_thread_limit")]
    limit: usize,
    /// Optional thread filter, passed through to the store query.
    #[serde(default)]
    thread_id: Option<String>,
    /// Optional parent-run filter, passed through to the store query.
    #[serde(default)]
    parent_run_id: Option<String>,
    /// Raw status filter, interpreted by `parse_run_status`.
    #[serde(default)]
    status: Option<String>,
    /// Raw termination-code filter; accepted as `terminationCode` or
    /// `termination_code` and normalized by `normalize_termination_code`.
    #[serde(rename = "terminationCode", alias = "termination_code", default)]
    termination_code: Option<String>,
    /// Raw origin filter, interpreted by `parse_run_origin`.
    #[serde(default)]
    origin: Option<String>,
    /// Optional creation-time bound, passed through to the store query.
    #[serde(default)]
    created_at_from: Option<u64>,
    /// Optional creation-time bound, passed through to the store query.
    #[serde(default)]
    created_at_to: Option<u64>,
    /// Optional update-time bound, passed through to the store query.
    #[serde(default)]
    updated_at_from: Option<u64>,
    /// Optional update-time bound, passed through to the store query.
    #[serde(default)]
    updated_at_to: Option<u64>,
}
452
453fn parse_run_status(raw: &str) -> Option<RunStatus> {
454 match raw.trim().to_ascii_lowercase().as_str() {
455 "running" => Some(RunStatus::Running),
456 "waiting" => Some(RunStatus::Waiting),
457 "done" => Some(RunStatus::Done),
458 _ => None,
459 }
460}
461
462fn parse_run_origin(raw: &str) -> Option<RunOrigin> {
463 match raw.trim().to_ascii_lowercase().as_str() {
464 "user" => Some(RunOrigin::User),
465 "subagent" => Some(RunOrigin::Subagent),
466 "ag_ui" | "ag-ui" | "agui" => Some(RunOrigin::AgUi),
467 "ai_sdk" | "ai-sdk" | "aisdk" => Some(RunOrigin::AiSdk),
468 "a2a" => Some(RunOrigin::A2a),
469 "internal" => Some(RunOrigin::Internal),
470 _ => None,
471 }
472}
473
/// Normalizes a termination-code filter.
///
/// Surrounding whitespace is trimmed and the remainder is ASCII-lowercased.
/// A missing value, or one that is empty after trimming, yields `None`.
fn normalize_termination_code(value: Option<String>) -> Option<String> {
    value
        .as_deref()
        .map(str::trim)
        .filter(|code| !code.is_empty())
        .map(str::to_ascii_lowercase)
}
484
/// JSON body accepted when starting a run.
///
/// Field names are camelCase on the wire, with snake_case spellings (and a few
/// alternative names, listed per field) accepted as aliases.
#[derive(Debug, Deserialize)]
struct CreateRunPayload {
    /// Agent to run (`agentId` / `agent_id`). Required; rejected by
    /// `into_run_request` when blank after trimming.
    #[serde(rename = "agentId", alias = "agent_id")]
    agent_id: String,
    /// Target thread (`threadId`, `thread_id`, `context_id`, `contextId`).
    #[serde(
        rename = "threadId",
        alias = "thread_id",
        alias = "context_id",
        alias = "contextId",
        default
    )]
    thread_id: Option<String>,
    /// Caller-chosen run id (`runId`, `run_id`, `task_id`, `taskId`).
    #[serde(
        rename = "runId",
        alias = "run_id",
        alias = "task_id",
        alias = "taskId",
        default
    )]
    run_id: Option<String>,
    /// Parent run id (`parentRunId` / `parent_run_id`).
    #[serde(rename = "parentRunId", alias = "parent_run_id", default)]
    parent_run_id: Option<String>,
    /// Parent thread id (`parentThreadId`, `parent_thread_id`,
    /// `parentContextId`, `parent_context_id`).
    #[serde(
        rename = "parentThreadId",
        alias = "parent_thread_id",
        alias = "parentContextId",
        alias = "parent_context_id",
        default
    )]
    parent_thread_id: Option<String>,
    /// Resource id (`resourceId` / `resource_id`).
    #[serde(rename = "resourceId", alias = "resource_id", default)]
    resource_id: Option<String>,
    /// Optional state value, forwarded unchanged into the run request.
    #[serde(default)]
    state: Option<Value>,
    /// Messages for the run; defaults to an empty list.
    #[serde(default)]
    messages: Vec<Message>,
    /// Tool-call decisions supplied up front (`initialDecisions`,
    /// `initial_decisions`, `decisions`); defaults to an empty list.
    #[serde(
        rename = "initialDecisions",
        alias = "initial_decisions",
        alias = "decisions",
        default
    )]
    initial_decisions: Vec<ToolCallDecision>,
}
529
530impl CreateRunPayload {
531 fn into_run_request(self) -> Result<(String, RunRequest), ApiError> {
532 let agent_id = self.agent_id.trim().to_string();
533 if agent_id.is_empty() {
534 return Err(ApiError::BadRequest("agent_id cannot be empty".to_string()));
535 }
536
537 Ok((
538 agent_id.clone(),
539 RunRequest {
540 agent_id,
541 thread_id: normalize_optional_id(self.thread_id),
542 run_id: normalize_optional_id(self.run_id),
543 parent_run_id: normalize_optional_id(self.parent_run_id),
544 parent_thread_id: normalize_optional_id(self.parent_thread_id),
545 resource_id: normalize_optional_id(self.resource_id),
546 origin: RunOrigin::default(),
547 state: self.state,
548 messages: self.messages,
549 initial_decisions: self.initial_decisions,
550 source_mailbox_entry_id: None,
551 },
552 ))
553 }
554}
555
/// JSON body accepted when pushing additional inputs to an existing run.
#[derive(Debug, Deserialize)]
struct RunInputPayload {
    /// Agent for the continuation run (`agentId` / `agent_id`); the handler
    /// requires it only when `messages` is non-empty.
    #[serde(rename = "agentId", alias = "agent_id", default)]
    agent_id: Option<String>,
    /// New messages; when empty the handler forwards `decisions` to the
    /// active run instead of starting a continuation.
    #[serde(default)]
    messages: Vec<Message>,
    /// Optional state value, forwarded unchanged into the continuation request.
    #[serde(rename = "state", default)]
    state: Option<Value>,
    /// Resource id (`resourceId` / `resource_id`).
    #[serde(rename = "resourceId", alias = "resource_id", default)]
    resource_id: Option<String>,
    /// Caller-chosen id for the continuation run (`runId` / `run_id`).
    #[serde(rename = "runId", alias = "run_id", default)]
    run_id: Option<String>,
    /// Tool-call decisions (`decisions`, `initialDecisions`,
    /// `initial_decisions`); defaults to an empty list.
    #[serde(
        rename = "decisions",
        alias = "initialDecisions",
        alias = "initial_decisions",
        default
    )]
    decisions: Vec<ToolCallDecision>,
}
576
577async fn start_run(
578 State(st): State<AppState>,
579 Json(payload): Json<CreateRunPayload>,
580) -> Result<Response, ApiError> {
581 let (agent_id, run_request) = payload.into_run_request()?;
582
583 let resolved = st.os.resolve(&agent_id).map_err(AgentOsRunError::from)?;
584 let prepared = start_http_run(&st.os, resolved, run_request, &agent_id).await?;
585
586 let encoder = Identity::<AgentEvent>::default();
587 let sse_rx = wire_http_sse_relay(
588 prepared.starter,
589 encoder,
590 prepared.ingress_rx,
591 HttpSseRelayConfig {
592 thread_id: prepared.thread_id,
593 fanout: None,
594 resumable_downstream: false,
595 protocol_label: "run-api",
596 on_relay_done: move |_sse_tx| async move {},
597 error_formatter: |msg| {
598 let error = json!({
599 "type": "error",
600 "message": msg,
601 "code": "RELAY_ERROR",
602 });
603 let payload = serde_json::to_string(&error).unwrap_or_else(|_| {
604 "{\"type\":\"error\",\"message\":\"relay error\",\"code\":\"RELAY_ERROR\"}"
605 .to_string()
606 });
607 Bytes::from(format!("data: {payload}\n\n"))
608 },
609 },
610 );
611
612 Ok(sse_response(sse_body_stream(sse_rx)))
613}
614
615async fn push_run_inputs(
616 State(st): State<AppState>,
617 Path(id): Path<String>,
618 Json(mut payload): Json<RunInputPayload>,
619) -> Result<Response, ApiError> {
620 payload.resource_id = normalize_optional_id(payload.resource_id);
621 payload.run_id = normalize_optional_id(payload.run_id);
622 let decisions = payload.decisions;
623
624 if payload.messages.is_empty() && decisions.is_empty() {
625 return Err(ApiError::BadRequest(
626 "messages and decisions cannot both be empty".to_string(),
627 ));
628 }
629
630 if payload.messages.is_empty() {
631 let forwarded = try_forward_decisions_to_active_run_by_id(
632 &st.os,
633 st.read_store.as_ref(),
634 &id,
635 decisions,
636 )
637 .await?;
638
639 return Ok((
640 StatusCode::ACCEPTED,
641 Json(json!({
642 "status": "decision_forwarded",
643 "run_id": id,
644 "thread_id": forwarded.thread_id,
645 })),
646 )
647 .into_response());
648 }
649
650 let Some(parent_run) = load_run_record(st.read_store.as_ref(), &id).await? else {
651 return Err(ApiError::RunNotFound(id));
652 };
653 let agent_id = payload
654 .agent_id
655 .as_deref()
656 .map(str::trim)
657 .filter(|value| !value.is_empty())
658 .ok_or_else(|| {
659 ApiError::BadRequest("agent_id is required when messages are provided".to_string())
660 })?
661 .to_string();
662 let run_request = RunRequest {
663 agent_id: agent_id.clone(),
664 thread_id: Some(parent_run.thread_id.clone()),
665 run_id: payload.run_id,
666 parent_run_id: Some(id.clone()),
667 parent_thread_id: parent_run.parent_thread_id,
668 resource_id: payload.resource_id,
669 origin: RunOrigin::default(),
670 state: payload.state,
671 messages: payload.messages,
672 initial_decisions: decisions,
673 source_mailbox_entry_id: None,
674 };
675
676 let (thread_id, _run_id, _entry_id) = start_background_run(
677 &st.mailbox_service,
678 &agent_id,
679 run_request,
680 EnqueueOptions::default(),
681 )
682 .await?;
683 Ok((
684 StatusCode::ACCEPTED,
685 Json(json!({
686 "status": "continuation_started",
687 "parent_run_id": id,
688 "thread_id": thread_id,
689 })),
690 )
691 .into_response())
692}
693
694async fn cancel_run(
695 State(st): State<AppState>,
696 Path(id): Path<String>,
697) -> Result<Response, ApiError> {
698 if try_cancel_active_or_queued_run_by_id(&st.os, st.mailbox_store(), &id)
699 .await?
700 .is_some()
701 {
702 return Ok((
703 StatusCode::ACCEPTED,
704 Json(json!({
705 "status": "cancel_requested",
706 "run_id": id,
707 })),
708 )
709 .into_response());
710 }
711
712 Err(
713 match check_run_liveness(st.read_store.as_ref(), &id).await? {
714 RunLookup::ExistsButInactive => ApiError::BadRequest("run is not active".to_string()),
715 RunLookup::NotFound => ApiError::RunNotFound(id),
716 },
717 )
718}
719
720async fn get_run(
721 State(st): State<AppState>,
722 Path(id): Path<String>,
723) -> Result<Json<RunRecord>, ApiError> {
724 let Some(record) = st
725 .read_store
726 .load_run(&id)
727 .await
728 .map_err(|e| ApiError::Internal(e.to_string()))?
729 else {
730 return Err(ApiError::RunNotFound(id));
731 };
732 Ok(Json(record))
733}
734
735async fn list_runs(
736 State(st): State<AppState>,
737 Query(params): Query<RunListParams>,
738) -> Result<Json<RunPage>, ApiError> {
739 let query = RunQuery {
740 offset: params.offset.unwrap_or(0),
741 limit: params.limit.clamp(1, 200),
742 thread_id: params.thread_id,
743 parent_run_id: params.parent_run_id,
744 status: params.status.as_deref().and_then(parse_run_status),
745 termination_code: normalize_termination_code(params.termination_code),
746 origin: params.origin.as_deref().and_then(parse_run_origin),
747 created_at_from: params.created_at_from,
748 created_at_to: params.created_at_to,
749 updated_at_from: params.updated_at_from,
750 updated_at_to: params.updated_at_to,
751 };
752 let page = st
753 .read_store
754 .list_runs(&query)
755 .await
756 .map_err(|e| ApiError::Internal(e.to_string()))?;
757 Ok(Json(page))
758}
759
#[cfg(test)]
mod tests {
    use super::*;

    /// Run list filters: status parsing, termination-code normalization, origin parsing.
    #[test]
    fn parse_run_filters() {
        let status_cases = [
            ("running", Some(RunStatus::Running)),
            ("waiting", Some(RunStatus::Waiting)),
            ("done", Some(RunStatus::Done)),
            ("unknown", None),
        ];
        for (raw, expected) in status_cases {
            assert_eq!(parse_run_status(raw), expected);
        }

        let normalized = normalize_termination_code(Some(" Cancelled ".to_string()));
        assert_eq!(normalized, Some("cancelled".to_string()));

        let origin_cases = [
            ("a2a", Some(RunOrigin::A2a)),
            ("ag-ui", Some(RunOrigin::AgUi)),
            ("x", None),
        ];
        for (raw, expected) in origin_cases {
            assert_eq!(parse_run_origin(raw), expected);
        }
    }

    /// Mailbox filters: origin parsing and the visibility fallback to the public view.
    #[test]
    fn parse_mailbox_filters_default_to_external_public_view() {
        let origin_cases = [
            ("external", Some(Some(MailboxEntryOrigin::External))),
            ("internal", Some(Some(MailboxEntryOrigin::Internal))),
            ("none", Some(None)),
            ("x", None),
        ];
        for (raw, expected) in origin_cases {
            assert_eq!(parse_mailbox_origin(raw), expected);
        }

        let visibility_cases = [
            (None, Some(Visibility::All)),
            (Some("internal"), Some(Visibility::Internal)),
            (Some("none"), None),
        ];
        for (raw, expected) in visibility_cases {
            assert_eq!(parse_mailbox_visibility(raw), expected);
        }
    }
}