diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index c63103161f6c..a3cb301225e3 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -22,7 +22,6 @@ use codex_app_server_protocol::CommandExecutionSource; use codex_app_server_protocol::CommandExecutionStatus; use codex_app_server_protocol::DeprecationNoticeNotification; use codex_app_server_protocol::DynamicToolCallParams; -use codex_app_server_protocol::DynamicToolCallStatus; use codex_app_server_protocol::ErrorNotification; use codex_app_server_protocol::ExecPolicyAmendment as V2ExecPolicyAmendment; use codex_app_server_protocol::FileChangeApprovalDecision; @@ -90,6 +89,8 @@ use codex_core::ThreadManager; use codex_core::review_format::format_review_findings_block; use codex_core::review_prompts; use codex_protocol::ThreadId; +use codex_protocol::items::CollabAgentTool as CoreCollabAgentTool; +use codex_protocol::items::TurnItem as CoreTurnItem; use codex_protocol::items::parse_hook_prompt_message; use codex_protocol::models::AdditionalPermissionProfile as CoreAdditionalPermissionProfile; use codex_protocol::plan_tool::UpdatePlanArgs; @@ -822,51 +823,8 @@ pub(crate) async fn apply_bespoke_event_handling( on_request_permissions_response(pending_response, conversation, thread_state).await; }); } - EventMsg::DynamicToolCallRequest(request) => { - let call_id = request.call_id; - let turn_id = request.turn_id; - let namespace = request.namespace; - let tool = request.tool; - let arguments = request.arguments; - let item = ThreadItem::DynamicToolCall { - id: call_id.clone(), - namespace: namespace.clone(), - tool: tool.clone(), - arguments: arguments.clone(), - status: DynamicToolCallStatus::InProgress, - content_items: None, - success: None, - duration_ms: None, - }; - let notification = ItemStartedNotification { - thread_id: conversation_id.to_string(), - turn_id: turn_id.clone(), - started_at_ms: request.started_at_ms, - item, - }; - outgoing - .send_server_notification(ServerNotification::ItemStarted(notification)) - .await; - let params = DynamicToolCallParams { - thread_id: conversation_id.to_string(), - turn_id: turn_id.clone(), - call_id: call_id.clone(), - namespace, - tool: tool.clone(), - arguments: arguments.clone(), - }; - let (_pending_request_id, rx) = outgoing - .send_request(ServerRequestPayload::DynamicToolCall(params)) - .await; - tokio::spawn(async move { - crate::dynamic_tools::on_call_response(call_id, rx, conversation).await; - }); - } - EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) => { - // Deprecated MCP tool-call events are still fanned out for legacy clients. - // App-server v2 receives the canonical TurnItem::McpToolCall lifecycle instead. - } - msg @ (EventMsg::DynamicToolCallResponse(_) + EventMsg::DynamicToolCallRequest(_) + | EventMsg::DynamicToolCallResponse(_) | EventMsg::CollabAgentSpawnBegin(_) | EventMsg::CollabAgentSpawnEnd(_) | EventMsg::CollabAgentInteractionBegin(_) @@ -874,9 +832,23 @@ pub(crate) async fn apply_bespoke_event_handling( | EventMsg::CollabWaitingBegin(_) | EventMsg::CollabWaitingEnd(_) | EventMsg::CollabCloseBegin(_) + | EventMsg::CollabCloseEnd(_) | EventMsg::CollabResumeBegin(_) | EventMsg::CollabResumeEnd(_) - | EventMsg::AgentMessageContentDelta(_) + | EventMsg::SubAgentActivity(_) + | EventMsg::ExecCommandBegin(_) + | EventMsg::ExecCommandEnd(_) => { + // Deprecated item lifecycle events are still fanned out for raw-event and rollout + // compatibility consumers. + // App-server v2 receives canonical TurnItem lifecycle instead, and dispatches + // dynamic tool requests from canonical DynamicToolCall starts. + } + EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) => { + // Deprecated MCP tool-call events are still fanned out for raw-event and rollout + // compatibility consumers. + // App-server v2 receives the canonical TurnItem::McpToolCall lifecycle instead. + } + msg @ (EventMsg::AgentMessageContentDelta(_) | EventMsg::PlanDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) @@ -888,43 +860,9 @@ pub(crate) async fn apply_bespoke_event_handling( ); outgoing.send_server_notification(notification).await; } - EventMsg::SubAgentActivity(activity) => { - if activity.kind == SubAgentActivityKind::Interrupted - && thread_manager - .get_thread(activity.agent_thread_id) - .await - .is_err() - { - thread_watch_manager - .remove_thread(&activity.agent_thread_id.to_string()) - .await; - } - let notification = item_event_to_server_notification( - EventMsg::SubAgentActivity(activity), - &conversation_id.to_string(), - &event_turn_id, - ); - outgoing.send_server_notification(notification).await; - } - EventMsg::CollabCloseEnd(end_event) => { - if thread_manager - .get_thread(end_event.receiver_thread_id) - .await - .is_err() - { - thread_watch_manager - .remove_thread(&end_event.receiver_thread_id.to_string()) - .await; - } - let notification = item_event_to_server_notification( - EventMsg::CollabCloseEnd(end_event), - &conversation_id.to_string(), - &event_turn_id, - ); - outgoing.send_server_notification(notification).await; - } EventMsg::ContextCompacted(..) => { - // Core still fans out this deprecated event for legacy clients; + // Core still fans out this deprecated event for raw-event and rollout compatibility + // consumers; // v2 clients receive the canonical ContextCompaction item instead. } EventMsg::DeprecationNotice(event) => { @@ -1027,10 +965,61 @@ pub(crate) async fn apply_bespoke_event_handling( .send_server_notification(ServerNotification::ItemCompleted(completed)) .await; } - msg @ (EventMsg::ItemStarted(_) - | EventMsg::ItemCompleted(_) - | EventMsg::PatchApplyUpdated(_) - | EventMsg::TerminalInteraction(_)) => { + EventMsg::ItemStarted(event) => { + let should_emit = match &event.item { + CoreTurnItem::CommandExecution(item) => thread_state + .lock() + .await + .turn_summary + .command_execution_started + .insert(item.id.clone()), + _ => true, + }; + let dynamic_tool_call_params = match &event.item { + CoreTurnItem::DynamicToolCall(item) => Some(DynamicToolCallParams { + thread_id: conversation_id.to_string(), + turn_id: event.turn_id.clone(), + call_id: item.id.clone(), + namespace: item.namespace.clone(), + tool: item.tool.clone(), + arguments: item.arguments.clone(), + }), + _ => None, + }; + if should_emit { + let notification = item_event_to_server_notification( + EventMsg::ItemStarted(event), + &conversation_id.to_string(), + &event_turn_id, + ); + outgoing.send_server_notification(notification).await; + } + if let Some(params) = dynamic_tool_call_params { + let call_id = params.call_id.clone(); + let (_pending_request_id, rx) = outgoing + .send_request(ServerRequestPayload::DynamicToolCall(params)) + .await; + tokio::spawn(async move { + crate::dynamic_tools::on_call_response(call_id, rx, conversation).await; + }); + } + } + EventMsg::ItemCompleted(event) => { + apply_canonical_item_completed_side_effects( + &thread_manager, + &thread_watch_manager, + &thread_state, + &event.item, + ) + .await; + let notification = item_event_to_server_notification( + EventMsg::ItemCompleted(event), + &conversation_id.to_string(), + &event_turn_id, + ); + outgoing.send_server_notification(notification).await; + } + msg @ (EventMsg::PatchApplyUpdated(_) | EventMsg::TerminalInteraction(_)) => { let notification = item_event_to_server_notification( msg, &conversation_id.to_string(), @@ -1103,36 +1092,10 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyEnd(_) => { - // Core still fans out these deprecated events for legacy clients; + // Core still fans out these deprecated events for raw-event and rollout compatibility + // consumers; // v2 clients receive the canonical FileChange item instead. } - EventMsg::ExecCommandBegin(exec_command_begin_event) => { - if matches!( - exec_command_begin_event.source, - codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction - ) { - // TerminalInteraction is the v2 surface for unified exec - // stdin/poll events. Suppress the legacy CommandExecution - // item so clients do not render the same wait twice. - return; - } - let item_id = exec_command_begin_event.call_id.clone(); - let first_start = { - let mut state = thread_state.lock().await; - state - .turn_summary - .command_execution_started - .insert(item_id.clone()) - }; - if first_start { - let notification = item_event_to_server_notification( - EventMsg::ExecCommandBegin(exec_command_begin_event), - &conversation_id.to_string(), - &event_turn_id, - ); - outgoing.send_server_notification(notification).await; - } - } EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event) => { let notification = item_event_to_server_notification( EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event), @@ -1141,31 +1104,6 @@ pub(crate) async fn apply_bespoke_event_handling( ); outgoing.send_server_notification(notification).await; } - EventMsg::ExecCommandEnd(exec_command_end_event) => { - let call_id = exec_command_end_event.call_id.clone(); - { - let mut state = thread_state.lock().await; - state - .turn_summary - .command_execution_started - .remove(&call_id); - } - if matches!( - exec_command_end_event.source, - codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction - ) { - // The paired begin event is suppressed above; keep the - // completion out of v2 as well so no orphan legacy item is - // emitted for unified exec interactions. - return; - } - let notification = item_event_to_server_notification( - EventMsg::ExecCommandEnd(exec_command_end_event), - &conversation_id.to_string(), - &event_turn_id, - ); - outgoing.send_server_notification(notification).await; - } // If this is a TurnAborted, reply to any pending interrupt requests. EventMsg::TurnAborted(turn_aborted_event) => { // All per-thread requests are bound to a turn, so abort them. @@ -1369,6 +1307,52 @@ async fn emit_turn_completed_with_status( .await; } +async fn apply_canonical_item_completed_side_effects( + thread_manager: &Arc, + thread_watch_manager: &ThreadWatchManager, + thread_state: &Arc>, + item: &CoreTurnItem, +) { + match item { + CoreTurnItem::CommandExecution(item) => { + thread_state + .lock() + .await + .turn_summary + .command_execution_started + .remove(&item.id); + } + CoreTurnItem::SubAgentActivity(activity) + if activity.kind == SubAgentActivityKind::Interrupted => + { + remove_missing_thread_watch( + thread_manager, + thread_watch_manager, + activity.agent_thread_id, + ) + .await; + } + CoreTurnItem::CollabAgentToolCall(item) if item.tool == CoreCollabAgentTool::CloseAgent => { + for thread_id in &item.receiver_thread_ids { + remove_missing_thread_watch(thread_manager, thread_watch_manager, *thread_id).await; + } + } + _ => {} + } +} + +async fn remove_missing_thread_watch( + thread_manager: &Arc, + thread_watch_manager: &ThreadWatchManager, + thread_id: ThreadId, +) { + if thread_manager.get_thread(thread_id).await.is_err() { + thread_watch_manager + .remove_thread(&thread_id.to_string()) + .await; + } +} + #[allow(clippy::too_many_arguments)] async fn start_command_execution_item( conversation_id: &ThreadId, @@ -2190,10 +2174,15 @@ mod tests { use codex_app_server_protocol::AutoReviewDecisionSource; use codex_app_server_protocol::GuardianApprovalReviewStatus; use codex_app_server_protocol::JSONRPCErrorError; + use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::TurnPlanStepStatus; use codex_login::CodexAuth; use codex_protocol::AgentPath; + use codex_protocol::items::DynamicToolCallItem; + use codex_protocol::items::DynamicToolCallStatus as CoreDynamicToolCallStatus; use codex_protocol::items::HookPromptFragment; + use codex_protocol::items::SubAgentActivityItem; + use codex_protocol::items::TurnItem as CoreTurnItem; use codex_protocol::items::build_hook_prompt_message; use codex_protocol::models::FileSystemPermissions as CoreFileSystemPermissions; use codex_protocol::models::NetworkPermissions as CoreNetworkPermissions; @@ -2210,11 +2199,12 @@ mod tests { use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GuardianAssessmentEvent; use codex_protocol::protocol::GuardianAssessmentStatus; + use codex_protocol::protocol::ItemCompletedEvent; + use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; - use codex_protocol::protocol::SubAgentActivityEvent; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::UserMessageEvent; @@ -3437,13 +3427,17 @@ mod tests { apply_bespoke_event_handling( Event { id: "turn-1".to_string(), - msg: EventMsg::SubAgentActivity(SubAgentActivityEvent { - event_id: "activity-1".to_string(), - occurred_at_ms: 42, - agent_thread_id: child_thread_id, - agent_path: AgentPath::try_from("/root/worker") - .expect("agent path should parse"), - kind: SubAgentActivityKind::Interrupted, + msg: EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id: conversation_id, + turn_id: "turn-1".to_string(), + item: CoreTurnItem::SubAgentActivity(SubAgentActivityItem { + id: "activity-1".to_string(), + kind: SubAgentActivityKind::Interrupted, + agent_thread_id: child_thread_id, + agent_path: AgentPath::try_from("/root/worker") + .expect("agent path should parse"), + }), + completed_at_ms: 42, }), }, conversation_id, @@ -3487,6 +3481,92 @@ mod tests { Ok(()) } + #[tokio::test] + async fn canonical_dynamic_tool_start_emits_item_and_requests_client() -> Result<()> { + let codex_home = TempDir::new()?; + let config = load_default_config_for_test(&codex_home).await; + let thread_manager = Arc::new( + codex_core::test_support::thread_manager_with_models_provider_and_home( + CodexAuth::create_dummy_chatgpt_auth_for_testing(), + config.model_provider.clone(), + config.codex_home.to_path_buf(), + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + ), + ); + let codex_core::NewThread { + thread_id: conversation_id, + thread: conversation, + .. + } = thread_manager.start_thread(config).await?; + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + conversation_id, + ); + + apply_bespoke_event_handling( + Event { + id: "turn-1".to_string(), + msg: EventMsg::ItemStarted(ItemStartedEvent { + thread_id: conversation_id, + turn_id: "turn-1".to_string(), + item: CoreTurnItem::DynamicToolCall(DynamicToolCallItem { + id: "dynamic-1".to_string(), + namespace: Some("apps".to_string()), + tool: "lookup".to_string(), + arguments: json!({"id": "123"}), + status: CoreDynamicToolCallStatus::InProgress, + content_items: None, + success: None, + error: None, + duration: None, + }), + started_at_ms: 42, + }), + }, + conversation_id, + conversation, + thread_manager, + outgoing, + new_thread_state(), + ThreadWatchManager::new(), + Arc::new(tokio::sync::Semaphore::new(/*permits*/ 1)), + "test-provider".to_string(), + ) + .await; + + let item_started = recv_broadcast_message(&mut rx).await?; + let OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) = + item_started + else { + bail!("unexpected message: {item_started:?}"); + }; + assert_eq!(payload.item.id(), "dynamic-1"); + + let request = recv_broadcast_message(&mut rx).await?; + let OutgoingMessage::Request(ServerRequest::DynamicToolCall { params, .. }) = request + else { + bail!("unexpected message: {request:?}"); + }; + assert_eq!( + params, + DynamicToolCallParams { + thread_id: conversation_id.to_string(), + turn_id: "turn-1".to_string(), + call_id: "dynamic-1".to_string(), + namespace: Some("apps".to_string()), + tool: "lookup".to_string(), + arguments: json!({"id": "123"}), + } + ); + Ok(()) + } + #[tokio::test] async fn test_handle_turn_complete_emits_completed_without_error() -> Result<()> { let conversation_id = ThreadId::new(); diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index f45616eed824..03321304a368 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -1773,6 +1773,9 @@ impl Session { let show_raw_agent_reasoning = self.show_raw_agent_reasoning(); for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) { + self.services + .rollout_thread_trace + .record_tool_call_event(turn_context.sub_id.clone(), &legacy); let legacy_event = Event { id: turn_context.sub_id.clone(), msg: legacy, diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index d588e5a35611..a15c53e16b44 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -25,16 +25,15 @@ use crate::tools::runtimes::RuntimePathPrepends; use crate::tools::runtimes::apply_package_path_prepend; use crate::tools::runtimes::maybe_wrap_shell_lc_with_snapshot; use crate::tools::runtimes::strip_managed_proxy_env; -use crate::turn_timing::now_unix_timestamp_ms; use crate::user_shell_command::user_shell_command_record_item; use codex_protocol::exec_output::ExecToolCallOutput; use codex_protocol::exec_output::StreamOutput; +use codex_protocol::items::CommandExecutionItem; +use codex_protocol::items::CommandExecutionStatus; +use codex_protocol::items::TurnItem; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::ExecCommandBeginEvent; -use codex_protocol::protocol::ExecCommandEndEvent; use codex_protocol::protocol::ExecCommandSource; -use codex_protocol::protocol::ExecCommandStatus; use codex_protocol::protocol::TurnStartedEvent; use codex_sandboxing::SandboxType; use codex_shell_command::parse_command::parse_command; @@ -181,18 +180,23 @@ pub(crate) async fn execute_user_shell_command( let parsed_cmd = parse_command(&display_command); session - .send_event( + .emit_turn_item_started( turn_context.as_ref(), - EventMsg::ExecCommandBegin(ExecCommandBeginEvent { - call_id: call_id.clone(), + &TurnItem::CommandExecution(CommandExecutionItem { + id: call_id.clone(), process_id: None, - turn_id: turn_context.sub_id.clone(), - started_at_ms: now_unix_timestamp_ms(), command: display_command.clone(), cwd: cwd.clone().into(), parsed_cmd: parsed_cmd.clone(), source: ExecCommandSource::UserShell, interaction_input: None, + status: CommandExecutionStatus::InProgress, + stdout: None, + stderr: None, + aggregated_output: None, + exit_code: None, + duration: None, + formatted_output: None, }), ) .await; @@ -259,57 +263,53 @@ pub(crate) async fn execute_user_shell_command( ) .await; session - .send_event( + .emit_turn_item_completed( turn_context.as_ref(), - EventMsg::ExecCommandEnd(ExecCommandEndEvent { - call_id, + TurnItem::CommandExecution(CommandExecutionItem { + id: call_id, process_id: None, - turn_id: turn_context.sub_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), command: display_command.clone(), cwd: cwd.clone().into(), parsed_cmd: parsed_cmd.clone(), source: ExecCommandSource::UserShell, interaction_input: None, - stdout: String::new(), - stderr: aborted_message.clone(), - aggregated_output: aborted_message.clone(), - exit_code: -1, - duration: Duration::ZERO, - formatted_output: aborted_message, - status: ExecCommandStatus::Failed, + status: CommandExecutionStatus::Failed, + stdout: Some(String::new()), + stderr: Some(aborted_message.clone()), + aggregated_output: Some(aborted_message.clone()), + exit_code: Some(-1), + duration: Some(Duration::ZERO), + formatted_output: Some(aborted_message), }), ) .await; } Ok(Ok(output)) => { session - .send_event( + .emit_turn_item_completed( turn_context.as_ref(), - EventMsg::ExecCommandEnd(ExecCommandEndEvent { - call_id: call_id.clone(), + TurnItem::CommandExecution(CommandExecutionItem { + id: call_id.clone(), process_id: None, - turn_id: turn_context.sub_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), command: display_command.clone(), cwd: cwd.clone().into(), parsed_cmd: parsed_cmd.clone(), source: ExecCommandSource::UserShell, interaction_input: None, - stdout: output.stdout.text.clone(), - stderr: output.stderr.text.clone(), - aggregated_output: output.aggregated_output.text.clone(), - exit_code: output.exit_code, - duration: output.duration, - formatted_output: format_exec_output_str( - &output, - turn_context.model_info.truncation_policy.into(), - ), status: if output.exit_code == 0 { - ExecCommandStatus::Completed + CommandExecutionStatus::Completed } else { - ExecCommandStatus::Failed + CommandExecutionStatus::Failed }, + stdout: Some(output.stdout.text.clone()), + stderr: Some(output.stderr.text.clone()), + aggregated_output: Some(output.aggregated_output.text.clone()), + exit_code: Some(output.exit_code), + duration: Some(output.duration), + formatted_output: Some(format_exec_output_str( + &output, + turn_context.model_info.truncation_policy.into(), + )), }), ) .await; @@ -329,28 +329,26 @@ pub(crate) async fn execute_user_shell_command( timed_out: false, }; session - .send_event( + .emit_turn_item_completed( turn_context.as_ref(), - EventMsg::ExecCommandEnd(ExecCommandEndEvent { - call_id, + TurnItem::CommandExecution(CommandExecutionItem { + id: call_id, process_id: None, - turn_id: turn_context.sub_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), command: display_command, cwd: cwd.into(), parsed_cmd, source: ExecCommandSource::UserShell, interaction_input: None, - stdout: exec_output.stdout.text.clone(), - stderr: exec_output.stderr.text.clone(), - aggregated_output: exec_output.aggregated_output.text.clone(), - exit_code: exec_output.exit_code, - duration: exec_output.duration, - formatted_output: format_exec_output_str( + status: CommandExecutionStatus::Failed, + stdout: Some(exec_output.stdout.text.clone()), + stderr: Some(exec_output.stderr.text.clone()), + aggregated_output: Some(exec_output.aggregated_output.text.clone()), + exit_code: Some(exec_output.exit_code), + duration: Some(exec_output.duration), + formatted_output: Some(format_exec_output_str( &exec_output, turn_context.model_info.truncation_policy.into(), - ), - status: ExecCommandStatus::Failed, + )), }), ) .await; diff --git a/codex-rs/core/src/tools/events.rs b/codex-rs/core/src/tools/events.rs index 757125a03d6d..5b30c6a1568d 100644 --- a/codex-rs/core/src/tools/events.rs +++ b/codex-rs/core/src/tools/events.rs @@ -8,6 +8,8 @@ use codex_apply_patch::AppliedPatchDelta; use codex_protocol::error::CodexErr; use codex_protocol::error::SandboxErr; use codex_protocol::exec_output::ExecToolCallOutput; +use codex_protocol::items::CommandExecutionItem; +use codex_protocol::items::CommandExecutionStatus; use codex_protocol::items::FileChangeItem; use codex_protocol::items::TurnItem; use codex_protocol::parse_command::ParsedCommand; @@ -102,19 +104,43 @@ pub(crate) async fn emit_exec_command_begin( interaction_input: Option, process_id: Option<&str>, ) { + if matches!(source, ExecCommandSource::UnifiedExecInteraction) { + ctx.session + .send_event( + ctx.turn, + EventMsg::ExecCommandBegin(ExecCommandBeginEvent { + call_id: ctx.call_id.to_string(), + process_id: process_id.map(str::to_owned), + turn_id: ctx.turn.sub_id.clone(), + started_at_ms: now_unix_timestamp_ms(), + command: command.to_vec(), + cwd: cwd.clone(), + parsed_cmd: parsed_cmd.to_vec(), + source, + interaction_input, + }), + ) + .await; + return; + } ctx.session - .send_event( + .emit_turn_item_started( ctx.turn, - EventMsg::ExecCommandBegin(ExecCommandBeginEvent { - call_id: ctx.call_id.to_string(), + &TurnItem::CommandExecution(CommandExecutionItem { + id: ctx.call_id.to_string(), process_id: process_id.map(str::to_owned), - turn_id: ctx.turn.sub_id.clone(), - started_at_ms: now_unix_timestamp_ms(), command: command.to_vec(), cwd: cwd.clone(), parsed_cmd: parsed_cmd.to_vec(), source, interaction_input, + status: CommandExecutionStatus::InProgress, + stdout: None, + stderr: None, + aggregated_output: None, + exit_code: None, + duration: None, + formatted_output: None, }), ) .await; @@ -542,26 +568,50 @@ async fn emit_exec_end( exec_input: ExecCommandInput<'_>, exec_result: ExecCommandResult, ) { + if matches!(exec_input.source, ExecCommandSource::UnifiedExecInteraction) { + ctx.session + .send_event( + ctx.turn, + EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: ctx.call_id.to_string(), + process_id: exec_input.process_id.map(str::to_owned), + turn_id: ctx.turn.sub_id.clone(), + completed_at_ms: now_unix_timestamp_ms(), + command: exec_input.command.to_vec(), + cwd: exec_input.cwd.clone(), + parsed_cmd: exec_input.parsed_cmd.to_vec(), + source: exec_input.source, + interaction_input: exec_input.interaction_input.map(str::to_owned), + stdout: exec_result.stdout, + stderr: exec_result.stderr, + aggregated_output: exec_result.aggregated_output, + exit_code: exec_result.exit_code, + duration: exec_result.duration, + formatted_output: exec_result.formatted_output, + status: exec_result.status, + }), + ) + .await; + return; + } ctx.session - .send_event( + .emit_turn_item_completed( ctx.turn, - EventMsg::ExecCommandEnd(ExecCommandEndEvent { - call_id: ctx.call_id.to_string(), + TurnItem::CommandExecution(CommandExecutionItem { + id: ctx.call_id.to_string(), process_id: exec_input.process_id.map(str::to_owned), - turn_id: ctx.turn.sub_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), command: exec_input.command.to_vec(), cwd: exec_input.cwd.clone(), parsed_cmd: exec_input.parsed_cmd.to_vec(), source: exec_input.source, interaction_input: exec_input.interaction_input.map(str::to_owned), - stdout: exec_result.stdout, - stderr: exec_result.stderr, - aggregated_output: exec_result.aggregated_output, - exit_code: exec_result.exit_code, - duration: exec_result.duration, - formatted_output: exec_result.formatted_output, - status: exec_result.status, + status: exec_result.status.into(), + stdout: Some(exec_result.stdout), + stderr: Some(exec_result.stderr), + aggregated_output: Some(exec_result.aggregated_output), + exit_code: Some(exec_result.exit_code), + duration: Some(exec_result.duration), + formatted_output: Some(exec_result.formatted_output), }), ) .await; diff --git a/codex-rs/core/src/tools/handlers/dynamic.rs b/codex-rs/core/src/tools/handlers/dynamic.rs index 28af3434ff12..4b8fa037974e 100644 --- a/codex-rs/core/src/tools/handlers/dynamic.rs +++ b/codex-rs/core/src/tools/handlers/dynamic.rs @@ -9,14 +9,13 @@ use crate::tools::handlers::parse_arguments; use crate::tools::registry::CoreToolRuntime; use crate::tools::registry::ToolExecutor; use crate::tools::registry::ToolExposure; -use crate::turn_timing::now_unix_timestamp_ms; -use codex_protocol::dynamic_tools::DynamicToolCallRequest; use codex_protocol::dynamic_tools::DynamicToolFunctionSpec; use codex_protocol::dynamic_tools::DynamicToolNamespaceSpec; use codex_protocol::dynamic_tools::DynamicToolResponse; +use codex_protocol::items::DynamicToolCallItem; +use codex_protocol::items::DynamicToolCallStatus; +use codex_protocol::items::TurnItem; use codex_protocol::models::FunctionCallOutputContentItem; -use codex_protocol::protocol::DynamicToolCallResponseEvent; -use codex_protocol::protocol::EventMsg; use codex_tools::ResponsesApiNamespace; use codex_tools::ResponsesApiNamespaceTool; use codex_tools::ToolName; @@ -178,7 +177,6 @@ async fn request_dynamic_tool( ) -> Option { let namespace = tool_name.namespace; let tool = tool_name.name; - let turn_id = turn_context.sub_id.clone(); let (tx_response, rx_response) = oneshot::channel(); let event_id = call_id.clone(); let prev_entry = { @@ -196,45 +194,55 @@ async fn request_dynamic_tool( } let started_at = Instant::now(); - let started_at_ms = now_unix_timestamp_ms(); - let event = EventMsg::DynamicToolCallRequest(DynamicToolCallRequest { - call_id: call_id.clone(), - turn_id: turn_id.clone(), - started_at_ms, - namespace: namespace.clone(), - tool: tool.clone(), - arguments: arguments.clone(), - }); - session.send_event(turn_context, event).await; + session + .emit_turn_item_started( + turn_context, + &TurnItem::DynamicToolCall(DynamicToolCallItem { + id: call_id.clone(), + namespace: namespace.clone(), + tool: tool.clone(), + arguments: arguments.clone(), + status: DynamicToolCallStatus::InProgress, + content_items: None, + success: None, + error: None, + duration: None, + }), + ) + .await; let response = rx_response.await.ok(); - let response_event = match &response { - Some(response) => EventMsg::DynamicToolCallResponse(DynamicToolCallResponseEvent { - call_id, - turn_id, - completed_at_ms: now_unix_timestamp_ms(), + let item = match &response { + Some(response) => DynamicToolCallItem { + id: call_id, namespace, tool, arguments, - content_items: response.content_items.clone(), - success: response.success, + status: if response.success { + DynamicToolCallStatus::Completed + } else { + DynamicToolCallStatus::Failed + }, + content_items: Some(response.content_items.clone()), + success: Some(response.success), error: None, - duration: started_at.elapsed(), - }), - None => EventMsg::DynamicToolCallResponse(DynamicToolCallResponseEvent { - call_id, - turn_id, - completed_at_ms: now_unix_timestamp_ms(), + duration: Some(started_at.elapsed()), + }, + None => DynamicToolCallItem { + id: call_id, namespace, tool, arguments, - content_items: Vec::new(), - success: false, + status: DynamicToolCallStatus::Failed, + content_items: Some(Vec::new()), + success: Some(false), error: Some("dynamic tool call was cancelled before receiving a response".to_string()), - duration: started_at.elapsed(), - }), + duration: Some(started_at.elapsed()), + }, }; - session.send_event(turn_context, response_event).await; + session + .emit_turn_item_completed(turn_context, TurnItem::DynamicToolCall(item)) + .await; response } diff --git a/codex-rs/core/src/tools/handlers/multi_agents.rs b/codex-rs/core/src/tools/handlers/multi_agents.rs index 8bc96d19c2a1..0b2e831bb0f2 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents.rs @@ -20,19 +20,13 @@ use crate::tools::handlers::parse_arguments; use crate::tools::registry::CoreToolRuntime; use crate::tools::registry::ToolExecutor; use codex_protocol::ThreadId; +use codex_protocol::items::CollabAgentTool; +use codex_protocol::items::CollabAgentToolCallItem; +use codex_protocol::items::CollabAgentToolCallStatus; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::openai_models::ReasoningEffort; -use codex_protocol::protocol::CollabAgentInteractionBeginEvent; -use codex_protocol::protocol::CollabAgentInteractionEndEvent; use codex_protocol::protocol::CollabAgentRef; -use codex_protocol::protocol::CollabAgentSpawnBeginEvent; -use codex_protocol::protocol::CollabAgentSpawnEndEvent; -use codex_protocol::protocol::CollabCloseBeginEvent; -use codex_protocol::protocol::CollabCloseEndEvent; -use codex_protocol::protocol::CollabResumeBeginEvent; -use codex_protocol::protocol::CollabResumeEndEvent; -use codex_protocol::protocol::CollabWaitingBeginEvent; -use codex_protocol::protocol::CollabWaitingEndEvent; use codex_protocol::user_input::UserInput; use codex_tools::ToolName; use codex_tools::ToolSearchInfo; @@ -91,6 +85,37 @@ mod send_input; mod spawn; pub(crate) mod wait; +pub(crate) async fn emit_collab_tool_call_started( + session: &Session, + turn: &TurnContext, + item: CollabAgentToolCallItem, +) { + session + .emit_turn_item_started(turn, &TurnItem::CollabAgentToolCall(item)) + .await; +} + +pub(crate) async fn emit_collab_tool_call_completed( + session: &Session, + turn: &TurnContext, + item: CollabAgentToolCallItem, +) { + session + .emit_turn_item_completed(turn, TurnItem::CollabAgentToolCall(item)) + .await; +} + +pub(crate) fn collab_tool_call_status( + status: &AgentStatus, + receiver_thread_id: Option, +) -> CollabAgentToolCallStatus { + match status { + AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed, + _ if receiver_thread_id.is_some() => CollabAgentToolCallStatus::Completed, + _ => CollabAgentToolCallStatus::Failed, + } +} + #[cfg(test)] #[path = "multi_agents_tests.rs"] mod tests; diff --git a/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs index d1017adb65c9..2bf89ce488c8 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs @@ -1,6 +1,5 @@ use super::*; use crate::tools::handlers::multi_agents_spec::create_close_agent_tool_v1; -use crate::turn_timing::now_unix_timestamp_ms; use codex_protocol::error::CodexErr; use codex_tools::ToolSpec; @@ -43,18 +42,23 @@ async fn handle_close_agent( let receiver_agent = session.services.agent_control.get_agent_metadata(agent_id); let known_agent = receiver_agent.is_some(); let receiver_agent = receiver_agent.unwrap_or_default(); - session - .send_event( - &turn, - CollabCloseBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id: agent_id, - } - .into(), - ) - .await; + emit_collab_tool_call_started( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::CloseAgent, + status: CollabAgentToolCallStatus::InProgress, + sender_thread_id: session.thread_id, + receiver_thread_ids: vec![agent_id], + receiver_agents: Vec::new(), + prompt: None, + model: None, + reasoning_effort: None, + agents_states: Default::default(), + }, + ) + .await; let status = match session .services .agent_control @@ -67,21 +71,27 @@ async fn handle_close_agent( } Err(err) => { let status = session.services.agent_control.get_status(agent_id).await; - session - .send_event( - &turn, - CollabCloseEndEvent { - call_id: call_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id(), - receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent.agent_nickname.clone(), - receiver_agent_role: receiver_agent.agent_role.clone(), - status, - } - .into(), - ) - .await; + emit_collab_tool_call_completed( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::CloseAgent, + status: collab_tool_call_status(&status, Some(agent_id)), + sender_thread_id: session.thread_id(), + receiver_thread_ids: vec![agent_id], + receiver_agents: vec![CollabAgentRef { + thread_id: agent_id, + agent_nickname: receiver_agent.agent_nickname.clone(), + agent_role: receiver_agent.agent_role.clone(), + }], + prompt: None, + model: None, + reasoning_effort: None, + agents_states: [(agent_id, status)].into_iter().collect(), + }, + ) + .await; return Err(collab_agent_error(agent_id, err)); } }; @@ -89,21 +99,27 @@ async fn handle_close_agent( .await .map_err(|err| collab_agent_error(agent_id, err)) .map(|_| ()); - session - .send_event( - &turn, - CollabCloseEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id: agent_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - status: status.clone(), - } - .into(), - ) - .await; + emit_collab_tool_call_completed( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id, + tool: CollabAgentTool::CloseAgent, + status: collab_tool_call_status(&status, Some(agent_id)), + sender_thread_id: session.thread_id, + receiver_thread_ids: vec![agent_id], + receiver_agents: vec![CollabAgentRef { + thread_id: agent_id, + agent_nickname: receiver_agent.agent_nickname, + agent_role: receiver_agent.agent_role, + }], + prompt: None, + model: None, + reasoning_effort: None, + agents_states: [(agent_id, status.clone())].into_iter().collect(), + }, + ) + .await; result?; Ok(CloseAgentResult { diff --git a/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs index fa464ddbbb26..e9bdfddd9b07 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/resume_agent.rs @@ -1,7 +1,6 @@ use super::*; use crate::agent::next_thread_spawn_depth; use crate::tools::handlers::multi_agents_spec::create_resume_agent_tool; -use crate::turn_timing::now_unix_timestamp_ms; use codex_tools::ToolSpec; use std::sync::Arc; @@ -56,20 +55,27 @@ async fn handle_resume_agent( )); } - session - .send_event( - &turn, - CollabResumeBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id, - receiver_agent_nickname: receiver_agent.agent_nickname.clone(), - receiver_agent_role: receiver_agent.agent_role.clone(), - } - .into(), - ) - .await; + emit_collab_tool_call_started( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::ResumeAgent, + status: CollabAgentToolCallStatus::InProgress, + sender_thread_id: session.thread_id, + receiver_thread_ids: vec![receiver_thread_id], + receiver_agents: vec![CollabAgentRef { + thread_id: receiver_thread_id, + agent_nickname: receiver_agent.agent_nickname.clone(), + agent_role: receiver_agent.agent_role.clone(), + }], + prompt: None, + model: None, + reasoning_effort: None, + agents_states: Default::default(), + }, + ) + .await; let mut status = session .services @@ -112,21 +118,27 @@ async fn handle_resume_agent( } else { (receiver_agent, None) }; - session - .send_event( - &turn, - CollabResumeEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id(), - receiver_thread_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - status: status.clone(), - } - .into(), - ) - .await; + emit_collab_tool_call_completed( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id, + tool: CollabAgentTool::ResumeAgent, + status: collab_tool_call_status(&status, Some(receiver_thread_id)), + sender_thread_id: session.thread_id(), + receiver_thread_ids: vec![receiver_thread_id], + receiver_agents: vec![CollabAgentRef { + thread_id: receiver_thread_id, + agent_nickname: receiver_agent.agent_nickname, + agent_role: receiver_agent.agent_role, + }], + prompt: None, + model: None, + reasoning_effort: None, + agents_states: [(receiver_thread_id, status.clone())].into_iter().collect(), + }, + ) + .await; if let Some(err) = error { return Err(err); diff --git a/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs b/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs index 375bb50ac7cb..731c39195efc 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/send_input.rs @@ -1,7 +1,6 @@ use super::*; use crate::agent::control::render_input_preview; use crate::tools::handlers::multi_agents_spec::create_send_input_tool_v1; -use crate::turn_timing::now_unix_timestamp_ms; use codex_tools::ToolSpec; pub(crate) struct Handler; @@ -66,19 +65,23 @@ impl Handler { .await .map_err(|err| collab_agent_error(receiver_thread_id, err))?; } - session - .send_event( - &turn, - CollabAgentInteractionBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id, - prompt: prompt.clone(), - } - .into(), - ) - .await; + emit_collab_tool_call_started( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::SendInput, + status: CollabAgentToolCallStatus::InProgress, + sender_thread_id: session.thread_id, + receiver_thread_ids: vec![receiver_thread_id], + receiver_agents: Vec::new(), + prompt: Some(prompt.clone()), + model: None, + reasoning_effort: None, + agents_states: Default::default(), + }, + ) + .await; let agent_control = session.services.agent_control.clone(); let result = agent_control .send_input(receiver_thread_id, input_items) @@ -89,22 +92,27 @@ impl Handler { .agent_control .get_status(receiver_thread_id) .await; - session - .send_event( - &turn, - CollabAgentInteractionEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_id, - receiver_agent_nickname: receiver_agent.agent_nickname, - receiver_agent_role: receiver_agent.agent_role, - prompt, - status, - } - .into(), - ) - .await; + emit_collab_tool_call_completed( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id, + tool: CollabAgentTool::SendInput, + status: collab_tool_call_status(&status, Some(receiver_thread_id)), + sender_thread_id: session.thread_id, + receiver_thread_ids: vec![receiver_thread_id], + receiver_agents: vec![CollabAgentRef { + thread_id: receiver_thread_id, + agent_nickname: receiver_agent.agent_nickname, + agent_role: receiver_agent.agent_role, + }], + prompt: Some(prompt), + model: None, + reasoning_effort: None, + agents_states: [(receiver_thread_id, status)].into_iter().collect(), + }, + ) + .await; let submission_id = result?; Ok(boxed_tool_output(SendInputResult { submission_id })) diff --git a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs index 204385934e2b..d935a2697f6f 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/spawn.rs @@ -8,7 +8,6 @@ use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::apply_role_to_config; use crate::tools::handlers::multi_agents_spec::SpawnAgentToolOptions; use crate::tools::handlers::multi_agents_spec::create_spawn_agent_tool_v1; -use crate::turn_timing::now_unix_timestamp_ms; use codex_tools::ToolSpec; #[derive(Default)] @@ -70,20 +69,23 @@ async fn handle_spawn_agent( "Agent depth limit reached. Solve the task yourself.".to_string(), )); } - session - .send_event( - &turn, - CollabAgentSpawnBeginEvent { - call_id: call_id.clone(), - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - prompt: prompt.clone(), - model: args.model.clone().unwrap_or_default(), - reasoning_effort: args.reasoning_effort.clone().unwrap_or_default(), - } - .into(), - ) - .await; + emit_collab_tool_call_started( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::SpawnAgent, + status: CollabAgentToolCallStatus::InProgress, + sender_thread_id: session.thread_id, + receiver_thread_ids: Vec::new(), + receiver_agents: Vec::new(), + prompt: Some(prompt.clone()), + model: Some(args.model.clone().unwrap_or_default()), + reasoning_effort: Some(args.reasoning_effort.clone().unwrap_or_default()), + agents_states: Default::default(), + }, + ) + .await; let mut config = build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?; if let Some(service_tier) = args.service_tier.as_ref() { @@ -177,24 +179,35 @@ async fn handle_spawn_agent( .and_then(|snapshot| snapshot.reasoning_effort.clone()) .unwrap_or(args.reasoning_effort.unwrap_or_default()); let nickname = new_agent_nickname.clone(); - session - .send_event( - &turn, - CollabAgentSpawnEndEvent { - call_id, - completed_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - new_thread_id, - new_agent_nickname, - new_agent_role, - prompt, - model: effective_model, - reasoning_effort: effective_reasoning_effort, - status, - } - .into(), - ) - .await; + let receiver_thread_ids = new_thread_id.into_iter().collect(); + let receiver_agents = new_thread_id + .map(|thread_id| CollabAgentRef { + thread_id, + agent_nickname: new_agent_nickname, + agent_role: new_agent_role, + }) + .into_iter() + .collect(); + let agents_states = new_thread_id + .map(|thread_id| [(thread_id, status.clone())].into_iter().collect()) + .unwrap_or_default(); + emit_collab_tool_call_completed( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id, + tool: CollabAgentTool::SpawnAgent, + status: collab_tool_call_status(&status, new_thread_id), + sender_thread_id: session.thread_id, + receiver_thread_ids, + receiver_agents, + prompt: Some(prompt), + model: Some(effective_model), + reasoning_effort: Some(effective_reasoning_effort), + agents_states, + }, + ) + .await; let new_thread_id = result?.thread_id; let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); turn.session_telemetry.counter( diff --git a/codex-rs/core/src/tools/handlers/multi_agents/wait.rs b/codex-rs/core/src/tools/handlers/multi_agents/wait.rs index be22b369082f..5441bd8c4bfd 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/wait.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/wait.rs @@ -2,7 +2,6 @@ use super::*; use crate::agent::status::is_final; use crate::tools::handlers::multi_agents_spec::WaitAgentTimeoutOptions; use crate::tools::handlers::multi_agents_spec::create_wait_agent_tool_v1; -use crate::turn_timing::now_unix_timestamp_ms; use codex_protocol::error::CodexErr; use codex_tools::ToolSpec; use futures::FutureExt; @@ -96,19 +95,23 @@ impl Handler { ms => ms.clamp(MIN_WAIT_TIMEOUT_MS, MAX_WAIT_TIMEOUT_MS), }; - session - .send_event( - &turn, - CollabWaitingBeginEvent { - started_at_ms: now_unix_timestamp_ms(), - sender_thread_id: session.thread_id, - receiver_thread_ids: receiver_thread_ids.clone(), - receiver_agents: receiver_agents.clone(), - call_id: call_id.clone(), - } - .into(), - ) - .await; + emit_collab_tool_call_started( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::Wait, + status: CollabAgentToolCallStatus::InProgress, + sender_thread_id: session.thread_id, + receiver_thread_ids: receiver_thread_ids.clone(), + receiver_agents: receiver_agents.clone(), + prompt: None, + model: None, + reasoning_effort: None, + agents_states: Default::default(), + }, + ) + .await; let mut status_rxs = Vec::with_capacity(receiver_thread_ids.len()); let mut initial_final_statuses = Vec::new(); @@ -127,22 +130,23 @@ impl Handler { Err(err) => { let mut statuses = HashMap::with_capacity(1); statuses.insert(*id, session.services.agent_control.get_status(*id).await); - session - .send_event( - &turn, - CollabWaitingEndEvent { - sender_thread_id: session.thread_id, - call_id: call_id.clone(), - completed_at_ms: now_unix_timestamp_ms(), - agent_statuses: build_wait_agent_statuses( - &statuses, - &receiver_agents, - ), - statuses, - } - .into(), - ) - .await; + emit_collab_tool_call_completed( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::Wait, + status: wait_tool_call_status(&statuses), + sender_thread_id: session.thread_id, + receiver_thread_ids: statuses.keys().copied().collect(), + receiver_agents: wait_receiver_agents(&statuses, &receiver_agents), + prompt: None, + model: None, + reasoning_effort: None, + agents_states: statuses, + }, + ) + .await; return Err(collab_agent_error(*id, err)); } } @@ -182,7 +186,6 @@ impl Handler { let timed_out = statuses.is_empty(); let statuses_by_id = statuses.clone().into_iter().collect::>(); - let agent_statuses = build_wait_agent_statuses(&statuses_by_id, &receiver_agents); let result = WaitAgentResult { status: statuses .into_iter() @@ -196,24 +199,70 @@ impl Handler { timed_out, }; - session - .send_event( - &turn, - CollabWaitingEndEvent { - sender_thread_id: session.thread_id, - call_id, - completed_at_ms: now_unix_timestamp_ms(), - agent_statuses, - statuses: statuses_by_id, - } - .into(), - ) - .await; + emit_collab_tool_call_completed( + &session, + &turn, + CollabAgentToolCallItem { + id: call_id, + tool: CollabAgentTool::Wait, + status: wait_tool_call_status(&statuses_by_id), + sender_thread_id: session.thread_id, + receiver_thread_ids: statuses_by_id.keys().copied().collect(), + receiver_agents: wait_receiver_agents(&statuses_by_id, &receiver_agents), + prompt: None, + model: None, + reasoning_effort: None, + agents_states: statuses_by_id, + }, + ) + .await; Ok(boxed_tool_output(result)) } } +fn wait_tool_call_status(statuses: &HashMap) -> CollabAgentToolCallStatus { + if statuses + .values() + .any(|status| matches!(status, AgentStatus::Errored(_) | AgentStatus::NotFound)) + { + CollabAgentToolCallStatus::Failed + } else { + CollabAgentToolCallStatus::Completed + } +} + +fn wait_receiver_agents( + statuses: &HashMap, + receiver_agents: &[CollabAgentRef], +) -> Vec { + if statuses.is_empty() { + return Vec::new(); + } + + let mut agents = Vec::with_capacity(statuses.len()); + let mut seen = HashMap::with_capacity(receiver_agents.len()); + for receiver_agent in receiver_agents { + seen.insert(receiver_agent.thread_id, ()); + if statuses.contains_key(&receiver_agent.thread_id) { + agents.push(receiver_agent.clone()); + } + } + + let mut extras = statuses + .keys() + .filter(|thread_id| !seen.contains_key(thread_id)) + .map(|thread_id| CollabAgentRef { + thread_id: *thread_id, + agent_nickname: None, + agent_role: None, + }) + .collect::>(); + extras.sort_by_key(|agent| agent.thread_id.to_string()); + agents.extend(extras); + agents +} + impl CoreToolRuntime for Handler { fn matches_kind(&self, payload: &ToolPayload) -> bool { matches!(payload, ToolPayload::Function { .. }) diff --git a/codex-rs/core/src/tools/handlers/multi_agents_common.rs b/codex-rs/core/src/tools/handlers/multi_agents_common.rs index 8f91ce5b8ee2..a4dbe074aa46 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_common.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_common.rs @@ -1,4 +1,3 @@ -use crate::agent::AgentStatus; use crate::config::Config; use crate::config::DEFAULT_MULTI_AGENT_V2_MIN_WAIT_TIMEOUT_MS; use crate::config::HARD_MAX_MULTI_AGENT_V2_TIMEOUT_MS; @@ -16,15 +15,12 @@ use codex_protocol::models::BaseInstructions; use codex_protocol::models::ResponseInputItem; use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::openai_models::ReasoningEffortPreset; -use codex_protocol::protocol::CollabAgentRef; -use codex_protocol::protocol::CollabAgentStatusEntry; use codex_protocol::protocol::Op; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::user_input::UserInput; use serde::Serialize; use serde_json::Value as JsonValue; -use std::collections::HashMap; /// Minimum wait timeout to prevent tight polling loops from burning CPU. pub(crate) const MIN_WAIT_TIMEOUT_MS: i64 = DEFAULT_MULTI_AGENT_V2_MIN_WAIT_TIMEOUT_MS; @@ -72,43 +68,6 @@ where }) } -pub(crate) fn build_wait_agent_statuses( - statuses: &HashMap, - receiver_agents: &[CollabAgentRef], -) -> Vec { - if statuses.is_empty() { - return Vec::new(); - } - - let mut entries = Vec::with_capacity(statuses.len()); - let mut seen = HashMap::with_capacity(receiver_agents.len()); - for receiver_agent in receiver_agents { - seen.insert(receiver_agent.thread_id, ()); - if let Some(status) = statuses.get(&receiver_agent.thread_id) { - entries.push(CollabAgentStatusEntry { - thread_id: receiver_agent.thread_id, - agent_nickname: receiver_agent.agent_nickname.clone(), - agent_role: receiver_agent.agent_role.clone(), - status: status.clone(), - }); - } - } - - let mut extras = statuses - .iter() - .filter(|(thread_id, _)| !seen.contains_key(thread_id)) - .map(|(thread_id, status)| CollabAgentStatusEntry { - thread_id: *thread_id, - agent_nickname: None, - agent_role: None, - status: status.clone(), - }) - .collect::>(); - extras.sort_by_key(|entry| entry.thread_id.to_string()); - entries.extend(extras); - entries -} - pub(crate) fn collab_spawn_error(err: CodexErr) -> FunctionCallError { match err { CodexErr::UnsupportedOperation(message) if message == "thread manager dropped" => { diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2.rs index cae3497a1319..e59c777006cf 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2.rs @@ -12,12 +12,14 @@ use crate::tools::handlers::parse_arguments; use crate::tools::registry::CoreToolRuntime; use crate::tools::registry::ToolExecutor; use codex_protocol::AgentPath; +use codex_protocol::items::CollabAgentTool; +use codex_protocol::items::CollabAgentToolCallItem; +use codex_protocol::items::CollabAgentToolCallStatus; +use codex_protocol::items::SubAgentActivityItem; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseInputItem; use codex_protocol::openai_models::ReasoningEffort; -use codex_protocol::protocol::CollabWaitingBeginEvent; -use codex_protocol::protocol::CollabWaitingEndEvent; use codex_protocol::protocol::InterAgentCommunication; -use codex_protocol::protocol::SubAgentActivityEvent; use codex_protocol::protocol::SubAgentActivityKind; use codex_protocol::user_input::UserInput; use codex_tools::ToolName; @@ -40,6 +42,16 @@ mod send_message; mod spawn; pub(crate) mod wait; +pub(crate) async fn emit_sub_agent_activity( + session: &crate::session::session::Session, + turn: &crate::session::turn_context::TurnContext, + item: SubAgentActivityItem, +) { + session + .emit_turn_item_completed(turn, TurnItem::SubAgentActivity(item)) + .await; +} + pub(super) fn communication_from_tool_message( author: AgentPath, recipient: AgentPath, diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs index aeed055b4eff..00eb149ca166 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/interrupt_agent.rs @@ -1,6 +1,5 @@ use super::*; use crate::tools::handlers::multi_agents_spec::create_interrupt_agent_tool_v2; -use crate::turn_timing::now_unix_timestamp_ms; use codex_protocol::error::CodexErr; use codex_tools::ToolSpec; @@ -71,19 +70,17 @@ async fn handle_interrupt_agent( Err(err) => Err(collab_agent_error(agent_id, err)), }; result?; - session - .send_event( - &turn, - SubAgentActivityEvent { - event_id: call_id, - occurred_at_ms: now_unix_timestamp_ms(), - agent_thread_id: agent_id, - agent_path: receiver_agent_path, - kind: SubAgentActivityKind::Interrupted, - } - .into(), - ) - .await; + emit_sub_agent_activity( + &session, + &turn, + SubAgentActivityItem { + id: call_id, + agent_thread_id: agent_id, + agent_path: receiver_agent_path, + kind: SubAgentActivityKind::Interrupted, + }, + ) + .await; Ok(InterruptAgentResult { previous_status: status, diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs index 7fdb80e8bcb2..70ab5606a42f 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs @@ -5,7 +5,6 @@ use super::*; use crate::tools::context::FunctionToolOutput; -use crate::turn_timing::now_unix_timestamp_ms; use codex_protocol::protocol::InterAgentCommunication; #[derive(Clone, Copy, PartialEq, Eq)] @@ -108,19 +107,17 @@ pub(crate) async fn handle_message_string_tool( .await .map_err(|err| collab_agent_error(receiver_thread_id, err)); result?; - session - .send_event( - &turn, - SubAgentActivityEvent { - event_id: call_id, - occurred_at_ms: now_unix_timestamp_ms(), - agent_thread_id: receiver_thread_id, - agent_path: receiver_agent_path, - kind: SubAgentActivityKind::Interacted, - } - .into(), - ) - .await; + emit_sub_agent_activity( + &session, + &turn, + SubAgentActivityItem { + id: call_id, + agent_thread_id: receiver_thread_id, + agent_path: receiver_agent_path, + kind: SubAgentActivityKind::Interacted, + }, + ) + .await; Ok(FunctionToolOutput::from_text(String::new(), Some(true))) } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs index 2186a4a6d605..76d5014a79e7 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs @@ -6,7 +6,6 @@ use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::apply_role_to_config; use crate::tools::handlers::multi_agents_spec::SpawnAgentToolOptions; use crate::tools::handlers::multi_agents_spec::create_spawn_agent_tool_v2; -use crate::turn_timing::now_unix_timestamp_ms; use codex_protocol::AgentPath; use codex_protocol::protocol::Op; use codex_tools::ToolSpec; @@ -144,19 +143,17 @@ async fn handle_spawn_agent( .as_ref() .and_then(|snapshot| snapshot.session_source.get_nickname()) .or(spawned_agent.metadata.agent_nickname); - session - .send_event( - &turn, - SubAgentActivityEvent { - event_id: call_id, - occurred_at_ms: now_unix_timestamp_ms(), - agent_thread_id: new_thread_id, - agent_path: new_agent_path.clone(), - kind: SubAgentActivityKind::Started, - } - .into(), - ) - .await; + emit_sub_agent_activity( + &session, + &turn, + SubAgentActivityItem { + id: call_id, + agent_thread_id: new_thread_id, + agent_path: new_agent_path.clone(), + kind: SubAgentActivityKind::Started, + }, + ) + .await; let role_tag = role_name.unwrap_or(DEFAULT_ROLE_NAME); turn.session_telemetry.counter( "codex.multi_agent.spawn", diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs index e97f47238b67..4e2eced26037 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/wait.rs @@ -2,7 +2,6 @@ use super::*; use crate::session::InputQueueActivity; use crate::tools::handlers::multi_agents_spec::WaitAgentTimeoutOptions; use crate::tools::handlers::multi_agents_spec::create_wait_agent_tool_v2; -use crate::turn_timing::now_unix_timestamp_ms; use codex_tools::ToolSpec; use std::collections::HashMap; use std::time::Duration; @@ -76,16 +75,20 @@ impl Handler { .await; session - .send_event( + .emit_turn_item_started( &turn, - CollabWaitingBeginEvent { - started_at_ms: now_unix_timestamp_ms(), + &TurnItem::CollabAgentToolCall(CollabAgentToolCallItem { + id: call_id.clone(), + tool: CollabAgentTool::Wait, + status: CollabAgentToolCallStatus::InProgress, sender_thread_id: session.thread_id, receiver_thread_ids: Vec::new(), receiver_agents: Vec::new(), - call_id: call_id.clone(), - } - .into(), + prompt: None, + model: None, + reasoning_effort: None, + agents_states: Default::default(), + }), ) .await; @@ -94,16 +97,20 @@ impl Handler { let result = WaitAgentResult::from_outcome(outcome); session - .send_event( + .emit_turn_item_completed( &turn, - CollabWaitingEndEvent { + TurnItem::CollabAgentToolCall(CollabAgentToolCallItem { + id: call_id, + tool: CollabAgentTool::Wait, + status: CollabAgentToolCallStatus::Completed, sender_thread_id: session.thread_id, - call_id, - completed_at_ms: now_unix_timestamp_ms(), - agent_statuses: Vec::new(), - statuses: HashMap::new(), - } - .into(), + receiver_thread_ids: Vec::new(), + receiver_agents: Vec::new(), + prompt: None, + model: None, + reasoning_effort: None, + agents_states: HashMap::new(), + }), ) .await; diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index ef22918cd299..ce7f3a3bfdfa 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -577,9 +577,8 @@ impl UnifiedExecProcessManager { } } } else { - // Short‑lived command: emit ExecCommandEnd immediately using the - // same helper as the background watcher, so all end events share - // one implementation. + // Short-lived command: emit the completed command item immediately + // using the same helper as the background watcher. let finish_result = finish_deferred_network_approval_after_process_exit_for_session( Some(&context.session), deferred_network_approval.take(), diff --git a/codex-rs/core/src/unified_exec/process_manager_tests.rs b/codex-rs/core/src/unified_exec/process_manager_tests.rs index 096cbe0a1359..20a930b5525c 100644 --- a/codex-rs/core/src/unified_exec/process_manager_tests.rs +++ b/codex-rs/core/src/unified_exec/process_manager_tests.rs @@ -315,21 +315,24 @@ async fn failed_initial_end_for_unstored_process_uses_fallback_output() { let event = tokio::time::timeout(Duration::from_secs(1), rx_event.recv()) .await - .expect("timed out waiting for failed exec end event") + .expect("timed out waiting for failed command execution item") .expect("event channel closed"); - let codex_protocol::protocol::EventMsg::ExecCommandEnd(end_event) = event.msg else { - panic!("expected ExecCommandEnd event"); + let codex_protocol::protocol::EventMsg::ItemCompleted(completed_event) = event.msg else { + panic!("expected ItemCompleted event"); }; - assert_eq!(end_event.call_id, "call-unified-denied"); + let codex_protocol::items::TurnItem::CommandExecution(item) = completed_event.item else { + panic!("expected CommandExecution item"); + }; + assert_eq!(item.id, "call-unified-denied"); assert_eq!( - end_event.status, - codex_protocol::protocol::ExecCommandStatus::Failed + item.status, + codex_protocol::items::CommandExecutionStatus::Failed ); - assert_eq!(end_event.exit_code, -1); - assert_eq!(end_event.process_id.as_deref(), Some("123")); + assert_eq!(item.exit_code, Some(-1)); + assert_eq!(item.process_id.as_deref(), Some("123")); assert_eq!( - end_event.aggregated_output, - "PRE_DENIAL_MARKER\nNetwork access denied" + item.aggregated_output.as_deref(), + Some("PRE_DENIAL_MARKER\nNetwork access denied") ); } diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index c31957344637..2c9d073f0dce 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -10,26 +10,13 @@ use crate::models::ResponseItem; use crate::models::WebSearchAction; use crate::openai_models::ReasoningEffort as ReasoningEffortConfig; use crate::parse_command::ParsedCommand; -use crate::protocol::AgentMessageEvent; -use crate::protocol::AgentReasoningEvent; -use crate::protocol::AgentReasoningRawContentEvent; use crate::protocol::AgentStatus; use crate::protocol::CollabAgentRef; -use crate::protocol::ContextCompactedEvent; -use crate::protocol::EventMsg; use crate::protocol::ExecCommandSource; +use crate::protocol::ExecCommandStatus; use crate::protocol::FileChange; -use crate::protocol::ImageGenerationEndEvent; -use crate::protocol::McpInvocation; -use crate::protocol::McpToolCallBeginEvent; -use crate::protocol::McpToolCallEndEvent; -use crate::protocol::PatchApplyBeginEvent; -use crate::protocol::PatchApplyEndEvent; use crate::protocol::PatchApplyStatus; use crate::protocol::SubAgentActivityKind; -use crate::protocol::UserMessageEvent; -use crate::protocol::ViewImageToolCallEvent; -use crate::protocol::WebSearchEndEvent; use crate::user_input::ByteRange; use crate::user_input::TextElement; use crate::user_input::UserInput; @@ -151,6 +138,16 @@ pub enum CommandExecutionStatus { Declined, } +impl From for CommandExecutionStatus { + fn from(value: ExecCommandStatus) -> Self { + match value { + ExecCommandStatus::Completed => Self::Completed, + ExecCommandStatus::Failed => Self::Failed, + ExecCommandStatus::Declined => Self::Declined, + } + } +} + #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema, PartialEq)] pub struct CommandExecutionItem { pub id: String, @@ -387,10 +384,6 @@ impl ContextCompactionItem { id: uuid::Uuid::new_v4().to_string(), } } - - pub fn as_legacy_event(&self) -> EventMsg { - EventMsg::ContextCompacted(ContextCompactedEvent {}) - } } impl Default for ContextCompactionItem { @@ -408,20 +401,6 @@ impl UserMessageItem { } } - pub fn as_legacy_event(&self) -> EventMsg { - // Legacy user-message events flatten only text inputs into `message` and - // rebase text element ranges onto that concatenated text. - EventMsg::UserMessage(UserMessageEvent { - client_id: self.client_id.clone(), - message: self.message(), - images: Some(self.image_urls()), - image_details: self.image_details(), - local_images: self.local_image_paths(), - local_image_details: self.local_image_details(), - text_elements: self.text_elements(), - }) - } - pub fn message(&self) -> String { self.content .iter() @@ -609,134 +588,6 @@ impl AgentMessageItem { memory_citation: None, } } - - pub fn as_legacy_events(&self) -> Vec { - self.content - .iter() - .map(|c| match c { - AgentMessageContent::Text { text } => EventMsg::AgentMessage(AgentMessageEvent { - message: text.clone(), - phase: self.phase.clone(), - memory_citation: self.memory_citation.clone(), - }), - }) - .collect() - } -} - -impl ReasoningItem { - pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { - let mut events = Vec::new(); - for summary in &self.summary_text { - events.push(EventMsg::AgentReasoning(AgentReasoningEvent { - text: summary.clone(), - })); - } - - if show_raw_agent_reasoning { - for entry in &self.raw_content { - events.push(EventMsg::AgentReasoningRawContent( - AgentReasoningRawContentEvent { - text: entry.clone(), - }, - )); - } - } - - events - } -} - -impl WebSearchItem { - pub fn as_legacy_event(&self) -> EventMsg { - EventMsg::WebSearchEnd(WebSearchEndEvent { - call_id: self.id.clone(), - query: self.query.clone(), - action: self.action.clone(), - }) - } -} - -impl ImageGenerationItem { - pub fn as_legacy_event(&self) -> EventMsg { - EventMsg::ImageGenerationEnd(ImageGenerationEndEvent { - call_id: self.id.clone(), - status: self.status.clone(), - revised_prompt: self.revised_prompt.clone(), - result: self.result.clone(), - saved_path: self.saved_path.clone(), - }) - } -} - -impl FileChangeItem { - pub fn as_legacy_begin_event(&self, turn_id: String) -> EventMsg { - EventMsg::PatchApplyBegin(PatchApplyBeginEvent { - call_id: self.id.clone(), - turn_id, - auto_approved: self.auto_approved.unwrap_or(false), - changes: self.changes.clone(), - }) - } - - pub fn as_legacy_end_event(&self, turn_id: String) -> Option { - let status = self.status.clone()?; - Some(EventMsg::PatchApplyEnd(PatchApplyEndEvent { - call_id: self.id.clone(), - turn_id, - stdout: self.stdout.clone().unwrap_or_default(), - stderr: self.stderr.clone().unwrap_or_default(), - success: status == PatchApplyStatus::Completed, - changes: self.changes.clone(), - status, - })) - } -} - -impl McpToolCallItem { - pub fn as_legacy_begin_event(&self) -> EventMsg { - EventMsg::McpToolCallBegin(McpToolCallBeginEvent { - call_id: self.id.clone(), - invocation: McpInvocation { - server: self.server.clone(), - tool: self.tool.clone(), - arguments: (!self.arguments.is_null()).then(|| self.arguments.clone()), - }, - connector_id: self.connector_id.clone(), - mcp_app_resource_uri: self.mcp_app_resource_uri.clone(), - link_id: self.link_id.clone(), - app_name: self.app_name.clone(), - template_id: self.template_id.clone(), - action_name: self.action_name.clone(), - plugin_id: self.plugin_id.clone(), - }) - } - - pub fn as_legacy_end_event(&self) -> Option { - let result = match (&self.result, &self.error) { - (Some(result), _) => Ok(result.clone()), - (None, Some(error)) => Err(error.message.clone()), - (None, None) => return None, - }; - - Some(EventMsg::McpToolCallEnd(McpToolCallEndEvent { - call_id: self.id.clone(), - invocation: McpInvocation { - server: self.server.clone(), - tool: self.tool.clone(), - arguments: (!self.arguments.is_null()).then(|| self.arguments.clone()), - }, - mcp_app_resource_uri: self.mcp_app_resource_uri.clone(), - connector_id: self.connector_id.clone(), - link_id: self.link_id.clone(), - app_name: self.app_name.clone(), - template_id: self.template_id.clone(), - action_name: self.action_name.clone(), - plugin_id: self.plugin_id.clone(), - duration: self.duration?, - result, - })) - } } impl TurnItem { @@ -760,35 +611,6 @@ impl TurnItem { TurnItem::ContextCompaction(item) => item.id.clone(), } } - - pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { - match self { - TurnItem::UserMessage(item) => vec![item.as_legacy_event()], - TurnItem::HookPrompt(_) => Vec::new(), - TurnItem::AgentMessage(item) => item.as_legacy_events(), - TurnItem::Plan(_) => Vec::new(), - TurnItem::CommandExecution(_) - | TurnItem::DynamicToolCall(_) - | TurnItem::CollabAgentToolCall(_) => Vec::new(), - TurnItem::SubAgentActivity(_) => Vec::new(), - TurnItem::WebSearch(item) => vec![item.as_legacy_event()], - TurnItem::ImageView(item) => { - vec![EventMsg::ViewImageToolCall(ViewImageToolCallEvent { - call_id: item.id.clone(), - path: item.path.clone(), - })] - } - TurnItem::Sleep(_) => Vec::new(), - TurnItem::ImageGeneration(item) => vec![item.as_legacy_event()], - TurnItem::FileChange(item) => item - .as_legacy_end_event(String::new()) - .into_iter() - .collect(), - TurnItem::McpToolCall(item) => item.as_legacy_end_event().into_iter().collect(), - TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), - TurnItem::ContextCompaction(item) => vec![item.as_legacy_event()], - } - } } #[cfg(test)] diff --git a/codex-rs/protocol/src/legacy_events.rs b/codex-rs/protocol/src/legacy_events.rs new file mode 100644 index 000000000000..c12a2747a29a --- /dev/null +++ b/codex-rs/protocol/src/legacy_events.rs @@ -0,0 +1,593 @@ +use crate::ThreadId; +use crate::dynamic_tools::DynamicToolCallRequest; +use crate::items::AgentMessageContent; +use crate::items::AgentMessageItem; +use crate::items::CollabAgentTool; +use crate::items::CollabAgentToolCallItem; +use crate::items::CollabAgentToolCallStatus; +use crate::items::CommandExecutionItem; +use crate::items::CommandExecutionStatus; +use crate::items::ContextCompactionItem; +use crate::items::DynamicToolCallItem; +use crate::items::DynamicToolCallStatus; +use crate::items::FileChangeItem; +use crate::items::ImageGenerationItem; +use crate::items::McpToolCallItem; +use crate::items::ReasoningItem; +use crate::items::SubAgentActivityItem; +use crate::items::TurnItem; +use crate::items::UserMessageItem; +use crate::items::WebSearchItem; +use crate::protocol::AgentMessageContentDeltaEvent; +use crate::protocol::AgentMessageEvent; +use crate::protocol::AgentReasoningEvent; +use crate::protocol::AgentReasoningRawContentEvent; +use crate::protocol::AgentStatus; +use crate::protocol::CollabAgentInteractionBeginEvent; +use crate::protocol::CollabAgentInteractionEndEvent; +use crate::protocol::CollabAgentSpawnBeginEvent; +use crate::protocol::CollabAgentSpawnEndEvent; +use crate::protocol::CollabAgentStatusEntry; +use crate::protocol::CollabCloseBeginEvent; +use crate::protocol::CollabCloseEndEvent; +use crate::protocol::CollabResumeBeginEvent; +use crate::protocol::CollabResumeEndEvent; +use crate::protocol::CollabWaitingBeginEvent; +use crate::protocol::CollabWaitingEndEvent; +use crate::protocol::ContextCompactedEvent; +use crate::protocol::DynamicToolCallResponseEvent; +use crate::protocol::EventMsg; +use crate::protocol::ExecCommandBeginEvent; +use crate::protocol::ExecCommandEndEvent; +use crate::protocol::ExecCommandStatus; +use crate::protocol::ImageGenerationBeginEvent; +use crate::protocol::ImageGenerationEndEvent; +use crate::protocol::ItemCompletedEvent; +use crate::protocol::ItemStartedEvent; +use crate::protocol::McpInvocation; +use crate::protocol::McpToolCallBeginEvent; +use crate::protocol::McpToolCallEndEvent; +use crate::protocol::PatchApplyBeginEvent; +use crate::protocol::PatchApplyEndEvent; +use crate::protocol::PatchApplyStatus; +use crate::protocol::ReasoningContentDeltaEvent; +use crate::protocol::ReasoningRawContentDeltaEvent; +use crate::protocol::SubAgentActivityEvent; +use crate::protocol::UserMessageEvent; +use crate::protocol::ViewImageToolCallEvent; +use crate::protocol::WebSearchBeginEvent; +use crate::protocol::WebSearchEndEvent; + +/// Converts canonical item lifecycle events back into the legacy raw event stream used by +/// compatibility consumers that have not migrated to `TurnItem`. +pub trait HasLegacyEvent { + fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec; +} + +impl ContextCompactionItem { + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::ContextCompacted(ContextCompactedEvent {}) + } +} + +impl UserMessageItem { + pub fn as_legacy_event(&self) -> EventMsg { + // Legacy user-message events flatten only text inputs into `message` and + // rebase text element ranges onto that concatenated text. + EventMsg::UserMessage(UserMessageEvent { + client_id: self.client_id.clone(), + message: self.message(), + images: Some(self.image_urls()), + image_details: self.image_details(), + local_images: self.local_image_paths(), + local_image_details: self.local_image_details(), + text_elements: self.text_elements(), + }) + } +} + +impl AgentMessageItem { + pub fn as_legacy_events(&self) -> Vec { + self.content + .iter() + .map(|c| match c { + AgentMessageContent::Text { text } => EventMsg::AgentMessage(AgentMessageEvent { + message: text.clone(), + phase: self.phase.clone(), + memory_citation: self.memory_citation.clone(), + }), + }) + .collect() + } +} + +impl ReasoningItem { + pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + let mut events = Vec::new(); + for summary in &self.summary_text { + events.push(EventMsg::AgentReasoning(AgentReasoningEvent { + text: summary.clone(), + })); + } + + if show_raw_agent_reasoning { + for entry in &self.raw_content { + events.push(EventMsg::AgentReasoningRawContent( + AgentReasoningRawContentEvent { + text: entry.clone(), + }, + )); + } + } + + events + } +} + +impl CommandExecutionItem { + pub(crate) fn as_legacy_begin_event(&self, turn_id: String, started_at_ms: i64) -> EventMsg { + EventMsg::ExecCommandBegin(ExecCommandBeginEvent { + call_id: self.id.clone(), + process_id: self.process_id.clone(), + turn_id, + started_at_ms, + command: self.command.clone(), + cwd: self.cwd.clone(), + parsed_cmd: self.parsed_cmd.clone(), + source: self.source, + interaction_input: self.interaction_input.clone(), + }) + } + + pub(crate) fn as_legacy_end_event( + &self, + turn_id: String, + completed_at_ms: i64, + ) -> Option { + let status = match self.status { + CommandExecutionStatus::InProgress => return None, + CommandExecutionStatus::Completed => ExecCommandStatus::Completed, + CommandExecutionStatus::Failed => ExecCommandStatus::Failed, + CommandExecutionStatus::Declined => ExecCommandStatus::Declined, + }; + Some(EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: self.id.clone(), + process_id: self.process_id.clone(), + turn_id, + completed_at_ms, + command: self.command.clone(), + cwd: self.cwd.clone(), + parsed_cmd: self.parsed_cmd.clone(), + source: self.source, + interaction_input: self.interaction_input.clone(), + stdout: self.stdout.clone().unwrap_or_default(), + stderr: self.stderr.clone().unwrap_or_default(), + aggregated_output: self.aggregated_output.clone().unwrap_or_default(), + exit_code: self.exit_code.unwrap_or_default(), + duration: self.duration.unwrap_or_default(), + formatted_output: self.formatted_output.clone().unwrap_or_default(), + status, + })) + } +} + +impl DynamicToolCallItem { + pub(crate) fn as_legacy_request_event(&self, turn_id: String, started_at_ms: i64) -> EventMsg { + EventMsg::DynamicToolCallRequest(DynamicToolCallRequest { + call_id: self.id.clone(), + turn_id, + started_at_ms, + namespace: self.namespace.clone(), + tool: self.tool.clone(), + arguments: self.arguments.clone(), + }) + } + + pub(crate) fn as_legacy_response_event( + &self, + turn_id: String, + completed_at_ms: i64, + ) -> Option { + if matches!(self.status, DynamicToolCallStatus::InProgress) { + return None; + } + Some(EventMsg::DynamicToolCallResponse( + DynamicToolCallResponseEvent { + call_id: self.id.clone(), + turn_id, + completed_at_ms, + namespace: self.namespace.clone(), + tool: self.tool.clone(), + arguments: self.arguments.clone(), + content_items: self.content_items.clone().unwrap_or_default(), + success: self.success.unwrap_or(false), + error: self.error.clone(), + duration: self.duration.unwrap_or_default(), + }, + )) + } +} + +impl CollabAgentToolCallItem { + pub(crate) fn as_legacy_begin_event(&self, started_at_ms: i64) -> Option { + let receiver_thread_id = self.receiver_thread_ids.first().copied(); + match self.tool { + CollabAgentTool::SpawnAgent => Some(EventMsg::CollabAgentSpawnBegin( + CollabAgentSpawnBeginEvent { + call_id: self.id.clone(), + started_at_ms, + sender_thread_id: self.sender_thread_id, + prompt: self.prompt.clone().unwrap_or_default(), + model: self.model.clone().unwrap_or_default(), + reasoning_effort: self.reasoning_effort.clone().unwrap_or_default(), + }, + )), + CollabAgentTool::SendInput => receiver_thread_id.map(|receiver_thread_id| { + EventMsg::CollabAgentInteractionBegin(CollabAgentInteractionBeginEvent { + call_id: self.id.clone(), + started_at_ms, + sender_thread_id: self.sender_thread_id, + receiver_thread_id, + prompt: self.prompt.clone().unwrap_or_default(), + }) + }), + CollabAgentTool::ResumeAgent => receiver_thread_id.map(|receiver_thread_id| { + let (receiver_agent_nickname, receiver_agent_role) = + self.receiver_agent_identity(receiver_thread_id); + EventMsg::CollabResumeBegin(CollabResumeBeginEvent { + call_id: self.id.clone(), + started_at_ms, + sender_thread_id: self.sender_thread_id, + receiver_thread_id, + receiver_agent_nickname, + receiver_agent_role, + }) + }), + CollabAgentTool::Wait => Some(EventMsg::CollabWaitingBegin(CollabWaitingBeginEvent { + started_at_ms, + sender_thread_id: self.sender_thread_id, + receiver_thread_ids: self.receiver_thread_ids.clone(), + receiver_agents: self.receiver_agents.clone(), + call_id: self.id.clone(), + })), + CollabAgentTool::CloseAgent => receiver_thread_id.map(|receiver_thread_id| { + EventMsg::CollabCloseBegin(CollabCloseBeginEvent { + call_id: self.id.clone(), + started_at_ms, + sender_thread_id: self.sender_thread_id, + receiver_thread_id, + }) + }), + } + } + + pub(crate) fn as_legacy_end_event(&self, completed_at_ms: i64) -> Option { + if matches!(self.status, CollabAgentToolCallStatus::InProgress) { + return None; + } + let receiver_thread_id = self.receiver_thread_ids.first().copied(); + match self.tool { + CollabAgentTool::SpawnAgent => { + let (new_agent_nickname, new_agent_role) = receiver_thread_id + .map(|thread_id| self.receiver_agent_identity(thread_id)) + .unwrap_or_default(); + Some(EventMsg::CollabAgentSpawnEnd(CollabAgentSpawnEndEvent { + call_id: self.id.clone(), + completed_at_ms, + sender_thread_id: self.sender_thread_id, + new_thread_id: receiver_thread_id, + new_agent_nickname, + new_agent_role, + prompt: self.prompt.clone().unwrap_or_default(), + model: self.model.clone().unwrap_or_default(), + reasoning_effort: self.reasoning_effort.clone().unwrap_or_default(), + status: receiver_thread_id + .map(|thread_id| self.agent_status(thread_id)) + .unwrap_or(AgentStatus::NotFound), + })) + } + CollabAgentTool::SendInput => receiver_thread_id.map(|receiver_thread_id| { + let (receiver_agent_nickname, receiver_agent_role) = + self.receiver_agent_identity(receiver_thread_id); + EventMsg::CollabAgentInteractionEnd(CollabAgentInteractionEndEvent { + call_id: self.id.clone(), + completed_at_ms, + sender_thread_id: self.sender_thread_id, + receiver_thread_id, + receiver_agent_nickname, + receiver_agent_role, + prompt: self.prompt.clone().unwrap_or_default(), + status: self.agent_status(receiver_thread_id), + }) + }), + CollabAgentTool::ResumeAgent => receiver_thread_id.map(|receiver_thread_id| { + let (receiver_agent_nickname, receiver_agent_role) = + self.receiver_agent_identity(receiver_thread_id); + EventMsg::CollabResumeEnd(CollabResumeEndEvent { + call_id: self.id.clone(), + completed_at_ms, + sender_thread_id: self.sender_thread_id, + receiver_thread_id, + receiver_agent_nickname, + receiver_agent_role, + status: self.agent_status(receiver_thread_id), + }) + }), + CollabAgentTool::Wait => Some(EventMsg::CollabWaitingEnd(CollabWaitingEndEvent { + sender_thread_id: self.sender_thread_id, + call_id: self.id.clone(), + completed_at_ms, + agent_statuses: self + .receiver_agents + .iter() + .map(|agent| CollabAgentStatusEntry { + thread_id: agent.thread_id, + agent_nickname: agent.agent_nickname.clone(), + agent_role: agent.agent_role.clone(), + status: self.agent_status(agent.thread_id), + }) + .collect(), + statuses: self.agents_states.clone(), + })), + CollabAgentTool::CloseAgent => receiver_thread_id.map(|receiver_thread_id| { + let (receiver_agent_nickname, receiver_agent_role) = + self.receiver_agent_identity(receiver_thread_id); + EventMsg::CollabCloseEnd(CollabCloseEndEvent { + call_id: self.id.clone(), + completed_at_ms, + sender_thread_id: self.sender_thread_id, + receiver_thread_id, + receiver_agent_nickname, + receiver_agent_role, + status: self.agent_status(receiver_thread_id), + }) + }), + } + } + + fn receiver_agent_identity(&self, thread_id: ThreadId) -> (Option, Option) { + let receiver_agent = self + .receiver_agents + .iter() + .find(|agent| agent.thread_id == thread_id); + ( + receiver_agent.and_then(|agent| agent.agent_nickname.clone()), + receiver_agent.and_then(|agent| agent.agent_role.clone()), + ) + } + + fn agent_status(&self, thread_id: ThreadId) -> AgentStatus { + self.agents_states + .get(&thread_id) + .cloned() + .unwrap_or(AgentStatus::NotFound) + } +} + +impl SubAgentActivityItem { + pub(crate) fn as_legacy_event(&self, occurred_at_ms: i64) -> EventMsg { + EventMsg::SubAgentActivity(SubAgentActivityEvent { + event_id: self.id.clone(), + occurred_at_ms, + agent_thread_id: self.agent_thread_id, + agent_path: self.agent_path.clone(), + kind: self.kind, + }) + } +} + +impl WebSearchItem { + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::WebSearchEnd(WebSearchEndEvent { + call_id: self.id.clone(), + query: self.query.clone(), + action: self.action.clone(), + }) + } +} + +impl ImageGenerationItem { + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::ImageGenerationEnd(ImageGenerationEndEvent { + call_id: self.id.clone(), + status: self.status.clone(), + revised_prompt: self.revised_prompt.clone(), + result: self.result.clone(), + saved_path: self.saved_path.clone(), + }) + } +} + +impl FileChangeItem { + pub fn as_legacy_begin_event(&self, turn_id: String) -> EventMsg { + EventMsg::PatchApplyBegin(PatchApplyBeginEvent { + call_id: self.id.clone(), + turn_id, + auto_approved: self.auto_approved.unwrap_or(false), + changes: self.changes.clone(), + }) + } + + pub fn as_legacy_end_event(&self, turn_id: String) -> Option { + let status = self.status.clone()?; + Some(EventMsg::PatchApplyEnd(PatchApplyEndEvent { + call_id: self.id.clone(), + turn_id, + stdout: self.stdout.clone().unwrap_or_default(), + stderr: self.stderr.clone().unwrap_or_default(), + success: status == PatchApplyStatus::Completed, + changes: self.changes.clone(), + status, + })) + } +} + +impl McpToolCallItem { + pub fn as_legacy_begin_event(&self) -> EventMsg { + EventMsg::McpToolCallBegin(McpToolCallBeginEvent { + call_id: self.id.clone(), + invocation: McpInvocation { + server: self.server.clone(), + tool: self.tool.clone(), + arguments: (!self.arguments.is_null()).then(|| self.arguments.clone()), + }, + connector_id: self.connector_id.clone(), + mcp_app_resource_uri: self.mcp_app_resource_uri.clone(), + link_id: self.link_id.clone(), + app_name: self.app_name.clone(), + template_id: self.template_id.clone(), + action_name: self.action_name.clone(), + plugin_id: self.plugin_id.clone(), + }) + } + + pub fn as_legacy_end_event(&self) -> Option { + let result = match (&self.result, &self.error) { + (Some(result), _) => Ok(result.clone()), + (None, Some(error)) => Err(error.message.clone()), + (None, None) => return None, + }; + + Some(EventMsg::McpToolCallEnd(McpToolCallEndEvent { + call_id: self.id.clone(), + invocation: McpInvocation { + server: self.server.clone(), + tool: self.tool.clone(), + arguments: (!self.arguments.is_null()).then(|| self.arguments.clone()), + }, + mcp_app_resource_uri: self.mcp_app_resource_uri.clone(), + connector_id: self.connector_id.clone(), + link_id: self.link_id.clone(), + app_name: self.app_name.clone(), + template_id: self.template_id.clone(), + action_name: self.action_name.clone(), + plugin_id: self.plugin_id.clone(), + duration: self.duration?, + result, + })) + } +} + +impl TurnItem { + pub fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + match self { + TurnItem::UserMessage(item) => vec![item.as_legacy_event()], + TurnItem::HookPrompt(_) => Vec::new(), + TurnItem::AgentMessage(item) => item.as_legacy_events(), + TurnItem::Plan(_) => Vec::new(), + TurnItem::CommandExecution(_) + | TurnItem::DynamicToolCall(_) + | TurnItem::CollabAgentToolCall(_) => Vec::new(), + TurnItem::SubAgentActivity(_) => Vec::new(), + TurnItem::WebSearch(item) => vec![item.as_legacy_event()], + TurnItem::ImageView(item) => { + vec![EventMsg::ViewImageToolCall(ViewImageToolCallEvent { + call_id: item.id.clone(), + path: item.path.clone(), + })] + } + TurnItem::Sleep(_) => Vec::new(), + TurnItem::ImageGeneration(item) => vec![item.as_legacy_event()], + TurnItem::FileChange(item) => item + .as_legacy_end_event(String::new()) + .into_iter() + .collect(), + TurnItem::McpToolCall(item) => item.as_legacy_end_event().into_iter().collect(), + TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), + TurnItem::ContextCompaction(item) => vec![item.as_legacy_event()], + } + } +} + +impl HasLegacyEvent for ItemStartedEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + match &self.item { + TurnItem::WebSearch(item) => vec![EventMsg::WebSearchBegin(WebSearchBeginEvent { + call_id: item.id.clone(), + })], + TurnItem::ImageView(_) => Vec::new(), + TurnItem::ImageGeneration(item) => { + vec![EventMsg::ImageGenerationBegin(ImageGenerationBeginEvent { + call_id: item.id.clone(), + })] + } + TurnItem::FileChange(item) => vec![item.as_legacy_begin_event(self.turn_id.clone())], + TurnItem::McpToolCall(item) => vec![item.as_legacy_begin_event()], + TurnItem::CommandExecution(item) => { + vec![item.as_legacy_begin_event(self.turn_id.clone(), self.started_at_ms)] + } + TurnItem::DynamicToolCall(item) => { + vec![item.as_legacy_request_event(self.turn_id.clone(), self.started_at_ms)] + } + TurnItem::CollabAgentToolCall(item) => item + .as_legacy_begin_event(self.started_at_ms) + .into_iter() + .collect(), + _ => Vec::new(), + } + } +} + +impl HasLegacyEvent for ItemCompletedEvent { + fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + match &self.item { + TurnItem::FileChange(item) => item + .as_legacy_end_event(self.turn_id.clone()) + .into_iter() + .collect(), + TurnItem::CommandExecution(item) => item + .as_legacy_end_event(self.turn_id.clone(), self.completed_at_ms) + .into_iter() + .collect(), + TurnItem::DynamicToolCall(item) => item + .as_legacy_response_event(self.turn_id.clone(), self.completed_at_ms) + .into_iter() + .collect(), + TurnItem::CollabAgentToolCall(item) => item + .as_legacy_end_event(self.completed_at_ms) + .into_iter() + .collect(), + TurnItem::SubAgentActivity(item) => { + vec![item.as_legacy_event(self.completed_at_ms)] + } + _ => self.item.as_legacy_events(show_raw_agent_reasoning), + } + } +} + +impl HasLegacyEvent for AgentMessageContentDeltaEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + Vec::new() + } +} + +impl HasLegacyEvent for ReasoningContentDeltaEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + Vec::new() + } +} + +impl HasLegacyEvent for ReasoningRawContentDeltaEvent { + fn as_legacy_events(&self, _: bool) -> Vec { + Vec::new() + } +} + +impl HasLegacyEvent for EventMsg { + fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { + match self { + EventMsg::ItemStarted(event) => event.as_legacy_events(show_raw_agent_reasoning), + EventMsg::ItemCompleted(event) => event.as_legacy_events(show_raw_agent_reasoning), + EventMsg::AgentMessageContentDelta(event) => { + event.as_legacy_events(show_raw_agent_reasoning) + } + EventMsg::ReasoningContentDelta(event) => { + event.as_legacy_events(show_raw_agent_reasoning) + } + EventMsg::ReasoningRawContentDelta(event) => { + event.as_legacy_events(show_raw_agent_reasoning) + } + _ => Vec::new(), + } + } +} diff --git a/codex-rs/protocol/src/lib.rs b/codex-rs/protocol/src/lib.rs index c50e95d37f97..636c64ed7bfa 100644 --- a/codex-rs/protocol/src/lib.rs +++ b/codex-rs/protocol/src/lib.rs @@ -16,6 +16,7 @@ pub mod dynamic_tools; pub mod error; pub mod exec_output; pub mod items; +mod legacy_events; pub mod mcp; pub mod mcp_approval_meta; pub mod memory_citation; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 96a793bcd38f..df331e7e03d9 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -84,6 +84,7 @@ pub use crate::approvals::NetworkApprovalContext; pub use crate::approvals::NetworkApprovalProtocol; pub use crate::approvals::NetworkPolicyAmendment; pub use crate::approvals::NetworkPolicyRuleAction; +pub use crate::legacy_events::HasLegacyEvent; pub use crate::permissions::FileSystemAccessMode; pub use crate::permissions::FileSystemPath; pub use crate::permissions::FileSystemSandboxEntry; @@ -1794,25 +1795,6 @@ pub struct ItemStartedEvent { pub started_at_ms: i64, } -impl HasLegacyEvent for ItemStartedEvent { - fn as_legacy_events(&self, _: bool) -> Vec { - match &self.item { - TurnItem::WebSearch(item) => vec![EventMsg::WebSearchBegin(WebSearchBeginEvent { - call_id: item.id.clone(), - })], - TurnItem::ImageView(_) => Vec::new(), - TurnItem::ImageGeneration(item) => { - vec![EventMsg::ImageGenerationBegin(ImageGenerationBeginEvent { - call_id: item.id.clone(), - })] - } - TurnItem::FileChange(item) => vec![item.as_legacy_begin_event(self.turn_id.clone())], - TurnItem::McpToolCall(item) => vec![item.as_legacy_begin_event()], - _ => Vec::new(), - } - } -} - #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct ItemCompletedEvent { pub thread_id: ThreadId, @@ -1829,22 +1811,6 @@ const fn default_item_completed_at_ms() -> i64 { 0 } -pub trait HasLegacyEvent { - fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec; -} - -impl HasLegacyEvent for ItemCompletedEvent { - fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { - match &self.item { - TurnItem::FileChange(item) => item - .as_legacy_end_event(self.turn_id.clone()) - .into_iter() - .collect(), - _ => self.item.as_legacy_events(show_raw_agent_reasoning), - } - } -} - #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct AgentMessageContentDeltaEvent { pub thread_id: String, @@ -1853,12 +1819,6 @@ pub struct AgentMessageContentDeltaEvent { pub delta: String, } -impl HasLegacyEvent for AgentMessageContentDeltaEvent { - fn as_legacy_events(&self, _: bool) -> Vec { - Vec::new() - } -} - #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct PlanDeltaEvent { pub thread_id: String, @@ -1878,12 +1838,6 @@ pub struct ReasoningContentDeltaEvent { pub summary_index: i64, } -impl HasLegacyEvent for ReasoningContentDeltaEvent { - fn as_legacy_events(&self, _: bool) -> Vec { - Vec::new() - } -} - #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct ReasoningRawContentDeltaEvent { pub thread_id: String, @@ -1895,31 +1849,6 @@ pub struct ReasoningRawContentDeltaEvent { pub content_index: i64, } -impl HasLegacyEvent for ReasoningRawContentDeltaEvent { - fn as_legacy_events(&self, _: bool) -> Vec { - Vec::new() - } -} - -impl HasLegacyEvent for EventMsg { - fn as_legacy_events(&self, show_raw_agent_reasoning: bool) -> Vec { - match self { - EventMsg::ItemStarted(event) => event.as_legacy_events(show_raw_agent_reasoning), - EventMsg::ItemCompleted(event) => event.as_legacy_events(show_raw_agent_reasoning), - EventMsg::AgentMessageContentDelta(event) => { - event.as_legacy_events(show_raw_agent_reasoning) - } - EventMsg::ReasoningContentDelta(event) => { - event.as_legacy_events(show_raw_agent_reasoning) - } - EventMsg::ReasoningRawContentDelta(event) => { - event.as_legacy_events(show_raw_agent_reasoning) - } - _ => Vec::new(), - } - } -} - #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct ExitedReviewModeEvent { pub review_output: Option, @@ -4377,6 +4306,10 @@ pub struct CollabResumeEndEvent { #[cfg(test)] mod tests { use super::*; + use crate::items::CommandExecutionItem; + use crate::items::CommandExecutionStatus; + use crate::items::DynamicToolCallItem; + use crate::items::DynamicToolCallStatus; use crate::items::FileChangeItem; use crate::items::ImageGenerationItem; use crate::items::McpToolCallItem; @@ -5358,6 +5291,139 @@ mod tests { } } + #[test] + fn command_execution_item_lifecycle_emits_legacy_exec_events() { + let cwd = PathUri::from_abs_path(&test_path_buf("/tmp").abs()); + let started = ItemStartedEvent { + thread_id: ThreadId::new(), + turn_id: "turn-1".into(), + started_at_ms: 10, + item: TurnItem::CommandExecution(CommandExecutionItem { + id: "exec-1".into(), + process_id: Some("pid-1".into()), + command: vec!["echo".into(), "done".into()], + cwd: cwd.clone(), + parsed_cmd: vec![ParsedCommand::Unknown { + cmd: "echo done".into(), + }], + source: ExecCommandSource::Agent, + interaction_input: None, + status: CommandExecutionStatus::InProgress, + stdout: None, + stderr: None, + aggregated_output: None, + exit_code: None, + duration: None, + formatted_output: None, + }), + }; + let completed = ItemCompletedEvent { + thread_id: ThreadId::new(), + turn_id: "turn-1".into(), + completed_at_ms: 20, + item: TurnItem::CommandExecution(CommandExecutionItem { + id: "exec-1".into(), + process_id: Some("pid-1".into()), + command: vec!["echo".into(), "done".into()], + cwd, + parsed_cmd: vec![ParsedCommand::Unknown { + cmd: "echo done".into(), + }], + source: ExecCommandSource::Agent, + interaction_input: None, + status: CommandExecutionStatus::Completed, + stdout: Some("done\n".into()), + stderr: Some(String::new()), + aggregated_output: Some("done\n".into()), + exit_code: Some(0), + duration: Some(Duration::from_millis(5)), + formatted_output: Some("done\n".into()), + }), + }; + + assert!(matches!( + started.as_legacy_events(/*show_raw_agent_reasoning*/ false).as_slice(), + [EventMsg::ExecCommandBegin(ExecCommandBeginEvent { + call_id, + turn_id, + started_at_ms: 10, + .. + })] if call_id == "exec-1" && turn_id == "turn-1" + )); + assert!(matches!( + completed + .as_legacy_events(/*show_raw_agent_reasoning*/ false) + .as_slice(), + [EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id, + turn_id, + completed_at_ms: 20, + aggregated_output, + .. + })] if call_id == "exec-1" && turn_id == "turn-1" && aggregated_output == "done\n" + )); + } + + #[test] + fn dynamic_tool_call_item_lifecycle_emits_legacy_dynamic_tool_events() { + let started = ItemStartedEvent { + thread_id: ThreadId::new(), + turn_id: "turn-1".into(), + started_at_ms: 10, + item: TurnItem::DynamicToolCall(DynamicToolCallItem { + id: "dynamic-1".into(), + namespace: Some("apps".into()), + tool: "lookup".into(), + arguments: json!({"id": "123"}), + status: DynamicToolCallStatus::InProgress, + content_items: None, + success: None, + error: None, + duration: None, + }), + }; + let completed = ItemCompletedEvent { + thread_id: ThreadId::new(), + turn_id: "turn-1".into(), + completed_at_ms: 20, + item: TurnItem::DynamicToolCall(DynamicToolCallItem { + id: "dynamic-1".into(), + namespace: Some("apps".into()), + tool: "lookup".into(), + arguments: json!({"id": "123"}), + status: DynamicToolCallStatus::Completed, + content_items: Some(vec![DynamicToolCallOutputContentItem::InputText { + text: "ok".into(), + }]), + success: Some(true), + error: None, + duration: Some(Duration::from_millis(5)), + }), + }; + + assert!(matches!( + started.as_legacy_events(/*show_raw_agent_reasoning*/ false).as_slice(), + [EventMsg::DynamicToolCallRequest(DynamicToolCallRequest { + call_id, + turn_id, + started_at_ms: 10, + .. + })] if call_id == "dynamic-1" && turn_id == "turn-1" + )); + assert!(matches!( + completed + .as_legacy_events(/*show_raw_agent_reasoning*/ false) + .as_slice(), + [EventMsg::DynamicToolCallResponse(DynamicToolCallResponseEvent { + call_id, + turn_id, + completed_at_ms: 20, + success: true, + .. + })] if call_id == "dynamic-1" && turn_id == "turn-1" + )); + } + #[test] fn item_started_event_requires_started_at_ms() { let mut value = serde_json::to_value(ItemStartedEvent { @@ -5386,6 +5452,7 @@ mod tests { let event = serde_json::from_value::(value).unwrap(); assert_eq!(event.completed_at_ms, 0); } + #[test] fn rollback_failed_error_does_not_affect_turn_status() { let event = ErrorEvent {