From c4fe2bdf11e9d21d7eab043683b8f495bb7b0bfe Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Fri, 26 Jun 2026 14:29:59 -0700 Subject: [PATCH 1/2] [core] Support durable external thread goals --- codex-rs/app-server/src/in_process.rs | 227 ++++++++++++++++++ codex-rs/app-server/src/message_processor.rs | 4 + .../thread_goal_processor.rs | 168 ++++++++++++- codex-rs/core/src/session/mod.rs | 1 - codex-rs/core/src/session/session.rs | 6 +- codex-rs/state/src/runtime/goals.rs | 108 +++++++++ codex-rs/thread-store/src/in_memory.rs | 75 +++++- codex-rs/thread-store/src/local/mod.rs | 4 + codex-rs/thread-store/src/store.rs | 51 ++++ 9 files changed, 636 insertions(+), 8 deletions(-) diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 60ad70569a8a..00442226b8ce 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -810,6 +810,13 @@ mod tests { use codex_app_server_protocol::ConfigRequirementsReadResponse; use codex_app_server_protocol::ExternalAgentConfigImportCompletedNotification; use codex_app_server_protocol::SessionSource as ApiSessionSource; + use codex_app_server_protocol::ThreadGoal; + use codex_app_server_protocol::ThreadGoalClearParams; + use codex_app_server_protocol::ThreadGoalClearResponse; + use codex_app_server_protocol::ThreadGoalGetParams; + use codex_app_server_protocol::ThreadGoalGetResponse; + use codex_app_server_protocol::ThreadGoalSetParams; + use codex_app_server_protocol::ThreadGoalSetResponse; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; @@ -818,7 +825,9 @@ mod tests { use codex_app_server_protocol::TurnItemsView; use codex_app_server_protocol::TurnStatus; use codex_core::config::ConfigBuilder; + use codex_state::StateRuntime; use codex_thread_store::InMemoryThreadStore; + use codex_thread_store::ThreadStore; use pretty_assertions::assert_eq; use std::path::Path; use tempfile::TempDir; @@ -988,6 +997,224 @@ mod tests { .expect("in-process runtime should shutdown cleanly"); } + #[tokio::test] + async fn external_goal_store_allows_goals_for_rolloutless_threads() { + let codex_home = TempDir::new().expect("temp dir"); + std::fs::write( + codex_home.path().join("config.toml"), + "[features]\ngoals = true\n", + ) + .expect("write goals config"); + let config = Arc::new(build_test_config(codex_home.path()).await); + let state_db = StateRuntime::init( + codex_home.path().join("sqlite"), + config.model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let thread_store = Arc::new(InMemoryThreadStore::with_external_thread_goal_state( + state_db, + )); + let args = InProcessStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config, + cli_overrides: Vec::new(), + loader_overrides: LoaderOverrides::default(), + strict_config: false, + cloud_config_bundle: CloudConfigBundleLoader::default(), + thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader), + feedback: CodexFeedback::new(), + log_db: None, + state_db: None, + environment_manager: Arc::new(EnvironmentManager::default_for_tests()), + config_warnings: Vec::new(), + session_source: SessionSource::Cli, + enable_codex_api_key_env: false, + initialize: InitializeParams { + client_info: ClientInfo { + name: "codex-in-process-test".to_string(), + title: None, + version: "0.0.0".to_string(), + }, + capabilities: None, + }, + channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }; + let mut client = start_with_options( + args, + InProcessStartOptions::default().with_thread_store(thread_store.clone()), + ) + .await + .expect("in-process runtime should start"); + client._test_codex_home = Some(codex_home); + + let response = client + .request(ClientRequest::ThreadStart { + request_id: RequestId::Integer(4), + params: ThreadStartParams::default(), + }) + .await + .expect("request transport should work") + .expect("thread/start should succeed"); + let started: ThreadStartResponse = + serde_json::from_value(response).expect("thread/start response should parse"); + let thread_id = started.thread.id; + + let response = client + .request(ClientRequest::ThreadGoalSet { + request_id: RequestId::Integer(5), + params: ThreadGoalSetParams { + thread_id: thread_id.clone(), + objective: Some("Finish the external goal".to_string()), + status: None, + token_budget: Some(Some(321)), + }, + }) + .await + .expect("request transport should work") + .expect("external goal store should accept goal/set"); + let set: ThreadGoalSetResponse = + serde_json::from_value(response).expect("goal/set response should parse"); + assert_eq!(set.goal.objective, "Finish the external goal"); + assert_eq!(set.goal.token_budget, Some(321)); + + let persisted = thread_store + .load_external_thread_goal( + codex_protocol::ThreadId::from_string(thread_id.as_str()) + .expect("thread id should parse"), + ) + .await + .expect("external goal should load") + .expect("goal snapshot should be durable"); + assert_eq!(persisted.objective, "Finish the external goal"); + + let response = client + .request(ClientRequest::ThreadGoalGet { + request_id: RequestId::Integer(6), + params: ThreadGoalGetParams { + thread_id: thread_id.clone(), + }, + }) + .await + .expect("request transport should work") + .expect("external goal store should accept goal/get"); + let get: ThreadGoalGetResponse = + serde_json::from_value(response).expect("goal/get response should parse"); + assert_eq!( + get.goal.map(|goal: ThreadGoal| goal.objective), + Some("Finish the external goal".to_string()) + ); + + let response = client + .request(ClientRequest::ThreadGoalClear { + request_id: RequestId::Integer(7), + params: ThreadGoalClearParams { + thread_id: thread_id.clone(), + }, + }) + .await + .expect("request transport should work") + .expect("external goal store should accept goal/clear"); + let clear: ThreadGoalClearResponse = + serde_json::from_value(response).expect("goal/clear response should parse"); + assert!(clear.cleared); + + let response = client + .request(ClientRequest::ThreadGoalGet { + request_id: RequestId::Integer(8), + params: ThreadGoalGetParams { thread_id }, + }) + .await + .expect("request transport should work") + .expect("external goal store should accept cleared goal/get"); + let get: ThreadGoalGetResponse = + serde_json::from_value(response).expect("goal/get response should parse"); + assert_eq!(get.goal, None); + + client + .shutdown() + .await + .expect("in-process runtime should shutdown cleanly"); + } + + #[tokio::test] + async fn rolloutless_thread_without_external_goal_opt_in_stays_ephemeral() { + let codex_home = TempDir::new().expect("temp dir"); + std::fs::write( + codex_home.path().join("config.toml"), + "[features]\ngoals = true\n", + ) + .expect("write goals config"); + let config = Arc::new(build_test_config(codex_home.path()).await); + let thread_store = Arc::new(InMemoryThreadStore::default()); + let args = InProcessStartArgs { + arg0_paths: Arg0DispatchPaths::default(), + config, + cli_overrides: Vec::new(), + loader_overrides: LoaderOverrides::default(), + strict_config: false, + cloud_config_bundle: CloudConfigBundleLoader::default(), + thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader), + feedback: CodexFeedback::new(), + log_db: None, + state_db: None, + environment_manager: Arc::new(EnvironmentManager::default_for_tests()), + config_warnings: Vec::new(), + session_source: SessionSource::Cli, + enable_codex_api_key_env: false, + initialize: InitializeParams { + client_info: ClientInfo { + name: "codex-in-process-test".to_string(), + title: None, + version: "0.0.0".to_string(), + }, + capabilities: None, + }, + channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY, + }; + let mut client = start_with_options( + args, + InProcessStartOptions::default().with_thread_store(thread_store), + ) + .await + .expect("in-process runtime should start"); + client._test_codex_home = Some(codex_home); + + let response = client + .request(ClientRequest::ThreadStart { + request_id: RequestId::Integer(9), + params: ThreadStartParams::default(), + }) + .await + .expect("request transport should work") + .expect("thread/start should succeed"); + let started: ThreadStartResponse = + serde_json::from_value(response).expect("thread/start response should parse"); + + let error = client + .request(ClientRequest::ThreadGoalGet { + request_id: RequestId::Integer(10), + params: ThreadGoalGetParams { + thread_id: started.thread.id, + }, + }) + .await + .expect("request transport should work") + .expect_err("rolloutless thread without opt-in should reject goals"); + assert!( + error + .message + .contains("ephemeral thread does not support goals"), + "unexpected goal/get error: {}", + error.message + ); + + client + .shutdown() + .await + .expect("in-process runtime should shutdown cleanly"); + } + async fn saturated_warning_shutdown(event_delivery: InProcessEventDelivery) -> Vec { let (codex_home, mut args) = build_test_start_args(SessionSource::Cli, /*channel_capacity*/ 1).await; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index e21aa76cce71..c6ae59d3a1e8 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -340,6 +340,9 @@ impl MessageProcessor { let thread_store = thread_store.unwrap_or_else(|| { codex_core::thread_store_from_config(config.as_ref(), state_db.clone()) }); + // External durable stores can own the same SQLite projection/runtime used by goal + // extensions without exposing a local rollout path. + let state_db = thread_store.state_db_handle().or(state_db); let environment_manager_for_requests = Arc::clone(&environment_manager); let environment_manager_for_extensions = Arc::clone(&environment_manager); let restriction_product = session_source.restriction_product(); @@ -483,6 +486,7 @@ impl MessageProcessor { thread_state_manager.clone(), state_db.clone(), Arc::clone(&goal_service), + Arc::clone(&thread_store), ); let thread_processor = ThreadRequestProcessor::new( auth_manager.clone(), diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index 5878047237ad..1d38679ff2df 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -13,6 +13,7 @@ pub(crate) struct ThreadGoalRequestProcessor { thread_state_manager: ThreadStateManager, state_db: Option, goal_service: Arc, + thread_store: Arc, } impl ThreadGoalRequestProcessor { @@ -23,6 +24,7 @@ impl ThreadGoalRequestProcessor { thread_state_manager: ThreadStateManager, state_db: Option, goal_service: Arc, + thread_store: Arc, ) -> Self { Self { thread_manager, @@ -31,6 +33,7 @@ impl ThreadGoalRequestProcessor { thread_state_manager, state_db, goal_service, + thread_store, } } @@ -91,6 +94,18 @@ impl ThreadGoalRequestProcessor { } else { None }; + if self.thread_store.supports_external_thread_goal_state() + && let Some(state_db) = thread_goal_state_db.as_ref() + && let Err(err) = self + .hydrate_external_thread_goal(thread.session_configured().thread_id, state_db) + .await + { + warn!( + "failed to restore external thread goal before running resume snapshot for {}: {}", + thread.session_configured().thread_id, + err.message + ); + } (emit_thread_goal_update, thread_goal_state_db) } @@ -134,6 +149,8 @@ impl ThreadGoalRequestProcessor { ) .await .map_err(goal_service_error)?; + self.persist_external_thread_goal_if_supported(thread_id, Some(outcome.goal.clone())) + .await?; let goal = ThreadGoal::from(outcome.goal.clone()); let persist_result = match self.thread_manager.get_thread(thread_id).await { @@ -172,6 +189,10 @@ impl ThreadGoalRequestProcessor { let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?; let state_db = self.state_db_for_materialized_thread(thread_id).await?; + if self.thread_store.supports_external_thread_goal_state() { + self.hydrate_external_thread_goal(thread_id, &state_db) + .await?; + } let goal = self .goal_service .get_thread_goal(&state_db, thread_id) @@ -205,6 +226,10 @@ impl ThreadGoalRequestProcessor { .clear_thread_goal(&state_db, thread_id) .await .map_err(goal_service_error)?; + if cleared { + self.persist_external_thread_goal_if_supported(thread_id, None) + .await?; + } self.outgoing .send_response(request_id, ThreadGoalClearResponse { cleared }) @@ -220,8 +245,9 @@ impl ThreadGoalRequestProcessor { &self, thread_id: ThreadId, ) -> Result { + let external_goal_state = self.thread_store.supports_external_thread_goal_state(); if let Ok(thread) = self.thread_manager.get_thread(thread_id).await { - if thread.rollout_path().is_none() { + if thread.rollout_path().is_none() && !external_goal_state { return Err(invalid_request(format!( "ephemeral thread does not support goals: {thread_id}" ))); @@ -229,6 +255,15 @@ impl ThreadGoalRequestProcessor { if let Some(state_db) = thread.state_db() { return Ok(state_db); } + } else if external_goal_state { + self.thread_store + .read_thread(StoreReadThreadParams { + thread_id, + include_archived: true, + include_history: false, + }) + .await + .map_err(external_thread_goal_store_error)?; } else { codex_rollout::find_thread_path_by_id_str( &self.config.codex_home, @@ -252,6 +287,9 @@ impl ThreadGoalRequestProcessor { thread_id: ThreadId, state_db: &StateDbHandle, ) -> Result<(), JSONRPCErrorError> { + if self.thread_store.supports_external_thread_goal_state() { + return self.hydrate_external_thread_goal(thread_id, state_db).await; + } let running_thread = self.thread_manager.get_thread(thread_id).await.ok(); let rollout_path = match running_thread.as_ref() { Some(thread) => thread.rollout_path().ok_or_else(|| { @@ -283,6 +321,74 @@ impl ThreadGoalRequestProcessor { Ok(()) } + async fn hydrate_external_thread_goal( + &self, + thread_id: ThreadId, + state_db: &StateDbHandle, + ) -> Result<(), JSONRPCErrorError> { + let external_goal = self + .thread_store + .load_external_thread_goal(thread_id) + .await + .map_err(external_thread_goal_store_error)?; + let local_goal = state_db + .thread_goals() + .get_thread_goal(thread_id) + .await + .map_err(|err| internal_error(format!("failed to read local thread goal: {err}")))?; + + match external_goal { + Some(external_goal) => { + if external_goal.thread_id != thread_id { + return Err(internal_error(format!( + "external thread goal snapshot has mismatched thread id: expected {thread_id}, got {}", + external_goal.thread_id + ))); + } + if local_goal.as_ref().is_some_and(|local_goal| { + protocol_thread_goal_from_state(local_goal) == external_goal + }) { + return Ok(()); + } + state_db + .thread_goals() + .restore_thread_goal(&external_goal) + .await + .map_err(|err| { + internal_error(format!("failed to restore external thread goal: {err}")) + })?; + } + None => { + if local_goal.is_some() { + state_db + .thread_goals() + .delete_thread_goal(thread_id) + .await + .map_err(|err| { + internal_error(format!( + "failed to apply external thread goal clear: {err}" + )) + })?; + } + } + } + Ok(()) + } + + async fn persist_external_thread_goal_if_supported( + &self, + thread_id: ThreadId, + goal: Option, + ) -> Result<(), JSONRPCErrorError> { + if !self.thread_store.supports_external_thread_goal_state() { + return Ok(()); + } + self.thread_store + .persist_external_thread_goal(thread_id, goal) + .await + .map_err(external_thread_goal_store_error) + } + async fn emit_thread_goal_snapshot(&self, thread_id: ThreadId) { let state_db = match self.state_db_for_materialized_thread(thread_id).await { Ok(state_db) => state_db, @@ -294,6 +400,17 @@ impl ThreadGoalRequestProcessor { return; } }; + if self.thread_store.supports_external_thread_goal_state() + && let Err(err) = self + .hydrate_external_thread_goal(thread_id, &state_db) + .await + { + warn!( + "failed to restore external thread goal before emitting resume snapshot for {thread_id}: {}", + err.message + ); + return; + } let listener_command_tx = { let thread_state = self.thread_state_manager.thread_state(thread_id).await; let thread_state = thread_state.lock().await; @@ -366,6 +483,55 @@ impl ThreadGoalRequestProcessor { } } +fn external_thread_goal_store_error(err: ThreadStoreError) -> JSONRPCErrorError { + match err { + ThreadStoreError::ThreadNotFound { thread_id } => { + invalid_request(format!("thread not found: {thread_id}")) + } + ThreadStoreError::InvalidRequest { message } => invalid_request(message), + ThreadStoreError::Unsupported { operation } => internal_error(format!( + "external thread goal store does not support {operation}" + )), + ThreadStoreError::Conflict { message } | ThreadStoreError::Internal { message } => { + internal_error(format!("external thread goal store failed: {message}")) + } + } +} + +fn protocol_thread_goal_from_state( + goal: &codex_state::ThreadGoal, +) -> codex_protocol::protocol::ThreadGoal { + codex_protocol::protocol::ThreadGoal { + thread_id: goal.thread_id, + objective: goal.objective.clone(), + status: match goal.status { + codex_state::ThreadGoalStatus::Active => { + codex_protocol::protocol::ThreadGoalStatus::Active + } + codex_state::ThreadGoalStatus::Paused => { + codex_protocol::protocol::ThreadGoalStatus::Paused + } + codex_state::ThreadGoalStatus::Blocked => { + codex_protocol::protocol::ThreadGoalStatus::Blocked + } + codex_state::ThreadGoalStatus::UsageLimited => { + codex_protocol::protocol::ThreadGoalStatus::UsageLimited + } + codex_state::ThreadGoalStatus::BudgetLimited => { + codex_protocol::protocol::ThreadGoalStatus::BudgetLimited + } + codex_state::ThreadGoalStatus::Complete => { + codex_protocol::protocol::ThreadGoalStatus::Complete + } + }, + token_budget: goal.token_budget, + tokens_used: goal.tokens_used, + time_used_seconds: goal.time_used_seconds, + created_at: goal.created_at.timestamp(), + updated_at: goal.updated_at.timestamp(), + } +} + pub(super) fn api_thread_goal_from_state(goal: codex_state::ThreadGoal) -> ThreadGoal { ThreadGoal { thread_id: goal.thread_id.to_string(), diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index be12e61e085e..9c53f3211b8d 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -149,7 +149,6 @@ use codex_terminal_detection::user_agent; use codex_thread_store::CreateThreadParams; use codex_thread_store::LiveThread; use codex_thread_store::LiveThreadInitGuard; -use codex_thread_store::LocalThreadStore; use codex_thread_store::ReadThreadParams; use codex_thread_store::ResumeThreadParams; use codex_thread_store::ThreadPersistenceMetadata; diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 5988834ee567..c434b03655ce 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -630,12 +630,8 @@ impl Session { let state_db_fut = async { if config.ephemeral { None - } else if let Some(local_store) = - thread_store.as_any().downcast_ref::() - { - local_store.state_db().await } else { - None + thread_store.state_db_handle() } } .instrument(info_span!( diff --git a/codex-rs/state/src/runtime/goals.rs b/codex-rs/state/src/runtime/goals.rs index e5769ccd1fda..841576f6994b 100644 --- a/codex-rs/state/src/runtime/goals.rs +++ b/codex-rs/state/src/runtime/goals.rs @@ -122,6 +122,82 @@ RETURNING thread_goal_from_row(&row) } + /// Restores a durable goal snapshot from an external thread store. + /// + /// External stores intentionally persist the protocol-visible goal shape rather than the + /// SQLite-only optimistic-concurrency id. Restoring creates a fresh goal id while preserving + /// the user-visible status, budget, accounting, and timestamps. + pub async fn restore_thread_goal( + &self, + goal: &codex_protocol::protocol::ThreadGoal, + ) -> anyhow::Result { + let goal_id = Uuid::new_v4().to_string(); + let status = match goal.status { + codex_protocol::protocol::ThreadGoalStatus::Active => crate::ThreadGoalStatus::Active, + codex_protocol::protocol::ThreadGoalStatus::Paused => crate::ThreadGoalStatus::Paused, + codex_protocol::protocol::ThreadGoalStatus::Blocked => crate::ThreadGoalStatus::Blocked, + codex_protocol::protocol::ThreadGoalStatus::UsageLimited => { + crate::ThreadGoalStatus::UsageLimited + } + codex_protocol::protocol::ThreadGoalStatus::BudgetLimited => { + crate::ThreadGoalStatus::BudgetLimited + } + codex_protocol::protocol::ThreadGoalStatus::Complete => { + crate::ThreadGoalStatus::Complete + } + }; + let status = status_after_budget_limit(status, goal.tokens_used, goal.token_budget); + let created_at_ms = goal.created_at.saturating_mul(1000); + let updated_at_ms = goal.updated_at.saturating_mul(1000); + let row = sqlx::query( + r#" +INSERT INTO thread_goals ( + thread_id, + goal_id, + objective, + status, + token_budget, + tokens_used, + time_used_seconds, + created_at_ms, + updated_at_ms +) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) +ON CONFLICT(thread_id) DO UPDATE SET + goal_id = excluded.goal_id, + objective = excluded.objective, + status = excluded.status, + token_budget = excluded.token_budget, + tokens_used = excluded.tokens_used, + time_used_seconds = excluded.time_used_seconds, + created_at_ms = excluded.created_at_ms, + updated_at_ms = excluded.updated_at_ms +RETURNING + thread_id, + goal_id, + objective, + status, + token_budget, + tokens_used, + time_used_seconds, + created_at_ms, + updated_at_ms + "#, + ) + .bind(goal.thread_id.to_string()) + .bind(goal_id) + .bind(goal.objective.as_str()) + .bind(status.as_str()) + .bind(goal.token_budget) + .bind(goal.tokens_used) + .bind(goal.time_used_seconds) + .bind(created_at_ms) + .bind(updated_at_ms) + .fetch_one(self.pool.as_ref()) + .await?; + + thread_goal_from_row(&row) + } + pub async fn insert_thread_goal( &self, thread_id: ThreadId, @@ -570,6 +646,38 @@ mod tests { .expect("test thread should be upserted"); } + #[tokio::test] + async fn restore_thread_goal_preserves_external_snapshot_accounting() { + let runtime = test_runtime().await; + let thread_id = test_thread_id(); + upsert_test_thread(&runtime, thread_id).await; + let snapshot = codex_protocol::protocol::ThreadGoal { + thread_id, + objective: "resume external work".to_string(), + status: codex_protocol::protocol::ThreadGoalStatus::Paused, + token_budget: Some(500), + tokens_used: 123, + time_used_seconds: 45, + created_at: 1_700_000_000, + updated_at: 1_700_000_100, + }; + + let restored = runtime + .thread_goals() + .restore_thread_goal(&snapshot) + .await + .expect("external goal snapshot should restore"); + + assert_eq!(restored.thread_id, thread_id); + assert_eq!(restored.objective, snapshot.objective); + assert_eq!(restored.status, crate::ThreadGoalStatus::Paused); + assert_eq!(restored.token_budget, snapshot.token_budget); + assert_eq!(restored.tokens_used, snapshot.tokens_used); + assert_eq!(restored.time_used_seconds, snapshot.time_used_seconds); + assert_eq!(restored.created_at.timestamp(), snapshot.created_at); + assert_eq!(restored.updated_at.timestamp(), snapshot.updated_at); + } + #[tokio::test] async fn replace_update_and_get_thread_goal() { let runtime = test_runtime().await; diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index dfed2ffac297..68b335059930 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -14,7 +14,9 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionContextWindow; use codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; +use codex_protocol::protocol::ThreadGoal; use codex_protocol::protocol::ThreadMemoryMode; +use codex_rollout::StateDbHandle; use codex_rollout::persisted_rollout_items; use crate::AppendThreadItemsParams; @@ -227,9 +229,9 @@ pub struct InMemoryThreadStoreCalls { /// Test and debug configs can select this store by id, letting tests exercise /// config-driven non-local persistence without requiring the real remote gRPC /// service. -#[derive(Default)] pub struct InMemoryThreadStore { state: tokio::sync::Mutex, + external_goal_state_db: Option, } #[derive(Default)] @@ -240,9 +242,30 @@ struct InMemoryThreadStoreState { metadata_updates: HashMap, names: HashMap>, rollout_paths: HashMap, + external_thread_goals: HashMap>, +} + +impl Default for InMemoryThreadStore { + fn default() -> Self { + Self { + state: tokio::sync::Mutex::new(InMemoryThreadStoreState::default()), + external_goal_state_db: None, + } + } } impl InMemoryThreadStore { + /// Creates an in-memory store that behaves like an external durable goal-state backend. + /// + /// This is primarily useful for consumers that need to exercise a non-local thread store + /// while retaining the standard GoalService SQLite runtime. + pub fn with_external_thread_goal_state(state_db: StateDbHandle) -> Self { + Self { + state: tokio::sync::Mutex::new(InMemoryThreadStoreState::default()), + external_goal_state_db: Some(state_db), + } + } + /// Returns the store associated with `id`, creating it if needed. pub fn for_id(id: impl Into) -> Arc { let id = id.into(); @@ -432,6 +455,56 @@ impl ThreadStore for InMemoryThreadStore { self } + fn state_db_handle(&self) -> Option { + self.external_goal_state_db.clone() + } + + fn supports_external_thread_goal_state(&self) -> bool { + self.external_goal_state_db.is_some() + } + + fn load_external_thread_goal( + &self, + thread_id: ThreadId, + ) -> ThreadStoreFuture<'_, Option> { + Box::pin(async move { + if !self.supports_external_thread_goal_state() { + return Err(ThreadStoreError::Unsupported { + operation: "thread_goal/load_external", + }); + } + let state = self.state.lock().await; + if !state.created_threads.contains_key(&thread_id) { + return Err(ThreadStoreError::ThreadNotFound { thread_id }); + } + Ok(state + .external_thread_goals + .get(&thread_id) + .cloned() + .flatten()) + }) + } + + fn persist_external_thread_goal( + &self, + thread_id: ThreadId, + goal: Option, + ) -> ThreadStoreFuture<'_, ()> { + Box::pin(async move { + if !self.supports_external_thread_goal_state() { + return Err(ThreadStoreError::Unsupported { + operation: "thread_goal/persist_external", + }); + } + let mut state = self.state.lock().await; + if !state.created_threads.contains_key(&thread_id) { + return Err(ThreadStoreError::ThreadNotFound { thread_id }); + } + state.external_thread_goals.insert(thread_id, goal); + Ok(()) + }) + } + fn create_thread(&self, params: CreateThreadParams) -> ThreadStoreFuture<'_, ()> { Box::pin(InMemoryThreadStore::create_thread(self, params)) } diff --git a/codex-rs/thread-store/src/local/mod.rs b/codex-rs/thread-store/src/local/mod.rs index 51e622fc5bc4..f8b81fdf79c7 100644 --- a/codex-rs/thread-store/src/local/mod.rs +++ b/codex-rs/thread-store/src/local/mod.rs @@ -229,6 +229,10 @@ impl ThreadStore for LocalThreadStore { self } + fn state_db_handle(&self) -> Option { + self.state_db.clone() + } + fn create_thread(&self, params: CreateThreadParams) -> ThreadStoreFuture<'_, ()> { Box::pin(async move { live_writer::create_thread(self, params).await }) } diff --git a/codex-rs/thread-store/src/store.rs b/codex-rs/thread-store/src/store.rs index 65ad1ce8a887..02828bbe6b02 100644 --- a/codex-rs/thread-store/src/store.rs +++ b/codex-rs/thread-store/src/store.rs @@ -1,4 +1,6 @@ use codex_protocol::ThreadId; +use codex_protocol::protocol::ThreadGoal; +use codex_rollout::StateDbHandle; use std::any::Any; use std::future::Future; use std::pin::Pin; @@ -33,6 +35,55 @@ pub trait ThreadStore: Any + Send + Sync { /// Return this store as [`Any`] for implementation-owned escape hatches. fn as_any(&self) -> &dyn Any; + /// Returns the SQLite state runtime that backs durable, process-local projections for this + /// store, when one exists. + /// + /// Most non-local stores should leave this as `None`. Stores that opt into external thread + /// goal state must return a handle so GoalService can keep its existing runtime/accounting + /// implementation while the store remains the durable source of truth. + fn state_db_handle(&self) -> Option { + None + } + + /// Whether this store durably owns thread-goal snapshots outside local rollout JSONL. + /// + /// The default is deliberately false: a thread without a local rollout path remains + /// ephemeral and does not support goals unless its store explicitly opts in here. + fn supports_external_thread_goal_state(&self) -> bool { + false + } + + /// Loads the durable externally-owned goal snapshot for a thread. + /// + /// Implementations that return `true` from [`Self::supports_external_thread_goal_state`] + /// must implement this method. `Ok(None)` is the durable clear/tombstone state. + fn load_external_thread_goal( + &self, + _thread_id: ThreadId, + ) -> ThreadStoreFuture<'_, Option> { + Box::pin(async { + Err(ThreadStoreError::Unsupported { + operation: "thread_goal/load_external", + }) + }) + } + + /// Persists the externally-owned goal snapshot for a thread. + /// + /// Implementations that return `true` from [`Self::supports_external_thread_goal_state`] + /// must implement this method. `None` must durably represent a cleared goal. + fn persist_external_thread_goal( + &self, + _thread_id: ThreadId, + _goal: Option, + ) -> ThreadStoreFuture<'_, ()> { + Box::pin(async { + Err(ThreadStoreError::Unsupported { + operation: "thread_goal/persist_external", + }) + }) + } + /// Creates a new live thread. fn create_thread(&self, params: CreateThreadParams) -> ThreadStoreFuture<'_, ()>; From 74ac7ec2ca307c0a7a8450d4a4ce8ef587d793a4 Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Sat, 27 Jun 2026 09:18:44 -0700 Subject: [PATCH 2/2] [core] Annotate external goal clear argument --- .../app-server/src/request_processors/thread_goal_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs index 1d38679ff2df..f320258c2e96 100644 --- a/codex-rs/app-server/src/request_processors/thread_goal_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_goal_processor.rs @@ -227,7 +227,7 @@ impl ThreadGoalRequestProcessor { .await .map_err(goal_service_error)?; if cleared { - self.persist_external_thread_goal_if_supported(thread_id, None) + self.persist_external_thread_goal_if_supported(thread_id, /*goal*/ None) .await?; }