1use async_trait::async_trait;
2#[cfg(feature = "postgres")]
3use sqlx::{Postgres, QueryBuilder};
4use tirea_contract::storage::{
5 paginate_mailbox_entries, Committed, MailboxEntry, MailboxEntryOrigin, MailboxInterrupt,
6 MailboxPage, MailboxQuery, MailboxReader, MailboxState, MailboxStoreError, MailboxWriter,
7 MessagePage, MessageQuery, MessageWithCursor, RunOrigin, RunPage, RunQuery, RunReader,
8 RunRecord, RunStatus, RunStoreError, RunWriter, SortOrder, ThreadHead, ThreadListPage,
9 ThreadListQuery, ThreadReader, ThreadStoreError, ThreadWriter, VersionPrecondition,
10};
11use tirea_contract::{Message, Thread, ThreadChangeSet, Visibility};
12
/// Postgres-backed implementation of the thread/run/mailbox storage traits.
///
/// All state lives in five tables whose names are derived from a single base
/// table name (see [`PostgresStore::with_table`]). Schema creation is lazy,
/// idempotent, and guarded by `schema_ready`.
///
/// The `#[cfg]` gate matches the impl blocks below: the struct references
/// `sqlx::PgPool`, whose `use` is feature-gated, so the type itself must be
/// compiled out when the `postgres` feature is disabled.
#[cfg(feature = "postgres")]
pub struct PostgresStore {
    // Connection pool shared by every query issued by this store.
    pool: sqlx::PgPool,
    // Base table holding thread/session documents as JSONB.
    table: String,
    // Append-only message log keyed by a BIGSERIAL sequence.
    messages_table: String,
    // Run records, one row per run.
    runs_table: String,
    // Mailbox queue entries.
    mailbox_table: String,
    // Per-mailbox generation counters.
    mailbox_threads_table: String,
    // Set to `true` once `schema_statements` has been applied; the mutex also
    // serializes concurrent first-time schema creation.
    schema_ready: tokio::sync::Mutex<bool>,
}
22
#[cfg(feature = "postgres")]
impl PostgresStore {
    /// Creates a store over `pool` using the default `agent_*` table names.
    pub fn new(pool: sqlx::PgPool) -> Self {
        Self {
            pool,
            table: "agent_sessions".to_string(),
            messages_table: "agent_messages".to_string(),
            runs_table: "agent_runs".to_string(),
            mailbox_table: "agent_mailbox".to_string(),
            mailbox_threads_table: "agent_mailbox_threads".to_string(),
            schema_ready: tokio::sync::Mutex::new(false),
        }
    }

    /// Creates a store whose auxiliary tables derive their names from
    /// `table`: `<table>_messages`, `<table>_runs`, `<table>_mailbox`, and
    /// `<table>_mailbox_threads`.
    ///
    /// NOTE(review): table names are interpolated into SQL verbatim by every
    /// query in this file, so `table` must come from trusted configuration,
    /// never from end-user input.
    pub fn with_table(pool: sqlx::PgPool, table: impl Into<String>) -> Self {
        let table = table.into();
        let messages_table = format!("{}_messages", table);
        let runs_table = format!("{}_runs", table);
        let mailbox_table = format!("{}_mailbox", table);
        let mailbox_threads_table = format!("{}_mailbox_threads", table);
        Self {
            pool,
            table,
            messages_table,
            runs_table,
            mailbox_table,
            mailbox_threads_table,
            schema_ready: tokio::sync::Mutex::new(false),
        }
    }

    /// Full DDL for the five tables and their indexes.
    ///
    /// Every statement is idempotent (`IF NOT EXISTS` /
    /// `ADD COLUMN IF NOT EXISTS`), so the list can be re-applied safely; the
    /// trailing `ALTER TABLE` statements migrate older deployments in place.
    fn schema_statements(&self) -> Vec<String> {
        vec![
            // Threads/sessions: one JSONB document per thread.
            format!(
                "CREATE TABLE IF NOT EXISTS {} (id TEXT PRIMARY KEY, data JSONB NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT now())",
                self.table
            ),
            // Append-only message log; cascades away with its session row.
            format!(
                "CREATE TABLE IF NOT EXISTS {} (seq BIGSERIAL PRIMARY KEY, session_id TEXT NOT NULL REFERENCES {}(id) ON DELETE CASCADE, message_id TEXT, run_id TEXT, step_index INTEGER, data JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now())",
                self.messages_table, self.table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_session_seq ON {} (session_id, seq)",
                self.messages_table, self.messages_table
            ),
            // message_id is optional but unique when present (partial index).
            format!(
                "CREATE UNIQUE INDEX IF NOT EXISTS idx_{}_message_id ON {} (message_id) WHERE message_id IS NOT NULL",
                self.messages_table, self.messages_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_session_run ON {} (session_id, run_id) WHERE run_id IS NOT NULL",
                self.messages_table, self.messages_table
            ),
            // Expression indexes over optional JSONB keys of the thread doc.
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_resource_id ON {} ((data->>'resource_id')) WHERE data ? 'resource_id'",
                self.table, self.table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_parent_thread_id ON {} ((data->>'parent_thread_id')) WHERE data ? 'parent_thread_id'",
                self.table, self.table
            ),
            // Run records; timestamps stored as unsigned-epoch values in BIGINT.
            format!(
                "CREATE TABLE IF NOT EXISTS {} (run_id TEXT PRIMARY KEY, thread_id TEXT NOT NULL, agent_id TEXT NOT NULL DEFAULT '', parent_run_id TEXT, parent_thread_id TEXT, origin TEXT NOT NULL, status TEXT NOT NULL, termination_code TEXT, termination_detail TEXT, created_at BIGINT NOT NULL, updated_at BIGINT NOT NULL, source_mailbox_entry_id TEXT, metadata JSONB)",
                self.runs_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_thread_id ON {} (thread_id)",
                self.runs_table, self.runs_table
            ),
            // Fast lookup of the newest non-terminal run per thread.
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_thread_active ON {} (thread_id, created_at DESC) WHERE status != 'done'",
                self.runs_table, self.runs_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_parent_run_id ON {} (parent_run_id) WHERE parent_run_id IS NOT NULL",
                self.runs_table, self.runs_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_status ON {} (status)",
                self.runs_table, self.runs_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_termination_code ON {} (termination_code) WHERE termination_code IS NOT NULL",
                self.runs_table, self.runs_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_origin ON {} (origin)",
                self.runs_table, self.runs_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_created_at ON {} (created_at, run_id)",
                self.runs_table, self.runs_table
            ),
            // Mailbox queue entries; claim bookkeeping lives in the same row.
            format!(
                "CREATE TABLE IF NOT EXISTS {} (entry_id TEXT PRIMARY KEY, mailbox_id TEXT NOT NULL, origin TEXT NOT NULL DEFAULT 'external', sender_id TEXT, payload JSONB NOT NULL, priority SMALLINT NOT NULL DEFAULT 0, dedupe_key TEXT, generation BIGINT NOT NULL DEFAULT 0, status TEXT NOT NULL, available_at BIGINT NOT NULL, attempt_count INTEGER NOT NULL DEFAULT 0, last_error TEXT, claim_token TEXT, claimed_by TEXT, lease_until BIGINT, created_at BIGINT NOT NULL, updated_at BIGINT NOT NULL)",
                self.mailbox_table
            ),
            // One row per mailbox holding its interrupt generation counter.
            format!(
                "CREATE TABLE IF NOT EXISTS {} (mailbox_id TEXT PRIMARY KEY, current_generation BIGINT NOT NULL DEFAULT 0, updated_at BIGINT NOT NULL)",
                self.mailbox_threads_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_status_available ON {} (status, available_at, created_at)",
                self.mailbox_table, self.mailbox_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_mailbox_status ON {} (mailbox_id, status, created_at)",
                self.mailbox_table, self.mailbox_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{}_mailbox_origin_status ON {} (mailbox_id, origin, status, created_at)",
                self.mailbox_table, self.mailbox_table
            ),
            // Dedupe keys are unique per mailbox when present (partial index).
            format!(
                "CREATE UNIQUE INDEX IF NOT EXISTS idx_{}_mailbox_dedupe ON {} (mailbox_id, dedupe_key) WHERE dedupe_key IS NOT NULL",
                self.mailbox_table, self.mailbox_table
            ),
            // Migrations for deployments created before these columns existed.
            format!(
                "ALTER TABLE {} ADD COLUMN IF NOT EXISTS agent_id TEXT NOT NULL DEFAULT ''",
                self.runs_table
            ),
            format!(
                "ALTER TABLE {} ADD COLUMN IF NOT EXISTS origin TEXT NOT NULL DEFAULT 'external'",
                self.mailbox_table
            ),
            // Enforce at most one non-terminal run per thread.
            format!(
                "CREATE UNIQUE INDEX IF NOT EXISTS idx_{}_thread_active_unique ON {} (thread_id) WHERE status != 'done'",
                self.runs_table, self.runs_table
            ),
        ]
    }

    /// Applies `schema_statements` exactly once per store instance.
    ///
    /// The mutex serializes concurrent first callers and doubles as the
    /// "already applied" flag, so subsequent calls only take the lock.
    async fn ensure_schema_ready(&self) -> Result<(), sqlx::Error> {
        let mut schema_ready = self.schema_ready.lock().await;
        if *schema_ready {
            return Ok(());
        }

        for sql in self.schema_statements() {
            sqlx::query(&sql).execute(&self.pool).await?;
        }

        *schema_ready = true;
        Ok(())
    }

    /// `ensure_schema_ready` mapped into the thread-store error type.
    async fn ensure_thread_schema_ready(&self) -> Result<(), ThreadStoreError> {
        self.ensure_schema_ready().await.map_err(Self::sql_err)
    }

    /// `ensure_schema_ready` mapped into the run-store error type.
    async fn ensure_run_schema_ready(&self) -> Result<(), RunStoreError> {
        self.ensure_schema_ready().await.map_err(Self::run_sql_err)
    }

    /// Eagerly creates the schema; lets callers pay the DDL cost at startup
    /// instead of on the first storage call.
    pub async fn ensure_table(&self) -> Result<(), ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;
        Ok(())
    }

    // --- sqlx error adapters, one per storage error type -------------------

    fn sql_err(e: sqlx::Error) -> ThreadStoreError {
        ThreadStoreError::Io(std::io::Error::other(e.to_string()))
    }

    fn run_sql_err(e: sqlx::Error) -> RunStoreError {
        RunStoreError::Backend(e.to_string())
    }

    fn mailbox_sql_err(e: sqlx::Error) -> MailboxStoreError {
        MailboxStoreError::Backend(e.to_string())
    }

    // --- enum <-> TEXT column codecs ---------------------------------------
    // Encoders return the canonical stored spelling; decoders reject unknown
    // values with a Backend error rather than guessing.

    fn encode_origin(origin: RunOrigin) -> &'static str {
        match origin {
            RunOrigin::User => "user",
            RunOrigin::Subagent => "subagent",
            RunOrigin::AgUi => "ag_ui",
            RunOrigin::AiSdk => "ai_sdk",
            RunOrigin::A2a => "a2a",
            RunOrigin::Internal => "internal",
        }
    }

    fn decode_origin(raw: &str) -> Result<RunOrigin, RunStoreError> {
        match raw {
            "user" => Ok(RunOrigin::User),
            "subagent" => Ok(RunOrigin::Subagent),
            "ag_ui" => Ok(RunOrigin::AgUi),
            "ai_sdk" => Ok(RunOrigin::AiSdk),
            "a2a" => Ok(RunOrigin::A2a),
            "internal" => Ok(RunOrigin::Internal),
            _ => Err(RunStoreError::Backend(format!(
                "invalid run origin value: {raw}"
            ))),
        }
    }

    fn encode_mailbox_origin(origin: MailboxEntryOrigin) -> &'static str {
        match origin {
            MailboxEntryOrigin::External => "external",
            MailboxEntryOrigin::Internal => "internal",
        }
    }

    fn decode_mailbox_origin(raw: &str) -> Result<MailboxEntryOrigin, MailboxStoreError> {
        match raw {
            "external" => Ok(MailboxEntryOrigin::External),
            "internal" => Ok(MailboxEntryOrigin::Internal),
            _ => Err(MailboxStoreError::Backend(format!(
                "invalid mailbox origin value: {raw}"
            ))),
        }
    }

    fn encode_status(status: RunStatus) -> &'static str {
        match status {
            RunStatus::Running => "running",
            RunStatus::Waiting => "waiting",
            RunStatus::Done => "done",
        }
    }

    fn decode_status(raw: &str) -> Result<RunStatus, RunStoreError> {
        match raw {
            "running" => Ok(RunStatus::Running),
            "waiting" => Ok(RunStatus::Waiting),
            "done" => Ok(RunStatus::Done),
            // Non-canonical spellings are folded into the three canonical
            // states (presumably written by A2A-style integrations — TODO
            // confirm provenance before removing).
            "submitted" | "working" => Ok(RunStatus::Running),
            "input_required" | "auth_required" => Ok(RunStatus::Waiting),
            "completed" | "failed" | "canceled" | "cancelled" | "rejected" => Ok(RunStatus::Done),
            _ => Err(RunStoreError::Backend(format!(
                "invalid run status value: {raw}"
            ))),
        }
    }

    fn encode_mailbox_status(status: tirea_contract::MailboxEntryStatus) -> &'static str {
        match status {
            tirea_contract::MailboxEntryStatus::Queued => "queued",
            tirea_contract::MailboxEntryStatus::Claimed => "claimed",
            tirea_contract::MailboxEntryStatus::Accepted => "accepted",
            tirea_contract::MailboxEntryStatus::Superseded => "superseded",
            tirea_contract::MailboxEntryStatus::Cancelled => "cancelled",
            tirea_contract::MailboxEntryStatus::DeadLetter => "dead_letter",
        }
    }

    fn decode_mailbox_status(
        raw: &str,
    ) -> Result<tirea_contract::MailboxEntryStatus, MailboxStoreError> {
        match raw {
            "queued" => Ok(tirea_contract::MailboxEntryStatus::Queued),
            "claimed" => Ok(tirea_contract::MailboxEntryStatus::Claimed),
            "accepted" => Ok(tirea_contract::MailboxEntryStatus::Accepted),
            "superseded" => Ok(tirea_contract::MailboxEntryStatus::Superseded),
            // Both spellings accepted on read; writes use "cancelled".
            "cancelled" | "canceled" => Ok(tirea_contract::MailboxEntryStatus::Cancelled),
            "dead_letter" => Ok(tirea_contract::MailboxEntryStatus::DeadLetter),
            _ => Err(MailboxStoreError::Backend(format!(
                "invalid mailbox status value: {raw}"
            ))),
        }
    }

    /// Converts an unsigned timestamp for storage in a signed BIGINT column,
    /// rejecting values above `i64::MAX`.
    fn to_db_timestamp(value: u64, field: &str) -> Result<i64, RunStoreError> {
        i64::try_from(value).map_err(|_| {
            RunStoreError::Backend(format!("{field} is too large for postgres BIGINT: {value}"))
        })
    }

    /// Converts a BIGINT column back to an unsigned timestamp, rejecting
    /// negative values (which would indicate external tampering or a bug).
    fn from_db_timestamp(value: i64, field: &str) -> Result<u64, RunStoreError> {
        u64::try_from(value).map_err(|_| {
            RunStoreError::Backend(format!(
                "{field} cannot be negative in postgres BIGINT: {value}"
            ))
        })
    }
}
314
#[cfg(feature = "postgres")]
#[async_trait]
impl MailboxReader for PostgresStore {
    /// Loads a single mailbox entry by primary key, or `None` if unknown.
    async fn load_mailbox_entry(
        &self,
        entry_id: &str,
    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
        let sql = format!(
            "SELECT entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
             generation, status, available_at, attempt_count, last_error, claim_token, \
             claimed_by, lease_until, created_at, updated_at FROM {} WHERE entry_id = $1",
            self.mailbox_table
        );
        let row = sqlx::query_as::<_, MailboxRow>(&sql)
            .bind(entry_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::mailbox_sql_err)?;
        row.map(Self::mailbox_entry_from_row).transpose()
    }

    /// Loads the per-mailbox generation counter, or `None` if the mailbox
    /// has never been seen.
    async fn load_mailbox_state(
        &self,
        mailbox_id: &str,
    ) -> Result<Option<MailboxState>, MailboxStoreError> {
        let row = sqlx::query_as::<_, (String, i64, i64)>(&format!(
            "SELECT mailbox_id, current_generation, updated_at FROM {} WHERE mailbox_id = $1",
            self.mailbox_threads_table
        ))
        .bind(mailbox_id)
        .fetch_optional(&self.pool)
        .await
        .map_err(Self::mailbox_sql_err)?;

        row.map(|(mailbox_id, current_generation, updated_at)| {
            Ok(MailboxState {
                mailbox_id,
                current_generation: u64::try_from(current_generation).map_err(|_| {
                    MailboxStoreError::Backend(format!(
                        "current_generation cannot be negative in postgres BIGINT: {current_generation}"
                    ))
                })?,
                updated_at: u64::try_from(updated_at).map_err(|_| {
                    MailboxStoreError::Backend(format!(
                        "updated_at cannot be negative in postgres BIGINT: {updated_at}"
                    ))
                })?,
            })
        })
        .transpose()
    }

    /// Lists entries matching `query`, ordered by `(created_at, entry_id)`,
    /// then paginated in memory by `paginate_mailbox_entries`.
    async fn list_mailbox_entries(
        &self,
        query: &MailboxQuery,
    ) -> Result<MailboxPage, MailboxStoreError> {
        // Returns the keyword that must precede the next filter and marks the
        // clause list as started. The previous hand-rolled flag handling
        // forgot to flip `has_where` in the last branch — harmless today, but
        // a WHERE/AND bug waiting for the next filter to be added.
        fn clause_prefix(has_where: &mut bool) -> &'static str {
            if std::mem::replace(has_where, true) {
                " AND "
            } else {
                " WHERE "
            }
        }

        let mut qb = QueryBuilder::<Postgres>::new(format!(
            "SELECT entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
             generation, status, available_at, attempt_count, last_error, claim_token, \
             claimed_by, lease_until, created_at, updated_at FROM {}",
            self.mailbox_table
        ));

        let mut has_where = false;
        if let Some(mailbox_id) = query.mailbox_id.as_deref() {
            qb.push(clause_prefix(&mut has_where));
            qb.push("mailbox_id = ").push_bind(mailbox_id);
        }
        if let Some(origin) = query.origin {
            qb.push(clause_prefix(&mut has_where));
            qb.push("origin = ")
                .push_bind(Self::encode_mailbox_origin(origin));
        }
        if let Some(status) = query.status {
            qb.push(clause_prefix(&mut has_where));
            qb.push("status = ")
                .push_bind(Self::encode_mailbox_status(status));
        }
        qb.push(" ORDER BY created_at ASC, entry_id ASC");

        let rows = qb
            .build_query_as::<MailboxRow>()
            .fetch_all(&self.pool)
            .await
            .map_err(Self::mailbox_sql_err)?;

        let mut entries = Vec::with_capacity(rows.len());
        for row in rows {
            entries.push(Self::mailbox_entry_from_row(row)?);
        }
        Ok(paginate_mailbox_entries(&entries, query))
    }
}
410
411#[cfg(feature = "postgres")]
412#[async_trait]
413impl MailboxWriter for PostgresStore {
414 async fn enqueue_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
415 let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
416 let now_i64 = i64::try_from(entry.updated_at).map_err(|_| {
417 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
418 })?;
419
420 let state_row = sqlx::query_as::<_, (i64,)>(&format!(
421 "INSERT INTO {} (mailbox_id, current_generation, updated_at) VALUES ($1, 0, $2) \
422 ON CONFLICT (mailbox_id) DO UPDATE SET updated_at = EXCLUDED.updated_at \
423 RETURNING current_generation",
424 self.mailbox_threads_table
425 ))
426 .bind(&entry.mailbox_id)
427 .bind(now_i64)
428 .fetch_one(&mut *tx)
429 .await
430 .map_err(Self::mailbox_sql_err)?;
431
432 let current_generation = u64::try_from(state_row.0).map_err(|_| {
433 MailboxStoreError::Backend(format!(
434 "current_generation cannot be negative: {}",
435 state_row.0
436 ))
437 })?;
438 if current_generation != entry.generation {
439 return Err(MailboxStoreError::GenerationMismatch {
440 mailbox_id: entry.mailbox_id.clone(),
441 expected: current_generation,
442 actual: entry.generation,
443 });
444 }
445
446 sqlx::query(&format!(
447 "INSERT INTO {} (entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
448 generation, status, available_at, attempt_count, last_error, claim_token, \
449 claimed_by, lease_until, created_at, updated_at) \
450 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
451 self.mailbox_table
452 ))
453 .bind(&entry.entry_id)
454 .bind(&entry.mailbox_id)
455 .bind(Self::encode_mailbox_origin(entry.origin))
456 .bind(entry.sender_id.as_deref())
457 .bind(&entry.payload)
458 .bind(i16::from(entry.priority))
459 .bind(entry.dedupe_key.as_deref())
460 .bind(i64::try_from(entry.generation).map_err(|_| {
461 MailboxStoreError::Backend("generation too large for postgres BIGINT".to_string())
462 })?)
463 .bind(Self::encode_mailbox_status(entry.status))
464 .bind(i64::try_from(entry.available_at).map_err(|_| {
465 MailboxStoreError::Backend("available_at too large for postgres BIGINT".to_string())
466 })?)
467 .bind(i32::try_from(entry.attempt_count).map_err(|_| {
468 MailboxStoreError::Backend(
469 "attempt_count too large for postgres INTEGER".to_string(),
470 )
471 })?)
472 .bind(entry.last_error.as_deref())
473 .bind(entry.claim_token.as_deref())
474 .bind(entry.claimed_by.as_deref())
475 .bind(
476 entry
477 .lease_until
478 .map(i64::try_from)
479 .transpose()
480 .map_err(|_| {
481 MailboxStoreError::Backend(
482 "lease_until too large for postgres BIGINT".to_string(),
483 )
484 })?,
485 )
486 .bind(i64::try_from(entry.created_at).map_err(|_| {
487 MailboxStoreError::Backend("created_at too large for postgres BIGINT".to_string())
488 })?)
489 .bind(now_i64)
490 .execute(&mut *tx)
491 .await
492 .map_err(|error| {
493 let message = error.to_string();
494 if message.contains("duplicate key") || message.contains("unique constraint") {
495 MailboxStoreError::AlreadyExists(entry.entry_id.clone())
496 } else {
497 Self::mailbox_sql_err(error)
498 }
499 })?;
500
501 tx.commit().await.map_err(Self::mailbox_sql_err)?;
502 Ok(())
503 }
504
505 async fn ensure_mailbox_state(
506 &self,
507 mailbox_id: &str,
508 now: u64,
509 ) -> Result<MailboxState, MailboxStoreError> {
510 let now_i64 = i64::try_from(now).map_err(|_| {
511 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
512 })?;
513 let row = sqlx::query_as::<_, (String, i64, i64)>(&format!(
514 "INSERT INTO {} (mailbox_id, current_generation, updated_at) VALUES ($1, 0, $2) \
515 ON CONFLICT (mailbox_id) DO UPDATE SET updated_at = EXCLUDED.updated_at \
516 RETURNING mailbox_id, current_generation, updated_at",
517 self.mailbox_threads_table
518 ))
519 .bind(mailbox_id)
520 .bind(now_i64)
521 .fetch_one(&self.pool)
522 .await
523 .map_err(Self::mailbox_sql_err)?;
524 Ok(MailboxState {
525 mailbox_id: row.0,
526 current_generation: u64::try_from(row.1).map_err(|_| {
527 MailboxStoreError::Backend(format!(
528 "current_generation cannot be negative in postgres BIGINT: {}",
529 row.1
530 ))
531 })?,
532 updated_at: u64::try_from(row.2).map_err(|_| {
533 MailboxStoreError::Backend(format!(
534 "updated_at cannot be negative in postgres BIGINT: {}",
535 row.2
536 ))
537 })?,
538 })
539 }
540
    /// Claims up to `limit` ready entries, at most one per mailbox.
    ///
    /// "Ready" means queued with `available_at <= now`, or claimed with an
    /// expired lease (lease takeover). The `NOT EXISTS` filter skips any
    /// mailbox that already has a *different* entry with a live claim, so a
    /// mailbox is processed by one consumer at a time. Rows are locked with
    /// `FOR UPDATE SKIP LOCKED` so concurrent claimers neither block nor
    /// double-claim.
    async fn claim_mailbox_entries(
        &self,
        mailbox_id: Option<&str>,
        limit: usize,
        consumer_id: &str,
        now: u64,
        lease_duration_ms: u64,
    ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
        let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
        let limit = i64::try_from(limit)
            .map_err(|_| MailboxStoreError::Backend("claim limit too large".to_string()))?;
        let now_i64 = i64::try_from(now).map_err(|_| {
            MailboxStoreError::Backend("now too large for postgres BIGINT".to_string())
        })?;
        // $1 is bound to `now` in both SELECT variants below.
        let exclusive_filter = format!(
            "AND NOT EXISTS (\
                SELECT 1 FROM {} e2 \
                WHERE e2.mailbox_id = e1.mailbox_id \
                AND e2.status = 'claimed' \
                AND e2.lease_until IS NOT NULL \
                AND e2.lease_until > $1 \
                AND e2.entry_id != e1.entry_id\
            )",
            self.mailbox_table
        );
        // Two statement variants: scoped to one mailbox, or across all.
        let rows: Vec<MailboxRow> = if let Some(mailbox_id) = mailbox_id {
            sqlx::query_as(&format!(
                "SELECT e1.entry_id, e1.mailbox_id, e1.origin, e1.sender_id, e1.payload, \
                 e1.priority, e1.dedupe_key, e1.generation, e1.status, e1.available_at, \
                 e1.attempt_count, e1.last_error, e1.claim_token, e1.claimed_by, e1.lease_until, \
                 e1.created_at, e1.updated_at FROM {} e1 \
                 WHERE e1.mailbox_id = $3 \
                 AND ((e1.status = 'queued' AND e1.available_at <= $1) \
                 OR (e1.status = 'claimed' AND e1.lease_until IS NOT NULL AND e1.lease_until <= $1)) \
                 {exclusive_filter} \
                 ORDER BY e1.priority DESC, e1.available_at ASC, e1.created_at ASC, e1.entry_id ASC \
                 FOR UPDATE SKIP LOCKED LIMIT $2",
                self.mailbox_table
            ))
            .bind(now_i64)
            .bind(limit)
            .bind(mailbox_id)
            .fetch_all(&mut *tx)
            .await
            .map_err(Self::mailbox_sql_err)?
        } else {
            sqlx::query_as(&format!(
                "SELECT e1.entry_id, e1.mailbox_id, e1.origin, e1.sender_id, e1.payload, \
                 e1.priority, e1.dedupe_key, e1.generation, e1.status, e1.available_at, \
                 e1.attempt_count, e1.last_error, e1.claim_token, e1.claimed_by, e1.lease_until, \
                 e1.created_at, e1.updated_at FROM {} e1 \
                 WHERE (e1.status = 'queued' AND e1.available_at <= $1) \
                 OR (e1.status = 'claimed' AND e1.lease_until IS NOT NULL AND e1.lease_until <= $1) \
                 {exclusive_filter} \
                 ORDER BY e1.priority DESC, e1.available_at ASC, e1.created_at ASC, e1.entry_id ASC \
                 FOR UPDATE SKIP LOCKED LIMIT $2",
                self.mailbox_table
            ))
            .bind(now_i64)
            .bind(limit)
            .fetch_all(&mut *tx)
            .await
            .map_err(Self::mailbox_sql_err)?
        };

        // The SELECT may still return several ready rows of the same mailbox;
        // claim only the first per mailbox (highest priority, then oldest).
        let mut seen_mailbox_ids = std::collections::HashSet::new();
        let mut claimed = Vec::with_capacity(rows.len());
        for row in rows {
            let mut entry = Self::mailbox_entry_from_row(row)?;
            if !seen_mailbox_ids.insert(entry.mailbox_id.clone()) {
                continue;
            }
            // Fresh opaque token per claim; callers must present it to ack/nack.
            let claim_token = uuid::Uuid::now_v7().simple().to_string();
            // saturating_add: an oversized lease duration clamps instead of
            // overflowing u64.
            let lease_until = now.saturating_add(lease_duration_ms);
            sqlx::query(&format!(
                "UPDATE {} SET status = $1, claim_token = $2, claimed_by = $3, lease_until = $4, \
                 attempt_count = attempt_count + 1, updated_at = $5 WHERE entry_id = $6",
                self.mailbox_table
            ))
            .bind(Self::encode_mailbox_status(
                tirea_contract::MailboxEntryStatus::Claimed,
            ))
            .bind(&claim_token)
            .bind(consumer_id)
            .bind(i64::try_from(lease_until).map_err(|_| {
                MailboxStoreError::Backend("lease_until too large for postgres BIGINT".to_string())
            })?)
            .bind(now_i64)
            .bind(&entry.entry_id)
            .execute(&mut *tx)
            .await
            .map_err(Self::mailbox_sql_err)?;
            // Mirror the DB update on the in-memory entry returned to the caller.
            entry.status = tirea_contract::MailboxEntryStatus::Claimed;
            entry.claim_token = Some(claim_token);
            entry.claimed_by = Some(consumer_id.to_string());
            entry.lease_until = Some(lease_until);
            entry.attempt_count = entry.attempt_count.saturating_add(1);
            entry.updated_at = now;
            claimed.push(entry);
        }
        tx.commit().await.map_err(Self::mailbox_sql_err)?;
        Ok(claimed)
    }
647
    /// Claims one specific entry by id, subject to the same rules as
    /// `claim_mailbox_entries`: the entry must be queued (or hold an expired
    /// claim) and no *other* entry of the same mailbox may hold a live claim.
    ///
    /// Returns `None` when the entry is not currently claimable (unknown id,
    /// terminal status, live claim elsewhere, or row locked by a concurrent
    /// claimer — `SKIP LOCKED` drops it silently).
    async fn claim_mailbox_entry(
        &self,
        entry_id: &str,
        consumer_id: &str,
        now: u64,
        lease_duration_ms: u64,
    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
        let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
        let now_i64 = i64::try_from(now).map_err(|_| {
            MailboxStoreError::Backend("now too large for postgres BIGINT".to_string())
        })?;
        // Lock the row if and only if it is claimable right now.
        let row = sqlx::query_as::<_, MailboxRow>(&format!(
            "SELECT entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
             generation, status, available_at, attempt_count, last_error, claim_token, \
             claimed_by, lease_until, created_at, updated_at FROM {} \
             WHERE entry_id = $1 \
             AND (status = 'queued' OR (status = 'claimed' AND lease_until IS NOT NULL AND lease_until <= $2)) \
             AND NOT EXISTS (\
                SELECT 1 FROM {} e2 \
                WHERE e2.mailbox_id = (SELECT mailbox_id FROM {} WHERE entry_id = $1) \
                AND e2.status = 'claimed' \
                AND e2.lease_until IS NOT NULL \
                AND e2.lease_until > $2 \
                AND e2.entry_id != $1\
             ) \
             FOR UPDATE SKIP LOCKED",
            self.mailbox_table, self.mailbox_table, self.mailbox_table
        ))
        .bind(entry_id)
        .bind(now_i64)
        .fetch_optional(&mut *tx)
        .await
        .map_err(Self::mailbox_sql_err)?;

        let Some(row) = row else {
            tx.commit().await.map_err(Self::mailbox_sql_err)?;
            return Ok(None);
        };

        let mut entry = Self::mailbox_entry_from_row(row)?;
        // Fresh opaque token per claim; callers must present it to ack/nack.
        let claim_token = uuid::Uuid::now_v7().simple().to_string();
        // saturating_add clamps an oversized lease duration instead of overflowing.
        let lease_until = now.saturating_add(lease_duration_ms);
        sqlx::query(&format!(
            "UPDATE {} SET status = $1, claim_token = $2, claimed_by = $3, lease_until = $4, \
             attempt_count = attempt_count + 1, updated_at = $5 WHERE entry_id = $6",
            self.mailbox_table
        ))
        .bind(Self::encode_mailbox_status(
            tirea_contract::MailboxEntryStatus::Claimed,
        ))
        .bind(&claim_token)
        .bind(consumer_id)
        .bind(i64::try_from(lease_until).map_err(|_| {
            MailboxStoreError::Backend("lease_until too large for postgres BIGINT".to_string())
        })?)
        .bind(now_i64)
        .bind(&entry.entry_id)
        .execute(&mut *tx)
        .await
        .map_err(Self::mailbox_sql_err)?;
        tx.commit().await.map_err(Self::mailbox_sql_err)?;

        // Mirror the DB update on the in-memory entry returned to the caller.
        entry.status = tirea_contract::MailboxEntryStatus::Claimed;
        entry.claim_token = Some(claim_token);
        entry.claimed_by = Some(consumer_id.to_string());
        entry.lease_until = Some(lease_until);
        entry.attempt_count = entry.attempt_count.saturating_add(1);
        entry.updated_at = now;
        Ok(Some(entry))
    }
720
721 async fn ack_mailbox_entry(
722 &self,
723 entry_id: &str,
724 claim_token: &str,
725 now: u64,
726 ) -> Result<(), MailboxStoreError> {
727 let updated = sqlx::query(&format!(
728 "UPDATE {} SET status = $1, claim_token = NULL, claimed_by = NULL, \
729 lease_until = NULL, updated_at = $2 WHERE entry_id = $3 AND claim_token = $4",
730 self.mailbox_table
731 ))
732 .bind(Self::encode_mailbox_status(
733 tirea_contract::MailboxEntryStatus::Accepted,
734 ))
735 .bind(i64::try_from(now).map_err(|_| {
736 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
737 })?)
738 .bind(entry_id)
739 .bind(claim_token)
740 .execute(&self.pool)
741 .await
742 .map_err(Self::mailbox_sql_err)?;
743 if updated.rows_affected() == 0 {
744 return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
745 }
746 Ok(())
747 }
748
749 async fn nack_mailbox_entry(
750 &self,
751 entry_id: &str,
752 claim_token: &str,
753 retry_at: u64,
754 error: &str,
755 now: u64,
756 ) -> Result<(), MailboxStoreError> {
757 let updated = sqlx::query(&format!(
758 "UPDATE {} SET status = $1, available_at = $2, last_error = $3, claim_token = NULL, \
759 claimed_by = NULL, lease_until = NULL, updated_at = $4 WHERE entry_id = $5 AND claim_token = $6",
760 self.mailbox_table
761 ))
762 .bind(Self::encode_mailbox_status(tirea_contract::MailboxEntryStatus::Queued))
763 .bind(i64::try_from(retry_at).map_err(|_| {
764 MailboxStoreError::Backend("retry_at too large for postgres BIGINT".to_string())
765 })?)
766 .bind(error)
767 .bind(i64::try_from(now).map_err(|_| {
768 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
769 })?)
770 .bind(entry_id)
771 .bind(claim_token)
772 .execute(&self.pool)
773 .await
774 .map_err(Self::mailbox_sql_err)?;
775 if updated.rows_affected() == 0 {
776 return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
777 }
778 Ok(())
779 }
780
781 async fn dead_letter_mailbox_entry(
782 &self,
783 entry_id: &str,
784 claim_token: &str,
785 error: &str,
786 now: u64,
787 ) -> Result<(), MailboxStoreError> {
788 let updated = sqlx::query(&format!(
789 "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
790 lease_until = NULL, updated_at = $3 WHERE entry_id = $4 AND claim_token = $5",
791 self.mailbox_table
792 ))
793 .bind(Self::encode_mailbox_status(
794 tirea_contract::MailboxEntryStatus::DeadLetter,
795 ))
796 .bind(error)
797 .bind(i64::try_from(now).map_err(|_| {
798 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
799 })?)
800 .bind(entry_id)
801 .bind(claim_token)
802 .execute(&self.pool)
803 .await
804 .map_err(Self::mailbox_sql_err)?;
805 if updated.rows_affected() == 0 {
806 return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
807 }
808 Ok(())
809 }
810
811 async fn cancel_mailbox_entry(
812 &self,
813 entry_id: &str,
814 now: u64,
815 ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
816 let updated = sqlx::query(&format!(
817 "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
818 lease_until = NULL, updated_at = $3 \
819 WHERE entry_id = $4 AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter')",
820 self.mailbox_table
821 ))
822 .bind(Self::encode_mailbox_status(
823 tirea_contract::MailboxEntryStatus::Cancelled,
824 ))
825 .bind("cancelled")
826 .bind(i64::try_from(now).map_err(|_| {
827 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
828 })?)
829 .bind(entry_id)
830 .execute(&self.pool)
831 .await
832 .map_err(Self::mailbox_sql_err)?;
833 if updated.rows_affected() == 0 {
834 return self.load_mailbox_entry(entry_id).await;
835 }
836 self.load_mailbox_entry(entry_id).await
837 }
838
839 async fn supersede_mailbox_entry(
840 &self,
841 entry_id: &str,
842 now: u64,
843 reason: &str,
844 ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
845 let updated = sqlx::query(&format!(
846 "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
847 lease_until = NULL, updated_at = $3 \
848 WHERE entry_id = $4 AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter')",
849 self.mailbox_table
850 ))
851 .bind(Self::encode_mailbox_status(
852 tirea_contract::MailboxEntryStatus::Superseded,
853 ))
854 .bind(reason)
855 .bind(i64::try_from(now).map_err(|_| {
856 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
857 })?)
858 .bind(entry_id)
859 .execute(&self.pool)
860 .await
861 .map_err(Self::mailbox_sql_err)?;
862 if updated.rows_affected() == 0 {
863 return self.load_mailbox_entry(entry_id).await;
864 }
865 self.load_mailbox_entry(entry_id).await
866 }
867
868 async fn cancel_pending_for_mailbox(
869 &self,
870 mailbox_id: &str,
871 now: u64,
872 exclude_entry_id: Option<&str>,
873 ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
874 let now_i64 = i64::try_from(now).map_err(|_| {
875 MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
876 })?;
877 let returning_cols =
878 "entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
879 generation, status, available_at, attempt_count, last_error, claim_token, \
880 claimed_by, lease_until, created_at, updated_at";
881
882 let rows: Vec<MailboxRow> = match exclude_entry_id {
883 Some(entry_id) => {
884 sqlx::query_as(&format!(
885 "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
886 lease_until = NULL, updated_at = $3 WHERE mailbox_id = $4 AND entry_id != $5 \
887 AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
888 RETURNING {returning_cols}",
889 self.mailbox_table
890 ))
891 .bind(Self::encode_mailbox_status(tirea_contract::MailboxEntryStatus::Cancelled))
892 .bind("cancelled")
893 .bind(now_i64)
894 .bind(mailbox_id)
895 .bind(entry_id)
896 .fetch_all(&self.pool)
897 .await
898 .map_err(Self::mailbox_sql_err)?
899 }
900 None => {
901 sqlx::query_as(&format!(
902 "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
903 lease_until = NULL, updated_at = $3 WHERE mailbox_id = $4 \
904 AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
905 RETURNING {returning_cols}",
906 self.mailbox_table
907 ))
908 .bind(Self::encode_mailbox_status(tirea_contract::MailboxEntryStatus::Cancelled))
909 .bind("cancelled")
910 .bind(now_i64)
911 .bind(mailbox_id)
912 .fetch_all(&self.pool)
913 .await
914 .map_err(Self::mailbox_sql_err)?
915 }
916 };
917
918 let mut entries = Vec::with_capacity(rows.len());
919 for row in rows {
920 entries.push(Self::mailbox_entry_from_row(row)?);
921 }
922 Ok(entries)
923 }
924
    /// Bumps the mailbox's generation counter (creating the state row on
    /// first use) and marks every still-pending entry belonging to an older
    /// generation as superseded — both steps in a single transaction.
    async fn interrupt_mailbox(
        &self,
        mailbox_id: &str,
        now: u64,
    ) -> Result<MailboxInterrupt, MailboxStoreError> {
        let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
        let now_i64 = i64::try_from(now).map_err(|_| {
            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
        })?;
        // Upsert the per-mailbox state row: the first interrupt inserts
        // generation 1, later interrupts increment the stored counter.
        let state_row = sqlx::query_as::<_, (String, i64, i64)>(&format!(
            "INSERT INTO {} (mailbox_id, current_generation, updated_at) VALUES ($1, 1, $2) \
             ON CONFLICT (mailbox_id) DO UPDATE SET current_generation = {}.current_generation + 1, updated_at = EXCLUDED.updated_at \
             RETURNING mailbox_id, current_generation, updated_at",
            self.mailbox_threads_table, self.mailbox_threads_table
        ))
        .bind(mailbox_id)
        .bind(now_i64)
        .fetch_one(&mut *tx)
        .await
        .map_err(Self::mailbox_sql_err)?;

        // The DB stores BIGINT but the contract uses u64; a negative value
        // indicates corruption and is surfaced rather than wrapped.
        let mailbox_state = MailboxState {
            mailbox_id: state_row.0,
            current_generation: u64::try_from(state_row.1).map_err(|_| {
                MailboxStoreError::Backend(format!(
                    "current_generation cannot be negative in postgres BIGINT: {}",
                    state_row.1
                ))
            })?,
            updated_at: u64::try_from(state_row.2).map_err(|_| {
                MailboxStoreError::Backend(format!(
                    "updated_at cannot be negative in postgres BIGINT: {}",
                    state_row.2
                ))
            })?,
        };

        // Supersede all non-terminal entries older than the new generation;
        // any outstanding claim is released (claim_token / claimed_by /
        // lease_until cleared).
        let rows = sqlx::query_as::<_, MailboxRow>(&format!(
            "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
             lease_until = NULL, updated_at = $3 \
             WHERE mailbox_id = $4 AND generation < $5 \
             AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
             RETURNING entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
             generation, status, available_at, attempt_count, last_error, claim_token, \
             claimed_by, lease_until, created_at, updated_at",
            self.mailbox_table
        ))
        .bind(Self::encode_mailbox_status(
            tirea_contract::MailboxEntryStatus::Superseded,
        ))
        .bind("superseded by interrupt")
        .bind(now_i64)
        .bind(mailbox_id)
        .bind(
            i64::try_from(mailbox_state.current_generation).map_err(|_| {
                MailboxStoreError::Backend("generation too large for postgres BIGINT".to_string())
            })?,
        )
        .fetch_all(&mut *tx)
        .await
        .map_err(Self::mailbox_sql_err)?;

        tx.commit().await.map_err(Self::mailbox_sql_err)?;

        let mut superseded_entries = Vec::with_capacity(rows.len());
        for row in rows {
            superseded_entries.push(Self::mailbox_entry_from_row(row)?);
        }

        Ok(MailboxInterrupt {
            mailbox_state,
            superseded_entries,
        })
    }
999
1000 async fn extend_lease(
1001 &self,
1002 entry_id: &str,
1003 claim_token: &str,
1004 extension_ms: u64,
1005 now: u64,
1006 ) -> Result<bool, MailboxStoreError> {
1007 let now_i64 = i64::try_from(now).map_err(|_| {
1008 MailboxStoreError::Backend("now too large for postgres BIGINT".to_string())
1009 })?;
1010 let lease_until = i64::try_from(now.saturating_add(extension_ms)).map_err(|_| {
1011 MailboxStoreError::Backend("lease_until too large for postgres BIGINT".to_string())
1012 })?;
1013 let result = sqlx::query(&format!(
1014 "UPDATE {} SET lease_until = $1, updated_at = $2 \
1015 WHERE entry_id = $3 AND status = 'claimed' AND claim_token = $4",
1016 self.mailbox_table
1017 ))
1018 .bind(lease_until)
1019 .bind(now_i64)
1020 .bind(entry_id)
1021 .bind(claim_token)
1022 .execute(&self.pool)
1023 .await
1024 .map_err(Self::mailbox_sql_err)?;
1025 Ok(result.rows_affected() > 0)
1026 }
1027
1028 async fn purge_terminal_mailbox_entries(
1029 &self,
1030 older_than: u64,
1031 ) -> Result<usize, MailboxStoreError> {
1032 let older_than_i64 = i64::try_from(older_than).map_err(|_| {
1033 MailboxStoreError::Backend("older_than too large for postgres BIGINT".to_string())
1034 })?;
1035 let result = sqlx::query(&format!(
1036 "DELETE FROM {} WHERE status IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
1037 AND updated_at < $1",
1038 self.mailbox_table
1039 ))
1040 .bind(older_than_i64)
1041 .execute(&self.pool)
1042 .await
1043 .map_err(Self::mailbox_sql_err)?;
1044 Ok(result.rows_affected() as usize)
1045 }
1046}
1047
#[cfg(feature = "postgres")]
#[async_trait]
impl ThreadWriter for PostgresStore {
    /// Creates a new thread: a head row at version 0 plus one row per
    /// message, written atomically.
    ///
    /// Returns `ThreadStoreError::AlreadyExists` when a thread with the same
    /// id is already stored.
    async fn create(&self, thread: &Thread) -> Result<Committed, ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        let mut v = serde_json::to_value(thread)
            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
        if let Some(obj) = v.as_object_mut() {
            // Messages live in their own table; the head row stores an empty
            // list plus the optimistic-concurrency version counter.
            obj.insert("messages".to_string(), serde_json::Value::Array(Vec::new()));
            obj.insert("_version".to_string(), serde_json::Value::Number(0.into()));
        }

        // Head row and message rows go in one transaction so a mid-way
        // failure cannot leave a committed thread without (some of) its
        // messages. Previously each message was inserted separately against
        // the pool after the head row had already committed.
        let mut tx = self.pool.begin().await.map_err(Self::sql_err)?;

        let sql = format!(
            "INSERT INTO {} (id, data, updated_at) VALUES ($1, $2, now())",
            self.table
        );
        sqlx::query(&sql)
            .bind(&thread.id)
            .bind(&v)
            .execute(&mut *tx)
            .await
            .map_err(|e| {
                // Prefer the SQLSTATE unique-violation code (23505); keep the
                // message-text fallback for errors that expose no code.
                let unique_violation = e
                    .as_database_error()
                    .and_then(|db| db.code())
                    .map_or(false, |code| code == "23505");
                let msg = e.to_string();
                if unique_violation
                    || msg.contains("duplicate key")
                    || msg.contains("unique constraint")
                {
                    ThreadStoreError::AlreadyExists
                } else {
                    Self::sql_err(e)
                }
            })?;

        if !thread.messages.is_empty() {
            // Materialize the per-message columns first so serialization
            // errors roll the whole transaction back.
            let mut rows = Vec::with_capacity(thread.messages.len());
            for msg in &thread.messages {
                let data = serde_json::to_value(msg.as_ref())
                    .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
                let (run_id, step_index) = msg
                    .metadata
                    .as_ref()
                    .map(|m| (m.run_id.clone(), m.step_index.map(|s| s as i32)))
                    .unwrap_or((None, None));
                rows.push((msg.id.clone(), run_id, step_index, data));
            }

            // Single batched INSERT, mirroring `save`.
            let mut qb = QueryBuilder::<Postgres>::new(format!(
                "INSERT INTO {} (session_id, message_id, run_id, step_index, data) ",
                self.messages_table
            ));
            qb.push_values(
                rows.iter(),
                |mut b, (message_id, run_id, step_index, data)| {
                    b.push_bind(&thread.id)
                        .push_bind(message_id.as_deref())
                        .push_bind(run_id.as_deref())
                        .push_bind(*step_index)
                        .push_bind(data);
                },
            );
            qb.build().execute(&mut *tx).await.map_err(Self::sql_err)?;
        }

        tx.commit().await.map_err(Self::sql_err)?;
        Ok(Committed { version: 0 })
    }

    /// Applies a change set to an existing thread under `FOR UPDATE` row
    /// locking, enforcing the optional version precondition and bumping the
    /// stored `_version` by one.
    async fn append(
        &self,
        thread_id: &str,
        delta: &ThreadChangeSet,
        precondition: VersionPrecondition,
    ) -> Result<Committed, ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        let mut tx = self.pool.begin().await.map_err(Self::sql_err)?;

        // Lock the head row so concurrent appends serialize on this thread.
        let sql = format!("SELECT data FROM {} WHERE id = $1 FOR UPDATE", self.table);
        let row: Option<(serde_json::Value,)> = sqlx::query_as(&sql)
            .bind(thread_id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(Self::sql_err)?;

        let Some((mut v,)) = row else {
            return Err(ThreadStoreError::NotFound(thread_id.to_string()));
        };

        let current_version = v.get("_version").and_then(|v| v.as_u64()).unwrap_or(0);
        if let VersionPrecondition::Exact(expected) = precondition {
            if current_version != expected {
                return Err(ThreadStoreError::VersionConflict {
                    expected,
                    actual: current_version,
                });
            }
        }
        let new_version = current_version + 1;

        if let Some(ref snapshot) = delta.snapshot {
            // A snapshot replaces the state wholesale and clears the
            // accumulated patch log.
            if let Some(obj) = v.as_object_mut() {
                obj.insert("state".to_string(), snapshot.clone());
                obj.insert("patches".to_string(), serde_json::Value::Array(Vec::new()));
            }
        } else if !delta.patches.is_empty() {
            // Otherwise append the delta's patches to the stored patch log.
            let patches_arr = v
                .get("patches")
                .cloned()
                .unwrap_or(serde_json::Value::Array(Vec::new()));
            let mut patches: Vec<serde_json::Value> =
                if let serde_json::Value::Array(arr) = patches_arr {
                    arr
                } else {
                    Vec::new()
                };
            for p in &delta.patches {
                if let Ok(pv) = serde_json::to_value(p) {
                    patches.push(pv);
                }
            }
            if let Some(obj) = v.as_object_mut() {
                obj.insert("patches".to_string(), serde_json::Value::Array(patches));
            }
        }

        if let Some(obj) = v.as_object_mut() {
            obj.insert(
                "_version".to_string(),
                serde_json::Value::Number(new_version.into()),
            );
        }

        let update_sql = format!(
            "UPDATE {} SET data = $1, updated_at = now() WHERE id = $2",
            self.table
        );
        sqlx::query(&update_sql)
            .bind(&v)
            .bind(thread_id)
            .execute(&mut *tx)
            .await
            .map_err(Self::sql_err)?;

        if !delta.messages.is_empty() {
            let insert_sql = format!(
                "INSERT INTO {} (session_id, message_id, run_id, step_index, data) VALUES ($1, $2, $3, $4, $5)",
                self.messages_table,
            );
            for msg in &delta.messages {
                let data = serde_json::to_value(msg.as_ref())
                    .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
                let message_id = msg.id.as_deref();
                let (run_id, step_index) = msg
                    .metadata
                    .as_ref()
                    .map(|m| (m.run_id.as_deref(), m.step_index.map(|s| s as i32)))
                    .unwrap_or((None, None));
                sqlx::query(&insert_sql)
                    .bind(thread_id)
                    .bind(message_id)
                    .bind(run_id)
                    .bind(step_index)
                    .bind(&data)
                    .execute(&mut *tx)
                    .await
                    .map_err(Self::sql_err)?;
            }
        }

        tx.commit().await.map_err(Self::sql_err)?;
        Ok(Committed {
            version: new_version,
        })
    }

    /// Deletes the thread head row; succeeds as a no-op when the id is
    /// unknown.
    ///
    /// NOTE(review): message rows are not deleted here — presumably handled
    /// by an `ON DELETE CASCADE` foreign key in the schema; confirm.
    async fn delete(&self, thread_id: &str) -> Result<(), ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        let sql = format!("DELETE FROM {} WHERE id = $1", self.table);
        sqlx::query(&sql)
            .bind(thread_id)
            .execute(&self.pool)
            .await
            .map_err(Self::sql_err)?;
        Ok(())
    }

    /// Upserts the full thread: replaces (or creates) the head row with a
    /// bumped `_version`, then rewrites all message rows, in one transaction.
    async fn save(&self, thread: &Thread) -> Result<(), ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        let mut v = serde_json::to_value(thread)
            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
        if let Some(obj) = v.as_object_mut() {
            // Messages are stored in their own table, not inside the head
            // JSON.
            obj.insert("messages".to_string(), serde_json::Value::Array(Vec::new()));
        }

        let mut tx = self.pool.begin().await.map_err(Self::sql_err)?;

        // Lock the existing head row (if any) to read its version safely.
        let select_sql = format!("SELECT data FROM {} WHERE id = $1 FOR UPDATE", self.table);
        let existing: Option<(serde_json::Value,)> = sqlx::query_as(&select_sql)
            .bind(&thread.id)
            .fetch_optional(&mut *tx)
            .await
            .map_err(Self::sql_err)?;

        // New threads start at version 0; existing ones get version + 1.
        let next_version = existing
            .as_ref()
            .and_then(|(data,)| data.get("_version").and_then(serde_json::Value::as_u64))
            .map_or(0, |version| version.saturating_add(1));
        if let Some(obj) = v.as_object_mut() {
            obj.insert(
                "_version".to_string(),
                serde_json::Value::Number(next_version.into()),
            );
        }

        if existing.is_some() {
            let update_sql = format!(
                "UPDATE {} SET data = $1, updated_at = now() WHERE id = $2",
                self.table
            );
            sqlx::query(&update_sql)
                .bind(&v)
                .bind(&thread.id)
                .execute(&mut *tx)
                .await
                .map_err(Self::sql_err)?;
        } else {
            let insert_sql = format!(
                "INSERT INTO {} (id, data, updated_at) VALUES ($1, $2, now())",
                self.table
            );
            sqlx::query(&insert_sql)
                .bind(&thread.id)
                .bind(&v)
                .execute(&mut *tx)
                .await
                .map_err(Self::sql_err)?;
        }

        // Full replace: drop all previous message rows, then batch-insert the
        // current set.
        let delete_messages_sql =
            format!("DELETE FROM {} WHERE session_id = $1", self.messages_table);
        sqlx::query(&delete_messages_sql)
            .bind(&thread.id)
            .execute(&mut *tx)
            .await
            .map_err(Self::sql_err)?;

        if !thread.messages.is_empty() {
            let mut rows = Vec::with_capacity(thread.messages.len());
            for msg in &thread.messages {
                let data = serde_json::to_value(msg.as_ref())
                    .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
                let message_id = msg.id.clone();
                let (run_id, step_index) = msg
                    .metadata
                    .as_ref()
                    .map(|m| (m.run_id.clone(), m.step_index.map(|s| s as i32)))
                    .unwrap_or((None, None));
                rows.push((message_id, run_id, step_index, data));
            }

            let mut qb = QueryBuilder::<Postgres>::new(format!(
                "INSERT INTO {} (session_id, message_id, run_id, step_index, data) ",
                self.messages_table
            ));
            qb.push_values(
                rows.iter(),
                |mut b, (message_id, run_id, step_index, data)| {
                    b.push_bind(&thread.id)
                        .push_bind(message_id.as_deref())
                        .push_bind(run_id.as_deref())
                        .push_bind(*step_index)
                        .push_bind(data);
                },
            );
            qb.build().execute(&mut *tx).await.map_err(Self::sql_err)?;
        }

        tx.commit().await.map_err(Self::sql_err)?;
        Ok(())
    }
}
1332
#[cfg(feature = "postgres")]
#[async_trait]
impl ThreadReader for PostgresStore {
    /// Loads the full thread (head JSON plus all message rows ordered by
    /// `seq`) together with its stored `_version`.
    async fn load(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        let sql = format!("SELECT data FROM {} WHERE id = $1", self.table);
        let row: Option<(serde_json::Value,)> = sqlx::query_as(&sql)
            .bind(thread_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::sql_err)?;

        let Some((mut v,)) = row else {
            return Ok(None);
        };

        // Version lives inside the head JSON; default 0 for rows without it.
        let version = v.get("_version").and_then(|v| v.as_u64()).unwrap_or(0);

        let msg_sql = format!(
            "SELECT data FROM {} WHERE session_id = $1 ORDER BY seq",
            self.messages_table
        );
        let msg_rows: Vec<(serde_json::Value,)> = sqlx::query_as(&msg_sql)
            .bind(thread_id)
            .fetch_all(&self.pool)
            .await
            .map_err(Self::sql_err)?;

        // Re-attach messages and strip the internal version marker before
        // deserializing into the contract type.
        let messages: Vec<serde_json::Value> = msg_rows.into_iter().map(|(d,)| d).collect();
        if let Some(obj) = v.as_object_mut() {
            obj.insert("messages".to_string(), serde_json::Value::Array(messages));
            obj.remove("_version");
        }

        let thread: Thread = serde_json::from_value(v)
            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
        Ok(Some(ThreadHead { thread, version }))
    }

    /// Pages through a thread's messages by `seq` cursor.
    ///
    /// Positional parameter layout: $1 = thread id, $2 = primary cursor,
    /// $3 = LIMIT, $4 = run_id when filtered, and the secondary cursor (the
    /// bound opposite the sort direction) takes the next free slot — $5 with
    /// a run filter, otherwise $4. The bind calls below must follow this
    /// exact order.
    async fn load_messages(
        &self,
        thread_id: &str,
        query: &MessageQuery,
    ) -> Result<MessagePage, ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        // Distinguish "thread has no messages" from "thread does not exist".
        let exists_sql = format!("SELECT 1 FROM {} WHERE id = $1", self.table);
        let exists: Option<(i32,)> = sqlx::query_as(&exists_sql)
            .bind(thread_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::sql_err)?;
        if exists.is_none() {
            return Err(ThreadStoreError::NotFound(thread_id.to_string()));
        }

        let limit = query.limit.clamp(1, 200);
        // Fetch one extra row to learn whether another page exists.
        let fetch_limit = (limit + 1) as i64;

        // NOTE(review): `Some(Visibility::All)` still filters to rows whose
        // visibility is (or defaults to) 'all', while `None` applies no
        // filter at all — confirm this mapping matches the contract's intent.
        let vis_clause = match query.visibility {
            Some(Visibility::All) => {
                " AND COALESCE(data->>'visibility', 'all') = 'all'".to_string()
            }
            Some(Visibility::Internal) => " AND data->>'visibility' = 'internal'".to_string(),
            None => String::new(),
        };

        let run_clause = if query.run_id.is_some() {
            " AND run_id = $4"
        } else {
            ""
        };

        // Placeholder slot used by the secondary cursor clause below.
        let extra_param_idx = if query.run_id.is_some() { 5 } else { 4 };

        let (sql, cursor_val) = match query.order {
            SortOrder::Asc => {
                // Ascending: `after` is the primary cursor (exclusive lower
                // bound); `before` is an optional upper bound.
                let cursor = query.after.unwrap_or(-1);
                let before_clause = if query.before.is_some() {
                    format!("AND seq < ${extra_param_idx}")
                } else {
                    String::new()
                };
                let sql = format!(
                    "SELECT seq, data FROM {} WHERE session_id = $1 AND seq > $2{}{} {} ORDER BY seq ASC LIMIT $3",
                    self.messages_table, vis_clause, run_clause, before_clause,
                );
                (sql, cursor)
            }
            SortOrder::Desc => {
                // Descending: `before` is the primary cursor (exclusive upper
                // bound); `after` is an optional lower bound.
                let cursor = query.before.unwrap_or(i64::MAX);
                let after_clause = if query.after.is_some() {
                    format!("AND seq > ${extra_param_idx}")
                } else {
                    String::new()
                };
                let sql = format!(
                    "SELECT seq, data FROM {} WHERE session_id = $1 AND seq < $2{}{} {} ORDER BY seq DESC LIMIT $3",
                    self.messages_table, vis_clause, run_clause, after_clause,
                );
                (sql, cursor)
            }
        };

        // Bind order mirrors the placeholder layout documented above.
        let rows: Vec<(i64, serde_json::Value)> = match query.order {
            SortOrder::Asc => {
                let mut q = sqlx::query_as(&sql)
                    .bind(thread_id)
                    .bind(cursor_val)
                    .bind(fetch_limit);
                if let Some(ref rid) = query.run_id {
                    q = q.bind(rid);
                }
                if let Some(before) = query.before {
                    q = q.bind(before);
                }
                q.fetch_all(&self.pool).await.map_err(Self::sql_err)?
            }
            SortOrder::Desc => {
                let mut q = sqlx::query_as(&sql)
                    .bind(thread_id)
                    .bind(cursor_val)
                    .bind(fetch_limit);
                if let Some(ref rid) = query.run_id {
                    q = q.bind(rid);
                }
                if let Some(after) = query.after {
                    q = q.bind(after);
                }
                q.fetch_all(&self.pool).await.map_err(Self::sql_err)?
            }
        };

        // The extra row fetched above only signals has_more; drop it.
        let has_more = rows.len() > limit;
        let limited: Vec<_> = rows.into_iter().take(limit).collect();

        let messages: Vec<MessageWithCursor> = limited
            .into_iter()
            .map(
                |(seq, data)| -> Result<MessageWithCursor, ThreadStoreError> {
                    let message: Message = serde_json::from_value(data).map_err(|e| {
                        ThreadStoreError::Serialization(format!(
                            "failed to deserialize message row (thread_id={thread_id}, seq={seq}): {e}"
                        ))
                    })?;
                    Ok(MessageWithCursor {
                        cursor: seq,
                        message,
                    })
                },
            )
            .collect::<Result<Vec<_>, _>>()?;

        // Cursors are taken from the returned page in result order: `next`
        // continues past the last row, `prev` precedes the first.
        Ok(MessagePage {
            next_cursor: messages.last().map(|m| m.cursor),
            prev_cursor: messages.first().map(|m| m.cursor),
            messages,
            has_more,
        })
    }

    /// Counts a thread's message rows; errors with `NotFound` for an unknown
    /// thread instead of returning 0.
    async fn message_count(&self, thread_id: &str) -> Result<usize, ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        let exists_sql = format!("SELECT 1 FROM {} WHERE id = $1", self.table);
        let exists: Option<(i32,)> = sqlx::query_as(&exists_sql)
            .bind(thread_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::sql_err)?;
        if exists.is_none() {
            return Err(ThreadStoreError::NotFound(thread_id.to_string()));
        }

        let sql = format!(
            "SELECT COUNT(*)::bigint FROM {} WHERE session_id = $1",
            self.messages_table
        );
        let row: (i64,) = sqlx::query_as(&sql)
            .bind(thread_id)
            .fetch_one(&self.pool)
            .await
            .map_err(Self::sql_err)?;
        Ok(row.0 as usize)
    }

    /// Lists thread ids (ordered by id) with offset pagination, optionally
    /// filtered by resource id and/or parent thread id.
    ///
    /// The count query's filters start at $1; the data query's start at $3
    /// because $1/$2 are taken by LIMIT/OFFSET.
    async fn list_threads(
        &self,
        query: &ThreadListQuery,
    ) -> Result<ThreadListPage, ThreadStoreError> {
        self.ensure_thread_schema_ready().await?;

        let limit = query.limit.clamp(1, 200);
        // Fetch one extra row to detect whether another page exists.
        let fetch_limit = (limit + 1) as i64;
        let offset = query.offset as i64;

        let mut count_filters = Vec::new();
        let mut data_filters = Vec::new();
        if query.resource_id.is_some() {
            count_filters.push("data->>'resource_id' = $1".to_string());
            data_filters.push("data->>'resource_id' = $3".to_string());
        }
        if query.parent_thread_id.is_some() {
            let idx = if query.resource_id.is_some() { 2 } else { 1 };
            count_filters.push(format!("data->>'parent_thread_id' = ${idx}"));
            let data_idx = if query.resource_id.is_some() { 4 } else { 3 };
            data_filters.push(format!("data->>'parent_thread_id' = ${data_idx}"));
        }

        let where_count = if count_filters.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", count_filters.join(" AND "))
        };

        let count_sql = format!("SELECT COUNT(*)::bigint FROM {}{}", self.table, where_count);
        let where_data = if data_filters.is_empty() {
            String::new()
        } else {
            format!(" WHERE {}", data_filters.join(" AND "))
        };
        let data_sql = format!(
            "SELECT id FROM {}{} ORDER BY id LIMIT $1 OFFSET $2",
            self.table, where_data
        );

        let mut count_q = sqlx::query_scalar::<_, i64>(&count_sql);
        if let Some(ref rid) = query.resource_id {
            count_q = count_q.bind(rid);
        }
        if let Some(ref pid) = query.parent_thread_id {
            count_q = count_q.bind(pid);
        }
        let total = count_q.fetch_one(&self.pool).await.map_err(Self::sql_err)?;

        let mut data_q = sqlx::query_scalar::<_, String>(&data_sql)
            .bind(fetch_limit)
            .bind(offset);
        if let Some(ref rid) = query.resource_id {
            data_q = data_q.bind(rid);
        }
        if let Some(ref pid) = query.parent_thread_id {
            data_q = data_q.bind(pid);
        }
        let rows: Vec<String> = data_q.fetch_all(&self.pool).await.map_err(Self::sql_err)?;

        let has_more = rows.len() > limit;
        let items = rows.into_iter().take(limit).collect();

        Ok(ThreadListPage {
            items,
            total: total as usize,
            has_more,
        })
    }

    /// Delegates to the `RunReader` impl, wrapping its error into the thread
    /// store's error type.
    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
        <Self as RunReader>::load_run(self, run_id)
            .await
            .map_err(|e| ThreadStoreError::Io(std::io::Error::other(e.to_string())))
    }

    /// Delegates to the `RunReader` impl, wrapping its error into the thread
    /// store's error type.
    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, ThreadStoreError> {
        <Self as RunReader>::list_runs(self, query)
            .await
            .map_err(|e| ThreadStoreError::Io(std::io::Error::other(e.to_string())))
    }

    /// Delegates to `RunReader::load_current_run`, wrapping its error into
    /// the thread store's error type.
    async fn active_run_for_thread(
        &self,
        thread_id: &str,
    ) -> Result<Option<RunRecord>, ThreadStoreError> {
        <Self as RunReader>::load_current_run(self, thread_id)
            .await
            .map_err(|e| ThreadStoreError::Io(std::io::Error::other(e.to_string())))
    }
}
1616
/// Column tuple for `SELECT`s against the runs table, matching the column
/// list used by the run queries in this file.
#[cfg(feature = "postgres")]
type RunRowTuple = (
    String,                    // run_id
    String,                    // thread_id
    String,                    // agent_id
    Option<String>,            // parent_run_id
    Option<String>,            // parent_thread_id
    String,                    // origin (encoded; see decode_origin)
    String,                    // status (encoded; see decode_status)
    Option<String>,            // termination_code
    Option<String>,            // termination_detail
    i64,                       // created_at
    i64,                       // updated_at
    Option<String>,            // source_mailbox_entry_id
    Option<serde_json::Value>, // metadata
);
1633
/// Raw row shape of the mailbox table, matching the column list used in the
/// mailbox `SELECT`/`RETURNING` statements; converted into the contract's
/// `MailboxEntry` by `PostgresStore::mailbox_entry_from_row`.
#[cfg(feature = "postgres")]
struct MailboxRow {
    entry_id: String,
    mailbox_id: String,
    origin: String,              // encoded; decoded via decode_mailbox_origin
    sender_id: Option<String>,
    payload: serde_json::Value,
    priority: i16,               // contract field is u8; see converter
    dedupe_key: Option<String>,
    generation: i64,             // mailbox generation the entry belongs to
    status: String,              // encoded; decoded via decode_mailbox_status
    available_at: i64,
    attempt_count: i32,
    last_error: Option<String>,
    claim_token: Option<String>, // present while the entry is claimed
    claimed_by: Option<String>,
    lease_until: Option<i64>,
    created_at: i64,
    updated_at: i64,
}
1654
#[cfg(feature = "postgres")]
impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for MailboxRow {
    /// Decodes a mailbox row column by column; a missing or mistyped column
    /// surfaces as a `sqlx::Error` from `try_get`.
    fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
        use sqlx::Row;

        let entry_id: String = row.try_get("entry_id")?;
        let mailbox_id: String = row.try_get("mailbox_id")?;
        let origin: String = row.try_get("origin")?;
        let sender_id: Option<String> = row.try_get("sender_id")?;
        let payload: serde_json::Value = row.try_get("payload")?;
        let priority: i16 = row.try_get("priority")?;
        let dedupe_key: Option<String> = row.try_get("dedupe_key")?;
        let generation: i64 = row.try_get("generation")?;
        let status: String = row.try_get("status")?;
        let available_at: i64 = row.try_get("available_at")?;
        let attempt_count: i32 = row.try_get("attempt_count")?;
        let last_error: Option<String> = row.try_get("last_error")?;
        let claim_token: Option<String> = row.try_get("claim_token")?;
        let claimed_by: Option<String> = row.try_get("claimed_by")?;
        let lease_until: Option<i64> = row.try_get("lease_until")?;
        let created_at: i64 = row.try_get("created_at")?;
        let updated_at: i64 = row.try_get("updated_at")?;

        Ok(Self {
            entry_id,
            mailbox_id,
            origin,
            sender_id,
            payload,
            priority,
            dedupe_key,
            generation,
            status,
            available_at,
            attempt_count,
            last_error,
            claim_token,
            claimed_by,
            lease_until,
            created_at,
            updated_at,
        })
    }
}
1681
#[cfg(feature = "postgres")]
impl PostgresStore {
    /// Converts a raw runs-table row tuple into a `RunRecord`, decoding the
    /// textual origin/status columns and the BIGINT timestamps.
    fn run_from_row(row: RunRowTuple) -> Result<RunRecord, RunStoreError> {
        let (
            run_id,
            thread_id,
            agent_id,
            parent_run_id,
            parent_thread_id,
            origin,
            status,
            termination_code,
            termination_detail,
            created_at,
            updated_at,
            source_mailbox_entry_id,
            metadata,
        ) = row;
        Ok(RunRecord {
            run_id,
            thread_id,
            agent_id,
            parent_run_id,
            parent_thread_id,
            origin: Self::decode_origin(&origin)?,
            status: Self::decode_status(&status)?,
            termination_code,
            termination_detail,
            created_at: Self::from_db_timestamp(created_at, "created_at")?,
            updated_at: Self::from_db_timestamp(updated_at, "updated_at")?,
            source_mailbox_entry_id,
            metadata,
        })
    }

    /// Converts a `MailboxRow` into the contract's `MailboxEntry`, validating
    /// that the signed DB columns fit the unsigned contract fields.
    fn mailbox_entry_from_row(row: MailboxRow) -> Result<MailboxEntry, MailboxStoreError> {
        Ok(MailboxEntry {
            entry_id: row.entry_id,
            mailbox_id: row.mailbox_id,
            origin: Self::decode_mailbox_origin(&row.origin)?,
            sender_id: row.sender_id,
            payload: row.payload,
            // NOTE(review): an out-of-range priority silently clamps to 0
            // instead of erroring like the other fields — confirm intended.
            priority: u8::try_from(row.priority).unwrap_or(0),
            dedupe_key: row.dedupe_key,
            generation: u64::try_from(row.generation).map_err(|_| {
                MailboxStoreError::Backend(format!(
                    "generation cannot be negative in postgres BIGINT: {}",
                    row.generation
                ))
            })?,
            status: Self::decode_mailbox_status(&row.status)?,
            available_at: Self::from_db_timestamp(row.available_at, "available_at")
                .map_err(|e| MailboxStoreError::Backend(e.to_string()))?,
            attempt_count: u32::try_from(row.attempt_count).map_err(|_| {
                MailboxStoreError::Backend(format!(
                    "attempt_count cannot be negative in postgres INTEGER: {}",
                    row.attempt_count
                ))
            })?,
            last_error: row.last_error,
            claim_token: row.claim_token,
            claimed_by: row.claimed_by,
            lease_until: row
                .lease_until
                .map(|value| {
                    Self::from_db_timestamp(value, "lease_until")
                        .map_err(|e| MailboxStoreError::Backend(e.to_string()))
                })
                .transpose()?,
            created_at: Self::from_db_timestamp(row.created_at, "created_at")
                .map_err(|e| MailboxStoreError::Backend(e.to_string()))?,
            updated_at: Self::from_db_timestamp(row.updated_at, "updated_at")
                .map_err(|e| MailboxStoreError::Backend(e.to_string()))?,
        })
    }
}
1758
#[cfg(feature = "postgres")]
#[async_trait]
impl RunReader for PostgresStore {
    /// Fetches a single run record by id, or `None` when it does not exist.
    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let sql = format!(
            "SELECT run_id, thread_id, agent_id, parent_run_id, parent_thread_id, origin, status, termination_code, termination_detail, created_at, updated_at, source_mailbox_entry_id, metadata FROM {} WHERE run_id = $1",
            self.runs_table
        );
        let row = sqlx::query_as::<_, RunRowTuple>(&sql)
            .bind(run_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;
        row.map(Self::run_from_row).transpose()
    }

    /// Lists runs matching `query` with offset pagination, ordered by
    /// creation time then run id.
    ///
    /// The WHERE clause for the count and data statements comes from one
    /// shared helper so the two filter sets cannot drift apart (the previous
    /// inline duplication had already drifted: the final `updated_at_to`
    /// branch in both copies forgot to update its `has_where` flag).
    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, RunStoreError> {
        // Appends the filters derived from `query` onto `qb`; `'args` ties
        // the borrowed bind values to the builder's argument lifetime.
        fn push_run_filters<'args>(
            qb: &mut QueryBuilder<'args, Postgres>,
            query: &'args RunQuery,
        ) -> Result<(), RunStoreError> {
            // Emits " WHERE " before the first predicate, " AND " after.
            fn sep(qb: &mut QueryBuilder<'_, Postgres>, has_where: &mut bool) {
                qb.push(if *has_where { " AND " } else { " WHERE " });
                *has_where = true;
            }

            let mut has_where = false;
            if let Some(thread_id) = query.thread_id.as_deref() {
                sep(qb, &mut has_where);
                qb.push("thread_id = ").push_bind(thread_id);
            }
            if let Some(parent_run_id) = query.parent_run_id.as_deref() {
                sep(qb, &mut has_where);
                qb.push("parent_run_id = ").push_bind(parent_run_id);
            }
            if let Some(status) = query.status {
                sep(qb, &mut has_where);
                qb.push("status = ")
                    .push_bind(PostgresStore::encode_status(status));
            }
            if let Some(termination_code) = query.termination_code.as_deref() {
                sep(qb, &mut has_where);
                qb.push("termination_code = ").push_bind(termination_code);
            }
            if let Some(origin) = query.origin {
                sep(qb, &mut has_where);
                qb.push("origin = ")
                    .push_bind(PostgresStore::encode_origin(origin));
            }
            if let Some(created_at_from) = query.created_at_from {
                let created_at_from =
                    PostgresStore::to_db_timestamp(created_at_from, "created_at_from")?;
                sep(qb, &mut has_where);
                qb.push("created_at >= ").push_bind(created_at_from);
            }
            if let Some(created_at_to) = query.created_at_to {
                let created_at_to =
                    PostgresStore::to_db_timestamp(created_at_to, "created_at_to")?;
                sep(qb, &mut has_where);
                qb.push("created_at <= ").push_bind(created_at_to);
            }
            if let Some(updated_at_from) = query.updated_at_from {
                let updated_at_from =
                    PostgresStore::to_db_timestamp(updated_at_from, "updated_at_from")?;
                sep(qb, &mut has_where);
                qb.push("updated_at >= ").push_bind(updated_at_from);
            }
            if let Some(updated_at_to) = query.updated_at_to {
                let updated_at_to =
                    PostgresStore::to_db_timestamp(updated_at_to, "updated_at_to")?;
                sep(qb, &mut has_where);
                qb.push("updated_at <= ").push_bind(updated_at_to);
            }
            Ok(())
        }

        self.ensure_run_schema_ready().await?;

        let limit = query.limit.clamp(1, 200);
        // Fetch one extra row to learn whether another page exists.
        let fetch_limit = (limit + 1) as i64;
        let offset = i64::try_from(query.offset)
            .map_err(|_| RunStoreError::Backend("offset is too large".to_string()))?;

        let mut count_qb = QueryBuilder::<Postgres>::new(format!(
            "SELECT COUNT(*)::bigint FROM {}",
            self.runs_table
        ));
        push_run_filters(&mut count_qb, query)?;
        let total: i64 = count_qb
            .build_query_scalar()
            .fetch_one(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;

        let mut data_qb = QueryBuilder::<Postgres>::new(format!(
            "SELECT run_id, thread_id, agent_id, parent_run_id, parent_thread_id, origin, status, termination_code, termination_detail, created_at, updated_at, source_mailbox_entry_id, metadata FROM {}",
            self.runs_table
        ));
        push_run_filters(&mut data_qb, query)?;
        data_qb
            .push(" ORDER BY created_at ASC, run_id ASC LIMIT ")
            .push_bind(fetch_limit)
            .push(" OFFSET ")
            .push_bind(offset);

        let rows: Vec<RunRowTuple> = data_qb
            .build_query_as()
            .fetch_all(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;
        let has_more = rows.len() > limit;
        let items = rows
            .into_iter()
            .take(limit)
            .map(Self::run_from_row)
            .collect::<Result<Vec<_>, _>>()?;

        Ok(RunPage {
            items,
            total: usize::try_from(total)
                .map_err(|_| RunStoreError::Backend("total is negative".to_string()))?,
            has_more,
        })
    }

    /// Resolves the thread a run belongs to, or `None` for an unknown run.
    async fn resolve_thread_id(&self, run_id: &str) -> Result<Option<String>, RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let sql = format!(
            "SELECT thread_id FROM {} WHERE run_id = $1",
            self.runs_table
        );
        sqlx::query_scalar::<_, String>(&sql)
            .bind(run_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::run_sql_err)
    }

    /// Returns the most recently created run for the thread whose status is
    /// not `Done`, or `None` when every run has finished.
    async fn load_current_run(&self, thread_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let sql = format!(
            "SELECT run_id, thread_id, agent_id, parent_run_id, parent_thread_id, origin, status, \
             termination_code, termination_detail, created_at, updated_at, source_mailbox_entry_id, metadata \
             FROM {} WHERE thread_id = $1 AND status != $2 \
             ORDER BY created_at DESC, updated_at DESC, run_id DESC LIMIT 1",
            self.runs_table
        );
        let row = sqlx::query_as::<_, RunRowTuple>(&sql)
            .bind(thread_id)
            .bind(Self::encode_status(RunStatus::Done))
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;
        row.map(Self::run_from_row).transpose()
    }
}
1968
#[cfg(feature = "postgres")]
#[async_trait]
impl RunWriter for PostgresStore {
    /// Inserts a run record, or — when a row with the same `run_id` already
    /// exists — overwrites every column of that row with the new values.
    async fn upsert_run(&self, record: &RunRecord) -> Result<(), RunStoreError> {
        self.ensure_run_schema_ready().await?;

        // Convert timestamps up front so an out-of-range value fails before
        // any statement reaches the database.
        let db_created_at = Self::to_db_timestamp(record.created_at, "created_at")?;
        let db_updated_at = Self::to_db_timestamp(record.updated_at, "updated_at")?;

        let statement = format!(
            "INSERT INTO {} (run_id, thread_id, agent_id, parent_run_id, parent_thread_id, origin, status, \
             termination_code, termination_detail, created_at, updated_at, source_mailbox_entry_id, metadata) \
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) \
             ON CONFLICT (run_id) DO UPDATE SET thread_id = EXCLUDED.thread_id, agent_id = EXCLUDED.agent_id, \
             parent_run_id = EXCLUDED.parent_run_id, parent_thread_id = EXCLUDED.parent_thread_id, \
             origin = EXCLUDED.origin, status = EXCLUDED.status, termination_code = EXCLUDED.termination_code, \
             termination_detail = EXCLUDED.termination_detail, created_at = EXCLUDED.created_at, \
             updated_at = EXCLUDED.updated_at, source_mailbox_entry_id = EXCLUDED.source_mailbox_entry_id, \
             metadata = EXCLUDED.metadata",
            self.runs_table
        );
        // Bind order must match the $1..$13 placeholders above.
        sqlx::query(&statement)
            .bind(&record.run_id)
            .bind(&record.thread_id)
            .bind(&record.agent_id)
            .bind(record.parent_run_id.as_deref())
            .bind(record.parent_thread_id.as_deref())
            .bind(Self::encode_origin(record.origin))
            .bind(Self::encode_status(record.status))
            .bind(record.termination_code.as_deref())
            .bind(record.termination_detail.as_deref())
            .bind(db_created_at)
            .bind(db_updated_at)
            .bind(record.source_mailbox_entry_id.as_deref())
            .bind(&record.metadata)
            .execute(&self.pool)
            .await
            .map_err(Self::run_sql_err)
            .map(|_| ())
    }

    /// Removes the run row identified by `run_id`. Deleting a run that does
    /// not exist succeeds without error (the DELETE simply matches no rows).
    async fn delete_run(&self, run_id: &str) -> Result<(), RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let statement = format!("DELETE FROM {} WHERE run_id = $1", self.runs_table);
        sqlx::query(&statement)
            .bind(run_id)
            .execute(&self.pool)
            .await
            .map_err(Self::run_sql_err)
            .map(|_| ())
    }
}