Fix per-reconnect task leak in client run loop#98
Open
Kyzgor wants to merge 1 commit into
Open
Conversation
The reconnect loop raced the reader task against `self._disconnect_signal.wait()` using `asyncio.as_completed([...])` followed by `break` after the first completes. Breaking out of the generator left the other future pending, so on every reconnect one task (and the as_completed machinery) leaked, accumulating unboundedly over a long-lived client's reconnects. Replace it with `asyncio.wait(..., return_when=FIRST_COMPLETED)` and cancel the future that did not complete, while still awaiting the reader task so that websocket-layer exceptions still propagate (preserving the reconnect-on-error behaviour). Adds a regression test asserting the disconnect-signal waiter count stays bounded across many reconnects.
7 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fixes a per-reconnect task leak in
PubSubClient.run(). Over a long-lived client thatreconnects many times (for example an OPAL client behind a server that periodically restarts
workers), one pending
asynciotask accumulated on every reconnect, growing unboundedly.The bug
In the reconnect loop the client races the reader task against the disconnect signal:
as_completedwraps both awaitables in internal tasks. Awaiting the first to finish and thenbreaking abandons the generator without cancelling the other future, so the losing future
(
self._disconnect_signal.wait()on a normal reconnect) is left pending and stays registered onthe Event's waiter list, along with the
as_completedmachinery. One leaks per reconnect.I found this while investigating an OPAL client memory growth report (permitio/opal#844,
permitio/opal#770). It is small (KB-scale, object-count growth) but genuinely unbounded over time.
The fix
Use
asyncio.wait(..., return_when=FIRST_COMPLETED)and explicitly cancel the future that didnot complete. When the reader task is the one that completed (the normal server-close or
websocket-error path that drives a reconnect), it is awaited so its exception still propagates,
preserving the existing reconnect-on-error behaviour. When an explicit
disconnect()wins therace instead, the still-pending reader is cancelled and its cancellation swallowed, which is the
intended shutdown path:
Tests
tests/reconnect_memory_leak_test.pydrives repeated reconnects against a server thatcloses on every connect and asserts the disconnect-signal waiter count stays bounded. It fails
on
master(waiters grow about one per reconnect) and passes with this change.tests/reconnect_test.pyandtests/basic_test.pypass (17 invocations: threereconnect tests each run 4 times via the parametrized
delayed_death_serverfixture, plustest_immediate_server_disconnectand 4 basic tests). The reconnect, graceful-close, anddisconnect-callback behaviour is unchanged.
Notes to reviewers
exception (if any) is surfaced via
await wait_on_reader_task, and the loop reconnects; ondisconnect()the disconnect signal wins and the reader task is cancelled.client._disconnect_signal._waiters(the Event's internal waiterdeque). It is a private attribute, stable on CPython 3.8+. Happy to switch to a less private
signal if you would prefer.