Skip to content

Added support of RetryConfig to topic writers#637

Open
alex268 wants to merge 4 commits intoydb-platform:masterfrom
alex268:master
Open

Added support of RetryConfig to topic writers#637
alex268 wants to merge 4 commits intoydb-platform:masterfrom
alex268:master

Conversation

@alex268
Copy link
Copy Markdown
Member

@alex268 alex268 commented Apr 17, 2026

No description provided.

@alex268 alex268 requested a review from pnv1 April 17, 2026 14:27
@pnv1 pnv1 requested a review from Copilot April 17, 2026 14:31
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds configurable retry behavior to Topic writers by introducing RetryConfig into writer settings and moving the writer stream lifecycle to the shared retryable-stream infrastructure, along with additional tests around shutdown/close behavior and retry scenarios.

Changes:

  • Add RetryConfig support to WriterSettings and wire it into writer stream creation/retry logic.
  • Refactor writer implementation to use TopicRetryableStream-based WriteSession / WriteStream.
  • Expand tests: buffer-manager close semantics and integration tests that inject failures and validate retries / written data.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java Adds RetryConfig to writer settings + builder API and default value.
topic/src/main/java/tech/ydb/topic/settings/TopicRetryConfig.java Introduces default retry config used by writers.
topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java Retryable stream base updated (logging, debug id field).
topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java Refactors writer lifecycle around WriteSession/retryable streams.
topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java Migrates session logic to TopicRetryableStream and adds writer callbacks.
topic/src/main/java/tech/ydb/topic/write/impl/WriteStream.java New TopicStream wrapper for write RPC + status parsing.
topic/src/main/java/tech/ydb/topic/write/impl/WriterQueue.java Adds close handling, debug-id logging, and seqNo update behavior tweaks.
topic/src/main/java/tech/ydb/topic/write/impl/BufferManager.java Adds close status + unblocking of waiters and closed checks on acquire paths.
topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java Adds debug-id logging for request send ranges.
topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java Renames error field from encoding-only to generic problem.
topic/src/test/java/tech/ydb/topic/write/impl/BufferManagerTest.java Adds tests validating acquire behavior when buffer is closed.
topic/src/test/java/tech/ydb/topic/TopicWritersIntegrationTest.java New integration test location + retry/failure-injection scenario validation.
topic/src/test/java/tech/ydb/topic/FailableWriterInterceptor.java New gRPC interceptor for injecting init/ack/send failures in tests.
topic/src/test/java/tech/ydb/topic/impl/TopicWritersIntegrationTest.java Removed (relocated to tech.ydb.topic).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 72 to 75
private Message validate(Message message) {
if (isStopped.get()) {
if (shutdownFuture.isDone()) {
throw new RuntimeException("Writer is already stopped");
}
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdownFuture.isDone() only becomes true after ListenerImpl.onClose() runs, so after shutdown() is called there is a window where validate() still accepts new messages. This can lead to messages being enqueued/sent after shutdown was requested. Consider setting a separate “closing/stopped” flag at the start of shutdown() (or completing shutdownFuture immediately) and checking that in validate().

Copilot uses AI. Check for mistakes.
Comment on lines 57 to +60
public CompletableFuture<InitResult> init() {
logger.info("[{}] initImpl called", id);
if (initResultFutureRef.compareAndSet(null, new CompletableFuture<>())) {
session = sessionFactory.createNextSession();
session.startAndInitialize();
} else {
logger.warn("[{}] Init is called on this writer more than once. Nothing is done", id);
}
return initResultFutureRef.get();
logger.info("[{}] start called", debugId);
stream.start();
return initFuture;
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init() unconditionally calls stream.start() every time it is invoked. If init() is called twice, TopicRetryableStream.start() will create an extra stream instance only to immediately close it, and will also spam warnings. Consider guarding init() so it only starts the stream once (e.g., via an AtomicBoolean/CAS or by checking initFuture state) and returns the same future on subsequent calls.

Copilot uses AI. Check for mistakes.
Comment on lines 70 to +94
public void sendWriteRequest() {
YdbTopic.StreamWriteMessage.WriteRequest.Builder req = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder();
if (currentTransaction != null) {
req.setTx(YdbTopic.TransactionIdentity.newBuilder()
.setId(currentTransaction.getId())
.setSession(currentTransaction.getSessionId()));
}

req.setCodec(codecCode);
req.addAllMessages(messages);

YdbTopic.StreamWriteMessage.FromClient fromClient = YdbTopic.StreamWriteMessage.FromClient.newBuilder()
.setWriteRequest(req.build())
.build();

if (logger.isDebugEnabled()) {
logger.debug("Predicted request size: {} = {}(request overhead) + {}(all MessageData protos) " +
if (logger.isTraceEnabled()) {
logger.trace("Predicted request size: {} = {}(request overhead) + {}(all MessageData protos) " +
"+ {}(message overheads) Actual request size: {} bytes", getCurrentRequestSize(),
REQUEST_OVERHEAD, messagesPbSize, MESSAGE_OVERHEAD * messages.size(),
fromClient.getSerializedSize());
}

logger.debug("[{}] write {} messages with seq numbers {}-{}", debugId, messages.size(),
messages.get(0).getSeqNo(), messages.get(messages.size() - 1).getSeqNo());

Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendWriteRequest() logs messages.get(0) / messages.get(messages.size()-1) without guarding for an empty messages list. Since sendWriteRequest() is public, it can be called directly and would throw IndexOutOfBoundsException when empty. Consider making it private (and only calling via flush()), or add an early return / validation when messages.isEmpty().

Copilot uses AI. Check for mistakes.
Comment on lines +194 to +197
public Builder setRetryConfig(RetryConfig config) {
this.retryConfig = config;
return this;
}
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setRetryConfig accepts null, but WriteSession passes settings.getRetryConfig() into TopicRetryableStream, which will later dereference it (e.g., retryConfig.getStatusRetryPolicy(...)) and NPE. Consider validating config != null here (or defaulting to TopicRetryConfig.FOREVER) to keep the public API safe.

Copilot uses AI. Check for mistakes.
Comment on lines +271 to +276
t1.join(100);
t2.join(100);

Assert.assertEquals(2, problems.size());
for (Exception ex: problems) {
Assert.assertTrue("Unexpected " + ex.getClass(), ex instanceof IllegalStateException);
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threads are only joined for 100ms, and the test does not assert they actually terminated. On slower/loaded CI this can become flaky (or leave the threads running after the test). Consider asserting thread termination after join (or using a larger timeout / await mechanism) and failing with a clear message if the threads didn’t stop.

Copilot uses AI. Check for mistakes.
Comment on lines +314 to +320
t1.join(100);
t2.join(100);

Assert.assertEquals(2, problems.size());
for (Exception ex: problems) {
Assert.assertTrue("Unexpected " + ex.getClass(), ex instanceof IllegalStateException);
Assert.assertEquals("Writer was closed with status Status{code = TIMEOUT(code=400090)}", ex.getMessage());
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threads are only joined for 100ms, and the test does not assert they actually terminated. On slower/loaded CI this can become flaky (or leave the threads running after the test). Consider asserting thread termination after join (or using a larger timeout / await mechanism) and failing with a clear message if the threads didn’t stop.

Copilot uses AI. Check for mistakes.
Comment on lines 123 to +129
@Override
protected void onStop() {
logger.debug("[{}] Session {} onStop called", streamId, sessionId);
public void onClose(Status status) {
logger.debug("[{}] Session onStop with status {} called", debugId, status);
listener.onClose(status);
if (errorsHandler != null && !status.isSuccess()) {
errorsHandler.accept(status, null);
}
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log message in onClose says "Session onStop ..." which is misleading when debugging lifecycle issues (this method is the terminal close callback). Consider updating the log wording to reflect onClose (and keep onStop for retry/temporary stops).

Copilot uses AI. Check for mistakes.
Comment on lines 63 to 66
public CompletableFuture<Void> shutdown() {
return shutdownImpl("");
stream.close();
return shutdownFuture.thenApply(s -> null);
}
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() returns shutdownFuture, but shutdownFuture is only completed from ListenerImpl.onClose(). If the user calls shutdown() before init() (so TopicRetryableStream has no active realStream), stream.close() will not invoke onClose() and shutdownFuture may never complete. Consider completing shutdownFuture in shutdown() when there is no active stream (or making TopicRetryableStream.close() invoke onClose() even when realStream is null).

Copilot uses AI. Check for mistakes.
@@ -32,11 +32,6 @@ public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, S
this.scheduler = scheduler;
}

Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopicRetryableStream no longer overrides toString(), but start() still logs this on double-start (logger.warn("{} double start...", this)). That will now produce ClassName@hash instead of a useful identifier. Either restore toString() (e.g., include debugId) or change that log to use debugId directly.

Suggested change
@Override
public String toString() {
return debugId;
}

Copilot uses AI. Check for mistakes.
@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 73.73737% with 52 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.62%. Comparing base (41369f3) to head (4809411).

Files with missing lines Patch % Lines
...in/java/tech/ydb/topic/write/impl/WriterQueue.java 36.36% 17 Missing and 4 partials ⚠️
...n/java/tech/ydb/topic/write/impl/WriteSession.java 75.00% 8 Missing and 5 partials ⚠️
.../java/tech/ydb/topic/write/impl/BufferManager.java 79.59% 7 Missing and 3 partials ⚠️
...in/java/tech/ydb/topic/write/impl/WriteStream.java 57.14% 3 Missing ⚠️
...n/java/tech/ydb/topic/settings/WriterSettings.java 60.00% 2 Missing ⚠️
.../java/tech/ydb/topic/write/impl/MessageSender.java 66.66% 1 Missing and 1 partial ⚠️
...ain/java/tech/ydb/topic/write/impl/WriterImpl.java 96.96% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master     #637      +/-   ##
============================================
+ Coverage     70.58%   70.62%   +0.04%     
- Complexity     3291     3296       +5     
============================================
  Files           372      374       +2     
  Lines         15640    15686      +46     
  Branches       1638     1648      +10     
============================================
+ Hits          11039    11079      +40     
- Misses         3948     3954       +6     
  Partials        653      653              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants