1use axum::extract::{Path, State};
2use axum::http::header::{CACHE_CONTROL, ETAG, IF_NONE_MATCH};
3use axum::http::StatusCode;
4use axum::http::{HeaderMap, HeaderValue};
5use axum::response::{IntoResponse, Response};
6use axum::routing::{get, post};
7use axum::{Json, Router};
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value};
10use tirea_agentos::contracts::thread::{Message, Role, Visibility};
11use tirea_agentos::contracts::{RunOrigin, RunRequest, ToolCallDecision};
12
13use crate::service::{
14 check_run_liveness, load_background_task, normalize_optional_id, resolve_thread_id_from_run,
15 start_background_run, try_cancel_active_or_queued_run_by_id,
16 try_forward_decisions_to_active_run_by_id, ApiError, AppState, BackgroundTaskLookup,
17 EnqueueOptions, RunLookup,
18};
19
/// Path of the well-known A2A discovery document (registered by `well_known_routes`).
const WELL_KNOWN_AGENT_CARD_PATH: &str = "/.well-known/agent-card.json";
/// `Cache-Control` value attached to every well-known agent card response.
const WELL_KNOWN_CACHE_CONTROL: &str = "public, max-age=30, must-revalidate";
/// Route that lists the available agents.
const AGENTS_PATH: &str = "/agents";
/// Route that serves the card of a single agent.
const AGENT_CARD_PATH: &str = "/agents/:agent_id/agent-card";
/// Route for agent actions; the handler only accepts the `message:send` action.
const MESSAGE_SEND_PATH: &str = "/agents/:agent_id/:action";
/// Route for task lookup (GET) and task cancellation (POST, `:cancel` suffix).
const TASK_PATH: &str = "/agents/:agent_id/tasks/:task_action";
26
27pub fn well_known_routes() -> Router<AppState> {
29 Router::new().route(WELL_KNOWN_AGENT_CARD_PATH, get(well_known_agent_card))
30}
31
32pub fn routes() -> Router<AppState> {
34 Router::new()
35 .route(AGENTS_PATH, get(list_agents))
36 .route(AGENT_CARD_PATH, get(get_agent_card))
37 .route(MESSAGE_SEND_PATH, post(message_send))
38 .route(TASK_PATH, get(get_task).post(cancel_task))
39}
40
/// Discovery card served by the well-known endpoint.
///
/// Serialized with camelCase field names; fields are emitted in declaration order.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct A2aGatewayCard {
    /// Card name.
    name: String,
    /// Human-readable description of what the card represents.
    description: String,
    /// Card version string.
    version: String,
    /// Relative URL the card points clients at.
    url: String,
    /// Media types accepted as input by default.
    default_input_modes: Vec<String>,
    /// Media types produced as output by default.
    default_output_modes: Vec<String>,
    /// Free-form capability object (built with `json!` by the card builder).
    capabilities: Value,
}
52
/// Returns an owned copy of `agent_ids` in ascending order with duplicates removed.
///
/// The input slice is left untouched.
fn sorted_agent_ids(agent_ids: &[String]) -> Vec<String> {
    // An ordered set yields each distinct id once, already sorted.
    let distinct: std::collections::BTreeSet<&String> = agent_ids.iter().collect();
    distinct.into_iter().cloned().collect()
}
59
60fn build_gateway_card(agent_ids: &[String]) -> A2aGatewayCard {
61 let normalized_agent_ids = sorted_agent_ids(agent_ids);
62 let (name, description, url) = match normalized_agent_ids.as_slice() {
63 [single_agent_id] => (
64 format!("tirea-agent-{single_agent_id}"),
65 format!("A2A discovery card for Tirea agent '{single_agent_id}'"),
66 format!("/v1/a2a/agents/{single_agent_id}/message:send"),
67 ),
68 _ => (
69 "tirea-a2a-gateway".to_string(),
70 format!(
71 "A2A discovery card for Tirea multi-agent gateway ({} agents)",
72 normalized_agent_ids.len()
73 ),
74 "/v1/a2a/agents".to_string(),
75 ),
76 };
77
78 A2aGatewayCard {
79 name,
80 description,
81 version: "1.0".to_string(),
82 url,
83 default_input_modes: vec!["application/json".to_string()],
84 default_output_modes: vec!["application/json".to_string()],
85 capabilities: json!({
86 "taskManagement": true,
87 "streaming": true,
88 "agentDiscovery": true,
89 "agentCount": normalized_agent_ids.len(),
90 "agents": normalized_agent_ids,
91 }),
92 }
93}
94
/// Computes the 64-bit FNV-1a hash of `data`.
///
/// Each byte is XOR-ed into the state, which is then multiplied by the FNV prime with
/// wrapping arithmetic. An empty input returns the offset basis.
fn fnv1a64(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    data.iter().fold(OFFSET_BASIS, |state, &octet| {
        (state ^ u64::from(octet)).wrapping_mul(PRIME)
    })
}
103
104fn build_well_known_etag(agent_ids: &[String]) -> String {
105 let canonical = format!("v1|{}", sorted_agent_ids(agent_ids).join("\u{001f}"));
106 format!("W/\"a2a-agents-{:016x}\"", fnv1a64(canonical.as_bytes()))
107}
108
109fn if_none_match_matches(headers: &HeaderMap, etag: &str) -> bool {
110 let Some(raw) = headers.get(IF_NONE_MATCH) else {
111 return false;
112 };
113 let Ok(raw) = raw.to_str() else {
114 return false;
115 };
116 raw.split(',')
117 .map(str::trim)
118 .any(|candidate| candidate == "*" || candidate == etag)
119}
120
121async fn well_known_agent_card(State(st): State<AppState>, headers: HeaderMap) -> Response {
122 let agent_ids = st.os.agent_ids();
123 let etag = build_well_known_etag(&agent_ids);
124
125 if if_none_match_matches(&headers, &etag) {
126 let mut response = StatusCode::NOT_MODIFIED.into_response();
127 response.headers_mut().insert(
128 CACHE_CONTROL,
129 HeaderValue::from_static(WELL_KNOWN_CACHE_CONTROL),
130 );
131 if let Ok(value) = HeaderValue::from_str(&etag) {
132 response.headers_mut().insert(ETAG, value);
133 }
134 return response;
135 }
136
137 let mut response = Json(build_gateway_card(&agent_ids)).into_response();
138 response.headers_mut().insert(
139 CACHE_CONTROL,
140 HeaderValue::from_static(WELL_KNOWN_CACHE_CONTROL),
141 );
142 if let Ok(value) = HeaderValue::from_str(&etag) {
143 response.headers_mut().insert(ETAG, value);
144 }
145 response
146}
147
148async fn list_agents(State(st): State<AppState>) -> Json<Vec<A2aAgentEntry>> {
149 let mut agent_ids = st.os.agent_ids();
150 agent_ids.sort_unstable();
151 agent_ids.dedup();
152 let entries = agent_ids
153 .into_iter()
154 .map(|agent_id| A2aAgentEntry {
155 agent_card_url: format!("/v1/a2a/agents/{agent_id}/agent-card"),
156 message_send_url: format!("/v1/a2a/agents/{agent_id}/message:send"),
157 agent_id,
158 })
159 .collect::<Vec<_>>();
160 Json(entries)
161}
162
/// One element of the agent listing response.
///
/// Serialized with camelCase field names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct A2aAgentEntry {
    /// Identifier of the agent.
    agent_id: String,
    /// Relative URL of the agent's card.
    agent_card_url: String,
    /// Relative URL of the agent's `message:send` endpoint.
    message_send_url: String,
}
170
/// Card describing a single agent.
///
/// Serialized with camelCase field names; fields are emitted in declaration order.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct A2aAgentCard {
    /// Card name.
    name: String,
    /// Human-readable description of the agent card.
    description: String,
    /// Card version string.
    version: String,
    /// Relative URL the card points clients at.
    url: String,
    /// Media types accepted as input by default.
    default_input_modes: Vec<String>,
    /// Media types produced as output by default.
    default_output_modes: Vec<String>,
    /// Free-form capability object.
    capabilities: Value,
}
182
183fn build_agent_card(agent_id: &str) -> A2aAgentCard {
184 A2aAgentCard {
185 name: format!("tirea-agent-{agent_id}"),
186 description: format!("A2A card for Tirea agent '{agent_id}'"),
187 version: "1.0".to_string(),
188 url: format!("/v1/a2a/agents/{agent_id}/message:send"),
189 default_input_modes: vec!["application/json".to_string()],
190 default_output_modes: vec!["application/json".to_string()],
191 capabilities: json!({
192 "taskManagement": true,
193 "streaming": true
194 }),
195 }
196}
197
198async fn get_agent_card(
199 State(st): State<AppState>,
200 Path(agent_id): Path<String>,
201) -> Result<Json<A2aAgentCard>, ApiError> {
202 st.os
203 .validate_agent(&agent_id)
204 .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
205 Ok(Json(build_agent_card(&agent_id)))
206}
207
/// A single message supplied in a message-send payload.
#[derive(Debug, Deserialize)]
struct A2aMessage {
    /// Optional role name; may be omitted from the payload.
    #[serde(default)]
    role: Option<String>,
    /// Message text (required).
    content: String,
}
214
/// Request body of the `message:send` endpoint. Every field is optional on the wire.
#[derive(Debug, Deserialize)]
struct A2aMessageSendPayload {
    /// Context identifier; accepted as `contextId` or `context_id`.
    #[serde(rename = "contextId", alias = "context_id", default)]
    context_id: Option<String>,
    /// Task identifier; accepted as `taskId` or `task_id`.
    #[serde(rename = "taskId", alias = "task_id", default)]
    task_id: Option<String>,
    /// Structured message with an optional role.
    #[serde(default)]
    message: Option<A2aMessage>,
    /// Plain-text input.
    #[serde(default)]
    input: Option<String>,
    /// Tool-call decisions; defaults to an empty list when absent.
    #[serde(default)]
    decisions: Vec<ToolCallDecision>,
}
228
229fn latest_public_assistant_output(
230 thread: &tirea_agentos::contracts::thread::Thread,
231) -> Option<String> {
232 thread
233 .messages
234 .iter()
235 .rev()
236 .find(|message| message.visibility == Visibility::All && message.role == Role::Assistant)
237 .map(|message| message.content.trim().to_string())
238 .filter(|content| !content.is_empty())
239}
240
241fn public_history(thread: &tirea_agentos::contracts::thread::Thread) -> Vec<Value> {
242 thread
243 .messages
244 .iter()
245 .filter(|message| message.visibility == Visibility::All)
246 .filter_map(|message| match message.role {
247 Role::User | Role::Assistant | Role::System => Some(json!({
248 "role": match message.role {
249 Role::User => "user",
250 Role::Assistant => "assistant",
251 Role::System => "system",
252 Role::Tool => unreachable!("tool messages are filtered out"),
253 },
254 "content": message.content,
255 })),
256 Role::Tool => None,
257 })
258 .collect()
259}
260
261impl A2aMessageSendPayload {
262 fn to_messages(&self) -> Vec<Message> {
263 let mut out = Vec::new();
264 if let Some(input) = self.input.as_deref() {
265 let trimmed = input.trim();
266 if !trimmed.is_empty() {
267 out.push(Message::user(trimmed));
268 }
269 }
270 if let Some(message) = self.message.as_ref() {
271 let content = message.content.trim();
272 if !content.is_empty() {
273 let mapped = match message
274 .role
275 .as_deref()
276 .map(str::trim)
277 .map(str::to_ascii_lowercase)
278 {
279 Some(role) if role == "assistant" => Message::assistant(content),
280 Some(role) if role == "system" => Message::system(content),
281 _ => Message::user(content),
282 };
283 out.push(mapped);
284 }
285 }
286 out
287 }
288}
289
290async fn message_send(
291 State(st): State<AppState>,
292 Path((agent_id, action)): Path<(String, String)>,
293 Json(payload): Json<A2aMessageSendPayload>,
294) -> Result<Response, ApiError> {
295 if action != "message:send" {
296 return Err(ApiError::BadRequest(
297 "unsupported A2A action; expected 'message:send'".to_string(),
298 ));
299 }
300
301 st.os
302 .validate_agent(&agent_id)
303 .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
304
305 let task_id = normalize_optional_id(payload.task_id.clone());
306 let context_id = normalize_optional_id(payload.context_id.clone());
307 let messages = payload.to_messages();
308 let decisions = payload.decisions;
309
310 if task_id.is_none() && context_id.is_none() && messages.is_empty() && decisions.is_empty() {
311 return Err(ApiError::BadRequest(
312 "message send payload must include input/message/decisions/context/task".to_string(),
313 ));
314 }
315
316 if messages.is_empty() && !decisions.is_empty() {
317 let Some(task_id) = task_id.as_deref() else {
318 return Err(ApiError::BadRequest(
319 "task_id is required when forwarding decisions only".to_string(),
320 ));
321 };
322
323 let forwarded = try_forward_decisions_to_active_run_by_id(
324 &st.os,
325 st.read_store.as_ref(),
326 task_id,
327 decisions,
328 )
329 .await?;
330 return Ok((
331 StatusCode::ACCEPTED,
332 Json(json!({
333 "contextId": forwarded.thread_id,
334 "taskId": task_id,
335 "status": "decision_forwarded",
336 })),
337 )
338 .into_response());
339 }
340
341 let thread_id = if let Some(context_id) = context_id {
342 Some(context_id)
343 } else if let Some(task_id) = task_id.as_deref() {
344 resolve_thread_id_from_run(st.read_store.as_ref(), task_id).await?
345 } else {
346 None
347 };
348
349 if task_id.is_some() && thread_id.is_none() {
350 return Err(ApiError::RunNotFound(task_id.unwrap_or_default()));
351 }
352
353 let run_request = RunRequest {
354 agent_id: agent_id.clone(),
355 thread_id,
356 run_id: None,
357 parent_run_id: task_id,
358 parent_thread_id: None,
359 resource_id: None,
360 origin: RunOrigin::A2a,
361 state: None,
362 messages,
363 initial_decisions: decisions,
364 source_mailbox_entry_id: None,
365 };
366
367 let (context_id, _run_id, task_id) = start_background_run(
368 &st.mailbox_service,
369 &agent_id,
370 run_request,
371 EnqueueOptions::default(),
372 )
373 .await?;
374 Ok((
375 StatusCode::ACCEPTED,
376 Json(json!({
377 "contextId": context_id,
378 "taskId": task_id,
379 "status": "submitted",
380 })),
381 )
382 .into_response())
383}
384
385async fn get_task(
386 State(st): State<AppState>,
387 Path((agent_id, task_action)): Path<(String, String)>,
388) -> Result<Response, ApiError> {
389 st.os
390 .validate_agent(&agent_id)
391 .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
392 if task_action.ends_with(":cancel") {
393 return Err(ApiError::BadRequest(
394 "use POST for task cancellation".to_string(),
395 ));
396 }
397 let task_id = task_action.trim().to_string();
398 if task_id.is_empty() {
399 return Err(ApiError::BadRequest(
400 "task_id is required in task path".to_string(),
401 ));
402 }
403 let Some(task) = load_background_task(
404 st.read_store.as_ref(),
405 st.mailbox_store().as_ref(),
406 &task_id,
407 )
408 .await?
409 else {
410 return Err(ApiError::RunNotFound(task_id));
411 };
412 Ok(match task {
413 BackgroundTaskLookup::Run(record) => {
414 let thread = st
415 .read_store
416 .load(&record.thread_id)
417 .await
418 .map_err(|err| ApiError::Internal(err.to_string()))?;
419 let latest_output = thread
420 .as_ref()
421 .and_then(|head| latest_public_assistant_output(&head.thread));
422 let history = thread
423 .as_ref()
424 .map(|head| public_history(&head.thread))
425 .unwrap_or_default();
426 let artifacts = latest_output
427 .as_ref()
428 .map(|content| vec![json!({ "content": content })])
429 .unwrap_or_default();
430 let message = latest_output
431 .as_ref()
432 .map(|content| json!({ "role": "assistant", "content": content }));
433
434 Json(json!({
435 "taskId": task_id,
436 "contextId": record.thread_id,
437 "status": record.status,
438 "origin": record.origin,
439 "terminationCode": record.termination_code,
440 "terminationDetail": record.termination_detail,
441 "createdAt": record.created_at,
442 "updatedAt": record.updated_at,
443 "message": message,
444 "artifacts": artifacts,
445 "history": history,
446 }))
447 }
448 BackgroundTaskLookup::Mailbox(entry) => Json(json!({
449 "taskId": task_id,
450 "contextId": entry.mailbox_id,
451 "status": entry.status,
452 "createdAt": entry.created_at,
453 "updatedAt": entry.updated_at,
454 })),
455 }
456 .into_response())
457}
458
459async fn cancel_task(
460 State(st): State<AppState>,
461 Path((agent_id, task_action)): Path<(String, String)>,
462) -> Result<Response, ApiError> {
463 st.os
464 .validate_agent(&agent_id)
465 .map_err(|_| ApiError::AgentNotFound(agent_id.clone()))?;
466
467 let Some(task_id) = task_action.strip_suffix(":cancel") else {
468 return Err(ApiError::BadRequest(
469 "task cancel path must end with ':cancel'".to_string(),
470 ));
471 };
472 let task_id = task_id.trim().to_string();
473 if task_id.is_empty() {
474 return Err(ApiError::BadRequest(
475 "task_id is required in cancel path".to_string(),
476 ));
477 }
478
479 if try_cancel_active_or_queued_run_by_id(&st.os, st.mailbox_store(), &task_id)
480 .await?
481 .is_some()
482 {
483 return Ok((
484 StatusCode::ACCEPTED,
485 Json(json!({
486 "taskId": task_id,
487 "status": "cancel_requested",
488 })),
489 )
490 .into_response());
491 }
492
493 Err(
494 match load_background_task(
495 st.read_store.as_ref(),
496 st.mailbox_store().as_ref(),
497 &task_id,
498 )
499 .await?
500 {
501 Some(_) => ApiError::BadRequest("task is not active".to_string()),
502 None => match check_run_liveness(st.read_store.as_ref(), &task_id).await? {
503 RunLookup::ExistsButInactive => {
504 ApiError::BadRequest("task is not active".to_string())
505 }
506 RunLookup::NotFound => ApiError::RunNotFound(task_id),
507 },
508 },
509 )
510}
511
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns string literals into the owned ids the ETag builder expects.
    fn ids(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    /// Builds a header map carrying a single `If-None-Match` value.
    fn with_if_none_match(value: &str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(
            IF_NONE_MATCH,
            HeaderValue::from_str(value).expect("valid header"),
        );
        headers
    }

    #[test]
    fn well_known_etag_is_stable_and_changes_with_agent_set() {
        let single = build_well_known_etag(&ids(&["alpha"]));
        let single_again = build_well_known_etag(&ids(&["alpha"]));
        let pair_reversed = build_well_known_etag(&ids(&["beta", "alpha"]));
        let pair_ordered = build_well_known_etag(&ids(&["alpha", "beta"]));

        assert_eq!(single, single_again);
        assert_eq!(pair_reversed, pair_ordered);
        assert_ne!(
            single, pair_ordered,
            "adding an extra agent id should change ETag"
        );
    }

    #[test]
    fn if_none_match_matches_star_and_csv_values() {
        let etag = "W/\"a2a-agents-deadbeef\"";

        // Wildcard matches any tag.
        assert!(if_none_match_matches(&with_if_none_match("*"), etag));

        // The tag is found inside a comma-separated list.
        let listed = format!("\"other\", {etag}");
        assert!(if_none_match_matches(&with_if_none_match(&listed), etag));

        // An unrelated tag does not match.
        assert!(!if_none_match_matches(
            &with_if_none_match("\"other\""),
            etag
        ));
    }
}