Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 135 additions & 29 deletions codex-rs/app-server-protocol/src/protocol/thread_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ impl ThreadHistoryChangeAccumulator {
pub struct ThreadHistoryBuilder {
turns: Vec<Turn>,
current_turn: Option<PendingTurn>,
next_item_index: i64,
current_rollout_index: usize,
next_rollout_index: usize,
active_change_set: Option<ThreadHistoryChangeSet>,
Expand All @@ -245,7 +244,6 @@ impl ThreadHistoryBuilder {
Self {
turns: Vec::new(),
current_turn: None,
next_item_index: 1,
current_rollout_index: 0,
next_rollout_index: 0,
active_change_set: None,
Expand Down Expand Up @@ -1103,12 +1101,18 @@ impl ThreadHistoryBuilder {
self.push_item_in_current_turn(ThreadItem::ContextCompaction { id });
}

fn handle_entered_review_mode(&mut self, payload: &codex_protocol::protocol::ReviewRequest) {
fn handle_entered_review_mode(
&mut self,
payload: &codex_protocol::protocol::EnteredReviewModeEvent,
) {
let review = payload
.user_facing_hint
.clone()
.unwrap_or_else(|| "Review requested.".to_string());
let id = self.next_item_id();
let id = payload
.item_id
.clone()
.unwrap_or_else(|| self.next_item_id());
self.push_item_in_current_turn(ThreadItem::EnteredReviewMode { id, review });
}

Expand All @@ -1121,7 +1125,10 @@ impl ThreadHistoryBuilder {
.as_ref()
.map(render_review_output_text)
.unwrap_or_else(|| REVIEW_FALLBACK_MESSAGE.to_string());
let id = self.next_item_id();
let id = payload
.item_id
.clone()
.unwrap_or_else(|| self.next_item_id());
self.push_item_in_current_turn(ThreadItem::ExitedReviewMode { id, review });
}

Expand Down Expand Up @@ -1264,9 +1271,6 @@ impl ThreadHistoryBuilder {
} else {
self.turns.truncate(self.turns.len().saturating_sub(n));
}

let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum();
self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX);
}

fn finish_current_turn(&mut self) {
Expand All @@ -1279,13 +1283,7 @@ impl ThreadHistoryBuilder {
}

fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
let id = id.unwrap_or_else(|| {
if self.next_rollout_index == 0 {
Uuid::now_v7().to_string()
} else {
format!("rollout-{}", self.current_rollout_index)
}
});
let id = id.unwrap_or_else(|| Uuid::now_v7().to_string());
PendingTurn {
id,
items: Vec::new(),
Expand Down Expand Up @@ -1402,9 +1400,7 @@ impl ThreadHistoryBuilder {
}

fn next_item_id(&mut self) -> String {
let id = format!("item-{}", self.next_item_index);
self.next_item_index += 1;
id
Uuid::now_v7().to_string()
}

fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec<UserInput> {
Expand Down Expand Up @@ -1586,10 +1582,111 @@ mod tests {
use codex_utils_absolute_path::test_support::PathBufExt;
use codex_utils_absolute_path::test_support::test_path_buf;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
use uuid::Uuid;

#[derive(Default)]
struct SyntheticIdNormalizer {
next_turn_index: usize,
next_item_index: usize,
turn_ids: HashMap<String, String>,
item_ids: HashMap<String, String>,
}

impl SyntheticIdNormalizer {
fn normalize_turn_id(&mut self, turn_id: &mut String) {
if !is_uuid_v7(turn_id) {
return;
}

let normalized = self
.turn_ids
.entry(turn_id.clone())
.or_insert_with(|| {
let id = format!("rollout-{}", self.next_turn_index);
self.next_turn_index += 1;
id
})
.clone();
*turn_id = normalized;
}

fn normalize_item_id(&mut self, item_id: &mut String) {
if !is_uuid_v7(item_id) {
return;
}

let normalized = self
.item_ids
.entry(item_id.clone())
.or_insert_with(|| {
self.next_item_index += 1;
format!("item-{}", self.next_item_index)
})
.clone();
*item_id = normalized;
}
}

fn is_uuid_v7(value: &str) -> bool {
Uuid::parse_str(value).is_ok_and(|uuid| uuid.get_version_num() == 7)
}

fn normalize_synthetic_turns(turns: &mut [Turn]) {
let mut normalizer = SyntheticIdNormalizer::default();
for turn in turns {
normalizer.normalize_turn_id(&mut turn.id);
for item in &mut turn.items {
normalizer.normalize_item_id(thread_item_id_mut(item));
}
}
}

fn normalize_synthetic_change_set(changes: &mut ThreadHistoryChangeSet) {
let mut normalizer = SyntheticIdNormalizer::default();
for item_change in &mut changes.changed_items {
normalizer.normalize_turn_id(&mut item_change.turn_id);
normalizer.normalize_item_id(thread_item_id_mut(&mut item_change.item));
}
for turn_change in &mut changes.changed_turns {
normalizer.normalize_turn_id(&mut turn_change.turn_id);
}
for turn_id in &mut changes.removed_turn_ids {
normalizer.normalize_turn_id(turn_id);
}
}

fn thread_item_id_mut(item: &mut ThreadItem) -> &mut String {
match item {
ThreadItem::UserMessage { id, .. }
| ThreadItem::HookPrompt { id, .. }
| ThreadItem::AgentMessage { id, .. }
| ThreadItem::Plan { id, .. }
| ThreadItem::Reasoning { id, .. }
| ThreadItem::CommandExecution { id, .. }
| ThreadItem::FileChange { id, .. }
| ThreadItem::McpToolCall { id, .. }
| ThreadItem::DynamicToolCall { id, .. }
| ThreadItem::CollabAgentToolCall { id, .. }
| ThreadItem::SubAgentActivity { id, .. }
| ThreadItem::WebSearch { id, .. }
| ThreadItem::ImageView { id, .. }
| ThreadItem::Sleep { id, .. }
| ThreadItem::ImageGeneration { id, .. }
| ThreadItem::EnteredReviewMode { id, .. }
| ThreadItem::ExitedReviewMode { id, .. }
| ThreadItem::ContextCompaction { id, .. } => id,
}
}

fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
let mut turns = super::build_turns_from_rollout_items(items);
normalize_synthetic_turns(&mut turns);
turns
}

#[test]
fn builds_multiple_turns_with_reasoning_items() {
let events = vec![
Expand Down Expand Up @@ -1631,11 +1728,14 @@ mod tests {
for event in &events {
builder.handle_event(event);
}
let turns = builder.finish();
let mut turns = builder.finish();
assert!(is_uuid_v7(&turns[0].id));
assert!(is_uuid_v7(turns[0].items[0].id()));
assert!(is_uuid_v7(&turns[1].id));
normalize_synthetic_turns(&mut turns);
assert_eq!(turns.len(), 2);

let first = &turns[0];
assert!(Uuid::parse_str(&first.id).is_ok());
assert_eq!(first.status, TurnStatus::Completed);
assert_eq!(first.items.len(), 3);
assert_eq!(
Expand Down Expand Up @@ -1674,7 +1774,6 @@ mod tests {
);

let second = &turns[1];
assert!(Uuid::parse_str(&second.id).is_ok());
assert_ne!(first.id, second.id);
assert_eq!(second.items.len(), 2);
assert_eq!(
Expand Down Expand Up @@ -2269,7 +2368,7 @@ mod tests {
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].id, "rollout-0");
assert_eq!(turns[1].id, "rollout-5");
assert_eq!(turns[1].id, "rollout-1");
assert_ne!(turns[0].id, turns[1].id);
assert_eq!(turns[0].status, TurnStatus::Completed);
assert_eq!(turns[1].status, TurnStatus::Completed);
Expand Down Expand Up @@ -3100,7 +3199,8 @@ mod tests {
for event in &events {
builder.handle_event(event);
}
let turns = builder.finish();
let mut turns = builder.finish();
normalize_synthetic_turns(&mut turns);
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].id, "turn-a");
assert_eq!(turns[1].id, "turn-b");
Expand Down Expand Up @@ -3158,9 +3258,10 @@ mod tests {
builder.handle_event(event);
}

let snapshot = builder
let mut snapshot = builder
.active_turn_snapshot()
.expect("active turn snapshot");
normalize_synthetic_turns(std::slice::from_mut(&mut snapshot));
assert_eq!(snapshot.id, turn_id);
assert_eq!(snapshot.status, TurnStatus::InProgress);
assert_eq!(
Expand Down Expand Up @@ -3228,9 +3329,10 @@ mod tests {
builder.handle_event(event);
}

let snapshot = builder
let mut snapshot = builder
.active_turn_snapshot()
.expect("active turn snapshot");
normalize_synthetic_turns(std::slice::from_mut(&mut snapshot));
assert_eq!(snapshot.id, turn_id);
assert_eq!(snapshot.status, TurnStatus::InProgress);
assert_eq!(
Expand Down Expand Up @@ -3862,7 +3964,7 @@ mod tests {
fn changed_rollout_item_reports_new_item_snapshot() {
let mut builder = ThreadHistoryBuilder::new();

let changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg(
let mut changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg(
EventMsg::UserMessage(UserMessageEvent {
client_id: Some("client-message-1".into()),
message: "hello".into(),
Expand All @@ -3872,6 +3974,7 @@ mod tests {
..Default::default()
}),
));
normalize_synthetic_change_set(&mut changes);

assert_eq!(
changes,
Expand Down Expand Up @@ -3909,7 +4012,7 @@ mod tests {
},
)));

let changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg(
let mut changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg(
EventMsg::WebSearchEnd(WebSearchEndEvent {
call_id: "search-1".into(),
query: "codex".into(),
Expand All @@ -3919,6 +4022,7 @@ mod tests {
},
}),
));
normalize_synthetic_change_set(&mut changes);

assert_eq!(
changes,
Expand Down Expand Up @@ -3949,11 +4053,12 @@ mod tests {
},
)));

let changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg(
let mut changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg(
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
text: "raw content".into(),
}),
));
normalize_synthetic_change_set(&mut changes);

assert_eq!(
changes,
Expand Down Expand Up @@ -4041,7 +4146,7 @@ mod tests {
#[test]
fn changed_rollout_items_dedupe_updated_item_snapshots() {
let mut builder = ThreadHistoryBuilder::new();
let changes = builder.handle_rollout_items_with_changes(&[
let mut changes = builder.handle_rollout_items_with_changes(&[
RolloutItem::EventMsg(EventMsg::WebSearchBegin(WebSearchBeginEvent {
call_id: "search-1".into(),
})),
Expand All @@ -4054,6 +4159,7 @@ mod tests {
},
})),
]);
normalize_synthetic_change_set(&mut changes);

assert_eq!(
changes,
Expand Down
20 changes: 14 additions & 6 deletions codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -939,16 +939,20 @@ pub(crate) async fn apply_bespoke_event_handling(
}
EventMsg::ViewImageToolCall(_) => {}
EventMsg::EnteredReviewMode(review_request) => {
let turn_id = review_request
.turn_id
.unwrap_or_else(|| event_turn_id.clone());
let item_id = review_request.item_id.unwrap_or_else(|| turn_id.clone());
let review = review_request
.user_facing_hint
.unwrap_or_else(|| review_prompts::user_facing_hint(&review_request.target));
let item = ThreadItem::EnteredReviewMode {
id: event_turn_id.clone(),
id: item_id,
review,
};
let started = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
turn_id: turn_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
item: item.clone(),
};
Expand All @@ -957,7 +961,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
let completed = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
turn_id,
completed_at_ms: now_unix_timestamp_ms(),
item,
};
Expand Down Expand Up @@ -1048,17 +1052,21 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::ExitedReviewMode(review_event) => {
let turn_id = review_event
.turn_id
.unwrap_or_else(|| event_turn_id.clone());
let item_id = review_event.item_id.unwrap_or_else(|| turn_id.clone());
let review = match review_event.review_output {
Some(output) => render_review_output_text(&output),
None => REVIEW_FALLBACK_MESSAGE.to_string(),
};
let item = ThreadItem::ExitedReviewMode {
id: event_turn_id.clone(),
id: item_id,
review,
};
let started = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
turn_id: turn_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
item: item.clone(),
};
Expand All @@ -1067,7 +1075,7 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
let completed = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
turn_id,
completed_at_ms: now_unix_timestamp_ms(),
item,
};
Expand Down
Loading
Loading