tirea_store_adapters/
postgres_store.rs

1use async_trait::async_trait;
2#[cfg(feature = "postgres")]
3use sqlx::{Postgres, QueryBuilder};
4use tirea_contract::storage::{
5    paginate_mailbox_entries, Committed, MailboxEntry, MailboxEntryOrigin, MailboxInterrupt,
6    MailboxPage, MailboxQuery, MailboxReader, MailboxState, MailboxStoreError, MailboxWriter,
7    MessagePage, MessageQuery, MessageWithCursor, RunOrigin, RunPage, RunQuery, RunReader,
8    RunRecord, RunStatus, RunStoreError, RunWriter, SortOrder, ThreadHead, ThreadListPage,
9    ThreadListQuery, ThreadReader, ThreadStoreError, ThreadWriter, VersionPrecondition,
10};
11use tirea_contract::{Message, Thread, ThreadChangeSet, Visibility};
12
13pub struct PostgresStore {
14    pool: sqlx::PgPool,
15    table: String,
16    messages_table: String,
17    runs_table: String,
18    mailbox_table: String,
19    mailbox_threads_table: String,
20    schema_ready: tokio::sync::Mutex<bool>,
21}
22
23#[cfg(feature = "postgres")]
24impl PostgresStore {
25    /// Create a new PostgreSQL storage using the given connection pool.
26    ///
27    /// Sessions are stored in the `agent_sessions` table by default,
28    /// messages in `agent_messages`.
29    pub fn new(pool: sqlx::PgPool) -> Self {
30        Self {
31            pool,
32            table: "agent_sessions".to_string(),
33            messages_table: "agent_messages".to_string(),
34            runs_table: "agent_runs".to_string(),
35            mailbox_table: "agent_mailbox".to_string(),
36            mailbox_threads_table: "agent_mailbox_threads".to_string(),
37            schema_ready: tokio::sync::Mutex::new(false),
38        }
39    }
40
41    /// Create a new PostgreSQL storage with a custom table name.
42    ///
43    /// The messages table will be named `{table}_messages`.
44    pub fn with_table(pool: sqlx::PgPool, table: impl Into<String>) -> Self {
45        let table = table.into();
46        let messages_table = format!("{}_messages", table);
47        let runs_table = format!("{}_runs", table);
48        let mailbox_table = format!("{}_mailbox", table);
49        let mailbox_threads_table = format!("{}_mailbox_threads", table);
50        Self {
51            pool,
52            table,
53            messages_table,
54            runs_table,
55            mailbox_table,
56            mailbox_threads_table,
57            schema_ready: tokio::sync::Mutex::new(false),
58        }
59    }
60
61    fn schema_statements(&self) -> Vec<String> {
62        vec![
63            format!(
64                "CREATE TABLE IF NOT EXISTS {} (id TEXT PRIMARY KEY, data JSONB NOT NULL, updated_at TIMESTAMPTZ NOT NULL DEFAULT now())",
65                self.table
66            ),
67            format!(
68                "CREATE TABLE IF NOT EXISTS {} (seq BIGSERIAL PRIMARY KEY, session_id TEXT NOT NULL REFERENCES {}(id) ON DELETE CASCADE, message_id TEXT, run_id TEXT, step_index INTEGER, data JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now())",
69                self.messages_table, self.table
70            ),
71            format!(
72                "CREATE INDEX IF NOT EXISTS idx_{}_session_seq ON {} (session_id, seq)",
73                self.messages_table, self.messages_table
74            ),
75            format!(
76                "CREATE UNIQUE INDEX IF NOT EXISTS idx_{}_message_id ON {} (message_id) WHERE message_id IS NOT NULL",
77                self.messages_table, self.messages_table
78            ),
79            format!(
80                "CREATE INDEX IF NOT EXISTS idx_{}_session_run ON {} (session_id, run_id) WHERE run_id IS NOT NULL",
81                self.messages_table, self.messages_table
82            ),
83            format!(
84                "CREATE INDEX IF NOT EXISTS idx_{}_resource_id ON {} ((data->>'resource_id')) WHERE data ? 'resource_id'",
85                self.table, self.table
86            ),
87            format!(
88                "CREATE INDEX IF NOT EXISTS idx_{}_parent_thread_id ON {} ((data->>'parent_thread_id')) WHERE data ? 'parent_thread_id'",
89                self.table, self.table
90            ),
91            format!(
92                "CREATE TABLE IF NOT EXISTS {} (run_id TEXT PRIMARY KEY, thread_id TEXT NOT NULL, agent_id TEXT NOT NULL DEFAULT '', parent_run_id TEXT, parent_thread_id TEXT, origin TEXT NOT NULL, status TEXT NOT NULL, termination_code TEXT, termination_detail TEXT, created_at BIGINT NOT NULL, updated_at BIGINT NOT NULL, source_mailbox_entry_id TEXT, metadata JSONB)",
93                self.runs_table
94            ),
95            format!(
96                "CREATE INDEX IF NOT EXISTS idx_{}_thread_id ON {} (thread_id)",
97                self.runs_table, self.runs_table
98            ),
99            format!(
100                "CREATE INDEX IF NOT EXISTS idx_{}_thread_active ON {} (thread_id, created_at DESC) WHERE status != 'done'",
101                self.runs_table, self.runs_table
102            ),
103            format!(
104                "CREATE INDEX IF NOT EXISTS idx_{}_parent_run_id ON {} (parent_run_id) WHERE parent_run_id IS NOT NULL",
105                self.runs_table, self.runs_table
106            ),
107            format!(
108                "CREATE INDEX IF NOT EXISTS idx_{}_status ON {} (status)",
109                self.runs_table, self.runs_table
110            ),
111            format!(
112                "CREATE INDEX IF NOT EXISTS idx_{}_termination_code ON {} (termination_code) WHERE termination_code IS NOT NULL",
113                self.runs_table, self.runs_table
114            ),
115            format!(
116                "CREATE INDEX IF NOT EXISTS idx_{}_origin ON {} (origin)",
117                self.runs_table, self.runs_table
118            ),
119            format!(
120                "CREATE INDEX IF NOT EXISTS idx_{}_created_at ON {} (created_at, run_id)",
121                self.runs_table, self.runs_table
122            ),
123            format!(
124                "CREATE TABLE IF NOT EXISTS {} (entry_id TEXT PRIMARY KEY, mailbox_id TEXT NOT NULL, origin TEXT NOT NULL DEFAULT 'external', sender_id TEXT, payload JSONB NOT NULL, priority SMALLINT NOT NULL DEFAULT 0, dedupe_key TEXT, generation BIGINT NOT NULL DEFAULT 0, status TEXT NOT NULL, available_at BIGINT NOT NULL, attempt_count INTEGER NOT NULL DEFAULT 0, last_error TEXT, claim_token TEXT, claimed_by TEXT, lease_until BIGINT, created_at BIGINT NOT NULL, updated_at BIGINT NOT NULL)",
125                self.mailbox_table
126            ),
127            format!(
128                "CREATE TABLE IF NOT EXISTS {} (mailbox_id TEXT PRIMARY KEY, current_generation BIGINT NOT NULL DEFAULT 0, updated_at BIGINT NOT NULL)",
129                self.mailbox_threads_table
130            ),
131            format!(
132                "CREATE INDEX IF NOT EXISTS idx_{}_status_available ON {} (status, available_at, created_at)",
133                self.mailbox_table, self.mailbox_table
134            ),
135            format!(
136                "CREATE INDEX IF NOT EXISTS idx_{}_mailbox_status ON {} (mailbox_id, status, created_at)",
137                self.mailbox_table, self.mailbox_table
138            ),
139            format!(
140                "CREATE INDEX IF NOT EXISTS idx_{}_mailbox_origin_status ON {} (mailbox_id, origin, status, created_at)",
141                self.mailbox_table, self.mailbox_table
142            ),
143            format!(
144                "CREATE UNIQUE INDEX IF NOT EXISTS idx_{}_mailbox_dedupe ON {} (mailbox_id, dedupe_key) WHERE dedupe_key IS NOT NULL",
145                self.mailbox_table, self.mailbox_table
146            ),
147            // Migration: add agent_id column to existing tables.
148            format!(
149                "ALTER TABLE {} ADD COLUMN IF NOT EXISTS agent_id TEXT NOT NULL DEFAULT ''",
150                self.runs_table
151            ),
152            format!(
153                "ALTER TABLE {} ADD COLUMN IF NOT EXISTS origin TEXT NOT NULL DEFAULT 'external'",
154                self.mailbox_table
155            ),
156            // Enforce at most one non-terminal run per thread.
157            format!(
158                "CREATE UNIQUE INDEX IF NOT EXISTS idx_{}_thread_active_unique ON {} (thread_id) WHERE status != 'done'",
159                self.runs_table, self.runs_table
160            ),
161        ]
162    }
163
164    async fn ensure_schema_ready(&self) -> Result<(), sqlx::Error> {
165        let mut schema_ready = self.schema_ready.lock().await;
166        if *schema_ready {
167            return Ok(());
168        }
169
170        // Only flip the flag after all statements succeed so transient failures can retry.
171        for sql in self.schema_statements() {
172            sqlx::query(&sql).execute(&self.pool).await?;
173        }
174
175        *schema_ready = true;
176        Ok(())
177    }
178
179    async fn ensure_thread_schema_ready(&self) -> Result<(), ThreadStoreError> {
180        self.ensure_schema_ready().await.map_err(Self::sql_err)
181    }
182
183    async fn ensure_run_schema_ready(&self) -> Result<(), RunStoreError> {
184        self.ensure_schema_ready().await.map_err(Self::run_sql_err)
185    }
186
187    /// Ensure the storage tables exist (idempotent).
188    ///
189    /// This is optional for callers; the store also initializes its schema
190    /// automatically on first access.
191    pub async fn ensure_table(&self) -> Result<(), ThreadStoreError> {
192        self.ensure_thread_schema_ready().await?;
193        Ok(())
194    }
195
196    fn sql_err(e: sqlx::Error) -> ThreadStoreError {
197        ThreadStoreError::Io(std::io::Error::other(e.to_string()))
198    }
199
200    fn run_sql_err(e: sqlx::Error) -> RunStoreError {
201        RunStoreError::Backend(e.to_string())
202    }
203
204    fn mailbox_sql_err(e: sqlx::Error) -> MailboxStoreError {
205        MailboxStoreError::Backend(e.to_string())
206    }
207
208    fn encode_origin(origin: RunOrigin) -> &'static str {
209        match origin {
210            RunOrigin::User => "user",
211            RunOrigin::Subagent => "subagent",
212            RunOrigin::AgUi => "ag_ui",
213            RunOrigin::AiSdk => "ai_sdk",
214            RunOrigin::A2a => "a2a",
215            RunOrigin::Internal => "internal",
216        }
217    }
218
219    fn decode_origin(raw: &str) -> Result<RunOrigin, RunStoreError> {
220        match raw {
221            "user" => Ok(RunOrigin::User),
222            "subagent" => Ok(RunOrigin::Subagent),
223            "ag_ui" => Ok(RunOrigin::AgUi),
224            "ai_sdk" => Ok(RunOrigin::AiSdk),
225            "a2a" => Ok(RunOrigin::A2a),
226            "internal" => Ok(RunOrigin::Internal),
227            _ => Err(RunStoreError::Backend(format!(
228                "invalid run origin value: {raw}"
229            ))),
230        }
231    }
232
233    fn encode_mailbox_origin(origin: MailboxEntryOrigin) -> &'static str {
234        match origin {
235            MailboxEntryOrigin::External => "external",
236            MailboxEntryOrigin::Internal => "internal",
237        }
238    }
239
240    fn decode_mailbox_origin(raw: &str) -> Result<MailboxEntryOrigin, MailboxStoreError> {
241        match raw {
242            "external" => Ok(MailboxEntryOrigin::External),
243            "internal" => Ok(MailboxEntryOrigin::Internal),
244            _ => Err(MailboxStoreError::Backend(format!(
245                "invalid mailbox origin value: {raw}"
246            ))),
247        }
248    }
249
250    fn encode_status(status: RunStatus) -> &'static str {
251        match status {
252            RunStatus::Running => "running",
253            RunStatus::Waiting => "waiting",
254            RunStatus::Done => "done",
255        }
256    }
257
258    fn decode_status(raw: &str) -> Result<RunStatus, RunStoreError> {
259        match raw {
260            "running" => Ok(RunStatus::Running),
261            "waiting" => Ok(RunStatus::Waiting),
262            "done" => Ok(RunStatus::Done),
263            // Backward compatibility for legacy persisted values.
264            "submitted" | "working" => Ok(RunStatus::Running),
265            "input_required" | "auth_required" => Ok(RunStatus::Waiting),
266            "completed" | "failed" | "canceled" | "cancelled" | "rejected" => Ok(RunStatus::Done),
267            _ => Err(RunStoreError::Backend(format!(
268                "invalid run status value: {raw}"
269            ))),
270        }
271    }
272
273    fn encode_mailbox_status(status: tirea_contract::MailboxEntryStatus) -> &'static str {
274        match status {
275            tirea_contract::MailboxEntryStatus::Queued => "queued",
276            tirea_contract::MailboxEntryStatus::Claimed => "claimed",
277            tirea_contract::MailboxEntryStatus::Accepted => "accepted",
278            tirea_contract::MailboxEntryStatus::Superseded => "superseded",
279            tirea_contract::MailboxEntryStatus::Cancelled => "cancelled",
280            tirea_contract::MailboxEntryStatus::DeadLetter => "dead_letter",
281        }
282    }
283
284    fn decode_mailbox_status(
285        raw: &str,
286    ) -> Result<tirea_contract::MailboxEntryStatus, MailboxStoreError> {
287        match raw {
288            "queued" => Ok(tirea_contract::MailboxEntryStatus::Queued),
289            "claimed" => Ok(tirea_contract::MailboxEntryStatus::Claimed),
290            "accepted" => Ok(tirea_contract::MailboxEntryStatus::Accepted),
291            "superseded" => Ok(tirea_contract::MailboxEntryStatus::Superseded),
292            "cancelled" | "canceled" => Ok(tirea_contract::MailboxEntryStatus::Cancelled),
293            "dead_letter" => Ok(tirea_contract::MailboxEntryStatus::DeadLetter),
294            _ => Err(MailboxStoreError::Backend(format!(
295                "invalid mailbox status value: {raw}"
296            ))),
297        }
298    }
299
300    fn to_db_timestamp(value: u64, field: &str) -> Result<i64, RunStoreError> {
301        i64::try_from(value).map_err(|_| {
302            RunStoreError::Backend(format!("{field} is too large for postgres BIGINT: {value}"))
303        })
304    }
305
306    fn from_db_timestamp(value: i64, field: &str) -> Result<u64, RunStoreError> {
307        u64::try_from(value).map_err(|_| {
308            RunStoreError::Backend(format!(
309                "{field} cannot be negative in postgres BIGINT: {value}"
310            ))
311        })
312    }
313}
314
315#[cfg(feature = "postgres")]
316#[async_trait]
317impl MailboxReader for PostgresStore {
318    async fn load_mailbox_entry(
319        &self,
320        entry_id: &str,
321    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
322        let sql = format!(
323            "SELECT entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
324             generation, status, available_at, attempt_count, last_error, claim_token, \
325             claimed_by, lease_until, created_at, updated_at FROM {} WHERE entry_id = $1",
326            self.mailbox_table
327        );
328        let row = sqlx::query_as::<_, MailboxRow>(&sql)
329            .bind(entry_id)
330            .fetch_optional(&self.pool)
331            .await
332            .map_err(Self::mailbox_sql_err)?;
333        row.map(Self::mailbox_entry_from_row).transpose()
334    }
335
336    async fn load_mailbox_state(
337        &self,
338        mailbox_id: &str,
339    ) -> Result<Option<MailboxState>, MailboxStoreError> {
340        let row = sqlx::query_as::<_, (String, i64, i64)>(&format!(
341            "SELECT mailbox_id, current_generation, updated_at FROM {} WHERE mailbox_id = $1",
342            self.mailbox_threads_table
343        ))
344        .bind(mailbox_id)
345        .fetch_optional(&self.pool)
346        .await
347        .map_err(Self::mailbox_sql_err)?;
348
349        row.map(|(mailbox_id, current_generation, updated_at)| {
350            Ok(MailboxState {
351                mailbox_id,
352                current_generation: u64::try_from(current_generation).map_err(|_| {
353                    MailboxStoreError::Backend(format!(
354                        "current_generation cannot be negative in postgres BIGINT: {current_generation}"
355                    ))
356                })?,
357                updated_at: u64::try_from(updated_at).map_err(|_| {
358                    MailboxStoreError::Backend(format!(
359                        "updated_at cannot be negative in postgres BIGINT: {updated_at}"
360                    ))
361                })?,
362            })
363        })
364        .transpose()
365    }
366
367    async fn list_mailbox_entries(
368        &self,
369        query: &MailboxQuery,
370    ) -> Result<MailboxPage, MailboxStoreError> {
371        let mut qb = QueryBuilder::<Postgres>::new(format!(
372            "SELECT entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
373             generation, status, available_at, attempt_count, last_error, claim_token, \
374             claimed_by, lease_until, created_at, updated_at FROM {}",
375            self.mailbox_table
376        ));
377
378        let mut has_where = false;
379        if let Some(mailbox_id) = query.mailbox_id.as_deref() {
380            qb.push(if has_where { " AND " } else { " WHERE " });
381            has_where = true;
382            qb.push("mailbox_id = ").push_bind(mailbox_id);
383        }
384        if let Some(origin) = query.origin {
385            qb.push(if has_where { " AND " } else { " WHERE " });
386            has_where = true;
387            qb.push("origin = ")
388                .push_bind(Self::encode_mailbox_origin(origin));
389        }
390        if let Some(status) = query.status {
391            qb.push(if has_where { " AND " } else { " WHERE " });
392            qb.push("status = ")
393                .push_bind(Self::encode_mailbox_status(status));
394        }
395        qb.push(" ORDER BY created_at ASC, entry_id ASC");
396
397        let rows = qb
398            .build_query_as::<MailboxRow>()
399            .fetch_all(&self.pool)
400            .await
401            .map_err(Self::mailbox_sql_err)?;
402
403        let mut entries = Vec::with_capacity(rows.len());
404        for row in rows {
405            entries.push(Self::mailbox_entry_from_row(row)?);
406        }
407        Ok(paginate_mailbox_entries(&entries, query))
408    }
409}
410
411#[cfg(feature = "postgres")]
412#[async_trait]
413impl MailboxWriter for PostgresStore {
414    async fn enqueue_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
415        let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
416        let now_i64 = i64::try_from(entry.updated_at).map_err(|_| {
417            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
418        })?;
419
420        let state_row = sqlx::query_as::<_, (i64,)>(&format!(
421            "INSERT INTO {} (mailbox_id, current_generation, updated_at) VALUES ($1, 0, $2) \
422             ON CONFLICT (mailbox_id) DO UPDATE SET updated_at = EXCLUDED.updated_at \
423             RETURNING current_generation",
424            self.mailbox_threads_table
425        ))
426        .bind(&entry.mailbox_id)
427        .bind(now_i64)
428        .fetch_one(&mut *tx)
429        .await
430        .map_err(Self::mailbox_sql_err)?;
431
432        let current_generation = u64::try_from(state_row.0).map_err(|_| {
433            MailboxStoreError::Backend(format!(
434                "current_generation cannot be negative: {}",
435                state_row.0
436            ))
437        })?;
438        if current_generation != entry.generation {
439            return Err(MailboxStoreError::GenerationMismatch {
440                mailbox_id: entry.mailbox_id.clone(),
441                expected: current_generation,
442                actual: entry.generation,
443            });
444        }
445
446        sqlx::query(&format!(
447            "INSERT INTO {} (entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
448             generation, status, available_at, attempt_count, last_error, claim_token, \
449             claimed_by, lease_until, created_at, updated_at) \
450             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
451            self.mailbox_table
452        ))
453        .bind(&entry.entry_id)
454        .bind(&entry.mailbox_id)
455        .bind(Self::encode_mailbox_origin(entry.origin))
456        .bind(entry.sender_id.as_deref())
457        .bind(&entry.payload)
458        .bind(i16::from(entry.priority))
459        .bind(entry.dedupe_key.as_deref())
460        .bind(i64::try_from(entry.generation).map_err(|_| {
461            MailboxStoreError::Backend("generation too large for postgres BIGINT".to_string())
462        })?)
463        .bind(Self::encode_mailbox_status(entry.status))
464        .bind(i64::try_from(entry.available_at).map_err(|_| {
465            MailboxStoreError::Backend("available_at too large for postgres BIGINT".to_string())
466        })?)
467        .bind(i32::try_from(entry.attempt_count).map_err(|_| {
468            MailboxStoreError::Backend(
469                "attempt_count too large for postgres INTEGER".to_string(),
470            )
471        })?)
472        .bind(entry.last_error.as_deref())
473        .bind(entry.claim_token.as_deref())
474        .bind(entry.claimed_by.as_deref())
475        .bind(
476            entry
477                .lease_until
478                .map(i64::try_from)
479                .transpose()
480                .map_err(|_| {
481                    MailboxStoreError::Backend(
482                        "lease_until too large for postgres BIGINT".to_string(),
483                    )
484                })?,
485        )
486        .bind(i64::try_from(entry.created_at).map_err(|_| {
487            MailboxStoreError::Backend("created_at too large for postgres BIGINT".to_string())
488        })?)
489        .bind(now_i64)
490        .execute(&mut *tx)
491        .await
492        .map_err(|error| {
493            let message = error.to_string();
494            if message.contains("duplicate key") || message.contains("unique constraint") {
495                MailboxStoreError::AlreadyExists(entry.entry_id.clone())
496            } else {
497                Self::mailbox_sql_err(error)
498            }
499        })?;
500
501        tx.commit().await.map_err(Self::mailbox_sql_err)?;
502        Ok(())
503    }
504
505    async fn ensure_mailbox_state(
506        &self,
507        mailbox_id: &str,
508        now: u64,
509    ) -> Result<MailboxState, MailboxStoreError> {
510        let now_i64 = i64::try_from(now).map_err(|_| {
511            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
512        })?;
513        let row = sqlx::query_as::<_, (String, i64, i64)>(&format!(
514            "INSERT INTO {} (mailbox_id, current_generation, updated_at) VALUES ($1, 0, $2) \
515             ON CONFLICT (mailbox_id) DO UPDATE SET updated_at = EXCLUDED.updated_at \
516             RETURNING mailbox_id, current_generation, updated_at",
517            self.mailbox_threads_table
518        ))
519        .bind(mailbox_id)
520        .bind(now_i64)
521        .fetch_one(&self.pool)
522        .await
523        .map_err(Self::mailbox_sql_err)?;
524        Ok(MailboxState {
525            mailbox_id: row.0,
526            current_generation: u64::try_from(row.1).map_err(|_| {
527                MailboxStoreError::Backend(format!(
528                    "current_generation cannot be negative in postgres BIGINT: {}",
529                    row.1
530                ))
531            })?,
532            updated_at: u64::try_from(row.2).map_err(|_| {
533                MailboxStoreError::Backend(format!(
534                    "updated_at cannot be negative in postgres BIGINT: {}",
535                    row.2
536                ))
537            })?,
538        })
539    }
540
541    async fn claim_mailbox_entries(
542        &self,
543        mailbox_id: Option<&str>,
544        limit: usize,
545        consumer_id: &str,
546        now: u64,
547        lease_duration_ms: u64,
548    ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
549        let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
550        let limit = i64::try_from(limit)
551            .map_err(|_| MailboxStoreError::Backend("claim limit too large".to_string()))?;
552        let now_i64 = i64::try_from(now).map_err(|_| {
553            MailboxStoreError::Backend("now too large for postgres BIGINT".to_string())
554        })?;
555        // Mailbox-level exclusive claim: only select entries whose mailbox
556        // does NOT already have an active (non-expired) claim.
557        let exclusive_filter = format!(
558            "AND NOT EXISTS (\
559                SELECT 1 FROM {} e2 \
560                WHERE e2.mailbox_id = e1.mailbox_id \
561                  AND e2.status = 'claimed' \
562                  AND e2.lease_until IS NOT NULL \
563                  AND e2.lease_until > $1 \
564                  AND e2.entry_id != e1.entry_id\
565            )",
566            self.mailbox_table
567        );
568        let rows: Vec<MailboxRow> = if let Some(mailbox_id) = mailbox_id {
569            sqlx::query_as(&format!(
570                "SELECT e1.entry_id, e1.mailbox_id, e1.origin, e1.sender_id, e1.payload, \
571                 e1.priority, e1.dedupe_key, e1.generation, e1.status, e1.available_at, \
572                 e1.attempt_count, e1.last_error, e1.claim_token, e1.claimed_by, e1.lease_until, \
573                 e1.created_at, e1.updated_at FROM {} e1 \
574                 WHERE e1.mailbox_id = $3 \
575                   AND ((e1.status = 'queued' AND e1.available_at <= $1) \
576                    OR (e1.status = 'claimed' AND e1.lease_until IS NOT NULL AND e1.lease_until <= $1)) \
577                 {exclusive_filter} \
578                 ORDER BY e1.priority DESC, e1.available_at ASC, e1.created_at ASC, e1.entry_id ASC \
579                 FOR UPDATE SKIP LOCKED LIMIT $2",
580                self.mailbox_table
581            ))
582            .bind(now_i64)
583            .bind(limit)
584            .bind(mailbox_id)
585            .fetch_all(&mut *tx)
586            .await
587            .map_err(Self::mailbox_sql_err)?
588        } else {
589            sqlx::query_as(&format!(
590                "SELECT e1.entry_id, e1.mailbox_id, e1.origin, e1.sender_id, e1.payload, \
591                 e1.priority, e1.dedupe_key, e1.generation, e1.status, e1.available_at, \
592                 e1.attempt_count, e1.last_error, e1.claim_token, e1.claimed_by, e1.lease_until, \
593                 e1.created_at, e1.updated_at FROM {} e1 \
594                 WHERE (e1.status = 'queued' AND e1.available_at <= $1) \
595                    OR (e1.status = 'claimed' AND e1.lease_until IS NOT NULL AND e1.lease_until <= $1) \
596                 {exclusive_filter} \
597                 ORDER BY e1.priority DESC, e1.available_at ASC, e1.created_at ASC, e1.entry_id ASC \
598                 FOR UPDATE SKIP LOCKED LIMIT $2",
599                self.mailbox_table
600            ))
601            .bind(now_i64)
602            .bind(limit)
603            .fetch_all(&mut *tx)
604            .await
605            .map_err(Self::mailbox_sql_err)?
606        };
607
608        // Deduplicate by mailbox_id: only claim one entry per mailbox.
609        let mut seen_mailbox_ids = std::collections::HashSet::new();
610        let mut claimed = Vec::with_capacity(rows.len());
611        for row in rows {
612            let mut entry = Self::mailbox_entry_from_row(row)?;
613            if !seen_mailbox_ids.insert(entry.mailbox_id.clone()) {
614                continue;
615            }
616            let claim_token = uuid::Uuid::now_v7().simple().to_string();
617            let lease_until = now.saturating_add(lease_duration_ms);
618            sqlx::query(&format!(
619                "UPDATE {} SET status = $1, claim_token = $2, claimed_by = $3, lease_until = $4, \
620                 attempt_count = attempt_count + 1, updated_at = $5 WHERE entry_id = $6",
621                self.mailbox_table
622            ))
623            .bind(Self::encode_mailbox_status(
624                tirea_contract::MailboxEntryStatus::Claimed,
625            ))
626            .bind(&claim_token)
627            .bind(consumer_id)
628            .bind(i64::try_from(lease_until).map_err(|_| {
629                MailboxStoreError::Backend("lease_until too large for postgres BIGINT".to_string())
630            })?)
631            .bind(now_i64)
632            .bind(&entry.entry_id)
633            .execute(&mut *tx)
634            .await
635            .map_err(Self::mailbox_sql_err)?;
636            entry.status = tirea_contract::MailboxEntryStatus::Claimed;
637            entry.claim_token = Some(claim_token);
638            entry.claimed_by = Some(consumer_id.to_string());
639            entry.lease_until = Some(lease_until);
640            entry.attempt_count = entry.attempt_count.saturating_add(1);
641            entry.updated_at = now;
642            claimed.push(entry);
643        }
644        tx.commit().await.map_err(Self::mailbox_sql_err)?;
645        Ok(claimed)
646    }
647
648    async fn claim_mailbox_entry(
649        &self,
650        entry_id: &str,
651        consumer_id: &str,
652        now: u64,
653        lease_duration_ms: u64,
654    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
655        let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
656        let now_i64 = i64::try_from(now).map_err(|_| {
657            MailboxStoreError::Backend("now too large for postgres BIGINT".to_string())
658        })?;
659        // Mailbox-level exclusive claim: reject if another entry in the same
660        // mailbox already holds an active (non-expired) lease.
661        let row = sqlx::query_as::<_, MailboxRow>(&format!(
662            "SELECT entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
663             generation, status, available_at, attempt_count, last_error, claim_token, \
664             claimed_by, lease_until, created_at, updated_at FROM {} \
665             WHERE entry_id = $1 \
666               AND (status = 'queued' OR (status = 'claimed' AND lease_until IS NOT NULL AND lease_until <= $2)) \
667               AND NOT EXISTS (\
668                   SELECT 1 FROM {} e2 \
669                   WHERE e2.mailbox_id = (SELECT mailbox_id FROM {} WHERE entry_id = $1) \
670                     AND e2.status = 'claimed' \
671                     AND e2.lease_until IS NOT NULL \
672                     AND e2.lease_until > $2 \
673                     AND e2.entry_id != $1\
674               ) \
675             FOR UPDATE SKIP LOCKED",
676            self.mailbox_table, self.mailbox_table, self.mailbox_table
677        ))
678        .bind(entry_id)
679        .bind(now_i64)
680        .fetch_optional(&mut *tx)
681        .await
682        .map_err(Self::mailbox_sql_err)?;
683
684        let Some(row) = row else {
685            tx.commit().await.map_err(Self::mailbox_sql_err)?;
686            return Ok(None);
687        };
688
689        let mut entry = Self::mailbox_entry_from_row(row)?;
690        let claim_token = uuid::Uuid::now_v7().simple().to_string();
691        let lease_until = now.saturating_add(lease_duration_ms);
692        sqlx::query(&format!(
693            "UPDATE {} SET status = $1, claim_token = $2, claimed_by = $3, lease_until = $4, \
694             attempt_count = attempt_count + 1, updated_at = $5 WHERE entry_id = $6",
695            self.mailbox_table
696        ))
697        .bind(Self::encode_mailbox_status(
698            tirea_contract::MailboxEntryStatus::Claimed,
699        ))
700        .bind(&claim_token)
701        .bind(consumer_id)
702        .bind(i64::try_from(lease_until).map_err(|_| {
703            MailboxStoreError::Backend("lease_until too large for postgres BIGINT".to_string())
704        })?)
705        .bind(now_i64)
706        .bind(&entry.entry_id)
707        .execute(&mut *tx)
708        .await
709        .map_err(Self::mailbox_sql_err)?;
710        tx.commit().await.map_err(Self::mailbox_sql_err)?;
711
712        entry.status = tirea_contract::MailboxEntryStatus::Claimed;
713        entry.claim_token = Some(claim_token);
714        entry.claimed_by = Some(consumer_id.to_string());
715        entry.lease_until = Some(lease_until);
716        entry.attempt_count = entry.attempt_count.saturating_add(1);
717        entry.updated_at = now;
718        Ok(Some(entry))
719    }
720
721    async fn ack_mailbox_entry(
722        &self,
723        entry_id: &str,
724        claim_token: &str,
725        now: u64,
726    ) -> Result<(), MailboxStoreError> {
727        let updated = sqlx::query(&format!(
728            "UPDATE {} SET status = $1, claim_token = NULL, claimed_by = NULL, \
729             lease_until = NULL, updated_at = $2 WHERE entry_id = $3 AND claim_token = $4",
730            self.mailbox_table
731        ))
732        .bind(Self::encode_mailbox_status(
733            tirea_contract::MailboxEntryStatus::Accepted,
734        ))
735        .bind(i64::try_from(now).map_err(|_| {
736            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
737        })?)
738        .bind(entry_id)
739        .bind(claim_token)
740        .execute(&self.pool)
741        .await
742        .map_err(Self::mailbox_sql_err)?;
743        if updated.rows_affected() == 0 {
744            return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
745        }
746        Ok(())
747    }
748
749    async fn nack_mailbox_entry(
750        &self,
751        entry_id: &str,
752        claim_token: &str,
753        retry_at: u64,
754        error: &str,
755        now: u64,
756    ) -> Result<(), MailboxStoreError> {
757        let updated = sqlx::query(&format!(
758            "UPDATE {} SET status = $1, available_at = $2, last_error = $3, claim_token = NULL, \
759             claimed_by = NULL, lease_until = NULL, updated_at = $4 WHERE entry_id = $5 AND claim_token = $6",
760            self.mailbox_table
761        ))
762        .bind(Self::encode_mailbox_status(tirea_contract::MailboxEntryStatus::Queued))
763        .bind(i64::try_from(retry_at).map_err(|_| {
764            MailboxStoreError::Backend("retry_at too large for postgres BIGINT".to_string())
765        })?)
766        .bind(error)
767        .bind(i64::try_from(now).map_err(|_| {
768            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
769        })?)
770        .bind(entry_id)
771        .bind(claim_token)
772        .execute(&self.pool)
773        .await
774        .map_err(Self::mailbox_sql_err)?;
775        if updated.rows_affected() == 0 {
776            return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
777        }
778        Ok(())
779    }
780
781    async fn dead_letter_mailbox_entry(
782        &self,
783        entry_id: &str,
784        claim_token: &str,
785        error: &str,
786        now: u64,
787    ) -> Result<(), MailboxStoreError> {
788        let updated = sqlx::query(&format!(
789            "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
790             lease_until = NULL, updated_at = $3 WHERE entry_id = $4 AND claim_token = $5",
791            self.mailbox_table
792        ))
793        .bind(Self::encode_mailbox_status(
794            tirea_contract::MailboxEntryStatus::DeadLetter,
795        ))
796        .bind(error)
797        .bind(i64::try_from(now).map_err(|_| {
798            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
799        })?)
800        .bind(entry_id)
801        .bind(claim_token)
802        .execute(&self.pool)
803        .await
804        .map_err(Self::mailbox_sql_err)?;
805        if updated.rows_affected() == 0 {
806            return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
807        }
808        Ok(())
809    }
810
811    async fn cancel_mailbox_entry(
812        &self,
813        entry_id: &str,
814        now: u64,
815    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
816        let updated = sqlx::query(&format!(
817            "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
818             lease_until = NULL, updated_at = $3 \
819             WHERE entry_id = $4 AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter')",
820            self.mailbox_table
821        ))
822        .bind(Self::encode_mailbox_status(
823            tirea_contract::MailboxEntryStatus::Cancelled,
824        ))
825        .bind("cancelled")
826        .bind(i64::try_from(now).map_err(|_| {
827            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
828        })?)
829        .bind(entry_id)
830        .execute(&self.pool)
831        .await
832        .map_err(Self::mailbox_sql_err)?;
833        if updated.rows_affected() == 0 {
834            return self.load_mailbox_entry(entry_id).await;
835        }
836        self.load_mailbox_entry(entry_id).await
837    }
838
839    async fn supersede_mailbox_entry(
840        &self,
841        entry_id: &str,
842        now: u64,
843        reason: &str,
844    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
845        let updated = sqlx::query(&format!(
846            "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
847             lease_until = NULL, updated_at = $3 \
848             WHERE entry_id = $4 AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter')",
849            self.mailbox_table
850        ))
851        .bind(Self::encode_mailbox_status(
852            tirea_contract::MailboxEntryStatus::Superseded,
853        ))
854        .bind(reason)
855        .bind(i64::try_from(now).map_err(|_| {
856            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
857        })?)
858        .bind(entry_id)
859        .execute(&self.pool)
860        .await
861        .map_err(Self::mailbox_sql_err)?;
862        if updated.rows_affected() == 0 {
863            return self.load_mailbox_entry(entry_id).await;
864        }
865        self.load_mailbox_entry(entry_id).await
866    }
867
868    async fn cancel_pending_for_mailbox(
869        &self,
870        mailbox_id: &str,
871        now: u64,
872        exclude_entry_id: Option<&str>,
873    ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
874        let now_i64 = i64::try_from(now).map_err(|_| {
875            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
876        })?;
877        let returning_cols =
878            "entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
879             generation, status, available_at, attempt_count, last_error, claim_token, \
880             claimed_by, lease_until, created_at, updated_at";
881
882        let rows: Vec<MailboxRow> = match exclude_entry_id {
883            Some(entry_id) => {
884                sqlx::query_as(&format!(
885                    "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
886                     lease_until = NULL, updated_at = $3 WHERE mailbox_id = $4 AND entry_id != $5 \
887                     AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
888                     RETURNING {returning_cols}",
889                    self.mailbox_table
890                ))
891                .bind(Self::encode_mailbox_status(tirea_contract::MailboxEntryStatus::Cancelled))
892                .bind("cancelled")
893                .bind(now_i64)
894                .bind(mailbox_id)
895                .bind(entry_id)
896                .fetch_all(&self.pool)
897                .await
898                .map_err(Self::mailbox_sql_err)?
899            }
900            None => {
901                sqlx::query_as(&format!(
902                    "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
903                     lease_until = NULL, updated_at = $3 WHERE mailbox_id = $4 \
904                     AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
905                     RETURNING {returning_cols}",
906                    self.mailbox_table
907                ))
908                .bind(Self::encode_mailbox_status(tirea_contract::MailboxEntryStatus::Cancelled))
909                .bind("cancelled")
910                .bind(now_i64)
911                .bind(mailbox_id)
912                .fetch_all(&self.pool)
913                .await
914                .map_err(Self::mailbox_sql_err)?
915            }
916        };
917
918        let mut entries = Vec::with_capacity(rows.len());
919        for row in rows {
920            entries.push(Self::mailbox_entry_from_row(row)?);
921        }
922        Ok(entries)
923    }
924
925    async fn interrupt_mailbox(
926        &self,
927        mailbox_id: &str,
928        now: u64,
929    ) -> Result<MailboxInterrupt, MailboxStoreError> {
930        let mut tx = self.pool.begin().await.map_err(Self::mailbox_sql_err)?;
931        let now_i64 = i64::try_from(now).map_err(|_| {
932            MailboxStoreError::Backend("updated_at too large for postgres BIGINT".to_string())
933        })?;
934        let state_row = sqlx::query_as::<_, (String, i64, i64)>(&format!(
935            "INSERT INTO {} (mailbox_id, current_generation, updated_at) VALUES ($1, 1, $2) \
936             ON CONFLICT (mailbox_id) DO UPDATE SET current_generation = {}.current_generation + 1, updated_at = EXCLUDED.updated_at \
937             RETURNING mailbox_id, current_generation, updated_at",
938            self.mailbox_threads_table, self.mailbox_threads_table
939        ))
940        .bind(mailbox_id)
941        .bind(now_i64)
942        .fetch_one(&mut *tx)
943        .await
944        .map_err(Self::mailbox_sql_err)?;
945
946        let mailbox_state = MailboxState {
947            mailbox_id: state_row.0,
948            current_generation: u64::try_from(state_row.1).map_err(|_| {
949                MailboxStoreError::Backend(format!(
950                    "current_generation cannot be negative in postgres BIGINT: {}",
951                    state_row.1
952                ))
953            })?,
954            updated_at: u64::try_from(state_row.2).map_err(|_| {
955                MailboxStoreError::Backend(format!(
956                    "updated_at cannot be negative in postgres BIGINT: {}",
957                    state_row.2
958                ))
959            })?,
960        };
961
962        let rows = sqlx::query_as::<_, MailboxRow>(&format!(
963            "UPDATE {} SET status = $1, last_error = $2, claim_token = NULL, claimed_by = NULL, \
964             lease_until = NULL, updated_at = $3 \
965             WHERE mailbox_id = $4 AND generation < $5 \
966             AND status NOT IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
967             RETURNING entry_id, mailbox_id, origin, sender_id, payload, priority, dedupe_key, \
968             generation, status, available_at, attempt_count, last_error, claim_token, \
969             claimed_by, lease_until, created_at, updated_at",
970            self.mailbox_table
971        ))
972        .bind(Self::encode_mailbox_status(
973            tirea_contract::MailboxEntryStatus::Superseded,
974        ))
975        .bind("superseded by interrupt")
976        .bind(now_i64)
977        .bind(mailbox_id)
978        .bind(
979            i64::try_from(mailbox_state.current_generation).map_err(|_| {
980                MailboxStoreError::Backend("generation too large for postgres BIGINT".to_string())
981            })?,
982        )
983        .fetch_all(&mut *tx)
984        .await
985        .map_err(Self::mailbox_sql_err)?;
986
987        tx.commit().await.map_err(Self::mailbox_sql_err)?;
988
989        let mut superseded_entries = Vec::with_capacity(rows.len());
990        for row in rows {
991            superseded_entries.push(Self::mailbox_entry_from_row(row)?);
992        }
993
994        Ok(MailboxInterrupt {
995            mailbox_state,
996            superseded_entries,
997        })
998    }
999
1000    async fn extend_lease(
1001        &self,
1002        entry_id: &str,
1003        claim_token: &str,
1004        extension_ms: u64,
1005        now: u64,
1006    ) -> Result<bool, MailboxStoreError> {
1007        let now_i64 = i64::try_from(now).map_err(|_| {
1008            MailboxStoreError::Backend("now too large for postgres BIGINT".to_string())
1009        })?;
1010        let lease_until = i64::try_from(now.saturating_add(extension_ms)).map_err(|_| {
1011            MailboxStoreError::Backend("lease_until too large for postgres BIGINT".to_string())
1012        })?;
1013        let result = sqlx::query(&format!(
1014            "UPDATE {} SET lease_until = $1, updated_at = $2 \
1015             WHERE entry_id = $3 AND status = 'claimed' AND claim_token = $4",
1016            self.mailbox_table
1017        ))
1018        .bind(lease_until)
1019        .bind(now_i64)
1020        .bind(entry_id)
1021        .bind(claim_token)
1022        .execute(&self.pool)
1023        .await
1024        .map_err(Self::mailbox_sql_err)?;
1025        Ok(result.rows_affected() > 0)
1026    }
1027
1028    async fn purge_terminal_mailbox_entries(
1029        &self,
1030        older_than: u64,
1031    ) -> Result<usize, MailboxStoreError> {
1032        let older_than_i64 = i64::try_from(older_than).map_err(|_| {
1033            MailboxStoreError::Backend("older_than too large for postgres BIGINT".to_string())
1034        })?;
1035        let result = sqlx::query(&format!(
1036            "DELETE FROM {} WHERE status IN ('accepted', 'superseded', 'cancelled', 'dead_letter') \
1037             AND updated_at < $1",
1038            self.mailbox_table
1039        ))
1040        .bind(older_than_i64)
1041        .execute(&self.pool)
1042        .await
1043        .map_err(Self::mailbox_sql_err)?;
1044        Ok(result.rows_affected() as usize)
1045    }
1046}
1047
1048#[cfg(feature = "postgres")]
1049#[async_trait]
1050impl ThreadWriter for PostgresStore {
1051    async fn create(&self, thread: &Thread) -> Result<Committed, ThreadStoreError> {
1052        self.ensure_thread_schema_ready().await?;
1053
1054        let mut v = serde_json::to_value(thread)
1055            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
1056        if let Some(obj) = v.as_object_mut() {
1057            obj.insert("messages".to_string(), serde_json::Value::Array(Vec::new()));
1058            obj.insert("_version".to_string(), serde_json::Value::Number(0.into()));
1059        }
1060
1061        let sql = format!(
1062            "INSERT INTO {} (id, data, updated_at) VALUES ($1, $2, now())",
1063            self.table
1064        );
1065        sqlx::query(&sql)
1066            .bind(&thread.id)
1067            .bind(&v)
1068            .execute(&self.pool)
1069            .await
1070            .map_err(|e| {
1071                if e.to_string().contains("duplicate key")
1072                    || e.to_string().contains("unique constraint")
1073                {
1074                    ThreadStoreError::AlreadyExists
1075                } else {
1076                    Self::sql_err(e)
1077                }
1078            })?;
1079
1080        // Insert messages into separate table.
1081        let insert_sql = format!(
1082            "INSERT INTO {} (session_id, message_id, run_id, step_index, data) VALUES ($1, $2, $3, $4, $5)",
1083            self.messages_table,
1084        );
1085        for msg in &thread.messages {
1086            let data = serde_json::to_value(msg.as_ref())
1087                .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
1088            let message_id = msg.id.as_deref();
1089            let (run_id, step_index) = msg
1090                .metadata
1091                .as_ref()
1092                .map(|m| (m.run_id.as_deref(), m.step_index.map(|s| s as i32)))
1093                .unwrap_or((None, None));
1094            sqlx::query(&insert_sql)
1095                .bind(&thread.id)
1096                .bind(message_id)
1097                .bind(run_id)
1098                .bind(step_index)
1099                .bind(&data)
1100                .execute(&self.pool)
1101                .await
1102                .map_err(Self::sql_err)?;
1103        }
1104
1105        Ok(Committed { version: 0 })
1106    }
1107
1108    async fn append(
1109        &self,
1110        thread_id: &str,
1111        delta: &ThreadChangeSet,
1112        precondition: VersionPrecondition,
1113    ) -> Result<Committed, ThreadStoreError> {
1114        self.ensure_thread_schema_ready().await?;
1115
1116        let mut tx = self.pool.begin().await.map_err(Self::sql_err)?;
1117
1118        // Lock the row for atomic read-modify-write.
1119        let sql = format!("SELECT data FROM {} WHERE id = $1 FOR UPDATE", self.table);
1120        let row: Option<(serde_json::Value,)> = sqlx::query_as(&sql)
1121            .bind(thread_id)
1122            .fetch_optional(&mut *tx)
1123            .await
1124            .map_err(Self::sql_err)?;
1125
1126        let Some((mut v,)) = row else {
1127            return Err(ThreadStoreError::NotFound(thread_id.to_string()));
1128        };
1129
1130        let current_version = v.get("_version").and_then(|v| v.as_u64()).unwrap_or(0);
1131        if let VersionPrecondition::Exact(expected) = precondition {
1132            if current_version != expected {
1133                return Err(ThreadStoreError::VersionConflict {
1134                    expected,
1135                    actual: current_version,
1136                });
1137            }
1138        }
1139        let new_version = current_version + 1;
1140
1141        // Apply snapshot or patches to stored data.
1142        if let Some(ref snapshot) = delta.snapshot {
1143            if let Some(obj) = v.as_object_mut() {
1144                obj.insert("state".to_string(), snapshot.clone());
1145                obj.insert("patches".to_string(), serde_json::Value::Array(Vec::new()));
1146            }
1147        } else if !delta.patches.is_empty() {
1148            let patches_arr = v
1149                .get("patches")
1150                .cloned()
1151                .unwrap_or(serde_json::Value::Array(Vec::new()));
1152            let mut patches: Vec<serde_json::Value> =
1153                if let serde_json::Value::Array(arr) = patches_arr {
1154                    arr
1155                } else {
1156                    Vec::new()
1157                };
1158            for p in &delta.patches {
1159                if let Ok(pv) = serde_json::to_value(p) {
1160                    patches.push(pv);
1161                }
1162            }
1163            if let Some(obj) = v.as_object_mut() {
1164                obj.insert("patches".to_string(), serde_json::Value::Array(patches));
1165            }
1166        }
1167
1168        if let Some(obj) = v.as_object_mut() {
1169            obj.insert(
1170                "_version".to_string(),
1171                serde_json::Value::Number(new_version.into()),
1172            );
1173        }
1174
1175        let update_sql = format!(
1176            "UPDATE {} SET data = $1, updated_at = now() WHERE id = $2",
1177            self.table
1178        );
1179        sqlx::query(&update_sql)
1180            .bind(&v)
1181            .bind(thread_id)
1182            .execute(&mut *tx)
1183            .await
1184            .map_err(Self::sql_err)?;
1185
1186        // Append new messages.
1187        if !delta.messages.is_empty() {
1188            let insert_sql = format!(
1189                "INSERT INTO {} (session_id, message_id, run_id, step_index, data) VALUES ($1, $2, $3, $4, $5)",
1190                self.messages_table,
1191            );
1192            for msg in &delta.messages {
1193                let data = serde_json::to_value(msg.as_ref())
1194                    .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
1195                let message_id = msg.id.as_deref();
1196                let (run_id, step_index) = msg
1197                    .metadata
1198                    .as_ref()
1199                    .map(|m| (m.run_id.as_deref(), m.step_index.map(|s| s as i32)))
1200                    .unwrap_or((None, None));
1201                sqlx::query(&insert_sql)
1202                    .bind(thread_id)
1203                    .bind(message_id)
1204                    .bind(run_id)
1205                    .bind(step_index)
1206                    .bind(&data)
1207                    .execute(&mut *tx)
1208                    .await
1209                    .map_err(Self::sql_err)?;
1210            }
1211        }
1212
1213        tx.commit().await.map_err(Self::sql_err)?;
1214        Ok(Committed {
1215            version: new_version,
1216        })
1217    }
1218
1219    async fn delete(&self, thread_id: &str) -> Result<(), ThreadStoreError> {
1220        self.ensure_thread_schema_ready().await?;
1221
1222        // CASCADE will delete messages automatically.
1223        let sql = format!("DELETE FROM {} WHERE id = $1", self.table);
1224        sqlx::query(&sql)
1225            .bind(thread_id)
1226            .execute(&self.pool)
1227            .await
1228            .map_err(Self::sql_err)?;
1229        Ok(())
1230    }
1231
1232    async fn save(&self, thread: &Thread) -> Result<(), ThreadStoreError> {
1233        self.ensure_thread_schema_ready().await?;
1234
1235        // Serialize session skeleton (without messages).
1236        let mut v = serde_json::to_value(thread)
1237            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
1238        if let Some(obj) = v.as_object_mut() {
1239            obj.insert("messages".to_string(), serde_json::Value::Array(Vec::new()));
1240        }
1241
1242        // Use a transaction to keep sessions and messages consistent.
1243        let mut tx = self.pool.begin().await.map_err(Self::sql_err)?;
1244
1245        // Lock existing row to preserve save-version semantics (create = 0, update = +1).
1246        let select_sql = format!("SELECT data FROM {} WHERE id = $1 FOR UPDATE", self.table);
1247        let existing: Option<(serde_json::Value,)> = sqlx::query_as(&select_sql)
1248            .bind(&thread.id)
1249            .fetch_optional(&mut *tx)
1250            .await
1251            .map_err(Self::sql_err)?;
1252
1253        let next_version = existing
1254            .as_ref()
1255            .and_then(|(data,)| data.get("_version").and_then(serde_json::Value::as_u64))
1256            .map_or(0, |version| version.saturating_add(1));
1257        if let Some(obj) = v.as_object_mut() {
1258            obj.insert(
1259                "_version".to_string(),
1260                serde_json::Value::Number(next_version.into()),
1261            );
1262        }
1263
1264        if existing.is_some() {
1265            let update_sql = format!(
1266                "UPDATE {} SET data = $1, updated_at = now() WHERE id = $2",
1267                self.table
1268            );
1269            sqlx::query(&update_sql)
1270                .bind(&v)
1271                .bind(&thread.id)
1272                .execute(&mut *tx)
1273                .await
1274                .map_err(Self::sql_err)?;
1275        } else {
1276            let insert_sql = format!(
1277                "INSERT INTO {} (id, data, updated_at) VALUES ($1, $2, now())",
1278                self.table
1279            );
1280            sqlx::query(&insert_sql)
1281                .bind(&thread.id)
1282                .bind(&v)
1283                .execute(&mut *tx)
1284                .await
1285                .map_err(Self::sql_err)?;
1286        }
1287
1288        // `save()` is replace semantics: persist exactly the provided message set.
1289        let delete_messages_sql =
1290            format!("DELETE FROM {} WHERE session_id = $1", self.messages_table);
1291        sqlx::query(&delete_messages_sql)
1292            .bind(&thread.id)
1293            .execute(&mut *tx)
1294            .await
1295            .map_err(Self::sql_err)?;
1296
1297        if !thread.messages.is_empty() {
1298            let mut rows = Vec::with_capacity(thread.messages.len());
1299            for msg in &thread.messages {
1300                let data = serde_json::to_value(msg.as_ref())
1301                    .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
1302                let message_id = msg.id.clone();
1303                let (run_id, step_index) = msg
1304                    .metadata
1305                    .as_ref()
1306                    .map(|m| (m.run_id.clone(), m.step_index.map(|s| s as i32)))
1307                    .unwrap_or((None, None));
1308                rows.push((message_id, run_id, step_index, data));
1309            }
1310
1311            let mut qb = QueryBuilder::<Postgres>::new(format!(
1312                "INSERT INTO {} (session_id, message_id, run_id, step_index, data) ",
1313                self.messages_table
1314            ));
1315            qb.push_values(
1316                rows.iter(),
1317                |mut b, (message_id, run_id, step_index, data)| {
1318                    b.push_bind(&thread.id)
1319                        .push_bind(message_id.as_deref())
1320                        .push_bind(run_id.as_deref())
1321                        .push_bind(*step_index)
1322                        .push_bind(data);
1323                },
1324            );
1325            qb.build().execute(&mut *tx).await.map_err(Self::sql_err)?;
1326        }
1327
1328        tx.commit().await.map_err(Self::sql_err)?;
1329        Ok(())
1330    }
1331}
1332
1333#[cfg(feature = "postgres")]
1334#[async_trait]
1335impl ThreadReader for PostgresStore {
1336    async fn load(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
1337        self.ensure_thread_schema_ready().await?;
1338
1339        let sql = format!("SELECT data FROM {} WHERE id = $1", self.table);
1340        let row: Option<(serde_json::Value,)> = sqlx::query_as(&sql)
1341            .bind(thread_id)
1342            .fetch_optional(&self.pool)
1343            .await
1344            .map_err(Self::sql_err)?;
1345
1346        let Some((mut v,)) = row else {
1347            return Ok(None);
1348        };
1349
1350        let version = v.get("_version").and_then(|v| v.as_u64()).unwrap_or(0);
1351
1352        let msg_sql = format!(
1353            "SELECT data FROM {} WHERE session_id = $1 ORDER BY seq",
1354            self.messages_table
1355        );
1356        let msg_rows: Vec<(serde_json::Value,)> = sqlx::query_as(&msg_sql)
1357            .bind(thread_id)
1358            .fetch_all(&self.pool)
1359            .await
1360            .map_err(Self::sql_err)?;
1361
1362        let messages: Vec<serde_json::Value> = msg_rows.into_iter().map(|(d,)| d).collect();
1363        if let Some(obj) = v.as_object_mut() {
1364            obj.insert("messages".to_string(), serde_json::Value::Array(messages));
1365            obj.remove("_version");
1366        }
1367
1368        let thread: Thread = serde_json::from_value(v)
1369            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
1370        Ok(Some(ThreadHead { thread, version }))
1371    }
1372
1373    async fn load_messages(
1374        &self,
1375        thread_id: &str,
1376        query: &MessageQuery,
1377    ) -> Result<MessagePage, ThreadStoreError> {
1378        self.ensure_thread_schema_ready().await?;
1379
1380        // Check session exists.
1381        let exists_sql = format!("SELECT 1 FROM {} WHERE id = $1", self.table);
1382        let exists: Option<(i32,)> = sqlx::query_as(&exists_sql)
1383            .bind(thread_id)
1384            .fetch_optional(&self.pool)
1385            .await
1386            .map_err(Self::sql_err)?;
1387        if exists.is_none() {
1388            return Err(ThreadStoreError::NotFound(thread_id.to_string()));
1389        }
1390
1391        let limit = query.limit.clamp(1, 200);
1392        // Fetch limit+1 rows to determine has_more.
1393        let fetch_limit = (limit + 1) as i64;
1394
1395        // Visibility filter on JSONB data.
1396        let vis_clause = match query.visibility {
1397            Some(Visibility::All) => {
1398                " AND COALESCE(data->>'visibility', 'all') = 'all'".to_string()
1399            }
1400            Some(Visibility::Internal) => " AND data->>'visibility' = 'internal'".to_string(),
1401            None => String::new(),
1402        };
1403
1404        // Run ID filter on the run_id column.
1405        let run_clause = if query.run_id.is_some() {
1406            " AND run_id = $4"
1407        } else {
1408            ""
1409        };
1410
1411        let extra_param_idx = if query.run_id.is_some() { 5 } else { 4 };
1412
1413        let (sql, cursor_val) = match query.order {
1414            SortOrder::Asc => {
1415                let cursor = query.after.unwrap_or(-1);
1416                let before_clause = if query.before.is_some() {
1417                    format!("AND seq < ${extra_param_idx}")
1418                } else {
1419                    String::new()
1420                };
1421                let sql = format!(
1422                    "SELECT seq, data FROM {} WHERE session_id = $1 AND seq > $2{}{} {} ORDER BY seq ASC LIMIT $3",
1423                    self.messages_table, vis_clause, run_clause, before_clause,
1424                );
1425                (sql, cursor)
1426            }
1427            SortOrder::Desc => {
1428                let cursor = query.before.unwrap_or(i64::MAX);
1429                let after_clause = if query.after.is_some() {
1430                    format!("AND seq > ${extra_param_idx}")
1431                } else {
1432                    String::new()
1433                };
1434                let sql = format!(
1435                    "SELECT seq, data FROM {} WHERE session_id = $1 AND seq < $2{}{} {} ORDER BY seq DESC LIMIT $3",
1436                    self.messages_table, vis_clause, run_clause, after_clause,
1437                );
1438                (sql, cursor)
1439            }
1440        };
1441
1442        let rows: Vec<(i64, serde_json::Value)> = match query.order {
1443            SortOrder::Asc => {
1444                let mut q = sqlx::query_as(&sql)
1445                    .bind(thread_id)
1446                    .bind(cursor_val)
1447                    .bind(fetch_limit);
1448                if let Some(ref rid) = query.run_id {
1449                    q = q.bind(rid);
1450                }
1451                if let Some(before) = query.before {
1452                    q = q.bind(before);
1453                }
1454                q.fetch_all(&self.pool).await.map_err(Self::sql_err)?
1455            }
1456            SortOrder::Desc => {
1457                let mut q = sqlx::query_as(&sql)
1458                    .bind(thread_id)
1459                    .bind(cursor_val)
1460                    .bind(fetch_limit);
1461                if let Some(ref rid) = query.run_id {
1462                    q = q.bind(rid);
1463                }
1464                if let Some(after) = query.after {
1465                    q = q.bind(after);
1466                }
1467                q.fetch_all(&self.pool).await.map_err(Self::sql_err)?
1468            }
1469        };
1470
1471        let has_more = rows.len() > limit;
1472        let limited: Vec<_> = rows.into_iter().take(limit).collect();
1473
1474        let messages: Vec<MessageWithCursor> = limited
1475            .into_iter()
1476            .map(
1477                |(seq, data)| -> Result<MessageWithCursor, ThreadStoreError> {
1478                    let message: Message = serde_json::from_value(data).map_err(|e| {
1479                        ThreadStoreError::Serialization(format!(
1480                        "failed to deserialize message row (thread_id={thread_id}, seq={seq}): {e}"
1481                    ))
1482                    })?;
1483                    Ok(MessageWithCursor {
1484                        cursor: seq,
1485                        message,
1486                    })
1487                },
1488            )
1489            .collect::<Result<Vec<_>, _>>()?;
1490
1491        Ok(MessagePage {
1492            next_cursor: messages.last().map(|m| m.cursor),
1493            prev_cursor: messages.first().map(|m| m.cursor),
1494            messages,
1495            has_more,
1496        })
1497    }
1498
1499    async fn message_count(&self, thread_id: &str) -> Result<usize, ThreadStoreError> {
1500        self.ensure_thread_schema_ready().await?;
1501
1502        // Check session exists.
1503        let exists_sql = format!("SELECT 1 FROM {} WHERE id = $1", self.table);
1504        let exists: Option<(i32,)> = sqlx::query_as(&exists_sql)
1505            .bind(thread_id)
1506            .fetch_optional(&self.pool)
1507            .await
1508            .map_err(Self::sql_err)?;
1509        if exists.is_none() {
1510            return Err(ThreadStoreError::NotFound(thread_id.to_string()));
1511        }
1512
1513        let sql = format!(
1514            "SELECT COUNT(*)::bigint FROM {} WHERE session_id = $1",
1515            self.messages_table
1516        );
1517        let row: (i64,) = sqlx::query_as(&sql)
1518            .bind(thread_id)
1519            .fetch_one(&self.pool)
1520            .await
1521            .map_err(Self::sql_err)?;
1522        Ok(row.0 as usize)
1523    }
1524
1525    async fn list_threads(
1526        &self,
1527        query: &ThreadListQuery,
1528    ) -> Result<ThreadListPage, ThreadStoreError> {
1529        self.ensure_thread_schema_ready().await?;
1530
1531        let limit = query.limit.clamp(1, 200);
1532        let fetch_limit = (limit + 1) as i64;
1533        let offset = query.offset as i64;
1534
1535        let mut count_filters = Vec::new();
1536        let mut data_filters = Vec::new();
1537        if query.resource_id.is_some() {
1538            count_filters.push("data->>'resource_id' = $1".to_string());
1539            data_filters.push("data->>'resource_id' = $3".to_string());
1540        }
1541        if query.parent_thread_id.is_some() {
1542            let idx = if query.resource_id.is_some() { 2 } else { 1 };
1543            count_filters.push(format!("data->>'parent_thread_id' = ${idx}"));
1544            let data_idx = if query.resource_id.is_some() { 4 } else { 3 };
1545            data_filters.push(format!("data->>'parent_thread_id' = ${data_idx}"));
1546        }
1547
1548        let where_count = if count_filters.is_empty() {
1549            String::new()
1550        } else {
1551            format!(" WHERE {}", count_filters.join(" AND "))
1552        };
1553
1554        let count_sql = format!("SELECT COUNT(*)::bigint FROM {}{}", self.table, where_count);
1555        let where_data = if data_filters.is_empty() {
1556            String::new()
1557        } else {
1558            format!(" WHERE {}", data_filters.join(" AND "))
1559        };
1560        let data_sql = format!(
1561            "SELECT id FROM {}{} ORDER BY id LIMIT $1 OFFSET $2",
1562            self.table, where_data
1563        );
1564
1565        let mut count_q = sqlx::query_scalar::<_, i64>(&count_sql);
1566        if let Some(ref rid) = query.resource_id {
1567            count_q = count_q.bind(rid);
1568        }
1569        if let Some(ref pid) = query.parent_thread_id {
1570            count_q = count_q.bind(pid);
1571        }
1572        let total = count_q.fetch_one(&self.pool).await.map_err(Self::sql_err)?;
1573
1574        let mut data_q = sqlx::query_scalar::<_, String>(&data_sql)
1575            .bind(fetch_limit)
1576            .bind(offset);
1577        if let Some(ref rid) = query.resource_id {
1578            data_q = data_q.bind(rid);
1579        }
1580        if let Some(ref pid) = query.parent_thread_id {
1581            data_q = data_q.bind(pid);
1582        }
1583        let rows: Vec<String> = data_q.fetch_all(&self.pool).await.map_err(Self::sql_err)?;
1584
1585        let has_more = rows.len() > limit;
1586        let items = rows.into_iter().take(limit).collect();
1587
1588        Ok(ThreadListPage {
1589            items,
1590            total: total as usize,
1591            has_more,
1592        })
1593    }
1594
1595    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
1596        <Self as RunReader>::load_run(self, run_id)
1597            .await
1598            .map_err(|e| ThreadStoreError::Io(std::io::Error::other(e.to_string())))
1599    }
1600
1601    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, ThreadStoreError> {
1602        <Self as RunReader>::list_runs(self, query)
1603            .await
1604            .map_err(|e| ThreadStoreError::Io(std::io::Error::other(e.to_string())))
1605    }
1606
1607    async fn active_run_for_thread(
1608        &self,
1609        thread_id: &str,
1610    ) -> Result<Option<RunRecord>, ThreadStoreError> {
1611        <Self as RunReader>::load_current_run(self, thread_id)
1612            .await
1613            .map_err(|e| ThreadStoreError::Io(std::io::Error::other(e.to_string())))
1614    }
1615}
1616
1617#[cfg(feature = "postgres")]
/// Positional shape of one row returned by the run queries in this file.
///
/// Element order follows the `SELECT` column list used by those queries and
/// the destructuring performed in `PostgresStore::run_from_row`.
1618type RunRowTuple = (
1619    String, // run_id
1620    String, // thread_id
1621    String, // agent_id
1622    Option<String>, // parent_run_id
1623    Option<String>, // parent_thread_id
1624    String, // origin (decoded via `decode_origin`)
1625    String, // status (decoded via `decode_status`)
1626    Option<String>, // termination_code
1627    Option<String>, // termination_detail
1628    i64, // created_at (converted via `from_db_timestamp`)
1629    i64, // updated_at (converted via `from_db_timestamp`)
1630    Option<String>, // source_mailbox_entry_id
1631    Option<serde_json::Value>, // metadata
1632);
1633
1634#[cfg(feature = "postgres")]
/// Column values of one mailbox entry as decoded from a `PgRow`, before
/// `PostgresStore::mailbox_entry_from_row` converts them into a `MailboxEntry`.
///
/// Fields keep the database-side integer widths; narrowing to the unsigned
/// types used by `MailboxEntry` happens during that conversion.
1635struct MailboxRow {
1636    entry_id: String,
1637    mailbox_id: String,
1638    origin: String, // decoded via `decode_mailbox_origin`
1639    sender_id: Option<String>,
1640    payload: serde_json::Value,
1641    priority: i16, // converted to `u8` in `mailbox_entry_from_row`
1642    dedupe_key: Option<String>,
1643    generation: i64, // converted to `u64`; negative values are rejected
1644    status: String, // decoded via `decode_mailbox_status`
1645    available_at: i64, // converted via `from_db_timestamp`
1646    attempt_count: i32, // converted to `u32`; negative values are rejected
1647    last_error: Option<String>,
1648    claim_token: Option<String>,
1649    claimed_by: Option<String>,
1650    lease_until: Option<i64>, // converted via `from_db_timestamp` when present
1651    created_at: i64, // converted via `from_db_timestamp`
1652    updated_at: i64, // converted via `from_db_timestamp`
1653}
1654
#[cfg(feature = "postgres")]
impl<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> for MailboxRow {
    /// Builds a `MailboxRow` by reading each column from `row` by name.
    ///
    /// Columns are read in field order; the first `try_get` failure is
    /// returned unchanged.
    fn from_row(row: &'r sqlx::postgres::PgRow) -> Result<Self, sqlx::Error> {
        use sqlx::Row;

        let entry_id: String = row.try_get("entry_id")?;
        let mailbox_id: String = row.try_get("mailbox_id")?;
        let origin: String = row.try_get("origin")?;
        let sender_id: Option<String> = row.try_get("sender_id")?;
        let payload: serde_json::Value = row.try_get("payload")?;
        let priority: i16 = row.try_get("priority")?;
        let dedupe_key: Option<String> = row.try_get("dedupe_key")?;
        let generation: i64 = row.try_get("generation")?;
        let status: String = row.try_get("status")?;
        let available_at: i64 = row.try_get("available_at")?;
        let attempt_count: i32 = row.try_get("attempt_count")?;
        let last_error: Option<String> = row.try_get("last_error")?;
        let claim_token: Option<String> = row.try_get("claim_token")?;
        let claimed_by: Option<String> = row.try_get("claimed_by")?;
        let lease_until: Option<i64> = row.try_get("lease_until")?;
        let created_at: i64 = row.try_get("created_at")?;
        let updated_at: i64 = row.try_get("updated_at")?;

        Ok(Self {
            entry_id,
            mailbox_id,
            origin,
            sender_id,
            payload,
            priority,
            dedupe_key,
            generation,
            status,
            available_at,
            attempt_count,
            last_error,
            claim_token,
            claimed_by,
            lease_until,
            created_at,
            updated_at,
        })
    }
}
1681
#[cfg(feature = "postgres")]
impl PostgresStore {
    /// Converts a raw run row into a `RunRecord`.
    ///
    /// # Errors
    ///
    /// Fails when `origin` or `status` cannot be decoded, or when
    /// `from_db_timestamp` rejects `created_at` or `updated_at`.
    fn run_from_row(row: RunRowTuple) -> Result<RunRecord, RunStoreError> {
        let (
            run_id,
            thread_id,
            agent_id,
            parent_run_id,
            parent_thread_id,
            origin,
            status,
            termination_code,
            termination_detail,
            created_at,
            updated_at,
            source_mailbox_entry_id,
            metadata,
        ) = row;
        Ok(RunRecord {
            run_id,
            thread_id,
            agent_id,
            parent_run_id,
            parent_thread_id,
            origin: Self::decode_origin(&origin)?,
            status: Self::decode_status(&status)?,
            termination_code,
            termination_detail,
            created_at: Self::from_db_timestamp(created_at, "created_at")?,
            updated_at: Self::from_db_timestamp(updated_at, "updated_at")?,
            source_mailbox_entry_id,
            metadata,
        })
    }

    /// Converts a raw mailbox row into a `MailboxEntry`.
    ///
    /// # Errors
    ///
    /// Returns `MailboxStoreError::Backend` when `priority`, `generation` or
    /// `attempt_count` does not fit the unsigned type used by `MailboxEntry`,
    /// or when `from_db_timestamp` rejects a timestamp column. Failures from
    /// decoding `origin` or `status` are propagated.
    fn mailbox_entry_from_row(row: MailboxRow) -> Result<MailboxEntry, MailboxStoreError> {
        // Every timestamp column shares the same conversion and error mapping.
        let timestamp = |value: i64, column: &'static str| {
            Self::from_db_timestamp(value, column)
                .map_err(|e| MailboxStoreError::Backend(e.to_string()))
        };

        Ok(MailboxEntry {
            entry_id: row.entry_id,
            mailbox_id: row.mailbox_id,
            origin: Self::decode_mailbox_origin(&row.origin)?,
            sender_id: row.sender_id,
            payload: row.payload,
            // Reject out-of-range values instead of silently mapping them to
            // 0, matching how `generation` and `attempt_count` are handled.
            priority: u8::try_from(row.priority).map_err(|_| {
                MailboxStoreError::Backend(format!(
                    "priority is out of range for u8 in postgres SMALLINT: {}",
                    row.priority
                ))
            })?,
            dedupe_key: row.dedupe_key,
            generation: u64::try_from(row.generation).map_err(|_| {
                MailboxStoreError::Backend(format!(
                    "generation cannot be negative in postgres BIGINT: {}",
                    row.generation
                ))
            })?,
            status: Self::decode_mailbox_status(&row.status)?,
            available_at: timestamp(row.available_at, "available_at")?,
            attempt_count: u32::try_from(row.attempt_count).map_err(|_| {
                MailboxStoreError::Backend(format!(
                    "attempt_count cannot be negative in postgres INTEGER: {}",
                    row.attempt_count
                ))
            })?,
            last_error: row.last_error,
            claim_token: row.claim_token,
            claimed_by: row.claimed_by,
            lease_until: row
                .lease_until
                .map(|value| timestamp(value, "lease_until"))
                .transpose()?,
            created_at: timestamp(row.created_at, "created_at")?,
            updated_at: timestamp(row.updated_at, "updated_at")?,
        })
    }
}
1758
#[cfg(feature = "postgres")]
impl PostgresStore {
    /// Column list shared by every query whose rows decode into `RunRowTuple`.
    ///
    /// The order must match the element order of `RunRowTuple`.
    const RUN_RECORD_COLUMNS: &'static str = "run_id, thread_id, agent_id, parent_run_id, parent_thread_id, origin, status, termination_code, termination_detail, created_at, updated_at, source_mailbox_entry_id, metadata";

    /// Appends one predicate per filter set on `query` to `qb`.
    ///
    /// Keeping this in one place guarantees that the count query and the data
    /// query of `list_runs` always filter on exactly the same conditions.
    ///
    /// # Errors
    ///
    /// Propagates the `to_db_timestamp` error when a time bound cannot be
    /// converted for the database.
    fn push_run_query_filters<'q>(
        qb: &mut QueryBuilder<'q, Postgres>,
        query: &'q RunQuery,
    ) -> Result<(), RunStoreError> {
        // The first predicate is introduced by WHERE, every later one by AND.
        let mut sep = " WHERE ";
        if let Some(thread_id) = query.thread_id.as_deref() {
            qb.push(sep).push("thread_id = ").push_bind(thread_id);
            sep = " AND ";
        }
        if let Some(parent_run_id) = query.parent_run_id.as_deref() {
            qb.push(sep)
                .push("parent_run_id = ")
                .push_bind(parent_run_id);
            sep = " AND ";
        }
        if let Some(status) = query.status {
            qb.push(sep)
                .push("status = ")
                .push_bind(Self::encode_status(status));
            sep = " AND ";
        }
        if let Some(termination_code) = query.termination_code.as_deref() {
            qb.push(sep)
                .push("termination_code = ")
                .push_bind(termination_code);
            sep = " AND ";
        }
        if let Some(origin) = query.origin {
            qb.push(sep)
                .push("origin = ")
                .push_bind(Self::encode_origin(origin));
            sep = " AND ";
        }
        if let Some(created_at_from) = query.created_at_from {
            let created_at_from = Self::to_db_timestamp(created_at_from, "created_at_from")?;
            qb.push(sep)
                .push("created_at >= ")
                .push_bind(created_at_from);
            sep = " AND ";
        }
        if let Some(created_at_to) = query.created_at_to {
            let created_at_to = Self::to_db_timestamp(created_at_to, "created_at_to")?;
            qb.push(sep).push("created_at <= ").push_bind(created_at_to);
            sep = " AND ";
        }
        if let Some(updated_at_from) = query.updated_at_from {
            let updated_at_from = Self::to_db_timestamp(updated_at_from, "updated_at_from")?;
            qb.push(sep)
                .push("updated_at >= ")
                .push_bind(updated_at_from);
            sep = " AND ";
        }
        if let Some(updated_at_to) = query.updated_at_to {
            let updated_at_to = Self::to_db_timestamp(updated_at_to, "updated_at_to")?;
            qb.push(sep).push("updated_at <= ").push_bind(updated_at_to);
        }
        Ok(())
    }
}

#[cfg(feature = "postgres")]
#[async_trait]
impl RunReader for PostgresStore {
    /// Loads the run with the given `run_id`, or `None` when no such row exists.
    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let sql = format!(
            "SELECT {} FROM {} WHERE run_id = $1",
            Self::RUN_RECORD_COLUMNS,
            self.runs_table
        );
        let row = sqlx::query_as::<_, RunRowTuple>(&sql)
            .bind(run_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;
        row.map(Self::run_from_row).transpose()
    }

    /// Returns one page of runs matching `query`, ordered by `created_at` and
    /// then `run_id`, both ascending.
    ///
    /// `query.limit` is clamped to `1..=200`. `total` counts every row that
    /// matches the filters, independent of `limit` and `offset`.
    ///
    /// # Errors
    ///
    /// Returns `RunStoreError::Backend` when `query.offset` does not fit in an
    /// `i64` or the reported total is negative; SQL failures are mapped
    /// through `Self::run_sql_err`.
    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let limit = query.limit.clamp(1, 200);
        // Fetch one row beyond the page so `has_more` needs no extra query.
        let fetch_limit = (limit + 1) as i64;
        let offset = i64::try_from(query.offset)
            .map_err(|_| RunStoreError::Backend("offset is too large".to_string()))?;

        let mut count_qb = QueryBuilder::<Postgres>::new(format!(
            "SELECT COUNT(*)::bigint FROM {}",
            self.runs_table
        ));
        Self::push_run_query_filters(&mut count_qb, query)?;
        let total: i64 = count_qb
            .build_query_scalar()
            .fetch_one(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;

        let mut data_qb = QueryBuilder::<Postgres>::new(format!(
            "SELECT {} FROM {}",
            Self::RUN_RECORD_COLUMNS,
            self.runs_table
        ));
        Self::push_run_query_filters(&mut data_qb, query)?;
        data_qb
            .push(" ORDER BY created_at ASC, run_id ASC LIMIT ")
            .push_bind(fetch_limit)
            .push(" OFFSET ")
            .push_bind(offset);

        let rows: Vec<RunRowTuple> = data_qb
            .build_query_as()
            .fetch_all(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;
        let has_more = rows.len() > limit;
        let items = rows
            .into_iter()
            .take(limit)
            .map(Self::run_from_row)
            .collect::<Result<Vec<_>, _>>()?;

        Ok(RunPage {
            items,
            total: usize::try_from(total)
                .map_err(|_| RunStoreError::Backend("total is negative".to_string()))?,
            has_more,
        })
    }

    /// Returns the `thread_id` stored on the run `run_id`, or `None` when the
    /// run does not exist.
    async fn resolve_thread_id(&self, run_id: &str) -> Result<Option<String>, RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let sql = format!(
            "SELECT thread_id FROM {} WHERE run_id = $1",
            self.runs_table
        );
        sqlx::query_scalar::<_, String>(&sql)
            .bind(run_id)
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::run_sql_err)
    }

    /// Returns the newest run of `thread_id` whose status is not
    /// `RunStatus::Done`, or `None` when every run is done or none exist.
    ///
    /// "Newest" is decided by `created_at`, then `updated_at`, then `run_id`,
    /// all descending.
    async fn load_current_run(&self, thread_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let sql = format!(
            "SELECT {} FROM {} WHERE thread_id = $1 AND status != $2 \
             ORDER BY created_at DESC, updated_at DESC, run_id DESC LIMIT 1",
            Self::RUN_RECORD_COLUMNS,
            self.runs_table
        );
        let row = sqlx::query_as::<_, RunRowTuple>(&sql)
            .bind(thread_id)
            .bind(Self::encode_status(RunStatus::Done))
            .fetch_optional(&self.pool)
            .await
            .map_err(Self::run_sql_err)?;
        row.map(Self::run_from_row).transpose()
    }
}
1968
#[cfg(feature = "postgres")]
#[async_trait]
impl RunWriter for PostgresStore {
    /// Inserts `record`, or overwrites every other column of the existing row
    /// that has the same `run_id`.
    ///
    /// # Errors
    ///
    /// Propagates `to_db_timestamp` failures for `created_at` / `updated_at`;
    /// SQL failures are mapped through `Self::run_sql_err`.
    async fn upsert_run(&self, record: &RunRecord) -> Result<(), RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let created_at_db = Self::to_db_timestamp(record.created_at, "created_at")?;
        let updated_at_db = Self::to_db_timestamp(record.updated_at, "updated_at")?;
        let upsert_sql = format!(
            "INSERT INTO {} (run_id, thread_id, agent_id, parent_run_id, parent_thread_id, origin, status, \
             termination_code, termination_detail, created_at, updated_at, source_mailbox_entry_id, metadata) \
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) \
             ON CONFLICT (run_id) DO UPDATE SET thread_id = EXCLUDED.thread_id, agent_id = EXCLUDED.agent_id, \
             parent_run_id = EXCLUDED.parent_run_id, parent_thread_id = EXCLUDED.parent_thread_id, \
             origin = EXCLUDED.origin, status = EXCLUDED.status, termination_code = EXCLUDED.termination_code, \
             termination_detail = EXCLUDED.termination_detail, created_at = EXCLUDED.created_at, \
             updated_at = EXCLUDED.updated_at, source_mailbox_entry_id = EXCLUDED.source_mailbox_entry_id, \
             metadata = EXCLUDED.metadata",
            self.runs_table
        );

        // Bind order must follow the $1..$13 placeholders above.
        let statement = sqlx::query(&upsert_sql)
            .bind(&record.run_id)
            .bind(&record.thread_id)
            .bind(&record.agent_id)
            .bind(record.parent_run_id.as_deref())
            .bind(record.parent_thread_id.as_deref())
            .bind(Self::encode_origin(record.origin))
            .bind(Self::encode_status(record.status))
            .bind(record.termination_code.as_deref())
            .bind(record.termination_detail.as_deref())
            .bind(created_at_db)
            .bind(updated_at_db)
            .bind(record.source_mailbox_entry_id.as_deref())
            .bind(&record.metadata);
        statement
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(Self::run_sql_err)
    }

    /// Deletes the run with the given `run_id`.
    ///
    /// The number of affected rows is not inspected, so deleting an id that
    /// does not exist also returns `Ok(())`.
    async fn delete_run(&self, run_id: &str) -> Result<(), RunStoreError> {
        self.ensure_run_schema_ready().await?;

        let delete_sql = format!("DELETE FROM {} WHERE run_id = $1", self.runs_table);
        sqlx::query(&delete_sql)
            .bind(run_id)
            .execute(&self.pool)
            .await
            .map(|_| ())
            .map_err(Self::run_sql_err)
    }
}