Skip to content

Add stats aggregation and publishing for moq-lite sessions#1389

Closed
kixelated wants to merge 7 commits into
mainfrom
claude/plan-relay-stats-NPK7p
Closed

Add stats aggregation and publishing for moq-lite sessions#1389
kixelated wants to merge 7 commits into
mainfrom
claude/plan-relay-stats-NPK7p

Conversation

@kixelated
Copy link
Copy Markdown
Collaborator

Summary

This PR introduces a comprehensive stats aggregation system for moq-lite sessions that tracks and publishes per-broadcast and per-prefix-level metrics. The system publishes statistics as .stats/<level>/<name> broadcasts on a configurable origin, with separate counters for publisher (egress) and subscriber (ingress) roles.

Key Changes

  • New Stats module (rs/moq-lite/src/stats.rs): Core stats aggregator that:

    • Tracks cumulative counters for both publisher and subscriber roles (broadcasts, subscriptions, bytes, frames, groups)
    • Aggregates metrics across configurable path-prefix levels
    • Spawns per-level snapshot tasks that publish JSON-formatted stats every second
    • Automatically manages task lifecycle (spawns on first subscription, exits when idle)
    • Prevents feedback loops by treating hidden paths (segments starting with .) as no-ops
  • Path visibility filtering: Added Path::is_hidden() method to identify infrastructure paths (e.g., .stats/...) and updated OriginConsumer to:

    • Filter hidden paths from announced() / try_announced() methods
    • Provide separate announced_hidden() / try_announced_hidden() methods for infrastructure broadcasts
    • Support announced_broadcast() for both visible and hidden paths
  • Session integration: Updated Publisher and Subscriber in lite protocol to:

    • Accept optional Stats handle during initialization
    • Create per-broadcast stats guards that track lifecycle events
    • Bump counters for frames, bytes, and groups as data flows through tracks
    • Properly manage guard lifecycle to record open/close events
  • Relay configuration (rs/moq-relay/src/stats.rs): New StatsConfig struct with:

    • name: Identifier for stats broadcasts (disabled when unset)
    • levels: Configurable path-prefix bucketing depth
  • Cluster and connection updates: Integrated stats aggregator into relay's cluster and connection handling to attach stats to all sessions

  • API surface: Exposed stats through Client and Server builders with with_stats() methods

Notable Implementation Details

  • Stats use atomic counters with Relaxed ordering for minimal synchronization overhead
  • Snapshot task uses tokio::time::interval with Delay behavior to handle missed ticks gracefully
  • JSON serialization is hand-rolled to keep serde optional in moq-lite
  • Level keys are computed as path prefixes (0 to N-1 segments, excluding the full path itself)
  • RAII guards (PublisherStats, SubscriberStats, PublisherTrack, SubscriberTrack) ensure proper counter lifecycle management
  • Hidden path detection prevents stats infrastructure from generating its own stats traffic

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP

Add a self-contained Stats aggregator in moq-lite that publishes
.stats/<level>/<name> broadcasts on a caller-provided OriginProducer,
plus the relay wiring to attach one per session.

Each stats broadcast carries two tracks (publisher and subscriber) with
disjoint cumulative counters (broadcasts, subscriptions, bytes, frames,
groups). A relay populates both; one-sided clients populate one. Per-level
snapshot tasks spawn on first subscribe and exit when both roles are idle,
so idle deployments do zero scheduled work. Counter fanout to multiple
levels is per-event via Arc<[Arc<Level>]>; no per-flush walks.

Also adds Path::is_hidden(), filters hidden paths from
OriginConsumer::announced(), and adds announced_hidden() / try_announced_hidden()
so .stats/* broadcasts don't leak into ordinary discovery while still being
addressable. The stats module returns empty handles for hidden paths so
forwarding stats traffic doesn't feed back into the aggregator.

Relay gains a [stats] config section (name, levels). The cluster constructs
the Stats handle in Cluster::new and threads it through with_stats() on
incoming connections, websocket sessions, and outbound cluster dials.

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 7, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 96bb6a4f-5158-4c82-afd8-1b4d75fea607

📥 Commits

Reviewing files that changed from the base of the PR and between 2bf47d4 and 93e20b7.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • js/net/src/connection/reload.ts
  • js/net/src/path.ts
💤 Files with no reviewable changes (2)
  • js/net/src/path.ts
  • js/net/src/connection/reload.ts

Walkthrough

This PR wires tier-scoped statistics configuration into moq-relay. It introduces StatsConfig to control whether and how stats publishing occurs, integrates it into the relay Config for CLI/environment/TOML configurability, exposes with_stats(...) builder methods in moq-native Client, Server, and Request, initializes an optional Stats instance in Cluster when configured, determines internal vs external tier based on mTLS peer identity in Connection, propagates stats through WebSocket handlers into MOQ sessions, and adds hidden-path detection in JavaScript Reload to track broadcast paths separately from visible content paths.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately captures the main objective: adding stats aggregation and publishing for moq-lite sessions, which is the core feature across all modified files.
Description check ✅ Passed The description comprehensively relates to the changeset, detailing the new Stats module, path visibility filtering, session integration, relay configuration, and API updates that match the file summaries provided.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch claude/plan-relay-stats-NPK7p
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch claude/plan-relay-stats-NPK7p

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (2)
rs/moq-lite/src/client.rs (1)

61-80: 💤 Low value

Missing TODO comment for the IETF Draft-17 stats gap.

rs/moq-lite/src/server.rs has // TODO: ietf code path does not yet record stats. on the analogous Draft-17 early-return path, but client.rs has no equivalent note. Aligning them makes the known limitation consistently visible in both entry points.

🔧 Proposed addition
 			tracing::debug!(version = ?v, "connected");
+			// TODO: ietf code path does not yet record stats.
 			return Ok(Session::new(session, v, None));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/client.rs` around lines 61 - 80, Add the same TODO comment
about missing stats for the IETF Draft-17 code path to the early-return branch
in client.rs where ALPN_17 is matched; specifically, update the block that
selects Version::Ietf(ietf::Version::Draft17), calls ietf::start(...,
ietf::Version::Draft17) and returns Ok(Session::new(...)) to include a short "//
TODO: ietf code path does not yet record stats." note so the limitation is
visible alongside the analogous comment in server.rs.
rs/moq-lite/src/stats.rs (1)

123-128: entries grows monotonically — consider documenting the memory profile.

StatsInner.entries is never pruned: every unique broadcast prefix that passes through broadcast() creates a Level that lives for the lifetime of the Stats handle. For a long-running relay seeing high broadcast-path cardinality this will accumulate indefinitely. The memory per Level is small (a few AtomicU64s + strings), but it is worth calling out in the struct or Stats::new doc so operators can size the levels config appropriately.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 123 - 128, StatsInner.entries (the
Lock<HashMap<PathOwned, Arc<Level>>>) grows monotonically because broadcast()
inserts a Level per unique prefix and never prunes them; add a short doc comment
on StatsInner or the public Stats::new describing this memory behavior, the
expected per-Level footprint, and advising operators to size the levels config
appropriately for long‑running relays with high broadcast-path cardinality so
they understand the potential unbounded growth. Mention the relevant symbols:
StatsInner, entries, Level, broadcast(), and Stats::new.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 281-284: recv_subscribe currently creates a temporary
PublisherStats via stats.as_ref().map(|s|
s.broadcast(&absolute).publisher().track(&track)) which increments then
immediately decrements the broadcast counters; instead create and retain the
PublisherStats guard alongside the PublisherTrack so it isn't dropped
immediately. Locate recv_subscribe and change the mapping to first call
stats.as_ref().map(|s| s.broadcast(&absolute).publisher()) to obtain a
PublisherStats guard, then call guard.track(&track) and store the guard together
with the PublisherTrack (e.g., return or insert a tuple/struct (PublisherTrack,
PublisherStats) or push the guard into the existing stats_guards collection used
by run_announce) so the guard's lifetime matches the real announce guard rather
than being a temporary. Ensure symbols: recv_subscribe, PublisherStats,
publisher(), track(), run_announce, stats_guards are updated accordingly.

In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 290-304: The temporary SubscriberStats created in run_subscribe by
calling stats.broadcast(&path).subscriber() is dropped immediately (incrementing
broadcasts_closed) and thus creates spurious open/close events; fix by retaining
the guard for the broadcast's lifetime (like run_announce_prefix does) — obtain
the guard from self.stats.as_ref().map(|s| s.broadcast(&path).subscriber()),
store it in the existing self.stats_guards container (or attach it to the
TrackEntry) so it is not dropped after creating track_stats, and then use the
guard when deriving track_stats (e.g., clone or reference the guard when calling
.track(&track.name)) so the broadcasts counters reflect the true lifetime.

In `@rs/moq-lite/src/model/origin.rs`:
- Around line 785-793: Update the doc for try_announced_hidden to include the
same caveat as try_announced: state that returning None means "no update
available" (not that the consumer has closed), and add a pointer sentence like
"Use Self::try_announced to receive visible paths instead" so callers know the
counterpart method for non-hidden paths; reference the try_announced_hidden and
try_announced symbols when editing the documentation.

In `@rs/moq-lite/src/stats.rs`:
- Around line 538-546: The function level_keys currently treats levels == 0 the
same as levels == 1 (returning root), which is surprising; update level_keys so
that when levels == 0 it returns an empty Vec (no stats) while keeping the
existing broadcast.is_empty() check returning vec![PathOwned::default()] for
empty broadcasts; modify the early conditional in level_keys (and any related
logic using broadcast/PathOwned) accordingly and add a unit test asserting
levels == 0 yields an empty Vec to prevent regression.

In `@rs/moq-relay/src/config.rs`:
- Around line 43-46: Add a post-load validation in Config::load() that checks if
stats.name is set (non-empty/Some) and stats.levels == 0, and if so return an
error (or otherwise fail early) so enabled stats cannot have zero buckets;
locate the check around the deserialization result in Config::load(), reference
the StatsConfig fields stats.name and stats.levels, and ensure the guard
produces a clear error message indicating that stats.levels must be >= 1 when
stats.name is configured.

---

Nitpick comments:
In `@rs/moq-lite/src/client.rs`:
- Around line 61-80: Add the same TODO comment about missing stats for the IETF
Draft-17 code path to the early-return branch in client.rs where ALPN_17 is
matched; specifically, update the block that selects
Version::Ietf(ietf::Version::Draft17), calls ietf::start(...,
ietf::Version::Draft17) and returns Ok(Session::new(...)) to include a short "//
TODO: ietf code path does not yet record stats." note so the limitation is
visible alongside the analogous comment in server.rs.

In `@rs/moq-lite/src/stats.rs`:
- Around line 123-128: StatsInner.entries (the Lock<HashMap<PathOwned,
Arc<Level>>>) grows monotonically because broadcast() inserts a Level per unique
prefix and never prunes them; add a short doc comment on StatsInner or the
public Stats::new describing this memory behavior, the expected per-Level
footprint, and advising operators to size the levels config appropriately for
long‑running relays with high broadcast-path cardinality so they understand the
potential unbounded growth. Mention the relevant symbols: StatsInner, entries,
Level, broadcast(), and Stats::new.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 06d888d0-654f-4a85-ab59-30ba24894540

📥 Commits

Reviewing files that changed from the base of the PR and between bdda6bd and 39832ec.

📒 Files selected for processing (18)
  • rs/moq-lite/src/client.rs
  • rs/moq-lite/src/lib.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/session.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-lite/src/path.rs
  • rs/moq-lite/src/server.rs
  • rs/moq-lite/src/stats.rs
  • rs/moq-native/src/client.rs
  • rs/moq-native/src/server.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/config.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/lib.rs
  • rs/moq-relay/src/main.rs
  • rs/moq-relay/src/stats.rs
  • rs/moq-relay/src/websocket.rs

Comment thread rs/moq-net/src/lite/publisher.rs
Comment thread rs/moq-net/src/lite/subscriber.rs
Comment thread rs/moq-lite/src/model/origin.rs Outdated
Comment on lines 785 to 793
/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
pub fn try_announced_hidden(&mut self) -> Option<OriginAnnounce> {
loop {
let next = self.updates.try_recv().ok()?;
if next.0.is_hidden() {
return Some(next);
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

try_announced_hidden is missing the try_announced doc caveats.

The try_announced doc explicitly states that None means "no update available", not "consumer closed". try_announced_hidden omits this, and also lacks the counterpart pointer ("Use Self::try_announced to receive visible paths instead").

📝 Suggested doc update
-	/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
+	/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
+	///
+	/// Returns `None` if there is no hidden update available; NOT because the consumer is closed.
+	/// You have to use `is_closed` to check if the consumer is closed.
+	///
+	/// Silently consumes and discards any visible-path updates encountered while scanning.
+	/// Use [`Self::try_announced`] to receive visible paths instead.
+	/// Do not mix calls to both variants on the same consumer instance.
 	pub fn try_announced_hidden(&mut self) -> Option<OriginAnnounce> {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/model/origin.rs` around lines 785 - 793, Update the doc for
try_announced_hidden to include the same caveat as try_announced: state that
returning None means "no update available" (not that the consumer has closed),
and add a pointer sentence like "Use Self::try_announced to receive visible
paths instead" so callers know the counterpart method for non-hidden paths;
reference the try_announced_hidden and try_announced symbols when editing the
documentation.

Comment thread rs/moq-net/src/stats.rs
Comment on lines +43 to +46
/// Stats publishing configuration. Disabled when `stats.name` is unset.
#[command(flatten)]
#[serde(default)]
pub stats: StatsConfig,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate stats.levels when stats are enabled.

Line 46 accepts deserialized values without enforcing a minimum. If stats.name is set and stats.levels == 0, stats can appear enabled but produce no level buckets. Please add a post-load guard in Config::load().

Suggested guard in Config::load()
 pub fn load() -> anyhow::Result<Self> {
     // Parse just the CLI arguments initially.
     let mut config = Config::parse();

     // If a file is provided, load it and merge the CLI arguments.
     if let Some(file) = config.file {
         config = toml::from_str(&std::fs::read_to_string(file)?)?;
         config.update_from(std::env::args());
     }
+
+    if config.stats.name.is_some() {
+        anyhow::ensure!(config.stats.levels > 0, "stats.levels must be >= 1 when stats are enabled");
+    }

     config.log.init();
     tracing::trace!(?config, "final config");

     Ok(config)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-relay/src/config.rs` around lines 43 - 46, Add a post-load validation
in Config::load() that checks if stats.name is set (non-empty/Some) and
stats.levels == 0, and if so return an error (or otherwise fail early) so
enabled stats cannot have zero buckets; locate the check around the
deserialization result in Config::load(), reference the StatsConfig fields
stats.name and stats.levels, and ensure the guard produces a clear error message
indicating that stats.levels must be >= 1 when stats.name is configured.

Stats broadcasts now carry four tracks per level instead of two:
publisher and publisher_internal for egress, subscriber and
subscriber_internal for ingress. Each pair has its own atomic counter
set so a billing service can rate-differentiate between intra-cluster
and customer traffic.

A `Stats` handle carries a tier flag set via `Stats::external` /
`Stats::internal` / `Stats::tier(bool)`. All four tiers share level
state and a single per-level snapshot task; the task ticks only while
at least one of the four roles has an active subscription.

The relay decides the tier per session: mTLS-authenticated peers
(connection.rs and websocket.rs) and outbound cluster dials all flag
internal; everything else stays external.

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
rs/moq-lite/src/stats.rs (1)

174-177: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Clarify or fix levels = 0 semantics.

Right now level_keys(..., 0) still returns the root bucket, so a config of levels = 0 continues to publish .stats/<name> for non-empty paths even though the public docs only describe 1 as the root-only case. Either make 0 return no keys, or document 0 == 1 and add a regression test so operators do not read 0 as "disabled".

Suggested change
 fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
-	if levels == 0 || broadcast.is_empty() {
+	if broadcast.is_empty() {
 		return vec![PathOwned::default()];
 	}
+	if levels == 0 {
+		return Vec::new();
+	}
 	#[test]
 	fn level_keys_basic() {
 		let key = |s: &str, n: u32| {
 			level_keys(&Path::new(s), n)
 				.into_iter()
 				.map(|p| p.as_str().to_string())
 				.collect::<Vec<_>>()
 		};
 
+		assert!(key("demo/bbb", 0).is_empty());
 		assert_eq!(key("demo/bbb", 1), vec![""]);

As per coding guidelines, "Comments must reflect the current state of the code, not its history. Don't write historical context; describe what the code does today or delete the comment."

Also applies to: 676-679

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 174 - 177, The comment and behavior
are inconsistent for levels = 0; change the level_keys function so that when
levels == 0 it returns no keys (i.e., an empty vec/iterator) instead of
producing the root bucket, update the doc comment above level_keys (and the
similar block at 676-679) to state explicitly that 0 yields no keys, and add a
regression test (e.g., test_level_keys_zero_returns_none) verifying that
level_keys(path, 0) -> empty and level_keys("", 0) behaves as expected to
prevent treating 0 as “disabled” accidentally.
🧹 Nitpick comments (1)
rs/moq-lite/src/stats.rs (1)

576-591: ⚡ Quick win

Use one ts_ms per tick for all four role frames.

This loop emits four frames for one logical snapshot, but each write_snapshot() call computes its own timestamp. That makes exact joins across publisher, publisher_internal, subscriber, and subscriber_internal harder downstream for no real benefit.

Suggested change
 	loop {
 		tick.tick().await;
@@
+		let ts_ms = now_ms();
+
 		// Always emit a snapshot for every track. Idle roles see their counters
 		// held steady; that itself is informative for a billing service.
-		write_snapshot(&mut pub_ext, "publisher", &level, level.publisher_external.snapshot());
+		write_snapshot(
+			&mut pub_ext,
+			"publisher",
+			&level,
+			ts_ms,
+			level.publisher_external.snapshot(),
+		);
 		write_snapshot(
 			&mut pub_int,
 			"publisher_internal",
 			&level,
+			ts_ms,
 			level.publisher_internal.snapshot(),
 		);
-		write_snapshot(&mut sub_ext, "subscriber", &level, level.subscriber_external.snapshot());
+		write_snapshot(
+			&mut sub_ext,
+			"subscriber",
+			&level,
+			ts_ms,
+			level.subscriber_external.snapshot(),
+		);
 		write_snapshot(
 			&mut sub_int,
 			"subscriber_internal",
 			&level,
+			ts_ms,
 			level.subscriber_internal.snapshot(),
 		);
 	}
 }
@@
-fn write_snapshot(track: &mut crate::TrackProducer, role: &str, level: &Level, snap: RoleSnapshot) {
+fn write_snapshot(
+	track: &mut crate::TrackProducer,
+	role: &str,
+	level: &Level,
+	ts_ms: u64,
+	snap: RoleSnapshot,
+) {
@@
-		now_ms(),
+		ts_ms,

Also applies to: 603-617

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 576 - 591, The four write_snapshot()
calls compute their own timestamps causing mismatched ts_ms across role frames;
compute a single timestamp once (e.g., let ts_ms = now_ms() or call the existing
timestamp helper) before the four frames and pass that same ts_ms into each
write_snapshot invocation (and repeat the same change for the later block around
the other four calls at 603-617), ensuring the snapshot values from
level.publisher_external, level.publisher_internal, level.subscriber_external,
and level.subscriber_internal all share the same ts_ms for a single logical
tick.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@rs/moq-lite/src/stats.rs`:
- Around line 174-177: The comment and behavior are inconsistent for levels = 0;
change the level_keys function so that when levels == 0 it returns no keys
(i.e., an empty vec/iterator) instead of producing the root bucket, update the
doc comment above level_keys (and the similar block at 676-679) to state
explicitly that 0 yields no keys, and add a regression test (e.g.,
test_level_keys_zero_returns_none) verifying that level_keys(path, 0) -> empty
and level_keys("", 0) behaves as expected to prevent treating 0 as “disabled”
accidentally.

---

Nitpick comments:
In `@rs/moq-lite/src/stats.rs`:
- Around line 576-591: The four write_snapshot() calls compute their own
timestamps causing mismatched ts_ms across role frames; compute a single
timestamp once (e.g., let ts_ms = now_ms() or call the existing timestamp
helper) before the four frames and pass that same ts_ms into each write_snapshot
invocation (and repeat the same change for the later block around the other four
calls at 603-617), ensuring the snapshot values from level.publisher_external,
level.publisher_internal, level.subscriber_external, and
level.subscriber_internal all share the same ts_ms for a single logical tick.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c2c57b10-1417-4b55-9eca-a3cc6420ccd2

📥 Commits

Reviewing files that changed from the base of the PR and between 39832ec and 9a47c6a.

📒 Files selected for processing (4)
  • rs/moq-lite/src/stats.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/websocket.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/websocket.rs

Comment thread rs/moq-lite/src/lite/publisher.rs Outdated
Comment on lines +33 to +36
origin: Option<OriginConsumer>,
stats: Option<MoqStats>,
self_origin: Origin,
version: Version,
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting large enough we should have a config struct.

Comment thread rs/moq-lite/src/lite/publisher.rs Outdated
let absolute = origin.absolute(&path).to_owned();
stats_guards
.entry(absolute.clone())
.or_insert_with(|| stats.broadcast(absolute.clone()).publisher());
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a bug if this is ever not vacant?

Comment thread rs/moq-lite/src/model/origin.rs Outdated
Comment on lines 759 to 767
/// Like [`Self::announced`] but returns only paths where any segment starts with `.`.
pub async fn announced_hidden(&mut self) -> Option<OriginAnnounce> {
loop {
let next = self.updates.recv().await?;
if next.0.is_hidden() {
return Some(next);
}
}
}
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this will consume the updates. You should have announced_all or something that returns all updates, while announced filters out hidden.

Comment thread rs/moq-lite/src/stats.rs Outdated
pub groups: AtomicU64,
}

trait RoleCounters {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this trait at all? Can we just use the Counters directly or something?

Comment thread rs/moq-lite/src/stats.rs Outdated
Comment on lines +142 to +145
publisher_external: PublisherCounters,
publisher_internal: PublisherCounters,
subscriber_external: SubscriberCounters,
subscriber_internal: SubscriberCounters,
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not have the split here. Instead we should have internal: Stats and external: Stats, doing Session.with_stats(internal) if the connection is deemed internal.

Comment thread rs/moq-lite/src/stats.rs Outdated
/// Cumulative atomic counters for the publisher role (egress).
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct PublisherCounters {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two counters are the same... you should combine PublisherCounters and SubscriberCounters into Counters?

Comment thread rs/moq-lite/src/stats.rs Outdated

fn write_snapshot(track: &mut crate::TrackProducer, role: &str, level: &Level, snap: RoleSnapshot) {
use std::fmt::Write as _;
// Hand-rolled JSON keeps serde optional in moq-lite while still producing valid output.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just use serde please.

kixelated added 2 commits May 17, 2026 22:09
…ts-NPK7p

# Conflicts:
#	rs/moq-lite/src/client.rs
#	rs/moq-lite/src/server.rs
* Combine PublisherCounters and SubscriberCounters into a single
  Counters struct and drop the RoleCounters trait abstraction.
* Replace the internal/external tier flag with two independent Stats
  handles. The relay constructs one for external traffic and one for
  internal (mTLS / cluster peer) traffic, and connection acceptance
  picks the appropriate handle to thread into the session.
* Use serde + serde_json for snapshot frames instead of hand-rolled
  JSON. serde is now a required dependency of moq-lite.
* Replace announced_hidden / try_announced_hidden with announced_all
  / try_announced_all so the hidden filter no longer silently consumes
  visible updates from the same queue.
* Group Publisher and Subscriber constructor arguments into
  PublisherConfig / SubscriberConfig structs.
* Assert via debug_assert that the publisher stats_guards entry is
  vacant when a new active announcement arrives (the origin contract
  guarantees toggling).
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
rs/moq-lite/src/lite/publisher.rs (1)

284-289: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid temporary PublisherStats creation when building track_stats.

stats.broadcast(&absolute).publisher() opens a broadcast-level guard; in this expression it is temporary, so it can be dropped immediately after .track(&track), producing spurious broadcast open/close events per subscription and skewing broadcast counters.

Please derive PublisherTrack from a retained broadcast guard (or a dedicated track-only stats API) so broadcast counters are tied only to announce lifecycle.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/lite/publisher.rs` around lines 284 - 289, The current
creation of track_stats uses a temporary guard chain
(stats.broadcast(&absolute).publisher().track(&track)) which drops the broadcast
guard immediately and causes spurious open/close events; instead create and
retain the broadcast guard first (e.g., let broadcast_guard =
self.stats.as_ref().map(|s| s.broadcast(&absolute))) and then derive the
publisher/track from that retained guard (call
broadcast_guard.publisher().track(&track)) so the broadcast-level guard lives
for the intended announce lifecycle and broadcast counters are not skewed;
update the logic around track_stats and any related variables to use the
retained broadcast_guard rather than the temporary chain.
rs/moq-lite/src/lite/subscriber.rs (1)

299-303: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don’t instantiate SubscriberStats as a temporary in run_subscribe.

stats.broadcast(&path).subscriber() appears to open a broadcast guard; using it as a temporary for .track(...) can emit broadcast close events at end of expression, inflating broadcasts/broadcasts_closed for each subscription.

Please create SubscriberTrack from a retained broadcast guard (or a track-only API) so broadcast counters follow announce lifetime, not subscription count.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/lite/subscriber.rs` around lines 299 - 303, In run_subscribe
the current map creates a temporary broadcast guard via
stats.broadcast(&path).subscriber() and immediately calls .track(...), causing
the guard to drop at end of the expression and incorrectly increment
broadcasts_closed per subscription; instead obtain and retain the broadcast
guard before making the SubscriberTrack so the announce lifetime is preserved:
bind let guard = self.stats.as_ref().map(|s| s.broadcast(&path).subscriber())
(or use a track-only API if available), then map that retained guard to create
the SubscriberTrack (or call guard.track(&track.name)) and store that in
track_stats so the broadcast counters reflect the announce lifetime rather than
each subscription. Ensure you update the mapping that sets track_stats to use
the retained guard variable and produce a SubscriberTrack tied to the guard’s
lifetime.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-lite/src/stats.rs`:
- Around line 369-370: The call to
level.origin.publish_broadcast(&level.advertised, broadcast.consume()) currently
ignores its Result, so if OriginProducer rejects publishing the stats topic the
task remains "running" and keeps emitting into an unannounced broadcast; change
the logic in the function containing that call to inspect the Result from
publish_broadcast (for example via match or ?), and on Err ensure you stop the
stats task and clean up the broadcast (drop/close/consume it), propagate or
return the error (or set the running flag to false) instead of proceeding to
return (broadcast, publisher, subscriber); use the symbols publish_broadcast,
level.origin, level.advertised, broadcast.consume(), and the surrounding task
lifecycle/running flag to locate and implement the early-return/cleanup path.

---

Duplicate comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 284-289: The current creation of track_stats uses a temporary
guard chain (stats.broadcast(&absolute).publisher().track(&track)) which drops
the broadcast guard immediately and causes spurious open/close events; instead
create and retain the broadcast guard first (e.g., let broadcast_guard =
self.stats.as_ref().map(|s| s.broadcast(&absolute))) and then derive the
publisher/track from that retained guard (call
broadcast_guard.publisher().track(&track)) so the broadcast-level guard lives
for the intended announce lifecycle and broadcast counters are not skewed;
update the logic around track_stats and any related variables to use the
retained broadcast_guard rather than the temporary chain.

In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 299-303: In run_subscribe the current map creates a temporary
broadcast guard via stats.broadcast(&path).subscriber() and immediately calls
.track(...), causing the guard to drop at end of the expression and incorrectly
increment broadcasts_closed per subscription; instead obtain and retain the
broadcast guard before making the SubscriberTrack so the announce lifetime is
preserved: bind let guard = self.stats.as_ref().map(|s|
s.broadcast(&path).subscriber()) (or use a track-only API if available), then
map that retained guard to create the SubscriberTrack (or call
guard.track(&track.name)) and store that in track_stats so the broadcast
counters reflect the announce lifetime rather than each subscription. Ensure you
update the mapping that sets track_stats to use the retained guard variable and
produce a SubscriberTrack tied to the guard’s lifetime.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e50562e5-f221-41bb-8096-77b0df09a9b6

📥 Commits

Reviewing files that changed from the base of the PR and between 9a47c6a and 457dec6.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
  • rs/moq-lite/Cargo.toml
  • rs/moq-lite/src/client.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/session.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-lite/src/server.rs
  • rs/moq-lite/src/stats.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/websocket.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • rs/moq-lite/src/client.rs
  • rs/moq-lite/src/server.rs
  • rs/moq-relay/src/websocket.rs

Comment thread rs/moq-lite/src/stats.rs Outdated
Comment on lines +369 to +370
level.origin.publish_broadcast(&level.advertised, broadcast.consume());
(broadcast, publisher, subscriber)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle rejected stats publications.

Line 369 ignores the publish_broadcast result. If the provided OriginProducer is not allowed to publish .stats/..., this task stays marked as running and keeps emitting snapshots into a broadcast that was never announced, so stats go silently dark until the source-side tracks close.

Suggested fix
-		level.origin.publish_broadcast(&level.advertised, broadcast.consume());
+		if !level.origin.publish_broadcast(&level.advertised, broadcast.consume()) {
+			tracing::warn!(level = %level.advertised, "stats: failed to publish stats broadcast");
+			clear_task(&level);
+			return;
+		}
 		(broadcast, publisher, subscriber)
 	};
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 369 - 370, The call to
level.origin.publish_broadcast(&level.advertised, broadcast.consume()) currently
ignores its Result, so if OriginProducer rejects publishing the stats topic the
task remains "running" and keeps emitting into an unannounced broadcast; change
the logic in the function containing that call to inspect the Result from
publish_broadcast (for example via match or ?), and on Err ensure you stop the
stats task and clean up the broadcast (drop/close/consume it), propagate or
return the error (or set the running flag to false) instead of proceeding to
return (broadcast, publisher, subscriber); use the symbols publish_broadcast,
level.origin, level.advertised, broadcast.consume(), and the surrounding task
lifecycle/running flag to locate and implement the early-return/cleanup path.

Move stats advertisements from `.stats/<level>/<name>` to
`<level>/.stats/<name>[/<pop>]` so apps subscribing to a prefix don't see
infrastructure broadcasts, and so multiple relays sharing a cluster
origin can coexist without clobbering each other's broadcast. Hidden
paths (any segment starting with `.`) are filtered from `announced` on
both the Rust and JS sides; `announcedAll` exposes the unfiltered view
for tools that need infrastructure visibility.

Collapse `stats_external`/`stats_internal` into a single shared `Stats`
aggregator with `Tier::External`/`Tier::Internal` selecting which counter
set within each level bumps land in. The connection-acceptance path picks
a tier based on whether the peer authenticated via mTLS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-lite/src/stats.rs`:
- Around line 171-173: The public doc for the `levels` parameter is out of sync:
update the doc comment that references `levels` (near the `level_keys()`
implementation and any public struct/constructor docs) to state that `levels=0`
produces no buckets, `levels=1` yields the root bucket plus first-segment keys
(as observed by `level_keys()` and validated by `level_keys_basic`), and
increasing `levels` adds further path-prefix levels accordingly; also update any
relay-facing docs to mirror this corrected behavior so callers don't
misconfigure `levels`.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e80a39e4-0502-446d-8d28-94eede674830

📥 Commits

Reviewing files that changed from the base of the PR and between 457dec6 and 2bf47d4.

📒 Files selected for processing (15)
  • demo/relay/localhost.toml
  • js/lite/src/connection/reload.ts
  • js/lite/src/path.ts
  • rs/moq-lite/src/client.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/session.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/server.rs
  • rs/moq-lite/src/stats.rs
  • rs/moq-native/src/client.rs
  • rs/moq-native/src/server.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/stats.rs
  • rs/moq-relay/src/websocket.rs
✅ Files skipped from review due to trivial changes (1)
  • demo/relay/localhost.toml

Comment thread rs/moq-lite/src/stats.rs Outdated
Comment on lines +171 to +173
/// * `levels` controls how many path-prefix levels stats are bucketed into. A value
/// of `1` produces only the root bucket. `2` adds a per-first-segment bucket, and
/// so on. Levels deeper than the number of segments in a given broadcast path are
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

levels public API docs are out of sync with current bucketing behavior.

At Line 171, the docs say levels=1 is root-only and levels=2 adds first-segment buckets, but level_keys() now yields root + first segment for levels=1 and no buckets for levels=0 (as validated by level_keys_basic). Please update this API description (and matching relay-facing docs) to prevent config mistakes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 171 - 173, The public doc for the
`levels` parameter is out of sync: update the doc comment that references
`levels` (near the `level_keys()` implementation and any public
struct/constructor docs) to state that `levels=0` produces no buckets,
`levels=1` yields the root bucket plus first-segment keys (as observed by
`level_keys()` and validated by `level_keys_basic`), and increasing `levels`
adds further path-prefix levels accordingly; also update any relay-facing docs
to mirror this corrected behavior so callers don't misconfigure `levels`.

kixelated and others added 2 commits May 21, 2026 22:34
# Conflicts:
#	Cargo.lock
#	rs/moq-lite/Cargo.toml
#	rs/moq-lite/src/lib.rs
#	rs/moq-net/src/lite/publisher.rs
#	rs/moq-net/src/lite/session.rs
#	rs/moq-net/src/lite/subscriber.rs
#	rs/moq-net/src/model/origin.rs
#	rs/moq-relay/src/cluster.rs
#	rs/moq-relay/src/websocket.rs
- handle origin rejection of stats broadcast (warn + clear task, don't keep
  publishing into an unannounced broadcast)
- correct `levels` documentation: `0` disables stats entirely, `1` produces
  root + first-segment buckets, etc. (was misdescribed as root-only)
- tighten try_announced_all docstring to mirror try_announced caveats

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kixelated kixelated closed this May 22, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants