feat(solana-indexer) PR 5.1: ingester drain loop #4549

Open
tilacog wants to merge 4 commits into solana-indexer/PR4-bootstrap from solana-indexer/PR5.1-bootstrap

@tilacog tilacog commented Jun 22, 2026

Description

This PR fills in Ingester::run, which was previously just unimplemented!().

The ingester now pulls updates from an AutoReconnect-backed GeyserStream and pushes tagged StreamUpdates into the decoder channel. Slot filter messages advance the latest chain slot counter in memory.

Note that the ingester neither decodes messages nor writes the watermark; the decoder handles persistence.

Ingester::serve is the production entrypoint. It builds the subscription request, resumes from the stored watermark (from_slot = watermark + 1, or the live tip on a cold start), opens the stream through GeyserGrpcClient::subscribe_with_request, and runs the drain loop. The caller must pass in a GeyserGrpcClient built with set_reconnect_config; otherwise the auto-reconnect wrapper will not actually reconnect.
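The resume rule above can be sketched as a small pure function. This is only an illustration: resume_from_slot is a hypothetical helper name, and modelling the cold start as None (leaving from_slot unset so the stream starts at the live tip) is an assumption about how serve represents it.

```rust
/// Hypothetical helper (not from the PR): derive `from_slot` for a new
/// subscription from the persisted watermark.
///
/// Assumption: `None` means no watermark has been stored yet (cold start),
/// in which case `from_slot` stays unset and the stream begins at the live tip.
fn resume_from_slot(watermark: Option<u64>) -> Option<u64> {
    // Resume one slot past the last slot that was fully persisted.
    // saturating_add avoids an overflow panic at u64::MAX.
    watermark.map(|slot| slot.saturating_add(1))
}
```

With a stored watermark of 41 this yields Some(42); with no watermark it yields None.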

Reconnects, backoff, keepalive, and resume from checkpoint are handled by the AutoReconnect stream wrapper. That wrapper also injects its own BlockMeta + slot filter under the __autoreconnect key so it can checkpoint and resume; those messages stay inside the wrapper and never reach the ingester. Recoverable errors are swallowed there too. run only returns when the stream ends for good or the decoder receiver drops.

Ping/Pong frames are dropped. The library passes them through, but the ingester has no use for them.

Unit tests for the run loop are defined in this follow-up PR: #4550.

Changes

  • Implemented Ingester::run as a plain drain loop. It reads updates from the stream, dispatches them, and stops when the stream ends or the decoder channel closes. No reconnect or backoff logic inside the ingester.
  • Made the ingester generic over S: Stream<Item = Result<SubscribeUpdate, Status>> + Unpin + Send instead of tying it to a GrpcConnector. Production uses an AutoReconnect-backed GeyserStream; tests can pass any stream. new takes the stream, the decoder sender, and a shared Arc<AtomicU64> for the latest chain slot.
  • Added Ingester::serve, a production entrypoint generic over St: Store. It builds the SubscribeRequest, reads the watermark to set from_slot, opens the stream, and runs the drain loop. Added an Error enum for setup failures, terminal stream errors, and clean stream end.
  • Added subscribe_request, which defines the four program filters (settlement and solflow transactions and accounts, failed transactions included) plus a chain_tip slot filter at confirmed commitment. from_slot is left empty so serve can fill it from the watermark.
  • Added handle_update and its helpers handle_transaction, handle_account, and handle_slot. Transactions and accounts are forwarded as StreamUpdate::Tx / StreamUpdate::Account. Frames without a body or with malformed signatures are skipped with a warning. Slot messages only update latest_chain_slot in memory.
  • Added forward, which tries a non-blocking send into the decoder channel, falls back to a blocking send with a warning when the channel is full, and stops when the receiver is gone.
  • Replaced the module-level LATEST_CHAIN_SLOT: AtomicU64 static with an Arc<AtomicU64> owned by the Ingester. The watchdog and finalization worker will take read clones. Updated related doc comments in watchdog.rs, commitment.rs, and errors.rs.
  • Refactored Store to require Send + Sync and return impl Future<Output = Result<..., StoreError>> + Send from each method. This lets Ingester::serve be tokio::spawned while still allowing implementors to write async fn bodies.
  • Re-exported the yellowstone geyser types needed by the ingester and serve in types/wire.rs.
  • Added futures as a dependency for StreamExt.
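As a rough illustration of the forward behaviour described in the list above, the sketch below uses the standard library's bounded channel as a stand-in for the async decoder channel. The real code awaits on an async sender and logs through tracing; the eprintln! call and the std::sync::mpsc types here are substitutions made only to keep the example self-contained.

```rust
use std::ops::ControlFlow;
use std::sync::mpsc::{SyncSender, TrySendError};

/// Sketch of the forward strategy: try a non-blocking send first, fall back
/// to a blocking send when the channel is full, and stop when the receiver
/// is gone. `SyncSender` stands in for the async decoder channel.
fn forward<T>(tx: &SyncSender<T>, update: T) -> ControlFlow<()> {
    match tx.try_send(update) {
        Ok(()) => ControlFlow::Continue(()),
        // The channel is full: the rejected value is handed back, so it can
        // be re-sent with a blocking call instead of being dropped.
        Err(TrySendError::Full(update)) => {
            eprintln!("decoder channel full; ingester blocked on backpressure");
            match tx.send(update) {
                Ok(()) => ControlFlow::Continue(()),
                Err(_) => ControlFlow::Break(()),
            }
        }
        // The receiver was dropped: nothing downstream will read updates.
        Err(TrySendError::Disconnected(_)) => ControlFlow::Break(()),
    }
}
```

Returning ControlFlow lets the drain loop stop on Break without treating a closed decoder channel as an error.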

How to test

Implementation only; run-loop unit tests follow in the next PR.

  1. cargo check -p solana-indexer
  2. cargo clippy -p solana-indexer --all-targets
  3. cargo +nightly fmt --all -- --check

This is a follow-up PR to #4514.

Fills in the ingester run loop for the solana-indexer.
@tilacog tilacog requested a review from a team as a code owner June 22, 2026 19:07

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements the Ingester component to drain the Yellowstone gRPC stream and forward transaction and account updates to the decoder. It also refactors the Store trait to support thread-safe async operations. Feedback on the changes suggests using fetch_max instead of store on the atomic slot counter to prevent regression from out-of-order updates, and rate-limiting the backpressure warning log to avoid log flooding when the decoder channel is full.
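The difference between store and fetch_max on the slot counter can be shown with a minimal sketch. advance_chain_tip is a hypothetical name; the fetch_max call with Ordering::Relaxed matches the handle_slot snippet quoted later in this thread.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical helper: advance the chain-tip counter monotonically.
/// `fetch_max` keeps the larger of the stored and the new value, so a slot
/// update that arrives late and out of order cannot move the counter back.
/// A plain `store` would overwrite the counter with the older slot.
fn advance_chain_tip(latest: &AtomicU64, slot: u64) {
    latest.fetch_max(slot, Ordering::Relaxed);
}
```

Feeding slots 100 and then 98 leaves the counter at 100.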

Comment thread crates/solana-indexer/src/indexer/ingester.rs
Comment thread crates/solana-indexer/src/indexer/ingester.rs
Patches remote memory exhaustion DoS in quinn-proto via unbounded
out-of-order stream reassembly. Transitive dep via solana-client.
@tilacog tilacog changed the title feat(solana-indexer): PR 5 — ingester drain loop feat(solana-indexer): PR 5.1 — ingester drain loop Jun 23, 2026
@tilacog tilacog changed the title feat(solana-indexer): PR 5.1 — ingester drain loop feat(solana-indexer) PR 5.1: ingester drain loop Jun 23, 2026
}
}
}
tracing::info!("yellowstone stream ended; ingester stopping");

Given that the architecture leans heavily towards actors, have you already thought about graceful shutdowns?

    latest_chain_slot: &AtomicU64,
    update: SubscribeUpdate,
) -> ControlFlow<()> {
    use UpdateOneof::*;

Let's keep all imports on the top of the file.

Comment on lines +136 to +140
/// Associated function taking the channel and chain-tip counter by
/// reference rather than `&self`, so the future borrows only those (both
/// `Sync`) fields across awaits. That keeps `run`'s future `Send` without
/// requiring `Ingester: Sync` — the `GeyserStream` field is `Send` but not
/// `Sync`.

IMO this is not a doc comment. Doc comments should explain the effects of a function without overwhelming the reader with implementation details. If impl details are important they should be added as regular code comments in the code sections they relate to.

Comment on lines +153 to +155
Transaction(tx_msg) => Self::handle_transaction(tx, tx_msg).await,
Account(account) => Self::handle_account(tx, account).await,
Slot(slot) => Self::handle_slot(latest_chain_slot, slot).await,

This serializes the handling of each packet, which can be slow depending on the futures. Is this intended? Don't we have to worry about creating unnecessary backpressure here?

Comment on lines +173 to +176
let Some(inner) = tx_msg.transaction else {
    tracing::warn!(slot = tx_msg.slot, "transaction update without a body");
    return ControlFlow::Continue(());
};

Is this something that can actually happen or is transaction unnecessarily optional?

};
let Ok(signature) = Signature::try_from(inner.signature.as_slice()) else {
    tracing::warn!(
        slot = tx_msg.slot,

It's not too bad yet in this code, but ideally slot should be passed via a tracing span and .instrument(). That way you don't have to remember to add the slot manually to every related log.

Comment on lines +252 to +254
/// The persisted watermark could not be read.
#[error("failed to read the resume watermark: {0}")]
Store(#[from] StoreError),

The names of the error variants are too generic. Why not incorporate the additional context of the log messages into the names? Otherwise the reader will always have to jump to the error definition to understand anything about the error.
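One way to read this suggestion is sketched below. Every variant name here is hypothetical, and StoreError is a stand-in type because its real shape is not shown in this thread. Only the three failure classes (setup failure, terminal stream error, clean stream end) and the watermark message text come from the PR.

```rust
use std::fmt;

/// Stand-in for the crate's `StoreError`; the real definition is not shown here.
#[derive(Debug)]
struct StoreError(String);

/// Hypothetical variant names that carry the context of their messages.
#[derive(Debug)]
enum IngesterError {
    /// The persisted watermark could not be read (was `Store`).
    WatermarkReadFailed(StoreError),
    /// The stream failed with an error that reconnecting cannot fix.
    StreamTerminated(String),
    /// The stream ended cleanly.
    StreamEnded,
}

impl fmt::Display for IngesterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::WatermarkReadFailed(StoreError(msg)) => {
                write!(f, "failed to read the resume watermark: {msg}")
            }
            Self::StreamTerminated(status) => write!(f, "stream terminated: {status}"),
            Self::StreamEnded => write!(f, "stream ended"),
        }
    }
}

/// Keeps `?` working on store calls, like `#[from]` does in the quoted code.
impl From<StoreError> for IngesterError {
    fn from(err: StoreError) -> Self {
        Self::WatermarkReadFailed(err)
    }
}
```

With names like these, a call site matching on the error can be understood without jumping to the enum definition.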

Comment on lines +294 to +297
let request = SubscribeRequest {
    from_slot,
    ..request
};

What's the reason to not move this and the from_slot logic into subscribe_request()?

Comment on lines +306 to +307
ingester.run().await?;
Ok(())

nit: you can just return ingester.run().await, no?
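Whether the direct return compiles depends on the error types, which this thread does not show. The sketch below uses hypothetical RunError and ServeError types to contrast the two cases: the ? operator converts the error through From, while a bare return requires both functions to share the same error type.

```rust
#[derive(Debug, PartialEq)]
struct RunError;

#[derive(Debug, PartialEq)]
enum ServeError {
    Run(RunError),
}

impl From<RunError> for ServeError {
    fn from(err: RunError) -> Self {
        ServeError::Run(err)
    }
}

/// Stand-in for the drain loop; always fails so the error path is visible.
fn run() -> Result<(), RunError> {
    Err(RunError)
}

/// Error types differ: `?` applies the `From` conversion, so `Ok(())` is needed.
fn serve_with_conversion() -> Result<(), ServeError> {
    run()?;
    Ok(())
}

/// Error types match: the result can be returned as-is.
fn serve_direct() -> Result<(), RunError> {
    run()
}
```

If run and serve share one Error enum, the shorter form applies; otherwise run().await.map_err(Into::into) would be the single-expression equivalent.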

Comment on lines 3 to 5
//! Re-exports of the `yellowstone-grpc-proto` message types the indexer
//! consumes as its wire-format surface.
pub use yellowstone_grpc_proto::{

What's the reasoning for re-exporting those types btw?

Comment on lines +220 to +228
/// Consume a slot message: advance the in-memory chain-tip counter. Slot
/// messages never enter the channel, so this always continues.
async fn handle_slot(
    latest_chain_slot: &AtomicU64,
    slot: SubscribeUpdateSlot,
) -> ControlFlow<()> {
    latest_chain_slot.fetch_max(slot.slot, Ordering::Relaxed);
    ControlFlow::Continue(())
}

Do we also have to update latest_chain_slot when we encounter the other message types? They also contain a slot number, after all.

@squadgazzz squadgazzz left a comment

Will need to do another round.

Comment on lines +157 to +163
// Ping/Pong frames carry no data the ingester needs; the library passes them through,
// and we drop them here.
Ping(_) | Pong(_) => ControlFlow::Continue(()),

// Not part of our subscription; irrelevant to the ingester even if the provider sends
// them.
TransactionStatus(_) | Block(_) | BlockMeta(_) | Entry(_) => ControlFlow::Continue(()),

Should those be merged?

use UpdateOneof::*;

let Some(update) = update.update_oneof else {
    tracing::warn!("update without a payload");

Should we log the latest chain slot on such problems, so we can tell at which slot they happened?

Comment on lines +238 to +242
tracing::warn!("decoder channel full; ingester blocked on backpressure");
match tx.send(update).await {
    Ok(()) => ControlFlow::Continue(()),
    Err(_) => ControlFlow::Break(()),
}

Will the failed update itself be logged somewhere? I don't really see it.

Comment on lines +197 to +218
async fn handle_account(
    tx: &Sender<StreamUpdate>,
    account: SubscribeUpdateAccount,
) -> ControlFlow<()> {
    let Some(inner) = account.account else {
        tracing::warn!(slot = account.slot, "account update without a body");
        return ControlFlow::Continue(());
    };
    let txn_signature = inner
        .txn_signature
        .as_deref()
        .and_then(|bytes| Signature::try_from(bytes).ok());
    Self::forward(
        tx,
        StreamUpdate::Account {
            slot: account.slot,
            txn_signature,
            inner: Box::new(inner),
        },
    )
    .await
}

A transaction with a malformed signature gets skipped with a warning, but an account update with a malformed txn_signature silently becomes None and still gets forwarded. That account can't link back to its transaction, so the tx half ages out and dead-letters as account missing even though the account did arrive. Probably worth the same warning here.
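The distinction drawn here, between a signature that is absent and one that is present but malformed, can be sketched as follows. TxnSignature and classify_txn_signature are hypothetical names, and a plain 64-byte array stands in for the Signature type so the example needs no external crates.

```rust
/// Stand-in for the 64-byte `Signature` type used by the ingester.
type SignatureBytes = [u8; 64];

/// Hypothetical three-way result: `.ok()` in the quoted code collapses
/// `Malformed` into the same `None` as `Absent`, which hides the problem.
#[derive(Debug, PartialEq)]
enum TxnSignature {
    Absent,
    Valid(SignatureBytes),
    Malformed { len: usize },
}

fn classify_txn_signature(raw: Option<&[u8]>) -> TxnSignature {
    match raw {
        None => TxnSignature::Absent,
        // Slice-to-array conversion fails unless the slice is exactly 64 bytes.
        Some(bytes) => match SignatureBytes::try_from(bytes) {
            Ok(signature) => TxnSignature::Valid(signature),
            Err(_) => TxnSignature::Malformed { len: bytes.len() },
        },
    }
}
```

A caller could then emit the warning only in the Malformed case and decide separately whether to forward or skip the account update.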

Comment on lines +157 to +159
// Ping/Pong frames carry no data the ingester needs; the library passes them through,
// and we drop them here.
Ping(_) | Pong(_) => ControlFlow::Continue(()),

As I understand yellowstone, the server sends periodic Ping frames and expects a Pong back on the request stream, or it can drop an idle connection. Here Ping/Pong are ignored and serve drops, so nothing answers. The PR says AutoReconnect handles keepalive, but the linked reconnect.rs looks like it forwards pings to us without answering them. If that's right, the connection only stays up on HTTP/2 keepalive set on the GeyserGrpcClient.

3 participants