Skip to content

Commit e6c49ce

Browse files
committed
feat: cancel in-flight subagent tasks by id
Closes the last gap in the "subagent task as task dashboard" story: a parent session can now interrupt a delegated child run without canceling the whole parent run. Core - `SubagentStatus` gains a `Cancelled` variant. Late `SubagentEnd` events from the cancelled child do not downgrade it back to `Failed`. - `InMemorySubagentTaskTracker` now also stores `CancellationToken`s per task. `register_canceller` / `clear_canceller` / `cancel(id)` bracket the in-flight token lifetime; `cancel` fires the token and flips the snapshot status atomically. - `TaskExecutor` gains `with_subagent_tracker(...)`. When set, each task registers its token, then runs the child loop through `AgentLoop::execute_with_session(... Some(&token))` so the cancellation propagates into LLM streaming and tool execution. - `register_task_with_mcp` grows an optional tracker parameter so the session bootstrap path can share a single Arc with the executor and the live `AgentSession`. - `AgentSession::cancel_subagent_task(task_id)` exposes the operation to callers. SDKs - Node: `Session.cancelSubagentTask(taskId): Promise<boolean>` via the same get_runtime().spawn pattern used by other run-control methods. - Python: `Session.cancel_subagent_task(task_id) -> bool` via the py.allow_threads / tokio block_on pattern. Tests - Tracker-level unit tests for the four interesting cases: cancel fires the token + flips status, cancel returns False on unknown ids, late SubagentEnd doesn't downgrade Cancelled, and clear_canceller disarms future cancel calls. - Integration test in agent_api/tests.rs drives a synthetic subagent lifecycle through `RuntimeEventSink`, registers a canceller, and asserts the public `cancel_subagent_task` API + the Cancelled terminal state survive a late SubagentEnd. - Node + Python smoke tests assert cancelling an unknown task id resolves to false / False.
1 parent 9691e77 commit e6c49ce

12 files changed

Lines changed: 272 additions & 8 deletions

File tree

core/src/agent_api.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,15 @@ impl AgentSession {
594594
.collect()
595595
}
596596

597+
/// Cancel an in-flight delegated subagent task by id. Returns `true`
598+
/// when a cancellation token was found and fired, `false` when the
599+
/// task id is unknown or the task has already finished. The eventual
600+
/// `SubagentEnd` from the cancelled child loop won't downgrade the
601+
/// terminal status — it stays `Cancelled`.
602+
pub async fn cancel_subagent_task(&self, task_id: &str) -> bool {
603+
self.subagent_tasks.cancel(task_id).await
604+
}
605+
597606
/// Return a snapshot of the session's conversation history.
598607
pub fn history(&self) -> Vec<Message> {
599608
SessionView::from_session(self).history()

core/src/agent_api/capabilities.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub(super) struct SessionCapabilities {
3535
pub(super) context_providers: Vec<Arc<dyn ContextProvider>>,
3636
pub(super) skill_registry: Arc<SkillRegistry>,
3737
pub(super) agent_registry: Arc<AgentRegistry>,
38+
pub(super) subagent_tasks: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
3839
}
3940

4041
pub(super) fn build_session_capabilities(input: SessionCapabilityInput<'_>) -> SessionCapabilities {
@@ -60,12 +61,14 @@ pub(super) fn build_session_capabilities(input: SessionCapabilityInput<'_>) -> S
6061
.set_search_config(search_config.clone());
6162
}
6263

64+
let subagent_tasks = Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new());
6365
let agent_registry = register_task_capability(
6466
input.code_config,
6567
input.opts,
6668
input.workspace,
6769
Arc::clone(&input.llm_client),
6870
&tool_executor,
71+
Arc::clone(&subagent_tasks),
6972
);
7073

7174
// Register generate_object tool (structured JSON output)
@@ -90,6 +93,7 @@ pub(super) fn build_session_capabilities(input: SessionCapabilityInput<'_>) -> S
9093
context_providers,
9194
skill_registry,
9295
agent_registry,
96+
subagent_tasks,
9397
}
9498
}
9599

@@ -136,6 +140,7 @@ fn register_task_capability(
136140
workspace: &Path,
137141
llm_client: Arc<dyn LlmClient>,
138142
tool_executor: &Arc<ToolExecutor>,
143+
subagent_tasks: Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>,
139144
) -> Arc<AgentRegistry> {
140145
use crate::child_run::ChildRunContext;
141146
use crate::subagent::load_agents_from_dir;
@@ -177,6 +182,7 @@ fn register_task_capability(
177182
workspace.display().to_string(),
178183
opts.mcp_manager.clone(),
179184
Some(parent_context),
185+
Some(subagent_tasks),
180186
);
181187
registry
182188
}

core/src/agent_api/session_builder.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ pub(super) fn build_agent_session(
103103
let tool_defs = capabilities.tool_defs;
104104
let context_providers = capabilities.context_providers;
105105
let effective_registry = capabilities.skill_registry;
106+
let subagent_tasks = capabilities.subagent_tasks;
106107

107108
let prompt_slots = opts
108109
.prompt_slots
@@ -219,7 +220,7 @@ pub(super) fn build_agent_session(
219220
cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
220221
current_run_id: Arc::new(tokio::sync::Mutex::new(None)),
221222
run_store: Arc::new(crate::run::InMemoryRunStore::new()),
222-
subagent_tasks: Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()),
223+
subagent_tasks,
223224
active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
224225
trace_sink,
225226
verification_reports: Arc::new(RwLock::new(Vec::new())),

core/src/agent_api/tests.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2468,3 +2468,59 @@ async fn subagent_tasks_scope_to_parent_session() {
24682468
assert!(session_b.subagent_tasks().await.is_empty());
24692469
assert!(session_b.subagent_task("task-from-a").await.is_none());
24702470
}
2471+
2472+
#[tokio::test]
2473+
async fn cancel_subagent_task_marks_snapshot_cancelled() {
2474+
use super::runtime_events::RuntimeEventSink;
2475+
use crate::agent::AgentEvent;
2476+
use crate::subagent_task_tracker::SubagentStatus;
2477+
use tokio_util::sync::CancellationToken;
2478+
2479+
let agent = Agent::from_config(test_config()).await.unwrap();
2480+
let session = agent.session("/tmp/test-ws-subagent-cancel", None).unwrap();
2481+
let run = session
2482+
.run_store
2483+
.create_run(session.session_id(), "parent")
2484+
.await;
2485+
let sink = RuntimeEventSink::from_session(&session, &run.id);
2486+
2487+
let task_id = "task-to-cancel".to_string();
2488+
sink.observe(&AgentEvent::SubagentStart {
2489+
task_id: task_id.clone(),
2490+
session_id: format!("task-run-{}", task_id),
2491+
parent_session_id: session.session_id().to_string(),
2492+
agent: "explore".to_string(),
2493+
description: "long task".to_string(),
2494+
})
2495+
.await;
2496+
2497+
// Simulate what TaskExecutor would do: register a cancellation token
2498+
// for this in-flight task so the public API has something to fire.
2499+
let token = CancellationToken::new();
2500+
session
2501+
.subagent_tasks
2502+
.register_canceller(&task_id, token.clone())
2503+
.await;
2504+
2505+
assert!(session.cancel_subagent_task(&task_id).await);
2506+
assert!(token.is_cancelled());
2507+
2508+
let snap = session.subagent_task(&task_id).await.unwrap();
2509+
assert_eq!(snap.status, SubagentStatus::Cancelled);
2510+
2511+
// A late SubagentEnd from the cancelled child must not downgrade.
2512+
sink.observe(&AgentEvent::SubagentEnd {
2513+
task_id: task_id.clone(),
2514+
session_id: format!("task-run-{}", task_id),
2515+
agent: "explore".to_string(),
2516+
output: "Task cancelled by caller".to_string(),
2517+
success: false,
2518+
})
2519+
.await;
2520+
let snap = session.subagent_task(&task_id).await.unwrap();
2521+
assert_eq!(snap.status, SubagentStatus::Cancelled);
2522+
2523+
// Cancelling again or against an unknown id is a no-op.
2524+
assert!(!session.cancel_subagent_task(&task_id).await);
2525+
assert!(!session.cancel_subagent_task("task-unknown").await);
2526+
}

core/src/subagent_task_tracker.rs

Lines changed: 111 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ use crate::agent::AgentEvent;
99
use serde::{Deserialize, Serialize};
1010
use std::collections::HashMap;
1111
use tokio::sync::RwLock;
12+
use tokio_util::sync::CancellationToken;
1213

1314
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1415
#[serde(rename_all = "snake_case")]
1516
pub enum SubagentStatus {
1617
Running,
1718
Completed,
1819
Failed,
20+
Cancelled,
1921
}
2022

2123
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -47,13 +49,52 @@ pub struct SubagentTaskSnapshot {
4749
#[derive(Debug, Default)]
4850
pub struct InMemorySubagentTaskTracker {
4951
tasks: RwLock<HashMap<String, SubagentTaskSnapshot>>,
52+
cancellers: RwLock<HashMap<String, CancellationToken>>,
5053
}
5154

5255
impl InMemorySubagentTaskTracker {
5356
pub fn new() -> Self {
5457
Self::default()
5558
}
5659

60+
/// Register a `CancellationToken` for a running task so callers can
61+
/// trigger cancellation through `cancel(task_id)`. The task executor
62+
/// is expected to remove the entry on exit via `clear_canceller`.
63+
pub async fn register_canceller(&self, task_id: &str, token: CancellationToken) {
64+
self.cancellers
65+
.write()
66+
.await
67+
.insert(task_id.to_string(), token);
68+
}
69+
70+
pub async fn clear_canceller(&self, task_id: &str) {
71+
self.cancellers.write().await.remove(task_id);
72+
}
73+
74+
/// Fire the registered token and mark the snapshot as `Cancelled`.
75+
/// Returns `true` if a token was found (caller can interpret as
76+
/// "cancellation initiated"), `false` if the task id was unknown or
77+
/// the task already finished. The eventual `SubagentEnd` event won't
78+
/// overwrite the Cancelled status — see `record_event`.
79+
pub async fn cancel(&self, task_id: &str) -> bool {
80+
let token = self.cancellers.write().await.remove(task_id);
81+
match token {
82+
Some(token) => {
83+
token.cancel();
84+
let now = now_ms();
85+
let mut tasks = self.tasks.write().await;
86+
if let Some(entry) = tasks.get_mut(task_id) {
87+
if entry.status == SubagentStatus::Running {
88+
entry.status = SubagentStatus::Cancelled;
89+
entry.updated_ms = now;
90+
}
91+
}
92+
true
93+
}
94+
None => false,
95+
}
96+
}
97+
5798
/// Apply a single agent event to the tracker. Non-subagent events are ignored.
5899
pub async fn record_event(&self, event: &AgentEvent) {
59100
match event {
@@ -148,11 +189,16 @@ impl InMemorySubagentTaskTracker {
148189
success: None,
149190
progress: Vec::new(),
150191
});
151-
entry.status = if *success {
152-
SubagentStatus::Completed
153-
} else {
154-
SubagentStatus::Failed
155-
};
192+
// Preserve a pre-set Cancelled status (set by `cancel()`)
193+
// — a late SubagentEnd from the cancelled child loop is
194+
// expected and must not downgrade the terminal state.
195+
if entry.status != SubagentStatus::Cancelled {
196+
entry.status = if *success {
197+
SubagentStatus::Completed
198+
} else {
199+
SubagentStatus::Failed
200+
};
201+
}
156202
entry.updated_ms = now;
157203
entry.finished_ms = Some(now);
158204
entry.output = Some(output.clone());
@@ -333,4 +379,64 @@ mod tests {
333379
.await;
334380
assert!(tracker.list().await.is_empty());
335381
}
382+
383+
#[tokio::test]
384+
async fn cancel_fires_token_and_marks_snapshot_cancelled() {
385+
let tracker = InMemorySubagentTaskTracker::new();
386+
tracker
387+
.record_event(&start_event("task-c", "parent", "child"))
388+
.await;
389+
390+
let token = CancellationToken::new();
391+
tracker.register_canceller("task-c", token.clone()).await;
392+
assert!(!token.is_cancelled());
393+
394+
let fired = tracker.cancel("task-c").await;
395+
assert!(fired, "cancel should report success");
396+
assert!(token.is_cancelled(), "registered token should be triggered");
397+
398+
let snap = tracker.get("task-c").await.unwrap();
399+
assert_eq!(snap.status, SubagentStatus::Cancelled);
400+
}
401+
402+
#[tokio::test]
403+
async fn cancel_returns_false_for_unknown_task() {
404+
let tracker = InMemorySubagentTaskTracker::new();
405+
assert!(!tracker.cancel("task-does-not-exist").await);
406+
}
407+
408+
#[tokio::test]
409+
async fn late_subagent_end_does_not_downgrade_cancelled_status() {
410+
let tracker = InMemorySubagentTaskTracker::new();
411+
tracker
412+
.record_event(&start_event("task-d", "parent", "child"))
413+
.await;
414+
let token = CancellationToken::new();
415+
tracker.register_canceller("task-d", token).await;
416+
assert!(tracker.cancel("task-d").await);
417+
418+
// The cancelled child loop will still emit a (likely failed)
419+
// SubagentEnd. The terminal status should remain Cancelled.
420+
tracker
421+
.record_event(&end_event("task-d", "child", false))
422+
.await;
423+
let snap = tracker.get("task-d").await.unwrap();
424+
assert_eq!(snap.status, SubagentStatus::Cancelled);
425+
assert!(snap.finished_ms.is_some());
426+
assert_eq!(snap.success, Some(false));
427+
}
428+
429+
#[tokio::test]
430+
async fn clear_canceller_disarms_future_cancel_calls() {
431+
let tracker = InMemorySubagentTaskTracker::new();
432+
tracker
433+
.record_event(&start_event("task-e", "parent", "child"))
434+
.await;
435+
let token = CancellationToken::new();
436+
tracker.register_canceller("task-e", token.clone()).await;
437+
tracker.clear_canceller("task-e").await;
438+
439+
assert!(!tracker.cancel("task-e").await);
440+
assert!(!token.is_cancelled());
441+
}
336442
}

core/src/tools/builtin/mod.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,32 @@ pub fn register_task(
9393
agent_registry: Arc<crate::subagent::AgentRegistry>,
9494
workspace: String,
9595
) {
96-
register_task_with_mcp(registry, llm_client, agent_registry, workspace, None, None);
96+
register_task_with_mcp(
97+
registry,
98+
llm_client,
99+
agent_registry,
100+
workspace,
101+
None,
102+
None,
103+
None,
104+
);
97105
}
98106

99107
/// Register the task delegation tools with optional MCP manager and parent context.
100108
///
101109
/// When `mcp_manager` is provided, delegated child sessions will have access
102110
/// to all MCP tools from connected servers.
103111
/// When `parent_context` is provided, child runs inherit parent capabilities.
112+
/// When `subagent_tracker` is provided, each task registers a
113+
/// `CancellationToken` against it so callers can cancel by `task_id`.
104114
pub fn register_task_with_mcp(
105115
registry: &Arc<ToolRegistry>,
106116
llm_client: Arc<dyn crate::llm::LlmClient>,
107117
agent_registry: Arc<crate::subagent::AgentRegistry>,
108118
workspace: String,
109119
mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
110120
parent_context: Option<crate::child_run::ChildRunContext>,
121+
subagent_tracker: Option<Arc<crate::subagent_task_tracker::InMemorySubagentTaskTracker>>,
111122
) {
112123
use crate::tools::task::{ParallelTaskTool, TaskExecutor, TaskTool};
113124
let mut executor = match mcp_manager {
@@ -117,6 +128,9 @@ pub fn register_task_with_mcp(
117128
if let Some(ctx) = parent_context {
118129
executor = executor.with_parent_context(ctx);
119130
}
131+
if let Some(tracker) = subagent_tracker {
132+
executor = executor.with_subagent_tracker(tracker);
133+
}
120134
let executor = Arc::new(executor);
121135
registry.register_builtin(Arc::new(TaskTool::new(Arc::clone(&executor))));
122136
registry.register_builtin(Arc::new(ParallelTaskTool::new(Arc::clone(&executor))));

0 commit comments

Comments
 (0)