diff --git a/pom.xml b/pom.xml
index 3e12aa3fa..afa7e82e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -801,6 +801,7 @@
src/test/java/com/rabbitmq/client/test/server/Permissions.java
src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java
src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java
+ src/test/java/com/rabbitmq/client/test/InboundFrameMax.java
${google-java-format.version}
diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
index d92c6e4a0..b0f564b28 100644
--- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java
+++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java
@@ -1077,7 +1077,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
this.nettyConf.enqueuingTimeout,
connectionTimeout,
socketConf,
- maxInboundMessageBodySize,
+ AMQP.FRAME_MIN_SIZE,
this.automaticRecovery,
recoveryCondition);
}
@@ -1090,7 +1090,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO
isSSL(),
this.shutdownExecutor,
sslContextFactory,
- this.maxInboundMessageBodySize);
+ AMQP.FRAME_MIN_SIZE);
}
}
diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
index 796cce2dd..5d5d714f8 100644
--- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
+++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java
@@ -432,11 +432,8 @@ public void start()
// Inbound payload limit: the smaller of frame_max (less framing
// overhead) and the configured message body cap.
- if (frameMax > 0) {
- _frameHandler.setMaxInboundFramePayloadSize(
- Math.min(this.maxInboundMessageBodySize,
- frameMax - AMQCommand.EMPTY_FRAME_SIZE + 1));
- }
+ _frameHandler.setFrameMax(
+ Math.min(this.maxInboundMessageBodySize, frameMax));
int negotiatedHeartbeat =
negotiatedMaxValue(this.requestedHeartbeat,
diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java
index 497bb2138..8e6b9aa37 100644
--- a/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java
+++ b/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java
@@ -25,13 +25,13 @@ public abstract class AbstractFrameHandlerFactory implements FrameHandlerFactory
protected final int connectionTimeout;
protected final SocketConfigurator configurator;
protected final boolean ssl;
- protected final int maxInboundMessageBodySize;
+ protected final int frameMax;
protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator,
- boolean ssl, int maxInboundMessageBodySize) {
+ boolean ssl, int frameMax) {
this.connectionTimeout = connectionTimeout;
this.configurator = configurator;
this.ssl = ssl;
- this.maxInboundMessageBodySize = maxInboundMessageBodySize;
+ this.frameMax = frameMax;
}
}
diff --git a/src/main/java/com/rabbitmq/client/impl/Frame.java b/src/main/java/com/rabbitmq/client/impl/Frame.java
index 02ff0dc5a..970ed8189 100644
--- a/src/main/java/com/rabbitmq/client/impl/Frame.java
+++ b/src/main/java/com/rabbitmq/client/impl/Frame.java
@@ -27,6 +27,8 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.function.IntSupplier;
+
import static java.lang.String.format;
/**
@@ -104,7 +106,7 @@ public static Frame fromBodyFragment(int channelNumber, ByteBuffer body, int off
*
* @return a new Frame if we read a frame successfully, otherwise null
*/
- public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOException {
+ public static Frame readFrom(DataInputStream is, IntSupplier payloadLimit) throws IOException {
int type;
int channel;
@@ -128,16 +130,9 @@ public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOEx
}
channel = is.readUnsignedShort();
- int payloadSize = is.readInt();
- if (payloadSize < 0 || payloadSize >= maxPayloadSize) {
- throw new MalformedFrameException(format(
- "Frame body size is invalid (%d), maximum configured size is %d. " +
- "See ConnectionFactory#setMaxInboundMessageBodySize " +
- "if you need to increase the limit.",
- payloadSize, maxPayloadSize
- ));
- }
- byte[] payload = new byte[payloadSize];
+ int frameSize = is.readInt();
+ Utils.enforceFrameMax(frameSize, payloadLimit.getAsInt());
+ byte[] payload = new byte[frameSize];
is.readFully(payload);
int frameEndMarker = is.readUnsignedByte();
diff --git a/src/main/java/com/rabbitmq/client/impl/FrameHandler.java b/src/main/java/com/rabbitmq/client/impl/FrameHandler.java
index 9b6b270da..ce7e8bbfb 100644
--- a/src/main/java/com/rabbitmq/client/impl/FrameHandler.java
+++ b/src/main/java/com/rabbitmq/client/impl/FrameHandler.java
@@ -60,7 +60,7 @@ default void finishConnectionNegotiation() {
}
/** Cap inbound frame payloads, applied once frame_max is negotiated. */
- default void setMaxInboundFramePayloadSize(int maxPayloadSize) {
+ default void setFrameMax(int frameMax) {
}
diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java
index ae74bad15..1a18ff12c 100644
--- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java
+++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java
@@ -15,7 +15,6 @@
// info@rabbitmq.com.
package com.rabbitmq.client.impl;
-import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -32,6 +31,7 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
@@ -89,10 +89,10 @@ public NettyFrameHandlerFactory(
Duration enqueuingTimeout,
int connectionTimeout,
SocketConfigurator configurator,
- int maxInboundMessageBodySize,
+ int frameMax,
boolean automaticRecovery,
Predicate recoveryCondition) {
- super(connectionTimeout, configurator, sslContextFactory != null, maxInboundMessageBodySize);
+ super(connectionTimeout, configurator, sslContextFactory != null, frameMax);
this.eventLoopGroup = eventLoopGroup;
this.sslContextFactory = sslContextFactory == null ? connName -> null : sslContextFactory;
this.channelCustomizer = channelCustomizer == null ? Utils.noOpConsumer() : channelCustomizer;
@@ -151,7 +151,7 @@ private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGro
public FrameHandler create(Address addr, String connectionName) throws IOException {
SslContext sslContext = this.sslContextFactory.apply(connectionName);
return new NettyFrameHandler(
- this.maxInboundMessageBodySize,
+ this.frameMax,
addr,
sslContext,
this.eventLoopGroup,
@@ -191,7 +191,7 @@ private static final class NettyFrameHandler implements FrameHandler {
private final int zeroCopyThreshold = 1024;
private NettyFrameHandler(
- int maxInboundMessageBodySize,
+ int frameMax,
Address addr,
SslContext sslContext,
EventLoopGroup elg,
@@ -232,13 +232,7 @@ private NettyFrameHandler(
b.option(ChannelOption.ALLOCATOR, Utils.byteBufAllocator());
}
- // type + channel + payload size + payload + frame end marker
- int maxFrameLength = 1 + 2 + 4 + maxInboundMessageBodySize + 1;
- int lengthFieldOffset = 3;
- int lengthFieldLength = 4;
- int lengthAdjustement = 1;
- AmqpHandler amqpHandler =
- new AmqpHandler(maxInboundMessageBodySize, this::close, willRecover);
+ AmqpHandler amqpHandler = new AmqpHandler(frameMax, this::close, willRecover);
int port = ConnectionFactory.portOrDefault(addr.getPort(), sslContext != null);
b.handler(
new ChannelInitializer() {
@@ -251,15 +245,7 @@ public void initChannel(SocketChannel ch) {
FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true));
ch.pipeline()
.addLast(HANDLER_PROTOCOL_VERSION_MISMATCH, new ProtocolVersionMismatchHandler());
- ch.pipeline()
- .addLast(
- HANDLER_FRAME_DECODER,
- new LengthFieldBasedFrameDecoder(
- maxFrameLength,
- lengthFieldOffset,
- lengthFieldLength,
- lengthAdjustement,
- 0));
+ ch.pipeline().addLast(HANDLER_FRAME_DECODER, createFrameDecoder(frameMax));
ch.pipeline().addLast(AmqpHandler.class.getSimpleName(), amqpHandler);
if (sslContext != null) {
SslHandler sslHandler = sslContext.newHandler(ch.alloc(), addr.getHost(), port);
@@ -352,8 +338,11 @@ public void finishConnectionNegotiation() {
}
@Override
- public void setMaxInboundFramePayloadSize(int maxPayloadSize) {
- this.handler.maxPayloadSize = maxPayloadSize;
+ public void setFrameMax(int frameMax) {
+ this.channel
+ .pipeline()
+ .replace(HANDLER_FRAME_DECODER, HANDLER_FRAME_DECODER, createFrameDecoder(frameMax));
+ this.handler.setFrameMax(frameMax);
}
@Override
@@ -507,11 +496,19 @@ InetSocketAddress maybeInetSocketAddress(SocketAddress socketAddress) {
return null;
}
}
+
+ private ChannelHandler createFrameDecoder(int frameMax) {
+ int lengthFieldOffset = 3;
+ int lengthFieldLength = 4;
+ int lengthAdjustement = 1; // frame-end byte
+ return new LengthFieldBasedFrameDecoder(
+ frameMax, lengthFieldOffset, lengthFieldLength, lengthAdjustement, 0);
+ }
}
private static class AmqpHandler extends ChannelInboundHandlerAdapter {
- private volatile int maxPayloadSize;
+ private volatile int framePayloadLimit;
private final Runnable closeSequence;
private final Predicate willRecover;
private volatile AMQConnection connection;
@@ -524,15 +521,20 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter {
private final String id;
private AmqpHandler(
- int maxPayloadSize,
- Runnable closeSequence,
- Predicate willRecover) {
- this.maxPayloadSize = maxPayloadSize;
+ int frameMax, Runnable closeSequence, Predicate willRecover) {
+ this.setFrameMax(frameMax);
this.closeSequence = closeSequence;
this.willRecover = willRecover;
this.id = "amqp-handler-" + SEQUENCE.getAndIncrement();
}
+ private void setFrameMax(int frameMax) {
+ if (frameMax > 0 && frameMax < AMQP.FRAME_MIN_SIZE) {
+ frameMax = AMQP.FRAME_MIN_SIZE;
+ }
+ this.framePayloadLimit = Utils.framePayloadLimit(frameMax);
+ }
+
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ch = ctx.channel();
@@ -545,17 +547,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
try {
int type = m.readUnsignedByte();
int channel = m.readUnsignedShort();
- int payloadSize = m.readInt();
- if (payloadSize < 0 || payloadSize >= maxPayloadSize) {
- throw new MalformedFrameException(
- format(
- "Frame body size is invalid (%d), maximum configured size is %d. "
- + "See ConnectionFactory#setMaxInboundMessageBodySize "
- + "if you need to increase the limit.",
- payloadSize, maxPayloadSize));
- }
+ int frameSize = m.readInt();
+ Utils.enforceFrameMax(frameSize, this.framePayloadLimit);
- byte[] payload = new byte[payloadSize];
+ byte[] payload = new byte[frameSize];
m.readBytes(payload);
int frameEndMarker = m.readUnsignedByte();
diff --git a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java
index 407c8505c..0fef5d0a6 100644
--- a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java
+++ b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java
@@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.IntSupplier;
/**
* A socket-based frame handler.
@@ -57,7 +58,8 @@ public class SocketFrameHandler implements FrameHandler {
private final DataOutputStream _outputStream;
private final Lock _outputStreamLock = new ReentrantLock();
- private volatile int maxInboundMessageBodySize;
+ private volatile int framePayloadLimit;
+ private final IntSupplier payloadLimitSupplier;
/** Time to linger before closing the socket forcefully. */
public static final int SOCKET_CLOSING_TIMEOUT = 1;
@@ -73,13 +75,14 @@ public SocketFrameHandler(Socket socket) throws IOException {
* @param socket the socket to use
*/
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor,
- int maxInboundMessageBodySize) throws IOException {
+ int frameMax) throws IOException {
_socket = socket;
_shutdownExecutor = shutdownExecutor;
- this.maxInboundMessageBodySize = maxInboundMessageBodySize;
+ this.setFrameMax(frameMax);
_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+ this.payloadLimitSupplier = () -> this.framePayloadLimit;
}
@Override
@@ -194,15 +197,19 @@ public void initialize(AMQConnection connection) {
}
@Override
- public void setMaxInboundFramePayloadSize(int maxPayloadSize) {
- this.maxInboundMessageBodySize = maxPayloadSize;
+ public void setFrameMax(int frameMax) {
+ this.framePayloadLimit = Utils.framePayloadLimit(frameMax);
}
@Override
public Frame readFrame() throws IOException {
_inputStreamLock.lock();
try {
- return Frame.readFrom(_inputStream, this.maxInboundMessageBodySize);
+ // we need to check frameMax against the latest value, hence the supplier
+ // otherwise we can start waiting for a new frame with the current limit
+ // and the frameMax is changed while we wait, so the next frame is checked against
+ // a stale value
+ return Frame.readFrom(_inputStream, this.payloadLimitSupplier);
} finally {
_inputStreamLock.unlock();
}
diff --git a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java
index 7a983c9fd..56de9a417 100644
--- a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java
+++ b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java
@@ -81,7 +81,7 @@ protected Socket createSocket(String connectionName) throws IOException {
public FrameHandler create(Socket sock) throws IOException
{
- return new SocketFrameHandler(sock, this.shutdownExecutor, this.maxInboundMessageBodySize);
+ return new SocketFrameHandler(sock, this.shutdownExecutor, this.frameMax);
}
private static void quietTrySocketClose(Socket socket) {
diff --git a/src/main/java/com/rabbitmq/client/impl/Utils.java b/src/main/java/com/rabbitmq/client/impl/Utils.java
index 86fd7a5d3..ccbc00430 100644
--- a/src/main/java/com/rabbitmq/client/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/client/impl/Utils.java
@@ -16,6 +16,8 @@
package com.rabbitmq.client.impl;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.MalformedFrameException;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
@@ -23,6 +25,8 @@
import java.util.function.Consumer;
+import static java.lang.String.format;
+
final class Utils {
@SuppressWarnings("rawtypes")
@@ -69,4 +73,29 @@ static ByteBufAllocator byteBufAllocator() {
static Consumer noOpConsumer() {
return (Consumer) NO_OP_CONSUMER;
}
+
+ static int framePayloadLimit(int frameMax) {
+ if (frameMax <= 0) {
+ return Integer.MAX_VALUE;
+ } else if (frameMax < AMQP.FRAME_MIN_SIZE) {
+ return AMQP.FRAME_MIN_SIZE - AMQCommand.EMPTY_FRAME_SIZE;
+ } else {
+ return frameMax - AMQCommand.EMPTY_FRAME_SIZE;
+ }
+ }
+
+ static void enforceFrameMax(int framePayloadSize, int framePayloadLimit) throws MalformedFrameException {
+ if (framePayloadSize < 0 || framePayloadSize > framePayloadLimit) {
+ throw new MalformedFrameException(
+ format(
+ "Frame size is invalid (%d), maximum configured size is %d. "
+ + "See ConnectionFactory#setMaxInboundMessageBodySize "
+ + "if you need to increase the limit.",
+ frameSizeFromPayloadSize(framePayloadSize), frameSizeFromPayloadSize(framePayloadLimit)));
+ }
+ }
+
+ private static int frameSizeFromPayloadSize(int limit) {
+ return limit + AMQCommand.EMPTY_FRAME_SIZE;
+ }
}
diff --git a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java
index dd7d54025..cdf1a95f5 100644
--- a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java
+++ b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java
@@ -76,7 +76,8 @@
IoDeadlockOnConnectionClosing.class,
ProtocolVersionMismatch.class,
ByteBufferPublishTest.class,
- PublishWithByteBufferTest.class
+ PublishWithByteBufferTest.class,
+ InboundFrameMax.class,
})
public class ClientTestSuite {
diff --git a/src/test/java/com/rabbitmq/client/test/InboundFrameMax.java b/src/test/java/com/rabbitmq/client/test/InboundFrameMax.java
new file mode 100644
index 000000000..fa296324f
--- /dev/null
+++ b/src/test/java/com/rabbitmq/client/test/InboundFrameMax.java
@@ -0,0 +1,234 @@
+// Copyright (c) 2026 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.client.test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.impl.AMQCommand;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class InboundFrameMax {
+
+ private static final int FRAME_METHOD = 1;
+ private static final int FRAME_END = 206;
+
+ @ParameterizedTest
+ @ValueSource(ints = {8192, 10_000})
+ void oversizedFrameShouldPassWhenEqualToLimit(int frameMax) throws Exception {
+ // just the limit
+ int openOkPayloadSize = frameMax - AMQCommand.EMPTY_FRAME_SIZE;
+ doEnforceInboundFrameMax(frameMax, openOkPayloadSize, false);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {8192, 10_000})
+ void oversizedFrameShouldNotPassWhenAboveLimit(int frameMax) throws Exception {
+ // just above the limit
+ int openOkPayloadSize = frameMax - AMQCommand.EMPTY_FRAME_SIZE + 1;
+ doEnforceInboundFrameMax(frameMax, openOkPayloadSize, true);
+ }
+
+ private void doEnforceInboundFrameMax(int frameMax, int openOkPayloadSize, boolean shouldFail)
+ throws Exception {
+
+ CountDownLatch serverDone = new CountDownLatch(1);
+ AtomicReference serverError = new AtomicReference<>();
+
+ try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"))) {
+ int port = server.getLocalPort();
+ Thread peer =
+ new Thread(
+ () -> runFakeBroker(server, serverDone, serverError, frameMax, openOkPayloadSize),
+ "fake-amqp-broker");
+ peer.setDaemon(true);
+ peer.start();
+
+ ConnectionFactory factory = TestUtils.connectionFactory();
+ factory.setHost("127.0.0.1");
+ factory.setPort(port);
+ factory.setRequestedFrameMax(frameMax);
+ factory.setHandshakeTimeout(5000);
+ factory.setConnectionTimeout(5000);
+ factory.setRequestedHeartbeat(0);
+
+ if (shouldFail) {
+ assertThatThrownBy(() -> factory.newConnection()).isInstanceOf(IOException.class);
+ } else {
+ try (Connection connection = factory.newConnection()) {
+ assertThat(connection.getFrameMax()).isEqualTo(frameMax);
+ }
+ }
+ }
+ assertThat(serverDone.await(5, TimeUnit.SECONDS)).isTrue();
+ if (!shouldFail) {
+ assertThat(serverError.get()).isNull();
+ }
+ }
+
+ private static void runFakeBroker(
+ ServerSocket server,
+ CountDownLatch done,
+ AtomicReference error,
+ int frameMax,
+ int openOkPayloadSize) {
+ try (Socket socket = server.accept()) {
+ socket.setSoTimeout(5000);
+ DataInputStream in = new DataInputStream(socket.getInputStream());
+ DataOutputStream out = new DataOutputStream(socket.getOutputStream());
+
+ byte[] header = new byte[8];
+ in.readFully(header);
+ writeMethodFrame(out, startPayload());
+ readFrame(in);
+ writeMethodFrame(out, tunePayload(frameMax));
+ readFrame(in);
+ readFrame(in);
+ writeOversizedOpenOk(out, openOkPayloadSize);
+ readFrame(in);
+ writeMethodFrame(out, closeOkPayload());
+ done.countDown();
+ } catch (Throwable t) {
+ error.set(t);
+ done.countDown();
+ }
+ }
+
+ private static byte[] startPayload() {
+ Buffer b = new Buffer();
+ b.u16(10).u16(10);
+ b.u8(0).u8(9);
+ b.u32(0);
+ b.longstr("PLAIN");
+ b.longstr("en_US");
+ return b.bytes();
+ }
+
+ private static byte[] tunePayload(int frameMax) {
+ Buffer b = new Buffer();
+ b.u16(10).u16(30);
+ b.u16(0);
+ b.u32(frameMax);
+ b.u16(0);
+ return b.bytes();
+ }
+
+ private static byte[] closeOkPayload() {
+ Buffer b = new Buffer();
+ b.u16(10).u16(51);
+ return b.bytes();
+ }
+
+ private static void writeOversizedOpenOk(DataOutputStream out, int size) throws Exception {
+ byte[] payload = new byte[size];
+ payload[0] = 0;
+ payload[1] = 10;
+ payload[2] = 0;
+ payload[3] = 41;
+ payload[4] = 0;
+ writeFrame(out, FRAME_METHOD, 0, payload);
+ }
+
+ private static void writeMethodFrame(DataOutputStream out, byte[] payload) throws Exception {
+ writeFrame(out, FRAME_METHOD, 0, payload);
+ }
+
+ private static void writeFrame(DataOutputStream out, int type, int channel, byte[] payload)
+ throws Exception {
+ out.writeByte(type);
+ out.writeShort(channel);
+ out.writeInt(payload.length);
+ out.write(payload);
+ out.writeByte(FRAME_END);
+ out.flush();
+ }
+
+ private static void readFrame(DataInputStream in) throws Exception {
+ in.readUnsignedByte();
+ in.readUnsignedShort();
+ int size = in.readInt();
+ byte[] payload = new byte[size];
+ in.readFully(payload);
+ int end = in.readUnsignedByte();
+ if (end != FRAME_END) {
+ throw new AssertionError("bad client frame end: " + end);
+ }
+ }
+
+ private static final class Buffer {
+ private byte[] data = new byte[64];
+ private int size;
+
+ Buffer u8(int value) {
+ ensure(1);
+ data[size++] = (byte) value;
+ return this;
+ }
+
+ Buffer u16(int value) {
+ ensure(2);
+ data[size++] = (byte) (value >>> 8);
+ data[size++] = (byte) value;
+ return this;
+ }
+
+ Buffer u32(int value) {
+ ensure(4);
+ data[size++] = (byte) (value >>> 24);
+ data[size++] = (byte) (value >>> 16);
+ data[size++] = (byte) (value >>> 8);
+ data[size++] = (byte) value;
+ return this;
+ }
+
+ Buffer longstr(String value) {
+ byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+ u32(bytes.length);
+ ensure(bytes.length);
+ System.arraycopy(bytes, 0, data, size, bytes.length);
+ size += bytes.length;
+ return this;
+ }
+
+ byte[] bytes() {
+ byte[] copy = new byte[size];
+ System.arraycopy(data, 0, copy, 0, size);
+ return copy;
+ }
+
+ private void ensure(int count) {
+ if (size + count <= data.length) {
+ return;
+ }
+ byte[] next = new byte[Math.max(data.length * 2, size + count)];
+ System.arraycopy(data, 0, next, 0, size);
+ data = next;
+ }
+ }
+}
diff --git a/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java b/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java
index f11661f0f..f04b4d73d 100644
--- a/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java
+++ b/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java
@@ -69,7 +69,7 @@ private static Frame readSingleFrame(int payloadSize) throws Exception {
out.flush();
SocketFrameHandler handler = new SocketFrameHandler(client);
- handler.setMaxInboundFramePayloadSize(INBOUND_PAYLOAD_LIMIT);
+ handler.setFrameMax(INBOUND_PAYLOAD_LIMIT);
return handler.readFrame();
}
}
diff --git a/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java b/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java
index 86b9d2d83..e69461852 100644
--- a/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java
+++ b/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java
@@ -49,7 +49,7 @@ public static Object[] data() {
@Override
protected ConnectionFactory newConnectionFactory() {
ConnectionFactory connectionFactory = super.newConnectionFactory();
- connectionFactory.setNetworkRecoveryInterval(2);
+ connectionFactory.setNetworkRecoveryInterval(5);
connectionFactory.setTopologyRecoveryRetryHandler(
TopologyRecoveryRetryHandlerBuilder.builder()
.queueRecoveryRetryCondition(