feat(solana-indexer) PR 5.1: ingester drain loop#4549
Conversation
Fills in the ingester run loop for the solana-indexer.
There was a problem hiding this comment.
Code Review
This pull request implements the Ingester component to drain the Yellowstone gRPC stream and forward transaction and account updates to the decoder. It also refactors the Store trait to support thread-safe async operations. Feedback on the changes suggests using fetch_max instead of store on the atomic slot counter to prevent regression from out-of-order updates, and rate-limiting the backpressure warning log to avoid log flooding when the decoder channel is full.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
Patches remote memory exhaustion DoS in quinn-proto via unbounded out-of-order stream reassembly. Transitive dep via solana-client.
| } | ||
| } | ||
| } | ||
| tracing::info!("yellowstone stream ended; ingester stopping"); |
There was a problem hiding this comment.
Given that the architecture heavily leans towards actors did you already think about graceful shutdowns?
| latest_chain_slot: &AtomicU64, | ||
| update: SubscribeUpdate, | ||
| ) -> ControlFlow<()> { | ||
| use UpdateOneof::*; |
There was a problem hiding this comment.
Let's keep all imports on the top of the file.
| /// Associated function taking the channel and chain-tip counter by | ||
| /// reference rather than `&self`, so the future borrows only those (both | ||
| /// `Sync`) fields across awaits. That keeps `run`'s future `Send` without | ||
| /// requiring `Ingester: Sync` — the `GeyserStream` field is `Send` but not | ||
| /// `Sync`. |
There was a problem hiding this comment.
IMO this is not a doc comment. Doc comments should explain the effects of a function without overwhelming the reader with implementation details. If impl details are important they should be added as regular code comments in the code sections they relate to.
| Transaction(tx_msg) => Self::handle_transaction(tx, tx_msg).await, | ||
| Account(account) => Self::handle_account(tx, account).await, | ||
| Slot(slot) => Self::handle_slot(latest_chain_slot, slot).await, |
There was a problem hiding this comment.
This serializes the handling of each packet which can be slow depending on the futures. Is this intended? Do we not have to be worried about creating unnecessary back pressure here?
| let Some(inner) = tx_msg.transaction else { | ||
| tracing::warn!(slot = tx_msg.slot, "transaction update without a body"); | ||
| return ControlFlow::Continue(()); | ||
| }; |
There was a problem hiding this comment.
Is this something that can actually happen or is transaction unnecessarily optional?
| }; | ||
| let Ok(signature) = Signature::try_from(inner.signature.as_slice()) else { | ||
| tracing::warn!( | ||
| slot = tx_msg.slot, |
There was a problem hiding this comment.
It's not too bad yet in this code but ideally slot should be passed via a tracing span and .instrument(). That way you don't have to remember and manually add the slot to every related log.
| /// The persisted watermark could not be read. | ||
| #[error("failed to read the resume watermark: {0}")] | ||
| Store(#[from] StoreError), |
There was a problem hiding this comment.
The names of the error variants are too generic. Why not incorporate the additional context of the log messages into the names? Otherwise the reader will always have to jump to the error definition to understand anything about the error.
| let request = SubscribeRequest { | ||
| from_slot, | ||
| ..request | ||
| }; |
There was a problem hiding this comment.
What's the reason to not move this and the from_slot logic into subscribe_request()?
| ingester.run().await?; | ||
| Ok(()) |
There was a problem hiding this comment.
nit: you can just return ingester.run().await, no?
| //! Re-exports of the `yellowstone-grpc-proto` message types the indexer | ||
| //! consumes as its wire-format surface. | ||
| pub use yellowstone_grpc_proto::{ |
There was a problem hiding this comment.
What's the reasoning for re-exporting those types btw?
| /// Consume a slot message: advance the in-memory chain-tip counter. Slot | ||
| /// messages never enter the channel, so this always continues. | ||
| async fn handle_slot( | ||
| latest_chain_slot: &AtomicU64, | ||
| slot: SubscribeUpdateSlot, | ||
| ) -> ControlFlow<()> { | ||
| latest_chain_slot.fetch_max(slot.slot, Ordering::Relaxed); | ||
| ControlFlow::Continue(()) | ||
| } |
There was a problem hiding this comment.
Do we also have to update the last_chain_slot when we encounter the other message types? They also contain a slot number after all.
squadgazzz
left a comment
There was a problem hiding this comment.
Will need to do another round.
| // Ping/Pong frames carry no data the ingester needs; the library passes them through, | ||
| // and we drop them here. | ||
| Ping(_) | Pong(_) => ControlFlow::Continue(()), | ||
|
|
||
| // Not part of our subscription; irrelevant to the ingester even if the provider sends | ||
| // them. | ||
| TransactionStatus(_) | Block(_) | BlockMeta(_) | Entry(_) => ControlFlow::Continue(()), |
There was a problem hiding this comment.
Should those be merged?
| use UpdateOneof::*; | ||
|
|
||
| let Some(update) = update.update_oneof else { | ||
| tracing::warn!("update without a payload"); |
There was a problem hiding this comment.
Should we log the latest chain slot on such problems to understand on which one it happened?
| tracing::warn!("decoder channel full; ingester blocked on backpressure"); | ||
| match tx.send(update).await { | ||
| Ok(()) => ControlFlow::Continue(()), | ||
| Err(_) => ControlFlow::Break(()), | ||
| } |
There was a problem hiding this comment.
Will the failed update itself be logged somewhere? I don't really see it.
| async fn handle_account( | ||
| tx: &Sender<StreamUpdate>, | ||
| account: SubscribeUpdateAccount, | ||
| ) -> ControlFlow<()> { | ||
| let Some(inner) = account.account else { | ||
| tracing::warn!(slot = account.slot, "account update without a body"); | ||
| return ControlFlow::Continue(()); | ||
| }; | ||
| let txn_signature = inner | ||
| .txn_signature | ||
| .as_deref() | ||
| .and_then(|bytes| Signature::try_from(bytes).ok()); | ||
| Self::forward( | ||
| tx, | ||
| StreamUpdate::Account { | ||
| slot: account.slot, | ||
| txn_signature, | ||
| inner: Box::new(inner), | ||
| }, | ||
| ) | ||
| .await | ||
| } |
There was a problem hiding this comment.
A transaction with a malformed signature gets skipped with a warning, but an account update with a malformed txn_signature silently becomes None and still gets forwarded. That account can't link back to its transaction, so the tx half ages out and dead-letters as account missing even though the account did arrive. Probably worth the same warning here.
| // Ping/Pong frames carry no data the ingester needs; the library passes them through, | ||
| // and we drop them here. | ||
| Ping(_) | Pong(_) => ControlFlow::Continue(()), |
There was a problem hiding this comment.
As I understand yellowstone, the server sends periodic Ping frames and expects a Pong back on the request stream, or it can drop an idle connection. Here Ping/Pong are ignored and serve drops, so nothing answers. The PR says AutoReconnect handles keepalive, but the linked reconnect.rs looks like it forwards pings to us without answering them. If that's right, the connection only stays up on HTTP/2 keepalive set on the GeyserGrpcClient.
Description
This PR fills in
Ingester::run, which was previously justunimplemented!().The ingester now pulls updates from an
AutoReconnect-backedGeyserStreamand pushes taggedStreamUpdates into the decoder channel. Slot filter messages advance the latest chain slot counter in memory.Mind that the ingester does not decode messages and does not write the watermark; the decoder handles persistence.
Ingester::serveis the production entrypoint. It builds the subscription request, resumes from the stored watermark (from_slot = watermark + 1, or the live tip on a cold start), opens the stream throughGeyserGrpcClient::subscribe_with_request, and runs the drain loop. The caller passes in aGeyserGrpcClientbuilt withset_reconnect_config; otherwise the auto-reconnect wrapper will not actually reconnect.Reconnects, backoff, keepalive, and resume from checkpoint are handled by the
AutoReconnectstream wrapper. That wrapper also injects its ownBlockMeta+slotfilter under the__autoreconnectkey so it can checkpoint and resume; those messages stay inside the wrapper and never reach the ingester. Recoverable errors are swallowed there too.runonly returns when the stream ends for good or the decoder receiver drops.Ping/Pongframes are dropped. The library passes them through, but the ingester has no use for them.Unit tests for the run loop are defined in this follow-up PR: #4550.
Changes
Ingester::runas a plain drain loop. It reads updates from the stream, dispatches them, and stops when the stream ends or the decoder channel closes. No reconnect or backoff logic inside the ingester.S: Stream<Item = Result<SubscribeUpdate, Status>> + Unpin + Sendinstead of tying it to aGrpcConnector. Production uses anAutoReconnect-backedGeyserStream; tests can pass any stream.newtakes the stream, the decoder sender, and a sharedArc<AtomicU64>for the latest chain slot.Ingester::serve, a production entrypoint generic overSt: Store. It builds theSubscribeRequest, reads the watermark to setfrom_slot, opens the stream, and runs the drain loop. Added anErrorenum for setup failures, terminal stream errors, and clean stream end.subscribe_request, which defines the four program filters (settlement and solflow transactions and accounts, failed transactions included) plus achain_tipslot filter atconfirmedcommitment.from_slotis left empty soservecan fill it from the watermark.handle_updateand its helpershandle_transaction,handle_account, andhandle_slot. Transactions and accounts are forwarded asStreamUpdate::Tx/StreamUpdate::Account. Frames without a body or with malformed signatures are skipped with a warning. Slot messages only updatelatest_chain_slotin memory.forward, which tries a non-blocking send into the decoder channel, falls back to a blocking send with a warning when the channel is full, and stops when the receiver is gone.LATEST_CHAIN_SLOT: AtomicU64static with anArc<AtomicU64>owned by theIngester. The watchdog and finalization worker will take read clones. Updated related doc comments inwatchdog.rs,commitment.rs, anderrors.rs.Storeto requireSend + Syncand returnimpl Future<Output = Result<..., StoreError>> + Sendfrom each method. This letsIngester::servebetokio::spawned while still allowing implementors to writeasync fnbodies.serveintypes/wire.rs.futuresas a dependency forStreamExt.How to test
Implementation only; run-loop unit tests follow in the next PR.
cargo check -p solana-indexercargo clippy -p solana-indexer --all-targetscargo +nightly fmt --all -- --checkThis is a follow-up PR to #4514