Skip to content

feat: persist MQTT 5.0 subscription identifier#108

Merged
robertsLando merged 3 commits into
moscajs:mainfrom
BenjaminDobler:feat/mqtt5
Jun 29, 2026
Merged

feat: persist MQTT 5.0 subscription identifier#108
robertsLando merged 3 commits into
moscajs:mainfrom
BenjaminDobler:feat/mqtt5

Conversation

@BenjaminDobler

@BenjaminDobler BenjaminDobler commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

What

Store the optional MQTT 5.0 subscriptionIdentifier alongside each subscription, so it survives a non-clean session reconnect and can be echoed on matching PUBLISHes after the session is restored.

It is written only when present, keeping the stored shape of pre-v5 subscriptions unchanged.

Also adds a subscriptionIdentifier round-trip test to the abstract conformance suite (abstract.js).

Why

Prerequisite for MQTT 5.0 subscription identifiers in aedes. Clean-session subscriptions work without this; persistent sessions need the identifier round-tripped through the store.

Backends: no code change needed ✅

The in-memory store was the only implementation that projected a fixed { qos, rh, rap, nl } shape, so it needed this change. The external backends (aedes-persistence-redis, -mongodb, -level) already serialize the whole subscription object (redis/level via msgpack, mongodb via { ...sub }), so they preserve subscriptionIdentifier automatically.

Because all three backends run this abstract.js conformance suite against their own store, the new round-trip test validates them too — no backend code changes required.

Tests

Full abstract conformance suite passes (240/240), including the new subscription-identifier test.

🤖 Generated with Claude Code

Store the optional `subscriptionIdentifier` alongside each subscription so
it survives a non-clean session reconnect. It is only written when present,
keeping the stored shape of pre-v5 subscriptions unchanged (and the abstract
conformance suite green).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Asserts that an MQTT 5.0 per-subscription `subscriptionIdentifier` round-trips
through the store. Because every backend (redis, mongodb, level) runs this
abstract suite against its own store, this both validates the in-memory
implementation and confirms the backends already preserve the identifier
(they serialize whole subscription objects), so no backend code change is
required for subscription-identifier support.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@robertsLando robertsLando left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review — persist MQTT 5.0 subscription identifier

Verdict: Ship with minor changes. The approach is sound and targets the only persistence path that actually matters.

I reviewed this in the full context of the consumer (moscajs/aedes#1088) and the downstream backends (aedes-persistence-redis, -mongodb, -level), including the cluster broadcast path.

Why the approach is correct

aedes#1088 reads the identifier back from persistence.subscriptionsByClient() on a non-clean session restore (connect.jshandleSubscribe(..., restore=true)const si = sub.subscriptionIdentifier) and stores it in the in-memory Subscription, which is what echoes it on matching PUBLISHes. It never reads subscriptionIdentifier from subscriptionsByTopic(). So the only contract the feature depends on is subscriptionsByClient() round-tripping the field — which is exactly the effective change here (stored.set(sub.topic, record)). The field name matches what the consumer reads. ✅

Backends + cluster — "no backend changes needed" is correct (verified)

  • redismsgpack.encode(sub) whole object → decoded whole object. ✅
  • mongodb{ ...sub } stored → exclusion projection returns all other fields. ✅
  • levelmsgpack.encode(sub) → destructure drops only clientId. ✅
  • cluster — the subscribe broadcast carries the full subs (with the identifier) and the shared store holds it, so reconnect to a different node restores it via subscriptionsByClient(). ✅

One correction to the PR description's reasoning: none of the backends carry the identifier through subscriptionsByTopic() (the qlobber trie strips it — see the inline comment), but the feature works anyway because aedes echoes from in-memory state, not the topic-match path. The conclusion is right; the mechanism is slightly different than described.

Top risks

  1. Public type contract not updated (below).
  2. Inert/misleading trie.add spread (inline comment).
  3. Feature relies on backends serializing the whole sub in subscriptionsByClient() — true today for all three; the new conformance test is the right guard against a future backend regressing this.

Strengths

  • Minimal and surgical; only the in-memory store needed changing.
  • "Only when present" preserves the exact pre-v5 stored shape.
  • No aliasing bug (trie.add gets a fresh spread copy; nothing mutates stored sub objects).
  • Test added to the shared conformance suite, so every backend inherits the guard.

Minor (outside the diff — noted here per review convention)

types/index.d.ts:52-59 — The subscriptionsByClient return type is { topic: Topic; qos: QoS }[], which doesn't expose the new subscriptionIdentifier (it already omits rh/rap/nl too). The consumer reading the field is TypeScript, so the public contract should carry it as optional, matching "only when present." Runtime is correct and the pre-existing rh/rap/nl omission is the precedent, so this is hygiene rather than a blocker — but it's the most useful follow-up.

FYI

  • The new conformance test only protects the external backends once this is released and each bumps its aedes-persistence dependency — not immediately.
  • The in-memory store still won't round-trip the identifier across cluster nodes (its broadcast path populates only the #trie, never the #subscriptions map behind subscriptionsByClient()). Pre-existing limitation of the memory store in clusters; real clusters use redis/mongo, which work.

Coverage

Verified first-hand: aedes#1088 consumption, qlobber-sub.js internals (all 4 repos, v8.0.1 identical), redis/mongodb/level add+byClient+byTopic paths, broadcast/cluster path, type defs.

Comment thread asyncPersistence.js Outdated
rh: sub.rh,
rap: sub.rap,
nl: sub.nl
...record

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] · Correctness/Clarity

The ...record spread carries subscriptionIdentifier, but QlobberSub (qlobber 8.0.1) projects trie values to exactly { qos, rh, rap, nl } in _initial_value/_add_value and yields { clientId, topic, qos, rh, rap, nl } on match — so the identifier is unconditionally dropped from the trie. This part of the change is effectively dead code.

Why: It reads as if subscriptionsByTopic() now echoes the identifier — it doesn't, and can't, without a qlobber change. Future readers may rely on that false impression. (The feature works regardless: aedes#1088 echoes from the in-memory Subscription, restored via subscriptionsByClient(), not from the topic-match path.)

Fix: Either keep trie.add on the plain { qos, rh, rap, nl } shape, or add a one-line comment that the topic-match path intentionally omits the identifier. Leave the stored.set(..., record) line as-is — that one is correct and load-bearing.

…identifier

- Revert the trie.add value to its explicit { qos, rh, rap, nl } shape so it
  no longer reads as if subscriptionsByTopic() carries the Subscription
  Identifier. QlobberSub only retains those four fields, so the topic-match
  path cannot carry the identifier; a comment now states this. The
  load-bearing stored.set(topic, record) still persists it for
  subscriptionsByClient().
- Add subscriptionIdentifier?: number to the subscriptionsByClient return
  type so the public TypeScript contract exposes the field aedes reads back
  on a non-clean reconnect.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@BenjaminDobler

Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review, @robertsLando! Both minor points addressed in 78b6ac7:

  1. trie.add shape (inline comment) — reverted trie.add back to the explicit { clientId, topic, qos, rh, rap, nl } shape so it no longer reads as if subscriptionsByTopic() carries the identifier (it can't — QlobberSub only retains those four fields). Added a comment noting the identifier lives on the per-client record (round-tripped by subscriptionsByClient, which is what aedes restores on a non-clean reconnect) and is deliberately not added to the trie. The load-bearing stored.set(sub.topic, record) is unchanged.

  2. Type contract (types/index.d.ts) — added subscriptionIdentifier?: number (optional, "only when present") to the subscriptionsByClient return type, matching the field aedes#1088 reads back.

No behavioral change, so the existing subscriptionsByClient conformance test still covers the round-trip. Full suite (lint + unit + tsd) green.

@BenjaminDobler

Copy link
Copy Markdown
Contributor Author

Friendly nudge: ready for review whenever convenient, @robertsLando. Companion to the MQTT 5.0 work in moscajs/aedes#1088 — persisting subscriptionIdentifier so it survives a non-clean session reconnect. Together with aedes-packet#192, this gates the eventual mqttv5 → main release (the broker pins both to fork SHAs in the interim and a release preflight blocks publishing until they're on published ranges). Thanks!

Comment thread abstract.js

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other persistences depends on this abstract tests so ther problem in merging this is that other persistences may have failing tests then, we coul dconsider creating a mqttv5 branch for persistence too and release it under a specific tag or as beta in the meanwhile or use the sha in pakcage.json like we are doing now

@BenjaminDobler

Copy link
Copy Markdown
Contributor Author

Good catch on the shared abstract.js — agreed it shouldn't land on main while the downstream backends (redis/mongo/level) don't yet implement subscriptionIdentifier, or their conformance runs would go red. Keeping the SHA pin as we're doing now works fine for the aedes mqttv5 integration branch (aedes pins this PR's commit directly), so there's no rush to merge here. A persistence mqttv5 branch + a beta/tag also works if you'd rather have a resolvable version to point at — whichever you prefer. Either way this stays decoupled from aedes landing on its own mqttv5.

@robertsLando robertsLando merged commit 7eef04c into moscajs:main Jun 29, 2026
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants