tirea_state/
manager.rs

1//! StateManager manages immutable state with patch history.
2//!
3//! All state changes are applied through patches, enabling:
4//! - Full traceability of changes
5//! - State replay to any point in history
6//! - Batch application with conflict detection
7
8use crate::apply::apply_patch_in_place_with_registry;
9use crate::{
10    detect_conflicts, Conflict, ConflictKind, LatticeRegistry, Op, Patch, PatchExt, Path,
11    TireaError, TrackedPatch,
12};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use serde_json::Value;
16use std::sync::Arc;
17use thiserror::Error;
18use tokio::sync::RwLock;
19
20use crate::Lattice;
21
/// Errors that can occur during state management.
///
/// Every fallible [`StateManager`] method reports failures through this type.
#[derive(Debug, Error)]
pub enum StateError {
    /// Failed to apply a patch operation.
    ///
    /// Wraps the underlying [`TireaError`]. The `#[from]` conversion is what
    /// lets patch-application failures propagate with `?`.
    #[error("Failed to apply patch: {0}")]
    ApplyFailed(#[from] TireaError),

    /// Replay index is outside the available history range.
    ///
    /// Produced by `StateManager::replay_to` when `index >= len`, which
    /// includes every index when the history is empty.
    #[error("Invalid replay index: {index}, history length: {len}")]
    InvalidReplayIndex {
        /// Requested replay index.
        index: usize,
        /// Total number of patches in history.
        len: usize,
    },

    /// Batch commit contains conflicting patches.
    ///
    /// Produced by `StateManager::commit_batch` before any patch is applied;
    /// only the first conflict found is reported.
    #[error("conflicting patch in batch between index {left} and {right} at {path} ({kind:?})")]
    BatchConflict {
        /// Left patch index in the batch.
        left: usize,
        /// Right patch index in the batch (always greater than `left`).
        right: usize,
        /// Path where conflict was detected.
        path: Path,
        /// Conflict classification.
        kind: ConflictKind,
    },
}
51
/// Summary of a successful commit.
///
/// Returned by `StateManager::commit` and `StateManager::commit_batch`.
/// It is a plain pair of counters, so it supports equality comparison (handy
/// for `assert_eq!` in tests) and has an all-zero `Default`, which is also
/// what an empty batch reports.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ApplyResult {
    /// Number of patches applied.
    pub patches_applied: usize,
    /// Number of operations applied, summed over all applied patches.
    pub ops_applied: usize,
}
60
/// StateManager manages immutable state with patch history.
///
/// # Design
///
/// - State is immutable - all changes go through commits
/// - Full history is maintained for replay
/// - Supports batch commits and preview operations
///
/// Every field sits behind an `Arc<RwLock<_>>`, so a manager can be used
/// through `&self` from several tasks at once.
///
/// # API Philosophy
///
/// - `commit`/`commit_batch`: Mutating operations that modify state and record history
/// - `preview_patch`: Pure operation that computes result without modifying state
/// - `replay_to`: Pure operation that reconstructs historical state
///
/// # Example
///
/// ```ignore
/// use tirea_state::{StateManager, StateContext};
/// use serde_json::json;
///
/// let manager = StateManager::new(json!({}));
///
/// // Get snapshot and create state context
/// let snapshot = manager.snapshot().await;
/// let ctx = StateContext::new(&snapshot);
///
/// // ... modify state through ctx ...
///
/// // Commit patch (modifies state and records history)
/// manager.commit(ctx.take_tracked_patch("tool:example")).await?;
///
/// // Replay to the state right after the first committed patch
/// let old_state = manager.replay_to(0).await?;
/// ```
pub struct StateManager {
    /// Baseline that `replay_to` starts from. `prune_history` folds the
    /// patches it removes into this value.
    initial: Arc<RwLock<Value>>,
    /// Current state, i.e. the result of every committed patch.
    state: Arc<RwLock<Value>>,
    /// Committed patches in commit order; index 0 is the oldest retained one.
    history: Arc<RwLock<Vec<TrackedPatch>>>,
    /// Lattice types registered per path, consulted whenever a patch is applied.
    registry: Arc<RwLock<LatticeRegistry>>,
}
101
102impl StateManager {
103    /// Create a new StateManager with initial state.
104    pub fn new(initial: Value) -> Self {
105        Self {
106            initial: Arc::new(RwLock::new(initial.clone())),
107            state: Arc::new(RwLock::new(initial)),
108            history: Arc::new(RwLock::new(Vec::new())),
109            registry: Arc::new(RwLock::new(LatticeRegistry::new())),
110        }
111    }
112
113    /// Register a lattice type at the given path for automatic merge during apply.
114    pub async fn register_lattice<T>(&self, path: impl Into<Path>)
115    where
116        T: Lattice + Serialize + DeserializeOwned + Send + Sync + 'static,
117    {
118        self.registry.write().await.register::<T>(path);
119    }
120
121    /// Get a snapshot of the current state.
122    pub async fn snapshot(&self) -> Value {
123        self.state.read().await.clone()
124    }
125
126    /// Commit a single patch, modifying state and recording to history.
127    ///
128    /// This is a mutating operation that:
129    /// 1. Applies the patch to current state
130    /// 2. Updates the internal state
131    /// 3. Records the patch in history for replay
132    pub async fn commit(&self, patch: TrackedPatch) -> Result<ApplyResult, StateError> {
133        let ops_count = patch.patch().len();
134
135        let registry = self.registry.read().await;
136        let mut state = self.state.write().await;
137        let mut history = self.history.write().await;
138        let mut new_state = state.clone();
139        apply_patch_in_place_with_registry(&mut new_state, patch.patch(), &registry)?;
140        *state = new_state;
141        history.push(patch);
142
143        Ok(ApplyResult {
144            patches_applied: 1,
145            ops_applied: ops_count,
146        })
147    }
148
149    /// Commit multiple patches in batch.
150    ///
151    /// Patches are applied in order atomically. If any patch fails, no changes
152    /// are persisted to state or history.
153    pub async fn commit_batch(
154        &self,
155        patches: Vec<TrackedPatch>,
156    ) -> Result<ApplyResult, StateError> {
157        if patches.is_empty() {
158            return Ok(ApplyResult {
159                patches_applied: 0,
160                ops_applied: 0,
161            });
162        }
163
164        let registry = self.registry.read().await;
165
166        if let Some((left, right, conflict)) = first_batch_conflict(&patches, &registry) {
167            return Err(StateError::BatchConflict {
168                left,
169                right,
170                path: conflict.path,
171                kind: conflict.kind,
172            });
173        }
174
175        let mut total_ops = 0;
176        let mut state = self.state.write().await;
177        let mut history = self.history.write().await;
178        let mut new_state = state.clone();
179
180        for patch in &patches {
181            total_ops += patch.patch().len();
182            apply_patch_in_place_with_registry(&mut new_state, patch.patch(), &registry)?;
183        }
184        *state = new_state;
185        let patches_count = patches.len();
186        history.extend(patches);
187
188        Ok(ApplyResult {
189            patches_applied: patches_count,
190            ops_applied: total_ops,
191        })
192    }
193
194    /// Preview applying a patch without modifying state (pure operation).
195    ///
196    /// This computes what the state would look like after applying the patch,
197    /// but does not modify the actual state or record to history.
198    ///
199    /// Useful for validation, dry-runs, or showing users what changes would occur.
200    pub async fn preview_patch(&self, patch: &Patch) -> Result<Value, StateError> {
201        let registry = self.registry.read().await;
202        let state = self.state.read().await;
203        let mut preview = state.clone();
204        apply_patch_in_place_with_registry(&mut preview, patch, &registry)?;
205        Ok(preview)
206    }
207
208    /// Deprecated: Use `commit` instead.
209    #[deprecated(since = "0.2.0", note = "Use `commit` instead")]
210    pub async fn apply(&self, patch: TrackedPatch) -> Result<ApplyResult, StateError> {
211        self.commit(patch).await
212    }
213
214    /// Deprecated: Use `commit_batch` instead.
215    #[deprecated(since = "0.2.0", note = "Use `commit_batch` instead")]
216    pub async fn apply_batch(&self, patches: Vec<TrackedPatch>) -> Result<ApplyResult, StateError> {
217        self.commit_batch(patches).await
218    }
219
220    /// Replay state from the beginning up to (and including) the specified index.
221    ///
222    /// Returns the state as it was after applying patches [0..=index] to the initial state.
223    pub async fn replay_to(&self, index: usize) -> Result<Value, StateError> {
224        let registry = self.registry.read().await;
225        let history = self.history.read().await;
226
227        if index >= history.len() {
228            return Err(StateError::InvalidReplayIndex {
229                index,
230                len: history.len(),
231            });
232        }
233
234        let mut state = self.initial.read().await.clone();
235        for patch in history.iter().take(index + 1) {
236            apply_patch_in_place_with_registry(&mut state, patch.patch(), &registry)?;
237        }
238
239        Ok(state)
240    }
241
242    /// Get the full patch history.
243    pub async fn history(&self) -> Vec<TrackedPatch> {
244        self.history.read().await.clone()
245    }
246
247    /// Get the number of patches in history.
248    pub async fn history_len(&self) -> usize {
249        self.history.read().await.len()
250    }
251
252    /// Clear history (keeps current state).
253    ///
254    /// Use with caution - this removes the ability to replay.
255    pub async fn clear_history(&self) {
256        self.history.write().await.clear();
257    }
258
259    /// Prune history, keeping only the last `keep_last` patches.
260    ///
261    /// This is useful for long-running systems to prevent unbounded memory growth.
262    /// The initial state is updated to the state before the remaining patches,
263    /// so `replay_to` will continue to work correctly with the remaining patches.
264    ///
265    /// # Arguments
266    ///
267    /// - `keep_last`: Number of recent patches to keep. If 0, all patches are removed.
268    ///
269    /// # Returns
270    ///
271    /// The number of patches that were removed.
272    pub async fn prune_history(&self, keep_last: usize) -> Result<usize, StateError> {
273        let registry = self.registry.read().await;
274        let mut history = self.history.write().await;
275        let len = history.len();
276
277        if len <= keep_last {
278            return Ok(0);
279        }
280
281        let to_remove = len - keep_last;
282
283        // Compute the new initial state by applying the patches to be removed
284        let mut new_initial = self.initial.read().await.clone();
285        for patch in history.iter().take(to_remove) {
286            apply_patch_in_place_with_registry(&mut new_initial, patch.patch(), &registry)?;
287        }
288
289        // Update initial state and remove old patches
290        *self.initial.write().await = new_initial;
291        history.drain(0..to_remove);
292
293        Ok(to_remove)
294    }
295
296    /// Get a snapshot of the initial state.
297    pub async fn initial(&self) -> Value {
298        self.initial.read().await.clone()
299    }
300}
301
302/// Check whether all ops in `patch` that touch `path` are `LatticeMerge`.
303fn is_lattice_only_at(patch: &Patch, path: &Path) -> bool {
304    patch
305        .ops()
306        .iter()
307        .filter(|op| op.path() == path)
308        .all(|op| matches!(op, Op::LatticeMerge { .. }))
309}
310
311fn first_batch_conflict(
312    patches: &[TrackedPatch],
313    registry: &LatticeRegistry,
314) -> Option<(usize, usize, Conflict)> {
315    let touched: Vec<_> = patches
316        .iter()
317        .map(|patch| patch.patch().touched(false))
318        .collect();
319
320    for left_idx in 0..patches.len() {
321        for right_idx in (left_idx + 1)..patches.len() {
322            for conflict in detect_conflicts(&touched[left_idx], &touched[right_idx]) {
323                // Skip conflicts where both patches only use LatticeMerge at the
324                // conflicting path AND the path is registered in the registry.
325                if registry.get(&conflict.path).is_some()
326                    && is_lattice_only_at(patches[left_idx].patch(), &conflict.path)
327                    && is_lattice_only_at(patches[right_idx].patch(), &conflict.path)
328                {
329                    continue;
330                }
331                return Some((left_idx, right_idx, conflict));
332            }
333        }
334    }
335
336    None
337}
338
339impl Clone for StateManager {
340    fn clone(&self) -> Self {
341        Self {
342            initial: Arc::clone(&self.initial),
343            state: Arc::clone(&self.state),
344            history: Arc::clone(&self.history),
345            registry: Arc::clone(&self.registry),
346        }
347    }
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{path, Op, Patch};
    use serde_json::json;

    /// Wrap `ops` in a tracked patch tagged with `source`.
    fn make_patch(ops: Vec<Op>, source: &str) -> TrackedPatch {
        let patch = Patch::with_ops(ops);
        TrackedPatch::new(patch).with_source(source)
    }

    /// Commit a single-op patch and panic if the commit fails.
    async fn commit_op(manager: &StateManager, op: Op, source: &str) {
        manager
            .commit(make_patch(vec![op], source))
            .await
            .unwrap();
    }

    /// Submit `patches` as one batch to a fresh manager, expect a conflict
    /// error, and check that neither state nor history changed.
    async fn assert_batch_conflict_is_atomic(patches: Vec<TrackedPatch>) {
        let manager = StateManager::new(json!({"stable": true}));

        let err = manager.commit_batch(patches).await.unwrap_err();
        assert!(err.to_string().contains("conflicting patch"));

        // Batch is atomic: state/history remain unchanged on conflict.
        let state = manager.snapshot().await;
        assert_eq!(state, json!({"stable": true}));
        assert_eq!(manager.history_len().await, 0);
    }

    #[tokio::test]
    async fn test_new_and_snapshot() {
        let seed = json!({"count": 0});
        let manager = StateManager::new(seed.clone());

        assert_eq!(manager.snapshot().await, seed);
    }

    #[tokio::test]
    async fn test_commit_single() {
        let manager = StateManager::new(json!({}));

        let outcome = manager
            .commit(make_patch(vec![Op::set(path!("count"), json!(10))], "test"))
            .await
            .unwrap();
        assert_eq!(outcome.patches_applied, 1);
        assert_eq!(outcome.ops_applied, 1);

        let state = manager.snapshot().await;
        assert_eq!(state["count"], 10);
    }

    #[tokio::test]
    async fn test_commit_batch() {
        let manager = StateManager::new(json!({}));

        let batch = vec![
            make_patch(vec![Op::set(path!("a"), json!(1))], "test1"),
            make_patch(vec![Op::set(path!("b"), json!(2))], "test2"),
        ];

        let outcome = manager.commit_batch(batch).await.unwrap();
        assert_eq!(outcome.patches_applied, 2);
        assert_eq!(outcome.ops_applied, 2);

        let state = manager.snapshot().await;
        assert_eq!(state["a"], 1);
        assert_eq!(state["b"], 2);
    }

    #[tokio::test]
    async fn test_commit_batch_rejects_exact_conflict() {
        // Both patches write the very same path.
        assert_batch_conflict_is_atomic(vec![
            make_patch(vec![Op::set(path!("x"), json!(1))], "left"),
            make_patch(vec![Op::set(path!("x"), json!(2))], "right"),
        ])
        .await;
    }

    #[tokio::test]
    async fn test_commit_batch_rejects_prefix_conflict() {
        // One patch writes a parent, the other one of its children.
        assert_batch_conflict_is_atomic(vec![
            make_patch(vec![Op::set(path!("user"), json!({"name": "A"}))], "left"),
            make_patch(vec![Op::set(path!("user", "name"), json!("B"))], "right"),
        ])
        .await;
    }

    #[tokio::test]
    async fn test_history() {
        let manager = StateManager::new(json!({}));

        commit_op(&manager, Op::set(path!("x"), json!(1)), "s1").await;
        commit_op(&manager, Op::set(path!("y"), json!(2)), "s2").await;

        let recorded = manager.history().await;
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[0].source.as_deref(), Some("s1"));
        assert_eq!(recorded[1].source.as_deref(), Some("s2"));
    }

    #[tokio::test]
    async fn test_replay_to() {
        let manager = StateManager::new(json!({}));

        commit_op(&manager, Op::set(path!("count"), json!(1)), "s1").await;
        commit_op(&manager, Op::set(path!("count"), json!(2)), "s2").await;
        commit_op(&manager, Op::set(path!("count"), json!(3)), "s3").await;

        // Replay to index 0
        let after_first = manager.replay_to(0).await.unwrap();
        assert_eq!(after_first["count"], 1);

        // Replay to index 1
        let after_second = manager.replay_to(1).await.unwrap();
        assert_eq!(after_second["count"], 2);

        // Current state unchanged
        let live = manager.snapshot().await;
        assert_eq!(live["count"], 3);
    }

    #[tokio::test]
    async fn test_replay_invalid_index() {
        // With an empty history even index 0 is out of range.
        let manager = StateManager::new(json!({}));

        let outcome = manager.replay_to(0).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_clear_history() {
        let manager = StateManager::new(json!({}));

        commit_op(&manager, Op::set(path!("x"), json!(1)), "s1").await;
        assert_eq!(manager.history_len().await, 1);

        manager.clear_history().await;
        assert_eq!(manager.history_len().await, 0);

        // State should be preserved
        let state = manager.snapshot().await;
        assert_eq!(state["x"], 1);
    }

    #[tokio::test]
    async fn test_clone_shares_state() {
        let original = StateManager::new(json!({}));
        let handle = original.clone();

        commit_op(&original, Op::set(path!("x"), json!(42)), "s1").await;

        // A write through one handle is visible through the other.
        let state = handle.snapshot().await;
        assert_eq!(state["x"], 42);
    }

    #[tokio::test]
    async fn test_replay_to_preserves_initial_state() {
        // Create manager with non-empty initial state
        let seed = json!({"base_value": 100, "name": "test"});
        let manager = StateManager::new(seed.clone());

        // Verify initial() returns the initial state
        assert_eq!(manager.initial().await, seed);

        // Apply patches that add and then overwrite a field outside the seed
        commit_op(&manager, Op::set(path!("count"), json!(1)), "s1").await;
        commit_op(&manager, Op::set(path!("count"), json!(2)), "s2").await;

        // Replay to index 0 should have initial state + first patch
        let after_first = manager.replay_to(0).await.unwrap();
        assert_eq!(after_first["base_value"], 100); // Initial value preserved
        assert_eq!(after_first["name"], "test"); // Initial value preserved
        assert_eq!(after_first["count"], 1); // First patch applied

        // Replay to index 1 should have initial state + both patches
        let after_second = manager.replay_to(1).await.unwrap();
        assert_eq!(after_second["base_value"], 100); // Initial value preserved
        assert_eq!(after_second["name"], "test"); // Initial value preserved
        assert_eq!(after_second["count"], 2); // Second patch applied
    }

    #[tokio::test]
    async fn test_replay_to_with_overwrite() {
        // Initial state with a value that will be overwritten
        let manager = StateManager::new(json!({"count": 0}));

        // Patch overwrites the initial count
        commit_op(&manager, Op::set(path!("count"), json!(10)), "s1").await;

        // Replay should show initial count overwritten
        let replayed = manager.replay_to(0).await.unwrap();
        assert_eq!(replayed["count"], 10);
    }

    #[tokio::test]
    async fn test_prune_history_basic() {
        let manager = StateManager::new(json!({"base": 0}));

        // Add 5 patches
        for i in 1..=5 {
            commit_op(
                &manager,
                Op::set(path!("count"), json!(i)),
                &format!("s{}", i),
            )
            .await;
        }
        assert_eq!(manager.history_len().await, 5);

        // Keep last 2 patches
        let removed = manager.prune_history(2).await.unwrap();
        assert_eq!(removed, 3);
        assert_eq!(manager.history_len().await, 2);

        // Current state unchanged
        let live = manager.snapshot().await;
        assert_eq!(live["count"], 5);
        assert_eq!(live["base"], 0);
    }

    #[tokio::test]
    async fn test_prune_history_updates_initial() {
        let manager = StateManager::new(json!({"base": 0}));

        // Add 3 patches
        commit_op(&manager, Op::set(path!("a"), json!(1)), "s1").await;
        commit_op(&manager, Op::set(path!("b"), json!(2)), "s2").await;
        commit_op(&manager, Op::set(path!("c"), json!(3)), "s3").await;

        // Keep last 1 patch (remove first 2)
        manager.prune_history(1).await.unwrap();

        // Initial state should now include patches s1 and s2
        let baseline = manager.initial().await;
        assert_eq!(baseline["base"], 0);
        assert_eq!(baseline["a"], 1);
        assert_eq!(baseline["b"], 2);
        assert!(baseline.get("c").is_none()); // Not in initial, only in remaining patch

        // Replay index 0 should give us state after s3
        let replayed = manager.replay_to(0).await.unwrap();
        assert_eq!(replayed["a"], 1);
        assert_eq!(replayed["b"], 2);
        assert_eq!(replayed["c"], 3);
    }

    #[tokio::test]
    async fn test_prune_history_keep_all() {
        let manager = StateManager::new(json!({}));

        commit_op(&manager, Op::set(path!("x"), json!(1)), "s1").await;

        // Keep more than we have
        let removed = manager.prune_history(10).await.unwrap();
        assert_eq!(removed, 0);
        assert_eq!(manager.history_len().await, 1);
    }

    #[tokio::test]
    async fn test_prune_history_keep_zero() {
        let manager = StateManager::new(json!({"base": 0}));

        commit_op(&manager, Op::set(path!("x"), json!(1)), "s1").await;
        commit_op(&manager, Op::set(path!("y"), json!(2)), "s2").await;

        // Keep 0 - remove all history
        let removed = manager.prune_history(0).await.unwrap();
        assert_eq!(removed, 2);
        assert_eq!(manager.history_len().await, 0);

        // Initial should now be current state
        let baseline = manager.initial().await;
        assert_eq!(baseline["base"], 0);
        assert_eq!(baseline["x"], 1);
        assert_eq!(baseline["y"], 2);
    }
}
681}