feat: add per-job stop capability to serverless worker #510
Promptless prepared a documentation update related to this change. Triggered by PR #510. Added documentation for the per-job stop capability to the main docs site. The update explains that workers handling multiple jobs concurrently can now stop individual jobs without affecting siblings, and includes guidance on catching Review: Document per-job stop capability for concurrent workers
Pull request overview
Adds per-job cancellation support to the serverless worker so a single in-flight job can be stopped without terminating the whole worker (and other concurrent jobs). This introduces a new stop-signal long-poll channel and tracks running job tasks by job id so they can be cancelled individually.
Changes:
- Track in-progress jobs as asyncio.Tasks keyed by job id and add stop_job() + monitor_stop_signals() to cancel individual jobs.
- Add a new get_stop_signals() job module helper that long-polls a derived job-stop endpoint.
- Add tests covering per-job stop behavior and stop-signal polling, plus documentation describing cancellation behavior for handlers.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| runpod/serverless/modules/rp_scale.py | Track per-job tasks, long-poll stop signals, and cancel individual jobs. |
| runpod/serverless/modules/rp_job.py | Add stop-channel URL derivation and get_stop_signals() long-poll helper. |
| tests/test_serverless/test_rp_scale.py | Add integration-style async tests for stopping jobs and stop-signal monitoring. |
| tests/test_serverless/test_modules/test_job.py | Add unit tests for stop URL derivation and stop-signal parsing/behavior. |
| docs/serverless/worker.md | Document “Stopping Individual Jobs” and handler cleanup via asyncio.CancelledError. |
base_url = JOB_GET_URL.split("?")[0]
if "/job-take/" not in base_url:
    return None
return base_url.replace("/job-take/", "/job-stop/")

STOP_TAKE_URL = "http://mock.url/v2/ep/job-take/pod?gpu=x"

def test_job_stop_url_derived_from_job_take(self):
    with patch("runpod.serverless.modules.rp_job.JOB_GET_URL", self.STOP_TAKE_URL):
        assert rp_job._job_stop_url() == "http://mock.url/v2/ep/job-stop/pod"

try:
    job_ids = await self.stop_signals_fetcher(session)
    for job_id in job_ids:
        await self.stop_job(job_id)
deanq
left a comment
Automated review pass (per-job stop capability). 9 inline comments below — 2 critical (silent error logging on the stop loop; no backpressure on the poll's success path), 4 important (silent swallows in get_stop_signals, silent feature-disable on unset JOB_GET_URL, except-clause ordering, docs overpromising long-poll), and 3 suggestions/nits. Nothing blocking the design, which is sound.
except asyncio.CancelledError:
    raise
except Exception as error:
    log.debug(f"JobScaler.monitor_stop_signals | Error: {error}.")
This catch-all logs at debug, which is suppressed at the default production log level. monitor_stop_signals is the only loop that stops billed jobs — if get_stop_signals starts failing persistently (DNS/TLS error, stop endpoint 5xx, malformed JOB_GET_URL, a bug in stop_job), every stop signal is silently dropped, cancelled jobs keep running, and customers keep getting billed while the worker looks perfectly healthy. The sibling get_jobs loop logs the same catch-all at error (with the error type), and it's no more billing-critical than this one.
Suggest matching that: log.error with type(error).__name__, and ideally a consecutive-failure counter/metric so a dead stop channel is alertable rather than invisible. The broad except is correct here (the loop must survive) — it just needs to be loud.
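A minimal sketch of what a louder loop could look like, assuming the fetcher, the stop callback, and the liveness check are passed in as plain callables. The function signature, the `alert_after` threshold, and the use of the standard `logging` module in place of the worker's own logger are all assumptions made for illustration, not the PR's code.

```python
import asyncio
import logging

log = logging.getLogger("stop_monitor_sketch")  # stand-in for the worker's logger


async def monitor_stop_signals(fetch, stop_job, is_alive, alert_after=3):
    """Poll for stop signals, logging failures at error level and counting them.

    `alert_after` is a hypothetical threshold for flagging a dead stop channel.
    Returns the number of consecutive failures seen when the loop exits.
    """
    consecutive_failures = 0
    while is_alive():
        try:
            for job_id in await fetch():
                await stop_job(job_id)
            consecutive_failures = 0  # any successful poll resets the counter
        except asyncio.CancelledError:
            raise  # shutdown must propagate, never be swallowed
        except Exception as error:
            consecutive_failures += 1
            log.error(
                "monitor_stop_signals | %s: %s (consecutive failures: %d)",
                type(error).__name__, error, consecutive_failures,
            )
            if consecutive_failures == alert_after:
                log.critical("monitor_stop_signals | stop channel appears dead")
        finally:
            await asyncio.sleep(0)  # cooperative yield only, adds no delay
    return consecutive_failures
```

The counter is returned here only to make the sketch easy to check; in a worker it would more likely feed a metric or health check.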
"""
while self.is_alive():
    try:
        job_ids = await self.stop_signals_fetcher(session)
This loop has no backpressure on its normal path. The await asyncio.sleep(0) in the finally is a cooperative yield (it lets other tasks run) — correct to have, but it adds no delay. So the polling cadence is governed entirely by how long self.stop_signals_fetcher(session) takes to return, and the design depends on the server holding that GET open (long-poll). get_stop_signals returns [] immediately on HTTP 204, and nothing here enforces hold-open behavior. If the stop channel returns quickly (server not yet long-polling, a buffering proxy, or a misconfigured endpoint), this becomes a 100%-CPU hot loop firing back-to-back GETs.
The error branches already add real backoff (429→5s, generic→1s), so the concern is only the success/empty path. Suggest a floor delay when no ids were returned, e.g. await asyncio.sleep(1) on the empty result, so the loop can't busy-spin if the server doesn't long-poll.
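One way the floor delay could look, as a sketch only. `EMPTY_POLL_FLOOR_SECONDS`, the function name, and the injectable `sleep` parameter are hypothetical; the 1-second value mirrors the suggestion above rather than anything in the PR.

```python
import asyncio

EMPTY_POLL_FLOOR_SECONDS = 1.0  # hypothetical floor applied when a poll returns no ids


async def poll_stop_signals(fetch, stop_job, is_alive, sleep=asyncio.sleep):
    """Poll for stop signals without busy-spinning on fast empty responses."""
    while is_alive():
        job_ids = await fetch()
        for job_id in job_ids:
            await stop_job(job_id)
        if not job_ids:
            # The server answered without signals: wait before polling again,
            # in case it is not actually holding the request open.
            await sleep(EMPTY_POLL_FLOOR_SECONDS)
        else:
            await sleep(0)  # just yield to other tasks
```

Passing `sleep` in as a parameter keeps the delay testable without waiting on the wall clock.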
response.raise_for_status()

if response.content_type != "application/json":
Four server-contract violations here all return [] with zero logging:
- non-JSON content-type on a 2xx (:71)
- JSON parse failure (:76)
- valid JSON but non-dict payload (:79)
- non-string ids dropped by the isinstance(job_id, str) filter (:82)
Each one means a stop signal silently never fires, so cancelled jobs keep billing — and there's no trace to diagnose schema drift between server and worker. The guards themselves are good defensive code; they just need to be observable. Suggest a log.warn before each return (include the status/content-type/payload-type), and for the id filter, log when len(filtered) != len(raw) so a type change in jobsToStop surfaces instead of vanishing.
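A sketch of the same four guards made observable, written as a pure function so it can run without an HTTP session. The name `parse_stop_signals`, its parameters, and the standard `logging` module are assumptions; only the `jobsToStop` key comes from the comment above.

```python
import json
import logging

log = logging.getLogger("stop_signals_sketch")  # stand-in for the worker's logger


def parse_stop_signals(status, content_type, body):
    """Return the job ids to stop, warning on every server-contract violation."""
    if content_type != "application/json":
        log.warning("get_stop_signals | status %s, unexpected content-type %r",
                    status, content_type)
        return []
    try:
        payload = json.loads(body)
    except ValueError as error:  # json.JSONDecodeError subclasses ValueError
        log.warning("get_stop_signals | status %s, invalid JSON: %s", status, error)
        return []
    if not isinstance(payload, dict):
        log.warning("get_stop_signals | expected object, got %s",
                    type(payload).__name__)
        return []
    raw_ids = payload.get("jobsToStop", [])
    if not isinstance(raw_ids, list):
        log.warning("get_stop_signals | jobsToStop is %s, not a list",
                    type(raw_ids).__name__)
        return []
    job_ids = [job_id for job_id in raw_ids if isinstance(job_id, str)]
    if len(job_ids) != len(raw_ids):
        log.warning("get_stop_signals | dropped %d non-string ids",
                    len(raw_ids) - len(job_ids))
    return job_ids
```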
    A list of request ids to stop. Empty when the poll returned no signals.
"""
stop_url = _job_stop_url()
if not stop_url:
When _job_stop_url() returns None, the entire stop feature is silently disabled for the worker's whole lifetime. Note JOB_GET_URL is built as str(os.environ.get("RUNPOD_WEBHOOK_GET_JOB")).replace(...) — if that env var is unset it becomes the literal string "None", which has no /job-take/, so _job_stop_url() returns None and this branch makes the stop loop a permanent no-op that logs nothing. Every cancelled job on a misconfigured worker bills in full, invisibly.
This is exactly the "validate config at startup, fail fast/loud" case. Suggest validating once at worker startup (so it's not per-poll log spam) and emitting a single clear diagnostic when the stop URL can't be derived, rather than silently returning [] forever.
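As a rough illustration of a one-time startup check: `derive_stop_url` below repeats the derivation logic from the diff but takes the URL as an argument, and `check_stop_channel` is a hypothetical helper that would be called once when the worker starts. Neither name exists in the PR.

```python
import logging

log = logging.getLogger("stop_config_sketch")  # stand-in for the worker's logger


def derive_stop_url(job_get_url):
    """Derive the job-stop URL from the job-take URL, or None if it cannot be derived."""
    base_url = job_get_url.split("?")[0]
    if "/job-take/" not in base_url:
        return None
    return base_url.replace("/job-take/", "/job-stop/")


def check_stop_channel(job_get_url):
    """Run once at startup: emit a single clear diagnostic if stops cannot work."""
    stop_url = derive_stop_url(job_get_url)
    if stop_url is None:
        log.error(
            "Per-job stop is disabled: cannot derive a stop URL from %r. "
            "Cancelled jobs on this worker will run to completion.",
            job_get_url,
        )
    return stop_url
```

With an unset environment variable the input is the literal string "None", which this check reports once instead of returning an empty list on every poll.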
    log.error(f"Error handling job: {err}", job["id"])
    raise err

except asyncio.CancelledError:
This except asyncio.CancelledError is placed after except Exception. It works only because CancelledError derives from BaseException (not Exception) on Python 3.8+, so it isn't shadowed — but it reads as dead code, and it silently breaks the moment someone widens the first clause to BaseException or wraps the cancellation. It's also inconsistent with get_jobs, which orders CancelledError first with an explanatory comment.
Suggest reordering except asyncio.CancelledError above except Exception (and mirroring the # CancelledError is a BaseException comment from get_jobs). While here: raise err on the line above rebinds the traceback — prefer a bare raise to preserve the original origin.
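The suggested ordering might look roughly like this. The `handle_job` signature and the `events` list are simplified stand-ins so the snippet runs on its own; they are not the SDK's actual function.

```python
import asyncio


async def handle_job(handler, job, events):
    """Run a handler, recording how it ended; `events` stands in for logging."""
    try:
        return await handler(job)
    except asyncio.CancelledError:
        # CancelledError is a BaseException on Python 3.8+, so list it first
        # and make the intent explicit.
        events.append(("cancelled", job["id"]))
        raise
    except Exception:
        events.append(("failed", job["id"]))
        raise  # bare raise re-raises the active exception without rebinding it
```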
## Stopping Individual Jobs

A worker can process more than one job concurrently. When a single request is cancelled, expires, or times out, the Runpod server signals the worker to stop just that request without affecting the worker's other in-progress jobs. The worker long-polls a dedicated stop channel so these signals arrive with low latency, and it cancels the task running the matching job, so a stopped job no longer consumes worker time.
"long-polls a dedicated stop channel so these signals arrive with low latency" describes server behavior as if the SDK guarantees it. The client only issues a single GET; whether the connection is held open vs. returns 204 immediately is entirely the server's contract, and monitor_stop_signals has no enforced floor delay (see the busy-spin comment), so if the server doesn't long-poll this becomes continuous polling rather than low-latency long-poll.
Suggest attributing the behavior to the server, e.g. "The worker continuously polls a dedicated stop channel; the server is expected to hold each request open (long-poll) until a stop signal is available or the poll times out." Same applies to the get_stop_signals docstring.
Separately at line 65: the cleanup advice should note that a handler catching asyncio.CancelledError must re-raise after cleanup (as the SDK's own handle_job does) — otherwise the stop is swallowed and the job is reported completed.
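For the docs, a handler example along these lines could make the re-raise requirement concrete. This is a sketch: the `resource` dict and the `seconds` input field are hypothetical stand-ins for whatever a real handler holds and does.

```python
import asyncio


async def handler(job):
    """Async handler that cleans up when its job is stopped, then re-raises."""
    resource = job["input"]["resource"]  # hypothetical stand-in for a model, file, or connection
    resource["open"] = True
    try:
        await asyncio.sleep(job["input"]["seconds"])  # stands in for real work
        return {"output": "done"}
    except asyncio.CancelledError:
        resource["cancelled"] = True  # stop-specific cleanup goes here
        raise  # re-raise, otherwise the stop is swallowed
    finally:
        resource["open"] = False  # runs on success, failure, and cancellation
```

If the `raise` is omitted, the coroutine returns normally after cleanup and the surrounding task ends as completed rather than cancelled.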
  jobstop_task = asyncio.create_task(self.monitor_stop_signals(session))

- tasks = [jobtake_task, jobrun_task]
+ tasks = [jobtake_task, jobrun_task, jobstop_task]
The comment two lines down still says "Concurrently run both tasks and wait for both to finish," but this list now holds three tasks (jobtake_task, jobrun_task, jobstop_task). Suggest updating it — or, to survive the next task addition, dropping the count entirely: "Run the worker's concurrent loops until shutdown."
  # Ensure all remaining tasks finish before stopping
- await asyncio.gather(*tasks)
+ await asyncio.gather(*tasks, return_exceptions=True)
Switching to return_exceptions=True is the right call now that stopped tasks raise CancelledError during the final drain. The side effect is that genuine handler exceptions raised at shutdown are now collected into the (unused) result list and silently discarded. CancelledError from stopped jobs is fine to ignore here, but a real handler error shouldn't vanish. Consider inspecting the results and logging non-CancelledError exceptions:
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
        log.error(f"run_jobs | Task failed during shutdown drain: {result}")

@pytest.mark.asyncio
async def test_monitor_stop_signals_stops_jobs(job_scaler: PatchScaler):
Two test-coverage notes for this area:
- This test monkeypatches both collaborators (scaler.stop_job and scaler.stop_signals_fetcher), so the only production code it exercises is the loop's id-forwarding; it's mocks talking to mocks, not integration. The real end-to-end wiring is already well covered by test_stop_job_cancels_inflight_task; consider a variant here where the fetcher returns ids and the real stop_job runs against a populated jobs_tasks, asserting the underlying task is cancelled.
- Several new branches are untested: get_stop_signals' raise_for_status() 5xx path, the wrong-content-type branch, and the JSON parse (ContentTypeError/ValueError) catch; plus monitor_stop_signals' TooManyRequests→sleep(5) branch and the CancelledError→re-raise. The sibling get_job suite already covers its 500/content-type cases; worth matching. For the backoff branches, patch asyncio.sleep so they're deterministic rather than wall-clock-dependent.
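A rough sketch of a deterministic backoff test. `TooManyRequests` and `poll_once` below are local stand-ins defined only so the example runs by itself; the real test would target monitor_stop_signals and the SDK's own exception type.

```python
import asyncio
from unittest.mock import AsyncMock, patch


class TooManyRequests(Exception):
    """Hypothetical stand-in for the SDK's rate-limit error."""


async def poll_once(fetch):
    """Simplified stand-in for one loop iteration with a rate-limit backoff."""
    try:
        return await fetch()
    except TooManyRequests:
        await asyncio.sleep(5)  # looked up on the module at call time, so patchable
        return []


def test_rate_limit_backs_off_without_waiting():
    fetch = AsyncMock(side_effect=TooManyRequests())
    # Replace asyncio.sleep so the 5-second backoff is asserted, not waited for.
    with patch("asyncio.sleep", new=AsyncMock()) as fake_sleep:
        result = asyncio.run(poll_once(fetch))
    assert result == []
    fake_sleep.assert_awaited_once_with(5)
```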
A serverless worker that takes more than one job concurrently had no way to stop processing an individual request once it started. The only available lever was killing the entire worker, which also terminates the other healthy in-progress jobs on that worker. This is the root cause behind cancelled requests continuing to run and incur charges when a worker is handling several jobs at once.
This gives the worker a notion of stopping a single request. The worker now tracks each in-progress job by id and can cancel just that job's task, leaving its siblings untouched. Stop signals arrive via a new job-stop long-polling channel, similar to the job-take long-polling endpoint.
Handlers need no changes; async handlers holding resources can clean up by catching asyncio.CancelledError.
Relies on https://github.com/runpod/ai-api/pull/881
Closes SLS-41.