Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions codex-rs/app-server/src/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,13 @@ mod tests {
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::ExternalAgentConfigImportCompletedNotification;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
use codex_app_server_protocol::ThreadGoal;
use codex_app_server_protocol::ThreadGoalClearParams;
use codex_app_server_protocol::ThreadGoalClearResponse;
use codex_app_server_protocol::ThreadGoalGetParams;
use codex_app_server_protocol::ThreadGoalGetResponse;
use codex_app_server_protocol::ThreadGoalSetParams;
use codex_app_server_protocol::ThreadGoalSetResponse;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
Expand All @@ -818,7 +825,9 @@ mod tests {
use codex_app_server_protocol::TurnItemsView;
use codex_app_server_protocol::TurnStatus;
use codex_core::config::ConfigBuilder;
use codex_state::StateRuntime;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::ThreadStore;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
Expand Down Expand Up @@ -988,6 +997,224 @@ mod tests {
.expect("in-process runtime should shutdown cleanly");
}

#[tokio::test]
async fn external_goal_store_allows_goals_for_rolloutless_threads() {
let codex_home = TempDir::new().expect("temp dir");
std::fs::write(
codex_home.path().join("config.toml"),
"[features]\ngoals = true\n",
)
.expect("write goals config");
let config = Arc::new(build_test_config(codex_home.path()).await);
let state_db = StateRuntime::init(
codex_home.path().join("sqlite"),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let thread_store = Arc::new(InMemoryThreadStore::with_external_thread_goal_state(
state_db,
));
let args = InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config,
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
strict_config: false,
cloud_config_bundle: CloudConfigBundleLoader::default(),
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-in-process-test".to_string(),
title: None,
version: "0.0.0".to_string(),
},
capabilities: None,
},
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
};
let mut client = start_with_options(
args,
InProcessStartOptions::default().with_thread_store(thread_store.clone()),
)
.await
.expect("in-process runtime should start");
client._test_codex_home = Some(codex_home);

let response = client
.request(ClientRequest::ThreadStart {
request_id: RequestId::Integer(4),
params: ThreadStartParams::default(),
})
.await
.expect("request transport should work")
.expect("thread/start should succeed");
let started: ThreadStartResponse =
serde_json::from_value(response).expect("thread/start response should parse");
let thread_id = started.thread.id;

let response = client
.request(ClientRequest::ThreadGoalSet {
request_id: RequestId::Integer(5),
params: ThreadGoalSetParams {
thread_id: thread_id.clone(),
objective: Some("Finish the external goal".to_string()),
status: None,
token_budget: Some(Some(321)),
},
})
.await
.expect("request transport should work")
.expect("external goal store should accept goal/set");
let set: ThreadGoalSetResponse =
serde_json::from_value(response).expect("goal/set response should parse");
assert_eq!(set.goal.objective, "Finish the external goal");
assert_eq!(set.goal.token_budget, Some(321));

let persisted = thread_store
.load_external_thread_goal(
codex_protocol::ThreadId::from_string(thread_id.as_str())
.expect("thread id should parse"),
)
.await
.expect("external goal should load")
.expect("goal snapshot should be durable");
assert_eq!(persisted.objective, "Finish the external goal");

let response = client
.request(ClientRequest::ThreadGoalGet {
request_id: RequestId::Integer(6),
params: ThreadGoalGetParams {
thread_id: thread_id.clone(),
},
})
.await
.expect("request transport should work")
.expect("external goal store should accept goal/get");
let get: ThreadGoalGetResponse =
serde_json::from_value(response).expect("goal/get response should parse");
assert_eq!(
get.goal.map(|goal: ThreadGoal| goal.objective),
Some("Finish the external goal".to_string())
);

let response = client
.request(ClientRequest::ThreadGoalClear {
request_id: RequestId::Integer(7),
params: ThreadGoalClearParams {
thread_id: thread_id.clone(),
},
})
.await
.expect("request transport should work")
.expect("external goal store should accept goal/clear");
let clear: ThreadGoalClearResponse =
serde_json::from_value(response).expect("goal/clear response should parse");
assert!(clear.cleared);

let response = client
.request(ClientRequest::ThreadGoalGet {
request_id: RequestId::Integer(8),
params: ThreadGoalGetParams { thread_id },
})
.await
.expect("request transport should work")
.expect("external goal store should accept cleared goal/get");
let get: ThreadGoalGetResponse =
serde_json::from_value(response).expect("goal/get response should parse");
assert_eq!(get.goal, None);

client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}

#[tokio::test]
async fn rolloutless_thread_without_external_goal_opt_in_stays_ephemeral() {
let codex_home = TempDir::new().expect("temp dir");
std::fs::write(
codex_home.path().join("config.toml"),
"[features]\ngoals = true\n",
)
.expect("write goals config");
let config = Arc::new(build_test_config(codex_home.path()).await);
let thread_store = Arc::new(InMemoryThreadStore::default());
let args = InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config,
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
strict_config: false,
cloud_config_bundle: CloudConfigBundleLoader::default(),
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
state_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-in-process-test".to_string(),
title: None,
version: "0.0.0".to_string(),
},
capabilities: None,
},
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
};
let mut client = start_with_options(
args,
InProcessStartOptions::default().with_thread_store(thread_store),
)
.await
.expect("in-process runtime should start");
client._test_codex_home = Some(codex_home);

let response = client
.request(ClientRequest::ThreadStart {
request_id: RequestId::Integer(9),
params: ThreadStartParams::default(),
})
.await
.expect("request transport should work")
.expect("thread/start should succeed");
let started: ThreadStartResponse =
serde_json::from_value(response).expect("thread/start response should parse");

let error = client
.request(ClientRequest::ThreadGoalGet {
request_id: RequestId::Integer(10),
params: ThreadGoalGetParams {
thread_id: started.thread.id,
},
})
.await
.expect("request transport should work")
.expect_err("rolloutless thread without opt-in should reject goals");
assert!(
error
.message
.contains("ephemeral thread does not support goals"),
"unexpected goal/get error: {}",
error.message
);

client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}

async fn saturated_warning_shutdown(event_delivery: InProcessEventDelivery) -> Vec<String> {
let (codex_home, mut args) =
build_test_start_args(SessionSource::Cli, /*channel_capacity*/ 1).await;
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ impl MessageProcessor {
let thread_store = thread_store.unwrap_or_else(|| {
codex_core::thread_store_from_config(config.as_ref(), state_db.clone())
});
// External durable stores can own the same SQLite projection/runtime used by goal
// extensions without exposing a local rollout path.
let state_db = thread_store.state_db_handle().or(state_db);
let environment_manager_for_requests = Arc::clone(&environment_manager);
let environment_manager_for_extensions = Arc::clone(&environment_manager);
let restriction_product = session_source.restriction_product();
Expand Down Expand Up @@ -483,6 +486,7 @@ impl MessageProcessor {
thread_state_manager.clone(),
state_db.clone(),
Arc::clone(&goal_service),
Arc::clone(&thread_store),
);
let thread_processor = ThreadRequestProcessor::new(
auth_manager.clone(),
Expand Down
Loading
Loading