tirea_contract/thread/
changeset.rs1use crate::runtime::state::SerializedStateAction;
4use crate::runtime::RunStatus;
5use crate::storage::RunOrigin;
6use crate::thread::{Message, Thread};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashSet;
10use std::sync::Arc;
11use tirea_state::TrackedPatch;
12
/// Version number type exported by this module; a plain alias of `u64`.
///
/// NOTE(review): the visible part of this file never uses the alias, so its
/// exact meaning (presumably a monotonically increasing revision counter)
/// should be confirmed at the call sites.
pub type Version = u64;
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18pub enum CheckpointReason {
19 UserMessage,
20 AssistantTurnCommitted,
21 ToolResultsCommitted,
22 RunFinished,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct RunMeta {
32 pub agent_id: String,
33 pub origin: RunOrigin,
34 pub status: RunStatus,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub parent_thread_id: Option<String>,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub termination_code: Option<String>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub termination_detail: Option<String>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub source_mailbox_entry_id: Option<String>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ThreadChangeSet {
48 pub run_id: String,
50 #[serde(skip_serializing_if = "Option::is_none")]
52 pub parent_run_id: Option<String>,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
55 pub run_meta: Option<RunMeta>,
56 pub reason: CheckpointReason,
58 pub messages: Vec<Arc<Message>>,
60 pub patches: Vec<TrackedPatch>,
62 #[serde(default, skip_serializing_if = "Vec::is_empty", rename = "actions")]
64 pub state_actions: Vec<SerializedStateAction>,
65 #[serde(skip_serializing_if = "Option::is_none")]
67 pub snapshot: Option<Value>,
68}
69
70impl ThreadChangeSet {
71 pub fn from_parts(
73 run_id: impl Into<String>,
74 parent_run_id: Option<String>,
75 reason: CheckpointReason,
76 messages: Vec<Arc<Message>>,
77 patches: Vec<TrackedPatch>,
78 state_actions: Vec<SerializedStateAction>,
79 snapshot: Option<Value>,
80 ) -> Self {
81 Self {
82 run_id: run_id.into(),
83 parent_run_id,
84 run_meta: None,
85 reason,
86 messages,
87 patches,
88 state_actions,
89 snapshot,
90 }
91 }
92
93 #[must_use]
95 pub fn with_run_meta(mut self, meta: RunMeta) -> Self {
96 self.run_meta = Some(meta);
97 self
98 }
99
100 pub fn apply_to(&self, thread: &mut Thread) {
106 if let Some(ref snapshot) = self.snapshot {
107 thread.state = snapshot.clone();
108 thread.patches.clear();
109 }
110
111 let mut existing_ids: HashSet<String> = thread
112 .messages
113 .iter()
114 .filter_map(|m| m.id.clone())
115 .collect();
116 for msg in &self.messages {
117 if let Some(ref id) = msg.id {
118 if !existing_ids.insert(id.clone()) {
119 continue;
120 }
121 }
122 thread.messages.push(msg.clone());
123 }
124 thread.patches.extend(self.patches.iter().cloned());
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::thread::{Message, Thread};
    use serde_json::json;

    /// Builds a changeset carrying one assistant message and one state action.
    fn sample_changeset_with_state_actions() -> ThreadChangeSet {
        let increment = SerializedStateAction {
            state_type_name: "TestCounter".into(),
            base_path: "test_counter".into(),
            scope: crate::runtime::state::StateScope::Thread,
            call_id_override: None,
            payload: json!({"Increment": 1}),
        };

        ThreadChangeSet {
            run_id: "run-1".into(),
            parent_run_id: None,
            run_meta: None,
            reason: CheckpointReason::AssistantTurnCommitted,
            messages: vec![Arc::new(Message::assistant("hello"))],
            patches: vec![],
            state_actions: vec![increment],
            snapshot: None,
        }
    }

    /// State actions must survive a JSON serialize/deserialize round trip.
    #[test]
    fn test_changeset_serde_roundtrip_with_state_actions() {
        let original = sample_changeset_with_state_actions();
        assert_eq!(original.state_actions.len(), 1);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ThreadChangeSet = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.state_actions.len(), 1);
        let action = &decoded.state_actions[0];
        assert_eq!(action.state_type_name, "TestCounter");
        assert_eq!(action.payload, json!({"Increment": 1}));
    }

    /// JSON without an `actions` key must still deserialize, yielding an
    /// empty `state_actions` vector.
    #[test]
    fn test_changeset_serde_backward_compat_without_state_actions() {
        let json = r#"{
            "run_id": "run-1",
            "reason": "RunFinished",
            "messages": [],
            "patches": []
        }"#;
        let decoded: ThreadChangeSet = serde_json::from_str(json).unwrap();
        assert!(decoded.state_actions.is_empty());
    }

    /// Applying the same changeset twice must not duplicate its message.
    ///
    /// NOTE(review): this relies on `Message::user` assigning an id; a message
    /// without an id would be appended on every application.
    #[test]
    fn test_apply_to_deduplicates_messages() {
        let msg = Arc::new(Message::user("hello"));
        let delta = ThreadChangeSet::from_parts(
            "run-1",
            None,
            CheckpointReason::AssistantTurnCommitted,
            vec![msg],
            vec![],
            vec![],
            None,
        );

        let mut thread = Thread::new("t1");
        for _ in 0..2 {
            delta.apply_to(&mut thread);
        }

        assert_eq!(
            thread.messages.len(),
            1,
            "apply_to should deduplicate messages by id"
        );
    }
}