Added support of RetryConfig to topic writers #637
alex268 wants to merge 4 commits into ydb-platform:master
Pull request overview
Adds configurable retry behavior to Topic writers by introducing RetryConfig into writer settings and moving the writer stream lifecycle to the shared retryable-stream infrastructure, along with additional tests around shutdown/close behavior and retry scenarios.
Changes:
- Add `RetryConfig` support to `WriterSettings` and wire it into writer stream creation/retry logic.
- Refactor writer implementation to use `TopicRetryableStream`-based `WriteSession`/`WriteStream`.
- Expand tests: buffer-manager close semantics and integration tests that inject failures and validate retries / written data.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java | Adds RetryConfig to writer settings + builder API and default value. |
| topic/src/main/java/tech/ydb/topic/settings/TopicRetryConfig.java | Introduces default retry config used by writers. |
| topic/src/main/java/tech/ydb/topic/impl/TopicRetryableStream.java | Retryable stream base updated (logging, debug id field). |
| topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java | Refactors writer lifecycle around WriteSession/retryable streams. |
| topic/src/main/java/tech/ydb/topic/write/impl/WriteSession.java | Migrates session logic to TopicRetryableStream and adds writer callbacks. |
| topic/src/main/java/tech/ydb/topic/write/impl/WriteStream.java | New TopicStream wrapper for write RPC + status parsing. |
| topic/src/main/java/tech/ydb/topic/write/impl/WriterQueue.java | Adds close handling, debug-id logging, and seqNo update behavior tweaks. |
| topic/src/main/java/tech/ydb/topic/write/impl/BufferManager.java | Adds close status + unblocking of waiters and closed checks on acquire paths. |
| topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java | Adds debug-id logging for request send ranges. |
| topic/src/main/java/tech/ydb/topic/write/impl/EnqueuedMessage.java | Renames error field from encoding-only to generic problem. |
| topic/src/test/java/tech/ydb/topic/write/impl/BufferManagerTest.java | Adds tests validating acquire behavior when buffer is closed. |
| topic/src/test/java/tech/ydb/topic/TopicWritersIntegrationTest.java | New integration test location + retry/failure-injection scenario validation. |
| topic/src/test/java/tech/ydb/topic/FailableWriterInterceptor.java | New gRPC interceptor for injecting init/ack/send failures in tests. |
| topic/src/test/java/tech/ydb/topic/impl/TopicWritersIntegrationTest.java | Removed (relocated to tech.ydb.topic). |

      private Message validate(Message message) {
    -     if (isStopped.get()) {
    +     if (shutdownFuture.isDone()) {
              throw new RuntimeException("Writer is already stopped");
          }

shutdownFuture.isDone() only becomes true after ListenerImpl.onClose() runs, so after shutdown() is called there is a window where validate() still accepts new messages. This can lead to messages being enqueued/sent after shutdown was requested. Consider setting a separate “closing/stopped” flag at the start of shutdown() (or completing shutdownFuture immediately) and checking that in validate().
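
One way to close the window described above could look like the following. This is a minimal sketch, not the PR's code: `ClosableWriter`, its `closing` flag, and the `String` message type are hypothetical stand-ins for `WriterImpl` and its fields.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the writer; names are illustrative, not the PR's API.
final class ClosableWriter {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();

    // Rejects messages as soon as shutdown has been requested,
    // not only after the close callback has completed shutdownFuture.
    String validate(String message) {
        if (closing.get()) {
            throw new IllegalStateException("Writer is already stopped");
        }
        return message;
    }

    CompletableFuture<Void> shutdown() {
        closing.set(true); // flip the flag before any asynchronous close work starts
        return shutdownFuture;
    }

    // Stands in for the listener's close callback, which may run much later.
    void onClose() {
        shutdownFuture.complete(null);
    }
}
```

With this shape, `validate()` starts rejecting messages immediately after `shutdown()` returns, even though the returned future is still pending until `onClose()` runs.
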

      public CompletableFuture<InitResult> init() {
    -     logger.info("[{}] initImpl called", id);
    -     if (initResultFutureRef.compareAndSet(null, new CompletableFuture<>())) {
    -         session = sessionFactory.createNextSession();
    -         session.startAndInitialize();
    -     } else {
    -         logger.warn("[{}] Init is called on this writer more than once. Nothing is done", id);
    -     }
    -     return initResultFutureRef.get();
    +     logger.info("[{}] start called", debugId);
    +     stream.start();
    +     return initFuture;

init() unconditionally calls stream.start() every time it is invoked. If init() is called twice, TopicRetryableStream.start() will create an extra stream instance only to immediately close it, and will also spam warnings. Consider guarding init() so it only starts the stream once (e.g., via an AtomicBoolean/CAS or by checking initFuture state) and returns the same future on subsequent calls.
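
The CAS guard suggested here could be sketched as follows. `OnceStartedWriter`, `startStream()`, and the `startCalls` counter are hypothetical names used only for illustration; the real writer would call `stream.start()` in the guarded branch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the writer; only the start-once logic is modeled.
final class OnceStartedWriter {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final CompletableFuture<String> initFuture = new CompletableFuture<>();
    final AtomicInteger startCalls = new AtomicInteger(); // counts real starts, for illustration

    CompletableFuture<String> init() {
        // Only the first caller wins the CAS and starts the stream;
        // every later call just returns the same future.
        if (started.compareAndSet(false, true)) {
            startStream();
        }
        return initFuture;
    }

    private void startStream() {
        startCalls.incrementAndGet(); // placeholder for the real stream start
    }
}
```

Calling `init()` twice on the same instance returns the same future both times and starts the underlying stream once.
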

      public void sendWriteRequest() {
          YdbTopic.StreamWriteMessage.WriteRequest.Builder req = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder();
          if (currentTransaction != null) {
              req.setTx(YdbTopic.TransactionIdentity.newBuilder()
                      .setId(currentTransaction.getId())
                      .setSession(currentTransaction.getSessionId()));
          }

          req.setCodec(codecCode);
          req.addAllMessages(messages);

          YdbTopic.StreamWriteMessage.FromClient fromClient = YdbTopic.StreamWriteMessage.FromClient.newBuilder()
                  .setWriteRequest(req.build())
                  .build();

    -     if (logger.isDebugEnabled()) {
    -         logger.debug("Predicted request size: {} = {}(request overhead) + {}(all MessageData protos) " +
    +     if (logger.isTraceEnabled()) {
    +         logger.trace("Predicted request size: {} = {}(request overhead) + {}(all MessageData protos) " +
                      "+ {}(message overheads) Actual request size: {} bytes", getCurrentRequestSize(),
                      REQUEST_OVERHEAD, messagesPbSize, MESSAGE_OVERHEAD * messages.size(),
                      fromClient.getSerializedSize());
          }

    +     logger.debug("[{}] write {} messages with seq numbers {}-{}", debugId, messages.size(),
    +             messages.get(0).getSeqNo(), messages.get(messages.size() - 1).getSeqNo());

sendWriteRequest() logs messages.get(0) / messages.get(messages.size()-1) without guarding for an empty messages list. Since sendWriteRequest() is public, it can be called directly and would throw IndexOutOfBoundsException when empty. Consider making it private (and only calling via flush()), or add an early return / validation when messages.isEmpty().
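
The early-return variant of this suggestion might look like the sketch below. `SeqNoBatch` and its `flush()` method are hypothetical and model only the sequence numbers of the buffered messages, not the real `MessageSender`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the sender's message buffer; only sequence numbers are kept.
final class SeqNoBatch {
    private final List<Long> seqNos = new ArrayList<>();

    void add(long seqNo) {
        seqNos.add(seqNo);
    }

    // Returns the log line for the batch and clears it,
    // or null when there is nothing to send.
    String flush() {
        if (seqNos.isEmpty()) {
            return null; // guard: get(0) below would throw on an empty list
        }
        String summary = "write " + seqNos.size() + " messages with seq numbers "
                + seqNos.get(0) + "-" + seqNos.get(seqNos.size() - 1);
        seqNos.clear();
        return summary;
    }
}
```

The guard makes an accidental direct call on an empty buffer a no-op instead of an `IndexOutOfBoundsException`.
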

    public Builder setRetryConfig(RetryConfig config) {
        this.retryConfig = config;
        return this;
    }

setRetryConfig accepts null, but WriteSession passes settings.getRetryConfig() into TopicRetryableStream, which will later dereference it (e.g., retryConfig.getStatusRetryPolicy(...)) and NPE. Consider validating config != null here (or defaulting to TopicRetryConfig.FOREVER) to keep the public API safe.
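
A fail-fast version of the setter could be sketched as below. `SettingsBuilder`, `RetryPolicy`, and `DEFAULT` are hypothetical placeholders for the PR's `WriterSettings.Builder`, `RetryConfig`, and default retry config.

```java
import java.util.Objects;

// Hypothetical builder; the nested RetryPolicy type is a placeholder for RetryConfig.
final class SettingsBuilder {
    interface RetryPolicy { }

    static final RetryPolicy DEFAULT = new RetryPolicy() { };

    private RetryPolicy retryConfig = DEFAULT;

    SettingsBuilder setRetryConfig(RetryPolicy config) {
        // Fail at the call site instead of deep inside the retry loop.
        this.retryConfig = Objects.requireNonNull(config, "retryConfig must not be null");
        return this;
    }

    RetryPolicy getRetryConfig() {
        return retryConfig;
    }
}
```

Rejecting `null` in the setter keeps the stored value unchanged and points the stack trace at the caller that passed it. Silently falling back to the default is the other option the comment mentions; it is more lenient but can hide a configuration mistake.
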

    t1.join(100);
    t2.join(100);

    Assert.assertEquals(2, problems.size());
    for (Exception ex: problems) {
        Assert.assertTrue("Unexpected " + ex.getClass(), ex instanceof IllegalStateException);

Threads are only joined for 100ms, and the test does not assert they actually terminated. On slower/loaded CI this can become flaky (or leave the threads running after the test). Consider asserting thread termination after join (or using a larger timeout / await mechanism) and failing with a clear message if the threads didn’t stop.

    t1.join(100);
    t2.join(100);

    Assert.assertEquals(2, problems.size());
    for (Exception ex: problems) {
        Assert.assertTrue("Unexpected " + ex.getClass(), ex instanceof IllegalStateException);
        Assert.assertEquals("Writer was closed with status Status{code = TIMEOUT(code=400090)}", ex.getMessage());

Threads are only joined for 100ms, and the test does not assert they actually terminated. On slower/loaded CI this can become flaky (or leave the threads running after the test). Consider asserting thread termination after join (or using a larger timeout / await mechanism) and failing with a clear message if the threads didn’t stop.
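
A small helper along these lines could make both tests fail with a clear message when a thread is still running. `ThreadJoins.joinOrFail` is a hypothetical name, and the timeout is whatever the test considers generous enough for CI.

```java
// Hypothetical test helper, not part of the PR.
final class ThreadJoins {
    private ThreadJoins() { }

    // Waits up to timeoutMs for the thread, then fails loudly if it is still running.
    static void joinOrFail(Thread t, long timeoutMs) {
        try {
            t.join(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new AssertionError("Interrupted while waiting for " + t.getName(), e);
        }
        if (t.isAlive()) {
            throw new AssertionError("Thread " + t.getName() + " did not stop within " + timeoutMs + " ms");
        }
    }
}
```

In the tests above, `t1.join(100)` would become something like `ThreadJoins.joinOrFail(t1, 5000)`, so a slow run waits longer instead of silently moving on to the assertions.
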

      @Override
    - protected void onStop() {
    -     logger.debug("[{}] Session {} onStop called", streamId, sessionId);
    + public void onClose(Status status) {
    +     logger.debug("[{}] Session onStop with status {} called", debugId, status);
    +     listener.onClose(status);
    +     if (errorsHandler != null && !status.isSuccess()) {
    +         errorsHandler.accept(status, null);
    +     }

Log message in onClose says "Session onStop ..." which is misleading when debugging lifecycle issues (this method is the terminal close callback). Consider updating the log wording to reflect onClose (and keep onStop for retry/temporary stops).

      public CompletableFuture<Void> shutdown() {
    -     return shutdownImpl("");
    +     stream.close();
    +     return shutdownFuture.thenApply(s -> null);
      }

shutdown() returns shutdownFuture, but shutdownFuture is only completed from ListenerImpl.onClose(). If the user calls shutdown() before init() (so TopicRetryableStream has no active realStream), stream.close() will not invoke onClose() and shutdownFuture may never complete. Consider completing shutdownFuture in shutdown() when there is no active stream (or making TopicRetryableStream.close() invoke onClose() even when realStream is null).
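
The first option (completing the future in `shutdown()` when nothing was started) might be sketched like this. `ShutdownSafeWriter`, its `started` flag, and the `String` status are hypothetical simplifications, and the sketch deliberately ignores the race between a concurrent `init()` and `shutdown()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the writer; the String status replaces the real Status type.
final class ShutdownSafeWriter {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final CompletableFuture<String> shutdownFuture = new CompletableFuture<>();

    void init() {
        started.set(true); // placeholder for starting the real stream
    }

    CompletableFuture<Void> shutdown() {
        if (!started.get()) {
            // No stream was ever started, so no close callback will arrive:
            // complete the future here to avoid hanging callers.
            shutdownFuture.complete("never started");
        }
        return shutdownFuture.thenApply(s -> null);
    }

    // Stands in for the listener's close callback on a started stream.
    void onClose(String status) {
        shutdownFuture.complete(status);
    }
}
```

A writer that was never initialized then returns an already completed future from `shutdown()`, while a started one still waits for `onClose`.
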

    @@ -32,11 +32,6 @@ public TopicRetryableStream(Logger logger, String debugId, RetryConfig config, S
              this.scheduler = scheduler;
          }

TopicRetryableStream no longer overrides toString(), but start() still logs this on double-start (logger.warn("{} double start...", this)). That will now produce ClassName@hash instead of a useful identifier. Either restore toString() (e.g., include debugId) or change that log to use debugId directly.

    -     @Override
    -     public String toString() {
    -         return debugId;
    -     }

Codecov Report

Coverage diff, master vs. #637:

| Metric | master | #637 | +/- |
|---|---|---|---|
| Coverage | 70.58% | 70.62% | +0.04% |
| Complexity | 3291 | 3296 | +5 |
| Files | 372 | 374 | +2 |
| Lines | 15640 | 15686 | +46 |
| Branches | 1638 | 1648 | +10 |
| Hits | 11039 | 11079 | +40 |
| Misses | 3948 | 3954 | +6 |
| Partials | 653 | 653 | |
No description provided.