Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
376 changes: 228 additions & 148 deletions codex-rs/app-server/src/bespoke_event_handling.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1773,6 +1773,9 @@ impl Session {

let show_raw_agent_reasoning = self.show_raw_agent_reasoning();
for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) {
self.services
.rollout_thread_trace
.record_tool_call_event(turn_context.sub_id.clone(), &legacy);
let legacy_event = Event {
id: turn_context.sub_id.clone(),
msg: legacy,
Expand Down
98 changes: 48 additions & 50 deletions codex-rs/core/src/tasks/user_shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ use crate::tools::runtimes::RuntimePathPrepends;
use crate::tools::runtimes::apply_package_path_prepend;
use crate::tools::runtimes::maybe_wrap_shell_lc_with_snapshot;
use crate::tools::runtimes::strip_managed_proxy_env;
use crate::turn_timing::now_unix_timestamp_ms;
use crate::user_shell_command::user_shell_command_record_item;
use codex_protocol::exec_output::ExecToolCallOutput;
use codex_protocol::exec_output::StreamOutput;
use codex_protocol::items::CommandExecutionItem;
use codex_protocol::items::CommandExecutionStatus;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandBeginEvent;
use codex_protocol::protocol::ExecCommandEndEvent;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::ExecCommandStatus;
use codex_protocol::protocol::TurnStartedEvent;
use codex_sandboxing::SandboxType;
use codex_shell_command::parse_command::parse_command;
Expand Down Expand Up @@ -181,18 +180,23 @@ pub(crate) async fn execute_user_shell_command(

let parsed_cmd = parse_command(&display_command);
session
.send_event(
.emit_turn_item_started(
turn_context.as_ref(),
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: call_id.clone(),
&TurnItem::CommandExecution(CommandExecutionItem {
id: call_id.clone(),
process_id: None,
turn_id: turn_context.sub_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
command: display_command.clone(),
cwd: cwd.clone().into(),
parsed_cmd: parsed_cmd.clone(),
source: ExecCommandSource::UserShell,
interaction_input: None,
status: CommandExecutionStatus::InProgress,
stdout: None,
stderr: None,
aggregated_output: None,
exit_code: None,
duration: None,
formatted_output: None,
}),
)
.await;
Expand Down Expand Up @@ -259,57 +263,53 @@ pub(crate) async fn execute_user_shell_command(
)
.await;
session
.send_event(
.emit_turn_item_completed(
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
TurnItem::CommandExecution(CommandExecutionItem {
id: call_id,
process_id: None,
turn_id: turn_context.sub_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
command: display_command.clone(),
cwd: cwd.clone().into(),
parsed_cmd: parsed_cmd.clone(),
source: ExecCommandSource::UserShell,
interaction_input: None,
stdout: String::new(),
stderr: aborted_message.clone(),
aggregated_output: aborted_message.clone(),
exit_code: -1,
duration: Duration::ZERO,
formatted_output: aborted_message,
status: ExecCommandStatus::Failed,
status: CommandExecutionStatus::Failed,
stdout: Some(String::new()),
stderr: Some(aborted_message.clone()),
aggregated_output: Some(aborted_message.clone()),
exit_code: Some(-1),
duration: Some(Duration::ZERO),
formatted_output: Some(aborted_message),
}),
)
.await;
}
Ok(Ok(output)) => {
session
.send_event(
.emit_turn_item_completed(
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: call_id.clone(),
TurnItem::CommandExecution(CommandExecutionItem {
id: call_id.clone(),
process_id: None,
turn_id: turn_context.sub_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
command: display_command.clone(),
cwd: cwd.clone().into(),
parsed_cmd: parsed_cmd.clone(),
source: ExecCommandSource::UserShell,
interaction_input: None,
stdout: output.stdout.text.clone(),
stderr: output.stderr.text.clone(),
aggregated_output: output.aggregated_output.text.clone(),
exit_code: output.exit_code,
duration: output.duration,
formatted_output: format_exec_output_str(
&output,
turn_context.model_info.truncation_policy.into(),
),
status: if output.exit_code == 0 {
ExecCommandStatus::Completed
CommandExecutionStatus::Completed
} else {
ExecCommandStatus::Failed
CommandExecutionStatus::Failed
},
stdout: Some(output.stdout.text.clone()),
stderr: Some(output.stderr.text.clone()),
aggregated_output: Some(output.aggregated_output.text.clone()),
exit_code: Some(output.exit_code),
duration: Some(output.duration),
formatted_output: Some(format_exec_output_str(
&output,
turn_context.model_info.truncation_policy.into(),
)),
}),
)
.await;
Expand All @@ -329,28 +329,26 @@ pub(crate) async fn execute_user_shell_command(
timed_out: false,
};
session
.send_event(
.emit_turn_item_completed(
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
TurnItem::CommandExecution(CommandExecutionItem {
id: call_id,
process_id: None,
turn_id: turn_context.sub_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
command: display_command,
cwd: cwd.into(),
parsed_cmd,
source: ExecCommandSource::UserShell,
interaction_input: None,
stdout: exec_output.stdout.text.clone(),
stderr: exec_output.stderr.text.clone(),
aggregated_output: exec_output.aggregated_output.text.clone(),
exit_code: exec_output.exit_code,
duration: exec_output.duration,
formatted_output: format_exec_output_str(
status: CommandExecutionStatus::Failed,
stdout: Some(exec_output.stdout.text.clone()),
stderr: Some(exec_output.stderr.text.clone()),
aggregated_output: Some(exec_output.aggregated_output.text.clone()),
exit_code: Some(exec_output.exit_code),
duration: Some(exec_output.duration),
formatted_output: Some(format_exec_output_str(
&exec_output,
turn_context.model_info.truncation_policy.into(),
),
status: ExecCommandStatus::Failed,
)),
}),
)
.await;
Expand Down
84 changes: 67 additions & 17 deletions codex-rs/core/src/tools/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use codex_apply_patch::AppliedPatchDelta;
use codex_protocol::error::CodexErr;
use codex_protocol::error::SandboxErr;
use codex_protocol::exec_output::ExecToolCallOutput;
use codex_protocol::items::CommandExecutionItem;
use codex_protocol::items::CommandExecutionStatus;
use codex_protocol::items::FileChangeItem;
use codex_protocol::items::TurnItem;
use codex_protocol::parse_command::ParsedCommand;
Expand Down Expand Up @@ -102,19 +104,43 @@ pub(crate) async fn emit_exec_command_begin(
interaction_input: Option<String>,
process_id: Option<&str>,
) {
if matches!(source, ExecCommandSource::UnifiedExecInteraction) {
ctx.session
.send_event(
ctx.turn,
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: ctx.call_id.to_string(),
process_id: process_id.map(str::to_owned),
turn_id: ctx.turn.sub_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
command: command.to_vec(),
cwd: cwd.clone(),
parsed_cmd: parsed_cmd.to_vec(),
source,
interaction_input,
}),
)
.await;
return;
}
ctx.session
.send_event(
.emit_turn_item_started(
ctx.turn,
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: ctx.call_id.to_string(),
&TurnItem::CommandExecution(CommandExecutionItem {
Comment thread
owenlin0 marked this conversation as resolved.
id: ctx.call_id.to_string(),
process_id: process_id.map(str::to_owned),
turn_id: ctx.turn.sub_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
command: command.to_vec(),
cwd: cwd.clone(),
parsed_cmd: parsed_cmd.to_vec(),
source,
interaction_input,
status: CommandExecutionStatus::InProgress,
stdout: None,
stderr: None,
aggregated_output: None,
exit_code: None,
duration: None,
formatted_output: None,
}),
)
.await;
Expand Down Expand Up @@ -542,26 +568,50 @@ async fn emit_exec_end(
exec_input: ExecCommandInput<'_>,
exec_result: ExecCommandResult,
) {
if matches!(exec_input.source, ExecCommandSource::UnifiedExecInteraction) {
ctx.session
.send_event(
ctx.turn,
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: ctx.call_id.to_string(),
process_id: exec_input.process_id.map(str::to_owned),
turn_id: ctx.turn.sub_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
command: exec_input.command.to_vec(),
cwd: exec_input.cwd.clone(),
parsed_cmd: exec_input.parsed_cmd.to_vec(),
source: exec_input.source,
interaction_input: exec_input.interaction_input.map(str::to_owned),
stdout: exec_result.stdout,
stderr: exec_result.stderr,
aggregated_output: exec_result.aggregated_output,
exit_code: exec_result.exit_code,
duration: exec_result.duration,
formatted_output: exec_result.formatted_output,
status: exec_result.status,
}),
)
.await;
return;
}
ctx.session
.send_event(
.emit_turn_item_completed(
ctx.turn,
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: ctx.call_id.to_string(),
TurnItem::CommandExecution(CommandExecutionItem {
id: ctx.call_id.to_string(),
process_id: exec_input.process_id.map(str::to_owned),
turn_id: ctx.turn.sub_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
command: exec_input.command.to_vec(),
cwd: exec_input.cwd.clone(),
parsed_cmd: exec_input.parsed_cmd.to_vec(),
source: exec_input.source,
interaction_input: exec_input.interaction_input.map(str::to_owned),
stdout: exec_result.stdout,
stderr: exec_result.stderr,
aggregated_output: exec_result.aggregated_output,
exit_code: exec_result.exit_code,
duration: exec_result.duration,
formatted_output: exec_result.formatted_output,
status: exec_result.status,
status: exec_result.status.into(),
stdout: Some(exec_result.stdout),
stderr: Some(exec_result.stderr),
aggregated_output: Some(exec_result.aggregated_output),
exit_code: Some(exec_result.exit_code),
duration: Some(exec_result.duration),
formatted_output: Some(exec_result.formatted_output),
}),
)
.await;
Expand Down
Loading
Loading