tirea_agentos_server/protocol/ag_ui/
http.rs1use axum::extract::{Path, Query, State};
2use axum::response::{IntoResponse, Response};
3use axum::routing::{get, post};
4use axum::{Json, Router};
5use bytes::Bytes;
6use serde_json::json;
7use tirea_agentos::runtime::AgentOsRunError;
8use tirea_protocol_ag_ui::{AgUiHistoryEncoder, AgUiProtocolEncoder, Event, RunAgentInput};
9
10use super::runtime::apply_agui_extensions;
11
12use crate::service::{
13 encode_message_page, forward_dialog_decisions_by_thread, load_message_page,
14 start_http_dialog_run, ApiError, AppState, MessageQueryParams,
15};
16use crate::transport::http_run::{wire_http_sse_relay, HttpSseRelayConfig};
17use crate::transport::http_sse::{sse_body_stream, sse_response};
18
19const RUN_PATH: &str = "/agents/:agent_id/runs";
20const THREAD_MESSAGES_PATH: &str = "/threads/:id/messages";
21
22pub fn routes() -> Router<AppState> {
24 Router::new()
25 .route(RUN_PATH, post(run))
26 .route(THREAD_MESSAGES_PATH, get(thread_messages))
27}
28
29async fn thread_messages(
30 State(st): State<AppState>,
31 Path(id): Path<String>,
32 Query(params): Query<MessageQueryParams>,
33) -> Result<impl IntoResponse, ApiError> {
34 let page = load_message_page(&st.read_store, &id, ¶ms).await?;
35 let encoded = encode_message_page(page, AgUiHistoryEncoder::encode_message);
36 Ok(Json(encoded))
37}
38
39async fn run(
40 State(st): State<AppState>,
41 Path(agent_id): Path<String>,
42 Json(req): Json<RunAgentInput>,
43) -> Result<Response, ApiError> {
44 req.validate()
45 .map_err(|e| ApiError::BadRequest(e.to_string()))?;
46 let frontend_run_id = req.run_id.clone();
47
48 let suspension_decisions = req.suspension_decisions();
49 let maybe_forwarded = forward_dialog_decisions_by_thread(
50 &st.os,
51 &agent_id,
52 &req.thread_id,
53 req.has_user_input(),
54 Some(frontend_run_id.as_str()),
55 &suspension_decisions,
56 )
57 .await?;
58 if let Some(forwarded) = maybe_forwarded {
59 return Ok((
60 axum::http::StatusCode::ACCEPTED,
61 Json(json!({
62 "status": "decision_forwarded",
63 "threadId": forwarded.thread_id,
64 "runId": frontend_run_id,
65 })),
66 )
67 .into_response());
68 }
69
70 let mut resolved = st.os.resolve(&agent_id).map_err(AgentOsRunError::from)?;
71 apply_agui_extensions(&mut resolved, &req);
72 let run_request = req.into_runtime_run_request(agent_id.clone());
73
74 let prepared = start_http_dialog_run(&st.os, resolved, run_request, &agent_id).await?;
75 let enc = AgUiProtocolEncoder::new_with_frontend_run_id(frontend_run_id);
76 let sse_rx = wire_http_sse_relay(
77 prepared.starter,
78 enc,
79 prepared.ingress_rx,
80 HttpSseRelayConfig {
81 thread_id: prepared.thread_id,
82 fanout: None,
83 resumable_downstream: false,
84 protocol_label: "ag-ui",
85 on_relay_done: move |_sse_tx| async move {},
86 error_formatter: |msg| {
87 let json =
88 serde_json::to_string(&Event::run_error(&msg, Some("RELAY_ERROR".to_string())))
89 .unwrap_or_default();
90 Bytes::from(format!("data: {json}\n\n"))
91 },
92 },
93 );
94
95 Ok(sse_response(sse_body_stream(sse_rx)))
96}