diff --git a/crates/liquidity-sources/src/uniswap_v3/graph_api.rs b/crates/liquidity-sources/src/uniswap_v3/graph_api.rs index 39d0245e2f..db436a3342 100644 --- a/crates/liquidity-sources/src/uniswap_v3/graph_api.rs +++ b/crates/liquidity-sources/src/uniswap_v3/graph_api.rs @@ -260,6 +260,13 @@ impl V3PoolDataSource for UniV3SubgraphClient { ids: &[Address], target_block: u64, ) -> Result { + // `0` means "latest available" (see the V3PoolDataSource contract). The + // subgraph needs a concrete block — `block: { number: 0 }` queries genesis + // — so resolve a recent safe block first. + let target_block = match target_block { + 0 => self.get_safe_block().await?, + n => n, + }; let (pools, ticks) = futures::try_join!( self.get_pools_by_pool_ids(ids, target_block), self.get_ticks_by_pools_ids(ids, target_block) diff --git a/crates/liquidity-sources/src/uniswap_v3/pool_fetching.rs b/crates/liquidity-sources/src/uniswap_v3/pool_fetching.rs index 5ad81425ec..154e72d9d8 100644 --- a/crates/liquidity-sources/src/uniswap_v3/pool_fetching.rs +++ b/crates/liquidity-sources/src/uniswap_v3/pool_fetching.rs @@ -220,18 +220,19 @@ impl PoolsCheckpointHandler { }) } - /// For a given list of token pairs, fetches the pools for the ones that - /// exist in the checkpoint. For the ones that don't exist, flag as - /// missing and expect to exist after the next maintenance run. - fn get(&self, token_pairs: &HashSet) -> (HashMap>, u64) { + /// Returns cached pools for the pairs, plus the ids of any that exist but + /// aren't cached yet. Misses are recorded for the next maintenance run; the + /// quote path fetches them on demand. + fn get( + &self, + token_pairs: &HashSet, + ) -> (HashMap>, Vec
, u64) { let mut pool_ids = token_pairs .iter() .filter_map(|pair| self.pools_by_token_pair.get(pair)) .flatten() .peekable(); - tracing::trace!("get checkpoint for pool_ids: {:?}", pool_ids); - match pool_ids.peek() { Some(_) => { let mut pools_checkpoint = self.pools_checkpoint.lock().unwrap(); @@ -244,6 +245,7 @@ impl PoolsCheckpointHandler { } None => true, }) + .copied() .collect::>(); tracing::trace!( @@ -251,50 +253,85 @@ impl PoolsCheckpointHandler { existing_pools.keys(), missing_pools ); - pools_checkpoint.missing_pools.extend(missing_pools); - (existing_pools, pools_checkpoint.block_number) + pools_checkpoint + .missing_pools + .extend(missing_pools.iter().copied()); + (existing_pools, missing_pools, pools_checkpoint.block_number) } None => Default::default(), } } - /// Fetches state/ticks for missing pools and moves them from - /// `missing_pools` to `pools` - async fn update_missing_pools(&self) -> Result<()> { - let (missing_pools, block_number) = { - let checkpoint = self.pools_checkpoint.lock().unwrap(); - if checkpoint.missing_pools.is_empty() { - return Ok(()); - } - (checkpoint.missing_pools.clone(), checkpoint.block_number) - }; - tracing::debug!("currently missing pools are {:?}", missing_pools); - - let pool_ids = missing_pools.into_iter().collect::>(); - let start = std::time::Instant::now(); + /// Fetches and converts the given pools from the source at `target_block`, + /// skipping any that can't be converted yet (e.g. no ticks). Doesn't touch + /// the cache. Pass the checkpoint block to cache for replay, or 0 to take + /// the indexer's current snapshot without blocking. + async fn fetch_pools( + &self, + pool_ids: &[Address], + target_block: u64, + ) -> Result)>> { + if pool_ids.is_empty() { + return Ok(Vec::new()); + } let pools_with_ticks = self .source - .get_pools_with_ticks_by_ids(&pool_ids, block_number) - .await; - tracing::debug!( - requested_pools = pool_ids.len(), - time = ?start.elapsed(), - request_successful = pools_with_ticks.is_ok(), - "fetched pool ticks" - ); - let pools_with_ticks = pools_with_ticks?; + .get_pools_with_ticks_by_ids(pool_ids, target_block) + .await?; + Ok(pools_with_ticks + .pools + .into_iter() + .filter_map(|pool| { + let id = pool.id; + match PoolInfo::try_from(pool) { + Ok(info) => Some((id, Arc::new(info))), + Err(err) => { + tracing::debug!(?id, ?err, "skipping pool missing tick data"); + None + } + } + }) + .collect()) + } - let mut checkpoint = self.pools_checkpoint.lock().unwrap(); - for pool in pools_with_ticks.pools { - checkpoint.missing_pools.remove(&pool.id); - checkpoint.pools.insert(pool.id, Arc::new(pool.try_into()?)); + /// Fetches the given pools at the indexer's current block, without waiting. + /// The checkpoint block can sit ahead of the indexer (it tracks latest + /// minus the reorg buffer, the indexer serves finalized), so waiting + /// for it would hang the quote. Returns empty on failure so the quote + /// falls back to cached pools. + async fn fetch_current(&self, pool_ids: &[Address]) -> Vec<(Address, Arc)> { + match self.fetch_pools(pool_ids, 0).await { + Ok(pools) => pools, + Err(err) => { + tracing::debug!( + ?err, + "on-demand pool fetch failed; serving cached pools only" + ); + Vec::new() + } } + } - tracing::debug!("number of cached pools is {}", checkpoint.pools.len()); + /// Fetches the pools flagged missing by `get` at the checkpoint block and + /// caches them. Runs from periodic maintenance. + async fn update_missing_pools(&self) -> Result<()> { + let (missing, block_number) = { + let checkpoint = self.pools_checkpoint.lock().unwrap(); + ( + checkpoint.missing_pools.iter().copied().collect::>(), + checkpoint.block_number, + ) + }; + let fetched = self.fetch_pools(&missing, block_number).await?; + let mut checkpoint = self.pools_checkpoint.lock().unwrap(); + for (id, info) in fetched { + checkpoint.missing_pools.remove(&id); + checkpoint.pools.insert(id, info); + } if !checkpoint.missing_pools.is_empty() { tracing::warn!( - "not all missing pools updated: {:?}", - checkpoint.missing_pools + remaining = checkpoint.missing_pools.len(), + "not all missing pools updated" ); } Ok(()) @@ -420,7 +457,13 @@ impl PoolFetching for UniswapV3PoolFetcher { // this is the only place where this function uses checkpoint - no data racing // between maintenance - let (mut checkpoint, checkpoint_block_number) = self.checkpoint.get(token_pairs); + let (mut checkpoint, missing, checkpoint_block_number) = self.checkpoint.get(token_pairs); + + // No pools registered for these pairs: `get` returns block 0, so skip the + // replay below (it would otherwise scan events from block 1) and return. + if checkpoint.is_empty() && missing.is_empty() { + return Ok(Vec::new()); + } if block_number > checkpoint_block_number { let block_range = RangeInclusive::try_new(checkpoint_block_number + 1, block_number)?; @@ -428,6 +471,14 @@ impl PoolFetching for UniswapV3PoolFetcher { append_events(&mut checkpoint, events); } + // The warm cache only holds the top pools by raw liquidity, so plenty of + // real pairs are absent. Fetch those on demand instead of waiting for the + // next maintenance tick. They come back current, so merge them after the + // replay rather than replaying them. + if !missing.is_empty() { + checkpoint.extend(self.checkpoint.fetch_current(&missing).await); + } + // return only pools which current liquidity is positive Ok(checkpoint .into_values() @@ -745,4 +796,194 @@ mod tests { ]) ); } + + /// Serves a fixed set of pools (with ticks) from + /// `get_pools_with_ticks_by_ids` so the on-demand fetch path can be + /// exercised without a real source. `served_block` models the indexer's + /// head: a request for a higher `target_block` fails, mirroring the real + /// client's `wait_until` blocking on a block the indexer hasn't reached. + struct StubSource { + with_ticks: HashMap, + served_block: u64, + } + + impl StubSource { + fn new(pools: impl IntoIterator) -> Self { + Self { + with_ticks: pools.into_iter().map(|p| (p.id, p)).collect(), + served_block: u64::MAX, + } + } + } + + #[async_trait::async_trait] + impl V3PoolDataSource for StubSource { + async fn get_registered_pools( + &self, + _target_block: u64, + ) -> Result { + Ok(Default::default()) + } + + async fn get_pools_with_ticks_by_ids( + &self, + ids: &[Address], + target_block: u64, + ) -> Result { + anyhow::ensure!( + target_block <= self.served_block, + "indexer at {} hasn't reached target block {target_block}", + self.served_block, + ); + let pools = ids + .iter() + .filter_map(|id| self.with_ticks.get(id).cloned()) + .collect(); + Ok(crate::uniswap_v3::graph_api::PoolsWithTicks { + fetched_block_number: self.served_block, + pools, + }) + } + } + + fn pool_with_ticks(id: Address, token0: Address, token1: Address) -> PoolData { + PoolData { + id, + token0: Token { + id: token0, + decimals: 6, + }, + token1: Token { + id: token1, + decimals: 18, + }, + fee_tier: U256::from(3000), + liquidity: U256::from(1_000_000u64), + sqrt_price: U256::from(1u64), + tick: 0, + ticks: Some(vec![crate::uniswap_v3::graph_api::TickData { + tick_idx: -100, + liquidity_net: 1_000, + pool_address: id, + }]), + block_number: 100, + } + } + + fn handler(source: StubSource, checkpoint: PoolsCheckpoint) -> PoolsCheckpointHandler { + PoolsCheckpointHandler { + source: Arc::new(source), + pools_by_token_pair: HashMap::new(), + pools_checkpoint: Mutex::new(checkpoint), + } + } + + /// A pool registered for a pair but absent from the warm cache (the + /// raw-liquidity-ranked top-N) is flagged missing by `get` and resolved by + /// a single on-demand `fetch_current`, without waiting for a + /// maintenance cycle. + #[tokio::test] + async fn on_demand_fetch_serves_uncached_pool() { + let token0 = Address::with_last_byte(1); + let token1 = Address::with_last_byte(2); + let pair = TokenPair::new(token0, token1).unwrap(); + let pool = Address::with_last_byte(9); + + let mut handler = handler( + StubSource::new([pool_with_ticks(pool, token0, token1)]), + PoolsCheckpoint { + pools: HashMap::new(), + block_number: 100, + missing_pools: HashSet::new(), + }, + ); + handler.pools_by_token_pair = HashMap::from([(pair, vec![pool])]); + + // Cold: not cached, flagged missing. + let (cached, missing, _) = handler.get(&HashSet::from([pair])); + assert!(cached.is_empty()); + assert_eq!(missing, vec![pool]); + + // On-demand fetch resolves it. + let fetched = handler.fetch_current(&missing).await; + assert_eq!(fetched.len(), 1); + assert_eq!(fetched[0].0, pool); + } + + /// The on-demand path must not block on the checkpoint block (which can sit + /// persistently ahead of the indexer's served block); it fetches at the + /// indexer's current block. A source that errors for any block above its + /// head still yields the pool via `fetch_current`. + #[tokio::test] + async fn on_demand_does_not_wait_for_future_block() { + let token0 = Address::with_last_byte(1); + let token1 = Address::with_last_byte(2); + let pool = Address::with_last_byte(9); + + let mut source = StubSource::new([pool_with_ticks(pool, token0, token1)]); + source.served_block = 50; // indexer behind the checkpoint below + + let handler = handler( + source, + PoolsCheckpoint { + pools: HashMap::new(), + block_number: 100, // checkpoint ahead of the indexer + missing_pools: HashSet::new(), + }, + ); + + // fetch_current uses target 0, so it succeeds despite served_block < + // checkpoint. + let fetched = handler.fetch_current(&[pool]).await; + assert_eq!(fetched.len(), 1); + assert_eq!(fetched[0].0, pool); + + // Fetching at the checkpoint block would fail (indexer hasn't reached it). + assert!(handler.fetch_pools(&[pool], 100).await.is_err()); + } + + /// Unknown pairs have no registered pools, so `get` reports block 0 with + /// nothing cached or missing — the signal `fetch` uses to skip the replay. + #[test] + fn get_reports_zero_block_for_unknown_pairs() { + let handler = handler( + StubSource::new([]), + PoolsCheckpoint { + pools: HashMap::new(), + block_number: 100, + missing_pools: HashSet::new(), + }, + ); + let pair = TokenPair::new(Address::with_last_byte(1), Address::with_last_byte(2)).unwrap(); + let (cached, missing, block) = handler.get(&HashSet::from([pair])); + assert!(cached.is_empty()); + assert!(missing.is_empty()); + assert_eq!(block, 0); + } + + /// A pool that can't be converted (e.g. ticks not yet available) is skipped + /// rather than failing the whole batch; the convertible pool is returned. + #[tokio::test] + async fn fetch_pools_skips_unconvertible_pool() { + let token0 = Address::with_last_byte(1); + let token1 = Address::with_last_byte(2); + let good = Address::with_last_byte(9); + let bad = Address::with_last_byte(10); + + let mut bad_pool = pool_with_ticks(bad, token0, token1); + bad_pool.ticks = None; // PoolInfo::try_from fails on missing ticks + + let handler = handler( + StubSource::new([pool_with_ticks(good, token0, token1), bad_pool]), + PoolsCheckpoint { + pools: HashMap::new(), + block_number: 100, + missing_pools: HashSet::new(), + }, + ); + + let fetched = handler.fetch_pools(&[good, bad], 0).await.unwrap(); + assert_eq!(fetched.len(), 1); + assert_eq!(fetched[0].0, good); + } }