-
Notifications
You must be signed in to change notification settings - Fork 176
feat(solana-indexer): PR 4 — Component structs (skeleton declarations) #4514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: solana-indexer/PR3-bootstrap
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| //! The decoder pulls `StreamUpdate`s from the ingester, decodes | ||
| //! settlement-program and SolFlow transactions, joins account-update snapshots, | ||
| //! and persists typed events. | ||
|
|
||
| // TODO: This file only declares the component skeleton. The `run` body is | ||
| // `unimplemented!`; the dispatch logic and persist path arrive in a later | ||
| // change. | ||
|
|
||
| use { | ||
| crate::{ | ||
| traits::store::Store, | ||
| types::{ | ||
| errors::StoreError, | ||
| shared::{PartialEvent, PartialEventKey, StreamUpdate}, | ||
| }, | ||
| }, | ||
| dashmap::DashMap, | ||
| solana_sdk::pubkey::Pubkey, | ||
| std::sync::Arc, | ||
| tokio::sync::mpsc::Receiver, | ||
| }; | ||
|
|
||
| /// Decoder component. | ||
| /// | ||
| /// The watchdog holds a clone of the same `partials` map, so the two operate on | ||
| /// the same concurrent map without any message passing between them. | ||
| pub struct Decoder<St: Store> { | ||
| /// Store implementor. | ||
| pub store: St, | ||
|
|
||
| /// Incoming `StreamUpdate` from the ingester. | ||
| pub rx: Receiver<StreamUpdate>, | ||
|
|
||
| /// Shared in-memory map of partial events keyed by `PartialEventKey`, | ||
| /// holding either-half events waiting for their pair. The watchdog holds a | ||
| /// clone of this `Arc`. | ||
| pub partials: Arc<DashMap<PartialEventKey, PartialEvent>>, | ||
|
|
||
| /// Settlement program id (filter target for the decoder). | ||
| pub settlement_program: Pubkey, | ||
|
|
||
| /// SolFlow program id (filter target for the decoder). | ||
| pub solflow_program: Pubkey, | ||
| } | ||
|
|
||
| impl<St: Store> Decoder<St> { | ||
| /// Construct a new decoder. The caller owns the channel capacity decision. | ||
| pub fn new( | ||
| store: St, | ||
| rx: Receiver<StreamUpdate>, | ||
| partials: Arc<DashMap<PartialEventKey, PartialEvent>>, | ||
| settlement_program: Pubkey, | ||
| solflow_program: Pubkey, | ||
| ) -> Self { | ||
| Self { | ||
| store, | ||
| rx, | ||
| partials, | ||
| settlement_program, | ||
| solflow_program, | ||
| } | ||
| } | ||
|
|
||
| /// Main loop. Pulls `StreamUpdate` from the receiver, runs the decode | ||
| /// pipeline, persists, and records partial events in the shared map for the | ||
| /// watchdog to read. | ||
| pub async fn run(&mut self) -> Result<(), StoreError> { | ||
| unimplemented!() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| //! The finalization worker updates the commitment level of the transactions | ||
| //! tracked by the indexer, promoting rows written at `confirmed` to | ||
| //! `finalized`. | ||
| //! | ||
| //! It does so through two flows. Two are needed because the relevant RPC | ||
| //! methods trade off differently: `getSignatureStatuses` is batchable but the | ||
| //! node only retains statuses for recent slots, while `getTransaction` reaches | ||
| //! arbitrarily old transactions on archival nodes but costs one call per | ||
| //! signature. The batched pass handles the common case cheaply; the per-row | ||
| //! sweep catches rows that age out of it. | ||
| //! | ||
| //! - **Promotion pass**: batch-polls `getSignatureStatuses` (at most | ||
| //! [`PROMOTION_BATCH_LIMIT`] signatures per call) over rows still at | ||
| //! `confirmed` that are at least [`FINALIZATION_WINDOW_SLOTS`] behind the | ||
| //! chain tip, and promotes rows whose `confirmationStatus` is `"finalized"`. | ||
| //! | ||
| //! - **Aged-row sweep**: fallback for rows past the signature-status retention | ||
| //! horizon ([`SIGNATURE_STATUS_RETENTION_SLOTS`]), which the promotion pass | ||
| //! can no longer check. Each row costs one `getTransaction` call; a non-null | ||
| //! response promotes to `finalized`, a null response marks `rolled_back`. | ||
|
|
||
| // TODO: This file only declares the component skeleton. The `run` body is | ||
| // `unimplemented!`; both flows arrive in a later change. | ||
|
|
||
| use crate::traits::{solana_client::SolanaClient, store::Store}; | ||
|
|
||
| /// Typical number of slots for a transaction to finalize (~12.8 s). The | ||
| /// promotion pass skips rows fresher than this. | ||
|
Comment on lines
+27
to
+28
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does |
||
| #[allow(dead_code)] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of using |
||
| pub const FINALIZATION_WINDOW_SLOTS: u64 = 32; | ||
|
|
||
| /// Upper limit for the `getSignatureStatuses` batch RPC call. | ||
| #[allow(dead_code)] | ||
| pub const PROMOTION_BATCH_LIMIT: usize = 256; | ||
|
|
||
| /// Approximate slot horizon past which `getSignatureStatuses` no longer returns | ||
| /// a result. | ||
| #[allow(dead_code)] | ||
| pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150; | ||
|
|
||
| /// Transaction finalization worker. See the module docs for the two flows it | ||
| /// runs. | ||
| pub struct FinalizationWorker<St: Store, R: SolanaClient> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO going down the route of generics instead of |
||
| /// Store implementor. | ||
| pub store: St, | ||
|
|
||
| /// RPC implementor. | ||
| pub rpc: R, | ||
| } | ||
|
|
||
| impl<St: Store, R: SolanaClient> FinalizationWorker<St, R> { | ||
| /// Construct a new finalization worker. | ||
| pub fn new(store: St, rpc: R) -> Self { | ||
| Self { store, rpc } | ||
| } | ||
|
|
||
| /// Outer loop. Runs the promotion pass and the aged-row sweep on a timer. | ||
| /// | ||
| /// Placeholder for now; implemented in a later change. | ||
| pub async fn run(&mut self) { | ||
| unimplemented!("implemented in PR 11–12") | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| //! The ingester owns the yellowstone gRPC stream. It drains the socket as fast | ||
| //! as yellowstone delivers, pushes tagged updates into the channel, and updates | ||
| //! `LATEST_CHAIN_SLOT` on every slot-filter message. It performs no decoding. | ||
|
|
||
| // TODO: This file only declares the component skeleton. The `run` body is | ||
| // `unimplemented!`; the actual drain and reconnect with backoff logic arrives | ||
| // in a later change. | ||
|
|
||
| use { | ||
| crate::{traits::store::Store, types::shared::StreamUpdate}, | ||
| std::sync::atomic::AtomicU64, | ||
| tokio::sync::mpsc::Sender, | ||
| yellowstone_grpc_client::GrpcConnector, | ||
| }; | ||
|
|
||
| /// The sole writer is the ingester, on every slot-filter message. Anchors the | ||
| /// partial-event watchdog and the finalization worker. Cold start is zero; the | ||
| /// watchdog skips its comparison on the first tick. | ||
| pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using a global static Actionable Suggestion: References
Comment on lines
+16
to
+19
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reason that this value does not live inside a specific component that gets accessed via a getter?
Comment on lines
+16
to
+19
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be documented that this slot number is completely unrelated to the indexing progress of the system. Obviously depends on the rest of the indexer logic but it sounds like a counter that tracks fully processed slots might be more useful. |
||
|
|
||
| /// Cap on the exponential backoff between reconnect attempts. | ||
| #[allow(dead_code)] | ||
| pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30); | ||
|
|
||
| /// Capacity of the channel from the ingester to the decoder. | ||
| pub const INGEST_TO_DECODER_CAPACITY: usize = 1024; | ||
|
Comment on lines
+25
to
+26
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This and a few other consts seems like it should be configurable by a config file.
Comment on lines
+25
to
+26
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain why the ingester is necessary? Why is it better to have it forward the events to a different channel? If the other channel it pushed into does not get cleared fast enough the backpressure will end up here anyway.
Comment on lines
+21
to
+26
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those should probably be a part of some config with default config values, which aligns with the general codebase. |
||
|
|
||
| /// Ingester component. | ||
| /// | ||
| /// Generic over a `GrpcConnector` implementor so the unit tests can drive it | ||
| /// with a mock. | ||
| pub struct Ingester<C: GrpcConnector, St: Store> { | ||
| /// gRPC connector implementor | ||
| pub connector: C, | ||
|
|
||
| /// Sends `StreamUpdate` to the decoder. Should be bounded to | ||
| /// `RECONNECT_BACKOFF_CAP` entries. | ||
| pub tx: Sender<StreamUpdate>, | ||
|
Comment on lines
+36
to
+38
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment says the size of the channel should be bounded to a unit of time which does not seem to make sense. |
||
|
|
||
| /// Store implementor; used to checkpoint the slot. | ||
| pub store: St, | ||
| } | ||
|
|
||
| impl<C: GrpcConnector, St: Store> Ingester<C, St> { | ||
| /// Construct a new ingester. The caller owns the channel capacity decision. | ||
| pub fn new(connector: C, tx: Sender<StreamUpdate>, store: St) -> Self { | ||
| Self { | ||
| connector, | ||
| tx, | ||
| store, | ||
| } | ||
| } | ||
|
|
||
| /// TODO: Outer loop: open the subscription, drain it, push into the | ||
| /// channel, reconnect on failure with exponential backoff. | ||
| pub async fn run(&mut self) { | ||
| unimplemented!() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| //! Consumer components of the Solana settlement indexer. | ||
| //! | ||
| //! The four components and their roles: | ||
| //! | ||
| //! - [`Ingester`]: subscribes to the Yellowstone gRPC stream and drains it as | ||
| //! fast as updates arrive, forwarding them to the decoder. It does no | ||
| //! decoding itself, so the socket never backs up behind slow processing. It | ||
| //! is also the single writer of the "latest chain slot" counter that the | ||
| //! other components use to know how far the chain has advanced. | ||
| //! | ||
| //! - [`Decoder`]: receives the raw stream updates, picks out transactions | ||
| //! belonging to the settlement and SolFlow programs, matches each transaction | ||
| //! with its corresponding account-update snapshot, and persists the resulting | ||
| //! typed events to the store. | ||
| //! | ||
| //! - [`PartialEventWatchdog`]: some events arrive in two halves (a transaction | ||
| //! update and an account update) that don't always land together. The decoder | ||
| //! parks the half it has in a map shared with the watchdog; the watchdog | ||
| //! periodically scans that map and dead-letters any entry whose other half | ||
| //! never showed up within the slot window, recording which half went missing. | ||
| //! | ||
| //! - [`FinalizationWorker`]: rows are first written at the `confirmed` | ||
| //! commitment level. This worker re-checks them against the chain and | ||
| //! promotes them to `finalized`, or marks them rolled back if the transaction | ||
| //! disappeared. It uses a cheap batched RPC call for recent rows and falls | ||
| //! back to one-call-per-row lookups for rows old enough that the batched | ||
| //! method no longer reports them. | ||
|
|
||
| pub mod decoder; | ||
| pub mod finalization; | ||
| pub mod ingester; | ||
| pub mod watchdog; | ||
|
|
||
| pub use { | ||
| decoder::Decoder, | ||
| finalization::FinalizationWorker, | ||
| ingester::Ingester, | ||
| watchdog::PartialEventWatchdog, | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stlooks a bit odd to me.