Report message queue depth#748
Conversation
Add optional MessageQueue.getDepth() support so queue implementations can report backlog size for observability. The core in-process queue reports ready and delayed messages, and the parallel queue delegates to its wrapped queue when available. Implement depth reporting for the Redis, PostgreSQL, MySQL, SQLite, and AMQP queue adapters. Document the API, backend support, and AMQP delayed queue tracking limits, and update the changelog for each affected package. Fixes fedify-dev#735 Assisted-by: Codex:gpt-5.5
|
@codex review |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds an optional queue-depth introspection API Changes
Sequence Diagram(s)sequenceDiagram
participant Metrics as Fedify Metrics Collector
participant MQ as MessageQueue
participant Backend as Queue Backend
Metrics->>MQ: call getDepth?()
alt MQ implements getDepth
MQ->>Backend: perform backend-specific count queries
Backend-->>MQ: { queued, ready?, delayed? }
MQ-->>Metrics: Promise<MessageQueueDepth>
else MQ does not implement getDepth
MQ-->>Metrics: undefined (no gauge)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces an optional getDepth() method to the MessageQueue interface and implements it across several backends, including AMQP, MySQL, PostgreSQL, Redis, SQLite, and the in-process queue. This feature enhances observability by reporting the number of queued, ready, and delayed messages. Documentation and comprehensive tests for each implementation have also been added. Feedback was provided regarding the robustness of channel management in the AMQP implementation to prevent redundant closure attempts.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 34427088f7
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@CHANGES.md`:
- Around line 11-16: Update the CHANGES.md `@fedify/fedify` entry to explicitly
mention the new MessageQueueDepth return type added alongside the optional
MessageQueue.getDepth() API; reference that InProcessMessageQueue now reports
depth (ready and delayed counts) via MessageQueueDepth and that
ParallelMessageQueue delegates depth reporting to its wrapped queue when it
supports MessageQueueDepth so consumers can discover the new type and shape.
In `@docs/manual/mq.md`:
- Around line 1009-1018: The docs table is missing ParallelMessageQueue in the
queue-depth support matrix; add a new row for `ParallelMessageQueue` stating it
delegates depth reporting to its wrapped backend (e.g., "same as wrapped queue"
or "delegates to wrapped queue") so the matrix accurately reflects the new API
behavior and aligns with `ParallelMessageQueue`'s delegation semantics.
- Around line 698-700: The example implementing async getDepth():
Promise<MessageQueueDepth> should not return a misleading zero-filled fallback;
update the docs to either remove the getDepth() stub entirely or replace the
return with an implementation-specific placeholder/comment (e.g.,
"implementation-specific" or "not implemented") so readers aren't encouraged to
return { queued: 0, ready: 0, delayed: 0 }; reference the getDepth() function
and the MessageQueueDepth type in the surrounding text to clarify that providers
should omit getDepth() if they cannot report real counts.
In `@packages/amqp/src/mq.ts`:
- Around line 381-392: The current catch-all around channel.checkQueue(...)
removes entries from `#delayedQueues` for any error; instead detect amqplib
NOT_FOUND errors (error.code === 404) and only delete the queue in that case. In
the catch block for the await channel.checkQueue(queue) inside the loop over
[...this.#delayedQueues], inspect the caught error: if error.code === 404 then
call this.#delayedQueues.delete(queue) and recreate the channel as you do now;
otherwise rethrow or log and recreate the channel without deleting the queue so
transient/permission errors are not silently purged. Ensure you reference the
existing channel, checkQueue, this.#delayedQueues and
this.#connection.createChannel symbols when implementing the change.
In `@packages/fedify/src/federation/mq.ts`:
- Around line 225-230: Snapshot the batch before scheduling to avoid depth
drift: capture the message count and a shallow copy of the messages array (e.g.,
const count = messages.length and const batch = messages.slice()) prior to
setTimeout, then inside the timeout use count to decrement this.#delayedMessages
and call this.enqueueMany(batch, { ...options, delay: undefined }); this ensures
this.#delayedMessages and enqueueMany operate on immutable values even if the
original messages array is mutated.
In `@packages/mysql/src/mq.test.ts`:
- Around line 100-124: Replace the direct use of node:test's test in the new
MysqlMessageQueue.getDepth() case with the runtime-agnostic fixture test: add an
import "import { test as fixtureTest } from '@fedify/fixture';" at the top and
change the test invocation to use fixtureTest(...) (preserving the test name,
options like { skip: dbUrl == null }, and the async body). Update the file so
any other package-style tests in this file also use fixtureTest to stay
cross-runtime.
In `@packages/postgres/src/mq.ts`:
- Around line 237-238: The readiness cutoff in getDepth() is exclusive and
undercounts messages scheduled exactly at now; update the SQL condition inside
getDepth() from "created + delay < CURRENT_TIMESTAMP" to an inclusive check
("created + delay <= CURRENT_TIMESTAMP") so messages scheduled at the current
timestamp are counted as ready; locate the query used in the getDepth()
implementation and change that operator, then run/update any related tests to
reflect the inclusive cutoff.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: db229740-990b-46c4-8839-8c33b6ac00ff
📒 Files selected for processing (14)
CHANGES.mddocs/manual/mq.mdpackages/amqp/src/mq.test.tspackages/amqp/src/mq.tspackages/fedify/src/federation/mq.test.tspackages/fedify/src/federation/mq.tspackages/mysql/src/mq.test.tspackages/mysql/src/mq.tspackages/postgres/src/mq.test.tspackages/postgres/src/mq.tspackages/redis/src/mq.test.tspackages/redis/src/mq.tspackages/sqlite/src/mq.test.tspackages/sqlite/src/mq.ts
Codecov Report❌ Patch coverage is
🚀 New features to boost your workflow:
|
Snapshot delayed in-process message batches before scheduling them so later caller mutations cannot affect queued depth or delivery. Read Redis queued and ready counts in one Lua script so concurrent writes cannot produce a negative delayed count, and count PostgreSQL messages scheduled exactly at CURRENT_TIMESTAMP as ready. fedify-dev#748 (comment) fedify-dev#748 (comment) fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
Treat only AMQP 404 queue-not-found errors as evidence that a tracked delayed queue has disappeared. Other passive queue-check failures now propagate instead of silently dropping queue tracking, and the temporary channel is closed through a single helper that tolerates server-closed channels. fedify-dev#748 (comment) fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
Run the new MysqlMessageQueue.getDepth() coverage through the @fedify/fixture test adapter so the package test remains runtime-agnostic. fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
Mention the MessageQueueDepth return type in the changelog, avoid showing a zero-filled custom getDepth() fallback, and document that ParallelMessageQueue reports the same depth support as its wrapped queue. fedify-dev#748 (comment) fedify-dev#748 (comment) fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
|
@codex review |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a new getDepth() method to the MessageQueue interface and its implementations (InProcessMessageQueue, AmqpMessageQueue, MysqlMessageQueue, PostgresMessageQueue, RedisMessageQueue, and SqliteMessageQueue), allowing for the reporting of queue backlog depth. Documentation and tests have been updated to reflect this new capability. I have provided feedback regarding the need to protect the delayed count calculation against negative values in the database adapters to ensure consistency and robustness.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5b4c2baceb
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/amqp/src/mq.ts`:
- Around line 371-406: getDepth() creates temporary Channel instances via
this.#connection.createChannel() (used in getDepth, closeChannel and when
recreating after a missing delayed queue) but doesn't attach an 'error'
listener, which lets amqplib emit an uncaught channel 'error' and kill the
process; fix by centralizing channel creation into a small helper (e.g.
createSafeChannel) that calls this.#connection.createChannel(), attaches a no-op
or logging 'error' listener to the returned Channel, and returns it; use this
helper wherever channel is created in getDepth(), ensure the error listener is
removed or ignored when closeChannel() successfully closes the channel, and keep
the existing catch/recreate logic around channel.checkQueue and
isQueueNotFoundError unchanged.
In `@packages/fedify/src/federation/mq.test.ts`:
- Around line 166-174: The test assumes mq.enqueueMany(...) with
Temporal.Duration.from({ milliseconds: 10 }) delays long enough for getDepth()
to observe items as delayed; increase the delay (e.g., to 100–250 ms) in the
mq.enqueueMany call so scheduler jitter on CI won't move items to ready before
the assertion; update the Temporal.Duration.from({ milliseconds: ... }) argument
used with mq.enqueueMany and keep the subsequent call to mq.getDepth() and
assertions unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: cdef779f-07b0-4de6-88aa-845f70ee706b
📒 Files selected for processing (8)
CHANGES.mddocs/manual/mq.mdpackages/amqp/src/mq.tspackages/fedify/src/federation/mq.test.tspackages/fedify/src/federation/mq.tspackages/mysql/src/mq.test.tspackages/postgres/src/mq.tspackages/redis/src/mq.ts
Mirror the existing PostgreSQL and MySQL test pattern so Redis, AMQP, and SQLite depth tests can construct Temporal durations when running under Node versions without a native Temporal global. Assisted-by: Codex:gpt-5.5
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/amqp/src/mq.test.ts`:
- Around line 43-82: The teardown currently runs channel.deleteQueue(queue)
without guarding errors, which can prevent deletion of the delayed queue and
closing the channel/connection; update the finally block to perform best-effort
cleanup by wrapping each cleanup step in its own try/catch (or sequential try
blocks): call channel.deleteQueue(queue) in a try/catch, then attempt
channel.deleteQueue(`${delayedQueuePrefix}60000`) in its own try/catch, then
always attempt channel.close() in a try/catch, and finally call conn.close() in
a try/catch so failures in earlier steps don’t skip the remaining cleanup (refer
to channel.deleteQueue, delayedQueuePrefix, channel.close, conn.close).
In `@packages/sqlite/src/mq.test.ts`:
- Around line 31-53: The test currently relies on mq[Symbol.dispose]() to
indirectly close the PlatformDatabase, which can leave the temp DB locked; in
the finally block explicitly call db.close() after mq.drop() and
mq[Symbol.dispose]() to mirror the cleanup pattern used in kv.test.ts so the
PlatformDatabase handle is released; locate the PlatformDatabase instance
created via new PlatformDatabase(dbPath) and ensure db.close() is invoked there.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2a4be814-063b-470f-92a9-e611f51dde62
📒 Files selected for processing (3)
packages/amqp/src/mq.test.tspackages/redis/src/mq.test.tspackages/sqlite/src/mq.test.ts
Clamp delayed queue depth counts defensively across database adapters and route SQLite depth reads through the existing busy retry path. fedify-dev#748 (comment) fedify-dev#748 (comment) fedify-dev#748 (comment) fedify-dev#748 (comment) fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
Prune tracked delayed queues after their TTL window and add an error listener to temporary depth channels so passive check failures cannot surface as unhandled channel errors. fedify-dev#748 (comment) fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
Increase the delayed batch snapshot window so busy CI workers do not turn the delayed messages ready before the depth assertion runs. fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
Ensure a teardown failure while deleting one queue does not skip the remaining queue cleanup or channel and connection close calls. fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
|
/gemini review |
|
@codex review |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/amqp/src/mq.ts`:
- Around line 401-409: getDepth currently calls `#prepareQueue`() and
`#prepareOrdering`(), which mutates the broker; change getDepth to be passive:
remove the calls to `#prepareQueue` and `#prepareOrdering` and instead only call
channel.checkQueue(this.#queue) (and for ordering, loop over
this.#ordering.partitions and call
channel.checkQueue(this.#getOrderingQueueName(i)) ) inside try/catch blocks that
treat a 404 (queue not found) as zero messages and rethrow other errors; ensure
you reference getDepth, `#queue`, `#ordering.partitions`, `#getOrderingQueueName`, and
channel.checkQueue when making the change so no queue/exchange declarations are
performed during depth probes.
In `@packages/fedify/src/federation/mq.test.ts`:
- Around line 188-216: The test "InProcessMessageQueue.getDepth() excludes
in-flight messages" can leave the listener pending if the assertion fails; wrap
the assertion and related checks in a try/finally so the listener is always
cleaned up: after setting up InProcessMessageQueue(), controller, listening,
handlerStarted, and resolveHandler, use try { await mq.enqueue(...); await
handlerStarted; assertEquals(await mq.getDepth(), ...); resolveHandler?.(); }
finally { controller.abort(); await listening; } to ensure controller.abort()
and awaiting the listening promise happen even on assertion failures; reference
the InProcessMessageQueue class, the listening promise, controller,
handlerStarted, and resolveHandler to locate where to add the try/finally.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a6ade21d-ab18-4926-9490-2e99ba003574
📒 Files selected for processing (6)
packages/amqp/src/mq.tspackages/fedify/src/federation/mq.test.tspackages/mysql/src/mq.tspackages/postgres/src/mq.tspackages/redis/src/mq.tspackages/sqlite/src/mq.ts
There was a problem hiding this comment.
Code Review
This pull request introduces the getDepth() method to the MessageQueue interface, enabling observability of queue backlogs by reporting queued, ready, and delayed message counts. Implementations have been added for InProcessMessageQueue, AmqpMessageQueue, MysqlMessageQueue, PostgresMessageQueue, RedisMessageQueue, and SqliteMessageQueue, along with corresponding tests and documentation updates. Feedback was provided regarding the memory efficiency of iterating over delayed queue keys in the AMQP implementation, suggesting a more memory-efficient strategy for large numbers of queues.
Avoid declaring queues or ordering exchanges while reporting AMQP queue depth. Missing queues now count as zero while preserving the existing channel recreation path for passive checks, and delayed queue tracking is iterated without copying all keys first. fedify-dev#748 (comment) fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
Make the in-flight depth test release its blocked listener even when the assertion fails, so the test cannot leave a pending listen promise behind. fedify-dev#748 (comment) Assisted-by: Codex:gpt-5.5
|
@codex review |
|
/gemini review |
|
Codex Review: Didn't find any major issues. Can't wait for the next one! ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
There was a problem hiding this comment.
Code Review
This pull request introduces a new MessageQueue.getDepth() method across the core library and its database adapters (AMQP, MySQL, Postgres, Redis, and SQLite) to provide observability into queue backlogs. The implementation includes support for reporting total queued, ready, and delayed message counts where supported by the backend. The review comments provided were filtered out as they were either validating the implementation or providing context that did not require actionable code changes.
Closes #735.
Summary
This adds an optional
MessageQueue.getDepth()API for reporting queue backlog depth, includingqueued,ready, anddelayedcounts where a backend can distinguish them cheaply.Rationale
Observability needs a backend-neutral way to report queue depth without forcing every
MessageQueueimplementation to support it. The API is optional so platforms without reliable queue counts, such as Deno KV and Cloudflare Workers Queues, can omit it instead of returning misleading approximate values.Changes
MessageQueueDepthand optionalMessageQueue.getDepth()in @fedify/fedify.InProcessMessageQueueand delegated reporting fromParallelMessageQueuewhen the wrapped queue supports it.getDepth()for @fedify/redis, @fedify/postgres, @fedify/mysql, @fedify/sqlite, and @fedify/amqp.Notes
AmqpMessageQueue.getDepth()counts configured ready queues and delayed queues created or tracked by the sameAmqpMessageQueueinstance. AMQP 0-9-1 does not provide a portable queue-listing API, so delayed queues created by another process before the instance starts are not included until that instance creates or tracks them.Tests
mise run checkdeno test --check --allow-read --allow-write --allow-env --unstable-kv packages/fedify/src/federation/mq.test.tsdeno test --check --allow-read --allow-write --allow-env packages/sqlite/src/mq.test.tsdeno test --check --allow-read --allow-write --allow-env --allow-net packages/redis/src/mq.test.ts packages/postgres/src/mq.test.ts packages/mysql/src/mq.test.ts packages/amqp/src/mq.test.tsmise run test:deno