1use crate::apply::apply_patch_in_place_with_registry;
9use crate::{
10 detect_conflicts, Conflict, ConflictKind, LatticeRegistry, Op, Patch, PatchExt, Path,
11 TireaError, TrackedPatch,
12};
13use serde::de::DeserializeOwned;
14use serde::Serialize;
15use serde_json::Value;
16use std::sync::Arc;
17use thiserror::Error;
18use tokio::sync::RwLock;
19
20use crate::Lattice;
21
22#[derive(Debug, Error)]
24pub enum StateError {
25 #[error("Failed to apply patch: {0}")]
27 ApplyFailed(#[from] TireaError),
28
29 #[error("Invalid replay index: {index}, history length: {len}")]
31 InvalidReplayIndex {
32 index: usize,
34 len: usize,
36 },
37
38 #[error("conflicting patch in batch between index {left} and {right} at {path} ({kind:?})")]
40 BatchConflict {
41 left: usize,
43 right: usize,
45 path: Path,
47 kind: ConflictKind,
49 },
50}
51
/// Summary of a successful commit.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly (for example in
/// tests) and `Default` so the "nothing applied" value is available as
/// `ApplyResult::default()`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ApplyResult {
    /// Number of patches that were applied.
    pub patches_applied: usize,
    /// Total number of operations contained in the applied patches.
    pub ops_applied: usize,
}
60
61pub struct StateManager {
96 initial: Arc<RwLock<Value>>,
97 state: Arc<RwLock<Value>>,
98 history: Arc<RwLock<Vec<TrackedPatch>>>,
99 registry: Arc<RwLock<LatticeRegistry>>,
100}
101
102impl StateManager {
103 pub fn new(initial: Value) -> Self {
105 Self {
106 initial: Arc::new(RwLock::new(initial.clone())),
107 state: Arc::new(RwLock::new(initial)),
108 history: Arc::new(RwLock::new(Vec::new())),
109 registry: Arc::new(RwLock::new(LatticeRegistry::new())),
110 }
111 }
112
113 pub async fn register_lattice<T>(&self, path: impl Into<Path>)
115 where
116 T: Lattice + Serialize + DeserializeOwned + Send + Sync + 'static,
117 {
118 self.registry.write().await.register::<T>(path);
119 }
120
121 pub async fn snapshot(&self) -> Value {
123 self.state.read().await.clone()
124 }
125
126 pub async fn commit(&self, patch: TrackedPatch) -> Result<ApplyResult, StateError> {
133 let ops_count = patch.patch().len();
134
135 let registry = self.registry.read().await;
136 let mut state = self.state.write().await;
137 let mut history = self.history.write().await;
138 let mut new_state = state.clone();
139 apply_patch_in_place_with_registry(&mut new_state, patch.patch(), ®istry)?;
140 *state = new_state;
141 history.push(patch);
142
143 Ok(ApplyResult {
144 patches_applied: 1,
145 ops_applied: ops_count,
146 })
147 }
148
149 pub async fn commit_batch(
154 &self,
155 patches: Vec<TrackedPatch>,
156 ) -> Result<ApplyResult, StateError> {
157 if patches.is_empty() {
158 return Ok(ApplyResult {
159 patches_applied: 0,
160 ops_applied: 0,
161 });
162 }
163
164 let registry = self.registry.read().await;
165
166 if let Some((left, right, conflict)) = first_batch_conflict(&patches, ®istry) {
167 return Err(StateError::BatchConflict {
168 left,
169 right,
170 path: conflict.path,
171 kind: conflict.kind,
172 });
173 }
174
175 let mut total_ops = 0;
176 let mut state = self.state.write().await;
177 let mut history = self.history.write().await;
178 let mut new_state = state.clone();
179
180 for patch in &patches {
181 total_ops += patch.patch().len();
182 apply_patch_in_place_with_registry(&mut new_state, patch.patch(), ®istry)?;
183 }
184 *state = new_state;
185 let patches_count = patches.len();
186 history.extend(patches);
187
188 Ok(ApplyResult {
189 patches_applied: patches_count,
190 ops_applied: total_ops,
191 })
192 }
193
194 pub async fn preview_patch(&self, patch: &Patch) -> Result<Value, StateError> {
201 let registry = self.registry.read().await;
202 let state = self.state.read().await;
203 let mut preview = state.clone();
204 apply_patch_in_place_with_registry(&mut preview, patch, ®istry)?;
205 Ok(preview)
206 }
207
208 #[deprecated(since = "0.2.0", note = "Use `commit` instead")]
210 pub async fn apply(&self, patch: TrackedPatch) -> Result<ApplyResult, StateError> {
211 self.commit(patch).await
212 }
213
214 #[deprecated(since = "0.2.0", note = "Use `commit_batch` instead")]
216 pub async fn apply_batch(&self, patches: Vec<TrackedPatch>) -> Result<ApplyResult, StateError> {
217 self.commit_batch(patches).await
218 }
219
220 pub async fn replay_to(&self, index: usize) -> Result<Value, StateError> {
224 let registry = self.registry.read().await;
225 let history = self.history.read().await;
226
227 if index >= history.len() {
228 return Err(StateError::InvalidReplayIndex {
229 index,
230 len: history.len(),
231 });
232 }
233
234 let mut state = self.initial.read().await.clone();
235 for patch in history.iter().take(index + 1) {
236 apply_patch_in_place_with_registry(&mut state, patch.patch(), ®istry)?;
237 }
238
239 Ok(state)
240 }
241
242 pub async fn history(&self) -> Vec<TrackedPatch> {
244 self.history.read().await.clone()
245 }
246
247 pub async fn history_len(&self) -> usize {
249 self.history.read().await.len()
250 }
251
252 pub async fn clear_history(&self) {
256 self.history.write().await.clear();
257 }
258
259 pub async fn prune_history(&self, keep_last: usize) -> Result<usize, StateError> {
273 let registry = self.registry.read().await;
274 let mut history = self.history.write().await;
275 let len = history.len();
276
277 if len <= keep_last {
278 return Ok(0);
279 }
280
281 let to_remove = len - keep_last;
282
283 let mut new_initial = self.initial.read().await.clone();
285 for patch in history.iter().take(to_remove) {
286 apply_patch_in_place_with_registry(&mut new_initial, patch.patch(), ®istry)?;
287 }
288
289 *self.initial.write().await = new_initial;
291 history.drain(0..to_remove);
292
293 Ok(to_remove)
294 }
295
296 pub async fn initial(&self) -> Value {
298 self.initial.read().await.clone()
299 }
300}
301
302fn is_lattice_only_at(patch: &Patch, path: &Path) -> bool {
304 patch
305 .ops()
306 .iter()
307 .filter(|op| op.path() == path)
308 .all(|op| matches!(op, Op::LatticeMerge { .. }))
309}
310
311fn first_batch_conflict(
312 patches: &[TrackedPatch],
313 registry: &LatticeRegistry,
314) -> Option<(usize, usize, Conflict)> {
315 let touched: Vec<_> = patches
316 .iter()
317 .map(|patch| patch.patch().touched(false))
318 .collect();
319
320 for left_idx in 0..patches.len() {
321 for right_idx in (left_idx + 1)..patches.len() {
322 for conflict in detect_conflicts(&touched[left_idx], &touched[right_idx]) {
323 if registry.get(&conflict.path).is_some()
326 && is_lattice_only_at(patches[left_idx].patch(), &conflict.path)
327 && is_lattice_only_at(patches[right_idx].patch(), &conflict.path)
328 {
329 continue;
330 }
331 return Some((left_idx, right_idx, conflict));
332 }
333 }
334 }
335
336 None
337}
338
339impl Clone for StateManager {
340 fn clone(&self) -> Self {
341 Self {
342 initial: Arc::clone(&self.initial),
343 state: Arc::clone(&self.state),
344 history: Arc::clone(&self.history),
345 registry: Arc::clone(&self.registry),
346 }
347 }
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{path, Op, Patch};
    use serde_json::json;

    /// Wraps `ops` in a tracked patch tagged with `source`.
    fn make_patch(ops: Vec<Op>, source: &str) -> TrackedPatch {
        let patch = Patch::with_ops(ops);
        TrackedPatch::new(patch).with_source(source)
    }

    /// Commits `patch` and panics if the commit fails.
    async fn commit_ok(manager: &StateManager, patch: TrackedPatch) {
        manager.commit(patch).await.unwrap();
    }

    /// Submits `patches` as a batch against a fresh manager and checks that the
    /// batch is rejected as conflicting without touching state or history.
    async fn assert_batch_rejected(patches: Vec<TrackedPatch>) {
        let manager = StateManager::new(json!({"stable": true}));

        let err = manager.commit_batch(patches).await.unwrap_err();
        assert!(err.to_string().contains("conflicting patch"));

        let state = manager.snapshot().await;
        assert_eq!(state, json!({"stable": true}));
        assert_eq!(manager.history_len().await, 0);
    }

    #[tokio::test]
    async fn test_new_and_snapshot() {
        let seed = json!({"count": 0});
        let manager = StateManager::new(seed.clone());

        assert_eq!(manager.snapshot().await, seed);
    }

    #[tokio::test]
    async fn test_commit_single() {
        let manager = StateManager::new(json!({}));

        let outcome = manager
            .commit(make_patch(vec![Op::set(path!("count"), json!(10))], "test"))
            .await
            .unwrap();
        assert_eq!(outcome.patches_applied, 1);
        assert_eq!(outcome.ops_applied, 1);

        let state = manager.snapshot().await;
        assert_eq!(state["count"], 10);
    }

    #[tokio::test]
    async fn test_commit_batch() {
        let manager = StateManager::new(json!({}));

        let first = make_patch(vec![Op::set(path!("a"), json!(1))], "test1");
        let second = make_patch(vec![Op::set(path!("b"), json!(2))], "test2");

        let outcome = manager.commit_batch(vec![first, second]).await.unwrap();
        assert_eq!(outcome.patches_applied, 2);
        assert_eq!(outcome.ops_applied, 2);

        let state = manager.snapshot().await;
        assert_eq!(state["a"], 1);
        assert_eq!(state["b"], 2);
    }

    #[tokio::test]
    async fn test_commit_batch_rejects_exact_conflict() {
        // Both patches write the very same path.
        assert_batch_rejected(vec![
            make_patch(vec![Op::set(path!("x"), json!(1))], "left"),
            make_patch(vec![Op::set(path!("x"), json!(2))], "right"),
        ])
        .await;
    }

    #[tokio::test]
    async fn test_commit_batch_rejects_prefix_conflict() {
        // One patch writes a parent path, the other a child of it.
        assert_batch_rejected(vec![
            make_patch(vec![Op::set(path!("user"), json!({"name": "A"}))], "left"),
            make_patch(vec![Op::set(path!("user", "name"), json!("B"))], "right"),
        ])
        .await;
    }

    #[tokio::test]
    async fn test_history() {
        let manager = StateManager::new(json!({}));

        commit_ok(&manager, make_patch(vec![Op::set(path!("x"), json!(1))], "s1")).await;
        commit_ok(&manager, make_patch(vec![Op::set(path!("y"), json!(2))], "s2")).await;

        let entries = manager.history().await;
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].source.as_deref(), Some("s1"));
        assert_eq!(entries[1].source.as_deref(), Some("s2"));
    }

    #[tokio::test]
    async fn test_replay_to() {
        let manager = StateManager::new(json!({}));

        for (value, source) in [(1, "s1"), (2, "s2"), (3, "s3")] {
            let patch = make_patch(vec![Op::set(path!("count"), json!(value))], source);
            commit_ok(&manager, patch).await;
        }

        let after_first = manager.replay_to(0).await.unwrap();
        assert_eq!(after_first["count"], 1);

        let after_second = manager.replay_to(1).await.unwrap();
        assert_eq!(after_second["count"], 2);

        // Replaying must not disturb the live state.
        let live = manager.snapshot().await;
        assert_eq!(live["count"], 3);
    }

    #[tokio::test]
    async fn test_replay_invalid_index() {
        let manager = StateManager::new(json!({}));

        assert!(manager.replay_to(0).await.is_err());
    }

    #[tokio::test]
    async fn test_clear_history() {
        let manager = StateManager::new(json!({}));

        commit_ok(&manager, make_patch(vec![Op::set(path!("x"), json!(1))], "s1")).await;
        assert_eq!(manager.history_len().await, 1);

        manager.clear_history().await;
        assert_eq!(manager.history_len().await, 0);

        // Clearing the history keeps the current state.
        let state = manager.snapshot().await;
        assert_eq!(state["x"], 1);
    }

    #[tokio::test]
    async fn test_clone_shares_state() {
        let writer = StateManager::new(json!({}));
        let reader = writer.clone();

        commit_ok(&writer, make_patch(vec![Op::set(path!("x"), json!(42))], "s1")).await;

        let state = reader.snapshot().await;
        assert_eq!(state["x"], 42);
    }

    #[tokio::test]
    async fn test_replay_to_preserves_initial_state() {
        let seed = json!({"base_value": 100, "name": "test"});
        let manager = StateManager::new(seed.clone());

        assert_eq!(manager.initial().await, seed);

        commit_ok(&manager, make_patch(vec![Op::set(path!("count"), json!(1))], "s1")).await;
        commit_ok(&manager, make_patch(vec![Op::set(path!("count"), json!(2))], "s2")).await;

        let after_first = manager.replay_to(0).await.unwrap();
        assert_eq!(after_first["base_value"], 100);
        assert_eq!(after_first["name"], "test");
        assert_eq!(after_first["count"], 1);

        let after_second = manager.replay_to(1).await.unwrap();
        assert_eq!(after_second["base_value"], 100);
        assert_eq!(after_second["name"], "test");
        assert_eq!(after_second["count"], 2);
    }

    #[tokio::test]
    async fn test_replay_to_with_overwrite() {
        let manager = StateManager::new(json!({"count": 0}));

        commit_ok(&manager, make_patch(vec![Op::set(path!("count"), json!(10))], "s1")).await;

        let replayed = manager.replay_to(0).await.unwrap();
        assert_eq!(replayed["count"], 10);
    }

    #[tokio::test]
    async fn test_prune_history_basic() {
        let manager = StateManager::new(json!({"base": 0}));

        for i in 1..=5 {
            let source = format!("s{}", i);
            let patch = make_patch(vec![Op::set(path!("count"), json!(i))], &source);
            commit_ok(&manager, patch).await;
        }
        assert_eq!(manager.history_len().await, 5);

        let removed = manager.prune_history(2).await.unwrap();
        assert_eq!(removed, 3);
        assert_eq!(manager.history_len().await, 2);

        // Pruning must leave the live state alone.
        let live = manager.snapshot().await;
        assert_eq!(live["count"], 5);
        assert_eq!(live["base"], 0);
    }

    #[tokio::test]
    async fn test_prune_history_updates_initial() {
        let manager = StateManager::new(json!({"base": 0}));

        commit_ok(&manager, make_patch(vec![Op::set(path!("a"), json!(1))], "s1")).await;
        commit_ok(&manager, make_patch(vec![Op::set(path!("b"), json!(2))], "s2")).await;
        commit_ok(&manager, make_patch(vec![Op::set(path!("c"), json!(3))], "s3")).await;

        manager.prune_history(1).await.unwrap();

        // The two pruned patches are folded into the baseline; the kept one is not.
        let baseline = manager.initial().await;
        assert_eq!(baseline["base"], 0);
        assert_eq!(baseline["a"], 1);
        assert_eq!(baseline["b"], 2);
        assert!(baseline.get("c").is_none());

        // Replaying the single remaining entry yields the full state.
        let replayed = manager.replay_to(0).await.unwrap();
        assert_eq!(replayed["a"], 1);
        assert_eq!(replayed["b"], 2);
        assert_eq!(replayed["c"], 3);
    }

    #[tokio::test]
    async fn test_prune_history_keep_all() {
        let manager = StateManager::new(json!({}));

        commit_ok(&manager, make_patch(vec![Op::set(path!("x"), json!(1))], "s1")).await;

        let removed = manager.prune_history(10).await.unwrap();
        assert_eq!(removed, 0);
        assert_eq!(manager.history_len().await, 1);
    }

    #[tokio::test]
    async fn test_prune_history_keep_zero() {
        let manager = StateManager::new(json!({"base": 0}));

        commit_ok(&manager, make_patch(vec![Op::set(path!("x"), json!(1))], "s1")).await;
        commit_ok(&manager, make_patch(vec![Op::set(path!("y"), json!(2))], "s2")).await;

        let removed = manager.prune_history(0).await.unwrap();
        assert_eq!(removed, 2);
        assert_eq!(manager.history_len().await, 0);

        let baseline = manager.initial().await;
        assert_eq!(baseline["base"], 0);
        assert_eq!(baseline["x"], 1);
        assert_eq!(baseline["y"], 2);
    }
}