From e8fd659acc37be60fc43c68921651f275774dd15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Mon, 22 Jun 2026 13:52:56 -0300 Subject: [PATCH 1/4] =?UTF-8?q?feat(solana-indexer):=20PR=205=20=E2=80=94?= =?UTF-8?q?=20ingester=20drain=20loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fills in the ingester run loop for the solana-indexer. --- Cargo.lock | 1 + crates/solana-indexer/Cargo.toml | 1 + crates/solana-indexer/src/indexer/ingester.rs | 400 ++++++++++++++++-- crates/solana-indexer/src/indexer/watchdog.rs | 5 +- crates/solana-indexer/src/traits/store.rs | 49 ++- crates/solana-indexer/src/types/commitment.rs | 3 +- crates/solana-indexer/src/types/errors.rs | 4 +- crates/solana-indexer/src/types/wire.rs | 17 +- 8 files changed, 419 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45edab9ebc..6b6824c035 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9398,6 +9398,7 @@ name = "solana-indexer" version = "0.1.0" dependencies = [ "dashmap 6.1.0", + "futures", "observe", "prometheus", "prometheus-metric-storage", diff --git a/crates/solana-indexer/Cargo.toml b/crates/solana-indexer/Cargo.toml index f086ae9997..09867574d6 100644 --- a/crates/solana-indexer/Cargo.toml +++ b/crates/solana-indexer/Cargo.toml @@ -17,6 +17,7 @@ path = "src/main.rs" [dependencies] dashmap = { workspace = true } +futures = { workspace = true } observe = { workspace = true } prometheus = { workspace = true } prometheus-metric-storage = { workspace = true } diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index c5afb566d5..553e69345c 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -1,59 +1,385 @@ -//! The ingester owns the yellowstone gRPC stream. It drains the socket as fast -//! as yellowstone delivers, pushes tagged updates into the channel, and updates -//! `LATEST_CHAIN_SLOT` on every slot-filter message. It performs no decoding. - -// TODO: This file only declares the component skeleton. The `run` body is -// `unimplemented!`; the actual drain and reconnect with backoff logic arrives -// in a later change. +//! The ingester drains the yellowstone gRPC stream as fast as it delivers, +//! pushes tagged updates into the channel, and advances the latest-chain-slot +//! counter on every slot-filter message. It performs no decoding. +//! +//! The stream it drains is an `AutoReconnect`-backed +//! [`GeyserStream`](yellowstone_grpc_client::GeyserStream) from +//! `yellowstone-grpc-client`: reconnects, backoff, keepalive, and +//! resume-from-checkpoint are handled inside that stream and never surface +//! here. The ingester's [`Ingester::run`] loop therefore has no backoff of its +//! own; it returns when the stream ends (the wrapper gave up on an +//! unrecoverable error) or when the decoder hangs up. +//! +//! [`Ingester::serve`] is the production entrypoint — the "actual caller" — +//! that builds the subscription request, resumes from the persisted watermark, +//! opens the `GeyserStream`, and runs the drain loop. It expects the +//! [`GeyserGrpcClient`] it receives to have been built with a reconnect config +//! (via `set_reconnect_config`), otherwise the `AutoReconnect` wrapper won't +//! actually reconnect. use { - crate::{traits::store::Store, types::shared::StreamUpdate}, - std::sync::atomic::AtomicU64, - tokio::sync::mpsc::Sender, - yellowstone_grpc_client::GrpcConnector, + crate::{ + traits::store::Store, + types::{ + Signature, + errors::StoreError, + shared::StreamUpdate, + wire::{ + CommitmentLevel, + SubscribeRequest, + SubscribeRequestFilterAccounts, + SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, + SubscribeUpdate, + SubscribeUpdateAccount, + SubscribeUpdateSlot, + SubscribeUpdateTransaction, + UpdateOneof, + }, + }, + }, + futures::stream::{Stream, StreamExt}, + solana_sdk::pubkey::Pubkey, + std::{ + ops::ControlFlow, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + }, + tokio::sync::mpsc::{Sender, error::TrySendError}, + yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserStream}, + yellowstone_grpc_proto::tonic::Status, }; -/// The sole writer is the ingester, on every slot-filter message. Anchors the -/// partial-event watchdog and the finalization worker. Cold start is zero; the -/// watchdog skips its comparison on the first tick. -pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0); - -/// Cap on the exponential backoff between reconnect attempts. -#[allow(dead_code)] -pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30); - /// Capacity of the channel from the ingester to the decoder. pub const INGEST_TO_DECODER_CAPACITY: usize = 1024; /// Ingester component. /// -/// Generic over a `GrpcConnector` implementor so the unit tests can drive it -/// with a mock. -pub struct Ingester { - /// gRPC connector implementor - pub connector: C, +/// Generic over the update `Stream` so unit tests can drive it with a mock. +/// Production wires this to an `AutoReconnect`-backed `GeyserStream` via +/// [`Ingester::serve`]. +/// +/// `Ping`/`Pong` frames are ignored: the library passes them through, but they +/// carry no data the ingester needs, and answering server pings is not part of +/// the drain path. +pub struct Ingester +where + S: Stream> + Unpin + Send, +{ + /// The yellowstone update stream. Expected to be `AutoReconnect`-backed in + /// production, so reconnects happen inside the stream and never surface to + /// the drain loop. + pub stream: S, /// Sends `StreamUpdate` to the decoder. Should be bounded to - /// `RECONNECT_BACKOFF_CAP` entries. + /// `INGEST_TO_DECODER_CAPACITY` entries. pub tx: Sender, - /// Store implementor; used to checkpoint the slot. - pub store: St, + /// Latest chain slot seen on the slot filter. The ingester is the sole + /// writer. The `Arc` is taken from the caller so the watchdog and the + /// finalization worker can share it as a read handle once they are wired + /// up; neither reads it yet. Cold start is zero (`AtomicU64::default`). + pub latest_chain_slot: Arc, } -impl Ingester { - /// Construct a new ingester. The caller owns the channel capacity decision. - pub fn new(connector: C, tx: Sender, store: St) -> Self { +impl Ingester +where + S: Stream> + Unpin + Send, +{ + /// Construct a new ingester over an already-open update stream. The caller + /// supplies `latest_chain_slot` so it can share the same `Arc` + /// with the partial-event watchdog and the finalization worker, and reuse + /// it across restarts. The caller also owns building the stream, the + /// subscription request, the resume slot, and the reconnect policy that + /// come with it. Production wiring lives in [`Ingester::serve`]. + pub fn new(stream: S, tx: Sender, latest_chain_slot: Arc) -> Self { Self { - connector, + stream, tx, - store, + latest_chain_slot, + } + } + + /// Drain the update stream until it ends or the decoder hangs up. + /// + /// Recoverable stream errors never reach this loop: the `AutoReconnect` + /// wrapper handles them internally. Returns `Ok(())` when the decoder + /// dropped its receiver (clean shutdown), or [`Err(Error)`] when the stream + /// ended terminally (the wrapper gave up on an unrecoverable error, or the + /// stream closed). + pub async fn run(&mut self) -> Result<(), Error> { + while let Some(update) = self.stream.next().await { + match update { + Ok(update) => { + if Self::handle_update(&self.tx, &self.latest_chain_slot, update) + .await + .is_break() + { + tracing::info!("decoder channel closed; ingester stopping"); + return Ok(()); + } + } + Err(status) => { + tracing::warn!(%status, "yellowstone stream error; ingester stopping"); + return Err(Error::Stream(status)); + } + } + } + tracing::info!("yellowstone stream ended; ingester stopping"); + Err(Error::StreamEnded) + } + + /// Dispatch one wire message. Breaks when the decoder is gone. + /// + /// Associated function taking the channel and chain-tip counter by + /// reference rather than `&self`, so the future borrows only those (both + /// `Sync`) fields across awaits. That keeps `run`'s future `Send` without + /// requiring `Ingester: Sync` — the `GeyserStream` field is `Send` but not + /// `Sync`. + async fn handle_update( + tx: &Sender, + latest_chain_slot: &AtomicU64, + update: SubscribeUpdate, + ) -> ControlFlow<()> { + use UpdateOneof::*; + + let Some(update) = update.update_oneof else { + tracing::warn!("update without a payload"); + return ControlFlow::Continue(()); + }; + match update { + Transaction(tx_msg) => Self::handle_transaction(tx, tx_msg).await, + Account(account) => Self::handle_account(tx, account).await, + Slot(slot) => Self::handle_slot(latest_chain_slot, slot).await, + + // Ping/Pong frames carry no data the ingester needs; the library passes them through, + // and we drop them here. + Ping(_) | Pong(_) => ControlFlow::Continue(()), + + // Not part of our subscription; irrelevant to the ingester even if the provider sends + // them. + TransactionStatus(_) | Block(_) | BlockMeta(_) | Entry(_) => ControlFlow::Continue(()), } } - /// TODO: Outer loop: open the subscription, drain it, push into the - /// channel, reconnect on failure with exponential backoff. - pub async fn run(&mut self) { - unimplemented!() + /// Forward a transaction update to the decoder, skipping frames without a + /// body or with a malformed signature. + async fn handle_transaction( + tx: &Sender, + tx_msg: SubscribeUpdateTransaction, + ) -> ControlFlow<()> { + let Some(inner) = tx_msg.transaction else { + tracing::warn!(slot = tx_msg.slot, "transaction update without a body"); + return ControlFlow::Continue(()); + }; + let Ok(signature) = Signature::try_from(inner.signature.as_slice()) else { + tracing::warn!( + slot = tx_msg.slot, + "transaction update with a malformed signature" + ); + return ControlFlow::Continue(()); + }; + Self::forward( + tx, + StreamUpdate::Tx { + slot: tx_msg.slot, + signature, + inner: Box::new(inner), + }, + ) + .await + } + + /// Forward an account update to the decoder, skipping frames without a + /// body. + async fn handle_account( + tx: &Sender, + account: SubscribeUpdateAccount, + ) -> ControlFlow<()> { + let Some(inner) = account.account else { + tracing::warn!(slot = account.slot, "account update without a body"); + return ControlFlow::Continue(()); + }; + let txn_signature = inner + .txn_signature + .as_deref() + .and_then(|bytes| Signature::try_from(bytes).ok()); + Self::forward( + tx, + StreamUpdate::Account { + slot: account.slot, + txn_signature, + inner: Box::new(inner), + }, + ) + .await + } + + /// Consume a slot message: advance the in-memory chain-tip counter. Slot + /// messages never enter the channel, so this always continues. + async fn handle_slot( + latest_chain_slot: &AtomicU64, + slot: SubscribeUpdateSlot, + ) -> ControlFlow<()> { + latest_chain_slot.store(slot.slot, Ordering::Relaxed); + ControlFlow::Continue(()) + } + + /// Push one update into the decoder channel. A full channel is the intended + /// overload signal: warn once, then block until the decoder drains. Breaks + /// when the decoder dropped its receiver. + async fn forward(tx: &Sender, update: StreamUpdate) -> ControlFlow<()> { + match tx.try_send(update) { + Ok(()) => ControlFlow::Continue(()), + Err(TrySendError::Full(update)) => { + tracing::warn!("decoder channel full; ingester blocked on backpressure"); + match tx.send(update).await { + Ok(()) => ControlFlow::Continue(()), + Err(_) => ControlFlow::Break(()), + } + } + Err(TrySendError::Closed(_)) => ControlFlow::Break(()), + } + } +} + +/// Why the ingester stopped. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// The persisted watermark could not be read. + #[error("failed to read the resume watermark: {0}")] + Store(#[from] StoreError), + /// The yellowstone subscription could not be opened. + #[error("failed to open the yellowstone subscription: {0}")] + Subscribe(#[from] GeyserGrpcClientError), + /// The stream returned a terminal gRPC error — the `AutoReconnect` wrapper + /// gave up on an unrecoverable failure. + #[error("yellowstone stream error: {0}")] + Stream(#[from] Status), + /// The stream ended without an error — the `AutoReconnect` wrapper stopped. + #[error("yellowstone stream ended")] + StreamEnded, +} + +impl Ingester { + /// Production entrypoint: build the subscription request, resume from the + /// persisted watermark, open an `AutoReconnect`-backed `GeyserStream`, and + /// run the drain loop. + /// + /// The initial `from_slot` is `watermark + 1`, or `None` on a cold start + /// (the provider subscribes from the live tip). Reconnect `from_slot` is + /// driven by the `AutoReconnect` wrapper's `BlockMeta` checkpoint, not this + /// method. + /// + /// Returns `Ok(())` on a clean shutdown (the decoder dropped its receiver), + /// or `Err(Error)` if setup failed or the stream ended terminally. The + /// client is consumed and dropped with the ingester. + /// + /// `latest_chain_slot` is taken from the caller so the same `Arc` can be + /// shared with the watchdog and finalization worker and reused across + /// restarts. + pub async fn serve( + mut client: GeyserGrpcClient, + tx: Sender, + store: St, + latest_chain_slot: Arc, + settlement_program: Pubkey, + solflow_program: Pubkey, + ) -> Result<(), Error> { + let request = subscribe_request(settlement_program, solflow_program); + let from_slot = store.read_watermark().await?.map(|watermark| watermark + 1); + let request = SubscribeRequest { + from_slot, + ..request + }; + + // The sink is the bidi request half: if kept, it can reconfigure the + // subscription at runtime (add/remove a tracked program, change commitment, + // narrow filters). Not used for this puprose at this time, but worth + // considering in case our indexing requirements get more dynamic. + let (_sink, stream) = client.subscribe_with_request(Some(request)).await?; + + let mut ingester = Ingester::new(stream, tx, latest_chain_slot); + ingester.run().await?; + Ok(()) + } +} + +/// Temporary compile-time proof that [`Ingester::serve`]'s future is `Send`. +/// +/// Keep this only until a real `tokio::spawn(Ingester::serve(...))` call site +/// lands; the actual spawn is the better check. Delete this helper then. +#[allow(dead_code)] +fn assert_serve_future_is_send( + client: GeyserGrpcClient, + tx: Sender, + store: St, + latest_chain_slot: Arc, + settlement_program: Pubkey, + solflow_program: Pubkey, +) { + fn is_send(_: F) {} + is_send(Ingester::serve( + client, + tx, + store, + latest_chain_slot, + settlement_program, + solflow_program, + )); +} + +/// The wire-level filter shape: the four named program filters and the +/// `chain_tip` slot filter, multiplexed into a single subscription at +/// `confirmed` commitment. `from_slot` is left unset; [`Ingester::serve`] fills +/// it in from the persisted watermark before subscribing. +/// +/// The library auto-adds a `BlockMeta` + `slot` filter (under its +/// `__autoreconnect` key) so the `AutoReconnect` wrapper can checkpoint and +/// resume on reconnect; those messages are consumed inside the wrapper and +/// never reach the ingester. +/// +/// TODO: source the exact subscriptions from a config file once this crate's +/// configuration module lands. +fn subscribe_request(settlement_program: Pubkey, solflow_program: Pubkey) -> SubscribeRequest { + // `failed: None` includes failed transactions: the failure itself is the + // on-chain signal downstream consumers read. + let transactions = |program: Pubkey| SubscribeRequestFilterTransactions { + vote: Some(false), + failed: None, + account_include: vec![program.to_string()], + ..Default::default() + }; + let accounts = |program: Pubkey| SubscribeRequestFilterAccounts { + owner: vec![program.to_string()], + ..Default::default() + }; + SubscribeRequest { + transactions: [ + ( + "settlement_txs".to_owned(), + transactions(settlement_program), + ), + ("sol_flow_txs".to_owned(), transactions(solflow_program)), + ] + .into(), + accounts: [ + ("settlement_owned".to_owned(), accounts(settlement_program)), + ("sol_flow_owned".to_owned(), accounts(solflow_program)), + ] + .into(), + slots: [( + "chain_tip".to_owned(), + SubscribeRequestFilterSlots { + // one message per slot at the subscription's commitment level + filter_by_commitment: Some(true), + ..Default::default() + }, + )] + .into(), + commitment: Some(CommitmentLevel::Confirmed as i32), + ..Default::default() } } diff --git a/crates/solana-indexer/src/indexer/watchdog.rs b/crates/solana-indexer/src/indexer/watchdog.rs index 5115749afa..1a8a143e07 100644 --- a/crates/solana-indexer/src/indexer/watchdog.rs +++ b/crates/solana-indexer/src/indexer/watchdog.rs @@ -16,15 +16,12 @@ use { std::sync::Arc, }; -#[allow(unused_imports)] -use crate::indexer::ingester::LATEST_CHAIN_SLOT; - /// Partial-event watchdog component. /// /// The watchdog holds a view of the partial-event map the decoder mutates. /// /// Every 500 ms it scans the map and gives up on any partial more than 32 slots -/// behind `LATEST_CHAIN_SLOT`. +/// behind the ingester's latest-chain-slot counter. /// /// Those entries are flushed to `solana.dead_letter` with a reason of /// `AccountUpdateMissing` or `TxUpdateMissing` depending on which half was diff --git a/crates/solana-indexer/src/traits/store.rs b/crates/solana-indexer/src/traits/store.rs index 9aff797218..e98e725ab7 100644 --- a/crates/solana-indexer/src/traits/store.rs +++ b/crates/solana-indexer/src/traits/store.rs @@ -8,66 +8,83 @@ use { events::DecodedEvent, recovery::PdaSnapshot, }, - std::ops::Range, + std::{future::Future, ops::Range}, }; /// PostgreSQL persistence. Used by Decoder, Watchdog, and FinalizationWorker. -pub trait Store { +/// +/// `Send + Sync` so store instances can be shared across async tasks, and each +/// method returns a `Send` future so callers like `Ingester::serve` can be +/// `tokio::spawn`ed. Implementors may still write the bodies as `async fn`; the +/// compiler enforces that the resulting future is `Send`. +pub trait Store: Send + Sync { /// Save decoded events and advance the slot watermark atomically. - async fn persist_events( + fn persist_events( &self, events: Vec, new_watermark: u64, - ) -> Result<(), StoreError>; + ) -> impl Future> + Send; /// Record a slot checkpoint. Rejects downward writes. - async fn write_watermark(&self, slot: u64) -> Result<(), StoreError>; + fn write_watermark(&self, slot: u64) -> impl Future> + Send; /// Read persisted watermark for resuming after reconnect. - async fn read_watermark(&self) -> Result, StoreError>; + fn read_watermark(&self) -> impl Future, StoreError>> + Send; /// Move stale partials (>32 slots behind) to dead letter table. - async fn write_dead_letter(&self, entry: DeadLetterEntry) -> Result<(), StoreError>; + fn write_dead_letter( + &self, + entry: DeadLetterEntry, + ) -> impl Future> + Send; /// Record gaps that fell outside the replay window (write-only in v0.1). - async fn record_lost_slot_range(&self, range: Range) -> Result<(), StoreError>; + fn record_lost_slot_range( + &self, + range: Range, + ) -> impl Future> + Send; /// Primary promotion pass: fetch `confirmed` rows whose `slot` is at or /// above the finalization-window threshold (`slot >= newer_than_slot`). /// `limit` caps the batch at 256 (RPC batch size). Returns `Err` on /// backend failure so the caller can back off rather than /// silently stall on a dead store. - async fn get_confirmed_rows( + fn get_confirmed_rows( &self, newer_than_slot: u64, limit: usize, - ) -> Result, StoreError>; + ) -> impl Future, StoreError>> + Send; /// Safety-net sweep for `confirmed` rows the primary promotion pass missed /// (i.e. rows that aged past the signature-status retention horizon, /// ~150 slots behind the chain tip). Returns `Err` on backend failure /// (see `get_confirmed_rows`). - async fn get_aged_rows( + fn get_aged_rows( &self, retention_horizon_slot: u64, - ) -> Result, StoreError>; + ) -> impl Future, StoreError>> + Send; /// Flip the `commitment` label on a specific row. /// /// The row's `table` field tells the implementer which `solana.*` table to /// UPDATE. - async fn update_commitment( + fn update_commitment( &self, row: &UnfinalizedRow, new_commitment: Commitment, - ) -> Result<(), StoreError>; + ) -> impl Future> + Send; /// Persist a single event during recovery/backfills, not the live ingestion /// path. /// /// Unlike `persist_events`, this does not advance the watermark. - async fn backfill_event(&self, event: DecodedEvent) -> Result<(), StoreError>; + fn backfill_event( + &self, + event: DecodedEvent, + ) -> impl Future> + Send; /// Upsert on-chain PDA state for reconciliation. - async fn upsert_pda_snapshot(&self, snapshot: PdaSnapshot) -> Result<(), StoreError>; + fn upsert_pda_snapshot( + &self, + snapshot: PdaSnapshot, + ) -> impl Future> + Send; } diff --git a/crates/solana-indexer/src/types/commitment.rs b/crates/solana-indexer/src/types/commitment.rs index 0a51aaee77..4bb60a36ae 100644 --- a/crates/solana-indexer/src/types/commitment.rs +++ b/crates/solana-indexer/src/types/commitment.rs @@ -48,7 +48,8 @@ pub struct AccountInfo { /// A `solana.*` row that has not yet reached `finalized` commitment — the kind /// picked up by the aged-row sweep, where `commitment = 'confirmed'` and the -/// row's slot is at least one finalization window behind `LATEST_CHAIN_SLOT`. +/// row's slot is at least one finalization window behind the latest chain +/// slot. #[derive(Debug, Clone)] pub struct UnfinalizedRow { /// Table the row lives in. diff --git a/crates/solana-indexer/src/types/errors.rs b/crates/solana-indexer/src/types/errors.rs index 364f95de62..96d871fda6 100644 --- a/crates/solana-indexer/src/types/errors.rs +++ b/crates/solana-indexer/src/types/errors.rs @@ -42,8 +42,8 @@ pub enum StreamError { #[error("stream send timeout")] SendTimeout, /// The resume slot is outside the provider's replay window. The caller - /// should reset `from_slot` to `LATEST_CHAIN_SLOT − replay_window`, - /// record the lost range, and retry the subscription. + /// should reset `from_slot` to the latest chain slot minus the replay + /// window, record the lost range, and retry the subscription. #[error("replay window exceeded")] ReplayWindowExceeded { /// The slot the subscriber attempted to resume from. diff --git a/crates/solana-indexer/src/types/wire.rs b/crates/solana-indexer/src/types/wire.rs index 8db1bf55b6..c52e94703a 100644 --- a/crates/solana-indexer/src/types/wire.rs +++ b/crates/solana-indexer/src/types/wire.rs @@ -3,7 +3,22 @@ //! Re-exports of the `yellowstone-grpc-proto` message types the indexer //! consumes as its wire-format surface. pub use yellowstone_grpc_proto::{ - geyser::{SubscribeUpdateAccountInfo, SubscribeUpdateTransactionInfo}, + geyser::{ + CommitmentLevel, + SlotStatus, + SubscribeRequest, + SubscribeRequestFilterAccounts, + SubscribeRequestFilterSlots, + SubscribeRequestFilterTransactions, + SubscribeUpdate, + SubscribeUpdateAccount, + SubscribeUpdateAccountInfo, + SubscribeUpdatePing, + SubscribeUpdateSlot, + SubscribeUpdateTransaction, + SubscribeUpdateTransactionInfo, + subscribe_update::UpdateOneof, + }, solana::storage::confirmed_block::{ CompiledInstruction, InnerInstructions, From 78ab02a12aa22e9b4b89ea73966956b4aff077e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Mon, 22 Jun 2026 16:34:11 -0300 Subject: [PATCH 2/4] fix(solana-indexer): keep latest chain slot monotonic on out-of-order slot messages --- crates/solana-indexer/src/indexer/ingester.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 553e69345c..59b2e3ac63 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -223,7 +223,7 @@ where latest_chain_slot: &AtomicU64, slot: SubscribeUpdateSlot, ) -> ControlFlow<()> { - latest_chain_slot.store(slot.slot, Ordering::Relaxed); + latest_chain_slot.fetch_max(slot.slot, Ordering::Relaxed); ControlFlow::Continue(()) } From 74663815a6ee7a14523a4f62e341f62a256eba71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Mon, 22 Jun 2026 16:35:56 -0300 Subject: [PATCH 3/4] docs(solana-indexer): add TODO about rate-limiting backpressure warning --- crates/solana-indexer/src/indexer/ingester.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 59b2e3ac63..f641ee635e 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -234,6 +234,7 @@ where match tx.try_send(update) { Ok(()) => ControlFlow::Continue(()), Err(TrySendError::Full(update)) => { + // TODO: Rate-limit if sustained backpressure floods logs. tracing::warn!("decoder channel full; ingester blocked on backpressure"); match tx.send(update).await { Ok(()) => ControlFlow::Continue(()), From 301c61b99a836fcf1bee940e5ac43edefe4d2b28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Mon, 22 Jun 2026 19:11:52 -0300 Subject: [PATCH 4/4] fix: bump quinn-proto to 0.11.15 (RUSTSEC-2026-0185) Patches remote memory exhaustion DoS in quinn-proto via unbounded out-of-order stream reassembly. Transitive dep via solana-client. --- Cargo.lock | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b6824c035..39d87b171f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -921,7 +921,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -932,7 +932,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -4070,7 +4070,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 2.0.117", + "syn 1.0.109", ] [[package]] @@ -4549,7 +4549,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -6278,7 +6278,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -7523,9 +7523,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.14" +version = "0.11.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +checksum = "4fcb935c5bec503c2f0e306bdd3e58bb9029dcb14fa8d9ac76e3a5256ac0763e" dependencies = [ "aws-lc-rs", "bytes", @@ -8141,7 +8141,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -8200,7 +8200,7 @@ dependencies = [ "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -8853,7 +8853,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -11414,7 +11414,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -12564,7 +12564,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]]