tirea_store_adapters/
memory_run_store.rs

1use async_trait::async_trait;
2use tirea_contract::storage::{
3    paginate_runs_in_memory, RunPage, RunQuery, RunReader, RunRecord, RunStoreError, RunWriter,
4};
5
6/// In-memory run projection store for tests and local development.
7#[derive(Default)]
8pub struct MemoryRunStore {
9    entries: tokio::sync::RwLock<std::collections::HashMap<String, RunRecord>>,
10}
11
12impl MemoryRunStore {
13    pub fn new() -> Self {
14        Self::default()
15    }
16}
17
18#[async_trait]
19impl RunReader for MemoryRunStore {
20    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
21        Ok(self.entries.read().await.get(run_id).cloned())
22    }
23
24    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, RunStoreError> {
25        let entries = self.entries.read().await;
26        let records: Vec<RunRecord> = entries.values().cloned().collect();
27        Ok(paginate_runs_in_memory(&records, query))
28    }
29
30    async fn load_current_run(&self, thread_id: &str) -> Result<Option<RunRecord>, RunStoreError> {
31        let entries = self.entries.read().await;
32        Ok(entries
33            .values()
34            .filter(|r| r.thread_id == thread_id && !r.status.is_terminal())
35            .max_by(|a, b| {
36                a.created_at
37                    .cmp(&b.created_at)
38                    .then_with(|| a.updated_at.cmp(&b.updated_at))
39                    .then_with(|| a.run_id.cmp(&b.run_id))
40            })
41            .cloned())
42    }
43}
44
45#[async_trait]
46impl RunWriter for MemoryRunStore {
47    async fn upsert_run(&self, record: &RunRecord) -> Result<(), RunStoreError> {
48        self.entries
49            .write()
50            .await
51            .insert(record.run_id.clone(), record.clone());
52        Ok(())
53    }
54
55    async fn delete_run(&self, run_id: &str) -> Result<(), RunStoreError> {
56        self.entries.write().await.remove(run_id);
57        Ok(())
58    }
59}
60
#[cfg(test)]
mod tests {
    use super::*;
    use tirea_contract::storage::{RunOrigin, RunStatus};

    /// Upserted records can be read back by id and listed by thread.
    #[tokio::test]
    async fn upsert_load_and_list_runs() {
        let store = MemoryRunStore::new();

        let first = RunRecord::new("run-1", "thread-1", "", RunOrigin::AgUi, RunStatus::Running, 1);
        let second = RunRecord::new("run-2", "thread-2", "", RunOrigin::AiSdk, RunStatus::Waiting, 2);
        store.upsert_run(&first).await.expect("upsert run-1");
        store.upsert_run(&second).await.expect("upsert run-2");

        let fetched = store.load_run("run-1").await.expect("load");
        let fetched = fetched.expect("exists");
        assert_eq!(fetched.thread_id, "thread-1");

        let query = RunQuery {
            thread_id: Some("thread-2".to_string()),
            ..Default::default()
        };
        let page = store.list_runs(&query).await.expect("list");
        assert_eq!(page.total, 1);
        assert_eq!(page.items[0].run_id, "run-2");
    }

    /// A terminal run is skipped in favour of the active run on the thread.
    #[tokio::test]
    async fn load_current_run_returns_latest_non_terminal() {
        let store = MemoryRunStore::new();

        // Older run that has already completed.
        let mut finished = RunRecord::new("run-old", "t1", "", RunOrigin::AgUi, RunStatus::Done, 1);
        finished.updated_at = 2;
        store.upsert_run(&finished).await.unwrap();

        // Newer run that is still in progress.
        let mut running =
            RunRecord::new("run-active", "t1", "", RunOrigin::AgUi, RunStatus::Running, 3);
        running.updated_at = 4;
        store.upsert_run(&running).await.unwrap();

        let current = store.load_current_run("t1").await.unwrap();
        let current_id = current.as_ref().map(|r| r.run_id.as_str());
        assert_eq!(current_id, Some("run-active"));
    }

    /// A thread whose only run is terminal has no current run.
    #[tokio::test]
    async fn load_current_run_returns_none_when_all_terminal() {
        let store = MemoryRunStore::new();

        let finished = RunRecord::new("run-d", "t2", "", RunOrigin::AiSdk, RunStatus::Done, 1);
        store.upsert_run(&finished).await.unwrap();

        let current = store.load_current_run("t2").await.unwrap();
        assert!(current.is_none());
    }

    /// With identical `created_at`, the greater `run_id` decides.
    #[tokio::test]
    async fn load_current_run_tiebreaks_by_created_at_then_run_id() {
        let store = MemoryRunStore::new();

        // Both runs are active and share the same creation timestamp.
        let earlier_id = RunRecord::new("run-a", "t3", "", RunOrigin::AgUi, RunStatus::Running, 10);
        let later_id = RunRecord::new("run-b", "t3", "", RunOrigin::AgUi, RunStatus::Waiting, 10);
        for record in [&earlier_id, &later_id] {
            store.upsert_run(record).await.unwrap();
        }

        let current = store.load_current_run("t3").await.unwrap().unwrap();
        assert_eq!(
            current.run_id, "run-b",
            "should pick lexicographically later run_id as tiebreaker"
        );
    }
}