Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/liquidity-sources/src/uniswap_v3/graph_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ impl V3PoolDataSource for UniV3SubgraphClient {
ids: &[Address],
target_block: u64,
) -> Result<PoolsWithTicks> {
// `0` means "latest available" (see the V3PoolDataSource contract). The
// subgraph needs a concrete block — `block: { number: 0 }` queries genesis
// — so resolve a recent safe block first.
let target_block = match target_block {
0 => self.get_safe_block().await?,
n => n,
};
let (pools, ticks) = futures::try_join!(
self.get_pools_by_pool_ids(ids, target_block),
self.get_ticks_by_pools_ids(ids, target_block)
Expand Down
319 changes: 280 additions & 39 deletions crates/liquidity-sources/src/uniswap_v3/pool_fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,19 @@ impl PoolsCheckpointHandler {
})
}

/// For a given list of token pairs, fetches the pools for the ones that
/// exist in the checkpoint. For the ones that don't exist, flag as
/// missing and expect to exist after the next maintenance run.
fn get(&self, token_pairs: &HashSet<TokenPair>) -> (HashMap<Address, Arc<PoolInfo>>, u64) {
/// Returns cached pools for the pairs, plus the ids of any that exist but
/// aren't cached yet. Misses are recorded for the next maintenance run; the
/// quote path fetches them on demand.
fn get(
&self,
token_pairs: &HashSet<TokenPair>,
) -> (HashMap<Address, Arc<PoolInfo>>, Vec<Address>, u64) {
let mut pool_ids = token_pairs
.iter()
.filter_map(|pair| self.pools_by_token_pair.get(pair))
.flatten()
.peekable();

tracing::trace!("get checkpoint for pool_ids: {:?}", pool_ids);

match pool_ids.peek() {
Some(_) => {
let mut pools_checkpoint = self.pools_checkpoint.lock().unwrap();
Expand All @@ -244,57 +245,93 @@ impl PoolsCheckpointHandler {
}
None => true,
})
.copied()
.collect::<Vec<_>>();

tracing::trace!(
"cache hit: {:?}, cache miss: {:?}",
existing_pools.keys(),
missing_pools
);
pools_checkpoint.missing_pools.extend(missing_pools);
(existing_pools, pools_checkpoint.block_number)
pools_checkpoint
.missing_pools
.extend(missing_pools.iter().copied());
(existing_pools, missing_pools, pools_checkpoint.block_number)
}
None => Default::default(),
}
}

/// Fetches state/ticks for missing pools and moves them from
/// `missing_pools` to `pools`
async fn update_missing_pools(&self) -> Result<()> {
let (missing_pools, block_number) = {
let checkpoint = self.pools_checkpoint.lock().unwrap();
if checkpoint.missing_pools.is_empty() {
return Ok(());
}
(checkpoint.missing_pools.clone(), checkpoint.block_number)
};
tracing::debug!("currently missing pools are {:?}", missing_pools);

let pool_ids = missing_pools.into_iter().collect::<Vec<_>>();
let start = std::time::Instant::now();
/// Fetches and converts the given pools from the source at `target_block`,
/// skipping any that can't be converted yet (e.g. no ticks). Doesn't touch
/// the cache. Pass the checkpoint block to cache for replay, or 0 to take
/// the indexer's current snapshot without blocking.
async fn fetch_pools(
&self,
pool_ids: &[Address],
target_block: u64,
) -> Result<Vec<(Address, Arc<PoolInfo>)>> {
if pool_ids.is_empty() {
return Ok(Vec::new());
}
let pools_with_ticks = self
.source
.get_pools_with_ticks_by_ids(&pool_ids, block_number)
.await;
tracing::debug!(
requested_pools = pool_ids.len(),
time = ?start.elapsed(),
request_successful = pools_with_ticks.is_ok(),
"fetched pool ticks"
);
let pools_with_ticks = pools_with_ticks?;
.get_pools_with_ticks_by_ids(pool_ids, target_block)
.await?;
Ok(pools_with_ticks
.pools
.into_iter()
.filter_map(|pool| {
let id = pool.id;
match PoolInfo::try_from(pool) {
Ok(info) => Some((id, Arc::new(info))),
Err(err) => {
tracing::debug!(?id, ?err, "skipping pool missing tick data");
None
}
}
})
.collect())
}

let mut checkpoint = self.pools_checkpoint.lock().unwrap();
for pool in pools_with_ticks.pools {
checkpoint.missing_pools.remove(&pool.id);
checkpoint.pools.insert(pool.id, Arc::new(pool.try_into()?));
/// Fetches the given pools at the indexer's current block, without waiting.
/// The checkpoint block can sit ahead of the indexer (it tracks latest
/// minus the reorg buffer, the indexer serves finalized), so waiting
/// for it would hang the quote. Returns empty on failure so the quote
/// falls back to cached pools.
async fn fetch_current(&self, pool_ids: &[Address]) -> Vec<(Address, Arc<PoolInfo>)> {
match self.fetch_pools(pool_ids, 0).await {
Comment thread
AryanGodara marked this conversation as resolved.
Ok(pools) => pools,
Err(err) => {
tracing::debug!(
?err,
"on-demand pool fetch failed; serving cached pools only"
);
Vec::new()
}
}
}

tracing::debug!("number of cached pools is {}", checkpoint.pools.len());
/// Fetches the pools flagged missing by `get` at the checkpoint block and
/// caches them. Runs from periodic maintenance.
async fn update_missing_pools(&self) -> Result<()> {
let (missing, block_number) = {
let checkpoint = self.pools_checkpoint.lock().unwrap();
(
checkpoint.missing_pools.iter().copied().collect::<Vec<_>>(),
checkpoint.block_number,
)
};
let fetched = self.fetch_pools(&missing, block_number).await?;
let mut checkpoint = self.pools_checkpoint.lock().unwrap();
for (id, info) in fetched {
checkpoint.missing_pools.remove(&id);
checkpoint.pools.insert(id, info);
}
if !checkpoint.missing_pools.is_empty() {
tracing::warn!(
"not all missing pools updated: {:?}",
checkpoint.missing_pools
remaining = checkpoint.missing_pools.len(),
"not all missing pools updated"
);
}
Ok(())
Expand Down Expand Up @@ -420,14 +457,28 @@ impl PoolFetching for UniswapV3PoolFetcher {

// this is the only place where this function uses checkpoint - no data racing
// between maintenance
let (mut checkpoint, checkpoint_block_number) = self.checkpoint.get(token_pairs);
let (mut checkpoint, missing, checkpoint_block_number) = self.checkpoint.get(token_pairs);
Comment thread
AryanGodara marked this conversation as resolved.

// No pools registered for these pairs: `get` returns block 0, so skip the
// replay below (it would otherwise scan events from block 1) and return.
if checkpoint.is_empty() && missing.is_empty() {
return Ok(Vec::new());
}

if block_number > checkpoint_block_number {
let block_range = RangeInclusive::try_new(checkpoint_block_number + 1, block_number)?;
let events = self.events.lock().await.store().get_events(block_range);
append_events(&mut checkpoint, events);
}

// The warm cache only holds the top pools by raw liquidity, so plenty of
// real pairs are absent. Fetch those on demand instead of waiting for the
// next maintenance tick. They come back current, so merge them after the
// replay rather than replaying them.
if !missing.is_empty() {
checkpoint.extend(self.checkpoint.fetch_current(&missing).await);
}

// return only pools which current liquidity is positive
Ok(checkpoint
.into_values()
Expand Down Expand Up @@ -745,4 +796,194 @@ mod tests {
])
);
}

/// Serves a fixed set of pools (with ticks) from
/// `get_pools_with_ticks_by_ids` so the on-demand fetch path can be
/// exercised without a real source. `served_block` models the indexer's
/// head: a request for a higher `target_block` fails, mirroring the real
/// client's `wait_until` blocking on a block the indexer hasn't reached.
struct StubSource {
with_ticks: HashMap<Address, PoolData>,
served_block: u64,
}

impl StubSource {
fn new(pools: impl IntoIterator<Item = PoolData>) -> Self {
Self {
with_ticks: pools.into_iter().map(|p| (p.id, p)).collect(),
served_block: u64::MAX,
}
}
}

#[async_trait::async_trait]
impl V3PoolDataSource for StubSource {
async fn get_registered_pools(
&self,
_target_block: u64,
) -> Result<crate::uniswap_v3::graph_api::RegisteredPools> {
Ok(Default::default())
}

async fn get_pools_with_ticks_by_ids(
&self,
ids: &[Address],
target_block: u64,
) -> Result<crate::uniswap_v3::graph_api::PoolsWithTicks> {
anyhow::ensure!(
target_block <= self.served_block,
"indexer at {} hasn't reached target block {target_block}",
self.served_block,
);
let pools = ids
.iter()
.filter_map(|id| self.with_ticks.get(id).cloned())
.collect();
Ok(crate::uniswap_v3::graph_api::PoolsWithTicks {
fetched_block_number: self.served_block,
pools,
})
}
}

fn pool_with_ticks(id: Address, token0: Address, token1: Address) -> PoolData {
PoolData {
id,
token0: Token {
id: token0,
decimals: 6,
},
token1: Token {
id: token1,
decimals: 18,
},
fee_tier: U256::from(3000),
liquidity: U256::from(1_000_000u64),
sqrt_price: U256::from(1u64),
tick: 0,
ticks: Some(vec![crate::uniswap_v3::graph_api::TickData {
tick_idx: -100,
liquidity_net: 1_000,
pool_address: id,
}]),
block_number: 100,
}
}

fn handler(source: StubSource, checkpoint: PoolsCheckpoint) -> PoolsCheckpointHandler {
PoolsCheckpointHandler {
source: Arc::new(source),
pools_by_token_pair: HashMap::new(),
pools_checkpoint: Mutex::new(checkpoint),
}
}

/// A pool registered for a pair but absent from the warm cache (the
/// raw-liquidity-ranked top-N) is flagged missing by `get` and resolved by
/// a single on-demand `fetch_current`, without waiting for a
/// maintenance cycle.
#[tokio::test]
async fn on_demand_fetch_serves_uncached_pool() {
let token0 = Address::with_last_byte(1);
let token1 = Address::with_last_byte(2);
let pair = TokenPair::new(token0, token1).unwrap();
let pool = Address::with_last_byte(9);

let mut handler = handler(
StubSource::new([pool_with_ticks(pool, token0, token1)]),
PoolsCheckpoint {
pools: HashMap::new(),
block_number: 100,
missing_pools: HashSet::new(),
},
);
handler.pools_by_token_pair = HashMap::from([(pair, vec![pool])]);

// Cold: not cached, flagged missing.
let (cached, missing, _) = handler.get(&HashSet::from([pair]));
assert!(cached.is_empty());
assert_eq!(missing, vec![pool]);

// On-demand fetch resolves it.
let fetched = handler.fetch_current(&missing).await;
assert_eq!(fetched.len(), 1);
assert_eq!(fetched[0].0, pool);
}

/// The on-demand path must not block on the checkpoint block (which can sit
/// persistently ahead of the indexer's served block); it fetches at the
/// indexer's current block. A source that errors for any block above its
/// head still yields the pool via `fetch_current`.
#[tokio::test]
async fn on_demand_does_not_wait_for_future_block() {
let token0 = Address::with_last_byte(1);
let token1 = Address::with_last_byte(2);
let pool = Address::with_last_byte(9);

let mut source = StubSource::new([pool_with_ticks(pool, token0, token1)]);
source.served_block = 50; // indexer behind the checkpoint below

let handler = handler(
source,
PoolsCheckpoint {
pools: HashMap::new(),
block_number: 100, // checkpoint ahead of the indexer
missing_pools: HashSet::new(),
},
);

// fetch_current uses target 0, so it succeeds despite served_block <
// checkpoint.
let fetched = handler.fetch_current(&[pool]).await;
assert_eq!(fetched.len(), 1);
assert_eq!(fetched[0].0, pool);

// Fetching at the checkpoint block would fail (indexer hasn't reached it).
assert!(handler.fetch_pools(&[pool], 100).await.is_err());
}

/// Unknown pairs have no registered pools, so `get` reports block 0 with
/// nothing cached or missing — the signal `fetch` uses to skip the replay.
#[test]
fn get_reports_zero_block_for_unknown_pairs() {
let handler = handler(
StubSource::new([]),
PoolsCheckpoint {
pools: HashMap::new(),
block_number: 100,
missing_pools: HashSet::new(),
},
);
let pair = TokenPair::new(Address::with_last_byte(1), Address::with_last_byte(2)).unwrap();
let (cached, missing, block) = handler.get(&HashSet::from([pair]));
assert!(cached.is_empty());
assert!(missing.is_empty());
assert_eq!(block, 0);
}

/// A pool that can't be converted (e.g. ticks not yet available) is skipped
/// rather than failing the whole batch; the convertible pool is returned.
#[tokio::test]
async fn fetch_pools_skips_unconvertible_pool() {
let token0 = Address::with_last_byte(1);
let token1 = Address::with_last_byte(2);
let good = Address::with_last_byte(9);
let bad = Address::with_last_byte(10);

let mut bad_pool = pool_with_ticks(bad, token0, token1);
bad_pool.ticks = None; // PoolInfo::try_from fails on missing ticks

let handler = handler(
StubSource::new([pool_with_ticks(good, token0, token1), bad_pool]),
PoolsCheckpoint {
pools: HashMap::new(),
block_number: 100,
missing_pools: HashSet::new(),
},
);

let fetched = handler.fetch_pools(&[good, bad], 0).await.unwrap();
assert_eq!(fetched.len(), 1);
assert_eq!(fetched[0].0, good);
}
}
Loading