Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions codex-rs/app-server-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ workspace = true
codex-app-server = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-chatgpt = { workspace = true }
codex-config = { workspace = true }
codex-core = { workspace = true }
codex-exec-server = { workspace = true }
Expand Down
35 changes: 35 additions & 0 deletions codex-rs/app-server-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::referrals::ReferralClient;
use codex_config::CloudConfigBundleLoader;
use codex_config::LoaderOverrides;
use codex_config::NoopThreadConfigLoader;
Expand Down Expand Up @@ -436,6 +437,9 @@ enum ClientCommand {
error: JSONRPCErrorError,
response_tx: oneshot::Sender<IoResult<()>>,
},
ReferralClient {
response_tx: oneshot::Sender<Arc<ReferralClient>>,
},
Shutdown {
response_tx: oneshot::Sender<IoResult<()>>,
},
Expand Down Expand Up @@ -484,6 +488,7 @@ impl InProcessAppServerClient {
let channel_capacity = args.channel_capacity.max(1);
let mut handle =
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
let referral_client = handle.referral_client();
let request_sender = handle.sender();
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
Expand Down Expand Up @@ -529,6 +534,9 @@ impl InProcessAppServerClient {
let send_result = request_sender.fail_server_request(request_id, error);
let _ = response_tx.send(send_result);
}
Some(ClientCommand::ReferralClient { response_tx }) => {
let _ = response_tx.send(Arc::clone(&referral_client));
}
Some(ClientCommand::Shutdown { response_tx }) => {
let shutdown_result = handle.shutdown().await;
let _ = response_tx.send(shutdown_result);
Expand Down Expand Up @@ -757,6 +765,7 @@ impl InProcessAppServerClient {
command_tx,
event_rx,
worker_handle,
..
} = self;
let mut worker_handle = worker_handle;
// Drop the caller-facing receiver before asking the worker to shut
Expand Down Expand Up @@ -829,6 +838,25 @@ impl InProcessAppServerRequestHandle {
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}

pub async fn referral_client(&self) -> IoResult<Arc<ReferralClient>> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::ReferralClient { response_tx })
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process referral client channel is closed",
)
})
}
}

impl AppServerRequestHandle {
Expand All @@ -848,6 +876,13 @@ impl AppServerRequestHandle {
Self::Remote(handle) => handle.request_typed(request).await,
}
}

pub async fn referral_client(&self) -> IoResult<Option<Arc<ReferralClient>>> {
match self {
Self::InProcess(handle) => handle.referral_client().await.map(Some),
Self::Remote(_) => Ok(None),
}
}
}

impl AppServerClient {
Expand Down
15 changes: 14 additions & 1 deletion codex-rs/app-server/src/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -77,6 +78,7 @@ use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::referrals::ReferralClient;
use codex_config::CloudConfigBundleLoader;
use codex_config::LoaderOverrides;
use codex_config::ThreadConfigLoader;
Expand Down Expand Up @@ -261,6 +263,7 @@ pub struct InProcessClientHandle {
client: InProcessClientSender,
event_rx: mpsc::Receiver<InProcessServerEvent>,
runtime_handle: tokio::task::JoinHandle<()>,
referral_client: Arc<ReferralClient>,
#[cfg(test)]
_test_codex_home: Option<tempfile::TempDir>,
}
Expand Down Expand Up @@ -342,6 +345,10 @@ impl InProcessClientHandle {
pub fn sender(&self) -> InProcessClientSender {
self.client.clone()
}

pub fn referral_client(&self) -> Arc<ReferralClient> {
Arc::clone(&self.referral_client)
}
}

/// Starts an in-process app-server runtime and performs initialize handshake.
Expand Down Expand Up @@ -376,12 +383,18 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
let referral_auth_manager = Arc::new(OnceLock::new());
let referral_client = Arc::new(ReferralClient::new(
Arc::clone(&referral_auth_manager),
args.config.chatgpt_base_url.clone(),
));

let runtime_handle = tokio::spawn(async move {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
.await;
let _ = referral_auth_manager.set(Arc::clone(&auth_manager));
let analytics_events_client =
analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref());
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
Expand Down Expand Up @@ -716,11 +729,11 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
let _ = done_tx.send(());
}
});

Ok(InProcessClientHandle {
client: InProcessClientSender { client_tx },
event_rx,
runtime_handle,
referral_client,
#[cfg(test)]
_test_codex_home: None,
})
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/chatgpt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ codex-model-provider = { workspace = true }
codex-plugin = { workspace = true }
codex-utils-cli = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[dev-dependencies]
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }

[lib]
Expand Down
1 change: 1 addition & 0 deletions codex-rs/chatgpt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod apply_command;
mod chatgpt_client;
pub mod connectors;
pub mod get_task;
pub mod referrals;
pub mod workspace_settings;
Loading
Loading