1use crate::file_utils;
2use async_trait::async_trait;
3use serde::Deserialize;
4use std::path::PathBuf;
5use std::time::{SystemTime, UNIX_EPOCH};
6use tirea_contract::storage::{
7 has_active_claim_for_mailbox, paginate_mailbox_entries, paginate_runs_in_memory, Committed,
8 MailboxEntry, MailboxInterrupt, MailboxPage, MailboxQuery, MailboxReader, MailboxState,
9 MailboxStoreError, MailboxWriter, RunPage, RunQuery, RunRecord, ThreadHead, ThreadListPage,
10 ThreadListQuery, ThreadReader, ThreadStoreError, ThreadWriter, VersionPrecondition,
11};
12use tirea_contract::{Thread, ThreadChangeSet, Version};
13
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. A millisecond count that
/// does not fit in `u64` saturates at `u64::MAX` instead of being silently
/// truncated by an `as` cast.
fn now_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
20
/// File-system backed store that persists records as JSON files beneath a
/// single base directory.
///
/// Thread files live directly in the base directory; mailbox entries, mailbox
/// states and run records live in the `_mailbox`, `_mailbox_threads` and
/// `_runs` sub-directories respectively.
#[derive(Debug, Clone)]
pub struct FileStore {
    /// Root directory under which every JSON file is written.
    base_path: PathBuf,
}
24
25impl FileStore {
26 pub fn new(base_path: impl Into<PathBuf>) -> Self {
28 Self {
29 base_path: base_path.into(),
30 }
31 }
32
33 pub(super) fn thread_path(&self, thread_id: &str) -> Result<PathBuf, ThreadStoreError> {
34 Self::validate_thread_id(thread_id)?;
35 Ok(self.base_path.join(format!("{}.json", thread_id)))
36 }
37
38 fn validate_thread_id(thread_id: &str) -> Result<(), ThreadStoreError> {
39 file_utils::validate_fs_id(thread_id, "thread id").map_err(ThreadStoreError::InvalidId)
40 }
41
42 fn mailbox_dir(&self) -> PathBuf {
43 self.base_path.join("_mailbox")
44 }
45
46 fn mailbox_threads_dir(&self) -> PathBuf {
47 self.base_path.join("_mailbox_threads")
48 }
49
50 fn mailbox_path(&self, entry_id: &str) -> Result<PathBuf, MailboxStoreError> {
51 file_utils::validate_fs_id(entry_id, "mailbox entry id")
52 .map_err(MailboxStoreError::Backend)?;
53 Ok(self.mailbox_dir().join(format!("{entry_id}.json")))
54 }
55
56 fn mailbox_state_path(&self, mailbox_id: &str) -> Result<PathBuf, MailboxStoreError> {
57 file_utils::validate_fs_id(mailbox_id, "mailbox id").map_err(MailboxStoreError::Backend)?;
58 Ok(self
59 .mailbox_threads_dir()
60 .join(format!("{mailbox_id}.json")))
61 }
62
63 async fn save_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
64 let payload = serde_json::to_string_pretty(entry)
65 .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
66 let filename = format!("{}.json", entry.entry_id);
67 file_utils::atomic_json_write(&self.mailbox_dir(), &filename, &payload)
68 .await
69 .map_err(MailboxStoreError::Io)
70 }
71
72 async fn save_mailbox_state(&self, state: &MailboxState) -> Result<(), MailboxStoreError> {
73 let payload = serde_json::to_string_pretty(state)
74 .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
75 let filename = format!("{}.json", state.mailbox_id);
76 file_utils::atomic_json_write(&self.mailbox_threads_dir(), &filename, &payload)
77 .await
78 .map_err(MailboxStoreError::Io)
79 }
80
81 async fn load_mailbox_state_inner(
82 &self,
83 mailbox_id: &str,
84 ) -> Result<Option<MailboxState>, MailboxStoreError> {
85 let path = self.mailbox_state_path(mailbox_id)?;
86 if !path.exists() {
87 return Ok(None);
88 }
89 let content = tokio::fs::read_to_string(path)
90 .await
91 .map_err(MailboxStoreError::Io)?;
92 let state = serde_json::from_str(&content)
93 .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
94 Ok(Some(state))
95 }
96
97 async fn load_all_mailbox_entries(&self) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
98 let dir = self.mailbox_dir();
99 if !dir.exists() {
100 return Ok(Vec::new());
101 }
102 let mut entries = tokio::fs::read_dir(&dir)
103 .await
104 .map_err(MailboxStoreError::Io)?;
105 let mut mailbox_entries = Vec::new();
106 while let Some(entry) = entries.next_entry().await.map_err(MailboxStoreError::Io)? {
107 let path = entry.path();
108 if path.extension().is_none_or(|ext| ext != "json") {
109 continue;
110 }
111 let content = tokio::fs::read_to_string(path)
112 .await
113 .map_err(MailboxStoreError::Io)?;
114 let mailbox_entry: MailboxEntry = serde_json::from_str(&content)
115 .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
116 mailbox_entries.push(mailbox_entry);
117 }
118 Ok(mailbox_entries)
119 }
120}
121
122#[async_trait]
123impl MailboxReader for FileStore {
124 async fn load_mailbox_entry(
125 &self,
126 entry_id: &str,
127 ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
128 let path = self.mailbox_path(entry_id)?;
129 if !path.exists() {
130 return Ok(None);
131 }
132 let content = tokio::fs::read_to_string(path)
133 .await
134 .map_err(MailboxStoreError::Io)?;
135 let entry: MailboxEntry = serde_json::from_str(&content)
136 .map_err(|e| MailboxStoreError::Serialization(e.to_string()))?;
137 Ok(Some(entry))
138 }
139
140 async fn load_mailbox_state(
141 &self,
142 mailbox_id: &str,
143 ) -> Result<Option<MailboxState>, MailboxStoreError> {
144 self.load_mailbox_state_inner(mailbox_id).await
145 }
146
147 async fn list_mailbox_entries(
148 &self,
149 query: &MailboxQuery,
150 ) -> Result<MailboxPage, MailboxStoreError> {
151 let entries = self.load_all_mailbox_entries().await?;
152 Ok(paginate_mailbox_entries(&entries, query))
153 }
154}
155
156#[async_trait]
157impl MailboxWriter for FileStore {
158 async fn enqueue_mailbox_entry(&self, entry: &MailboxEntry) -> Result<(), MailboxStoreError> {
159 let path = self.mailbox_path(&entry.entry_id)?;
160 if path.exists() {
161 return Err(MailboxStoreError::AlreadyExists(entry.entry_id.clone()));
162 }
163 let mailbox_state = self
164 .load_mailbox_state_inner(&entry.mailbox_id)
165 .await?
166 .unwrap_or(MailboxState {
167 mailbox_id: entry.mailbox_id.clone(),
168 current_generation: entry.generation,
169 updated_at: entry.updated_at,
170 });
171 if mailbox_state.current_generation != entry.generation {
172 return Err(MailboxStoreError::GenerationMismatch {
173 mailbox_id: entry.mailbox_id.clone(),
174 expected: mailbox_state.current_generation,
175 actual: entry.generation,
176 });
177 }
178 if let Some(dedupe_key) = entry.dedupe_key.as_deref() {
179 let existing = self.load_all_mailbox_entries().await?;
180 if existing.iter().any(|current| {
181 current.mailbox_id == entry.mailbox_id
182 && current.dedupe_key.as_deref() == Some(dedupe_key)
183 }) {
184 return Err(MailboxStoreError::AlreadyExists(dedupe_key.to_string()));
185 }
186 }
187 self.save_mailbox_state(&mailbox_state).await?;
188 self.save_mailbox_entry(entry).await
189 }
190
191 async fn ensure_mailbox_state(
192 &self,
193 mailbox_id: &str,
194 now: u64,
195 ) -> Result<MailboxState, MailboxStoreError> {
196 let mut state = self
197 .load_mailbox_state_inner(mailbox_id)
198 .await?
199 .unwrap_or(MailboxState {
200 mailbox_id: mailbox_id.to_string(),
201 current_generation: 0,
202 updated_at: now,
203 });
204 state.updated_at = now;
205 self.save_mailbox_state(&state).await?;
206 Ok(state)
207 }
208
209 async fn claim_mailbox_entries(
210 &self,
211 mailbox_id: Option<&str>,
212 limit: usize,
213 consumer_id: &str,
214 now: u64,
215 lease_duration_ms: u64,
216 ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
217 let all_entries = self.load_all_mailbox_entries().await?;
218 let mut candidates: Vec<MailboxEntry> = all_entries
219 .iter()
220 .filter(|entry| entry.is_claimable(now))
221 .filter(|entry| mailbox_id.is_none_or(|id| entry.mailbox_id == id))
222 .cloned()
223 .collect();
224 candidates.sort_by(|left, right| {
225 right
226 .priority
227 .cmp(&left.priority)
228 .then_with(|| left.available_at.cmp(&right.available_at))
229 .then_with(|| left.created_at.cmp(&right.created_at))
230 .then_with(|| left.entry_id.cmp(&right.entry_id))
231 });
232
233 let mut claimed_mailbox_ids = std::collections::HashSet::new();
235
236 let mut claimed = Vec::new();
237 for mut entry in candidates {
238 if claimed.len() >= limit {
239 break;
240 }
241
242 if claimed_mailbox_ids.contains(&entry.mailbox_id)
245 || has_active_claim_for_mailbox(
246 all_entries.iter(),
247 &entry.mailbox_id,
248 now,
249 Some(&entry.entry_id),
250 )
251 {
252 continue;
253 }
254
255 if let Some(ts) = self.load_mailbox_state_inner(&entry.mailbox_id).await? {
258 if entry.generation < ts.current_generation {
259 entry.status = tirea_contract::MailboxEntryStatus::Superseded;
260 entry.last_error =
261 Some("superseded by interrupt (reconciled on claim)".to_string());
262 entry.claim_token = None;
263 entry.claimed_by = None;
264 entry.lease_until = None;
265 entry.updated_at = now;
266 self.save_mailbox_entry(&entry).await?;
267 continue;
268 }
269 }
270
271 let mid = entry.mailbox_id.clone();
272 entry.status = tirea_contract::MailboxEntryStatus::Claimed;
273 entry.claim_token = Some(uuid::Uuid::now_v7().simple().to_string());
274 entry.claimed_by = Some(consumer_id.to_string());
275 entry.lease_until = Some(now.saturating_add(lease_duration_ms));
276 entry.attempt_count = entry.attempt_count.saturating_add(1);
277 entry.updated_at = now;
278 self.save_mailbox_entry(&entry).await?;
279 claimed.push(entry);
280 claimed_mailbox_ids.insert(mid);
281 }
282 Ok(claimed)
283 }
284
285 async fn claim_mailbox_entry(
286 &self,
287 entry_id: &str,
288 consumer_id: &str,
289 now: u64,
290 lease_duration_ms: u64,
291 ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
292 let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
293 return Ok(None);
294 };
295 if entry.status.is_terminal() {
296 return Ok(None);
297 }
298 if entry.status == tirea_contract::MailboxEntryStatus::Claimed
299 && entry.lease_until.is_some_and(|lease| lease > now)
300 {
301 return Ok(None);
302 }
303
304 let all_entries = self.load_all_mailbox_entries().await?;
307 if has_active_claim_for_mailbox(all_entries.iter(), &entry.mailbox_id, now, Some(entry_id))
308 {
309 return Ok(None);
310 }
311
312 entry.status = tirea_contract::MailboxEntryStatus::Claimed;
313 entry.claim_token = Some(uuid::Uuid::now_v7().simple().to_string());
314 entry.claimed_by = Some(consumer_id.to_string());
315 entry.lease_until = Some(now.saturating_add(lease_duration_ms));
316 entry.attempt_count = entry.attempt_count.saturating_add(1);
317 entry.updated_at = now;
318 self.save_mailbox_entry(&entry).await?;
319 Ok(Some(entry))
320 }
321
322 async fn ack_mailbox_entry(
323 &self,
324 entry_id: &str,
325 claim_token: &str,
326 now: u64,
327 ) -> Result<(), MailboxStoreError> {
328 let mut entry = self
329 .load_mailbox_entry(entry_id)
330 .await?
331 .ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
332 if entry.claim_token.as_deref() != Some(claim_token) {
333 return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
334 }
335 entry.status = tirea_contract::MailboxEntryStatus::Accepted;
336 entry.claim_token = None;
337 entry.claimed_by = None;
338 entry.lease_until = None;
339 entry.updated_at = now;
340 self.save_mailbox_entry(&entry).await
341 }
342
343 async fn nack_mailbox_entry(
344 &self,
345 entry_id: &str,
346 claim_token: &str,
347 retry_at: u64,
348 error: &str,
349 now: u64,
350 ) -> Result<(), MailboxStoreError> {
351 let mut entry = self
352 .load_mailbox_entry(entry_id)
353 .await?
354 .ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
355 if entry.claim_token.as_deref() != Some(claim_token) {
356 return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
357 }
358 entry.status = tirea_contract::MailboxEntryStatus::Queued;
359 entry.available_at = retry_at;
360 entry.last_error = Some(error.to_string());
361 entry.claim_token = None;
362 entry.claimed_by = None;
363 entry.lease_until = None;
364 entry.updated_at = now;
365 self.save_mailbox_entry(&entry).await
366 }
367
368 async fn dead_letter_mailbox_entry(
369 &self,
370 entry_id: &str,
371 claim_token: &str,
372 error: &str,
373 now: u64,
374 ) -> Result<(), MailboxStoreError> {
375 let mut entry = self
376 .load_mailbox_entry(entry_id)
377 .await?
378 .ok_or_else(|| MailboxStoreError::NotFound(entry_id.to_string()))?;
379 if entry.claim_token.as_deref() != Some(claim_token) {
380 return Err(MailboxStoreError::ClaimConflict(entry_id.to_string()));
381 }
382 entry.status = tirea_contract::MailboxEntryStatus::DeadLetter;
383 entry.last_error = Some(error.to_string());
384 entry.claim_token = None;
385 entry.claimed_by = None;
386 entry.lease_until = None;
387 entry.updated_at = now;
388 self.save_mailbox_entry(&entry).await
389 }
390
391 async fn cancel_mailbox_entry(
392 &self,
393 entry_id: &str,
394 now: u64,
395 ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
396 let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
397 return Ok(None);
398 };
399 if entry.status.is_terminal() {
400 return Ok(Some(entry));
401 }
402 entry.status = tirea_contract::MailboxEntryStatus::Cancelled;
403 entry.last_error = Some("cancelled".to_string());
404 entry.claim_token = None;
405 entry.claimed_by = None;
406 entry.lease_until = None;
407 entry.updated_at = now;
408 self.save_mailbox_entry(&entry).await?;
409 Ok(Some(entry))
410 }
411
412 async fn supersede_mailbox_entry(
413 &self,
414 entry_id: &str,
415 now: u64,
416 reason: &str,
417 ) -> Result<Option<MailboxEntry>, MailboxStoreError> {
418 let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
419 return Ok(None);
420 };
421 if entry.status.is_terminal() {
422 return Ok(Some(entry));
423 }
424 entry.status = tirea_contract::MailboxEntryStatus::Superseded;
425 entry.last_error = Some(reason.to_string());
426 entry.claim_token = None;
427 entry.claimed_by = None;
428 entry.lease_until = None;
429 entry.updated_at = now;
430 self.save_mailbox_entry(&entry).await?;
431 Ok(Some(entry))
432 }
433
434 async fn cancel_pending_for_mailbox(
435 &self,
436 mailbox_id: &str,
437 now: u64,
438 exclude_entry_id: Option<&str>,
439 ) -> Result<Vec<MailboxEntry>, MailboxStoreError> {
440 let entries = self.load_all_mailbox_entries().await?;
441 let mut cancelled = Vec::new();
442 for mut entry in entries {
443 if entry.mailbox_id != mailbox_id || entry.status.is_terminal() {
444 continue;
445 }
446 if exclude_entry_id.is_some_and(|eid| entry.entry_id == eid) {
447 continue;
448 }
449 entry.status = tirea_contract::MailboxEntryStatus::Cancelled;
450 entry.last_error = Some("cancelled".to_string());
451 entry.claim_token = None;
452 entry.claimed_by = None;
453 entry.lease_until = None;
454 entry.updated_at = now;
455 self.save_mailbox_entry(&entry).await?;
456 cancelled.push(entry);
457 }
458 Ok(cancelled)
459 }
460
461 async fn interrupt_mailbox(
462 &self,
463 mailbox_id: &str,
464 now: u64,
465 ) -> Result<MailboxInterrupt, MailboxStoreError> {
466 let mut state = self
467 .load_mailbox_state_inner(mailbox_id)
468 .await?
469 .unwrap_or(MailboxState {
470 mailbox_id: mailbox_id.to_string(),
471 current_generation: 0,
472 updated_at: now,
473 });
474 state.current_generation = state.current_generation.saturating_add(1);
475 state.updated_at = now;
476 self.save_mailbox_state(&state).await?;
477
478 let entries = self.load_all_mailbox_entries().await?;
479 let mut superseded = Vec::new();
480 for mut entry in entries {
481 if entry.mailbox_id != mailbox_id || entry.status.is_terminal() {
482 continue;
483 }
484 if entry.generation >= state.current_generation {
485 continue;
486 }
487 entry.status = tirea_contract::MailboxEntryStatus::Superseded;
488 entry.last_error = Some("superseded by interrupt".to_string());
489 entry.claim_token = None;
490 entry.claimed_by = None;
491 entry.lease_until = None;
492 entry.updated_at = now;
493 self.save_mailbox_entry(&entry).await?;
494 superseded.push(entry);
495 }
496
497 Ok(MailboxInterrupt {
498 mailbox_state: state,
499 superseded_entries: superseded,
500 })
501 }
502
503 async fn extend_lease(
504 &self,
505 entry_id: &str,
506 claim_token: &str,
507 extension_ms: u64,
508 now: u64,
509 ) -> Result<bool, MailboxStoreError> {
510 let Some(mut entry) = self.load_mailbox_entry(entry_id).await? else {
511 return Ok(false);
512 };
513 if entry.status != tirea_contract::MailboxEntryStatus::Claimed {
514 return Ok(false);
515 }
516 if entry.claim_token.as_deref() != Some(claim_token) {
517 return Ok(false);
518 }
519 entry.lease_until = Some(now.saturating_add(extension_ms));
520 entry.updated_at = now;
521 self.save_mailbox_entry(&entry).await?;
522 Ok(true)
523 }
524
525 async fn purge_terminal_mailbox_entries(
526 &self,
527 older_than: u64,
528 ) -> Result<usize, MailboxStoreError> {
529 let entries = self.load_all_mailbox_entries().await?;
530 let mut count = 0usize;
531 for entry in entries {
532 if entry.status.is_terminal() && entry.updated_at < older_than {
533 let path = self.mailbox_dir().join(format!("{}.json", entry.entry_id));
534 if path.exists() {
535 tokio::fs::remove_file(&path)
536 .await
537 .map_err(MailboxStoreError::Io)?;
538 count += 1;
539 }
540 }
541 }
542 Ok(count)
543 }
544}
545
546#[async_trait]
547impl ThreadWriter for FileStore {
548 async fn create(&self, thread: &Thread) -> Result<Committed, ThreadStoreError> {
549 let path = self.thread_path(&thread.id)?;
550 if path.exists() {
551 return Err(ThreadStoreError::AlreadyExists);
552 }
553 let mut thread = thread.clone();
554 let now = now_millis();
555 if thread.metadata.created_at.is_none() {
556 thread.metadata.created_at = Some(now);
557 }
558 thread.metadata.updated_at = Some(now);
559 let head = ThreadHead { thread, version: 0 };
560 self.save_head(&head).await?;
561 Ok(Committed { version: 0 })
562 }
563
564 async fn append(
565 &self,
566 thread_id: &str,
567 delta: &ThreadChangeSet,
568 precondition: VersionPrecondition,
569 ) -> Result<Committed, ThreadStoreError> {
570 let head = self
571 .load_head(thread_id)
572 .await?
573 .ok_or_else(|| ThreadStoreError::NotFound(thread_id.to_string()))?;
574
575 if let VersionPrecondition::Exact(expected) = precondition {
576 if head.version != expected {
577 return Err(ThreadStoreError::VersionConflict {
578 expected,
579 actual: head.version,
580 });
581 }
582 }
583
584 let mut thread = head.thread;
585 delta.apply_to(&mut thread);
586 thread.metadata.updated_at = Some(now_millis());
587 let new_version = head.version + 1;
588 let new_head = ThreadHead {
589 thread,
590 version: new_version,
591 };
592 self.save_head(&new_head).await?;
593 self.upsert_run_from_changeset(thread_id, delta).await?;
594 Ok(Committed {
595 version: new_version,
596 })
597 }
598
599 async fn delete(&self, thread_id: &str) -> Result<(), ThreadStoreError> {
600 let path = self.thread_path(thread_id)?;
601 if path.exists() {
602 tokio::fs::remove_file(&path).await?;
603 }
604 Ok(())
605 }
606
607 async fn save(&self, thread: &Thread) -> Result<(), ThreadStoreError> {
608 let next_version = self
609 .load_head(&thread.id)
610 .await?
611 .map_or(0, |head| head.version.saturating_add(1));
612 let mut thread = thread.clone();
613 let now = now_millis();
614 thread.metadata.updated_at = Some(now);
615 if thread.metadata.created_at.is_none() {
616 thread.metadata.created_at = Some(now);
617 }
618 let head = ThreadHead {
619 thread,
620 version: next_version,
621 };
622 self.save_head(&head).await
623 }
624}
625
626#[async_trait]
627impl ThreadReader for FileStore {
628 async fn load(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
629 self.load_head(thread_id).await
630 }
631
632 async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
633 self.load_run_record(run_id).await
634 }
635
636 async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, ThreadStoreError> {
637 let records = self.load_all_run_records().await?;
638 Ok(paginate_runs_in_memory(&records, query))
639 }
640
641 async fn active_run_for_thread(
642 &self,
643 thread_id: &str,
644 ) -> Result<Option<RunRecord>, ThreadStoreError> {
645 let records = self.load_all_run_records().await?;
646 Ok(records
647 .into_iter()
648 .filter(|r| r.thread_id == thread_id && !r.status.is_terminal())
649 .max_by(|a, b| {
650 a.created_at
651 .cmp(&b.created_at)
652 .then_with(|| a.updated_at.cmp(&b.updated_at))
653 .then_with(|| a.run_id.cmp(&b.run_id))
654 }))
655 }
656
657 async fn list_threads(
658 &self,
659 query: &ThreadListQuery,
660 ) -> Result<ThreadListPage, ThreadStoreError> {
661 let mut all = file_utils::scan_json_stems(&self.base_path).await?;
662
663 if let Some(ref resource_id) = query.resource_id {
665 let mut filtered = Vec::new();
666 for id in &all {
667 if let Some(head) = self.load(id).await? {
668 if head.thread.resource_id.as_deref() == Some(resource_id.as_str()) {
669 filtered.push(id.clone());
670 }
671 }
672 }
673 all = filtered;
674 }
675
676 if let Some(ref parent_thread_id) = query.parent_thread_id {
678 let mut filtered = Vec::new();
679 for id in &all {
680 if let Some(head) = self.load(id).await? {
681 if head.thread.parent_thread_id.as_deref() == Some(parent_thread_id.as_str()) {
682 filtered.push(id.clone());
683 }
684 }
685 }
686 all = filtered;
687 }
688
689 all.sort();
690 let total = all.len();
691 let limit = query.limit.clamp(1, 200);
692 let offset = query.offset.min(total);
693 let end = (offset + limit + 1).min(total);
694 let slice = &all[offset..end];
695 let has_more = slice.len() > limit;
696 let items: Vec<String> = slice.iter().take(limit).cloned().collect();
697 Ok(ThreadListPage {
698 items,
699 total,
700 has_more,
701 })
702 }
703}
704
705impl FileStore {
706 fn runs_dir(&self) -> PathBuf {
707 self.base_path.join("_runs")
708 }
709
710 fn run_path(&self, run_id: &str) -> Result<PathBuf, ThreadStoreError> {
711 file_utils::validate_fs_id(run_id, "run id").map_err(ThreadStoreError::InvalidId)?;
712 Ok(self.runs_dir().join(format!("{run_id}.json")))
713 }
714
715 async fn save_run_record(&self, record: &RunRecord) -> Result<(), ThreadStoreError> {
716 let payload = serde_json::to_string_pretty(record)
717 .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
718 let filename = format!("{}.json", record.run_id);
719 file_utils::atomic_json_write(&self.runs_dir(), &filename, &payload)
720 .await
721 .map_err(ThreadStoreError::Io)
722 }
723
724 async fn load_run_record(&self, run_id: &str) -> Result<Option<RunRecord>, ThreadStoreError> {
725 let path = self.run_path(run_id)?;
726 if !path.exists() {
727 return Ok(None);
728 }
729 let content = tokio::fs::read_to_string(path).await?;
730 let record: RunRecord = serde_json::from_str(&content)
731 .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
732 Ok(Some(record))
733 }
734
735 async fn load_all_run_records(&self) -> Result<Vec<RunRecord>, ThreadStoreError> {
736 let dir = self.runs_dir();
737 if !dir.exists() {
738 return Ok(Vec::new());
739 }
740 let mut entries = tokio::fs::read_dir(&dir).await?;
741 let mut records = Vec::new();
742 while let Some(entry) = entries.next_entry().await? {
743 let path = entry.path();
744 if path.extension().is_none_or(|ext| ext != "json") {
745 continue;
746 }
747 let content = tokio::fs::read_to_string(path).await?;
748 let record: RunRecord = serde_json::from_str(&content)
749 .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
750 records.push(record);
751 }
752 Ok(records)
753 }
754
755 async fn upsert_run_from_changeset(
756 &self,
757 thread_id: &str,
758 delta: &ThreadChangeSet,
759 ) -> Result<(), ThreadStoreError> {
760 if delta.run_id.is_empty() {
761 return Ok(());
762 }
763 let now = now_millis();
764 if let Some(meta) = &delta.run_meta {
765 let mut record = self
766 .load_run_record(&delta.run_id)
767 .await?
768 .unwrap_or_else(|| {
769 RunRecord::new(
770 &delta.run_id,
771 thread_id,
772 &meta.agent_id,
773 meta.origin,
774 meta.status,
775 now,
776 )
777 });
778 record.status = meta.status;
779 record.agent_id.clone_from(&meta.agent_id);
780 record.origin = meta.origin;
781 record.thread_id = thread_id.to_string();
782 if record.parent_run_id.is_none() {
783 record.parent_run_id.clone_from(&delta.parent_run_id);
784 }
785 if record.parent_thread_id.is_none() {
786 record.parent_thread_id.clone_from(&meta.parent_thread_id);
787 }
788 record.termination_code.clone_from(&meta.termination_code);
789 record
790 .termination_detail
791 .clone_from(&meta.termination_detail);
792 if record.source_mailbox_entry_id.is_none() {
793 record
794 .source_mailbox_entry_id
795 .clone_from(&meta.source_mailbox_entry_id);
796 }
797 record.updated_at = now;
798 self.save_run_record(&record).await?;
799 } else if let Some(mut record) = self.load_run_record(&delta.run_id).await? {
800 record.updated_at = now;
801 self.save_run_record(&record).await?;
802 }
803 Ok(())
804 }
805
806 async fn load_head(&self, thread_id: &str) -> Result<Option<ThreadHead>, ThreadStoreError> {
808 let path = self.thread_path(thread_id)?;
809 if !path.exists() {
810 return Ok(None);
811 }
812 let content = tokio::fs::read_to_string(&path).await?;
813 if let Ok(head) = serde_json::from_str::<VersionedThread>(&content) {
815 let thread: Thread = serde_json::from_str(&content)
816 .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
817 Ok(Some(ThreadHead {
818 thread,
819 version: head._version.unwrap_or(0),
820 }))
821 } else {
822 let thread: Thread = serde_json::from_str(&content)
823 .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
824 Ok(Some(ThreadHead { thread, version: 0 }))
825 }
826 }
827
828 async fn save_head(&self, head: &ThreadHead) -> Result<(), ThreadStoreError> {
830 let mut v = serde_json::to_value(&head.thread)
832 .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
833 if let Some(obj) = v.as_object_mut() {
834 obj.insert("_version".to_string(), serde_json::json!(head.version));
835 }
836 let content = serde_json::to_string_pretty(&v)
837 .map_err(|e| ThreadStoreError::Serialization(e.to_string()))?;
838
839 let filename = format!("{}.json", head.thread.id);
840 file_utils::atomic_json_write(&self.base_path, &filename, &content)
841 .await
842 .map_err(ThreadStoreError::Io)
843 }
844}
845
846#[derive(Deserialize)]
848struct VersionedThread {
849 #[serde(default)]
850 _version: Option<Version>,
851}
852
853#[cfg(test)]
854mod tests {
855 use super::*;
856 use serde_json::json;
857 use std::sync::Arc;
858 use tempfile::TempDir;
859 use tirea_contract::{
860 storage::{MailboxEntryStatus, MailboxReader, MailboxWriter, ThreadReader},
861 testing::MailboxEntryBuilder,
862 CheckpointReason, Message, MessageQuery, ThreadWriter,
863 };
864 use tirea_state::{path, Op, Patch, TrackedPatch};
865
866 fn make_thread_with_messages(thread_id: &str, n: usize) -> Thread {
867 let mut thread = Thread::new(thread_id);
868 for i in 0..n {
869 thread = thread.with_message(Message::user(format!("msg-{i}")));
870 }
871 thread
872 }
873
874 #[tokio::test]
875 async fn file_storage_save_load_roundtrip() {
876 let temp_dir = TempDir::new().unwrap();
877 let storage = FileStore::new(temp_dir.path());
878
879 let thread = Thread::new("test-1").with_message(Message::user("hello"));
880 storage.save(&thread).await.unwrap();
881
882 let loaded = storage.load_thread("test-1").await.unwrap().unwrap();
883 assert_eq!(loaded.id, "test-1");
884 assert_eq!(loaded.message_count(), 1);
885 }
886
887 #[tokio::test]
888 async fn file_storage_list_and_delete() {
889 let temp_dir = TempDir::new().unwrap();
890 let storage = FileStore::new(temp_dir.path());
891
892 storage.create(&Thread::new("thread-a")).await.unwrap();
893 storage.create(&Thread::new("thread-b")).await.unwrap();
894 storage.create(&Thread::new("thread-c")).await.unwrap();
895
896 let mut ids = storage.list().await.unwrap();
897 ids.sort();
898 assert_eq!(ids, vec!["thread-a", "thread-b", "thread-c"]);
899
900 storage.delete("thread-b").await.unwrap();
901 let mut ids = storage.list().await.unwrap();
902 ids.sort();
903 assert_eq!(ids, vec!["thread-a", "thread-c"]);
904 }
905
906 #[tokio::test]
907 async fn file_storage_message_queries() {
908 let temp_dir = TempDir::new().unwrap();
909 let storage = FileStore::new(temp_dir.path());
910 let thread = make_thread_with_messages("t1", 10);
911 storage.save(&thread).await.unwrap();
912
913 let page = storage
914 .load_messages(
915 "t1",
916 &MessageQuery {
917 after: Some(4),
918 limit: 3,
919 ..Default::default()
920 },
921 )
922 .await
923 .unwrap();
924 assert_eq!(page.messages.len(), 3);
925 assert_eq!(page.messages[0].cursor, 5);
926 assert_eq!(page.messages[0].message.content, "msg-5");
927 assert_eq!(storage.message_count("t1").await.unwrap(), 10);
928 }
929
930 #[tokio::test]
931 async fn file_storage_append_and_versioning() {
932 let temp_dir = TempDir::new().unwrap();
933 let store = FileStore::new(temp_dir.path());
934 store.create(&Thread::new("t1")).await.unwrap();
935
936 let d1 = ThreadChangeSet {
937 run_id: "run-1".to_string(),
938 parent_run_id: None,
939 run_meta: None,
940 reason: CheckpointReason::UserMessage,
941 messages: vec![Arc::new(Message::user("hello"))],
942 patches: vec![],
943 state_actions: vec![],
944 snapshot: None,
945 };
946 let c1 = store
947 .append("t1", &d1, VersionPrecondition::Exact(0))
948 .await
949 .unwrap();
950 assert_eq!(c1.version, 1);
951
952 let d2 = ThreadChangeSet {
953 run_id: "run-1".to_string(),
954 parent_run_id: None,
955 run_meta: None,
956 reason: CheckpointReason::AssistantTurnCommitted,
957 messages: vec![Arc::new(Message::assistant("hi"))],
958 patches: vec![TrackedPatch::new(
959 Patch::new().with_op(Op::set(path!("greeted"), json!(true))),
960 )],
961 state_actions: vec![],
962 snapshot: None,
963 };
964 let c2 = store
965 .append("t1", &d2, VersionPrecondition::Exact(1))
966 .await
967 .unwrap();
968 assert_eq!(c2.version, 2);
969
970 let d3 = ThreadChangeSet {
971 run_id: "run-1".to_string(),
972 parent_run_id: None,
973 run_meta: None,
974 reason: CheckpointReason::RunFinished,
975 messages: vec![],
976 patches: vec![],
977 state_actions: vec![],
978 snapshot: Some(json!({"greeted": true})),
979 };
980 let c3 = store
981 .append("t1", &d3, VersionPrecondition::Exact(2))
982 .await
983 .unwrap();
984 assert_eq!(c3.version, 3);
985
986 let store2 = FileStore::new(temp_dir.path());
987 let head = store2.load("t1").await.unwrap().unwrap();
988 assert_eq!(head.version, 3);
989 assert_eq!(head.thread.message_count(), 2);
990 assert!(head.thread.patches.is_empty());
991 assert_eq!(head.thread.state, json!({"greeted": true}));
992 }
993
994 #[tokio::test]
995 async fn file_storage_tool_call_message_roundtrip() {
996 let temp_dir = TempDir::new().unwrap();
997 let storage = FileStore::new(temp_dir.path());
998
999 let tool_call = tirea_contract::ToolCall::new("call_1", "search", json!({"query": "rust"}));
1000 let thread = Thread::new("tool-rt")
1001 .with_message(Message::user("Find info about Rust"))
1002 .with_message(Message::assistant_with_tool_calls(
1003 "Let me search for that.",
1004 vec![tool_call],
1005 ))
1006 .with_message(Message::tool(
1007 "call_1",
1008 r#"{"result": "Rust is a language"}"#,
1009 ))
1010 .with_message(Message::assistant(
1011 "Rust is a systems programming language.",
1012 ));
1013
1014 storage.save(&thread).await.unwrap();
1015 let loaded = storage.load_thread("tool-rt").await.unwrap().unwrap();
1016
1017 assert_eq!(loaded.message_count(), 4);
1018
1019 let assistant_msg = &loaded.messages[1];
1021 assert_eq!(assistant_msg.role, tirea_contract::Role::Assistant);
1022 let calls = assistant_msg.tool_calls.as_ref().expect("tool_calls lost");
1023 assert_eq!(calls.len(), 1);
1024 assert_eq!(calls[0].id, "call_1");
1025 assert_eq!(calls[0].name, "search");
1026 assert_eq!(calls[0].arguments, json!({"query": "rust"}));
1027
1028 let tool_msg = &loaded.messages[2];
1030 assert_eq!(tool_msg.role, tirea_contract::Role::Tool);
1031 assert_eq!(tool_msg.tool_call_id.as_deref(), Some("call_1"));
1032 assert_eq!(tool_msg.content, r#"{"result": "Rust is a language"}"#);
1033 }
1034
1035 #[tokio::test]
1036 async fn file_storage_tool_call_message_roundtrip_via_append() {
1037 let temp_dir = TempDir::new().unwrap();
1038 let store = FileStore::new(temp_dir.path());
1039 store.create(&Thread::new("tool-append")).await.unwrap();
1040
1041 let tool_call =
1042 tirea_contract::ToolCall::new("call_42", "calculator", json!({"expr": "6*7"}));
1043 let delta = ThreadChangeSet {
1044 run_id: "run-1".to_string(),
1045 parent_run_id: None,
1046 run_meta: None,
1047 reason: CheckpointReason::AssistantTurnCommitted,
1048 messages: vec![
1049 Arc::new(Message::assistant_with_tool_calls(
1050 "Calculating...",
1051 vec![tool_call],
1052 )),
1053 Arc::new(Message::tool("call_42", r#"{"answer": 42}"#)),
1054 ],
1055 patches: vec![],
1056 state_actions: vec![],
1057 snapshot: None,
1058 };
1059 store
1060 .append("tool-append", &delta, VersionPrecondition::Exact(0))
1061 .await
1062 .unwrap();
1063
1064 let head = store.load("tool-append").await.unwrap().unwrap();
1065 assert_eq!(head.thread.message_count(), 2);
1066
1067 let calls = head.thread.messages[0]
1068 .tool_calls
1069 .as_ref()
1070 .expect("tool_calls lost after append");
1071 assert_eq!(calls[0].id, "call_42");
1072 assert_eq!(calls[0].name, "calculator");
1073
1074 assert_eq!(
1075 head.thread.messages[1].tool_call_id.as_deref(),
1076 Some("call_42")
1077 );
1078 }
1079
1080 #[tokio::test]
1081 async fn file_storage_timestamps_populated() {
1082 let temp_dir = TempDir::new().unwrap();
1083 let store = FileStore::new(temp_dir.path());
1084
1085 store.create(&Thread::new("ts-1")).await.unwrap();
1087 let head = store.load("ts-1").await.unwrap().unwrap();
1088 assert!(head.thread.metadata.created_at.is_some());
1089 assert!(head.thread.metadata.updated_at.is_some());
1090 let created = head.thread.metadata.created_at.unwrap();
1091 let updated = head.thread.metadata.updated_at.unwrap();
1092 assert!(created > 0);
1093 assert_eq!(created, updated);
1094
1095 let delta = ThreadChangeSet {
1097 run_id: "run-1".to_string(),
1098 parent_run_id: None,
1099 run_meta: None,
1100 reason: CheckpointReason::UserMessage,
1101 messages: vec![Arc::new(Message::user("hello"))],
1102 patches: vec![],
1103 state_actions: vec![],
1104 snapshot: None,
1105 };
1106 store
1107 .append("ts-1", &delta, VersionPrecondition::Exact(0))
1108 .await
1109 .unwrap();
1110 let head = store.load("ts-1").await.unwrap().unwrap();
1111 assert!(head.thread.metadata.updated_at.unwrap() >= updated);
1112 assert_eq!(head.thread.metadata.created_at.unwrap(), created);
1113
1114 let thread = Thread::new("ts-2");
1116 assert!(thread.metadata.created_at.is_none());
1117 store.save(&thread).await.unwrap();
1118 let head = store.load("ts-2").await.unwrap().unwrap();
1119 assert!(head.thread.metadata.created_at.is_some());
1120 assert!(head.thread.metadata.updated_at.is_some());
1121 }
1122
1123 #[test]
1124 fn file_storage_rejects_path_traversal() {
1125 let storage = FileStore::new("/base/path");
1126 assert!(storage.thread_path("../../etc/passwd").is_err());
1127 assert!(storage.thread_path("foo/bar").is_err());
1128 assert!(storage.thread_path("foo\\bar").is_err());
1129 assert!(storage.thread_path("").is_err());
1130 assert!(storage.thread_path("foo\0bar").is_err());
1131 }
1132
1133 #[tokio::test]
1134 async fn file_storage_mailbox_claim_and_cancel_roundtrip() {
1135 let temp_dir = TempDir::new().unwrap();
1136 let storage = FileStore::new(temp_dir.path());
1137 let entry = MailboxEntryBuilder::queued("entry-file-mailbox", "mailbox-file-mailbox")
1138 .with_payload(json!({"message": "hello"}))
1139 .build();
1140 storage.enqueue_mailbox_entry(&entry).await.unwrap();
1141
1142 let claimed = storage
1143 .claim_mailbox_entries(None, 1, "worker-file", 10, 5_000)
1144 .await
1145 .unwrap();
1146 assert_eq!(claimed.len(), 1);
1147 assert_eq!(claimed[0].status, MailboxEntryStatus::Claimed);
1148
1149 let cancelled = storage
1150 .cancel_mailbox_entry("entry-file-mailbox", 20)
1151 .await
1152 .unwrap()
1153 .expect("queued entry should be cancellable");
1154 assert_eq!(cancelled.status, MailboxEntryStatus::Cancelled);
1155
1156 let loaded = storage
1157 .load_mailbox_entry(&entry.entry_id)
1158 .await
1159 .unwrap()
1160 .expect("mailbox entry should persist");
1161 assert_eq!(loaded.status, MailboxEntryStatus::Cancelled);
1162 }
1163
1164 #[tokio::test]
1165 async fn file_storage_mailbox_claim_by_entry_id_ignores_available_at() {
1166 let temp_dir = TempDir::new().unwrap();
1167 let storage = FileStore::new(temp_dir.path());
1168 let entry = MailboxEntryBuilder::queued("entry-file-inline", "mailbox-file-inline")
1169 .with_payload(json!({"message": "hello"}))
1170 .with_available_at(i64::MAX as u64)
1171 .build();
1172 storage.enqueue_mailbox_entry(&entry).await.unwrap();
1173
1174 let claimed = storage
1175 .claim_mailbox_entries(None, 1, "worker-file-batch", 10, 5_000)
1176 .await
1177 .unwrap();
1178 assert!(claimed.is_empty());
1179
1180 let targeted = storage
1181 .claim_mailbox_entry("entry-file-inline", "worker-file-inline", 10, 5_000)
1182 .await
1183 .unwrap()
1184 .expect("inline claim should succeed");
1185 assert_eq!(targeted.status, MailboxEntryStatus::Claimed);
1186 assert_eq!(targeted.claimed_by.as_deref(), Some("worker-file-inline"));
1187 }
1188
1189 #[tokio::test]
1190 async fn file_storage_mailbox_interrupt_bumps_generation_and_supersedes_entries() {
1191 let temp_dir = TempDir::new().unwrap();
1192 let storage = FileStore::new(temp_dir.path());
1193 let old_a = MailboxEntryBuilder::queued("entry-file-old-a", "mailbox-file-interrupt")
1194 .with_payload(json!({"message": "hello"}))
1195 .build();
1196 let old_b = MailboxEntryBuilder::queued("entry-file-old-b", "mailbox-file-interrupt")
1197 .with_payload(json!({"message": "hello"}))
1198 .build();
1199 storage.enqueue_mailbox_entry(&old_a).await.unwrap();
1200 storage.enqueue_mailbox_entry(&old_b).await.unwrap();
1201
1202 let interrupted = storage
1203 .interrupt_mailbox("mailbox-file-interrupt", 50)
1204 .await
1205 .unwrap();
1206 assert_eq!(interrupted.mailbox_state.current_generation, 1);
1207 assert_eq!(interrupted.superseded_entries.len(), 2);
1208
1209 let superseded = storage
1210 .load_mailbox_entry("entry-file-old-a")
1211 .await
1212 .unwrap()
1213 .expect("superseded entry should exist");
1214 assert_eq!(superseded.status, MailboxEntryStatus::Superseded);
1215
1216 let next_generation = storage
1217 .ensure_mailbox_state("mailbox-file-interrupt", 60)
1218 .await
1219 .unwrap()
1220 .current_generation;
1221 let fresh = MailboxEntryBuilder::queued("entry-file-fresh", "mailbox-file-interrupt")
1222 .with_payload(json!({"message": "hello"}))
1223 .with_generation(next_generation)
1224 .build();
1225 storage.enqueue_mailbox_entry(&fresh).await.unwrap();
1226
1227 let fresh_loaded = storage
1228 .load_mailbox_entry("entry-file-fresh")
1229 .await
1230 .unwrap()
1231 .expect("fresh entry should exist");
1232 assert_eq!(fresh_loaded.generation, 1);
1233 assert_eq!(fresh_loaded.status, MailboxEntryStatus::Queued);
1234 }
1235
1236 #[tokio::test]
1237 async fn file_storage_mailbox_rejects_duplicate_dedupe_key_in_same_mailbox() {
1238 let temp_dir = TempDir::new().unwrap();
1239 let storage = FileStore::new(temp_dir.path());
1240 let first = MailboxEntryBuilder::queued("entry-file-dedupe-1", "mailbox-file-dedupe")
1241 .with_dedupe_key("dup-key")
1242 .build();
1243 let duplicate = MailboxEntryBuilder::queued("entry-file-dedupe-2", "mailbox-file-dedupe")
1244 .with_dedupe_key("dup-key")
1245 .build();
1246
1247 storage.enqueue_mailbox_entry(&first).await.unwrap();
1248 let result = storage.enqueue_mailbox_entry(&duplicate).await;
1249 assert!(matches!(result, Err(MailboxStoreError::AlreadyExists(_))));
1250 }
1251}