pub struct NatsBufferedThreadWriter { /* private fields */ }
A ThreadWriter decorator that buffers deltas in NATS JetStream and
flushes the final thread to the inner storage at run end.
§Query consistency (CQRS)
load always reads from the inner (durable) storage.
During an active run, queries return the last-flushed snapshot — they do
not include deltas that are buffered in NATS but not yet flushed.
Real-time data for in-progress runs is delivered through the SSE/NATS event stream. Callers that need up-to-date messages during a run should consume the event stream rather than polling the query API.
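The read/write split described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `InnerStore`, `BufferedWriter`, and `visible_counts` are hypothetical names, the code is synchronous, and a `HashMap` stands in for both the durable backend and the JetStream buffer. It is not this crate's implementation.

```rust
use std::collections::HashMap;

/// Toy stand-in for the durable inner storage (hypothetical).
#[derive(Default)]
struct InnerStore {
    threads: HashMap<String, Vec<String>>,
}

/// Toy stand-in for the decorator: deltas are buffered, reads go to `inner`.
#[derive(Default)]
struct BufferedWriter {
    inner: InnerStore,
    // Hypothetical in-memory buffer standing in for the JetStream stream.
    buffered: HashMap<String, Vec<String>>,
}

impl BufferedWriter {
    /// Buffers a delta; the durable store is not touched.
    fn append(&mut self, thread_id: &str, message: &str) {
        self.buffered
            .entry(thread_id.to_string())
            .or_default()
            .push(message.to_string());
    }

    /// Reads only the durable store, so buffered deltas are invisible.
    fn load(&self, thread_id: &str) -> Option<&Vec<String>> {
        self.inner.threads.get(thread_id)
    }

    /// Run-end flush: writes the final thread, then drops the buffered deltas.
    fn save(&mut self, thread_id: &str, final_messages: Vec<String>) {
        self.inner
            .threads
            .insert(thread_id.to_string(), final_messages);
        self.buffered.remove(thread_id);
    }
}

/// Returns the message counts a query sees during and after a run.
fn visible_counts() -> (usize, usize) {
    let mut writer = BufferedWriter::default();
    // Last-flushed snapshot from an earlier run.
    writer.save("t1", vec!["hello".to_string()]);
    // In-progress run: the delta is buffered only.
    writer.append("t1", "partial reply");
    let during = writer.load("t1").map_or(0, |m| m.len());
    // Run end: the final materialized thread is flushed.
    writer.save(
        "t1",
        vec!["hello".to_string(), "partial reply".to_string()],
    );
    let after = writer.load("t1").map_or(0, |m| m.len());
    (during, after)
}
```

In this model a query issued mid-run sees one message and a query issued after the flush sees two, which is why callers that need live data should follow the event stream instead.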
Implementations§
impl NatsBufferedThreadWriter
pub async fn new(
    inner: Arc<dyn ThreadStore>,
    jetstream: Context,
) -> Result<Self, Error>
Creates a new buffered thread writer.
inner is the durable backend (e.g. PostgreSQL) used for create,
load, delete, and the final save at run end.
jetstream is an already-connected JetStream context.
pub async fn recover(&self) -> Result<usize, NatsBufferedThreadWriterError>
Recover incomplete runs after a crash.
Replays any unacknowledged deltas from the JetStream stream, applies them to the corresponding threads loaded from the inner storage, and saves the result. The acknowledged messages are then purged.
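The replay step can be sketched as follows. This is a simplified, synchronous model under stated assumptions: `Delta` and its fields are hypothetical, a `HashMap` stands in for the inner storage, and a `Vec` stands in for the JetStream stream. The documentation does not say what the returned `usize` counts; the sketch counts recovered threads, which is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical buffered delta: one message destined for one thread.
struct Delta {
    thread_id: String,
    message: String,
    acked: bool,
}

/// Replays unacknowledged deltas onto the threads in `store`, marks them
/// acknowledged, and purges acknowledged entries from `stream`.
/// Returns the number of distinct threads touched (an assumed meaning).
fn recover(store: &mut HashMap<String, Vec<String>>, stream: &mut Vec<Delta>) -> usize {
    let mut touched: Vec<String> = Vec::new();
    for delta in stream.iter_mut().filter(|d| !d.acked) {
        // Load the thread from the durable store (or start empty) and apply.
        store
            .entry(delta.thread_id.clone())
            .or_default()
            .push(delta.message.clone());
        delta.acked = true;
        if !touched.contains(&delta.thread_id) {
            touched.push(delta.thread_id.clone());
        }
    }
    // Purge everything that is now acknowledged.
    stream.retain(|d| !d.acked);
    touched.len()
}
```

In a service built this way, it seems reasonable to call `recover` once at startup, before accepting new runs, so that threads left behind by a crash are brought up to date first.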
Trait Implementations§
impl ThreadReader for NatsBufferedThreadWriter
fn load<'life0, 'life1, 'async_trait>(
    &'life0 self,
    thread_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<ThreadHead>, ThreadStoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
fn list_threads<'life0, 'life1, 'async_trait>(
    &'life0 self,
    query: &'life1 ThreadListQuery,
) -> Pin<Box<dyn Future<Output = Result<ThreadListPage, ThreadStoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
fn load_thread<'life0, 'life1, 'async_trait>(
    &'life0 self,
    thread_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<Thread>, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
fn load_messages<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    thread_id: &'life1 str,
    query: &'life2 MessageQuery,
) -> Pin<Box<dyn Future<Output = Result<MessagePage, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
    Self: 'async_trait,
fn list<'life0, 'async_trait>(
    &'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    Self: 'async_trait,
fn list_paginated<'life0, 'life1, 'async_trait>(
    &'life0 self,
    query: &'life1 ThreadListQuery,
) -> Pin<Box<dyn Future<Output = Result<ThreadListPage, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
fn message_count<'life0, 'life1, 'async_trait>(
    &'life0 self,
    thread_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<usize, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
fn load_run<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _run_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<RunRecord>, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
fn list_runs<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _query: &'life1 RunQuery,
) -> Pin<Box<dyn Future<Output = Result<RunPage, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
fn active_run_for_thread<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _thread_id: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<RunRecord>, ThreadStoreError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
impl ThreadWriter for NatsBufferedThreadWriter
fn append<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    thread_id: &'life1 str,
    delta: &'life2 ThreadChangeSet,
    precondition: VersionPrecondition,
) -> Pin<Box<dyn Future<Output = Result<Committed, ThreadStoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
Publishes the delta to NATS JetStream instead of writing it to the database.
The delta is durably stored in JetStream and is purged after the
run-end save() succeeds. If publishing fails, the error is mapped to
ThreadStoreError::Io.
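The error mapping mentioned above might look roughly like the sketch below. Everything here is an assumption made for illustration: the local `ThreadStoreError` enum only models an `Io` variant and guesses that it wraps `std::io::Error`, `publish` is a hypothetical stand-in for the JetStream publish call, and the return type is simplified to `()` rather than `Committed`.

```rust
use std::io;

/// Hypothetical stand-in for the crate's error type; only `Io` is modelled,
/// and its payload type is a guess.
#[derive(Debug)]
enum ThreadStoreError {
    Io(io::Error),
}

/// Hypothetical publish step that fails when the broker is unreachable.
fn publish(broker_up: bool, _payload: &[u8]) -> Result<(), String> {
    if broker_up {
        Ok(())
    } else {
        Err("broker unreachable".to_string())
    }
}

/// Publishes a delta and maps any publish failure to `ThreadStoreError::Io`.
fn append(broker_up: bool, payload: &[u8]) -> Result<(), ThreadStoreError> {
    publish(broker_up, payload)
        .map_err(|e| ThreadStoreError::Io(io::Error::new(io::ErrorKind::Other, e)))
}
```

A caller that matches on `ThreadStoreError::Io` would therefore see broker outages in the same way as other I/O failures from the store.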
fn save<'life0, 'life1, 'async_trait>(
    &'life0 self,
    thread: &'life1 Thread,
) -> Pin<Box<dyn Future<Output = Result<(), ThreadStoreError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Run-end flush: saves the final materialized thread to the inner storage and purges the corresponding NATS JetStream messages.
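Because buffered deltas are purged only after the durable save succeeds, the flush is naturally a two-step sequence: write first, purge second. The sketch below models that ordering with hypothetical names (`Inner`, `flush`), a `Vec` in place of the JetStream stream, and `String` errors. It is a simplified illustration, not the crate's code.

```rust
/// Toy durable store whose writes can be made to fail (hypothetical).
struct Inner {
    fail: bool,
    saved: Option<Vec<String>>,
}

impl Inner {
    /// Persists the thread, or fails if `fail` is set.
    fn save(&mut self, thread: &[String]) -> Result<(), String> {
        if self.fail {
            return Err("durable store unavailable".to_string());
        }
        self.saved = Some(thread.to_vec());
        Ok(())
    }
}

/// Run-end flush: durable write first, purge second.
fn flush(inner: &mut Inner, buffered: &mut Vec<String>, thread: &[String]) -> Result<(), String> {
    // On error, return early so the buffered deltas stay available for recovery.
    inner.save(thread)?;
    // Purge only after the durable write has succeeded.
    buffered.clear();
    Ok(())
}
```

With this ordering a failed save leaves the buffer intact, so a later retry or a recovery pass still has the deltas to work from.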
Auto Trait Implementations§
impl Freeze for NatsBufferedThreadWriter
impl !RefUnwindSafe for NatsBufferedThreadWriter
impl Send for NatsBufferedThreadWriter
impl Sync for NatsBufferedThreadWriter
impl Unpin for NatsBufferedThreadWriter
impl !UnwindSafe for NatsBufferedThreadWriter
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.