
drpcstream: introduce shared BufferPool for ring buffer #55

Open
shubhamdhama wants to merge 15 commits into stream-multiplexing from shubham/buffer-pool-for-ringbuffer

Conversation

@shubhamdhama shubhamdhama commented Apr 17, 2026

Add a BufferPool backed by sync.Pool that is shared across all streams
within a Manager. The ring buffer now obtains buffers from the pool on
Enqueue and transfers ownership to the caller on Dequeue, which advances
the tail immediately. This removes the two-step Dequeue/Done protocol
and simplifies Close (no longer needs to wait for held buffers).

The pool is a required parameter in the Stream constructor, created once
per Manager and passed to all streams it creates.

We are not using it and do not plan to use it anytime soon, so until then it is just a maintenance burden.

Enable multiple concurrent streams over a single transport. This is the
foundational change that replaces the single-stream-at-a-time model with true
multiplexing, allowing clients and servers to run multiple RPCs concurrently on
a shared connection.

Background:

Previously, the Manager enforced single-stream semantics: a semaphore (sem)
allowed only one active stream, and each new stream had to wait for the previous
one to finish (waitForPreviousStream). Stream IDs were required to be
monotonically increasing (checkStreamMonotonicity), and a single PacketAssembler
was shared across all invoke sequences. This was simple and correct for
one-at-a-time RPCs but incompatible with multiplexing.

Structural changes:

Manager:
- Remove the semaphore (sem) and acquireSemaphore/waitForPreviousStream.
  Multiple streams can now be created concurrently without blocking on each
  other.
- Remove checkStreamMonotonicity. With multiplexing, frames from different
  streams arrive interleaved; monotonicity is not meaningful.
- Remove lastFrameID/lastFrameKind tracking fields (only used by the
  monotonicity check).
- Replace the single shared PacketAssembler with a per-stream invokesAssembler
  map (map[uint64]*invokeAssembler). Each stream's invoke/metadata frame
  sequence is assembled independently.
- Remove SoftCancel option (see error semantics below).
- Remove GetLatest from streamRegistry; manageReader now dispatches frames by
  looking up the stream ID in the registry directly.

Server:
- ServeOne now spawns a goroutine per incoming RPC via sync.WaitGroup, rather
  than handling RPCs sequentially. Errors from individual RPC handlers are
  logged (via opts.Log) rather than terminating the connection.

Stream:
- NewWithOptions no longer calls wr.Reset() on the shared Writer. With
  multiplexing, multiple streams share the same Writer; resetting it would
  discard buffered frames from other streams.
- SendCancel no longer returns a (busy, error) tuple. It blocks on the stream's
  write lock instead of returning busy=true when another write is in progress.
  This guarantees the cancel frame is sent (or fails with an IO error), at the
  cost of waiting for any in-progress write to finish. A future writer queue
  will eliminate this blocking.

Error and cancellation semantics:

The central design principle is that the manageReader goroutine is the single
authority on transport health. It is the only goroutine that reads from the
transport, and if the transport fails, it will detect the failure and terminate
the connection. Write-side errors propagate to the caller but do not directly
terminate the connection; the reader will independently detect the broken
transport (since an IO write failure implies the transport is broken, and the
next read will also fail). This matches gRPC's approach: when loopyWriter
encounters an IO error, it does not close the connection. Instead, it relies on
the reader to detect the failure and clean up.

Error classification:

  Connection-scoped (terminates all streams):
  - Transport read error: manageReader fails to read a frame.
  - Frame assembly error: corrupted wire data that cannot be parsed.
  - Protocol error: e.g., receiving an invoke on an existing stream, or an
    unknown non-control packet kind.
  - Manager.Close(): explicit shutdown by the application.

  Stream-scoped (only affects that stream):
  - Application error: the RPC handler returns an error, which is sent via
    SendError (KindError) and terminates only that stream.
  - Remote close: receiving KindClose or KindCloseSend terminates or half-closes
    only that stream.
  - Remote cancel: receiving KindCancel terminates only that stream.
  - Remote error: receiving KindError terminates only that stream.
  - Write error (MsgSend, SendError, CloseSend, SendCancel): the error
    propagates to the caller. The stream is terminated locally. The manageReader
    goroutine will detect the transport failure on its next read and terminate
    the connection.

Context cancellation:

When a stream's context is cancelled, manageStream:
1. Attempts to send a KindCancel frame (SendCancel). This blocks until any
   in-progress write on that stream completes, then sends the cancel. If the
   send fails (IO error), the error is logged. The reader will catch the
   transport failure.
2. Cancels the stream locally (stream.Cancel), which terminates the stream and
   causes any blocked Send/Recv to return the context error.
3. Waits for the stream to finish (stream.Finished).

The SoftCancel option is removed. Previously, SoftCancel=false would terminate
the entire connection when a stream's context was cancelled (calling m.terminate
if the stream wasn't already finished). With multiplexing, cancelling one stream
must never kill the connection. SoftCancel=true behavior (send cancel, then
cancel locally) is now the only behavior, simplified to always block for the
write lock rather than returning "busy" and falling back to a hard cancel.

Manager termination:

When the manager terminates (from any connection-scoped error), it closes the
transport and the stream registry. Each active stream's manageStream goroutine
detects termination via m.sigs.term, cancels its stream, and waits for it to
finish. Manager.Close() then waits for all stream goroutines (m.wg.Wait) and the
reader goroutine before returning.

Known limitations:

- The shared drpcwire.Writer is protected by a mutex. All streams serialize
  their writes through this single writer.
- SendCancel blocks on the stream's write lock. If a stream has a large
  in-progress write, the cancel is delayed.
- packetBuffer is single-slot (Put blocks until Get+Done). A slow consumer
  stream blocks manageReader, stalling frame delivery to all streams. This needs
  to be addressed with per-stream buffering or async delivery.
- Conn.Invoke() holds a mutex for the entire unary RPC duration, serializing
  concurrent unary RPCs. Streaming RPCs (NewStream) are not affected.

Add Stream.WriteInvoke that writes InvokeMetadata and Invoke frames
under a single write lock acquisition. This prevents SendCancel from
slipping in between the two frames when a context is cancelled during
stream setup.

Without this, the following race is possible:
1. Client creates stream, starts manageStream goroutine.
2. doNewStream sends InvokeMetadata, releases write lock.
3. Context cancels. manageStream calls SendCancel, acquires write lock,
   sends KindCancel, terminates the stream.
4. doNewStream tries to send Invoke, sees stream terminated, returns error.

The server receives InvokeMetadata then Cancel, but never the Invoke.
It has no registered stream to cancel, so the Cancel is dropped and
the partial invokeAssembler entry leaks until the connection closes.

With WriteInvoke, SendCancel blocks until both frames are written.
The server always sees a complete invoke before any cancel.
…s.Close

Previously, when a manager terminated, each stream's manageStream goroutine
independently detected m.sigs.term.Signal and cancelled its own stream.
This was asynchronous.

Instead, cancel all streams directly in activeStreams.Close(), called from
Manager.terminate. This makes termination synchronous and immediate, and
removes the term.Signal case from manageStream.

With multiplexing, multiple Invoke calls run concurrently. The shared
c.wbuf buffer and its protecting mutex are replaced with per-call
allocation. Stats collection is removed (unused by cockroach).

Fix compilation errors and remove tests for removed features:
- Update New/NewWithOptions calls to include ManagerKind argument
- Update activeStreams.Close calls to include error argument
- Remove tests for SoftCancel, cross-stream monotonicity, Unblocked
  semantics, SendCancel busy return, and ForEach (all removed)
- Adapt OldStreamFramesIgnored to use stream.Finished instead of
  Unblocked

The old packetBuffer was single-slot: Put blocked until the consumer
called Get+Done. This meant manageReader was stuck delivering to one
stream and couldn't serve others — a deadlock under multiplexing.

Replace it with a ring-buffer packetQueue (capacity 256) that copies
data on Put and returns immediately. Get drains queued messages before
returning the close error, ensuring graceful shutdown delivers all
buffered data.
Replace per-stream drpcwire.Writer with a shared MuxWriter that uses a
dedicated drain goroutine. This decouples stream writes from transport I/O,
enabling true concurrent stream multiplexing.

Core Architecture:

MuxWriter (drpcwire/mux_writer.go):
- Single instance per Manager, shared across all streams
- Dedicated goroutine continuously drains buffered frames to transport
- Non-blocking WriteFrame: appends to buffer under lock, signals goroutine
- Double-buffer swap (buf/spare) minimizes time spent under lock

Stream changes (drpcstream/stream.go):
- wr field: *drpcwire.Writer -> *drpcwire.MuxWriter
- Removed: ManualFlush option, RawFlush, rawFlushLocked, checkRecvFlush
- sendPacketLocked/rawWriteLocked: WriteFrame only, no Flush

Manager integration (drpcmanager/manager.go):
- Creates MuxWriter with onError: m.terminate
- terminate(): wr.Stop() THEN tr.Close() — Stop makes WriteFrame reject
  immediately; transport close unblocks any in-flight Write in the drain
  goroutine
- Close(): <-wr.Done() to wait for drain goroutine exit

Key Design Decisions:

1. No explicit flush needed.
The drain goroutine continuously pulls from the buffer. Natural batching
occurs because appends accumulate while the goroutine is mid-Write. The
old cork pattern (delay flush until first recv) is unnecessary — appending
is a memcpy under lock, and the goroutine controls when transport I/O
happens.

2. sync.Cond over channels.
Signal coalescing: multiple WriteFrame calls while run() is in Write
produce a single wakeup. No allocation overhead. Stop uses closed bool +
Broadcast. Consistent with packetQueue.

3. Two-phase shutdown (Stop/Done split to avoid deadlock).
Stop() is non-blocking: sets closed, Broadcast, returns immediately.
Done() returns a channel that closes when run() exits. This split is
critical for the onError path: run() -> Write fails -> sets closed ->
onError -> terminate -> Stop (finds closed=true, noop) -> run() returns.
If Stop blocked until run() exited, this path would self-join.

4. run() owns its lifecycle on write failure.
When Write fails, run() sets closed=true itself before calling onError.
The subsequent onError -> terminate -> Stop path finds closed already set.
No coordination needed; the flag is idempotent.

5. No per-stream FrameWriter wrapper.
Initially considered a per-stream FrameWriter wrapping *MuxWriter, but
the only value was a closed check before append. That check lives in
MuxWriter.WriteFrame directly. Streams hold *MuxWriter and call
WriteFrame.

What this unlocks:
- Concurrent multiplexing: streams no longer serialize on writes
- Simplified stream: all flush/cork complexity removed
- Natural batching from continuous drain
- Direct error propagation: transport write failures fire manager
  termination via onError callback

Breaking changes:
- drpcstream.Options.ManualFlush removed
- Stream.RawFlush(), SetManualFlush() removed
- Stream constructor: *drpcwire.Writer -> *drpcwire.MuxWriter

Test coverage: 8 concurrency tests for MuxWriter covering concurrent
WriteFrame, write errors, onError->Stop deadlock path, blocked Write
unblocked by Close, concurrent Stop, abort semantics (Stop discards
buffered data), and write-during-active-drain. A data race in the
initial implementation (reading buf capacity without lock) was caught
by these tests and fixed.

Fix a race condition where a unary RPC with a cancelled context could
return io.EOF instead of codes.Canceled. Two changes, mirroring how
gRPC handles this:

1. Early ctx.Err() check in NewClientStream before creating the stream.
2. Deferred stream.CheckCancelError in doInvoke to convert io.EOF to
   the cancel error if the stream was cancelled mid-operation.

The problem:

With multiplexing, each stream gets a manageStream goroutine that
watches ctx.Done() and calls SendCancel + Cancel when the context is
cancelled. This races with doInvoke, which writes invoke and message
frames through the same stream. The race has three outcomes depending
on who acquires the stream's write lock first:

  1. doInvoke wins the lock, completes all writes, and the RPC succeeds
     even though it should have been cancelled.
  2. SendCancel wins, sets send=io.EOF before doInvoke runs.
     rawWriteLocked sees send.IsSet() and returns io.EOF. Invoke's
     ToRPCErr passes io.EOF through unchanged, so the caller gets the
     wrong error code.
  3. doInvoke finishes writes, then MsgRecv sees the cancellation and
     returns codes.Canceled. This is the correct outcome but only
     happens by luck of timing.

Why this didn't happen before multiplexing:

The old single-stream manager used a non-blocking SendCancel that
returned (busy=true) when the write lock was held by an in-progress
write. With SoftCancel=false (the default), the fallback path was:

  manageStream calls stream.Cancel(ctx.Err()). The stream is not
  finished because doInvoke holds the write lock, so the manager calls
  m.terminate(), which closes the entire transport. The in-flight
  Writer.Write() fails with an IO error, and checkCancelError sees
  cancel.IsSet() and returns context.Canceled.

The correct error surfaced, but through connection termination. This was
fine in single-stream mode where one stream is one connection. With
multiplexing, we cannot terminate the entire connection for one stream's
cancellation. The new SendCancel blocks on the write lock to guarantee
the cancel frame is sent, and that introduced this race.

How gRPC handles this (verified against grpc-go source):

gRPC uses two mechanisms. First, newAttemptLocked (stream.go:408) checks
cs.ctx.Err() before creating the transport stream. This catches the
already-cancelled case without allocating resources. Second, for unary
RPCs, csAttempt.sendMsg (stream.go:1092) swallows write errors and
returns nil when !cs.desc.ClientStreams. The real error always surfaces
from RecvMsg, which detects context cancellation via
recvBufferReader.readClient (transport.go:239) and returns
status.Error(codes.Canceled, ...). This means gRPC never returns io.EOF
from a unary RPC because it never short-circuits on a send error.

For streaming RPCs, gRPC returns io.EOF from Send() after cancel (the
stream is done for writing) and codes.Canceled from Recv() (the actual
reason). Our grpccompat tests confirm this by comparing gRPC and DRPC
error results for identical cancel scenarios.

Our fix:

Rather than restructuring doInvoke to swallow send errors like gRPC, we
use the stream's existing CheckCancelError mechanism.

NewClientStream checks ctx.Err() before creating the stream. This
mirrors gRPC's newAttemptLocked check and avoids wasting a stream ID,
spawning a goroutine, and allocating stream resources.

doInvoke defers stream.CheckCancelError on its return value. If any
operation in doInvoke fails because SendCancel won the write lock race
(returning io.EOF via the send signal), CheckCancelError replaces it
with the cancel signal's error (context.Canceled). This is the same
function the stream already uses internally for transport write
failures. CheckCancelError is exported (was checkCancelError) so that
doInvoke in the drpcconn package can call it.

On TOCTOU:

The NewClientStream check is technically TOCTOU: the context could be
cancelled immediately after the check passes. This is acceptable because
Go's context cancellation model is cooperative, not preemptive. The
context package provides Done() "for use in select statements," and
operations check at natural boundaries rather than continuously. The
standard library follows this pattern: http.Client.Do checks between
redirect hops, database/sql checks before query execution, and gRPC
checks in newAttemptLocked before creating the transport stream. If the
context is cancelled mid-operation, manageStream handles cleanup and the
deferred CheckCancelError corrects the error code.
@shubhamdhama
Author

I had an idea that I ran through Claude; below is a summary. I'm not planning to do it, but we can reconsider in the future if profiling shows any gain.

Buffer pool: further optimization ideas

Right now the data copy chain for an incoming message looks like this:

  1. PacketAssembler.AppendFrame: copies frame data into pa.pk.Data via append
  2. ringBuffer.Enqueue: copies pkt.Data into a pooled buffer via append
  3. RawRecv copies the pooled buffer out, or MsgRecv unmarshals from it

We could eliminate copy #2 by having the packet assembler get its buffer from the pool directly. The assembler already has a TODO for buffer reuse. Instead of reusing its own backing array across packets (lines 84-87), it would pool.Get a buffer, assemble into it, hand it off through the ring buffer, and pool.Get a fresh one for the next packet.

Another idea: size-bucketed pools (e.g. 1KiB, 16KiB, 32KiB) so that Enqueue's append doesn't have to reallocate when messages are larger than the default capacity. You could even have a pool.Append(buf, data...) method that detects when the buffer needs to grow and fetches from the right bucket.

I think we should keep the pool simple for now. sync.Pool already gives you natural high-water-mark behavior: a buffer that grew to 32KiB stays at 32KiB when returned, so after warm-up the pool self-tunes to the workload's size distribution. Size buckets would add real maintenance cost (choosing boundaries, handling cross-bucket transitions) for a gain that append + sync.Pool already provides. Latency here is dominated by network IO anyway, not memcpy.

The assembler integration is the more interesting optimization since it removes a full copy per message. Worth revisiting once we have benchmarks to measure the actual impact.

shubhamdhama and others added 2 commits April 17, 2026 20:25
…xWriter

When the Manager terminates, the actual error (e.g. "connection closed")
now flows through activeStreams.Close and MuxWriter.Stop instead of being
replaced by generic messages like "add to closed collection". This lets
callers such as cockroach's IsClosedConnection recognize shutdown errors.

MuxWriter.Stop now accepts an error parameter. Whichever of Stop
(external, from terminate) or run (internal, on write failure) fires
first stores its error; the other is a no-op. The EOF to Canceled
rewrite in terminate is moved before Stop so both MuxWriter and
activeStreams see the rewritten error.

Also removes TestingStopWait, inlining Stop + <-Done at call sites.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@shubhamdhama shubhamdhama force-pushed the shubham/enable-stream-multiplexing branch from a17330d to b91bf1b on April 17, 2026 14:57
@shubhamdhama shubhamdhama force-pushed the shubham/buffer-pool-for-ringbuffer branch from cafa1dc to b3d2355 on April 17, 2026 14:57
shubhamdhama and others added 2 commits April 17, 2026 20:29
Rename the type and its methods to better reflect what the structure
actually is: a bounded ring buffer used as a FIFO queue. Put/Get become
Enqueue/Dequeue, and the stream field is renamed from pbuf to recvQueue
since, to the consumer, it is just a queue.

This is groundwork for a follow-up that introduces a shared buffer pool.
Done() has a TODO noting it will be removed once Dequeue can transfer
buffer ownership directly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a BufferPool backed by sync.Pool that is shared across all streams
within a Manager. The ring buffer now obtains buffers from the pool on
Enqueue and transfers ownership to the caller on Dequeue, which advances
the tail immediately. This removes the two-step Dequeue/Done protocol
and simplifies Close (no longer needs to wait for held buffers).

The pool is a required parameter in the Stream constructor, created once
per Manager and passed to all streams it creates.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@shubhamdhama shubhamdhama force-pushed the shubham/enable-stream-multiplexing branch from b91bf1b to a58986c on April 17, 2026 15:00
@shubhamdhama shubhamdhama force-pushed the shubham/buffer-pool-for-ringbuffer branch from b3d2355 to 38c84dc on April 17, 2026 15:00
Base automatically changed from shubham/enable-stream-multiplexing to stream-multiplexing April 17, 2026 16:30