Use NATS Buffered Postgres Store
Use this for high-write runs: checkpoint deltas are buffered in NATS JetStream and flushed to Postgres at run end.
Prerequisites
- tirea-store-adapters with the nats and postgres features enabled.
- Reachable PostgreSQL and NATS JetStream.
Steps
- Create Postgres durable store.
use std::sync::Arc;
use tirea_store_adapters::PostgresStore;
let pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
let postgres = Arc::new(PostgresStore::new(pool));
postgres.ensure_table().await?;
- Wrap writer with NATS JetStream buffer.
use tirea::contracts::storage::ThreadStore;
use tirea_store_adapters::NatsBufferedThreadWriter;
let nats = async_nats::connect(std::env::var("NATS_URL")?).await?;
let jetstream = async_nats::jetstream::new(nats);
let durable: Arc<dyn ThreadStore> = postgres.clone();
let buffered = Arc::new(NatsBufferedThreadWriter::new(durable, jetstream).await?);
- Recover pending deltas on startup.
let recovered = buffered.recover().await?;
eprintln!("recovered {} buffered deltas", recovered);
- Wire buffered writer for runtime commits, Postgres for reads.
use tirea::contracts::storage::ThreadReader;
use tirea::composition::{AgentDefinition, AgentDefinitionSpec, AgentOsBuilder};
let os = AgentOsBuilder::new()
    .with_agent_state_store(buffered.clone())
    .with_agent_spec(AgentDefinitionSpec::local_with_id(
        "assistant",
        AgentDefinition::new("deepseek-chat"),
    ))
    .build()?;
let read_store: Arc<dyn ThreadReader> = postgres.clone();
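The steps above read DATABASE_URL and NATS_URL straight from the environment and propagate any failure with `?`. A minimal sketch of validating both variables up front, so a missing one is reported by name, might look like the following. The names StoreConfig, MissingVar, load_config, and load_config_from_env are hypothetical helpers for illustration and are not part of tirea.

```rust
use std::fmt;

/// Connection settings for the two backends used in the steps above.
/// Hypothetical helper type, not part of tirea.
#[derive(Debug, PartialEq)]
pub struct StoreConfig {
    pub database_url: String,
    pub nats_url: String,
}

/// Error naming the variable that was missing or empty.
#[derive(Debug, PartialEq)]
pub struct MissingVar(pub &'static str);

impl fmt::Display for MissingVar {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "environment variable {} is not set or empty", self.0)
    }
}

impl std::error::Error for MissingVar {}

/// Builds the config from any key lookup, so the logic can be exercised
/// without touching the real process environment.
pub fn load_config<F>(lookup: F) -> Result<StoreConfig, MissingVar>
where
    F: Fn(&str) -> Option<String>,
{
    // Treat unset and whitespace-only values the same way.
    let get = |key: &'static str| {
        lookup(key)
            .map(|v| v.trim().to_string())
            .filter(|v| !v.is_empty())
            .ok_or(MissingVar(key))
    };
    Ok(StoreConfig {
        database_url: get("DATABASE_URL")?,
        nats_url: get("NATS_URL")?,
    })
}

/// Convenience wrapper over the real process environment.
pub fn load_config_from_env() -> Result<StoreConfig, MissingVar> {
    load_config(|key| std::env::var(key).ok())
}
```

With this in place, the values passed to sqlx::PgPool::connect and async_nats::connect could come from one load_config_from_env() call made before either connection is attempted.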
Semantics
- During run: deltas are published to JetStream (thread.<thread_id>.deltas).
- On run-finished checkpoint: buffered deltas are materialized and persisted to Postgres.
- Query APIs read the Postgres snapshot (CQRS), so they may lag behind the in-flight deltas of an active run.
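The three rules above can be illustrated with a small in-memory model. This is only a sketch of the described behavior, not the tirea implementation: ToyBufferedStore and its methods are hypothetical, one HashMap stands in for JetStream and another for Postgres, and only the subject pattern thread.<thread_id>.deltas is taken from the text.

```rust
use std::collections::HashMap;

/// Subject pattern quoted in the text: thread.<thread_id>.deltas
pub fn delta_subject(thread_id: &str) -> String {
    format!("thread.{}.deltas", thread_id)
}

/// Toy stand-in for the write path: a buffer (JetStream's role) in front of
/// a durable snapshot (Postgres's role). Hypothetical, for illustration only.
#[derive(Default)]
pub struct ToyBufferedStore {
    buffered: HashMap<String, Vec<String>>, // subject -> pending deltas
    durable: HashMap<String, Vec<String>>,  // thread_id -> committed deltas
}

impl ToyBufferedStore {
    /// During a run: the delta goes to the buffer only.
    pub fn append_delta(&mut self, thread_id: &str, delta: &str) {
        self.buffered
            .entry(delta_subject(thread_id))
            .or_default()
            .push(delta.to_string());
    }

    /// On the run-finished checkpoint: move all buffered deltas for the
    /// thread into the durable snapshot. Returns how many were flushed.
    pub fn finish_run(&mut self, thread_id: &str) -> usize {
        let pending = self
            .buffered
            .remove(&delta_subject(thread_id))
            .unwrap_or_default();
        let flushed = pending.len();
        self.durable
            .entry(thread_id.to_string())
            .or_default()
            .extend(pending);
        flushed
    }

    /// Query path: reads the durable snapshot only, so it can lag an
    /// active run until finish_run is called.
    pub fn read(&self, thread_id: &str) -> &[String] {
        self.durable
            .get(thread_id)
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }
}
```

In this model, a read issued between append_delta and finish_run returns an empty slice, which is the lag the last rule describes.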
Verify
- While a run is active, events are emitted as usual and fewer writes reach Postgres.
- After the run completes, the Postgres thread contains the full committed messages and state.
- recover() replays unacked deltas after a crash.
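The recovery check can be reasoned about with a toy model of an acknowledged log. This is a sketch under assumptions rather than how NatsBufferedThreadWriter is implemented: Entry, ToyRecovery, and the acked flag are hypothetical, and the only behavior taken from the text is that recover() replays unacked deltas and reports a count.

```rust
/// One buffered delta plus whether the durable side has acknowledged it.
/// Hypothetical type, for illustration only.
pub struct Entry {
    pub thread_id: String,
    pub delta: String,
    pub acked: bool,
}

/// Toy model of crash recovery: `log` plays the role of the JetStream
/// stream, `durable` the role of the Postgres rows.
#[derive(Default)]
pub struct ToyRecovery {
    pub log: Vec<Entry>,
    pub durable: Vec<(String, String)>,
}

impl ToyRecovery {
    /// Buffer a delta; it stays unacked until it is persisted.
    pub fn publish(&mut self, thread_id: &str, delta: &str) {
        self.log.push(Entry {
            thread_id: thread_id.to_string(),
            delta: delta.to_string(),
            acked: false,
        });
    }

    /// Replay every unacked entry into the durable side, ack it, and
    /// return how many entries were replayed.
    pub fn recover(&mut self) -> usize {
        let mut replayed = 0;
        for entry in self.log.iter_mut().filter(|e| !e.acked) {
            self.durable
                .push((entry.thread_id.clone(), entry.delta.clone()));
            entry.acked = true;
            replayed += 1;
        }
        replayed
    }
}
```

Because each replayed entry is acked, a second recover() call in this model returns 0. A similar test against the real store, asserting that repeated recovery after a simulated crash does not duplicate rows, would be a reasonable way to verify the behavior.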
Common Errors
- Skipping ensure_table() when you expect startup-time validation of database permissions or schema creation.
- Running without JetStream enabled on the NATS server.
- Expecting query endpoints to include not-yet-flushed in-run deltas.
Related Example
- No dedicated UI starter ships for this storage path; use crates/tirea-agentos-server/tests/e2e_nats_postgres.rs as the end-to-end integration fixture.
Key Files
- crates/tirea-store-adapters/src/nats_buffered.rs
- crates/tirea-store-adapters/src/postgres_store.rs
- crates/tirea-agentos-server/tests/e2e_nats_postgres.rs
Related
- Use Postgres Store
- Expose NATS
- crates/tirea-agentos-server/tests/e2e_nats_postgres.rs