Skip to content

Report message queue depth#748

Open
dahlia wants to merge 12 commits intofedify-dev:mainfrom
dahlia:mq-depth
Open

Report message queue depth#748
dahlia wants to merge 12 commits intofedify-dev:mainfrom
dahlia:mq-depth

Conversation

@dahlia
Copy link
Copy Markdown
Member

@dahlia dahlia commented Apr 28, 2026

Closes #735.

Summary

This adds an optional MessageQueue.getDepth() API for reporting queue backlog depth, including queued, ready, and delayed counts where a backend can distinguish them cheaply.

Rationale

Observability needs a backend-neutral way to report queue depth without forcing every MessageQueue implementation to support it. The API is optional so platforms without reliable queue counts, such as Deno KV and Cloudflare Workers Queues, can omit it instead of returning misleading approximate values.

Changes

  • Added MessageQueueDepth and optional MessageQueue.getDepth() in @fedify/fedify.
  • Implemented depth reporting for InProcessMessageQueue and delegated reporting from ParallelMessageQueue when the wrapped queue supports it.
  • Implemented getDepth() for @fedify/redis, @fedify/postgres, @fedify/mysql, @fedify/sqlite, and @fedify/amqp.
  • Documented the API, custom queue guidance, backend support matrix, shared queue counting note, and AMQP delayed queue tracking limits in docs/manual/mq.md.
  • Updated CHANGES.md with package-specific entries for @fedify/fedify, @fedify/amqp, @fedify/mysql, @fedify/postgres, @fedify/redis, and @fedify/sqlite.

Notes

AmqpMessageQueue.getDepth() counts configured ready queues and delayed queues created or tracked by the same AmqpMessageQueue instance. AMQP 0-9-1 does not provide a portable queue-listing API, so delayed queues created by another process before the instance starts are not included until that instance creates or tracks them.

Tests

  • mise run check
  • deno test --check --allow-read --allow-write --allow-env --unstable-kv packages/fedify/src/federation/mq.test.ts
  • deno test --check --allow-read --allow-write --allow-env packages/sqlite/src/mq.test.ts
  • deno test --check --allow-read --allow-write --allow-env --allow-net packages/redis/src/mq.test.ts packages/postgres/src/mq.test.ts packages/mysql/src/mq.test.ts packages/amqp/src/mq.test.ts
  • mise run test:deno

Add optional MessageQueue.getDepth() support so queue implementations can
report backlog size for observability.  The core in-process queue reports
ready and delayed messages, and the parallel queue delegates to its wrapped
queue when available.

Implement depth reporting for the Redis, PostgreSQL, MySQL, SQLite, and
AMQP queue adapters.  Document the API, backend support, and AMQP delayed
queue tracking limits, and update the changelog for each affected package.

Fixes fedify-dev#735

Assisted-by: Codex:gpt-5.5
@dahlia
Copy link
Copy Markdown
Member Author

dahlia commented Apr 28, 2026

@codex review

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 28, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds an optional queue-depth introspection API getDepth() returning MessageQueueDepth to MessageQueue, implements it for InProcess, Redis, Postgres, MySQL, SQLite, and AMQP (with AMQP caveats), delegates in ParallelMessageQueue, and updates docs, tests, and changelog to document semantics and observability usage.

Changes

Cohort / File(s) Summary
Core docs & changelog
CHANGES.md, docs/manual/mq.md
Document new optional MessageQueue.getDepth()/MessageQueueDepth, observability example and support matrix, AMQP constraint on delayed queues, and changelog entries per backend.
Fedify core (interface & in-process)
packages/fedify/src/federation/mq.ts, packages/fedify/src/federation/mq.test.ts
Add exported MessageQueueDepth and optional getDepth?() to MessageQueue; implement InProcessMessageQueue.getDepth() with delayed counters; tests for behavior and ParallelMessageQueue delegation.
AMQP implementation & test
packages/amqp/src/mq.ts, packages/amqp/src/mq.test.ts
Track asserted delayed queue names; add AmqpMessageQueue.getDepth() that passively checks main/ordering/delayed queues (prunes missing queues and recreates channel on 404); add integration test validating ready/delayed counts and cleanup.
Redis implementation & test
packages/redis/src/mq.ts, packages/redis/src/mq.test.ts
Add RedisMessageQueue.getDepth() via sorted-set Lua eval (total vs ready by score) and integration test asserting queued/ready/delayed.
Postgres implementation & test
packages/postgres/src/mq.ts, packages/postgres/src/mq.test.ts
Add PostgresMessageQueue.getDepth() using SQL aggregates to compute queued and ready; test verifies counts and cleans up; conditional run when POSTGRES_URL set.
MySQL implementation & test
packages/mysql/src/mq.ts, packages/mysql/src/mq.test.ts
Add MysqlMessageQueue.getDepth() querying table aggregates for queued/ready/delayed; guarded integration test executed when MYSQL_URL present.
SQLite implementation & test
packages/sqlite/src/mq.ts, packages/sqlite/src/mq.test.ts
Add SqliteMessageQueue.getDepth() via single aggregate query (scheduled vs now) and tests asserting before/after enqueue counts.
Test infra additions
packages/*/src/*mq.test.ts
Add Temporal polyfill imports and new integration tests across AMQP, Redis, Postgres, MySQL, SQLite to validate getDepth() behavior and resource cleanup.

Sequence Diagram(s)

sequenceDiagram
  participant Metrics as Fedify Metrics Collector
  participant MQ as MessageQueue
  participant Backend as Queue Backend

  Metrics->>MQ: call getDepth?()
  alt MQ implements getDepth
    MQ->>Backend: perform backend-specific count queries
    Backend-->>MQ: { queued, ready?, delayed? }
    MQ-->>Metrics: Promise<MessageQueueDepth>
  else MQ does not implement getDepth
    MQ-->>Metrics: undefined (no gauge)
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested labels

component/federation, type/documentation

Suggested reviewers

  • sij411
  • 2chanhaeng
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Report message queue depth' clearly and concisely summarizes the main change: adding queue depth reporting functionality.
Description check ✅ Passed The description provides context on the feature, rationale, implementation details, and test commands, all related to the queue depth reporting functionality.
Linked Issues check ✅ Passed All core requirements from #735 are met: optional MessageQueue.getDepth() API with MessageQueueDepth type, implementations for InProcessMessageQueue/ParallelMessageQueue and all specified backends (Redis, Postgres, MySQL, SQLite, AMQP), documentation, comprehensive test coverage, and AMQP limitation notes.
Out of Scope Changes check ✅ Passed All changes are directly within scope of issue #735: the optional queue depth API, implementations across all specified backends, documentation updates, changelog entries, and tests.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@issues-auto-labeler issues-auto-labeler Bot added the component/otel OpenTelemetry integration label Apr 28, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces an optional getDepth() method to the MessageQueue interface and implements it across several backends, including AMQP, MySQL, PostgreSQL, Redis, SQLite, and the in-process queue. This feature enhances observability by reporting the number of queued, ready, and delayed messages. Documentation and comprehensive tests for each implementation have also been added. Feedback was provided regarding the robustness of channel management in the AMQP implementation to prevent redundant closure attempts.

Comment thread packages/amqp/src/mq.ts Outdated
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 34427088f7

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread packages/redis/src/mq.ts Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@CHANGES.md`:
- Around line 11-16: Update the CHANGES.md `@fedify/fedify` entry to explicitly
mention the new MessageQueueDepth return type added alongside the optional
MessageQueue.getDepth() API; reference that InProcessMessageQueue now reports
depth (ready and delayed counts) via MessageQueueDepth and that
ParallelMessageQueue delegates depth reporting to its wrapped queue when it
supports MessageQueueDepth so consumers can discover the new type and shape.

In `@docs/manual/mq.md`:
- Around line 1009-1018: The docs table is missing ParallelMessageQueue in the
queue-depth support matrix; add a new row for `ParallelMessageQueue` stating it
delegates depth reporting to its wrapped backend (e.g., "same as wrapped queue"
or "delegates to wrapped queue") so the matrix accurately reflects the new API
behavior and aligns with `ParallelMessageQueue`'s delegation semantics.
- Around line 698-700: The example implementing async getDepth():
Promise<MessageQueueDepth> should not return a misleading zero-filled fallback;
update the docs to either remove the getDepth() stub entirely or replace the
return with an implementation-specific placeholder/comment (e.g.,
"implementation-specific" or "not implemented") so readers aren't encouraged to
return { queued: 0, ready: 0, delayed: 0 }; reference the getDepth() function
and the MessageQueueDepth type in the surrounding text to clarify that providers
should omit getDepth() if they cannot report real counts.

In `@packages/amqp/src/mq.ts`:
- Around line 381-392: The current catch-all around channel.checkQueue(...)
removes entries from `#delayedQueues` for any error; instead detect amqplib
NOT_FOUND errors (error.code === 404) and only delete the queue in that case. In
the catch block for the await channel.checkQueue(queue) inside the loop over
[...this.#delayedQueues], inspect the caught error: if error.code === 404 then
call this.#delayedQueues.delete(queue) and recreate the channel as you do now;
otherwise rethrow or log and recreate the channel without deleting the queue so
transient/permission errors are not silently purged. Ensure you reference the
existing channel, checkQueue, this.#delayedQueues and
this.#connection.createChannel symbols when implementing the change.

In `@packages/fedify/src/federation/mq.ts`:
- Around line 225-230: Snapshot the batch before scheduling to avoid depth
drift: capture the message count and a shallow copy of the messages array (e.g.,
const count = messages.length and const batch = messages.slice()) prior to
setTimeout, then inside the timeout use count to decrement this.#delayedMessages
and call this.enqueueMany(batch, { ...options, delay: undefined }); this ensures
this.#delayedMessages and enqueueMany operate on immutable values even if the
original messages array is mutated.

In `@packages/mysql/src/mq.test.ts`:
- Around line 100-124: Replace the direct use of node:test's test in the new
MysqlMessageQueue.getDepth() case with the runtime-agnostic fixture test: add an
import "import { test as fixtureTest } from '@fedify/fixture';" at the top and
change the test invocation to use fixtureTest(...) (preserving the test name,
options like { skip: dbUrl == null }, and the async body). Update the file so
any other package-style tests in this file also use fixtureTest to stay
cross-runtime.

In `@packages/postgres/src/mq.ts`:
- Around line 237-238: The readiness cutoff in getDepth() is exclusive and
undercounts messages scheduled exactly at now; update the SQL condition inside
getDepth() from "created + delay < CURRENT_TIMESTAMP" to an inclusive check
("created + delay <= CURRENT_TIMESTAMP") so messages scheduled at the current
timestamp are counted as ready; locate the query used in the getDepth()
implementation and change that operator, then run/update any related tests to
reflect the inclusive cutoff.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: db229740-990b-46c4-8839-8c33b6ac00ff

📥 Commits

Reviewing files that changed from the base of the PR and between 9071ca0 and 3442708.

📒 Files selected for processing (14)
  • CHANGES.md
  • docs/manual/mq.md
  • packages/amqp/src/mq.test.ts
  • packages/amqp/src/mq.ts
  • packages/fedify/src/federation/mq.test.ts
  • packages/fedify/src/federation/mq.ts
  • packages/mysql/src/mq.test.ts
  • packages/mysql/src/mq.ts
  • packages/postgres/src/mq.test.ts
  • packages/postgres/src/mq.ts
  • packages/redis/src/mq.test.ts
  • packages/redis/src/mq.ts
  • packages/sqlite/src/mq.test.ts
  • packages/sqlite/src/mq.ts

Comment thread CHANGES.md Outdated
Comment thread docs/manual/mq.md Outdated
Comment thread docs/manual/mq.md
Comment thread packages/amqp/src/mq.ts Outdated
Comment thread packages/fedify/src/federation/mq.ts Outdated
Comment thread packages/mysql/src/mq.test.ts Outdated
Comment thread packages/postgres/src/mq.ts Outdated
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 28, 2026

Codecov Report

❌ Patch coverage is 91.30435% with 14 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
packages/amqp/src/mq.ts 80.82% 8 Missing and 6 partials ⚠️
Files with missing lines Coverage Δ
packages/fedify/src/federation/mq.ts 85.52% <100.00%> (+2.10%) ⬆️
packages/mysql/src/mq.ts 85.47% <100.00%> (+0.53%) ⬆️
packages/postgres/src/mq.ts 91.95% <100.00%> (+0.61%) ⬆️
packages/redis/src/mq.ts 83.82% <100.00%> (+2.55%) ⬆️
packages/sqlite/src/mq.ts 82.98% <100.00%> (+1.80%) ⬆️
packages/amqp/src/mq.ts 81.37% <80.82%> (-0.18%) ⬇️
🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

dahlia added 4 commits April 29, 2026 01:55
Snapshot delayed in-process message batches before scheduling them so later
caller mutations cannot affect queued depth or delivery.  Read Redis queued
and ready counts in one Lua script so concurrent writes cannot produce a
negative delayed count, and count PostgreSQL messages scheduled exactly at
CURRENT_TIMESTAMP as ready.

fedify-dev#748 (comment)
fedify-dev#748 (comment)
fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
Treat only AMQP 404 queue-not-found errors as evidence that a tracked
delayed queue has disappeared.  Other passive queue-check failures now
propagate instead of silently dropping queue tracking, and the temporary
channel is closed through a single helper that tolerates server-closed
channels.

fedify-dev#748 (comment)
fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
Run the new MysqlMessageQueue.getDepth() coverage through the
@fedify/fixture test adapter so the package test remains runtime-agnostic.

fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
Mention the MessageQueueDepth return type in the changelog, avoid showing a
zero-filled custom getDepth() fallback, and document that ParallelMessageQueue
reports the same depth support as its wrapped queue.

fedify-dev#748 (comment)
fedify-dev#748 (comment)
fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
@dahlia
Copy link
Copy Markdown
Member Author

dahlia commented Apr 28, 2026

@codex review

@dahlia
Copy link
Copy Markdown
Member Author

dahlia commented Apr 28, 2026

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new getDepth() method to the MessageQueue interface and its implementations (InProcessMessageQueue, AmqpMessageQueue, MysqlMessageQueue, PostgresMessageQueue, RedisMessageQueue, and SqliteMessageQueue), allowing for the reporting of queue backlog depth. Documentation and tests have been updated to reflect this new capability. I have provided feedback regarding the need to protect the delayed count calculation against negative values in the database adapters to ensure consistency and robustness.

Comment thread packages/mysql/src/mq.ts
Comment thread packages/postgres/src/mq.ts
Comment thread packages/redis/src/mq.ts
Comment thread packages/sqlite/src/mq.ts Outdated
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 5b4c2baceb

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread packages/sqlite/src/mq.ts Outdated
Comment thread packages/amqp/src/mq.ts Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/amqp/src/mq.ts`:
- Around line 371-406: getDepth() creates temporary Channel instances via
this.#connection.createChannel() (used in getDepth, closeChannel and when
recreating after a missing delayed queue) but doesn't attach an 'error'
listener, which lets amqplib emit an uncaught channel 'error' and kill the
process; fix by centralizing channel creation into a small helper (e.g.
createSafeChannel) that calls this.#connection.createChannel(), attaches a no-op
or logging 'error' listener to the returned Channel, and returns it; use this
helper wherever channel is created in getDepth(), ensure the error listener is
removed or ignored when closeChannel() successfully closes the channel, and keep
the existing catch/recreate logic around channel.checkQueue and
isQueueNotFoundError unchanged.

In `@packages/fedify/src/federation/mq.test.ts`:
- Around line 166-174: The test assumes mq.enqueueMany(...) with
Temporal.Duration.from({ milliseconds: 10 }) delays long enough for getDepth()
to observe items as delayed; increase the delay (e.g., to 100–250 ms) in the
mq.enqueueMany call so scheduler jitter on CI won't move items to ready before
the assertion; update the Temporal.Duration.from({ milliseconds: ... }) argument
used with mq.enqueueMany and keep the subsequent call to mq.getDepth() and
assertions unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: cdef779f-07b0-4de6-88aa-845f70ee706b

📥 Commits

Reviewing files that changed from the base of the PR and between 3442708 and 5b4c2ba.

📒 Files selected for processing (8)
  • CHANGES.md
  • docs/manual/mq.md
  • packages/amqp/src/mq.ts
  • packages/fedify/src/federation/mq.test.ts
  • packages/fedify/src/federation/mq.ts
  • packages/mysql/src/mq.test.ts
  • packages/postgres/src/mq.ts
  • packages/redis/src/mq.ts

Comment thread packages/amqp/src/mq.ts
Comment thread packages/fedify/src/federation/mq.test.ts
Mirror the existing PostgreSQL and MySQL test pattern so Redis, AMQP, and
SQLite depth tests can construct Temporal durations when running under Node
versions without a native Temporal global.

Assisted-by: Codex:gpt-5.5
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/amqp/src/mq.test.ts`:
- Around line 43-82: The teardown currently runs channel.deleteQueue(queue)
without guarding errors, which can prevent deletion of the delayed queue and
closing the channel/connection; update the finally block to perform best-effort
cleanup by wrapping each cleanup step in its own try/catch (or sequential try
blocks): call channel.deleteQueue(queue) in a try/catch, then attempt
channel.deleteQueue(`${delayedQueuePrefix}60000`) in its own try/catch, then
always attempt channel.close() in a try/catch, and finally call conn.close() in
a try/catch so failures in earlier steps don’t skip the remaining cleanup (refer
to channel.deleteQueue, delayedQueuePrefix, channel.close, conn.close).

In `@packages/sqlite/src/mq.test.ts`:
- Around line 31-53: The test currently relies on mq[Symbol.dispose]() to
indirectly close the PlatformDatabase, which can leave the temp DB locked; in
the finally block explicitly call db.close() after mq.drop() and
mq[Symbol.dispose]() to mirror the cleanup pattern used in kv.test.ts so the
PlatformDatabase handle is released; locate the PlatformDatabase instance
created via new PlatformDatabase(dbPath) and ensure db.close() is invoked there.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 2a4be814-063b-470f-92a9-e611f51dde62

📥 Commits

Reviewing files that changed from the base of the PR and between 5b4c2ba and bc8fac0.

📒 Files selected for processing (3)
  • packages/amqp/src/mq.test.ts
  • packages/redis/src/mq.test.ts
  • packages/sqlite/src/mq.test.ts

Comment thread packages/amqp/src/mq.test.ts
Comment thread packages/sqlite/src/mq.test.ts
dahlia added 4 commits April 29, 2026 02:25
Clamp delayed queue depth counts defensively across database
adapters and route SQLite depth reads through the existing busy retry
path.

fedify-dev#748 (comment)
fedify-dev#748 (comment)
fedify-dev#748 (comment)
fedify-dev#748 (comment)
fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
Prune tracked delayed queues after their TTL window and add an error
listener to temporary depth channels so passive check failures cannot
surface as unhandled channel errors.

fedify-dev#748 (comment)
fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
Increase the delayed batch snapshot window so busy CI workers do not
turn the delayed messages ready before the depth assertion runs.

fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
Ensure a teardown failure while deleting one queue does not skip the
remaining queue cleanup or channel and connection close calls.

fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
@dahlia
Copy link
Copy Markdown
Member Author

dahlia commented Apr 28, 2026

/gemini review

@dahlia
Copy link
Copy Markdown
Member Author

dahlia commented Apr 28, 2026

@codex review

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/amqp/src/mq.ts`:
- Around line 401-409: getDepth currently calls `#prepareQueue`() and
`#prepareOrdering`(), which mutates the broker; change getDepth to be passive:
remove the calls to `#prepareQueue` and `#prepareOrdering` and instead only call
channel.checkQueue(this.#queue) (and for ordering, loop over
this.#ordering.partitions and call
channel.checkQueue(this.#getOrderingQueueName(i)) ) inside try/catch blocks that
treat a 404 (queue not found) as zero messages and rethrow other errors; ensure
you reference getDepth, `#queue`, `#ordering.partitions`, `#getOrderingQueueName`, and
channel.checkQueue when making the change so no queue/exchange declarations are
performed during depth probes.

In `@packages/fedify/src/federation/mq.test.ts`:
- Around line 188-216: The test "InProcessMessageQueue.getDepth() excludes
in-flight messages" can leave the listener pending if the assertion fails; wrap
the assertion and related checks in a try/finally so the listener is always
cleaned up: after setting up InProcessMessageQueue(), controller, listening,
handlerStarted, and resolveHandler, use try { await mq.enqueue(...); await
handlerStarted; assertEquals(await mq.getDepth(), ...); resolveHandler?.(); }
finally { controller.abort(); await listening; } to ensure controller.abort()
and awaiting the listening promise happen even on assertion failures; reference
the InProcessMessageQueue class, the listening promise, controller,
handlerStarted, and resolveHandler to locate where to add the try/finally.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: a6ade21d-ab18-4926-9490-2e99ba003574

📥 Commits

Reviewing files that changed from the base of the PR and between bc8fac0 and b14f2ef.

📒 Files selected for processing (6)
  • packages/amqp/src/mq.ts
  • packages/fedify/src/federation/mq.test.ts
  • packages/mysql/src/mq.ts
  • packages/postgres/src/mq.ts
  • packages/redis/src/mq.ts
  • packages/sqlite/src/mq.ts

Comment thread packages/amqp/src/mq.ts Outdated
Comment thread packages/fedify/src/federation/mq.test.ts
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the getDepth() method to the MessageQueue interface, enabling observability of queue backlogs by reporting queued, ready, and delayed message counts. Implementations have been added for InProcessMessageQueue, AmqpMessageQueue, MysqlMessageQueue, PostgresMessageQueue, RedisMessageQueue, and SqliteMessageQueue, along with corresponding tests and documentation updates. Feedback was provided regarding the memory efficiency of iterating over delayed queue keys in the AMQP implementation, suggesting a more memory-efficient strategy for large numbers of queues.

Comment thread packages/amqp/src/mq.ts Outdated
dahlia added 2 commits April 29, 2026 02:46
Avoid declaring queues or ordering exchanges while reporting AMQP queue
depth.  Missing queues now count as zero while preserving the existing
channel recreation path for passive checks, and delayed queue tracking is
iterated without copying all keys first.

fedify-dev#748 (comment)
fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
Make the in-flight depth test release its blocked listener even when the
assertion fails, so the test cannot leave a pending listen promise behind.

fedify-dev#748 (comment)

Assisted-by: Codex:gpt-5.5
@dahlia
Copy link
Copy Markdown
Member Author

dahlia commented Apr 28, 2026

@codex review

@dahlia
Copy link
Copy Markdown
Member Author

dahlia commented Apr 28, 2026

/gemini review

@chatgpt-codex-connector
Copy link
Copy Markdown

Codex Review: Didn't find any major issues. Can't wait for the next one!

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new MessageQueue.getDepth() method across the core library and its database adapters (AMQP, MySQL, Postgres, Redis, and SQLite) to provide observability into queue backlogs. The implementation includes support for reporting total queued, ready, and delayed message counts where supported by the backend. The review comments provided were filtered out as they were either validating the implementation or providing context that did not require actionable code changes.

@dahlia dahlia requested review from 2chanhaeng and sij411 April 28, 2026 23:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

component/mq Message queue related component/otel OpenTelemetry integration

Development

Successfully merging this pull request may close these issues.

Optional queue depth reporting for OpenTelemetry metrics

1 participant