tirea_store_adapters/
file_run_store.rs

1use crate::file_utils;
2use async_trait::async_trait;
3use std::path::PathBuf;
4use tirea_contract::storage::{
5    paginate_runs_in_memory, RunPage, RunQuery, RunReader, RunRecord, RunStoreError, RunWriter,
6};
7
8/// File-based run projection store.
9///
10/// Each run is stored as one JSON file `<run_id>.json` under `base_path`.
11pub struct FileRunStore {
12    base_path: PathBuf,
13}
14
15impl FileRunStore {
16    pub fn new(base_path: impl Into<PathBuf>) -> Self {
17        Self {
18            base_path: base_path.into(),
19        }
20    }
21
22    fn run_path(&self, run_id: &str) -> Result<PathBuf, RunStoreError> {
23        Self::validate_run_id(run_id)?;
24        Ok(self.base_path.join(format!("{run_id}.json")))
25    }
26
27    fn validate_run_id(run_id: &str) -> Result<(), RunStoreError> {
28        file_utils::validate_fs_id(run_id, "run id").map_err(RunStoreError::Backend)
29    }
30
31    async fn save_run(&self, record: &RunRecord) -> Result<(), RunStoreError> {
32        let payload = serde_json::to_string_pretty(record)
33            .map_err(|e| RunStoreError::Backend(e.to_string()))?;
34        let filename = format!("{}.json", record.run_id);
35        file_utils::atomic_json_write(&self.base_path, &filename, &payload)
36            .await
37            .map_err(RunStoreError::from)
38    }
39
40    async fn load_all_runs(&self) -> Result<Vec<RunRecord>, RunStoreError> {
41        if !self.base_path.exists() {
42            return Ok(Vec::new());
43        }
44        let mut entries = tokio::fs::read_dir(&self.base_path).await?;
45        let mut records = Vec::new();
46        while let Some(entry) = entries.next_entry().await? {
47            let path = entry.path();
48            if path.extension().is_none_or(|ext| ext != "json") {
49                continue;
50            }
51            let content = tokio::fs::read_to_string(path).await?;
52            let record: RunRecord = serde_json::from_str(&content)
53                .map_err(|e| RunStoreError::Backend(e.to_string()))?;
54            records.push(record);
55        }
56        Ok(records)
57    }
58}
59
60#[async_trait]
61impl RunReader for FileRunStore {
62    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
63        let path = self.run_path(run_id)?;
64        if !path.exists() {
65            return Ok(None);
66        }
67        let content = tokio::fs::read_to_string(path).await?;
68        let record: RunRecord =
69            serde_json::from_str(&content).map_err(|e| RunStoreError::Backend(e.to_string()))?;
70        Ok(Some(record))
71    }
72
73    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, RunStoreError> {
74        let records = self.load_all_runs().await?;
75        Ok(paginate_runs_in_memory(&records, query))
76    }
77
78    async fn load_current_run(&self, thread_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
79        let records = self.load_all_runs().await?;
80        Ok(records
81            .into_iter()
82            .filter(|r| r.thread_id == thread_id && !r.status.is_terminal())
83            .max_by(|a, b| {
84                a.created_at
85                    .cmp(&b.created_at)
86                    .then_with(|| a.updated_at.cmp(&b.updated_at))
87                    .then_with(|| a.run_id.cmp(&b.run_id))
88            }))
89    }
90}
91
92#[async_trait]
93impl RunWriter for FileRunStore {
94    async fn upsert_run(&self, record: &RunRecord) -> Result<(), RunStoreError> {
95        self.save_run(record).await
96    }
97
98    async fn delete_run(&self, run_id: &str) -> Result<(), RunStoreError> {
99        let path = self.run_path(run_id)?;
100        if path.exists() {
101            tokio::fs::remove_file(path).await?;
102        }
103        Ok(())
104    }
105}
106
107#[cfg(test)]
108mod tests {
109    use super::*;
110    use tempfile::TempDir;
111    use tirea_contract::storage::{RunOrigin, RunStatus};
112
113    #[tokio::test]
114    async fn run_store_roundtrip() {
115        let temp = TempDir::new().expect("tempdir");
116        let store = FileRunStore::new(temp.path());
117        let mut record = RunRecord::new(
118            "run-roundtrip",
119            "thread-1",
120            "",
121            RunOrigin::A2a,
122            RunStatus::Running,
123            100,
124        );
125        record.updated_at = 120;
126
127        store.upsert_run(&record).await.expect("upsert");
128        let loaded = store
129            .load_run("run-roundtrip")
130            .await
131            .expect("load")
132            .expect("exists");
133        assert_eq!(loaded.thread_id, "thread-1");
134        assert_eq!(loaded.updated_at, 120);
135
136        let page = store.list_runs(&RunQuery::default()).await.expect("list");
137        assert_eq!(page.total, 1);
138
139        store.delete_run("run-roundtrip").await.expect("delete");
140        assert!(store
141            .load_run("run-roundtrip")
142            .await
143            .expect("load after delete")
144            .is_none());
145    }
146
147    #[tokio::test]
148    async fn load_current_run_returns_latest_non_terminal() {
149        let temp = TempDir::new().expect("tempdir");
150        let store = FileRunStore::new(temp.path());
151
152        let mut done = RunRecord::new("run-old", "t1", "", RunOrigin::AgUi, RunStatus::Done, 1);
153        done.updated_at = 2;
154        store.upsert_run(&done).await.unwrap();
155
156        let mut active = RunRecord::new(
157            "run-active",
158            "t1",
159            "",
160            RunOrigin::AgUi,
161            RunStatus::Running,
162            3,
163        );
164        active.updated_at = 4;
165        store.upsert_run(&active).await.unwrap();
166
167        let current = store.load_current_run("t1").await.unwrap();
168        assert_eq!(
169            current.as_ref().map(|r| r.run_id.as_str()),
170            Some("run-active")
171        );
172    }
173
174    #[tokio::test]
175    async fn load_current_run_returns_none_when_all_terminal() {
176        let temp = TempDir::new().expect("tempdir");
177        let store = FileRunStore::new(temp.path());
178
179        let done = RunRecord::new("run-d", "t2", "", RunOrigin::AiSdk, RunStatus::Done, 1);
180        store.upsert_run(&done).await.unwrap();
181
182        assert!(store.load_current_run("t2").await.unwrap().is_none());
183    }
184}