From bacfb43f2754cfaf6a4f45dfffb1df1dcda136b0 Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Fri, 26 Jun 2026 12:37:54 -0700 Subject: [PATCH] feat(rollout): persist canonical items for paginated threads --- .../src/protocol/thread_history.rs | 164 ++++++++++++++---- .../app-server/src/bespoke_event_handling.rs | 20 ++- codex-rs/app-server/src/request_processors.rs | 2 +- .../external_agent_session_import.rs | 2 +- codex-rs/core/src/event_mapping.rs | 8 +- codex-rs/core/src/session/mod.rs | 34 ++-- codex-rs/core/src/session/review.rs | 7 +- codex-rs/core/src/session/session.rs | 12 +- codex-rs/core/src/session/tests.rs | 38 +++- codex-rs/core/src/session/turn_context.rs | 9 + codex-rs/core/src/tasks/review.rs | 6 +- codex-rs/core/tests/suite/review.rs | 12 +- codex-rs/protocol/src/protocol.rs | 63 ++++++- codex-rs/rollout/src/list.rs | 55 ++++-- codex-rs/rollout/src/persistence_metrics.rs | 4 +- .../rollout/src/persistence_metrics_tests.rs | 46 ++++- codex-rs/rollout/src/policy.rs | 46 ++++- codex-rs/rollout/src/search.rs | 20 +++ codex-rs/state/src/extract.rs | 70 ++++++-- codex-rs/thread-store/src/in_memory.rs | 8 +- codex-rs/thread-store/src/live_thread.rs | 25 ++- .../thread-store/src/local/live_writer.rs | 6 +- codex-rs/thread-store/src/local/mod.rs | 15 +- .../src/local/update_thread_metadata.rs | 2 + .../thread-store/src/thread_metadata_sync.rs | 95 ++++++++-- codex-rs/thread-store/src/types.rs | 7 +- 26 files changed, 629 insertions(+), 147 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 2d9c6b6a1a48..de1762a1d22e 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -228,7 +228,6 @@ impl ThreadHistoryChangeAccumulator { pub struct ThreadHistoryBuilder { turns: Vec, current_turn: Option, - next_item_index: i64, current_rollout_index: usize, next_rollout_index: usize, active_change_set: Option, @@ -245,7 +244,6 @@ impl ThreadHistoryBuilder { Self { turns: Vec::new(), current_turn: None, - next_item_index: 1, current_rollout_index: 0, next_rollout_index: 0, active_change_set: None, @@ -1103,12 +1101,18 @@ impl ThreadHistoryBuilder { self.push_item_in_current_turn(ThreadItem::ContextCompaction { id }); } - fn handle_entered_review_mode(&mut self, payload: &codex_protocol::protocol::ReviewRequest) { + fn handle_entered_review_mode( + &mut self, + payload: &codex_protocol::protocol::EnteredReviewModeEvent, + ) { let review = payload .user_facing_hint .clone() .unwrap_or_else(|| "Review requested.".to_string()); - let id = self.next_item_id(); + let id = payload + .item_id + .clone() + .unwrap_or_else(|| self.next_item_id()); self.push_item_in_current_turn(ThreadItem::EnteredReviewMode { id, review }); } @@ -1121,7 +1125,10 @@ impl ThreadHistoryBuilder { .as_ref() .map(render_review_output_text) .unwrap_or_else(|| REVIEW_FALLBACK_MESSAGE.to_string()); - let id = self.next_item_id(); + let id = payload + .item_id + .clone() + .unwrap_or_else(|| self.next_item_id()); self.push_item_in_current_turn(ThreadItem::ExitedReviewMode { id, review }); } @@ -1264,9 +1271,6 @@ impl ThreadHistoryBuilder { } else { self.turns.truncate(self.turns.len().saturating_sub(n)); } - - let item_count: usize = self.turns.iter().map(|t| t.items.len()).sum(); - self.next_item_index = i64::try_from(item_count.saturating_add(1)).unwrap_or(i64::MAX); } fn finish_current_turn(&mut self) { @@ -1279,13 +1283,7 @@ impl ThreadHistoryBuilder { } fn new_turn(&mut self, id: Option) -> PendingTurn { - let id = id.unwrap_or_else(|| { - if self.next_rollout_index == 0 { - Uuid::now_v7().to_string() - } else { - format!("rollout-{}", self.current_rollout_index) - } - }); + let id = id.unwrap_or_else(|| Uuid::now_v7().to_string()); PendingTurn { id, items: Vec::new(), @@ -1402,9 +1400,7 @@ impl ThreadHistoryBuilder { } fn next_item_id(&mut self) -> String { - let id = format!("item-{}", self.next_item_index); - self.next_item_index += 1; - id + Uuid::now_v7().to_string() } fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec { @@ -1586,10 +1582,111 @@ mod tests { use codex_utils_absolute_path::test_support::PathBufExt; use codex_utils_absolute_path::test_support::test_path_buf; use pretty_assertions::assert_eq; + use std::collections::HashMap; use std::path::PathBuf; use std::time::Duration; use uuid::Uuid; + #[derive(Default)] + struct SyntheticIdNormalizer { + next_turn_index: usize, + next_item_index: usize, + turn_ids: HashMap, + item_ids: HashMap, + } + + impl SyntheticIdNormalizer { + fn normalize_turn_id(&mut self, turn_id: &mut String) { + if !is_uuid_v7(turn_id) { + return; + } + + let normalized = self + .turn_ids + .entry(turn_id.clone()) + .or_insert_with(|| { + let id = format!("rollout-{}", self.next_turn_index); + self.next_turn_index += 1; + id + }) + .clone(); + *turn_id = normalized; + } + + fn normalize_item_id(&mut self, item_id: &mut String) { + if !is_uuid_v7(item_id) { + return; + } + + let normalized = self + .item_ids + .entry(item_id.clone()) + .or_insert_with(|| { + self.next_item_index += 1; + format!("item-{}", self.next_item_index) + }) + .clone(); + *item_id = normalized; + } + } + + fn is_uuid_v7(value: &str) -> bool { + Uuid::parse_str(value).is_ok_and(|uuid| uuid.get_version_num() == 7) + } + + fn normalize_synthetic_turns(turns: &mut [Turn]) { + let mut normalizer = SyntheticIdNormalizer::default(); + for turn in turns { + normalizer.normalize_turn_id(&mut turn.id); + for item in &mut turn.items { + normalizer.normalize_item_id(thread_item_id_mut(item)); + } + } + } + + fn normalize_synthetic_change_set(changes: &mut ThreadHistoryChangeSet) { + let mut normalizer = SyntheticIdNormalizer::default(); + for item_change in &mut changes.changed_items { + normalizer.normalize_turn_id(&mut item_change.turn_id); + normalizer.normalize_item_id(thread_item_id_mut(&mut item_change.item)); + } + for turn_change in &mut changes.changed_turns { + normalizer.normalize_turn_id(&mut turn_change.turn_id); + } + for turn_id in &mut changes.removed_turn_ids { + normalizer.normalize_turn_id(turn_id); + } + } + + fn thread_item_id_mut(item: &mut ThreadItem) -> &mut String { + match item { + ThreadItem::UserMessage { id, .. } + | ThreadItem::HookPrompt { id, .. } + | ThreadItem::AgentMessage { id, .. } + | ThreadItem::Plan { id, .. } + | ThreadItem::Reasoning { id, .. } + | ThreadItem::CommandExecution { id, .. } + | ThreadItem::FileChange { id, .. } + | ThreadItem::McpToolCall { id, .. } + | ThreadItem::DynamicToolCall { id, .. } + | ThreadItem::CollabAgentToolCall { id, .. } + | ThreadItem::SubAgentActivity { id, .. } + | ThreadItem::WebSearch { id, .. } + | ThreadItem::ImageView { id, .. } + | ThreadItem::Sleep { id, .. } + | ThreadItem::ImageGeneration { id, .. } + | ThreadItem::EnteredReviewMode { id, .. } + | ThreadItem::ExitedReviewMode { id, .. } + | ThreadItem::ContextCompaction { id, .. } => id, + } + } + + fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec { + let mut turns = super::build_turns_from_rollout_items(items); + normalize_synthetic_turns(&mut turns); + turns + } + #[test] fn builds_multiple_turns_with_reasoning_items() { let events = vec![ @@ -1631,11 +1728,14 @@ mod tests { for event in &events { builder.handle_event(event); } - let turns = builder.finish(); + let mut turns = builder.finish(); + assert!(is_uuid_v7(&turns[0].id)); + assert!(is_uuid_v7(turns[0].items[0].id())); + assert!(is_uuid_v7(&turns[1].id)); + normalize_synthetic_turns(&mut turns); assert_eq!(turns.len(), 2); let first = &turns[0]; - assert!(Uuid::parse_str(&first.id).is_ok()); assert_eq!(first.status, TurnStatus::Completed); assert_eq!(first.items.len(), 3); assert_eq!( @@ -1674,7 +1774,6 @@ mod tests { ); let second = &turns[1]; - assert!(Uuid::parse_str(&second.id).is_ok()); assert_ne!(first.id, second.id); assert_eq!(second.items.len(), 2); assert_eq!( @@ -2269,7 +2368,7 @@ mod tests { let turns = build_turns_from_rollout_items(&items); assert_eq!(turns.len(), 2); assert_eq!(turns[0].id, "rollout-0"); - assert_eq!(turns[1].id, "rollout-5"); + assert_eq!(turns[1].id, "rollout-1"); assert_ne!(turns[0].id, turns[1].id); assert_eq!(turns[0].status, TurnStatus::Completed); assert_eq!(turns[1].status, TurnStatus::Completed); @@ -3100,7 +3199,8 @@ mod tests { for event in &events { builder.handle_event(event); } - let turns = builder.finish(); + let mut turns = builder.finish(); + normalize_synthetic_turns(&mut turns); assert_eq!(turns.len(), 2); assert_eq!(turns[0].id, "turn-a"); assert_eq!(turns[1].id, "turn-b"); @@ -3158,9 +3258,10 @@ mod tests { builder.handle_event(event); } - let snapshot = builder + let mut snapshot = builder .active_turn_snapshot() .expect("active turn snapshot"); + normalize_synthetic_turns(std::slice::from_mut(&mut snapshot)); assert_eq!(snapshot.id, turn_id); assert_eq!(snapshot.status, TurnStatus::InProgress); assert_eq!( @@ -3228,9 +3329,10 @@ mod tests { builder.handle_event(event); } - let snapshot = builder + let mut snapshot = builder .active_turn_snapshot() .expect("active turn snapshot"); + normalize_synthetic_turns(std::slice::from_mut(&mut snapshot)); assert_eq!(snapshot.id, turn_id); assert_eq!(snapshot.status, TurnStatus::InProgress); assert_eq!( @@ -3862,7 +3964,7 @@ mod tests { fn changed_rollout_item_reports_new_item_snapshot() { let mut builder = ThreadHistoryBuilder::new(); - let changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg( + let mut changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg( EventMsg::UserMessage(UserMessageEvent { client_id: Some("client-message-1".into()), message: "hello".into(), @@ -3872,6 +3974,7 @@ mod tests { ..Default::default() }), )); + normalize_synthetic_change_set(&mut changes); assert_eq!( changes, @@ -3909,7 +4012,7 @@ mod tests { }, ))); - let changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg( + let mut changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg( EventMsg::WebSearchEnd(WebSearchEndEvent { call_id: "search-1".into(), query: "codex".into(), @@ -3919,6 +4022,7 @@ mod tests { }, }), )); + normalize_synthetic_change_set(&mut changes); assert_eq!( changes, @@ -3949,11 +4053,12 @@ mod tests { }, ))); - let changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg( + let mut changes = builder.handle_rollout_item_with_changes(&RolloutItem::EventMsg( EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text: "raw content".into(), }), )); + normalize_synthetic_change_set(&mut changes); assert_eq!( changes, @@ -4041,7 +4146,7 @@ mod tests { #[test] fn changed_rollout_items_dedupe_updated_item_snapshots() { let mut builder = ThreadHistoryBuilder::new(); - let changes = builder.handle_rollout_items_with_changes(&[ + let mut changes = builder.handle_rollout_items_with_changes(&[ RolloutItem::EventMsg(EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id: "search-1".into(), })), @@ -4054,6 +4159,7 @@ mod tests { }, })), ]); + normalize_synthetic_change_set(&mut changes); assert_eq!( changes, diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index a3cb301225e3..99af5dbe8465 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -939,16 +939,20 @@ pub(crate) async fn apply_bespoke_event_handling( } EventMsg::ViewImageToolCall(_) => {} EventMsg::EnteredReviewMode(review_request) => { + let turn_id = review_request + .turn_id + .unwrap_or_else(|| event_turn_id.clone()); + let item_id = review_request.item_id.unwrap_or_else(|| turn_id.clone()); let review = review_request .user_facing_hint .unwrap_or_else(|| review_prompts::user_facing_hint(&review_request.target)); let item = ThreadItem::EnteredReviewMode { - id: event_turn_id.clone(), + id: item_id, review, }; let started = ItemStartedNotification { thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), + turn_id: turn_id.clone(), started_at_ms: now_unix_timestamp_ms(), item: item.clone(), }; @@ -957,7 +961,7 @@ pub(crate) async fn apply_bespoke_event_handling( .await; let completed = ItemCompletedNotification { thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), + turn_id, completed_at_ms: now_unix_timestamp_ms(), item, }; @@ -1048,17 +1052,21 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::ExitedReviewMode(review_event) => { + let turn_id = review_event + .turn_id + .unwrap_or_else(|| event_turn_id.clone()); + let item_id = review_event.item_id.unwrap_or_else(|| turn_id.clone()); let review = match review_event.review_output { Some(output) => render_review_output_text(&output), None => REVIEW_FALLBACK_MESSAGE.to_string(), }; let item = ThreadItem::ExitedReviewMode { - id: event_turn_id.clone(), + id: item_id, review, }; let started = ItemStartedNotification { thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), + turn_id: turn_id.clone(), started_at_ms: now_unix_timestamp_ms(), item: item.clone(), }; @@ -1067,7 +1075,7 @@ pub(crate) async fn apply_bespoke_event_handling( .await; let completed = ItemCompletedNotification { thread_id: conversation_id.to_string(), - turn_id: event_turn_id.clone(), + turn_id, completed_at_ms: now_unix_timestamp_ms(), item, }; diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index cb0f276d8550..72b5062d176d 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -618,7 +618,7 @@ pub(crate) use self::thread_summary::thread_settings_from_core_snapshot; pub(crate) fn build_api_turns_from_rollout_items(items: &[RolloutItem]) -> Vec { let mut builder = ThreadHistoryBuilder::new(); for item in items { - if is_persisted_rollout_item(item) { + if is_persisted_rollout_item(item, codex_protocol::protocol::ThreadHistoryMode::Legacy) { builder.handle_rollout_item(item); } } diff --git a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs index e0a53a9c983e..b58364fe83bb 100644 --- a/codex-rs/app-server/src/request_processors/external_agent_session_import.rs +++ b/codex-rs/app-server/src/request_processors/external_agent_session_import.rs @@ -230,7 +230,7 @@ impl ExternalAgentSessionImporter { memory_mode, }, }; - rollout_items.retain(is_persisted_rollout_item); + rollout_items.retain(|item| is_persisted_rollout_item(item, ThreadHistoryMode::Legacy)); let title = title .as_deref() .and_then(codex_core::util::normalize_thread_name); diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index 392203c07990..ea1efc16474f 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -186,7 +186,9 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option { }) .collect(); Some(TurnItem::Reasoning(ReasoningItem { - id: id.clone().unwrap_or_default(), + id: id + .clone() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), summary_text, raw_content, })) @@ -197,7 +199,9 @@ pub fn parse_turn_item(item: &ResponseItem) -> Option { None => (WebSearchAction::Other, String::new()), }; Some(TurnItem::WebSearch(WebSearchItem { - id: id.clone().unwrap_or_default(), + id: id + .clone() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), query, action, })) diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 03321304a368..0163adfd87de 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -6,7 +6,6 @@ use std::fmt::Debug; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -use std::sync::atomic::AtomicU64; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -111,6 +110,7 @@ use codex_protocol::openai_models::ModelPreset; use codex_protocol::permissions::FileSystemSandboxPolicy; use codex_protocol::permissions::NetworkSandboxPolicy; use codex_protocol::protocol::AdditionalContextEntry; +use codex_protocol::protocol::EnteredReviewModeEvent; use codex_protocol::protocol::FileChange; use codex_protocol::protocol::HasLegacyEvent; use codex_protocol::protocol::InterAgentCommunication; @@ -118,7 +118,6 @@ use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::MultiAgentVersion; use codex_protocol::protocol::RawResponseItemEvent; -use codex_protocol::protocol::ReviewRequest; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; @@ -1193,10 +1192,7 @@ impl Session { } fn next_internal_sub_id(&self) -> String { - let id = self - .next_internal_sub_id - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - format!("auto-compact-{id}") + format!("auto-compact-{}", Uuid::now_v7()) } pub(crate) async fn route_realtime_text_input(self: &Arc, text: String) { @@ -1354,7 +1350,7 @@ impl Session { } InitialHistory::Forked(mut rollout_items) => { let turn_context = self.new_default_turn().await; - if turn_context.config.features.enabled(Feature::ItemIds) { + if turn_context.item_ids_enabled() { for rollout_item in &mut rollout_items { if let RolloutItem::ResponseItem(response_item) = rollout_item { Self::assign_missing_response_item_id(response_item); @@ -1917,7 +1913,7 @@ impl Session { } pub(crate) async fn send_event_raw(&self, event: Event) { - // Persist the event into rollout storage (the store filters as needed). + // Persist the event into rollout storage. LiveThread applies the thread's persistence policy. let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())]; self.persist_rollout_items(&rollout_items).await; self.services @@ -2725,7 +2721,7 @@ impl Session { for item in items.to_mut() { item.set_turn_id_if_missing(&turn_context.sub_id); } - if turn_context.config.features.enabled(Feature::ItemIds) { + if turn_context.item_ids_enabled() { Self::assign_missing_response_item_ids(items) } else { items @@ -2781,7 +2777,15 @@ impl Session { items: &[ResponseItem], ) { let items = self.prepare_conversation_items_for_history(turn_context, items); - let items = items.as_ref(); + self.record_prepared_conversation_items(turn_context, items.as_ref()) + .await; + } + + async fn record_prepared_conversation_items( + &self, + turn_context: &TurnContext, + items: &[ResponseItem], + ) { { let mut state = self.state.lock().await; state.current_time_reminder.note_recorded_items(items); @@ -2983,7 +2987,7 @@ impl Session { world_state_baseline: Option>, compacted_item: CompactedItem, ) { - let items = if turn_context.config.features.enabled(Feature::ItemIds) { + let items = if turn_context.item_ids_enabled() { Self::assign_missing_response_item_ids(Cow::Owned(items)).into_owned() } else { items @@ -3777,8 +3781,12 @@ impl Session { turn_context: &TurnContext, response_item: ResponseItem, ) { - // Add to conversation history and persist response item to rollout. - self.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) + let response_items = self.prepare_conversation_items_for_history( + turn_context, + std::slice::from_ref(&response_item), + ); + let response_item = response_items[0].clone(); + self.record_prepared_conversation_items(turn_context, response_items.as_ref()) .await; // Derive a turn item and emit lifecycle events if applicable. diff --git a/codex-rs/core/src/session/review.rs b/codex-rs/core/src/session/review.rs index 1587290d33f0..bebdad4d733e 100644 --- a/codex-rs/core/src/session/review.rs +++ b/codex-rs/core/src/session/review.rs @@ -117,6 +117,7 @@ pub(super) async fn spawn_review_thread( reasoning_effort, reasoning_summary, session_source, + history_mode: parent_turn_context.history_mode, parent_thread_id: parent_turn_context.parent_thread_id, originator: parent_turn_context.originator.clone(), environments: parent_turn_context.environments.clone(), @@ -165,10 +166,12 @@ pub(super) async fn spawn_review_thread( sess.spawn_task(tc.clone(), input, ReviewTask::new()).await; // Announce entering review mode so UIs can switch modes. - let review_request = ReviewRequest { + let review_event = EnteredReviewModeEvent { target: resolved.target, user_facing_hint: Some(resolved.user_facing_hint), + turn_id: Some(tc.sub_id.clone()), + item_id: Some(uuid::Uuid::new_v4().to_string()), }; - sess.send_event(&tc, EventMsg::EnteredReviewMode(review_request)) + sess.send_event(&tc, EventMsg::EnteredReviewMode(review_event)) .await; } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 5e7046b14886..0a02922724d6 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -45,7 +45,6 @@ pub(crate) struct Session { pub(crate) input_queue: InputQueue, pub(crate) guardian_review_session: GuardianReviewSessionManager, pub(crate) services: SessionServices, - pub(super) next_internal_sub_id: AtomicU64, } #[derive(Clone)] @@ -124,6 +123,13 @@ impl SessionConfiguration { &self.codex_home } + pub(crate) fn item_ids_enabled(&self) -> bool { + self.original_config_do_not_use + .features + .enabled(Feature::ItemIds) + || matches!(self.history_mode, ThreadHistoryMode::Paginated) + } + pub(super) fn permission_profile_state(&self) -> &PermissionProfileState { &self.permission_profile_state } @@ -622,6 +628,7 @@ impl Session { InitialHistory::Resumed(resumed_history) => { let params = ResumeThreadParams { thread_id: resumed_history.conversation_id, + history_mode: session_configuration.history_mode, rollout_path: resumed_history.rollout_path.clone(), history: Some(resumed_history.history.clone()), include_archived: true, @@ -1109,7 +1116,7 @@ impl Session { config.features.enabled(Feature::EnableRequestCompression), config.features.enabled(Feature::RuntimeMetrics), Self::build_model_client_beta_features_header(config.as_ref()), - /*item_ids_enabled*/ config.features.enabled(Feature::ItemIds), + /*item_ids_enabled*/ session_configuration.item_ids_enabled(), attestation_provider, ) .with_prompt_cache_key_override( @@ -1143,7 +1150,6 @@ impl Session { input_queue: InputQueue::new(), guardian_review_session: GuardianReviewSessionManager::default(), services, - next_internal_sub_id: AtomicU64::new(0), }); if let Some(network_policy_decider_session) = network_policy_decider_session { let mut guard = network_policy_decider_session.write().await; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index d6336acd57a8..fb0934d60421 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -65,6 +65,7 @@ use codex_protocol::permissions::FileSystemSandboxPolicy; use codex_protocol::permissions::FileSystemSpecialPath; use codex_protocol::protocol::NonSteerableTurnKind; use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::TurnEnvironmentSelections; use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionProfile; @@ -253,6 +254,41 @@ fn assign_missing_response_item_ids_assigns_additional_tools_ids() { assert!(items[0].id().is_some_and(|id| id.starts_with("at_"))); } +#[tokio::test] +async fn paginated_turn_context_assigns_missing_response_item_ids_without_feature() { + let (session, mut turn_context) = make_session_and_context().await; + turn_context.history_mode = ThreadHistoryMode::Paginated; + let response_item = user_message("hello"); + + let items = session.prepare_conversation_items_for_history( + &turn_context, + std::slice::from_ref(&response_item), + ); + + assert!( + items[0] + .id() + .is_some_and(|item_id| item_id.starts_with("msg_")) + ); +} + +#[tokio::test] +async fn internal_turn_ids_keep_auto_compact_prefix_and_use_uuidv7_suffixes() { + let (session, _turn_context) = make_session_and_context().await; + + let first = session.next_internal_sub_id(); + let second = session.next_internal_sub_id(); + + assert_ne!(first, second); + for id in [first, second] { + let uuid = id + .strip_prefix("auto-compact-") + .and_then(|uuid| Uuid::parse_str(uuid).ok()) + .expect("internal turn id should include a UUID suffix"); + assert_eq!(uuid.get_version_num(), 7); + } +} + fn assistant_message(text: &str) -> ResponseItem { ResponseItem::Message { id: None, @@ -5505,7 +5541,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { input_queue: super::input_queue::InputQueue::new(), guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(), services, - next_internal_sub_id: AtomicU64::new(0), }; (session, turn_context) @@ -7631,7 +7666,6 @@ where input_queue: super::input_queue::InputQueue::new(), guardian_review_session: crate::guardian::GuardianReviewSessionManager::default(), services, - next_internal_sub_id: AtomicU64::new(0), }); (session, turn_context, rx_event) diff --git a/codex-rs/core/src/session/turn_context.rs b/codex-rs/core/src/session/turn_context.rs index 5efbeb8ef396..be60117bee92 100644 --- a/codex-rs/core/src/session/turn_context.rs +++ b/codex-rs/core/src/session/turn_context.rs @@ -10,6 +10,7 @@ use codex_protocol::ThreadId; use codex_protocol::models::AdditionalPermissionProfile; use codex_protocol::openai_models::ModelInfo; use codex_protocol::protocol::MultiAgentVersion; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_sandboxing::compatibility_sandbox_policy_for_permission_profile; use codex_sandboxing::policy_transforms::effective_file_system_sandbox_policy; @@ -111,6 +112,7 @@ pub struct TurnContext { pub(crate) reasoning_effort: Option, pub(crate) reasoning_summary: ReasoningSummaryConfig, pub(crate) session_source: SessionSource, + pub(crate) history_mode: ThreadHistoryMode, pub(crate) parent_thread_id: Option, pub(crate) originator: String, pub(crate) environments: TurnEnvironmentSnapshot, @@ -149,6 +151,11 @@ enum TurnMultiAgentRuntime { } impl TurnContext { + pub(crate) fn item_ids_enabled(&self) -> bool { + self.config.features.enabled(Feature::ItemIds) + || matches!(self.history_mode, ThreadHistoryMode::Paginated) + } + pub(crate) fn permission_profile(&self) -> PermissionProfile { self.permission_profile.clone() } @@ -262,6 +269,7 @@ impl TurnContext { reasoning_effort, reasoning_summary: self.reasoning_summary, session_source: self.session_source.clone(), + history_mode: self.history_mode, parent_thread_id: self.parent_thread_id, originator: self.originator.clone(), environments: self.environments.clone(), @@ -540,6 +548,7 @@ impl Session { reasoning_effort, reasoning_summary, session_source, + history_mode: session_configuration.history_mode, parent_thread_id: session_configuration.parent_thread_id, originator: session_configuration.originator.clone(), environments, diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index 19ac0fdc35e5..99597e594ff7 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -256,7 +256,11 @@ pub(crate) async fn exit_review_mode( session .send_event( ctx.as_ref(), - EventMsg::ExitedReviewMode(ExitedReviewModeEvent { review_output }), + EventMsg::ExitedReviewMode(ExitedReviewModeEvent { + turn_id: Some(ctx.sub_id.clone()), + item_id: Some(uuid::Uuid::new_v4().to_string()), + review_output, + }), ) .await; session diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index af44baf6ca56..6d2c33416caf 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -422,7 +422,8 @@ async fn review_uses_custom_review_model_from_config() { matches!( ev, EventMsg::ExitedReviewMode(ExitedReviewModeEvent { - review_output: None + review_output: None, + .. }) ) }) @@ -471,7 +472,8 @@ async fn review_uses_session_model_when_review_model_unset() { matches!( ev, EventMsg::ExitedReviewMode(ExitedReviewModeEvent { - review_output: None + review_output: None, + .. }) ) }) @@ -587,7 +589,8 @@ async fn review_input_isolated_from_parent_history() { matches!( ev, EventMsg::ExitedReviewMode(ExitedReviewModeEvent { - review_output: None + review_output: None, + .. }) ) }) @@ -698,7 +701,8 @@ async fn review_history_surfaces_in_parent_session() { matches!( ev, EventMsg::ExitedReviewMode(ExitedReviewModeEvent { - review_output: Some(_) + review_output: Some(_), + .. }) ) }) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index df331e7e03d9..cdaf1052402b 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1425,7 +1425,7 @@ pub enum EventMsg { ShutdownComplete, /// Entered review mode. - EnteredReviewMode(ReviewRequest), + EnteredReviewMode(EnteredReviewModeEvent), /// Exited review mode with an optional final result to apply. ExitedReviewMode(ExitedReviewModeEvent), @@ -1849,8 +1849,28 @@ pub struct ReasoningRawContentDeltaEvent { pub content_index: i64, } +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] +pub struct EnteredReviewModeEvent { + pub target: ReviewTarget, + #[serde(skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub user_facing_hint: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub turn_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub item_id: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)] pub struct ExitedReviewModeEvent { + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub turn_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub item_id: Option, pub review_output: Option, } @@ -5453,6 +5473,47 @@ mod tests { assert_eq!(event.completed_at_ms, 0); } + #[test] + fn review_mode_events_deserialize_legacy_payloads_and_serialize_persisted_ids() { + let entered = serde_json::from_value::(json!({ + "target": { + "type": "custom", + "instructions": "review this" + }, + "user_facing_hint": "hint" + })) + .unwrap(); + assert_eq!(entered.turn_id, None); + assert_eq!(entered.item_id, None); + + let exited = serde_json::from_value::(json!({ + "review_output": null + })) + .unwrap(); + assert_eq!(exited.turn_id, None); + assert_eq!(exited.item_id, None); + + let entered = EnteredReviewModeEvent { + turn_id: Some("turn-1".into()), + item_id: Some("item-1".into()), + ..entered + }; + let exited = ExitedReviewModeEvent { + turn_id: Some("turn-1".into()), + item_id: Some("item-2".into()), + ..exited + }; + + assert_eq!( + serde_json::to_value(entered).unwrap()["item_id"], + json!("item-1") + ); + assert_eq!( + serde_json::to_value(exited).unwrap()["item_id"], + json!("item-2") + ); + } + #[test] fn rollback_failed_error_does_not_affect_turn_status() { let event = ErrorEvent { diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index 1154d36eeecc..d7a141355771 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -22,6 +22,7 @@ use crate::protocol::EventMsg; use crate::state_db; use codex_file_search as file_search; use codex_protocol::ThreadId; +use codex_protocol::items::TurnItem; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::RolloutLine; use codex_protocol::protocol::SessionMetaLine; @@ -1187,9 +1188,7 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result &str { fn event_msg_preview(event: &EventMsg) -> Option { match event { - EventMsg::UserMessage(user) => { - let message = strip_user_message_prefix(user.message.as_str()); - if !message.is_empty() { - return Some(message.to_string()); - } - if user - .images - .as_ref() - .is_some_and(|images| !images.is_empty()) - || !user.local_images.is_empty() - { - return Some("[Image]".to_string()); + EventMsg::UserMessage(user) => user_message_preview(user), + EventMsg::ItemCompleted(event) => match &event.item { + TurnItem::UserMessage(user) => { + let EventMsg::UserMessage(user) = user.as_legacy_event() else { + unreachable!("user message items always convert to user message events"); + }; + user_message_preview(&user) } - None - } + _ => None, + }, EventMsg::ThreadGoalUpdated(event) => { let objective = event.goal.objective.trim(); (!objective.is_empty()).then(|| objective.to_string()) @@ -1282,6 +1276,33 @@ fn event_msg_preview(event: &EventMsg) -> Option { } } +fn is_user_message_event(event: &EventMsg) -> bool { + matches!( + event, + EventMsg::UserMessage(_) + | EventMsg::ItemCompleted(codex_protocol::protocol::ItemCompletedEvent { + item: TurnItem::UserMessage(_), + .. + }) + ) +} + +fn user_message_preview(user: &codex_protocol::protocol::UserMessageEvent) -> Option { + let message = strip_user_message_prefix(user.message.as_str()); + if !message.is_empty() { + return Some(message.to_string()); + } + if user + .images + .as_ref() + .is_some_and(|images| !images.is_empty()) + || !user.local_images.is_empty() + { + return Some("[Image]".to_string()); + } + None +} + /// Read the SessionMetaLine from the head of a rollout file for reuse by /// callers that need the session metadata (e.g. to derive a cwd for config). pub async fn read_session_meta_line(path: &Path) -> io::Result { diff --git a/codex-rs/rollout/src/persistence_metrics.rs b/codex-rs/rollout/src/persistence_metrics.rs index 8c2a7e18d238..f0e11e5052e0 100644 --- a/codex-rs/rollout/src/persistence_metrics.rs +++ b/codex-rs/rollout/src/persistence_metrics.rs @@ -8,6 +8,7 @@ use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::ThreadHistoryMode; use crate::policy::is_persisted_rollout_item; @@ -95,6 +96,7 @@ struct TurnMeasurementUpdate { /// Measures logical JSON sizes while applying the shared rollout persistence policy once. pub fn measure_and_filter_rollout_items( items: &[RolloutItem], + history_mode: ThreadHistoryMode, ) -> (Vec, RolloutPersistenceBatchMeasurement) { let mut persisted = Vec::new(); let mut measurement = RolloutPersistenceBatchMeasurement { @@ -103,7 +105,7 @@ pub fn measure_and_filter_rollout_items( }; for item in items { - let kept = is_persisted_rollout_item(item); + let kept = is_persisted_rollout_item(item, history_mode); let decision = if kept { PersistenceDecision::Kept } else { diff --git a/codex-rs/rollout/src/persistence_metrics_tests.rs b/codex-rs/rollout/src/persistence_metrics_tests.rs index 963805f43dae..6cd7581518df 100644 --- a/codex-rs/rollout/src/persistence_metrics_tests.rs +++ b/codex-rs/rollout/src/persistence_metrics_tests.rs @@ -6,6 +6,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; @@ -65,7 +66,7 @@ fn update_for_batch( state: &mut TurnMeasurementState, items: &[RolloutItem], ) -> super::TurnMeasurementUpdate { - let (_, measurement) = measure_and_filter_rollout_items(items); + let (_, measurement) = measure_and_filter_rollout_items(items, ThreadHistoryMode::Legacy); update_turn_measurements(state, items, &measurement) } @@ -100,7 +101,8 @@ fn mixed_batch_reports_exact_policy_counts_and_bytes() { let dropped = RolloutItem::ResponseItem(ResponseItem::Other); let items = vec![kept.clone(), dropped.clone()]; - let (persisted, measurement) = measure_and_filter_rollout_items(&items); + let (persisted, measurement) = + measure_and_filter_rollout_items(&items, ThreadHistoryMode::Legacy); let kept_bytes = serde_json::to_vec(&kept) .expect("serialize kept item") .len() as u64; @@ -128,7 +130,8 @@ fn mixed_batch_reports_exact_policy_counts_and_bytes() { #[test] fn retained_items_are_byte_identical() { let item = retained_message("a moderately sized payload"); - let (persisted, measurement) = measure_and_filter_rollout_items(std::slice::from_ref(&item)); + let (persisted, measurement) = + measure_and_filter_rollout_items(std::slice::from_ref(&item), ThreadHistoryMode::Legacy); assert_eq!( serde_json::to_vec(&persisted[0]).expect("serialize persisted item"), @@ -155,8 +158,10 @@ fn turn_measurements_span_batches_and_include_items_before_start() { retained_message("second response"), turn_aborted("turn-2"), ]; - let (_, first_expected) = measure_and_filter_rollout_items(&first_turn); - let (_, second_expected) = measure_and_filter_rollout_items(&second_turn); + let (_, first_expected) = + measure_and_filter_rollout_items(&first_turn, ThreadHistoryMode::Legacy); + let (_, second_expected) = + measure_and_filter_rollout_items(&second_turn, ThreadHistoryMode::Legacy); let batches = [ first_turn[..1].to_vec(), first_turn[1..3].to_vec(), @@ -217,7 +222,8 @@ fn invalid_turn_boundaries_reset_partial_measurements() { retained_message("retained turn"), turn_complete("turn-2"), ]; - let (_, expected) = measure_and_filter_rollout_items(&replacement[2..]); + let (_, expected) = + measure_and_filter_rollout_items(&replacement[2..], ThreadHistoryMode::Legacy); let update = update_for_batch(&mut state, &replacement); assert_eq!( @@ -247,7 +253,7 @@ fn filtered_item_completion_includes_its_nested_item_type() { completed_at_ms: 0, })); - let (_, measurement) = measure_and_filter_rollout_items(&[item]); + let (_, measurement) = measure_and_filter_rollout_items(&[item], ThreadHistoryMode::Legacy); assert_eq!( measurement.items[0].rollout_item_type, @@ -258,3 +264,29 @@ fn filtered_item_completion_includes_its_nested_item_type() { super::PersistenceDecision::Dropped ); } + +#[test] +fn paginated_item_completion_is_persisted() { + let item = RolloutItem::EventMsg(EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id: ThreadId::default(), + turn_id: "turn".to_string(), + item: TurnItem::UserMessage(UserMessageItem { + id: "item".to_string(), + client_id: None, + content: Vec::new(), + }), + completed_at_ms: 0, + })); + + let (persisted, measurement) = + measure_and_filter_rollout_items(std::slice::from_ref(&item), ThreadHistoryMode::Paginated); + + assert_eq!( + serde_json::to_value(persisted).expect("serialize persisted items"), + serde_json::to_value([item]).expect("serialize expected items") + ); + assert_eq!( + measurement.items[0].decision, + super::PersistenceDecision::Kept + ); +} diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 2390a0fc61a1..9cea8ceda1a7 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -1,14 +1,15 @@ use crate::protocol::EventMsg; use crate::protocol::RolloutItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::ThreadHistoryMode; /// Whether a rollout `item` should be persisted in rollout files. -pub fn is_persisted_rollout_item(item: &RolloutItem) -> bool { +pub fn is_persisted_rollout_item(item: &RolloutItem, history_mode: ThreadHistoryMode) -> bool { match item { RolloutItem::ResponseItem(item) => should_persist_response_item(item), RolloutItem::InterAgentCommunication(_) | RolloutItem::InterAgentCommunicationMetadata { .. } => true, - RolloutItem::EventMsg(ev) => should_persist_event_msg(ev), + RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, history_mode), // Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns). RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) @@ -18,10 +19,13 @@ pub fn is_persisted_rollout_item(item: &RolloutItem) -> bool { } /// Return the canonical rollout items that should be persisted for a live append. -pub fn persisted_rollout_items(items: &[RolloutItem]) -> Vec { +pub fn persisted_rollout_items( + items: &[RolloutItem], + history_mode: ThreadHistoryMode, +) -> Vec { let mut persisted = Vec::new(); for item in items { - if is_persisted_rollout_item(item) { + if is_persisted_rollout_item(item, history_mode) { persisted.push(item.clone()); } } @@ -78,7 +82,14 @@ pub fn should_persist_response_item_for_memories(item: &ResponseItem) -> bool { /// Whether an `EventMsg` should be persisted in rollout files. #[inline] -pub fn should_persist_event_msg(ev: &EventMsg) -> bool { +pub fn should_persist_event_msg(ev: &EventMsg, history_mode: ThreadHistoryMode) -> bool { + match history_mode { + ThreadHistoryMode::Legacy => should_persist_legacy_event_msg(ev), + ThreadHistoryMode::Paginated => should_persist_paginated_event_msg(ev), + } +} + +fn should_persist_legacy_event_msg(ev: &EventMsg) -> bool { match ev { EventMsg::UserMessage(_) | EventMsg::AgentMessage(_) @@ -168,3 +179,28 @@ pub fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::CollabResumeBegin(_) => false, } } + +fn should_persist_paginated_event_msg(ev: &EventMsg) -> bool { + match ev { + EventMsg::ItemCompleted(_) => true, + EventMsg::TokenCount(_) + | EventMsg::ThreadGoalUpdated(_) + | EventMsg::EnteredReviewMode(_) + | EventMsg::ExitedReviewMode(_) + | EventMsg::ThreadRolledBack(_) + | EventMsg::TurnAborted(_) + | EventMsg::TurnStarted(_) + | EventMsg::TurnComplete(_) => true, + EventMsg::UserMessage(_) + | EventMsg::AgentMessage(_) + | EventMsg::AgentReasoning(_) + | EventMsg::AgentReasoningRawContent(_) + | EventMsg::PatchApplyEnd(_) + | EventMsg::ContextCompacted(_) + | EventMsg::McpToolCallEnd(_) + | EventMsg::WebSearchEnd(_) + | EventMsg::ImageGenerationEnd(_) + | EventMsg::SubAgentActivity(_) => false, + _ => should_persist_legacy_event_msg(ev), + } +} diff --git a/codex-rs/rollout/src/search.rs b/codex-rs/rollout/src/search.rs index 58b46887a98f..7d3e41425d59 100644 --- a/codex-rs/rollout/src/search.rs +++ b/codex-rs/rollout/src/search.rs @@ -4,6 +4,7 @@ use std::io; use std::path::Path; use std::path::PathBuf; +use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::EventMsg; @@ -267,6 +268,25 @@ fn conversation_text_from_item(item: &RolloutItem) -> Option { Some(agent.message.trim().to_string()) } } + RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) => match &event.item { + TurnItem::UserMessage(user) => { + let message = user.message(); + let text = strip_user_message_prefix(message.as_str()); + (!text.is_empty()).then(|| text.to_string()) + } + TurnItem::AgentMessage(agent) => { + let text = agent + .content + .iter() + .map(|content| match content { + codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(), + }) + .collect::>() + .join(" "); + (!text.trim().is_empty()).then(|| text.trim().to_string()) + } + _ => None, + }, RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) => { let text = content .iter() diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 48e308f627e1..5ed4202b6e6b 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -1,4 +1,6 @@ use crate::model::ThreadMetadata; +use codex_protocol::items::TurnItem; +use codex_protocol::items::UserMessageItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; @@ -39,6 +41,11 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool { RolloutItem::EventMsg( EventMsg::TokenCount(_) | EventMsg::UserMessage(_) | EventMsg::ThreadGoalUpdated(_), ) => true, + RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) + if matches!(event.item, TurnItem::UserMessage(_)) => + { + true + } RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::InterAgentCommunication(_) @@ -96,16 +103,11 @@ fn apply_event_msg(metadata: &mut ThreadMetadata, event: &EventMsg) { } } EventMsg::UserMessage(user) => { - let preview = user_message_preview(user); - if metadata.first_user_message.is_none() { - metadata.first_user_message = preview.clone(); - } - set_preview_if_empty(metadata, preview); - if metadata.title.is_empty() { - let title = strip_user_message_prefix(user.message.as_str()); - if !title.is_empty() { - metadata.title = title.to_string(); - } + apply_user_message(metadata, user); + } + EventMsg::ItemCompleted(event) => { + if let TurnItem::UserMessage(user) = &event.item { + apply_user_message_item(metadata, user); } } EventMsg::ThreadGoalUpdated(event) => { @@ -120,6 +122,27 @@ fn apply_event_msg(metadata: &mut ThreadMetadata, event: &EventMsg) { fn apply_response_item(_metadata: &mut ThreadMetadata, _item: &ResponseItem) {} +fn apply_user_message_item(metadata: &mut ThreadMetadata, user: &UserMessageItem) { + let EventMsg::UserMessage(user) = user.as_legacy_event() else { + unreachable!("user message items always convert to user message events"); + }; + apply_user_message(metadata, &user); +} + +fn apply_user_message(metadata: &mut ThreadMetadata, user: &UserMessageEvent) { + let preview = user_message_preview(user); + if metadata.first_user_message.is_none() { + metadata.first_user_message = preview.clone(); + } + set_preview_if_empty(metadata, preview); + if metadata.title.is_empty() { + let title = strip_user_message_prefix(user.message.as_str()); + if !title.is_empty() { + metadata.title = title.to_string(); + } + } +} + fn set_preview_if_empty(metadata: &mut ThreadMetadata, preview: Option) { if metadata.preview.is_none() { metadata.preview = preview; @@ -164,12 +187,15 @@ mod tests { use chrono::DateTime; use chrono::Utc; use codex_protocol::ThreadId; + use codex_protocol::items::TurnItem; + use codex_protocol::items::UserMessageItem; use codex_protocol::models::ContentItem; use codex_protocol::models::PermissionProfile; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::EventMsg; + use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionMeta; @@ -182,6 +208,7 @@ mod tests { use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::protocol::UserMessageEvent; + use codex_protocol::user_input::UserInput; use pretty_assertions::assert_eq; use std::path::PathBuf; @@ -229,6 +256,29 @@ mod tests { assert_eq!(metadata.title, "actual user request"); } + #[test] + fn completed_user_message_items_set_title_and_first_user_message() { + let mut metadata = metadata_for_test(); + let item = RolloutItem::EventMsg(EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id: ThreadId::default(), + turn_id: "turn-1".to_string(), + item: TurnItem::UserMessage(UserMessageItem::new(&[UserInput::Text { + text: format!("{USER_MESSAGE_BEGIN} actual user request"), + text_elements: Vec::new(), + }])), + completed_at_ms: 0, + })); + + apply_rollout_item(&mut metadata, &item, "test-provider"); + + assert_eq!( + metadata.first_user_message.as_deref(), + Some("actual user request") + ); + assert_eq!(metadata.preview.as_deref(), Some("actual user request")); + assert_eq!(metadata.title, "actual user request"); + } + #[test] fn event_msg_image_only_user_message_sets_image_placeholder_preview() { let mut metadata = metadata_for_test(); diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index cb3f198ff3c5..e050e6d40115 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -16,7 +16,6 @@ use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode; -use codex_rollout::persisted_rollout_items; use crate::AppendThreadItemsParams; use crate::ArchiveThreadParams; @@ -211,6 +210,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path.clone()), history: None, include_archived: false, @@ -290,6 +290,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Paginated, rollout_path: None, history: None, include_archived: false, @@ -482,8 +483,7 @@ impl InMemoryThreadStore { } async fn append_items(&self, params: AppendThreadItemsParams) -> ThreadStoreResult<()> { - let canonical_items = persisted_rollout_items(params.items.as_slice()); - if canonical_items.is_empty() { + if params.items.is_empty() { return Ok(()); } let mut state = self.state.lock().await; @@ -492,7 +492,7 @@ impl InMemoryThreadStore { .histories .entry(params.thread_id) .or_default() - .extend(canonical_items); + .extend(params.items); Ok(()) } diff --git a/codex-rs/thread-store/src/live_thread.rs b/codex-rs/thread-store/src/live_thread.rs index 793bbf99655d..55bffe8956f3 100644 --- a/codex-rs/thread-store/src/live_thread.rs +++ b/codex-rs/thread-store/src/live_thread.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use codex_protocol::ThreadId; use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode; use codex_rollout::RolloutPersistenceTelemetry; use codex_rollout::measure_and_filter_rollout_items; @@ -32,6 +33,7 @@ use crate::thread_metadata_sync::ThreadMetadataSync; #[derive(Clone)] pub struct LiveThread { thread_id: ThreadId, + history_mode: ThreadHistoryMode, thread_store: Arc, metadata_sync: Arc>, persistence_telemetry: RolloutPersistenceTelemetry, @@ -92,10 +94,12 @@ impl LiveThread { params: CreateThreadParams, ) -> ThreadStoreResult { let thread_id = params.thread_id; + let history_mode = params.history_mode; let metadata_sync = ThreadMetadataSync::for_create(¶ms).await; thread_store.create_thread(params).await?; Ok(Self { thread_id, + history_mode, thread_store, metadata_sync: Arc::new(Mutex::new(metadata_sync)), persistence_telemetry: RolloutPersistenceTelemetry::new(thread_id), @@ -107,6 +111,7 @@ impl LiveThread { params: ResumeThreadParams, ) -> ThreadStoreResult { let thread_id = params.thread_id; + let history_mode = params.history_mode; let should_load_history = params.history.is_none(); let include_archived = params.include_archived; let mut metadata_sync = ThreadMetadataSync::for_resume(¶ms); @@ -132,6 +137,7 @@ impl LiveThread { } Ok(Self { thread_id, + history_mode, thread_store, metadata_sync: Arc::new(Mutex::new(metadata_sync)), persistence_telemetry: RolloutPersistenceTelemetry::new(thread_id), @@ -149,17 +155,20 @@ impl LiveThread { return Ok(()); } let (canonical_items, measurement) = if self.persistence_telemetry.is_enabled() { - let (canonical_items, measurement) = measure_and_filter_rollout_items(items); + let (canonical_items, measurement) = + measure_and_filter_rollout_items(items, self.history_mode); (canonical_items, Some(measurement)) } else { - (persisted_rollout_items(items), None) + (persisted_rollout_items(items, self.history_mode), None) }; - self.thread_store - .append_items(AppendThreadItemsParams { - thread_id: self.thread_id, - items: items.to_vec(), - }) - .await?; + if !canonical_items.is_empty() { + self.thread_store + .append_items(AppendThreadItemsParams { + thread_id: self.thread_id, + items: canonical_items.clone(), + }) + .await?; + } if let Some(measurement) = measurement.as_ref() { self.persistence_telemetry.record_batch(items, measurement); } diff --git a/codex-rs/thread-store/src/local/live_writer.rs b/codex-rs/thread-store/src/local/live_writer.rs index 74e707bb5835..9ea8f61b6073 100644 --- a/codex-rs/thread-store/src/local/live_writer.rs +++ b/codex-rs/thread-store/src/local/live_writer.rs @@ -5,7 +5,6 @@ use codex_protocol::protocol::ThreadMemoryMode; use codex_rollout::RolloutConfig; use codex_rollout::RolloutRecorder; use codex_rollout::RolloutRecorderParams; -use codex_rollout::persisted_rollout_items; use tracing::warn; use super::LocalThreadStore; @@ -115,13 +114,12 @@ pub(super) async fn append_items( store: &LocalThreadStore, params: AppendThreadItemsParams, ) -> ThreadStoreResult<()> { - let canonical_items = persisted_rollout_items(params.items.as_slice()); - if canonical_items.is_empty() { + if params.items.is_empty() { return Ok(()); } let recorder = store.live_recorder(params.thread_id).await?; recorder - .record_canonical_items(canonical_items.as_slice()) + .record_canonical_items(params.items.as_slice()) .await .map_err(thread_store_io_error)?; // LiveThread applies metadata immediately after append_items returns. Wait for the local diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 5939ea606e22..e9c78972908d 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -399,8 +399,8 @@ mod tests { } #[tokio::test] - async fn raw_append_items_does_not_update_sqlite_metadata() { - // This pins the ThreadStore contract: raw appends are history-only. Callers that need + async fn direct_append_items_does_not_update_sqlite_metadata() { + // This pins the ThreadStore contract: direct appends are history-only. Callers that need // metadata updates must use LiveThread or call update_thread_metadata explicitly. let home = TempDir::new().expect("temp dir"); let config = test_config(home.path()); @@ -423,7 +423,7 @@ mod tests { items: vec![user_message_item("raw append")], }) .await - .expect("append raw item"); + .expect("append canonical item"); store.flush_thread(thread_id).await.expect("flush thread"); assert_eq!( @@ -696,6 +696,7 @@ mod tests { store, ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path), history: None, include_archived: false, @@ -750,6 +751,7 @@ mod tests { store, ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path), history: None, include_archived: false, @@ -879,6 +881,7 @@ mod tests { resumed_store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: None, history: None, include_archived: true, @@ -939,6 +942,7 @@ mod tests { let err = store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path), history: None, include_archived: true, @@ -961,6 +965,7 @@ mod tests { let err = store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path), history: None, include_archived: true, @@ -990,6 +995,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path), history: None, include_archived: true, @@ -1038,6 +1044,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path.clone()), history: None, include_archived: true, @@ -1076,6 +1083,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(rollout_path), history: None, include_archived: true, @@ -1247,6 +1255,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Paginated, rollout_path: Some(rollout_path), history: None, include_archived: false, diff --git a/codex-rs/thread-store/src/local/update_thread_metadata.rs b/codex-rs/thread-store/src/local/update_thread_metadata.rs index 580df1dc612c..0d579b92725e 100644 --- a/codex-rs/thread-store/src/local/update_thread_metadata.rs +++ b/codex-rs/thread-store/src/local/update_thread_metadata.rs @@ -937,6 +937,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(path.clone()), history: None, include_archived: true, @@ -1785,6 +1786,7 @@ mod tests { store .resume_thread(ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: Some(archived_path.clone()), history: None, include_archived: true, diff --git a/codex-rs/thread-store/src/thread_metadata_sync.rs b/codex-rs/thread-store/src/thread_metadata_sync.rs index 1057715a4a58..5880d8e6661a 100644 --- a/codex-rs/thread-store/src/thread_metadata_sync.rs +++ b/codex-rs/thread-store/src/thread_metadata_sync.rs @@ -9,12 +9,20 @@ use chrono::Utc; use codex_git_utils::collect_git_info; use codex_git_utils::get_git_repo_root; use codex_protocol::ThreadId; +use codex_protocol::items::TurnItem; +use codex_protocol::items::UserMessageItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GitInfo; +#[cfg(test)] +use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::RolloutItem; +#[cfg(test)] +use codex_protocol::protocol::ThreadHistoryMode; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::protocol::UserMessageEvent; +#[cfg(test)] +use codex_protocol::user_input::UserInput; use crate::CreateThreadParams; use crate::GitInfoPatch; @@ -29,9 +37,9 @@ const THREAD_UPDATED_AT_TOUCH_INTERVAL: Duration = Duration::from_millis(50); /// Live-thread helper that derives metadata updates from canonical rollout items. /// -/// Stores receive raw history plus explicit metadata patches. This helper keeps append-derived -/// metadata observation in the live layer without owning persistence-policy filtering or making -/// `append_items` infer metadata inside a `ThreadStore` implementation. +/// Stores receive canonical durable rollout items plus explicit metadata patches. This helper +/// keeps append-derived metadata observation in the live layer without owning persistence-policy +/// filtering or making `append_items` infer metadata inside a `ThreadStore` implementation. pub(crate) struct ThreadMetadataSync { thread_id: ThreadId, cwd_seen: bool, @@ -249,22 +257,11 @@ impl ThreadMetadataSync { update.permission_profile = Some(turn_ctx.permission_profile()); } RolloutItem::EventMsg(EventMsg::UserMessage(user)) => { - if let Some(preview) = user_message_preview(user) { - if !self.first_user_message_seen { - self.first_user_message_seen = true; - update.first_user_message = Some(preview.clone()); - } - if !self.preview_seen { - self.preview_seen = true; - update.preview = Some(preview); - } - } - if !self.title_seen { - let title = strip_user_message_prefix(user.message.as_str()); - if !title.is_empty() { - self.title_seen = true; - update.title = Some(title.to_string()); - } + self.observe_user_message(user, &mut update); + } + RolloutItem::EventMsg(EventMsg::ItemCompleted(event)) => { + if let TurnItem::UserMessage(user) = &event.item { + self.observe_user_message_item(user, &mut update); } } RolloutItem::EventMsg(EventMsg::TokenCount(token_count)) => { @@ -346,6 +343,39 @@ fn user_message_preview(user: &UserMessageEvent) -> Option { None } +impl ThreadMetadataSync { + fn observe_user_message_item( + &mut self, + user: &UserMessageItem, + update: &mut ThreadMetadataPatch, + ) { + let EventMsg::UserMessage(user) = user.as_legacy_event() else { + unreachable!("user message items always convert to user message events"); + }; + self.observe_user_message(&user, update); + } + + fn observe_user_message(&mut self, user: &UserMessageEvent, update: &mut ThreadMetadataPatch) { + if let Some(preview) = user_message_preview(user) { + if !self.first_user_message_seen { + self.first_user_message_seen = true; + update.first_user_message = Some(preview.clone()); + } + if !self.preview_seen { + self.preview_seen = true; + update.preview = Some(preview); + } + } + if !self.title_seen { + let title = strip_user_message_prefix(user.message.as_str()); + if !title.is_empty() { + self.title_seen = true; + update.title = Some(title.to_string()); + } + } + } +} + fn thread_updated_at_touch() -> ThreadMetadataPatch { ThreadMetadataPatch { updated_at: Some(Utc::now()), @@ -484,6 +514,32 @@ mod tests { assert!(update.patch.updated_at.is_some()); } + #[test] + fn completed_user_message_items_emit_metadata_fields() { + let thread_id = ThreadId::new(); + let mut sync = ThreadMetadataSync::for_resume(&resume_params(thread_id, Vec::new())); + let update = sync + .observe_appended_items(&[RolloutItem::EventMsg(EventMsg::ItemCompleted( + ItemCompletedEvent { + thread_id, + turn_id: "turn-1".to_string(), + item: TurnItem::UserMessage(UserMessageItem::new(&[UserInput::Text { + text: "first user text".to_string(), + text_elements: Vec::new(), + }])), + completed_at_ms: 0, + }, + ))]) + .expect("completed user message metadata update"); + + assert_eq!(update.patch.preview.as_deref(), Some("first user text")); + assert_eq!(update.patch.title.as_deref(), Some("first user text")); + assert_eq!( + update.patch.first_user_message.as_deref(), + Some("first user text") + ); + } + #[test] fn metadata_irrelevant_items_coalesce_updated_at_touches() { let thread_id = ThreadId::new(); @@ -562,6 +618,7 @@ mod tests { fn resume_params(thread_id: ThreadId, history: Vec) -> ResumeThreadParams { ResumeThreadParams { thread_id, + history_mode: ThreadHistoryMode::Legacy, rollout_path: None, history: Some(Arc::new(history)), include_archived: false, diff --git a/codex-rs/thread-store/src/types.rs b/codex-rs/thread-store/src/types.rs index 4e688eca6f1f..eabc5122c26b 100644 --- a/codex-rs/thread-store/src/types.rs +++ b/codex-rs/thread-store/src/types.rs @@ -105,6 +105,8 @@ pub struct CreateThreadParams { pub struct ResumeThreadParams { /// Existing thread id whose future items should be appended. pub thread_id: ThreadId, + /// Persisted thread history contract selected when the thread was created. + pub history_mode: ThreadHistoryMode, /// Known local rollout path when the caller resumed from a specific file. pub rollout_path: Option, /// Known replay history for the resumed thread, if already loaded by the caller. @@ -134,10 +136,7 @@ pub(crate) fn canonical_history_mode_from_rollout_items( pub struct AppendThreadItemsParams { /// Thread id to append to. pub thread_id: ThreadId, - /// Raw rollout items to append in order. - /// - /// Store implementations are responsible for applying the shared rollout persistence policy - /// before writing durable replay history or any implementation-owned projections. + /// Canonical durable rollout items after `LiveThread` applies the shared persistence policy. pub items: Vec, }