tirea_store_adapters/
file_run_store.rs1use crate::file_utils;
2use async_trait::async_trait;
3use std::path::PathBuf;
4use tirea_contract::storage::{
5 paginate_runs_in_memory, RunPage, RunQuery, RunReader, RunRecord, RunStoreError, RunWriter,
6};
7
/// File-backed run store.
///
/// Every run record is kept as its own JSON document named
/// `<run_id>.json` directly under [`FileRunStore::base_path`].
pub struct FileRunStore {
    /// Directory that holds the per-run JSON files.
    base_path: PathBuf,
}
14
15impl FileRunStore {
16 pub fn new(base_path: impl Into<PathBuf>) -> Self {
17 Self {
18 base_path: base_path.into(),
19 }
20 }
21
22 fn run_path(&self, run_id: &str) -> Result<PathBuf, RunStoreError> {
23 Self::validate_run_id(run_id)?;
24 Ok(self.base_path.join(format!("{run_id}.json")))
25 }
26
27 fn validate_run_id(run_id: &str) -> Result<(), RunStoreError> {
28 file_utils::validate_fs_id(run_id, "run id").map_err(RunStoreError::Backend)
29 }
30
31 async fn save_run(&self, record: &RunRecord) -> Result<(), RunStoreError> {
32 let payload = serde_json::to_string_pretty(record)
33 .map_err(|e| RunStoreError::Backend(e.to_string()))?;
34 let filename = format!("{}.json", record.run_id);
35 file_utils::atomic_json_write(&self.base_path, &filename, &payload)
36 .await
37 .map_err(RunStoreError::from)
38 }
39
40 async fn load_all_runs(&self) -> Result<Vec<RunRecord>, RunStoreError> {
41 if !self.base_path.exists() {
42 return Ok(Vec::new());
43 }
44 let mut entries = tokio::fs::read_dir(&self.base_path).await?;
45 let mut records = Vec::new();
46 while let Some(entry) = entries.next_entry().await? {
47 let path = entry.path();
48 if path.extension().is_none_or(|ext| ext != "json") {
49 continue;
50 }
51 let content = tokio::fs::read_to_string(path).await?;
52 let record: RunRecord = serde_json::from_str(&content)
53 .map_err(|e| RunStoreError::Backend(e.to_string()))?;
54 records.push(record);
55 }
56 Ok(records)
57 }
58}
59
60#[async_trait]
61impl RunReader for FileRunStore {
62 async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
63 let path = self.run_path(run_id)?;
64 if !path.exists() {
65 return Ok(None);
66 }
67 let content = tokio::fs::read_to_string(path).await?;
68 let record: RunRecord =
69 serde_json::from_str(&content).map_err(|e| RunStoreError::Backend(e.to_string()))?;
70 Ok(Some(record))
71 }
72
73 async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, RunStoreError> {
74 let records = self.load_all_runs().await?;
75 Ok(paginate_runs_in_memory(&records, query))
76 }
77
78 async fn load_current_run(&self, thread_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
79 let records = self.load_all_runs().await?;
80 Ok(records
81 .into_iter()
82 .filter(|r| r.thread_id == thread_id && !r.status.is_terminal())
83 .max_by(|a, b| {
84 a.created_at
85 .cmp(&b.created_at)
86 .then_with(|| a.updated_at.cmp(&b.updated_at))
87 .then_with(|| a.run_id.cmp(&b.run_id))
88 }))
89 }
90}
91
92#[async_trait]
93impl RunWriter for FileRunStore {
94 async fn upsert_run(&self, record: &RunRecord) -> Result<(), RunStoreError> {
95 self.save_run(record).await
96 }
97
98 async fn delete_run(&self, run_id: &str) -> Result<(), RunStoreError> {
99 let path = self.run_path(run_id)?;
100 if path.exists() {
101 tokio::fs::remove_file(path).await?;
102 }
103 Ok(())
104 }
105}
106
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use tirea_contract::storage::{RunOrigin, RunStatus};

    /// Builds a store rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store so the caller can
    /// keep it in scope for as long as the store is used.
    fn temp_store() -> (TempDir, FileRunStore) {
        let dir = TempDir::new().expect("tempdir");
        let store = FileRunStore::new(dir.path());
        (dir, store)
    }

    /// Upsert, load, list and delete a single record end to end.
    #[tokio::test]
    async fn run_store_roundtrip() {
        let (_dir, store) = temp_store();

        let mut rec = RunRecord::new(
            "run-roundtrip",
            "thread-1",
            "",
            RunOrigin::A2a,
            RunStatus::Running,
            100,
        );
        rec.updated_at = 120;
        store.upsert_run(&rec).await.expect("upsert");

        let fetched = store.load_run("run-roundtrip").await.expect("load");
        let fetched = fetched.expect("exists");
        assert_eq!(fetched.thread_id, "thread-1");
        assert_eq!(fetched.updated_at, 120);

        let listing = store.list_runs(&RunQuery::default()).await.expect("list");
        assert_eq!(listing.total, 1);

        store.delete_run("run-roundtrip").await.expect("delete");
        let after_delete = store
            .load_run("run-roundtrip")
            .await
            .expect("load after delete");
        assert!(after_delete.is_none());
    }

    /// A terminal run must not shadow a newer non-terminal run of the thread.
    #[tokio::test]
    async fn load_current_run_returns_latest_non_terminal() {
        let (_dir, store) = temp_store();

        let mut finished = RunRecord::new("run-old", "t1", "", RunOrigin::AgUi, RunStatus::Done, 1);
        finished.updated_at = 2;
        store.upsert_run(&finished).await.unwrap();

        let mut running = RunRecord::new(
            "run-active",
            "t1",
            "",
            RunOrigin::AgUi,
            RunStatus::Running,
            3,
        );
        running.updated_at = 4;
        store.upsert_run(&running).await.unwrap();

        let current = store.load_current_run("t1").await.unwrap();
        let current_id = current.as_ref().map(|r| r.run_id.as_str());
        assert_eq!(current_id, Some("run-active"));
    }

    /// With only terminal runs on the thread there is no current run.
    #[tokio::test]
    async fn load_current_run_returns_none_when_all_terminal() {
        let (_dir, store) = temp_store();

        let finished = RunRecord::new("run-d", "t2", "", RunOrigin::AiSdk, RunStatus::Done, 1);
        store.upsert_run(&finished).await.unwrap();

        let current = store.load_current_run("t2").await.unwrap();
        assert!(current.is_none());
    }
}