feat: add per-job stop capability to serverless worker #510

Open
KAJdev wants to merge 2 commits into main from zeke/sls-41-add-stop-job-capability-to-runpod-python-sdk
@KAJdev KAJdev commented Jun 5, 2026

Contributor

A serverless worker that takes more than one job concurrently had no way to stop processing an individual request once it started. The only available lever was killing the entire worker, which also terminates the other healthy in-progress jobs on that worker. This is the root cause behind cancelled requests continuing to run and incur charges when a worker is handling several jobs at once.

This change gives the worker a notion of stopping a single request. The worker now tracks each in-progress job by id and can cancel just that job's task, leaving its siblings untouched. The stop signal arrives via a new job-stop long-polling channel, similar to the job-take long-polling endpoint.

Handlers need no changes; async handlers holding resources can clean up by catching asyncio.CancelledError.
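
A minimal sketch of the cleanup pattern, assuming the worker stops a job by cancelling the asyncio task that runs the handler. The handler body, the `cleanup_log` list, and the `demo` driver are hypothetical and only illustrate the shape; they are not taken from the SDK.

```python
import asyncio

cleanup_log = []  # illustration only: records what the handler did


async def handler(job):
    """Hypothetical async handler that holds a resource while it works."""
    cleanup_log.append("acquired")
    try:
        await asyncio.sleep(60)  # stands in for long-running work
        return {"output": "done"}
    except asyncio.CancelledError:
        cleanup_log.append("released")  # release the resource here
        raise  # re-raise so the cancellation is not swallowed


async def demo():
    task = asyncio.create_task(handler({"id": "job-1", "input": {}}))
    await asyncio.sleep(0)  # yield once so the handler starts running
    task.cancel()  # assumed to be roughly what a per-job stop does
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "completed"
```

Re-raising after cleanup matters: if the handler returned normally from the `except` block, the task would finish as if the job had completed.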

Relies on https://github.com/runpod/ai-api/pull/881

Closes SLS-41.

promptless Bot commented Jun 5, 2026

Promptless prepared a documentation update related to this change.

Triggered by PR #510

Added documentation for the per-job stop capability to the main docs site. The update explains that workers handling multiple jobs concurrently can now stop individual jobs without affecting siblings, and includes guidance on catching asyncio.CancelledError for resource cleanup in async handlers.

Review: Document per-job stop capability for concurrent workers

Copilot AI left a comment

Pull request overview

Adds per-job cancellation support to the serverless worker so a single in-flight job can be stopped without terminating the whole worker (and other concurrent jobs). This introduces a new stop-signal long-poll channel and tracks running job tasks by job id so they can be cancelled individually.

Changes:

  • Track in-progress jobs as asyncio.Tasks keyed by job id and add stop_job() + monitor_stop_signals() to cancel individual jobs.
  • Add a new get_stop_signals() job module helper that long-polls a derived job-stop endpoint.
  • Add tests covering per-job stop behavior and stop-signal polling, plus documentation describing cancellation behavior for handlers.
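
The bookkeeping described in the first bullet can be sketched roughly as follows. `JobTracker`, its methods, and the `work` coroutine are hypothetical names for illustration; the PR's own `stop_job()` is awaited by the caller and lives on the scaler class, so treat this as a simplified model rather than the actual implementation.

```python
import asyncio


class JobTracker:
    """Hypothetical stand-in for the scaler's per-job bookkeeping."""

    def __init__(self):
        self.tasks = {}  # job id -> asyncio.Task

    def start(self, job_id, coro):
        task = asyncio.create_task(coro)
        self.tasks[job_id] = task
        # Forget the task once it finishes, whether it completed or was cancelled.
        task.add_done_callback(lambda _task: self.tasks.pop(job_id, None))
        return task

    def stop_job(self, job_id):
        """Cancel only the task for job_id; return False if it is not running."""
        task = self.tasks.get(job_id)
        if task is None or task.done():
            return False
        task.cancel()
        return True


async def work(seconds):
    await asyncio.sleep(seconds)
    return "finished"


async def demo():
    tracker = JobTracker()
    job_a = tracker.start("job-a", work(60))
    job_b = tracker.start("job-b", work(0.01))
    await asyncio.sleep(0)  # let both jobs start
    stopped = tracker.stop_job("job-a")
    results = await asyncio.gather(job_a, job_b, return_exceptions=True)
    return stopped, results
```

In `demo()`, stopping `job-a` leaves `job-b` free to run to completion, which is the sibling-isolation property the PR is after.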

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| runpod/serverless/modules/rp_scale.py | Track per-job tasks, long-poll stop signals, and cancel individual jobs. |
| runpod/serverless/modules/rp_job.py | Add stop-channel URL derivation and get_stop_signals() long-poll helper. |
| tests/test_serverless/test_rp_scale.py | Add integration-style async tests for stopping jobs and stop-signal monitoring. |
| tests/test_serverless/test_modules/test_job.py | Add unit tests for stop URL derivation and stop-signal parsing/behavior. |
| docs/serverless/worker.md | Document “Stopping Individual Jobs” and handler cleanup via asyncio.CancelledError. |

Comment on lines +36 to +39

base_url = JOB_GET_URL.split("?")[0]
if "/job-take/" not in base_url:
    return None
return base_url.replace("/job-take/", "/job-stop/")

Comment on lines +156 to +161

STOP_TAKE_URL = "http://mock.url/v2/ep/job-take/pod?gpu=x"

def test_job_stop_url_derived_from_job_take(self):
    with patch("runpod.serverless.modules.rp_job.JOB_GET_URL", self.STOP_TAKE_URL):
        assert rp_job._job_stop_url() == "http://mock.url/v2/ep/job-stop/pod"

Comment on lines +276 to +279

try:
    job_ids = await self.stop_signals_fetcher(session)
    for job_id in job_ids:
        await self.stop_job(job_id)

@deanq deanq left a comment

Member

Automated review pass (per-job stop capability). 9 inline comments below — 2 critical (silent error logging on the stop loop; no backpressure on the poll's success path), 4 important (silent swallows in get_stop_signals, silent feature-disable on unset JOB_GET_URL, except-clause ordering, docs overpromising long-poll), and 3 suggestions/nits. Nothing blocking the design, which is sound.

except asyncio.CancelledError:
    raise
except Exception as error:
    log.debug(f"JobScaler.monitor_stop_signals | Error: {error}.")

This catch-all logs at debug, which is suppressed at the default production log level. monitor_stop_signals is the only loop that stops billed jobs — if get_stop_signals starts failing persistently (DNS/TLS error, stop endpoint 5xx, malformed JOB_GET_URL, a bug in stop_job), every stop signal is silently dropped, cancelled jobs keep running, and customers keep getting billed while the worker looks perfectly healthy. The sibling get_jobs loop logs the same catch-all at error (with the error type), and it's no more billing-critical than this one.

Suggest matching that: log.error with type(error).__name__, and ideally a consecutive-failure counter/metric so a dead stop channel is alertable rather than invisible. The broad except is correct here (the loop must survive) — it just needs to be loud.
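
One possible shape for the louder logging plus a consecutive-failure counter is sketched below. `FailureTracker`, the `alert_after` threshold, and the use of the standard `logging` module are assumptions for illustration; the SDK has its own logger and may expose metrics differently.

```python
import logging

log = logging.getLogger("stop_monitor_sketch")  # stand-in for the SDK logger


class FailureTracker:
    """Counts consecutive poll failures so a dead stop channel is visible."""

    def __init__(self, alert_after=5):  # threshold is an arbitrary assumption
        self.alert_after = alert_after
        self.consecutive = 0

    def record_success(self):
        self.consecutive = 0

    def record_failure(self, error):
        """Log at error level; return True once the alert threshold is reached."""
        self.consecutive += 1
        log.error(
            "monitor_stop_signals | %s: %s (consecutive failures: %d)",
            type(error).__name__,
            error,
            self.consecutive,
        )
        return self.consecutive >= self.alert_after
```

The polling loop would call `record_failure(error)` inside its broad `except Exception` and `record_success()` after each clean poll, so one transient error stays a single log line while a persistent outage crosses the threshold.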

"""
while self.is_alive():
    try:
        job_ids = await self.stop_signals_fetcher(session)

This loop has no backpressure on its normal path. The await asyncio.sleep(0) in the finally is a cooperative yield (it lets other tasks run) — correct to have, but it adds no delay. So the polling cadence is governed entirely by how long self.stop_signals_fetcher(session) takes to return, and the design depends on the server holding that GET open (long-poll). get_stop_signals returns [] immediately on HTTP 204, and nothing here enforces hold-open behavior. If the stop channel returns quickly (server not yet long-polling, a buffering proxy, or a misconfigured endpoint), this becomes a 100%-CPU hot loop firing back-to-back GETs.

The error branches already add real backoff (429→5s, generic→1s), so the concern is only the success/empty path. Suggest a floor delay when no ids were returned, e.g. await asyncio.sleep(1) on the empty result, so the loop can't busy-spin if the server doesn't long-poll.
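
A rough sketch of the suggested floor delay follows. The function name, its injected `fetch`, `stop_job`, `is_alive`, and `sleep` parameters, and the one-second constant are all hypothetical; the real loop is a method on the scaler and also handles errors, which this sketch omits.

```python
import asyncio

EMPTY_POLL_DELAY = 1.0  # seconds; assumed floor, matching the suggestion above


async def poll_stop_signals(fetch, stop_job, is_alive, sleep=asyncio.sleep):
    """Poll for stop signals, pausing when a poll returns no ids.

    If the server holds the request open (long-poll), the pause adds little.
    If the server answers immediately, the pause keeps the loop from spinning.
    """
    while is_alive():
        job_ids = await fetch()
        for job_id in job_ids:
            await stop_job(job_id)
        if not job_ids:
            await sleep(EMPTY_POLL_DELAY)
```

Passing `sleep` as a parameter is only a convenience for testing the delay without waiting on the wall clock.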


response.raise_for_status()

if response.content_type != "application/json":

Four server-contract violations here all return [] with zero logging:

  • non-JSON content-type on a 2xx (:71)
  • JSON parse failure (:76)
  • valid JSON but non-dict payload (:79)
  • non-string ids dropped by the isinstance(job_id, str) filter (:82)

Each one means a stop signal silently never fires, so cancelled jobs keep billing — and there's no trace to diagnose schema drift between server and worker. The guards themselves are good defensive code; they just need to be observable. Suggest a log.warn before each return (include the status/content-type/payload-type), and for the id filter, log when len(filtered) != len(raw) so a type change in jobsToStop surfaces instead of vanishing.
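
The payload guards could be made observable along these lines. `extract_job_ids` is a hypothetical helper, the standard `logging` module stands in for the SDK logger, and the only details taken from the review are the `jobsToStop` key, the dict check, and the string filter.

```python
import logging

log = logging.getLogger("stop_signals_sketch")  # stand-in for the SDK logger


def extract_job_ids(payload):
    """Return the string ids under "jobsToStop", warning on anything unexpected."""
    if not isinstance(payload, dict):
        log.warning("stop payload is %s, expected dict", type(payload).__name__)
        return []
    raw = payload.get("jobsToStop", [])
    if not isinstance(raw, list):
        log.warning("jobsToStop is %s, expected list", type(raw).__name__)
        return []
    ids = [job_id for job_id in raw if isinstance(job_id, str)]
    if len(ids) != len(raw):
        # Surfaces schema drift instead of dropping ids without a trace.
        log.warning("dropped %d non-string id(s) from jobsToStop", len(raw) - len(ids))
    return ids
```

The return values are unchanged from the silent version; only the logging is new, so callers would not need to change.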

A list of request ids to stop. Empty when the poll returned no signals.
"""
stop_url = _job_stop_url()
if not stop_url:

When _job_stop_url() returns None, the entire stop feature is silently disabled for the worker's whole lifetime. Note JOB_GET_URL is built as str(os.environ.get("RUNPOD_WEBHOOK_GET_JOB")).replace(...) — if that env var is unset it becomes the literal string "None", which has no /job-take/, so _job_stop_url() returns None and this branch makes the stop loop a permanent no-op that logs nothing. Every cancelled job on a misconfigured worker bills in full, invisibly.

This is exactly the "validate config at startup, fail fast/loud" case. Suggest validating once at worker startup (so it's not per-poll log spam) and emitting a single clear diagnostic when the stop URL can't be derived, rather than silently returning [] forever.
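
A sketch of a one-time startup check, under the assumption that failing loudly is acceptable. `derive_stop_url` mirrors the derivation quoted earlier in this review; `validate_stop_channel` and its choice to raise are hypothetical, and logging a single error instead of raising would be an equally valid reading of the suggestion.

```python
def derive_stop_url(job_get_url):
    """Same derivation as the snippet under review, taking the URL as an argument."""
    base_url = job_get_url.split("?")[0]
    if "/job-take/" not in base_url:
        return None
    return base_url.replace("/job-take/", "/job-stop/")


def validate_stop_channel(job_get_url):
    """Call once at worker startup; raise if per-job stop cannot work."""
    stop_url = derive_stop_url(job_get_url)
    if stop_url is None:
        raise ValueError(
            f"cannot derive job-stop URL from {job_get_url!r}; "
            "per-job stop would be disabled"
        )
    return stop_url


# An unset environment variable passed through str() becomes the text "None",
# which is the misconfiguration described above.
unset_value = str(None)
```

With the check in place, a worker started without the job-take URL reports the problem once at boot rather than polling a no-op loop for its whole lifetime.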

    log.error(f"Error handling job: {err}", job["id"])
    raise err

except asyncio.CancelledError:

This except asyncio.CancelledError is placed after except Exception. It works only because CancelledError derives from BaseException (not Exception) on Python 3.8+, so it isn't shadowed — but it reads as dead code, and it silently breaks the moment someone widens the first clause to BaseException or wraps the cancellation. It's also inconsistent with get_jobs, which orders CancelledError first with an explanatory comment.

Suggest reordering except asyncio.CancelledError above except Exception (and mirroring the # CancelledError is a BaseException comment from get_jobs). While here: raise err on the line above rebinds the traceback — prefer a bare raise to preserve the original origin.
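
The suggested ordering looks roughly like this. `run_job`, `failing`, and `outcome` are hypothetical names used only to show the clause order and the bare `raise`; the real `handle_job` does considerably more.

```python
import asyncio


async def run_job(handler, job):
    try:
        return await handler(job)
    except asyncio.CancelledError:
        # CancelledError is a BaseException (Python 3.8+), so it is handled
        # first and stays correct even if the clause below is widened later.
        raise
    except Exception:
        # Real code would log the error here before propagating it.
        raise  # bare raise re-raises the active exception unchanged


async def failing(job):
    raise ValueError("boom")


def outcome(handler):
    try:
        asyncio.run(run_job(handler, {"id": "job-1"}))
    except ValueError as err:
        return f"error: {err}"
    return "ok"
```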

Comment thread docs/serverless/worker.md

## Stopping Individual Jobs

A worker can process more than one job concurrently. When a single request is cancelled, expires, or times out, the Runpod server signals the worker to stop just that request without affecting the worker's other in-progress jobs. The worker long-polls a dedicated stop channel so these signals arrive with low latency, and it cancels the task running the matching job, so a stopped job no longer consumes worker time.

"long-polls a dedicated stop channel so these signals arrive with low latency" describes server behavior as if the SDK guarantees it. The client only issues a single GET; whether the connection is held open vs. returns 204 immediately is entirely the server's contract, and monitor_stop_signals has no enforced floor delay (see the busy-spin comment), so if the server doesn't long-poll this becomes continuous polling rather than low-latency long-poll.

Suggest attributing the behavior to the server, e.g. "The worker continuously polls a dedicated stop channel; the server is expected to hold each request open (long-poll) until a stop signal is available or the poll times out." Same applies to the get_stop_signals docstring.

Separately at line 65: the cleanup advice should note that a handler catching asyncio.CancelledError must re-raise after cleanup (as the SDK's own handle_job does) — otherwise the stop is swallowed and the job is reported completed.

+ jobstop_task = asyncio.create_task(self.monitor_stop_signals(session))

- tasks = [jobtake_task, jobrun_task]
+ tasks = [jobtake_task, jobrun_task, jobstop_task]

The comment two lines down still says "Concurrently run both tasks and wait for both to finish," but this list now holds three tasks (jobtake_task, jobrun_task, jobstop_task). Suggest updating it — or, to survive the next task addition, dropping the count entirely: "Run the worker's concurrent loops until shutdown."


  # Ensure all remaining tasks finish before stopping
- await asyncio.gather(*tasks)
+ await asyncio.gather(*tasks, return_exceptions=True)

Switching to return_exceptions=True is the right call now that stopped tasks raise CancelledError during the final drain. The side effect is that genuine handler exceptions raised at shutdown are now collected into the (unused) result list and silently discarded. CancelledError from stopped jobs is fine to ignore here, but a real handler error shouldn't vanish. Consider inspecting the results and logging non-CancelledError exceptions:

results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
        log.error(f"run_jobs | Task failed during shutdown drain: {result}")



@pytest.mark.asyncio
async def test_monitor_stop_signals_stops_jobs(job_scaler: PatchScaler):

Two test-coverage notes for this area:

  1. This test monkeypatches both collaborators (scaler.stop_job and scaler.stop_signals_fetcher), so the only production code it exercises is the loop's id-forwarding — it's mocks talking to mocks, not integration. The real end-to-end wiring is already well covered by test_stop_job_cancels_inflight_task; consider a variant here where the fetcher returns ids and the real stop_job runs against a populated jobs_tasks, asserting the underlying task is cancelled.

  2. Several new branches are untested: get_stop_signals' raise_for_status() 5xx path, the wrong-content-type branch, and the JSON parse (ContentTypeError/ValueError) catch; plus monitor_stop_signals' TooManyRequests→sleep(5) branch and the CancelledError→re-raise. The sibling get_job suite already covers its 500/content-type cases — worth matching. For the backoff branches, patch asyncio.sleep so they're deterministic rather than wall-clock-dependent.
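
A minimal sketch of making a backoff branch deterministic by patching `asyncio.sleep`. `TooManyRequests` here is a local stand-in for the SDK's rate-limit exception, and `poll_once` is a hypothetical reduction of the loop body; a real test in this suite would likely use `pytest.mark.asyncio` and the scaler fixture rather than `asyncio.run`.

```python
import asyncio
from unittest.mock import AsyncMock, patch


class TooManyRequests(Exception):
    """Local stand-in for the SDK's rate-limit error."""


async def poll_once(fetch):
    """One loop iteration: back off for 5 seconds when rate limited."""
    try:
        return await fetch()
    except TooManyRequests:
        await asyncio.sleep(5)
        return []


def run_backoff_check():
    fetch = AsyncMock(side_effect=TooManyRequests())
    # Replace asyncio.sleep so the test records the delay instead of waiting.
    with patch("asyncio.sleep", new_callable=AsyncMock) as fake_sleep:
        result = asyncio.run(poll_once(fetch))
    return result, fake_sleep
```

Asserting on the mocked sleep's awaited arguments checks the backoff duration directly, without the test taking five seconds to run.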


3 participants