tirea_agentos_server/protocol/ag_ui/
http.rs

1use axum::extract::{Path, Query, State};
2use axum::response::{IntoResponse, Response};
3use axum::routing::{get, post};
4use axum::{Json, Router};
5use bytes::Bytes;
6use serde_json::json;
7use tirea_agentos::runtime::AgentOsRunError;
8use tirea_protocol_ag_ui::{AgUiHistoryEncoder, AgUiProtocolEncoder, Event, RunAgentInput};
9
10use super::runtime::apply_agui_extensions;
11
12use crate::service::{
13    encode_message_page, forward_dialog_decisions_by_thread, load_message_page,
14    start_http_dialog_run, ApiError, AppState, MessageQueryParams,
15};
16use crate::transport::http_run::{wire_http_sse_relay, HttpSseRelayConfig};
17use crate::transport::http_sse::{sse_body_stream, sse_response};
18
19const RUN_PATH: &str = "/agents/:agent_id/runs";
20const THREAD_MESSAGES_PATH: &str = "/threads/:id/messages";
21
22/// Build AG-UI HTTP routes.
23pub fn routes() -> Router<AppState> {
24    Router::new()
25        .route(RUN_PATH, post(run))
26        .route(THREAD_MESSAGES_PATH, get(thread_messages))
27}
28
29async fn thread_messages(
30    State(st): State<AppState>,
31    Path(id): Path<String>,
32    Query(params): Query<MessageQueryParams>,
33) -> Result<impl IntoResponse, ApiError> {
34    let page = load_message_page(&st.read_store, &id, &params).await?;
35    let encoded = encode_message_page(page, AgUiHistoryEncoder::encode_message);
36    Ok(Json(encoded))
37}
38
39async fn run(
40    State(st): State<AppState>,
41    Path(agent_id): Path<String>,
42    Json(req): Json<RunAgentInput>,
43) -> Result<Response, ApiError> {
44    req.validate()
45        .map_err(|e| ApiError::BadRequest(e.to_string()))?;
46    let frontend_run_id = req.run_id.clone();
47
48    let suspension_decisions = req.suspension_decisions();
49    let maybe_forwarded = forward_dialog_decisions_by_thread(
50        &st.os,
51        &agent_id,
52        &req.thread_id,
53        req.has_user_input(),
54        Some(frontend_run_id.as_str()),
55        &suspension_decisions,
56    )
57    .await?;
58    if let Some(forwarded) = maybe_forwarded {
59        return Ok((
60            axum::http::StatusCode::ACCEPTED,
61            Json(json!({
62                "status": "decision_forwarded",
63                "threadId": forwarded.thread_id,
64                "runId": frontend_run_id,
65            })),
66        )
67            .into_response());
68    }
69
70    let mut resolved = st.os.resolve(&agent_id).map_err(AgentOsRunError::from)?;
71    apply_agui_extensions(&mut resolved, &req);
72    let run_request = req.into_runtime_run_request(agent_id.clone());
73
74    let prepared = start_http_dialog_run(&st.os, resolved, run_request, &agent_id).await?;
75    let enc = AgUiProtocolEncoder::new_with_frontend_run_id(frontend_run_id);
76    let sse_rx = wire_http_sse_relay(
77        prepared.starter,
78        enc,
79        prepared.ingress_rx,
80        HttpSseRelayConfig {
81            thread_id: prepared.thread_id,
82            fanout: None,
83            resumable_downstream: false,
84            protocol_label: "ag-ui",
85            on_relay_done: move |_sse_tx| async move {},
86            error_formatter: |msg| {
87                let json =
88                    serde_json::to_string(&Event::run_error(&msg, Some("RELAY_ERROR".to_string())))
89                        .unwrap_or_default();
90                Bytes::from(format!("data: {json}\n\n"))
91            },
92        },
93    );
94
95    Ok(sse_response(sse_body_stream(sse_rx)))
96}