Fix per-reconnect task leak in client run loop #98

Open
Kyzgor wants to merge 1 commit into permitio:master from Kyzgor:fix/reconnect-task-leak

Kyzgor commented Jun 8, 2026

Summary

Fixes a per-reconnect task leak in PubSubClient.run(). In a long-lived client that
reconnects many times (for example an OPAL client behind a server that periodically restarts
workers), one pending asyncio task accumulated on every reconnect, so the number of pending
tasks grew without bound.

The bug

In the reconnect loop the client races the reader task against the disconnect signal:

wait_on_reader_task = client.wait_on_reader()
for task in asyncio.as_completed([wait_on_reader_task, self._disconnect_signal.wait()]):
    await task
    break

as_completed wraps both awaitables in internal tasks. Awaiting the first to finish and then
breaking abandons the generator without cancelling the other future, so the losing future
(self._disconnect_signal.wait() on a normal reconnect) is left pending and stays registered on
the Event's waiter list, along with the as_completed machinery. One leaks per reconnect.
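
The leak can be reproduced outside the client with a minimal sketch. Here `reader_that_ends` and `leaky_reconnects` are hypothetical stand-ins, not code from this repository: the first plays the role of client.wait_on_reader() on a server close, the second repeats the as_completed-then-break pattern against an Event that is never set. It reads the private `_waiters` attribute purely for inspection.

```python
import asyncio


async def reader_that_ends() -> None:
    # Hypothetical stand-in for client.wait_on_reader(): returns as soon
    # as the (simulated) server closes the connection.
    await asyncio.sleep(0)


async def leaky_reconnects(n: int) -> int:
    """Repeat the as_completed + break pattern n times.

    Returns the number of futures still registered on the Event.
    """
    disconnect_signal = asyncio.Event()  # never set, as on a normal reconnect
    for _ in range(n):
        for task in asyncio.as_completed(
            [reader_that_ends(), disconnect_signal.wait()]
        ):
            await task
            break  # the losing wait() task is neither awaited nor cancelled
    # _waiters is a private CPython attribute, read here only for inspection.
    return len(disconnect_signal._waiters)


if __name__ == "__main__":
    # Each simulated reconnect should leave one waiter behind.
    print(asyncio.run(leaky_reconnects(50)))
```

In this isolated setup the count matches the number of iterations, because every abandoned wait() task keeps its future on the Event's waiter deque.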

I found this while investigating an OPAL client memory growth report (permitio/opal#844,
permitio/opal#770). The leak is small (KB-scale, object-count growth) but genuinely unbounded over time.

The fix

Use asyncio.wait(..., return_when=FIRST_COMPLETED) and explicitly cancel the future that did
not complete. When the reader task is the one that completed (the normal server-close or
websocket-error path that drives a reconnect), it is awaited so its exception still propagates,
preserving the existing reconnect-on-error behaviour. When an explicit disconnect() wins the
race instead, the still-pending reader is cancelled and its cancellation swallowed, which is the
intended shutdown path:

wait_on_reader_task = asyncio.ensure_future(client.wait_on_reader())
disconnect_wait_task = asyncio.ensure_future(self._disconnect_signal.wait())
done, pending = await asyncio.wait(
    [wait_on_reader_task, disconnect_wait_task], return_when=asyncio.FIRST_COMPLETED
)
for pending_task in pending:
    pending_task.cancel()
    try:
        await pending_task
    except (asyncio.CancelledError, Exception):
        pass
if wait_on_reader_task in done:
    await wait_on_reader_task  # surface reader/websocket exceptions as before

Tests

  • New tests/reconnect_memory_leak_test.py drives repeated reconnects against a server that
    closes on every connect and asserts the disconnect-signal waiter count stays bounded. It fails
    on master (waiters grow about one per reconnect) and passes with this change.
  • Existing tests/reconnect_test.py and tests/basic_test.py pass (17 invocations: three
    reconnect tests each run 4 times via the parametrized delayed_death_server fixture, plus
    test_immediate_server_disconnect and 4 basic tests). The reconnect, graceful-close, and
    disconnect-callback behaviour is unchanged.

Notes to reviewers

  • Behaviour is intentionally preserved: on a graceful server close the reader task completes, its
    exception (if any) is surfaced via await wait_on_reader_task, and the loop reconnects; on
    disconnect() the disconnect signal wins and the reader task is cancelled.
  • The regression test reads client._disconnect_signal._waiters (the Event's internal waiter
    deque). It is a private attribute, stable on CPython 3.8+. Happy to switch to a less private
    signal if you would prefer.
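
One possible less private signal is the number of unfinished tasks on the loop, available through the public asyncio.all_tasks(). The sketch below is an assumption about how such a check could look, not the test in this PR: `pending_task_count` and `leaked_tasks_after` are hypothetical names, and the loop uses asyncio.wait with an optional cancel step so both the leaking and the non-leaking behaviour can be measured.

```python
import asyncio


def pending_task_count() -> int:
    """Count unfinished tasks on the running loop, excluding the caller."""
    current = asyncio.current_task()
    return sum(1 for task in asyncio.all_tasks() if task is not current)


async def leaked_tasks_after(n: int, cancel_loser: bool) -> int:
    """Race a short reader against an unset Event n times.

    Returns how many extra tasks are still pending afterwards.
    """
    signal = asyncio.Event()
    baseline = pending_task_count()  # ignore tasks that already existed
    for _ in range(n):
        reader = asyncio.ensure_future(asyncio.sleep(0))
        waiter = asyncio.ensure_future(signal.wait())
        done, pending = await asyncio.wait(
            [reader, waiter], return_when=asyncio.FIRST_COMPLETED
        )
        if cancel_loser:
            for task in pending:
                task.cancel()
            # Collect the cancellations so the tasks actually finish.
            await asyncio.gather(*pending, return_exceptions=True)
    return pending_task_count() - baseline


if __name__ == "__main__":
    print(asyncio.run(leaked_tasks_after(25, cancel_loser=False)))
    print(asyncio.run(leaked_tasks_after(25, cancel_loser=True)))
```

The trade-off is that all_tasks() sees every task on the loop, so a real test would need a baseline (as here) or some other way to exclude unrelated background tasks.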

The reconnect loop raced the reader task against `self._disconnect_signal.wait()`
using `asyncio.as_completed([...])` followed by `break` after the first completes.
Breaking out of the generator left the other future pending, so on every reconnect
one task (and the as_completed machinery) leaked, accumulating unboundedly over a
long-lived client's reconnects.

Replace it with `asyncio.wait(..., return_when=FIRST_COMPLETED)` and cancel the
future that did not complete, while still awaiting the reader task so that
websocket-layer exceptions still propagate (preserving the reconnect-on-error
behaviour). Adds a regression test asserting the disconnect-signal waiter count
stays bounded across many reconnects.