tirea_contract/thread/
changeset.rs

//! Persistence change-set types shared by runtime and storage.
2
3use crate::runtime::state::SerializedStateAction;
4use crate::runtime::RunStatus;
5use crate::storage::RunOrigin;
6use crate::thread::{Message, Thread};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashSet;
10use std::sync::Arc;
11use tirea_state::TrackedPatch;
12
/// Monotonically increasing version for optimistic concurrency.
///
/// This is a plain alias for `u64`, so versions compare and order like any
/// other unsigned integer.
pub type Version = u64;
15
/// Reason for a checkpoint (delta).
///
/// Recorded in [`ThreadChangeSet::reason`] to say why a delta was created.
/// With the derived serde impls each unit variant is written as its bare
/// name, e.g. `"RunFinished"`.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names only; confirm them against the code that emits each reason.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CheckpointReason {
    /// Presumably: a user message was added to the thread.
    UserMessage,
    /// Presumably: an assistant turn was committed.
    AssistantTurnCommitted,
    /// Presumably: tool results were committed.
    ToolResultsCommitted,
    /// Presumably: the run reached its end.
    RunFinished,
}
24
25/// Run-level metadata carried in a [`ThreadChangeSet`].
26///
27/// When present, the thread store uses this to maintain a run index.
28/// Set on the first changeset of a run (to create the record) and the last
29/// (to finalize status / termination).
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct RunMeta {
32    pub agent_id: String,
33    pub origin: RunOrigin,
34    pub status: RunStatus,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub parent_thread_id: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub termination_code: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub termination_detail: Option<String>,
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    pub source_mailbox_entry_id: Option<String>,
43}
44
45/// An incremental change to a thread produced by a single step.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ThreadChangeSet {
48    /// Which run produced this delta.
49    pub run_id: String,
50    /// Parent run (for sub-agent deltas).
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub parent_run_id: Option<String>,
53    /// Run-level metadata for run index maintenance.
54    #[serde(default, skip_serializing_if = "Option::is_none")]
55    pub run_meta: Option<RunMeta>,
56    /// Why this delta was created.
57    pub reason: CheckpointReason,
58    /// New messages appended in this step.
59    pub messages: Vec<Arc<Message>>,
60    /// New patches appended in this step.
61    pub patches: Vec<TrackedPatch>,
62    /// Serialized state actions captured during this step (intent log).
63    #[serde(default, skip_serializing_if = "Vec::is_empty", rename = "actions")]
64    pub state_actions: Vec<SerializedStateAction>,
65    /// If `Some`, a full state snapshot was taken (replaces base state).
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub snapshot: Option<Value>,
68}
69
70impl ThreadChangeSet {
71    /// Build a `ThreadChangeSet` from explicit delta components.
72    pub fn from_parts(
73        run_id: impl Into<String>,
74        parent_run_id: Option<String>,
75        reason: CheckpointReason,
76        messages: Vec<Arc<Message>>,
77        patches: Vec<TrackedPatch>,
78        state_actions: Vec<SerializedStateAction>,
79        snapshot: Option<Value>,
80    ) -> Self {
81        Self {
82            run_id: run_id.into(),
83            parent_run_id,
84            run_meta: None,
85            reason,
86            messages,
87            patches,
88            state_actions,
89            snapshot,
90        }
91    }
92
93    /// Attach run-level metadata for run index maintenance.
94    #[must_use]
95    pub fn with_run_meta(mut self, meta: RunMeta) -> Self {
96        self.run_meta = Some(meta);
97        self
98    }
99
100    /// Apply this delta to a thread in place.
101    ///
102    /// Messages are deduplicated by `id` — if a message with the same id
103    /// already exists in the thread it is skipped. Messages without an id
104    /// are always appended.
105    pub fn apply_to(&self, thread: &mut Thread) {
106        if let Some(ref snapshot) = self.snapshot {
107            thread.state = snapshot.clone();
108            thread.patches.clear();
109        }
110
111        let mut existing_ids: HashSet<String> = thread
112            .messages
113            .iter()
114            .filter_map(|m| m.id.clone())
115            .collect();
116        for msg in &self.messages {
117            if let Some(ref id) = msg.id {
118                if !existing_ids.insert(id.clone()) {
119                    continue;
120                }
121            }
122            thread.messages.push(msg.clone());
123        }
124        thread.patches.extend(self.patches.iter().cloned());
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::thread::{Message, Thread};
    use serde_json::json;

    /// Assemble a `run-1` assistant-turn changeset that carries only the
    /// given messages and state actions (no patches, no snapshot, no parent).
    fn run_1_changeset(
        messages: Vec<Arc<Message>>,
        state_actions: Vec<SerializedStateAction>,
    ) -> ThreadChangeSet {
        ThreadChangeSet::from_parts(
            "run-1",
            None,
            CheckpointReason::AssistantTurnCommitted,
            messages,
            Vec::new(),
            state_actions,
            None,
        )
    }

    /// Changeset with one assistant message and one `TestCounter` action.
    fn sample_changeset_with_state_actions() -> ThreadChangeSet {
        let increment = SerializedStateAction {
            state_type_name: "TestCounter".into(),
            base_path: "test_counter".into(),
            scope: crate::runtime::state::StateScope::Thread,
            call_id_override: None,
            payload: json!({"Increment": 1}),
        };
        run_1_changeset(vec![Arc::new(Message::assistant("hello"))], vec![increment])
    }

    /// State actions must survive a JSON encode/decode cycle unchanged.
    #[test]
    fn test_changeset_serde_roundtrip_with_state_actions() {
        let original = sample_changeset_with_state_actions();
        assert_eq!(original.state_actions.len(), 1);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ThreadChangeSet = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.state_actions.len(), 1);
        let action = &decoded.state_actions[0];
        assert_eq!(action.state_type_name, "TestCounter");
        assert_eq!(action.payload, json!({"Increment": 1}));
    }

    /// Payloads written before `actions` existed must still deserialize.
    #[test]
    fn test_changeset_serde_backward_compat_without_state_actions() {
        // Simulate old JSON that has no `actions` field.
        let legacy_payload = r#"{
            "run_id": "run-1",
            "reason": "RunFinished",
            "messages": [],
            "patches": []
        }"#;
        let decoded: ThreadChangeSet = serde_json::from_str(legacy_payload).unwrap();
        assert!(decoded.state_actions.is_empty());
    }

    /// Replaying the same delta must not duplicate its messages.
    #[test]
    fn test_apply_to_deduplicates_messages() {
        let delta = run_1_changeset(vec![Arc::new(Message::user("hello"))], Vec::new());

        let mut thread = Thread::new("t1");
        for _ in 0..2 {
            delta.apply_to(&mut thread);
        }

        // The same message (by id) applied twice should appear only once.
        let message_count = thread.messages.len();
        assert_eq!(
            message_count, 1,
            "apply_to should deduplicate messages by id"
        );
    }
}