From 75a0c754a3c4aa2aee0d50151e6cbacb5527187b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Mon, 22 Jun 2026 16:29:54 -0300 Subject: [PATCH 1/2] Add simple unit tests for solana-indexer ingester - Create crates/solana-indexer/src/indexer/ingester/tests.rs with mock-stream tests covering tx/account/slot forwarding, ignored updates, malformed tx skipping, stream errors, clean shutdown, and closed decoder receiver. - Link the test module in ingester.rs with #[cfg(test)] mod tests. - Extend types::wire re-exports to cover the geyser types used by the tests, keeping imports local to the crate's wire surface. --- crates/solana-indexer/src/indexer/ingester.rs | 3 + .../src/indexer/ingester/tests.rs | 208 ++++++++++++++++++ crates/solana-indexer/src/types/wire.rs | 5 + 3 files changed, 216 insertions(+) create mode 100644 crates/solana-indexer/src/indexer/ingester/tests.rs diff --git a/crates/solana-indexer/src/indexer/ingester.rs b/crates/solana-indexer/src/indexer/ingester.rs index 553e69345c..cd7979122c 100644 --- a/crates/solana-indexer/src/indexer/ingester.rs +++ b/crates/solana-indexer/src/indexer/ingester.rs @@ -343,6 +343,9 @@ fn assert_serve_future_is_send( /// /// TODO: source the exact subscriptions from a config file once this crate's /// configuration module lands. +#[cfg(test)] +mod tests; + fn subscribe_request(settlement_program: Pubkey, solflow_program: Pubkey) -> SubscribeRequest { // `failed: None` includes failed transactions: the failure itself is the // on-chain signal downstream consumers read. diff --git a/crates/solana-indexer/src/indexer/ingester/tests.rs b/crates/solana-indexer/src/indexer/ingester/tests.rs new file mode 100644 index 0000000000..ad8de779d5 --- /dev/null +++ b/crates/solana-indexer/src/indexer/ingester/tests.rs @@ -0,0 +1,208 @@ +use { + super::{Error, INGEST_TO_DECODER_CAPACITY, Ingester}, + crate::types::{ + Signature, + shared::StreamUpdate, + wire::{ + SubscribeUpdate, + SubscribeUpdateAccount, + SubscribeUpdateAccountInfo, + SubscribeUpdateBlock, + SubscribeUpdateBlockMeta, + SubscribeUpdateEntry, + SubscribeUpdatePing, + SubscribeUpdatePong, + SubscribeUpdateSlot, + SubscribeUpdateTransaction, + SubscribeUpdateTransactionInfo, + SubscribeUpdateTransactionStatus, + UpdateOneof, + }, + }, + futures::stream, + std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + tokio::sync::mpsc::channel, + yellowstone_grpc_proto::tonic::Status, +}; + +fn signature(n: u8) -> Signature { + Signature::from([n; 64]) +} + +fn signature_bytes(n: u8) -> Vec { + signature(n).as_ref().to_vec() +} + +fn tx_update(slot: u64, sig: u8) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Transaction(SubscribeUpdateTransaction { + slot, + transaction: Some(SubscribeUpdateTransactionInfo { + signature: signature_bytes(sig), + ..Default::default() + }), + })), + ..Default::default() + }) +} + +fn account_update(slot: u64, sig: u8) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { + slot, + account: Some(SubscribeUpdateAccountInfo { + txn_signature: Some(signature_bytes(sig)), + ..Default::default() + }), + ..Default::default() + })), + ..Default::default() + }) +} + +fn slot_update(slot: u64) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Slot(SubscribeUpdateSlot { + slot, + ..Default::default() + })), + ..Default::default() + }) +} + +fn update_of(update: UpdateOneof) -> Result { + Ok(SubscribeUpdate { + update_oneof: Some(update), + ..Default::default() + }) +} + +fn ingester( + stream: impl stream::Stream> + Unpin + Send, +) -> ( + Ingester> + Unpin + Send>, + tokio::sync::mpsc::Receiver, + Arc, +) { + let (tx, rx) = channel(INGEST_TO_DECODER_CAPACITY); + let latest_chain_slot = Arc::new(AtomicU64::new(0)); + ( + Ingester::new(stream, tx, latest_chain_slot.clone()), + rx, + latest_chain_slot, + ) +} + +#[tokio::test] +async fn transaction_update_with_valid_signature_is_forwarded() { + let signature = signature(1); + let (mut ingester, mut rx, _) = ingester(stream::iter(vec![tx_update(42, 1)])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!(matches!(update, StreamUpdate::Tx { slot: 42, signature: s, .. } if s == signature)); + assert!(rx.is_empty()); +} + +#[tokio::test] +async fn account_update_with_body_is_forwarded() { + let signature = signature(2); + let (mut ingester, mut rx, _) = ingester(stream::iter(vec![account_update(100, 2)])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!( + matches!(update, StreamUpdate::Account { slot: 100, txn_signature: Some(s), .. } if s == signature) + ); + assert!(rx.is_empty()); +} + +#[tokio::test] +async fn slot_update_advances_latest_chain_slot() { + let (mut ingester, _rx, slot) = ingester(stream::iter(vec![slot_update(9_001)])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + assert_eq!(slot.load(Ordering::Relaxed), 9_001); +} + +#[tokio::test] +async fn unrelated_and_empty_updates_are_ignored() { + let (mut ingester, mut rx, slot) = ingester(stream::iter(vec![ + Ok(SubscribeUpdate::default()), + update_of(UpdateOneof::Ping(SubscribeUpdatePing::default())), + update_of(UpdateOneof::Pong(SubscribeUpdatePong::default())), + update_of(UpdateOneof::TransactionStatus( + SubscribeUpdateTransactionStatus::default(), + )), + update_of(UpdateOneof::Block(SubscribeUpdateBlock::default())), + update_of(UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta::default())), + update_of(UpdateOneof::Entry(SubscribeUpdateEntry::default())), + tx_update(7, 3), + ])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!(matches!(update, StreamUpdate::Tx { slot: 7, signature: s, .. } if s == signature(3))); + assert!(rx.is_empty()); + assert_eq!(slot.load(Ordering::Relaxed), 0); +} + +#[tokio::test] +async fn transaction_without_body_or_malformed_signature_is_skipped() { + let signature = signature(4); + let (mut ingester, mut rx, _) = ingester(stream::iter(vec![ + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Transaction(SubscribeUpdateTransaction { + slot: 1, + transaction: None, + })), + ..Default::default() + }), + Ok(SubscribeUpdate { + update_oneof: Some(UpdateOneof::Transaction(SubscribeUpdateTransaction { + slot: 2, + transaction: Some(SubscribeUpdateTransactionInfo { + signature: vec![1, 2, 3], + ..Default::default() + }), + })), + ..Default::default() + }), + tx_update(3, 4), + ])); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); + let update = rx.recv().await.unwrap(); + assert!(matches!(update, StreamUpdate::Tx { slot: 3, signature: s, .. } if s == signature)); + assert!(rx.is_empty()); +} + +#[tokio::test] +async fn terminal_grpc_status_returns_stream_error() { + let status = Status::invalid_argument("boom"); + let (mut ingester, _rx, _) = ingester(stream::iter(vec![Err(status.clone())])); + + let result = ingester.run().await; + assert!( + matches!(result, Err(Error::Stream(s)) if s.code() == status.code() && s.message() == status.message()) + ); +} + +#[tokio::test] +async fn clean_stream_end_returns_stream_ended() { + let (mut ingester, _rx, _) = + ingester(stream::iter(Vec::>::new())); + + assert!(matches!(ingester.run().await, Err(Error::StreamEnded))); +} + +#[tokio::test] +async fn closed_decoder_receiver_stops_cleanly() { + let (mut ingester, rx, _) = ingester(stream::iter(vec![tx_update(1, 5)])); + drop(rx); + + assert!(ingester.run().await.is_ok()); +} diff --git a/crates/solana-indexer/src/types/wire.rs b/crates/solana-indexer/src/types/wire.rs index c52e94703a..25216531bb 100644 --- a/crates/solana-indexer/src/types/wire.rs +++ b/crates/solana-indexer/src/types/wire.rs @@ -13,10 +13,15 @@ pub use yellowstone_grpc_proto::{ SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, + SubscribeUpdateBlock, + SubscribeUpdateBlockMeta, + SubscribeUpdateEntry, SubscribeUpdatePing, + SubscribeUpdatePong, SubscribeUpdateSlot, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo, + SubscribeUpdateTransactionStatus, subscribe_update::UpdateOneof, }, solana::storage::confirmed_block::{ From b107847e98442e2d641c73387c79f880fa710ff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Mon, 22 Jun 2026 19:11:52 -0300 Subject: [PATCH 2/2] fix: bump quinn-proto to 0.11.15 (RUSTSEC-2026-0185) Patches remote memory exhaustion DoS in quinn-proto via unbounded out-of-order stream reassembly. Transitive dep via solana-client. --- Cargo.lock | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b6824c035..39d87b171f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -921,7 +921,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -932,7 +932,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -4070,7 +4070,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 2.0.117", + "syn 1.0.109", ] [[package]] @@ -4549,7 +4549,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -6278,7 +6278,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -7523,9 +7523,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.14" +version = "0.11.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +checksum = "4fcb935c5bec503c2f0e306bdd3e58bb9029dcb14fa8d9ac76e3a5256ac0763e" dependencies = [ "aws-lc-rs", "bytes", @@ -8141,7 +8141,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -8200,7 +8200,7 @@ dependencies = [ "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -8853,7 +8853,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52d1cfed4120b4d927bf7c0f86d2087a4a7d6027c906d9f9d525a80573b9be51" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -11414,7 +11414,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -12564,7 +12564,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]]