tirea_store_adapters/
file_store.rs

1use crate::file_utils;
2use async_trait::async_trait;
3use serde::Deserialize;
4use std::path::PathBuf;
5use std::time::{SystemTime, UNIX_EPOCH};
6use tirea_contract::storage::{
7    has_active_claim_for_mailbox, paginate_mailbox_entries, paginate_runs_in_memory, Committed,
8    MailboxEntry, MailboxInterrupt, MailboxPage, MailboxQuery, MailboxReader, MailboxState,
9    MailboxStoreError, MailboxWriter, RunPage, RunQuery, RunRecord, ThreadHead, ThreadListPage,
10    ThreadListQuery, ThreadReader, ThreadStoreError, ThreadWriter, VersionPrecondition,
11};
12use tirea_contract::{Thread, ThreadChangeSet, Version};
13
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. A millisecond count that
/// does not fit in `u64` saturates at `u64::MAX` instead of being silently
/// truncated by an `as` cast.
fn now_millis() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
}
20
/// File-system backed store rooted at a single base directory.
///
/// Thread files are written directly under `base_path`; mailbox entries,
/// mailbox state and run records live in the `_mailbox`, `_mailbox_threads`
/// and `_runs` subdirectories respectively.
pub struct FileStore {
    /// Root directory under which every file is read and written.
    base_path: PathBuf,
}
24
25impl FileStore {
26    /// Create a new file storage with the given base path.
27    pub fn new(base_path: impl Into<PathBuf>) -> Self {
28        Self {
29            base_path: base_path.into(),
30        }
31    }
32
33    pub(super) fn thread_path(&self, thread_id: &str) -> Result<PathBuf, ThreadStoreError> {
34        Self::validate_thread_id(thread_id)?;
35        Ok(self.base_path.join(format!("{}.json", thread_id)))
36    }
37
38    fn validate_thread_id(thread_id: &str) -> Result<(), ThreadStoreError> {
39        file_utils::validate_fs_id(thread_id, "thread id").map_err(ThreadStoreError::InvalidId)
40    }
41
42    fn mailbox_dir(&self) -> PathBuf {
43        self.base_path.join("_mailbox")
44    }
45
46    fn mailbox_threads_dir(&self) -> PathBuf {
47        self.base_path.join("_mailbox_threads")
48    }
49
50    fn mailbox_path(&self, entry_id: &str) -> Result<PathBuf, MailboxStoreError> {
51        file_utils::validate_fs_id(entry_id, "mailbox entry id")
52            .map_err(MailboxStoreError::Backend)?;
53        Ok(self.mailbox_dir().join(format!("{entry_id}.json")))
54    }
55
56    fn mailbox_state_path(&self, mailbox_id: &str) -> Result<PathBuf, MailboxStoreError> {
57        file_utils::validate_fs_id(mailbox_id, "mailbox id").map_err(MailboxStoreError::Backend)?;
58        Ok(self
59            .mailbox_threads_dir()
60            .join(format!("{mailbox_id}.json")))
61    }
62
63    async fn save_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
64        let payload = serde_json::to_string_pretty(entry)
65            .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
66        let filename = format!("{}.json", entry.entry_id);
67        file_utils::atomic_json_write(&self.mailbox_dir(), &filename, &payload)
68            .await
69            .map_err(MailboxStoreError::Io)
70    }
71
72    async fn save_mailbox_state(&self, state: &MailboxState) -> Result<(), MailboxStoreError> {
73        let payload = serde_json::to_string_pretty(state)
74            .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
75        let filename = format!("{}.json", state.mailbox_id);
76        file_utils::atomic_json_write(&self.mailbox_threads_dir(), &filename, &payload)
77            .await
78            .map_err(MailboxStoreError::Io)
79    }
80
81    async fn load_mailbox_state_inner(
82        &self,
83        mailbox_id: &str,
84    ) -> Result<Option<MailboxState>, MailboxStoreError> {
85        let path = self.mailbox_state_path(mailbox_id)?;
86        if !path.exists() {
87            return Ok(None);
88        }
89        let content = tokio::fs::read_to_string(path)
90            .await
91            .map_err(MailboxStoreError::Io)?;
92        let state = serde_json::from_str(&content)
93            .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
94        Ok(Some(state))
95    }
96
97    async fn load_all_mailbox_entries(&self) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
98        let dir = self.mailbox_dir();
99        if !dir.exists() {
100            return Ok(Vec::new());
101        }
102        let mut entries = tokio::fs::read_dir(&dir)
103            .await
104            .map_err(MailboxStoreError::Io)?;
105        let mut mailbox_entries = Vec::new();
106        while let Some(entry) = entries.next_entry().await.map_err(MailboxStoreError::Io)? {
107            let path = entry.path();
108            if path.extension().is_none_or(|ext| ext != "json") {
109                continue;
110            }
111            let content = tokio::fs::read_to_string(path)
112                .await
113                .map_err(MailboxStoreError::Io)?;
114            let mailbox_entry: MailboxEntry = serde_json::from_str(&content)
115                .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
116            mailbox_entries.push(mailbox_entry);
117        }
118        Ok(mailbox_entries)
119    }
120}
121
122#[async_trait]
123impl MailboxReader for FileStore {
124    async fn load_mailbox_entry(
125        &self,
126        entry_id: &str,
127    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
128        let path = self.mailbox_path(entry_id)?;
129        if !path.exists() {
130            return Ok(None);
131        }
132        let content = tokio::fs::read_to_string(path)
133            .await
134            .map_err(MailboxStoreError::Io)?;
135        let entry: MailboxEntry = serde_json::from_str(&content)
136            .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
137        Ok(Some(entry))
138    }
139
140    async fn load_mailbox_state(
141        &self,
142        mailbox_id: &str,
143    ) -> Result<Option<MailboxState>, MailboxStoreError> {
144        self.load_mailbox_state_inner(mailbox_id).await
145    }
146
147    async fn list_mailbox_entries(
148        &self,
149        query: &MailboxQuery,
150    ) -> Result<MailboxPage, MailboxStoreError> {
151        let entries = self.load_all_mailbox_entries().await?;
152        Ok(paginate_mailbox_entries(&entries, query))
153    }
154}
155
156#[async_trait]
157impl MailboxWriter for FileStore {
158    async fn enqueue_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
159        let path = self.mailbox_path(&entry.entry_id)?;
160        if path.exists() {
161            return Err(MailboxStoreError::AlreadyExists(entry.entry_id.clone()));
162        }
163        let mailbox_state = self
164            .load_mailbox_state_inner(&entry.mailbox_id)
165            .await?
166            .unwrap_or(MailboxState {
167                mailbox_id: entry.mailbox_id.clone(),
168                current_generation: entry.generation,
169                updated_at: entry.updated_at,
170            });
171        if mailbox_state.current_generation != entry.generation {
172            return Err(MailboxStoreError::GenerationMismatch {
173                mailbox_id: entry.mailbox_id.clone(),
174                expected: mailbox_state.current_generation,
175                actual: entry.generation,
176            });
177        }
178        if let Some(dedupe_key) = entry.dedupe_key.as_deref() {
179            let existing = self.load_all_mailbox_entries().await?;
180            if existing.iter().any(|current| {
181                current.mailbox_id == entry.mailbox_id
182                    && current.dedupe_key.as_deref() == Some(dedupe_key)
183            }) {
184                return Err(MailboxStoreError::AlreadyExists(dedupe_key.to_string()));
185            }
186        }
187        self.save_mailbox_state(&mailbox_state).await?;
188        self.save_mailbox_entry(entry).await
189    }
190
191    async fn ensure_mailbox_state(
192        &self,
193        mailbox_id: &str,
194        now: u64,
195    ) -> Result<MailboxState, MailboxStoreError> {
196        let mut state = self
197            .load_mailbox_state_inner(mailbox_id)
198            .await?
199            .unwrap_or(MailboxState {
200                mailbox_id: mailbox_id.to_string(),
201                current_generation: 0,
202                updated_at: now,
203            });
204        state.updated_at = now;
205        self.save_mailbox_state(&state).await?;
206        Ok(state)
207    }
208
    /// Claim up to `limit` entries for `consumer_id`, at most one per mailbox.
    ///
    /// Candidates are the stored entries for which `is_claimable(now)` holds,
    /// optionally restricted to `mailbox_id`. They are visited in order of
    /// descending priority, then ascending `available_at`, `created_at` and
    /// `entry_id`. Each claimed entry gets a fresh claim token, a lease ending
    /// at `now + lease_duration_ms` (saturating), an incremented attempt count,
    /// and is written back to disk before being returned.
    ///
    /// Entries whose generation is older than their mailbox's current
    /// generation are marked `Superseded` and saved instead of being claimed.
    ///
    /// # Errors
    /// `Io` / `Serialization` / `Backend` from the underlying loads and saves.
    /// Entries saved before the failure stay saved.
    async fn claim_mailbox_entries(
        &self,
        mailbox_id: Option<&str>,
        limit: usize,
        consumer_id: &str,
        now: u64,
        lease_duration_ms: u64,
    ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
        // Snapshot of every entry; the active-claim check below runs against
        // this snapshot, not against what this call writes afterwards.
        let all_entries = self.load_all_mailbox_entries().await?;
        let mut candidates: Vec<MailboxEntry> = all_entries
            .iter()
            .filter(|entry| entry.is_claimable(now))
            .filter(|entry| mailbox_id.is_none_or(|id| entry.mailbox_id == id))
            .cloned()
            .collect();
        // Highest priority first; ties broken by earliest availability, then
        // earliest creation, then entry id.
        candidates.sort_by(|left, right| {
            right
                .priority
                .cmp(&left.priority)
                .then_with(|| left.available_at.cmp(&right.available_at))
                .then_with(|| left.created_at.cmp(&right.created_at))
                .then_with(|| left.entry_id.cmp(&right.entry_id))
        });

        // Track which mailbox IDs we've already claimed in this batch.
        let mut claimed_mailbox_ids = std::collections::HashSet::new();

        let mut claimed = Vec::new();
        for mut entry in candidates {
            if claimed.len() >= limit {
                break;
            }

            // Mailbox-level exclusive claim: skip if this mailbox already has
            // an active (non-expired) claim.
            // NOTE(review): the last argument presumably excludes this entry
            // itself from the check — confirm against `has_active_claim_for_mailbox`.
            if claimed_mailbox_ids.contains(&entry.mailbox_id)
                || has_active_claim_for_mailbox(
                    all_entries.iter(),
                    &entry.mailbox_id,
                    now,
                    Some(&entry.entry_id),
                )
            {
                continue;
            }

            // Reconcile: supersede stale-generation entries that survived a
            // partial interrupt (FileStore interrupt is not atomic).
            if let Some(ts) = self.load_mailbox_state_inner(&entry.mailbox_id).await? {
                if entry.generation < ts.current_generation {
                    entry.status = tirea_contract::MailboxEntryStatus::Superseded;
                    entry.last_error =
                        Some("superseded by interrupt (reconciled on claim)".to_string());
                    entry.claim_token = None;
                    entry.claimed_by = None;
                    entry.lease_until = None;
                    entry.updated_at = now;
                    self.save_mailbox_entry(&entry).await?;
                    continue;
                }
            }

            // Keep a copy of the mailbox id: `entry` is moved into `claimed`
            // before the id is recorded.
            let mid = entry.mailbox_id.clone();
            entry.status = tirea_contract::MailboxEntryStatus::Claimed;
            entry.claim_token = Some(uuid::Uuid::now_v7().simple().to_string());
            entry.claimed_by = Some(consumer_id.to_string());
            entry.lease_until = Some(now.saturating_add(lease_duration_ms));
            entry.attempt_count = entry.attempt_count.saturating_add(1);
            entry.updated_at = now;
            self.save_mailbox_entry(&entry).await?;
            claimed.push(entry);
            claimed_mailbox_ids.insert(mid);
        }
        Ok(claimed)
    }
284
285    async fn claim_mailbox_entry(
286        &self,
287        entry_id: &str,
288        consumer_id: &str,
289        now: u64,
290        lease_duration_ms: u64,
291    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
292        let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
293            return Ok(None);
294        };
295        if entry.status.is_terminal() {
296            return Ok(None);
297        }
298        if entry.status == tirea_contract::MailboxEntryStatus::Claimed
299            && entry.lease_until.is_some_and(|lease| lease > now)
300        {
301            return Ok(None);
302        }
303
304        // Mailbox-level exclusive claim: reject if another entry in the same
305        // mailbox already holds an active lease.
306        let all_entries = self.load_all_mailbox_entries().await?;
307        if has_active_claim_for_mailbox(all_entries.iter(), &entry.mailbox_id, now, Some(entry_id))
308        {
309            return Ok(None);
310        }
311
312        entry.status = tirea_contract::MailboxEntryStatus::Claimed;
313        entry.claim_token = Some(uuid::Uuid::now_v7().simple().to_string());
314        entry.claimed_by = Some(consumer_id.to_string());
315        entry.lease_until = Some(now.saturating_add(lease_duration_ms));
316        entry.attempt_count = entry.attempt_count.saturating_add(1);
317        entry.updated_at = now;
318        self.save_mailbox_entry(&entry).await?;
319        Ok(Some(entry))
320    }
321
322    async fn ack_mailbox_entry(
323        &self,
324        entry_id: &str,
325        claim_token: &str,
326        now: u64,
327    ) -> Result<(), MailboxStoreError> {
328        let mut entry = self
329            .load_mailbox_entry(entry_id)
330            .await?
331            .ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
332        if entry.claim_token.as_deref() != Some(claim_token) {
333            return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
334        }
335        entry.status = tirea_contract::MailboxEntryStatus::Accepted;
336        entry.claim_token = None;
337        entry.claimed_by = None;
338        entry.lease_until = None;
339        entry.updated_at = now;
340        self.save_mailbox_entry(&entry).await
341    }
342
343    async fn nack_mailbox_entry(
344        &self,
345        entry_id: &str,
346        claim_token: &str,
347        retry_at: u64,
348        error: &str,
349        now: u64,
350    ) -> Result<(), MailboxStoreError> {
351        let mut entry = self
352            .load_mailbox_entry(entry_id)
353            .await?
354            .ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
355        if entry.claim_token.as_deref() != Some(claim_token) {
356            return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
357        }
358        entry.status = tirea_contract::MailboxEntryStatus::Queued;
359        entry.available_at = retry_at;
360        entry.last_error = Some(error.to_string());
361        entry.claim_token = None;
362        entry.claimed_by = None;
363        entry.lease_until = None;
364        entry.updated_at = now;
365        self.save_mailbox_entry(&entry).await
366    }
367
368    async fn dead_letter_mailbox_entry(
369        &self,
370        entry_id: &str,
371        claim_token: &str,
372        error: &str,
373        now: u64,
374    ) -> Result<(), MailboxStoreError> {
375        let mut entry = self
376            .load_mailbox_entry(entry_id)
377            .await?
378            .ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
379        if entry.claim_token.as_deref() != Some(claim_token) {
380            return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
381        }
382        entry.status = tirea_contract::MailboxEntryStatus::DeadLetter;
383        entry.last_error = Some(error.to_string());
384        entry.claim_token = None;
385        entry.claimed_by = None;
386        entry.lease_until = None;
387        entry.updated_at = now;
388        self.save_mailbox_entry(&entry).await
389    }
390
391    async fn cancel_mailbox_entry(
392        &self,
393        entry_id: &str,
394        now: u64,
395    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
396        let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
397            return Ok(None);
398        };
399        if entry.status.is_terminal() {
400            return Ok(Some(entry));
401        }
402        entry.status = tirea_contract::MailboxEntryStatus::Cancelled;
403        entry.last_error = Some("cancelled".to_string());
404        entry.claim_token = None;
405        entry.claimed_by = None;
406        entry.lease_until = None;
407        entry.updated_at = now;
408        self.save_mailbox_entry(&entry).await?;
409        Ok(Some(entry))
410    }
411
412    async fn supersede_mailbox_entry(
413        &self,
414        entry_id: &str,
415        now: u64,
416        reason: &str,
417    ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
418        let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
419            return Ok(None);
420        };
421        if entry.status.is_terminal() {
422            return Ok(Some(entry));
423        }
424        entry.status = tirea_contract::MailboxEntryStatus::Superseded;
425        entry.last_error = Some(reason.to_string());
426        entry.claim_token = None;
427        entry.claimed_by = None;
428        entry.lease_until = None;
429        entry.updated_at = now;
430        self.save_mailbox_entry(&entry).await?;
431        Ok(Some(entry))
432    }
433
434    async fn cancel_pending_for_mailbox(
435        &self,
436        mailbox_id: &str,
437        now: u64,
438        exclude_entry_id: Option<&str>,
439    ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
440        let entries = self.load_all_mailbox_entries().await?;
441        let mut cancelled = Vec::new();
442        for mut entry in entries {
443            if entry.mailbox_id != mailbox_id || entry.status.is_terminal() {
444                continue;
445            }
446            if exclude_entry_id.is_some_and(|eid| entry.entry_id == eid) {
447                continue;
448            }
449            entry.status = tirea_contract::MailboxEntryStatus::Cancelled;
450            entry.last_error = Some("cancelled".to_string());
451            entry.claim_token = None;
452            entry.claimed_by = None;
453            entry.lease_until = None;
454            entry.updated_at = now;
455            self.save_mailbox_entry(&entry).await?;
456            cancelled.push(entry);
457        }
458        Ok(cancelled)
459    }
460
461    async fn interrupt_mailbox(
462        &self,
463        mailbox_id: &str,
464        now: u64,
465    ) -> Result<MailboxInterrupt, MailboxStoreError> {
466        let mut state = self
467            .load_mailbox_state_inner(mailbox_id)
468            .await?
469            .unwrap_or(MailboxState {
470                mailbox_id: mailbox_id.to_string(),
471                current_generation: 0,
472                updated_at: now,
473            });
474        state.current_generation = state.current_generation.saturating_add(1);
475        state.updated_at = now;
476        self.save_mailbox_state(&state).await?;
477
478        let entries = self.load_all_mailbox_entries().await?;
479        let mut superseded = Vec::new();
480        for mut entry in entries {
481            if entry.mailbox_id != mailbox_id || entry.status.is_terminal() {
482                continue;
483            }
484            if entry.generation >= state.current_generation {
485                continue;
486            }
487            entry.status = tirea_contract::MailboxEntryStatus::Superseded;
488            entry.last_error = Some("superseded by interrupt".to_string());
489            entry.claim_token = None;
490            entry.claimed_by = None;
491            entry.lease_until = None;
492            entry.updated_at = now;
493            self.save_mailbox_entry(&entry).await?;
494            superseded.push(entry);
495        }
496
497        Ok(MailboxInterrupt {
498            mailbox_state: state,
499            superseded_entries: superseded,
500        })
501    }
502
503    async fn extend_lease(
504        &self,
505        entry_id: &str,
506        claim_token: &str,
507        extension_ms: u64,
508        now: u64,
509    ) -> Result<bool, MailboxStoreError> {
510        let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
511            return Ok(false);
512        };
513        if entry.status != tirea_contract::MailboxEntryStatus::Claimed {
514            return Ok(false);
515        }
516        if entry.claim_token.as_deref() != Some(claim_token) {
517            return Ok(false);
518        }
519        entry.lease_until = Some(now.saturating_add(extension_ms));
520        entry.updated_at = now;
521        self.save_mailbox_entry(&entry).await?;
522        Ok(true)
523    }
524
525    async fn purge_terminal_mailbox_entries(
526        &self,
527        older_than: u64,
528    ) -> Result<usize, MailboxStoreError> {
529        let entries = self.load_all_mailbox_entries().await?;
530        let mut count = 0usize;
531        for entry in entries {
532            if entry.status.is_terminal() && entry.updated_at < older_than {
533                let path = self.mailbox_dir().join(format!("{}.json", entry.entry_id));
534                if path.exists() {
535                    tokio::fs::remove_file(&path)
536                        .await
537                        .map_err(MailboxStoreError::Io)?;
538                    count += 1;
539                }
540            }
541        }
542        Ok(count)
543    }
544}
545
546#[async_trait]
547impl ThreadWriter for FileStore {
548    async fn create(&self, thread: &Thread) -> Result<Committed, ThreadStoreError> {
549        let path = self.thread_path(&thread.id)?;
550        if path.exists() {
551            return Err(ThreadStoreError::AlreadyExists);
552        }
553        let mut thread = thread.clone();
554        let now = now_millis();
555        if thread.metadata.created_at.is_none() {
556            thread.metadata.created_at = Some(now);
557        }
558        thread.metadata.updated_at = Some(now);
559        let head = ThreadHead { thread, version: 0 };
560        self.save_head(&head).await?;
561        Ok(Committed { version: 0 })
562    }
563
564    async fn append(
565        &self,
566        thread_id: &str,
567        delta: &ThreadChangeSet,
568        precondition: VersionPrecondition,
569    ) -> Result<Committed, ThreadStoreError> {
570        let head = self
571            .load_head(thread_id)
572            .await?
573            .ok_or_else(|| ThreadStoreError::NotFound(thread_id.to_string()))?;
574
575        if let VersionPrecondition::Exact(expected) = precondition {
576            if head.version != expected {
577                return Err(ThreadStoreError::VersionConflict {
578                    expected,
579                    actual: head.version,
580                });
581            }
582        }
583
584        let mut thread = head.thread;
585        delta.apply_to(&mut thread);
586        thread.metadata.updated_at = Some(now_millis());
587        let new_version = head.version + 1;
588        let new_head = ThreadHead {
589            thread,
590            version: new_version,
591        };
592        self.save_head(&new_head).await?;
593        self.upsert_run_from_changeset(thread_id, delta).await?;
594        Ok(Committed {
595            version: new_version,
596        })
597    }
598
599    async fn delete(&self, thread_id: &str) -> Result<(), ThreadStoreError> {
600        let path = self.thread_path(thread_id)?;
601        if path.exists() {
602            tokio::fs::remove_file(&path).await?;
603        }
604        Ok(())
605    }
606
607    async fn save(&self, thread: &Thread) -> Result<(), ThreadStoreError> {
608        let next_version = self
609            .load_head(&thread.id)
610            .await?
611            .map_or(0, |head| head.version.saturating_add(1));
612        let mut thread = thread.clone();
613        let now = now_millis();
614        thread.metadata.updated_at = Some(now);
615        if thread.metadata.created_at.is_none() {
616            thread.metadata.created_at = Some(now);
617        }
618        let head = ThreadHead {
619            thread,
620            version: next_version,
621        };
622        self.save_head(&head).await
623    }
624}
625
626#[async_trait]
627impl ThreadReader for FileStore {
628    async fn load(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
629        self.load_head(thread_id).await
630    }
631
632    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
633        self.load_run_record(run_id).await
634    }
635
636    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, ThreadStoreError> {
637        let records = self.load_all_run_records().await?;
638        Ok(paginate_runs_in_memory(&records, query))
639    }
640
641    async fn active_run_for_thread(
642        &self,
643        thread_id: &str,
644    ) -> Result<Option<RunRecord>, ThreadStoreError> {
645        let records = self.load_all_run_records().await?;
646        Ok(records
647            .into_iter()
648            .filter(|r| r.thread_id == thread_id && !r.status.is_terminal())
649            .max_by(|a, b| {
650                a.created_at
651                    .cmp(&b.created_at)
652                    .then_with(|| a.updated_at.cmp(&b.updated_at))
653                    .then_with(|| a.run_id.cmp(&b.run_id))
654            }))
655    }
656
657    async fn list_threads(
658        &self,
659        query: &ThreadListQuery,
660    ) -> Result<ThreadListPage, ThreadStoreError> {
661        let mut all = file_utils::scan_json_stems(&self.base_path).await?;
662
663        // Filter by resource_id if specified.
664        if let Some(ref resource_id) = query.resource_id {
665            let mut filtered = Vec::new();
666            for id in &all {
667                if let Some(head) = self.load(id).await? {
668                    if head.thread.resource_id.as_deref() == Some(resource_id.as_str()) {
669                        filtered.push(id.clone());
670                    }
671                }
672            }
673            all = filtered;
674        }
675
676        // Filter by parent_thread_id if specified.
677        if let Some(ref parent_thread_id) = query.parent_thread_id {
678            let mut filtered = Vec::new();
679            for id in &all {
680                if let Some(head) = self.load(id).await? {
681                    if head.thread.parent_thread_id.as_deref() == Some(parent_thread_id.as_str()) {
682                        filtered.push(id.clone());
683                    }
684                }
685            }
686            all = filtered;
687        }
688
689        all.sort();
690        let total = all.len();
691        let limit = query.limit.clamp(1, 200);
692        let offset = query.offset.min(total);
693        let end = (offset + limit + 1).min(total);
694        let slice = &all[offset..end];
695        let has_more = slice.len() > limit;
696        let items: Vec<String> = slice.iter().take(limit).cloned().collect();
697        Ok(ThreadListPage {
698            items,
699            total,
700            has_more,
701        })
702    }
703}
704
705impl FileStore {
706    fn runs_dir(&self) -> PathBuf {
707        self.base_path.join("_runs")
708    }
709
710    fn run_path(&self, run_id: &str) -> Result<PathBuf, ThreadStoreError> {
711        file_utils::validate_fs_id(run_id, "run id").map_err(ThreadStoreError::InvalidId)?;
712        Ok(self.runs_dir().join(format!("{run_id}.json")))
713    }
714
715    async fn save_run_record(&self, record: &RunRecord) -> Result<(), ThreadStoreError> {
716        let payload = serde_json::to_string_pretty(record)
717            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
718        let filename = format!("{}.json", record.run_id);
719        file_utils::atomic_json_write(&self.runs_dir(), &filename, &payload)
720            .await
721            .map_err(ThreadStoreError::Io)
722    }
723
724    async fn load_run_record(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
725        let path = self.run_path(run_id)?;
726        if !path.exists() {
727            return Ok(None);
728        }
729        let content = tokio::fs::read_to_string(path).await?;
730        let record: RunRecord = serde_json::from_str(&content)
731            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
732        Ok(Some(record))
733    }
734
735    async fn load_all_run_records(&self) -> Result<Vec<RunRecord>, ThreadStoreError> {
736        let dir = self.runs_dir();
737        if !dir.exists() {
738            return Ok(Vec::new());
739        }
740        let mut entries = tokio::fs::read_dir(&dir).await?;
741        let mut records = Vec::new();
742        while let Some(entry) = entries.next_entry().await? {
743            let path = entry.path();
744            if path.extension().is_none_or(|ext| ext != "json") {
745                continue;
746            }
747            let content = tokio::fs::read_to_string(path).await?;
748            let record: RunRecord = serde_json::from_str(&content)
749                .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
750            records.push(record);
751        }
752        Ok(records)
753    }
754
755    async fn upsert_run_from_changeset(
756        &self,
757        thread_id: &str,
758        delta: &ThreadChangeSet,
759    ) -> Result<(), ThreadStoreError> {
760        if delta.run_id.is_empty() {
761            return Ok(());
762        }
763        let now = now_millis();
764        if let Some(meta) = &delta.run_meta {
765            let mut record = self
766                .load_run_record(&delta.run_id)
767                .await?
768                .unwrap_or_else(|| {
769                    RunRecord::new(
770                        &delta.run_id,
771                        thread_id,
772                        &meta.agent_id,
773                        meta.origin,
774                        meta.status,
775                        now,
776                    )
777                });
778            record.status = meta.status;
779            record.agent_id.clone_from(&meta.agent_id);
780            record.origin = meta.origin;
781            record.thread_id = thread_id.to_string();
782            if record.parent_run_id.is_none() {
783                record.parent_run_id.clone_from(&delta.parent_run_id);
784            }
785            if record.parent_thread_id.is_none() {
786                record.parent_thread_id.clone_from(&meta.parent_thread_id);
787            }
788            record.termination_code.clone_from(&meta.termination_code);
789            record
790                .termination_detail
791                .clone_from(&meta.termination_detail);
792            if record.source_mailbox_entry_id.is_none() {
793                record
794                    .source_mailbox_entry_id
795                    .clone_from(&meta.source_mailbox_entry_id);
796            }
797            record.updated_at = now;
798            self.save_run_record(&record).await?;
799        } else if let Some(mut record) = self.load_run_record(&delta.run_id).await? {
800            record.updated_at = now;
801            self.save_run_record(&record).await?;
802        }
803        Ok(())
804    }
805
806    /// Load a thread head (thread + version) from file.
807    async fn load_head(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
808        let path = self.thread_path(thread_id)?;
809        if !path.exists() {
810            return Ok(None);
811        }
812        let content = tokio::fs::read_to_string(&path).await?;
813        // Try to parse as ThreadHead first (new format with version).
814        if let Ok(head) = serde_json::from_str::<VersionedThread>(&content) {
815            let thread: Thread = serde_json::from_str(&content)
816                .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
817            Ok(Some(ThreadHead {
818                thread,
819                version: head._version.unwrap_or(0),
820            }))
821        } else {
822            let thread: Thread = serde_json::from_str(&content)
823                .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
824            Ok(Some(ThreadHead { thread, version: 0 }))
825        }
826    }
827
828    /// Save a thread head (thread + version) to file atomically.
829    async fn save_head(&self, head: &ThreadHead) -> Result<(), ThreadStoreError> {
830        // Embed version into the JSON
831        let mut v = serde_json::to_value(&head.thread)
832            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
833        if let Some(obj) = v.as_object_mut() {
834            obj.insert("_version".to_string(), serde_json::json!(head.version));
835        }
836        let content = serde_json::to_string_pretty(&v)
837            .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
838
839        let filename = format!("{}.json", head.thread.id);
840        file_utils::atomic_json_write(&self.base_path, &filename, &content)
841            .await
842            .map_err(ThreadStoreError::Io)
843    }
844}
845
846/// Helper for extracting the `_version` field from serialized thread JSON.
847#[derive(Deserialize)]
848struct VersionedThread {
849    #[serde(default)]
850    _version: Option<Version>,
851}
852
853#[cfg(test)]
854mod tests {
855    use super::*;
856    use serde_json::json;
857    use std::sync::Arc;
858    use tempfile::TempDir;
859    use tirea_contract::{
860        storage::{MailboxEntryStatus, MailboxReader, MailboxWriter, ThreadReader},
861        testing::MailboxEntryBuilder,
862        CheckpointReason, Message, MessageQuery, ThreadWriter,
863    };
864    use tirea_state::{path, Op, Patch, TrackedPatch};
865
866    fn make_thread_with_messages(thread_id: &str, n: usize) -> Thread {
867        let mut thread = Thread::new(thread_id);
868        for i in 0..n {
869            thread = thread.with_message(Message::user(format!("msg-{i}")));
870        }
871        thread
872    }
873
874    #[tokio::test]
875    async fn file_storage_save_load_roundtrip() {
876        let temp_dir = TempDir::new().unwrap();
877        let storage = FileStore::new(temp_dir.path());
878
879        let thread = Thread::new("test-1").with_message(Message::user("hello"));
880        storage.save(&thread).await.unwrap();
881
882        let loaded = storage.load_thread("test-1").await.unwrap().unwrap();
883        assert_eq!(loaded.id, "test-1");
884        assert_eq!(loaded.message_count(), 1);
885    }
886
887    #[tokio::test]
888    async fn file_storage_list_and_delete() {
889        let temp_dir = TempDir::new().unwrap();
890        let storage = FileStore::new(temp_dir.path());
891
892        storage.create(&Thread::new("thread-a")).await.unwrap();
893        storage.create(&Thread::new("thread-b")).await.unwrap();
894        storage.create(&Thread::new("thread-c")).await.unwrap();
895
896        let mut ids = storage.list().await.unwrap();
897        ids.sort();
898        assert_eq!(ids, vec!["thread-a", "thread-b", "thread-c"]);
899
900        storage.delete("thread-b").await.unwrap();
901        let mut ids = storage.list().await.unwrap();
902        ids.sort();
903        assert_eq!(ids, vec!["thread-a", "thread-c"]);
904    }
905
906    #[tokio::test]
907    async fn file_storage_message_queries() {
908        let temp_dir = TempDir::new().unwrap();
909        let storage = FileStore::new(temp_dir.path());
910        let thread = make_thread_with_messages("t1", 10);
911        storage.save(&thread).await.unwrap();
912
913        let page = storage
914            .load_messages(
915                "t1",
916                &MessageQuery {
917                    after: Some(4),
918                    limit: 3,
919                    ..Default::default()
920                },
921            )
922            .await
923            .unwrap();
924        assert_eq!(page.messages.len(), 3);
925        assert_eq!(page.messages[0].cursor, 5);
926        assert_eq!(page.messages[0].message.content, "msg-5");
927        assert_eq!(storage.message_count("t1").await.unwrap(), 10);
928    }
929
930    #[tokio::test]
931    async fn file_storage_append_and_versioning() {
932        let temp_dir = TempDir::new().unwrap();
933        let store = FileStore::new(temp_dir.path());
934        store.create(&Thread::new("t1")).await.unwrap();
935
936        let d1 = ThreadChangeSet {
937            run_id: "run-1".to_string(),
938            parent_run_id: None,
939            run_meta: None,
940            reason: CheckpointReason::UserMessage,
941            messages: vec![Arc::new(Message::user("hello"))],
942            patches: vec![],
943            state_actions: vec![],
944            snapshot: None,
945        };
946        let c1 = store
947            .append("t1", &d1, VersionPrecondition::Exact(0))
948            .await
949            .unwrap();
950        assert_eq!(c1.version, 1);
951
952        let d2 = ThreadChangeSet {
953            run_id: "run-1".to_string(),
954            parent_run_id: None,
955            run_meta: None,
956            reason: CheckpointReason::AssistantTurnCommitted,
957            messages: vec![Arc::new(Message::assistant("hi"))],
958            patches: vec![TrackedPatch::new(
959                Patch::new().with_op(Op::set(path!("greeted"), json!(true))),
960            )],
961            state_actions: vec![],
962            snapshot: None,
963        };
964        let c2 = store
965            .append("t1", &d2, VersionPrecondition::Exact(1))
966            .await
967            .unwrap();
968        assert_eq!(c2.version, 2);
969
970        let d3 = ThreadChangeSet {
971            run_id: "run-1".to_string(),
972            parent_run_id: None,
973            run_meta: None,
974            reason: CheckpointReason::RunFinished,
975            messages: vec![],
976            patches: vec![],
977            state_actions: vec![],
978            snapshot: Some(json!({"greeted": true})),
979        };
980        let c3 = store
981            .append("t1", &d3, VersionPrecondition::Exact(2))
982            .await
983            .unwrap();
984        assert_eq!(c3.version, 3);
985
986        let store2 = FileStore::new(temp_dir.path());
987        let head = store2.load("t1").await.unwrap().unwrap();
988        assert_eq!(head.version, 3);
989        assert_eq!(head.thread.message_count(), 2);
990        assert!(head.thread.patches.is_empty());
991        assert_eq!(head.thread.state, json!({"greeted": true}));
992    }
993
994    #[tokio::test]
995    async fn file_storage_tool_call_message_roundtrip() {
996        let temp_dir = TempDir::new().unwrap();
997        let storage = FileStore::new(temp_dir.path());
998
999        let tool_call = tirea_contract::ToolCall::new("call_1", "search", json!({"query": "rust"}));
1000        let thread = Thread::new("tool-rt")
1001            .with_message(Message::user("Find info about Rust"))
1002            .with_message(Message::assistant_with_tool_calls(
1003                "Let me search for that.",
1004                vec![tool_call],
1005            ))
1006            .with_message(Message::tool(
1007                "call_1",
1008                r#"{"result": "Rust is a language"}"#,
1009            ))
1010            .with_message(Message::assistant(
1011                "Rust is a systems programming language.",
1012            ));
1013
1014        storage.save(&thread).await.unwrap();
1015        let loaded = storage.load_thread("tool-rt").await.unwrap().unwrap();
1016
1017        assert_eq!(loaded.message_count(), 4);
1018
1019        // Assistant message with tool_calls
1020        let assistant_msg = &loaded.messages[1];
1021        assert_eq!(assistant_msg.role, tirea_contract::Role::Assistant);
1022        let calls = assistant_msg.tool_calls.as_ref().expect("tool_calls lost");
1023        assert_eq!(calls.len(), 1);
1024        assert_eq!(calls[0].id, "call_1");
1025        assert_eq!(calls[0].name, "search");
1026        assert_eq!(calls[0].arguments, json!({"query": "rust"}));
1027
1028        // Tool response message with tool_call_id
1029        let tool_msg = &loaded.messages[2];
1030        assert_eq!(tool_msg.role, tirea_contract::Role::Tool);
1031        assert_eq!(tool_msg.tool_call_id.as_deref(), Some("call_1"));
1032        assert_eq!(tool_msg.content, r#"{"result": "Rust is a language"}"#);
1033    }
1034
1035    #[tokio::test]
1036    async fn file_storage_tool_call_message_roundtrip_via_append() {
1037        let temp_dir = TempDir::new().unwrap();
1038        let store = FileStore::new(temp_dir.path());
1039        store.create(&Thread::new("tool-append")).await.unwrap();
1040
1041        let tool_call =
1042            tirea_contract::ToolCall::new("call_42", "calculator", json!({"expr": "6*7"}));
1043        let delta = ThreadChangeSet {
1044            run_id: "run-1".to_string(),
1045            parent_run_id: None,
1046            run_meta: None,
1047            reason: CheckpointReason::AssistantTurnCommitted,
1048            messages: vec![
1049                Arc::new(Message::assistant_with_tool_calls(
1050                    "Calculating...",
1051                    vec![tool_call],
1052                )),
1053                Arc::new(Message::tool("call_42", r#"{"answer": 42}"#)),
1054            ],
1055            patches: vec![],
1056            state_actions: vec![],
1057            snapshot: None,
1058        };
1059        store
1060            .append("tool-append", &delta, VersionPrecondition::Exact(0))
1061            .await
1062            .unwrap();
1063
1064        let head = store.load("tool-append").await.unwrap().unwrap();
1065        assert_eq!(head.thread.message_count(), 2);
1066
1067        let calls = head.thread.messages[0]
1068            .tool_calls
1069            .as_ref()
1070            .expect("tool_calls lost after append");
1071        assert_eq!(calls[0].id, "call_42");
1072        assert_eq!(calls[0].name, "calculator");
1073
1074        assert_eq!(
1075            head.thread.messages[1].tool_call_id.as_deref(),
1076            Some("call_42")
1077        );
1078    }
1079
1080    #[tokio::test]
1081    async fn file_storage_timestamps_populated() {
1082        let temp_dir = TempDir::new().unwrap();
1083        let store = FileStore::new(temp_dir.path());
1084
1085        // create() populates both timestamps
1086        store.create(&Thread::new("ts-1")).await.unwrap();
1087        let head = store.load("ts-1").await.unwrap().unwrap();
1088        assert!(head.thread.metadata.created_at.is_some());
1089        assert!(head.thread.metadata.updated_at.is_some());
1090        let created = head.thread.metadata.created_at.unwrap();
1091        let updated = head.thread.metadata.updated_at.unwrap();
1092        assert!(created > 0);
1093        assert_eq!(created, updated);
1094
1095        // append() updates updated_at
1096        let delta = ThreadChangeSet {
1097            run_id: "run-1".to_string(),
1098            parent_run_id: None,
1099            run_meta: None,
1100            reason: CheckpointReason::UserMessage,
1101            messages: vec![Arc::new(Message::user("hello"))],
1102            patches: vec![],
1103            state_actions: vec![],
1104            snapshot: None,
1105        };
1106        store
1107            .append("ts-1", &delta, VersionPrecondition::Exact(0))
1108            .await
1109            .unwrap();
1110        let head = store.load("ts-1").await.unwrap().unwrap();
1111        assert!(head.thread.metadata.updated_at.unwrap() >= updated);
1112        assert_eq!(head.thread.metadata.created_at.unwrap(), created);
1113
1114        // save() populates created_at if missing, updates updated_at
1115        let thread = Thread::new("ts-2");
1116        assert!(thread.metadata.created_at.is_none());
1117        store.save(&thread).await.unwrap();
1118        let head = store.load("ts-2").await.unwrap().unwrap();
1119        assert!(head.thread.metadata.created_at.is_some());
1120        assert!(head.thread.metadata.updated_at.is_some());
1121    }
1122
1123    #[test]
1124    fn file_storage_rejects_path_traversal() {
1125        let storage = FileStore::new("/base/path");
1126        assert!(storage.thread_path("../../etc/passwd").is_err());
1127        assert!(storage.thread_path("foo/bar").is_err());
1128        assert!(storage.thread_path("foo\\bar").is_err());
1129        assert!(storage.thread_path("").is_err());
1130        assert!(storage.thread_path("foo\0bar").is_err());
1131    }
1132
1133    #[tokio::test]
1134    async fn file_storage_mailbox_claim_and_cancel_roundtrip() {
1135        let temp_dir = TempDir::new().unwrap();
1136        let storage = FileStore::new(temp_dir.path());
1137        let entry = MailboxEntryBuilder::queued("entry-file-mailbox", "mailbox-file-mailbox")
1138            .with_payload(json!({"message": "hello"}))
1139            .build();
1140        storage.enqueue_mailbox_entry(&entry).await.unwrap();
1141
1142        let claimed = storage
1143            .claim_mailbox_entries(None, 1, "worker-file", 10, 5_000)
1144            .await
1145            .unwrap();
1146        assert_eq!(claimed.len(), 1);
1147        assert_eq!(claimed[0].status, MailboxEntryStatus::Claimed);
1148
1149        let cancelled = storage
1150            .cancel_mailbox_entry("entry-file-mailbox", 20)
1151            .await
1152            .unwrap()
1153            .expect("queued entry should be cancellable");
1154        assert_eq!(cancelled.status, MailboxEntryStatus::Cancelled);
1155
1156        let loaded = storage
1157            .load_mailbox_entry(&entry.entry_id)
1158            .await
1159            .unwrap()
1160            .expect("mailbox entry should persist");
1161        assert_eq!(loaded.status, MailboxEntryStatus::Cancelled);
1162    }
1163
1164    #[tokio::test]
1165    async fn file_storage_mailbox_claim_by_entry_id_ignores_available_at() {
1166        let temp_dir = TempDir::new().unwrap();
1167        let storage = FileStore::new(temp_dir.path());
1168        let entry = MailboxEntryBuilder::queued("entry-file-inline", "mailbox-file-inline")
1169            .with_payload(json!({"message": "hello"}))
1170            .with_available_at(i64::MAX as u64)
1171            .build();
1172        storage.enqueue_mailbox_entry(&entry).await.unwrap();
1173
1174        let claimed = storage
1175            .claim_mailbox_entries(None, 1, "worker-file-batch", 10, 5_000)
1176            .await
1177            .unwrap();
1178        assert!(claimed.is_empty());
1179
1180        let targeted = storage
1181            .claim_mailbox_entry("entry-file-inline", "worker-file-inline", 10, 5_000)
1182            .await
1183            .unwrap()
1184            .expect("inline claim should succeed");
1185        assert_eq!(targeted.status, MailboxEntryStatus::Claimed);
1186        assert_eq!(targeted.claimed_by.as_deref(), Some("worker-file-inline"));
1187    }
1188
1189    #[tokio::test]
1190    async fn file_storage_mailbox_interrupt_bumps_generation_and_supersedes_entries() {
1191        let temp_dir = TempDir::new().unwrap();
1192        let storage = FileStore::new(temp_dir.path());
1193        let old_a = MailboxEntryBuilder::queued("entry-file-old-a", "mailbox-file-interrupt")
1194            .with_payload(json!({"message": "hello"}))
1195            .build();
1196        let old_b = MailboxEntryBuilder::queued("entry-file-old-b", "mailbox-file-interrupt")
1197            .with_payload(json!({"message": "hello"}))
1198            .build();
1199        storage.enqueue_mailbox_entry(&old_a).await.unwrap();
1200        storage.enqueue_mailbox_entry(&old_b).await.unwrap();
1201
1202        let interrupted = storage
1203            .interrupt_mailbox("mailbox-file-interrupt", 50)
1204            .await
1205            .unwrap();
1206        assert_eq!(interrupted.mailbox_state.current_generation, 1);
1207        assert_eq!(interrupted.superseded_entries.len(), 2);
1208
1209        let superseded = storage
1210            .load_mailbox_entry("entry-file-old-a")
1211            .await
1212            .unwrap()
1213            .expect("superseded entry should exist");
1214        assert_eq!(superseded.status, MailboxEntryStatus::Superseded);
1215
1216        let next_generation = storage
1217            .ensure_mailbox_state("mailbox-file-interrupt", 60)
1218            .await
1219            .unwrap()
1220            .current_generation;
1221        let fresh = MailboxEntryBuilder::queued("entry-file-fresh", "mailbox-file-interrupt")
1222            .with_payload(json!({"message": "hello"}))
1223            .with_generation(next_generation)
1224            .build();
1225        storage.enqueue_mailbox_entry(&fresh).await.unwrap();
1226
1227        let fresh_loaded = storage
1228            .load_mailbox_entry("entry-file-fresh")
1229            .await
1230            .unwrap()
1231            .expect("fresh entry should exist");
1232        assert_eq!(fresh_loaded.generation, 1);
1233        assert_eq!(fresh_loaded.status, MailboxEntryStatus::Queued);
1234    }
1235
1236    #[tokio::test]
1237    async fn file_storage_mailbox_rejects_duplicate_dedupe_key_in_same_mailbox() {
1238        let temp_dir = TempDir::new().unwrap();
1239        let storage = FileStore::new(temp_dir.path());
1240        let first = MailboxEntryBuilder::queued("entry-file-dedupe-1", "mailbox-file-dedupe")
1241            .with_dedupe_key("dup-key")
1242            .build();
1243        let duplicate = MailboxEntryBuilder::queued("entry-file-dedupe-2", "mailbox-file-dedupe")
1244            .with_dedupe_key("dup-key")
1245            .build();
1246
1247        storage.enqueue_mailbox_entry(&first).await.unwrap();
1248        let result = storage.enqueue_mailbox_entry(&duplicate).await;
1249        assert!(matches!(result, Err(MailboxStoreError::AlreadyExists(_))));
1250    }
1251}