tirea_agentos_server/protocol/a2a/
http.rs

1use axum::extract::{Path, State};
2use axum::http::header::{CACHE_CONTROL, ETAG, IF_NONE_MATCH};
3use axum::http::StatusCode;
4use axum::http::{HeaderMap, HeaderValue};
5use axum::response::{IntoResponse, Response};
6use axum::routing::{get, post};
7use axum::{Json, Router};
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value};
10use tirea_agentos::contracts::thread::{Message, Role, Visibility};
11use tirea_agentos::contracts::{RunOrigin, RunRequest, ToolCallDecision};
12
13use crate::service::{
14    check_run_liveness, load_background_task, normalize_optional_id, resolve_thread_id_from_run,
15    start_background_run, try_cancel_active_or_queued_run_by_id,
16    try_forward_decisions_to_active_run_by_id, ApiError, AppState, BackgroundTaskLookup,
17    EnqueueOptions, RunLookup,
18};
19
20const WELL_KNOWN_AGENT_CARD_PATH: &str = "/.well-known/agent-card.json";
21const WELL_KNOWN_CACHE_CONTROL: &str = "public, max-age=30, must-revalidate";
22const AGENTS_PATH: &str = "/agents";
23const AGENT_CARD_PATH: &str = "/agents/:agent_id/agent-card";
24const MESSAGE_SEND_PATH: &str = "/agents/:agent_id/:action";
25const TASK_PATH: &str = "/agents/:agent_id/tasks/:task_action";
26
27/// Build top-level well-known A2A discovery route.
28pub fn well_known_routes() -> Router<AppState> {
29    Router::new().route(WELL_KNOWN_AGENT_CARD_PATH, get(well_known_agent_card))
30}
31
32/// Build A2A-compatible HTTP routes.
33pub fn routes() -> Router<AppState> {
34    Router::new()
35        .route(AGENTS_PATH, get(list_agents))
36        .route(AGENT_CARD_PATH, get(get_agent_card))
37        .route(MESSAGE_SEND_PATH, post(message_send))
38        .route(TASK_PATH, get(get_task).post(cancel_task))
39}
40
41#[derive(Debug, Serialize)]
42#[serde(rename_all = "camelCase")]
43struct A2aGatewayCard {
44    name: String,
45    description: String,
46    version: String,
47    url: String,
48    default_input_modes: Vec<String>,
49    default_output_modes: Vec<String>,
50    capabilities: Value,
51}
52
53fn sorted_agent_ids(agent_ids: &[String]) -> Vec<String> {
54    let mut ids = agent_ids.to_vec();
55    ids.sort_unstable();
56    ids.dedup();
57    ids
58}
59
60fn build_gateway_card(agent_ids: &[String]) -> A2aGatewayCard {
61    let normalized_agent_ids = sorted_agent_ids(agent_ids);
62    let (name, description, url) = match normalized_agent_ids.as_slice() {
63        [single_agent_id] => (
64            format!("tirea-agent-{single_agent_id}"),
65            format!("A2A discovery card for Tirea agent '{single_agent_id}'"),
66            format!("/v1/a2a/agents/{single_agent_id}/message:send"),
67        ),
68        _ => (
69            "tirea-a2a-gateway".to_string(),
70            format!(
71                "A2A discovery card for Tirea multi-agent gateway ({} agents)",
72                normalized_agent_ids.len()
73            ),
74            "/v1/a2a/agents".to_string(),
75        ),
76    };
77
78    A2aGatewayCard {
79        name,
80        description,
81        version: "1.0".to_string(),
82        url,
83        default_input_modes: vec!["application/json".to_string()],
84        default_output_modes: vec!["application/json".to_string()],
85        capabilities: json!({
86            "taskManagement": true,
87            "streaming": true,
88            "agentDiscovery": true,
89            "agentCount": normalized_agent_ids.len(),
90            "agents": normalized_agent_ids,
91        }),
92    }
93}
94
95fn fnv1a64(data: &[u8]) -> u64 {
96    let mut hash = 0xcbf29ce484222325u64;
97    for byte in data {
98        hash ^= u64::from(*byte);
99        hash = hash.wrapping_mul(0x100000001b3);
100    }
101    hash
102}
103
104fn build_well_known_etag(agent_ids: &[String]) -> String {
105    let canonical = format!("v1|{}", sorted_agent_ids(agent_ids).join("\u{001f}"));
106    format!("W/\"a2a-agents-{:016x}\"", fnv1a64(canonical.as_bytes()))
107}
108
109fn if_none_match_matches(headers: &HeaderMap, etag: &str) -> bool {
110    let Some(raw) = headers.get(IF_NONE_MATCH) else {
111        return false;
112    };
113    let Ok(raw) = raw.to_str() else {
114        return false;
115    };
116    raw.split(',')
117        .map(str::trim)
118        .any(|candidate| candidate == "*" || candidate == etag)
119}
120
121async fn well_known_agent_card(State(st): State<AppState>, headers: HeaderMap) -> Response {
122    let agent_ids = st.os.agent_ids();
123    let etag = build_well_known_etag(&agent_ids);
124
125    if if_none_match_matches(&headers, &etag) {
126        let mut response = StatusCode::NOT_MODIFIED.into_response();
127        response.headers_mut().insert(
128            CACHE_CONTROL,
129            HeaderValue::from_static(WELL_KNOWN_CACHE_CONTROL),
130        );
131        if let Ok(value) = HeaderValue::from_str(&etag) {
132            response.headers_mut().insert(ETAG, value);
133        }
134        return response;
135    }
136
137    let mut response = Json(build_gateway_card(&agent_ids)).into_response();
138    response.headers_mut().insert(
139        CACHE_CONTROL,
140        HeaderValue::from_static(WELL_KNOWN_CACHE_CONTROL),
141    );
142    if let Ok(value) = HeaderValue::from_str(&etag) {
143        response.headers_mut().insert(ETAG, value);
144    }
145    response
146}
147
148async fn list_agents(State(st): State<AppState>) -> Json<Vec<A2aAgentEntry>> {
149    let mut agent_ids = st.os.agent_ids();
150    agent_ids.sort_unstable();
151    agent_ids.dedup();
152    let entries = agent_ids
153        .into_iter()
154        .map(|agent_id| A2aAgentEntry {
155            agent_card_url: format!("/v1/a2a/agents/{agent_id}/agent-card"),
156            message_send_url: format!("/v1/a2a/agents/{agent_id}/message:send"),
157            agent_id,
158        })
159        .collect::<Vec<_>>();
160    Json(entries)
161}
162
163#[derive(Debug, Serialize)]
164#[serde(rename_all = "camelCase")]
165struct A2aAgentEntry {
166    agent_id: String,
167    agent_card_url: String,
168    message_send_url: String,
169}
170
171#[derive(Debug, Serialize)]
172#[serde(rename_all = "camelCase")]
173struct A2aAgentCard {
174    name: String,
175    description: String,
176    version: String,
177    url: String,
178    default_input_modes: Vec<String>,
179    default_output_modes: Vec<String>,
180    capabilities: Value,
181}
182
183fn build_agent_card(agent_id: &str) -> A2aAgentCard {
184    A2aAgentCard {
185        name: format!("tirea-agent-{agent_id}"),
186        description: format!("A2A card for Tirea agent '{agent_id}'"),
187        version: "1.0".to_string(),
188        url: format!("/v1/a2a/agents/{agent_id}/message:send"),
189        default_input_modes: vec!["application/json".to_string()],
190        default_output_modes: vec!["application/json".to_string()],
191        capabilities: json!({
192            "taskManagement": true,
193            "streaming": true
194        }),
195    }
196}
197
198async fn get_agent_card(
199    State(st): State<AppState>,
200    Path(agent_id): Path<String>,
201) -> Result<Json<A2aAgentCard>, ApiError> {
202    st.os
203        .validate_agent(&agent_id)
204        .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
205    Ok(Json(build_agent_card(&agent_id)))
206}
207
208#[derive(Debug, Deserialize)]
209struct A2aMessage {
210    #[serde(default)]
211    role: Option<String>,
212    content: String,
213}
214
215#[derive(Debug, Deserialize)]
216struct A2aMessageSendPayload {
217    #[serde(rename = "contextId", alias = "context_id", default)]
218    context_id: Option<String>,
219    #[serde(rename = "taskId", alias = "task_id", default)]
220    task_id: Option<String>,
221    #[serde(default)]
222    message: Option<A2aMessage>,
223    #[serde(default)]
224    input: Option<String>,
225    #[serde(default)]
226    decisions: Vec<ToolCallDecision>,
227}
228
229fn latest_public_assistant_output(
230    thread: &tirea_agentos::contracts::thread::Thread,
231) -> Option<String> {
232    thread
233        .messages
234        .iter()
235        .rev()
236        .find(|message| message.visibility == Visibility::All && message.role == Role::Assistant)
237        .map(|message| message.content.trim().to_string())
238        .filter(|content| !content.is_empty())
239}
240
241fn public_history(thread: &tirea_agentos::contracts::thread::Thread) -> Vec<Value> {
242    thread
243        .messages
244        .iter()
245        .filter(|message| message.visibility == Visibility::All)
246        .filter_map(|message| match message.role {
247            Role::User | Role::Assistant | Role::System => Some(json!({
248                "role": match message.role {
249                    Role::User => "user",
250                    Role::Assistant => "assistant",
251                    Role::System => "system",
252                    Role::Tool => unreachable!("tool messages are filtered out"),
253                },
254                "content": message.content,
255            })),
256            Role::Tool => None,
257        })
258        .collect()
259}
260
261impl A2aMessageSendPayload {
262    fn to_messages(&self) -> Vec<Message> {
263        let mut out = Vec::new();
264        if let Some(input) = self.input.as_deref() {
265            let trimmed = input.trim();
266            if !trimmed.is_empty() {
267                out.push(Message::user(trimmed));
268            }
269        }
270        if let Some(message) = self.message.as_ref() {
271            let content = message.content.trim();
272            if !content.is_empty() {
273                let mapped = match message
274                    .role
275                    .as_deref()
276                    .map(str::trim)
277                    .map(str::to_ascii_lowercase)
278                {
279                    Some(role) if role == "assistant" => Message::assistant(content),
280                    Some(role) if role == "system" => Message::system(content),
281                    _ => Message::user(content),
282                };
283                out.push(mapped);
284            }
285        }
286        out
287    }
288}
289
290async fn message_send(
291    State(st): State<AppState>,
292    Path((agent_id, action)): Path<(String, String)>,
293    Json(payload): Json<A2aMessageSendPayload>,
294) -> Result<Response, ApiError> {
295    if action != "message:send" {
296        return Err(ApiError::BadRequest(
297            "unsupported A2A action; expected 'message:send'".to_string(),
298        ));
299    }
300
301    st.os
302        .validate_agent(&agent_id)
303        .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
304
305    let task_id = normalize_optional_id(payload.task_id.clone());
306    let context_id = normalize_optional_id(payload.context_id.clone());
307    let messages = payload.to_messages();
308    let decisions = payload.decisions;
309
310    if task_id.is_none() && context_id.is_none() && messages.is_empty() && decisions.is_empty() {
311        return Err(ApiError::BadRequest(
312            "message send payload must include input/message/decisions/context/task".to_string(),
313        ));
314    }
315
316    if messages.is_empty() && !decisions.is_empty() {
317        let Some(task_id) = task_id.as_deref() else {
318            return Err(ApiError::BadRequest(
319                "task_id is required when forwarding decisions only".to_string(),
320            ));
321        };
322
323        let forwarded = try_forward_decisions_to_active_run_by_id(
324            &st.os,
325            st.read_store.as_ref(),
326            task_id,
327            decisions,
328        )
329        .await?;
330        return Ok((
331            StatusCode::ACCEPTED,
332            Json(json!({
333                "contextId": forwarded.thread_id,
334                "taskId": task_id,
335                "status": "decision_forwarded",
336            })),
337        )
338            .into_response());
339    }
340
341    let thread_id = if let Some(context_id) = context_id {
342        Some(context_id)
343    } else if let Some(task_id) = task_id.as_deref() {
344        resolve_thread_id_from_run(st.read_store.as_ref(), task_id).await?
345    } else {
346        None
347    };
348
349    if task_id.is_some() && thread_id.is_none() {
350        return Err(ApiError::RunNotFound(task_id.unwrap_or_default()));
351    }
352
353    let run_request = RunRequest {
354        agent_id: agent_id.clone(),
355        thread_id,
356        run_id: None,
357        parent_run_id: task_id,
358        parent_thread_id: None,
359        resource_id: None,
360        origin: RunOrigin::A2a,
361        state: None,
362        messages,
363        initial_decisions: decisions,
364        source_mailbox_entry_id: None,
365    };
366
367    let (context_id, _run_id, task_id) = start_background_run(
368        &st.mailbox_service,
369        &agent_id,
370        run_request,
371        EnqueueOptions::default(),
372    )
373    .await?;
374    Ok((
375        StatusCode::ACCEPTED,
376        Json(json!({
377            "contextId": context_id,
378            "taskId": task_id,
379            "status": "submitted",
380        })),
381    )
382        .into_response())
383}
384
385async fn get_task(
386    State(st): State<AppState>,
387    Path((agent_id, task_action)): Path<(String, String)>,
388) -> Result<Response, ApiError> {
389    st.os
390        .validate_agent(&agent_id)
391        .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
392    if task_action.ends_with(":cancel") {
393        return Err(ApiError::BadRequest(
394            "use POST for task cancellation".to_string(),
395        ));
396    }
397    let task_id = task_action.trim().to_string();
398    if task_id.is_empty() {
399        return Err(ApiError::BadRequest(
400            "task_id is required in task path".to_string(),
401        ));
402    }
403    let Some(task) = load_background_task(
404        st.read_store.as_ref(),
405        st.mailbox_store().as_ref(),
406        &task_id,
407    )
408    .await?
409    else {
410        return Err(ApiError::RunNotFound(task_id));
411    };
412    Ok(match task {
413        BackgroundTaskLookup::Run(record) => {
414            let thread = st
415                .read_store
416                .load(&record.thread_id)
417                .await
418                .map_err(|err| ApiError::Internal(err.to_string()))?;
419            let latest_output = thread
420                .as_ref()
421                .and_then(|head| latest_public_assistant_output(&head.thread));
422            let history = thread
423                .as_ref()
424                .map(|head| public_history(&head.thread))
425                .unwrap_or_default();
426            let artifacts = latest_output
427                .as_ref()
428                .map(|content| vec![json!({ "content": content })])
429                .unwrap_or_default();
430            let message = latest_output
431                .as_ref()
432                .map(|content| json!({ "role": "assistant", "content": content }));
433
434            Json(json!({
435                "taskId": task_id,
436                "contextId": record.thread_id,
437                "status": record.status,
438                "origin": record.origin,
439                "terminationCode": record.termination_code,
440                "terminationDetail": record.termination_detail,
441                "createdAt": record.created_at,
442                "updatedAt": record.updated_at,
443                "message": message,
444                "artifacts": artifacts,
445                "history": history,
446            }))
447        }
448        BackgroundTaskLookup::Mailbox(entry) => Json(json!({
449            "taskId": task_id,
450            "contextId": entry.mailbox_id,
451            "status": entry.status,
452            "createdAt": entry.created_at,
453            "updatedAt": entry.updated_at,
454        })),
455    }
456    .into_response())
457}
458
459async fn cancel_task(
460    State(st): State<AppState>,
461    Path((agent_id, task_action)): Path<(String, String)>,
462) -> Result<Response, ApiError> {
463    st.os
464        .validate_agent(&agent_id)
465        .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
466
467    let Some(task_id) = task_action.strip_suffix(":cancel") else {
468        return Err(ApiError::BadRequest(
469            "task cancel path must end with ':cancel'".to_string(),
470        ));
471    };
472    let task_id = task_id.trim().to_string();
473    if task_id.is_empty() {
474        return Err(ApiError::BadRequest(
475            "task_id is required in cancel path".to_string(),
476        ));
477    }
478
479    if try_cancel_active_or_queued_run_by_id(&st.os, st.mailbox_store(), &task_id)
480        .await?
481        .is_some()
482    {
483        return Ok((
484            StatusCode::ACCEPTED,
485            Json(json!({
486                "taskId": task_id,
487                "status": "cancel_requested",
488            })),
489        )
490            .into_response());
491    }
492
493    Err(
494        match load_background_task(
495            st.read_store.as_ref(),
496            st.mailbox_store().as_ref(),
497            &task_id,
498        )
499        .await?
500        {
501            Some(_) => ApiError::BadRequest("task is not active".to_string()),
502            None => match check_run_liveness(st.read_store.as_ref(), &task_id).await? {
503                RunLookup::ExistsButInactive => {
504                    ApiError::BadRequest("task is not active".to_string())
505                }
506                RunLookup::NotFound => ApiError::RunNotFound(task_id),
507            },
508        },
509    )
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515
516    #[test]
517    fn well_known_etag_is_stable_and_changes_with_agent_set() {
518        let etag_a = build_well_known_etag(&["alpha".to_string()]);
519        let etag_b = build_well_known_etag(&["alpha".to_string()]);
520        let etag_c = build_well_known_etag(&["beta".to_string(), "alpha".to_string()]);
521        let etag_d = build_well_known_etag(&["alpha".to_string(), "beta".to_string()]);
522        assert_eq!(etag_a, etag_b);
523        assert_eq!(etag_c, etag_d);
524        assert_ne!(
525            etag_a, etag_d,
526            "adding an extra agent id should change ETag"
527        );
528    }
529
530    #[test]
531    fn if_none_match_matches_star_and_csv_values() {
532        let etag = "W/\"a2a-agents-deadbeef\"";
533
534        let mut headers = HeaderMap::new();
535        headers.insert(IF_NONE_MATCH, HeaderValue::from_static("*"));
536        assert!(if_none_match_matches(&headers, etag));
537
538        let mut headers = HeaderMap::new();
539        headers.insert(
540            IF_NONE_MATCH,
541            HeaderValue::from_str(&format!("\"other\", {etag}")).expect("valid header"),
542        );
543        assert!(if_none_match_matches(&headers, etag));
544
545        let mut headers = HeaderMap::new();
546        headers.insert(IF_NONE_MATCH, HeaderValue::from_static("\"other\""));
547        assert!(!if_none_match_matches(&headers, etag));
548    }
549}