pub struct EngineJournalBatcher {
pool: DbPool,
receiver: Receiver<JournalMessage>,
config: JournalBatcherConfig,
shutdown: Receiver<()>,
wal_writer_tx: SyncSender<WalWriterMessage>,
wal_writer_handle: Option<JoinHandle<()>>,
replication_tx: Sender<ReplicationMessage>,
replication_rx: Option<Receiver<ReplicationMessage>>,
rsm_environment: ValidatorRsmEnvironment,
}Expand description
The batcher that runs in the background and appends to local WAL first, then replicates to PostgreSQL asynchronously.
Fields§
§pool: DbPool§receiver: Receiver<JournalMessage>§config: JournalBatcherConfig§shutdown: Receiver<()>§wal_writer_tx: SyncSender<WalWriterMessage>Sender half of the bounded channel to the dedicated WAL writer thread.
wal_writer_handle: Option<JoinHandle<()>>Join handle for the WAL writer OS thread (taken during shutdown).
replication_tx: Sender<ReplicationMessage>§replication_rx: Option<Receiver<ReplicationMessage>>§rsm_environment: ValidatorRsmEnvironmentEnvironment partition for persisted RSM roots and block headers.
Implementations§
Source§impl EngineJournalBatcher
impl EngineJournalBatcher
pub(super) fn rsm_block_persistence_batch( input: RsmBlockPersistenceInput<'_>, ) -> Result<EngineJournalRsmBlockBatch>
Source§impl EngineJournalBatcher
impl EngineJournalBatcher
Sourcepub fn new(
pool: DbPool,
config: JournalBatcherConfig,
shutdown: Receiver<()>,
cold_signer: Option<PrivateKeySigner>,
) -> (Self, JournalBatchSender)
pub fn new( pool: DbPool, config: JournalBatcherConfig, shutdown: Receiver<()>, cold_signer: Option<PrivateKeySigner>, ) -> (Self, JournalBatchSender)
Create a new batcher and return the sender for pushing entries.
fn load_rsm_block_cursor( pool: &DbPool, rsm_environment: ValidatorRsmEnvironment, ) -> (u64, [u8; 32], u64)
Sourcefn recover_wal_sync(
pool: &DbPool,
wal_path: &Path,
checkpoint_path: &Path,
persist_digests: bool,
max_batch_size: usize,
rsm_environment: ValidatorRsmEnvironment,
rsm_commitments: Option<&mut ValidatorRsmStateCommitmentRuntime>,
) -> Option<u64>
fn recover_wal_sync( pool: &DbPool, wal_path: &Path, checkpoint_path: &Path, persist_digests: bool, max_batch_size: usize, rsm_environment: ValidatorRsmEnvironment, rsm_commitments: Option<&mut ValidatorRsmStateCommitmentRuntime>, ) -> Option<u64>
Run streaming WAL recovery. Returns Some(truncated_len) if the WAL
was truncated to remove torn bytes, so the caller can fix up WalState.
Sourcepub async fn run(self)
pub async fn run(self)
Run the batcher loop with group commit.
Entries are flushed immediately on arrival. Natural batching occurs when new entries arrive while the writer thread is performing fsync — they accumulate in the channel and get drained in the next iteration.
Sourceasync fn flush_batch(
&mut self,
batch: &mut Vec<JournalEntry>,
completion: Option<Sender<()>>,
)
async fn flush_batch( &mut self, batch: &mut Vec<JournalEntry>, completion: Option<Sender<()>>, )
Enqueue a batch for the WAL writer thread.
Extracts ack senders from the entries and sends a WalWriterMessage::WriteBatch
to the writer thread. The writer thread handles write + fsync + ack firing.
If completion is Some, the writer thread fires it after fsync. Callers
who need to block until fsync (e.g. Flush) should await the receiver side.
Sourceasync fn send_flush_barrier(&self, completion: Sender<()>)
async fn send_flush_barrier(&self, completion: Sender<()>)
Send a flush barrier to the WAL writer thread.
The barrier is processed in FIFO order behind any in-flight WriteBatch messages, ensuring the completion fires only after all prior fsyncs.
Sourceasync fn await_replication_flush(&self)
async fn await_replication_flush(&self)
Send a flush barrier through the replication channel and wait for all pending WAL entries to be replicated to Postgres.
After this returns, the WAL checkpoint is guaranteed to reflect the state
of all entries written to the WAL up to this point. This prevents
snapshot-checkpoint lag: without this barrier, persist_engine_state_snapshot
could read a stale checkpoint and tag the snapshot with a last_command_id
that trails the live engine state, causing double-application of fills
during the next recovery.
Sourceasync fn replication_loop(
pool: DbPool,
persist_digests: bool,
max_batch_size: usize,
flush_interval: Duration,
checkpoint_path: PathBuf,
wal_path: PathBuf,
receiver: Receiver<ReplicationMessage>,
rsm_environment: ValidatorRsmEnvironment,
)
async fn replication_loop( pool: DbPool, persist_digests: bool, max_batch_size: usize, flush_interval: Duration, checkpoint_path: PathBuf, wal_path: PathBuf, receiver: Receiver<ReplicationMessage>, rsm_environment: ValidatorRsmEnvironment, )
Replication loop: drains WAL records from the writer thread and replicates to PostgreSQL.
NOTE: This loop does NOT listen to the broadcast shutdown signal directly.
Shutdown is driven by ReplicationMessage::Shutdown sent by the batcher
AFTER the WAL writer thread has been joined. This prevents a race where
the broadcast fires early, the replication loop exits, and the WAL writer
thread panics on replication_tx.blocking_send() to a closed channel.
fn read_checkpoint_or_panic(path: &Path) -> WalCheckpointMetadata
fn drain_replication_channel( receiver: &mut Receiver<ReplicationMessage>, pending: &mut Vec<WalRecordWithOffset>, )
Sourcefn punch_wal_hole(wal_path: &Path, from: u64, to: u64)
fn punch_wal_hole(wal_path: &Path, from: u64, to: u64)
Punch a hole in the WAL file for the replicated byte range [from, to). Releases disk blocks without changing the file size (sparse file). Non-fatal: logs a warning if the filesystem doesn’t support it.
fn advance_checkpoint( checkpoint_path: &Path, last_offset: u64, progress: ReplicationProgress, ) -> Result<(), String>
Sourcefn emit_wal_checkpoint_metrics(wal_path: &Path, checkpoint_offset: u64)
fn emit_wal_checkpoint_metrics(wal_path: &Path, checkpoint_offset: u64)
Emit Prometheus gauges for WAL checkpoint offset and unreplicated tail.
async fn replicate_pending( pool: &DbPool, persist_digests: bool, max_batch_size: usize, checkpoint_path: &Path, pending: &mut Vec<WalRecordWithOffset>, rsm_environment: ValidatorRsmEnvironment, ) -> Result<(), String>
async fn replicate_chunk( pool: &DbPool, persist_digests: bool, records: &[WalRecordWithOffset], rsm_environment: ValidatorRsmEnvironment, ) -> Result<ReplicationProgress, String>
Sourcefn insert_batch(
pool: &DbPool,
entries: &[JournalEntry],
persist_digests: bool,
rsm_blocks: Option<EngineJournalRsmBlockBatch>,
) -> Result<(usize, Vec<(DbUuid, i64)>)>
fn insert_batch( pool: &DbPool, entries: &[JournalEntry], persist_digests: bool, rsm_blocks: Option<EngineJournalRsmBlockBatch>, ) -> Result<(usize, Vec<(DbUuid, i64)>)>
Insert a batch of entries into PostgreSQL (blocking). Returns (inserted_count, [(request_uuid, command_id)]) for downstream event publish accounting.
fn lookup_command_ids_by_request_ids( pool: &DbPool, request_ids: &[DbUuid], ) -> Result<Vec<(DbUuid, i64)>>
Auto Trait Implementations§
impl Freeze for EngineJournalBatcher
impl !RefUnwindSafe for EngineJournalBatcher
impl Send for EngineJournalBatcher
impl Sync for EngineJournalBatcher
impl Unpin for EngineJournalBatcher
impl UnsafeUnpin for EngineJournalBatcher
impl !UnwindSafe for EngineJournalBatcher
Blanket Implementations§
§impl<T> AggregateExpressionMethods for T
impl<T> AggregateExpressionMethods for T
§fn aggregate_distinct(self) -> Self::Outputwhere
Self: DistinctDsl,
fn aggregate_distinct(self) -> Self::Outputwhere
Self: DistinctDsl,
DISTINCT modifier for aggregate functions Read more§fn aggregate_all(self) -> Self::Outputwhere
Self: AllDsl,
fn aggregate_all(self) -> Self::Outputwhere
Self: AllDsl,
ALL modifier for aggregate functions Read more§fn aggregate_filter<P>(self, f: P) -> Self::Outputwhere
P: AsExpression<Bool>,
Self: FilterDsl<<P as AsExpression<Bool>>::Expression>,
fn aggregate_filter<P>(self, f: P) -> Self::Outputwhere
P: AsExpression<Bool>,
Self: FilterDsl<<P as AsExpression<Bool>>::Expression>,
§fn aggregate_order<O>(self, o: O) -> Self::Outputwhere
Self: OrderAggregateDsl<O>,
fn aggregate_order<O>(self, o: O) -> Self::Outputwhere
Self: OrderAggregateDsl<O>,
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T> Downcast for Twhere
T: Any,
impl<T> Downcast for Twhere
T: Any,
§fn into_any(self: Box<T>) -> Box<dyn Any>
fn into_any(self: Box<T>) -> Box<dyn Any>
Box<dyn Trait> (where Trait: Downcast) to Box<dyn Any>, which can then be
downcast into Box<dyn ConcreteType> where ConcreteType implements Trait.§fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
fn into_any_rc(self: Rc<T>) -> Rc<dyn Any>
Rc<Trait> (where Trait: Downcast) to Rc<Any>, which can then be further
downcast into Rc<ConcreteType> where ConcreteType implements Trait.§fn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
&Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &Any’s vtable from &Trait’s.§fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn as_any_mut(&mut self) -> &mut (dyn Any + 'static)
&mut Trait (where Trait: Downcast) to &Any. This is needed since Rust cannot
generate &mut Any’s vtable from &mut Trait’s.§impl<T> DowncastSend for T
impl<T> DowncastSend for T
§impl<T> DowncastSync for T
impl<T> DowncastSync for T
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request§impl<T> IntoSql for T
impl<T> IntoSql for T
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<T, Conn> RunQueryDsl<Conn> for T
impl<T, Conn> RunQueryDsl<Conn> for T
§fn execute<'conn, 'query>(
self,
conn: &'conn mut Conn,
) -> <Conn as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>where
Conn: AsyncConnectionCore + Send,
Self: ExecuteDsl<Conn> + 'query,
fn execute<'conn, 'query>(
self,
conn: &'conn mut Conn,
) -> <Conn as AsyncConnectionCore>::ExecuteFuture<'conn, 'query>where
Conn: AsyncConnectionCore + Send,
Self: ExecuteDsl<Conn> + 'query,
§fn load<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> AndThen<Self::LoadFuture<'conn>, TryCollect<Self::Stream<'conn>, Vec<U>>>where
U: Send,
Conn: AsyncConnectionCore,
Self: LoadQuery<'query, Conn, U> + 'query,
fn load<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> AndThen<Self::LoadFuture<'conn>, TryCollect<Self::Stream<'conn>, Vec<U>>>where
U: Send,
Conn: AsyncConnectionCore,
Self: LoadQuery<'query, Conn, U> + 'query,
§fn load_stream<'conn, 'query, U>(
self,
conn: &'conn mut Conn,
) -> Self::LoadFuture<'conn>where
Conn: AsyncConnectionCore,
U: 'conn,
Self: LoadQuery<'query, Conn, U> + 'query,
fn load_stream<'conn, 'query, U>(
self,
conn: &'conn mut Conn,
) -> Self::LoadFuture<'conn>where
Conn: AsyncConnectionCore,
U: 'conn,
Self: LoadQuery<'query, Conn, U> + 'query,
Stream] with the returned rows. Read more§fn get_result<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> AndThen<Self::LoadFuture<'conn>, LoadNext<Pin<Box<Self::Stream<'conn>>>>>where
U: Send + 'conn,
Conn: AsyncConnectionCore,
Self: LoadQuery<'query, Conn, U> + 'query,
fn get_result<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> AndThen<Self::LoadFuture<'conn>, LoadNext<Pin<Box<Self::Stream<'conn>>>>>where
U: Send + 'conn,
Conn: AsyncConnectionCore,
Self: LoadQuery<'query, Conn, U> + 'query,
§fn get_results<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> AndThen<Self::LoadFuture<'conn>, TryCollect<Self::Stream<'conn>, Vec<U>>>where
U: Send,
Conn: AsyncConnectionCore,
Self: LoadQuery<'query, Conn, U> + 'query,
fn get_results<'query, 'conn, U>(
self,
conn: &'conn mut Conn,
) -> AndThen<Self::LoadFuture<'conn>, TryCollect<Self::Stream<'conn>, Vec<U>>>where
U: Send,
Conn: AsyncConnectionCore,
Self: LoadQuery<'query, Conn, U> + 'query,
Vec with the affected rows. Read more§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.