NatsBufferedThreadWriter

Struct NatsBufferedThreadWriter 

pub struct NatsBufferedThreadWriter { /* private fields */ }

A ThreadWriter decorator that buffers deltas in NATS JetStream and flushes the final thread to the inner storage at run end.

§Query consistency (CQRS)

load always reads from the inner (durable) storage. During an active run, queries return the last-flushed snapshot — they do not include deltas that are buffered in NATS but not yet flushed.

Real-time data for in-progress runs is delivered through the SSE/NATS event stream. Callers that need up-to-date messages during a run should consume the event stream rather than polling the query API.
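The read-path behaviour described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `InnerStore`, `BufferedWriter`, and `visible_counts` are hypothetical names, a `Vec` stands in for the JetStream stream, and a `HashMap` stands in for the durable backend. It is not the crate's implementation.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the durable inner storage (e.g. PostgreSQL).
#[derive(Default)]
struct InnerStore {
    threads: HashMap<String, Vec<String>>,
}

/// Hypothetical model of the decorator: deltas are buffered, reads bypass the buffer.
#[derive(Default)]
struct BufferedWriter {
    inner: InnerStore,
    /// Stands in for the JetStream stream: (thread_id, message) pairs.
    buffer: Vec<(String, String)>,
}

impl BufferedWriter {
    /// Models `append`: the delta goes to the buffer, not to the inner store.
    fn append(&mut self, thread_id: &str, message: &str) {
        self.buffer.push((thread_id.to_string(), message.to_string()));
    }

    /// Models `load`: always reads the last-flushed snapshot from the inner store.
    fn load(&self, thread_id: &str) -> Option<&Vec<String>> {
        self.inner.threads.get(thread_id)
    }

    /// Models the run-end `save`: writes the materialized thread, then purges its deltas.
    fn save(&mut self, thread_id: &str, materialized: Vec<String>) {
        self.inner.threads.insert(thread_id.to_string(), materialized);
        self.buffer.retain(|(id, _)| id != thread_id);
    }
}

/// Returns the message counts visible to `load` during the run and after the flush.
fn visible_counts() -> (usize, usize) {
    let mut w = BufferedWriter::default();
    w.inner.threads.insert("t1".to_string(), vec!["hello".to_string()]);

    // A delta is appended mid-run; it sits in the buffer only.
    w.append("t1", "world");
    let during_run = w.load("t1").map_or(0, |m| m.len());

    // Run end: the materialized thread is flushed to the inner store.
    w.save("t1", vec!["hello".to_string(), "world".to_string()]);
    let after_run = w.load("t1").map_or(0, |m| m.len());

    (during_run, after_run)
}
```

In this model a query issued mid-run sees one message and a query issued after the flush sees two, which is why callers needing live data should read the event stream instead.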

Implementations§

impl NatsBufferedThreadWriter

pub async fn new( inner: Arc<dyn ThreadStore>, jetstream: Context, ) -> Result<Self, Error>

Create a new buffered storage.

inner is the durable backend (e.g. PostgreSQL) used for create, load, delete, and the final save at run end.

jetstream is an already-connected JetStream context.

pub async fn recover(&self) -> Result<usize, NatsBufferedThreadWriterError>

Recover incomplete runs after a crash.

Replays any unacked deltas from the JetStream stream, applies them to the corresponding threads loaded from the inner storage, and saves the result. Acked messages are then purged.
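The recovery steps (replay unacked deltas, apply them to the stored threads, save, purge acked messages) might look roughly like the following in-memory sketch. All names here are hypothetical: `Delta` and its `acked` flag model JetStream acknowledgement state, a `HashMap` models the inner storage, and the meaning of the returned `usize` (number of threads recovered) is an assumption, since the documentation does not say what the count represents.

```rust
use std::collections::HashMap;

/// Hypothetical buffered delta; `acked` marks deltas already folded into durable storage.
struct Delta {
    thread_id: String,
    message: String,
    acked: bool,
}

/// Replays unacked deltas onto the durable threads, then purges acked messages.
/// Returns the number of distinct threads touched (an assumed meaning for the count).
fn recover(stream: &mut Vec<Delta>, store: &mut HashMap<String, Vec<String>>) -> usize {
    let mut recovered: Vec<String> = Vec::new();
    for delta in stream.iter_mut().filter(|d| !d.acked) {
        // Load the thread from durable storage and apply the delta.
        // Creating a missing thread on the fly is a simplification of this sketch.
        store
            .entry(delta.thread_id.clone())
            .or_default()
            .push(delta.message.clone());
        if !recovered.contains(&delta.thread_id) {
            recovered.push(delta.thread_id.clone());
        }
        // Ack only after the delta has been applied and saved.
        delta.acked = true;
    }
    // Purge everything that has been acked.
    stream.retain(|d| !d.acked);
    recovered.len()
}

/// Small scenario: one delta was flushed before the crash, two were not.
fn demo_recover() -> (usize, usize, usize) {
    let mut stream = vec![
        Delta { thread_id: "t1".into(), message: "a".into(), acked: true },
        Delta { thread_id: "t1".into(), message: "b".into(), acked: false },
        Delta { thread_id: "t2".into(), message: "c".into(), acked: false },
    ];
    let mut store = HashMap::new();
    store.insert("t1".to_string(), vec!["a".to_string()]);
    let recovered = recover(&mut stream, &mut store);
    (recovered, store["t1"].len(), stream.len())
}
```

Acking after the apply step means a second crash mid-recovery leaves the remaining deltas unacked, so they would be replayed again on the next attempt.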

Trait Implementations§

impl ThreadReader for NatsBufferedThreadWriter

fn load<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<ThreadHead>, ThreadStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Load a Thread and its current version.

fn list_threads<'life0, 'life1, 'async_trait>( &'life0 self, query: &'life1 ThreadListQuery, ) -> Pin<Box<dyn Future<Output = Result<ThreadListPage, ThreadStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List Thread ids.

fn load_thread<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<Thread>, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Load a Thread without version info. Convenience wrapper.

fn load_messages<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, thread_id: &'life1 str, query: &'life2 MessageQuery, ) -> Pin<Box<dyn Future<Output = Result<MessagePage, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, Self: 'async_trait,

Load paginated messages for a Thread.

fn list<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<String>, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, Self: 'async_trait,

List all Thread ids with default paging.

fn list_paginated<'life0, 'life1, 'async_trait>( &'life0 self, query: &'life1 ThreadListQuery, ) -> Pin<Box<dyn Future<Output = Result<ThreadListPage, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

List Thread ids with explicit query.

fn message_count<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<usize, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Return total message count.

fn load_run<'life0, 'life1, 'async_trait>( &'life0 self, _run_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<RunRecord>, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Load one run record by run id.

fn list_runs<'life0, 'life1, 'async_trait>( &'life0 self, _query: &'life1 RunQuery, ) -> Pin<Box<dyn Future<Output = Result<RunPage, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

List runs with optional filtering and pagination.

fn active_run_for_thread<'life0, 'life1, 'async_trait>( &'life0 self, _thread_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<RunRecord>, ThreadStoreError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Load the most recent non-terminal run for a thread, if any.

impl ThreadWriter for NatsBufferedThreadWriter

fn append<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, thread_id: &'life1 str, delta: &'life2 ThreadChangeSet, precondition: VersionPrecondition, ) -> Pin<Box<dyn Future<Output = Result<Committed, ThreadStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Publishes the delta to NATS JetStream instead of writing it to the database.

The delta is durably stored in JetStream and will be purged after the run-end save() succeeds. If publishing fails the error is mapped to ThreadStoreError::Io.
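The error mapping on the publish path can be sketched as below. Everything here is a simplified assumption: `PublishError` and `publish` are hypothetical stand-ins for the JetStream call, and the `ThreadStoreError` shown is a reduced local enum whose `Io` payload type (`std::io::Error`) is a guess; only the variant name comes from the documentation.

```rust
use std::fmt;
use std::io;

/// Hypothetical publish error standing in for a JetStream publish failure.
#[derive(Debug)]
struct PublishError(String);

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "publish failed: {}", self.0)
    }
}

/// Reduced stand-in for `ThreadStoreError`. The payload type is an assumption.
#[derive(Debug)]
enum ThreadStoreError {
    Io(io::Error),
}

/// Hypothetical publish call: fails when the broker is unreachable,
/// otherwise reports how many bytes were accepted.
fn publish(connected: bool, payload: &[u8]) -> Result<usize, PublishError> {
    if connected {
        Ok(payload.len())
    } else {
        Err(PublishError("broker unreachable".to_string()))
    }
}

/// Models `append`: publish the delta and map any publish failure to `Io`.
fn append(connected: bool, payload: &[u8]) -> Result<usize, ThreadStoreError> {
    publish(connected, payload)
        .map_err(|e| ThreadStoreError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))
}
```

Mapping to a single `Io` variant keeps the `ThreadWriter` error type independent of the transport, at the cost of losing the original error's structure beyond its message.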

fn save<'life0, 'life1, 'async_trait>( &'life0 self, thread: &'life1 Thread, ) -> Pin<Box<dyn Future<Output = Result<(), ThreadStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Run-end flush: saves the final materialized thread to the inner storage and purges the corresponding NATS JetStream messages.

fn create<'life0, 'life1, 'async_trait>( &'life0 self, thread: &'life1 Thread, ) -> Pin<Box<dyn Future<Output = Result<Committed, ThreadStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Create a new Thread.

fn delete<'life0, 'life1, 'async_trait>( &'life0 self, thread_id: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<(), ThreadStoreError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Delete a Thread.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ThreadStore for T
where T: ThreadWriter + ?Sized,