From 6fe2debff23175ce3f7ede6a48fea60abc23d480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 23 Jun 2026 15:46:00 +0200 Subject: [PATCH] Enforce inbound frame max more strictly Follow-up of #1994 --- pom.xml | 1 + .../rabbitmq/client/ConnectionFactory.java | 4 +- .../rabbitmq/client/impl/AMQConnection.java | 7 +- .../impl/AbstractFrameHandlerFactory.java | 6 +- .../java/com/rabbitmq/client/impl/Frame.java | 17 +- .../rabbitmq/client/impl/FrameHandler.java | 2 +- .../client/impl/NettyFrameHandlerFactory.java | 71 +++--- .../client/impl/SocketFrameHandler.java | 19 +- .../impl/SocketFrameHandlerFactory.java | 2 +- .../java/com/rabbitmq/client/impl/Utils.java | 29 +++ .../rabbitmq/client/test/ClientTestSuite.java | 3 +- .../rabbitmq/client/test/InboundFrameMax.java | 234 ++++++++++++++++++ .../test/NegotiatedFrameMaxInboundTest.java | 2 +- .../client/test/RpcTopologyRecordingTest.java | 2 +- 14 files changed, 329 insertions(+), 70 deletions(-) create mode 100644 src/test/java/com/rabbitmq/client/test/InboundFrameMax.java diff --git a/pom.xml b/pom.xml index 3e12aa3fa2..afa7e82e37 100644 --- a/pom.xml +++ b/pom.xml @@ -801,6 +801,7 @@ src/test/java/com/rabbitmq/client/test/server/Permissions.java src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java + src/test/java/com/rabbitmq/client/test/InboundFrameMax.java ${google-java-format.version} diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index d92c6e4a0a..b0f564b283 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -1077,7 +1077,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO this.nettyConf.enqueuingTimeout, connectionTimeout, socketConf, - maxInboundMessageBodySize, + AMQP.FRAME_MIN_SIZE, this.automaticRecovery, recoveryCondition); } @@ -1090,7 +1090,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IO isSSL(), this.shutdownExecutor, sslContextFactory, - this.maxInboundMessageBodySize); + AMQP.FRAME_MIN_SIZE); } } diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java index 796cce2ddc..5d5d714f8f 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java @@ -432,11 +432,8 @@ public void start() // Inbound payload limit: the smaller of frame_max (less framing // overhead) and the configured message body cap. - if (frameMax > 0) { - _frameHandler.setMaxInboundFramePayloadSize( - Math.min(this.maxInboundMessageBodySize, - frameMax - AMQCommand.EMPTY_FRAME_SIZE + 1)); - } + _frameHandler.setFrameMax( + Math.min(this.maxInboundMessageBodySize, frameMax)); int negotiatedHeartbeat = negotiatedMaxValue(this.requestedHeartbeat, diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java index 497bb2138e..8e6b9aa372 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractFrameHandlerFactory.java @@ -25,13 +25,13 @@ public abstract class AbstractFrameHandlerFactory implements FrameHandlerFactory protected final int connectionTimeout; protected final SocketConfigurator configurator; protected final boolean ssl; - protected final int maxInboundMessageBodySize; + protected final int frameMax; protected AbstractFrameHandlerFactory(int connectionTimeout, SocketConfigurator configurator, - boolean ssl, int maxInboundMessageBodySize) { + boolean ssl, int frameMax) { this.connectionTimeout = connectionTimeout; this.configurator = configurator; this.ssl = ssl; - this.maxInboundMessageBodySize = maxInboundMessageBodySize; + this.frameMax = frameMax; } } diff --git a/src/main/java/com/rabbitmq/client/impl/Frame.java b/src/main/java/com/rabbitmq/client/impl/Frame.java index 02ff0dc5ad..970ed81891 100644 --- a/src/main/java/com/rabbitmq/client/impl/Frame.java +++ b/src/main/java/com/rabbitmq/client/impl/Frame.java @@ -27,6 +27,8 @@ import java.util.Date; import java.util.List; import java.util.Map; +import java.util.function.IntSupplier; + import static java.lang.String.format; /** @@ -104,7 +106,7 @@ public static Frame fromBodyFragment(int channelNumber, ByteBuffer body, int off * * @return a new Frame if we read a frame successfully, otherwise null */ - public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOException { + public static Frame readFrom(DataInputStream is, IntSupplier payloadLimit) throws IOException { int type; int channel; @@ -128,16 +130,9 @@ public static Frame readFrom(DataInputStream is, int maxPayloadSize) throws IOEx } channel = is.readUnsignedShort(); - int payloadSize = is.readInt(); - if (payloadSize < 0 || payloadSize >= maxPayloadSize) { - throw new MalformedFrameException(format( - "Frame body size is invalid (%d), maximum configured size is %d. " + - "See ConnectionFactory#setMaxInboundMessageBodySize " + - "if you need to increase the limit.", - payloadSize, maxPayloadSize - )); - } - byte[] payload = new byte[payloadSize]; + int frameSize = is.readInt(); + Utils.enforceFrameMax(frameSize, payloadLimit.getAsInt()); + byte[] payload = new byte[frameSize]; is.readFully(payload); int frameEndMarker = is.readUnsignedByte(); diff --git a/src/main/java/com/rabbitmq/client/impl/FrameHandler.java b/src/main/java/com/rabbitmq/client/impl/FrameHandler.java index 9b6b270da0..ce7e8bbfb2 100644 --- a/src/main/java/com/rabbitmq/client/impl/FrameHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/FrameHandler.java @@ -60,7 +60,7 @@ default void finishConnectionNegotiation() { } /** Cap inbound frame payloads, applied once frame_max is negotiated. */ - default void setMaxInboundFramePayloadSize(int maxPayloadSize) { + default void setFrameMax(int frameMax) { } diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index ae74bad153..1a18ff12c7 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -15,7 +15,6 @@ // info@rabbitmq.com. package com.rabbitmq.client.impl; -import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -32,6 +31,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; @@ -89,10 +89,10 @@ public NettyFrameHandlerFactory( Duration enqueuingTimeout, int connectionTimeout, SocketConfigurator configurator, - int maxInboundMessageBodySize, + int frameMax, boolean automaticRecovery, Predicate recoveryCondition) { - super(connectionTimeout, configurator, sslContextFactory != null, maxInboundMessageBodySize); + super(connectionTimeout, configurator, sslContextFactory != null, frameMax); this.eventLoopGroup = eventLoopGroup; this.sslContextFactory = sslContextFactory == null ? connName -> null : sslContextFactory; this.channelCustomizer = channelCustomizer == null ? Utils.noOpConsumer() : channelCustomizer; @@ -151,7 +151,7 @@ private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGro public FrameHandler create(Address addr, String connectionName) throws IOException { SslContext sslContext = this.sslContextFactory.apply(connectionName); return new NettyFrameHandler( - this.maxInboundMessageBodySize, + this.frameMax, addr, sslContext, this.eventLoopGroup, @@ -191,7 +191,7 @@ private static final class NettyFrameHandler implements FrameHandler { private final int zeroCopyThreshold = 1024; private NettyFrameHandler( - int maxInboundMessageBodySize, + int frameMax, Address addr, SslContext sslContext, EventLoopGroup elg, @@ -232,13 +232,7 @@ private NettyFrameHandler( b.option(ChannelOption.ALLOCATOR, Utils.byteBufAllocator()); } - // type + channel + payload size + payload + frame end marker - int maxFrameLength = 1 + 2 + 4 + maxInboundMessageBodySize + 1; - int lengthFieldOffset = 3; - int lengthFieldLength = 4; - int lengthAdjustement = 1; - AmqpHandler amqpHandler = - new AmqpHandler(maxInboundMessageBodySize, this::close, willRecover); + AmqpHandler amqpHandler = new AmqpHandler(frameMax, this::close, willRecover); int port = ConnectionFactory.portOrDefault(addr.getPort(), sslContext != null); b.handler( new ChannelInitializer() { @@ -251,15 +245,7 @@ public void initChannel(SocketChannel ch) { FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true)); ch.pipeline() .addLast(HANDLER_PROTOCOL_VERSION_MISMATCH, new ProtocolVersionMismatchHandler()); - ch.pipeline() - .addLast( - HANDLER_FRAME_DECODER, - new LengthFieldBasedFrameDecoder( - maxFrameLength, - lengthFieldOffset, - lengthFieldLength, - lengthAdjustement, - 0)); + ch.pipeline().addLast(HANDLER_FRAME_DECODER, createFrameDecoder(frameMax)); ch.pipeline().addLast(AmqpHandler.class.getSimpleName(), amqpHandler); if (sslContext != null) { SslHandler sslHandler = sslContext.newHandler(ch.alloc(), addr.getHost(), port); @@ -352,8 +338,11 @@ public void finishConnectionNegotiation() { } @Override - public void setMaxInboundFramePayloadSize(int maxPayloadSize) { - this.handler.maxPayloadSize = maxPayloadSize; + public void setFrameMax(int frameMax) { + this.channel + .pipeline() + .replace(HANDLER_FRAME_DECODER, HANDLER_FRAME_DECODER, createFrameDecoder(frameMax)); + this.handler.setFrameMax(frameMax); } @Override @@ -507,11 +496,19 @@ InetSocketAddress maybeInetSocketAddress(SocketAddress socketAddress) { return null; } } + + private ChannelHandler createFrameDecoder(int frameMax) { + int lengthFieldOffset = 3; + int lengthFieldLength = 4; + int lengthAdjustement = 1; // frame-end byte + return new LengthFieldBasedFrameDecoder( + frameMax, lengthFieldOffset, lengthFieldLength, lengthAdjustement, 0); + } } private static class AmqpHandler extends ChannelInboundHandlerAdapter { - private volatile int maxPayloadSize; + private volatile int framePayloadLimit; private final Runnable closeSequence; private final Predicate willRecover; private volatile AMQConnection connection; @@ -524,15 +521,20 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter { private final String id; private AmqpHandler( - int maxPayloadSize, - Runnable closeSequence, - Predicate willRecover) { - this.maxPayloadSize = maxPayloadSize; + int frameMax, Runnable closeSequence, Predicate willRecover) { + this.setFrameMax(frameMax); this.closeSequence = closeSequence; this.willRecover = willRecover; this.id = "amqp-handler-" + SEQUENCE.getAndIncrement(); } + private void setFrameMax(int frameMax) { + if (frameMax > 0 && frameMax < AMQP.FRAME_MIN_SIZE) { + frameMax = AMQP.FRAME_MIN_SIZE; + } + this.framePayloadLimit = Utils.framePayloadLimit(frameMax); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ch = ctx.channel(); @@ -545,17 +547,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception try { int type = m.readUnsignedByte(); int channel = m.readUnsignedShort(); - int payloadSize = m.readInt(); - if (payloadSize < 0 || payloadSize >= maxPayloadSize) { - throw new MalformedFrameException( - format( - "Frame body size is invalid (%d), maximum configured size is %d. " - + "See ConnectionFactory#setMaxInboundMessageBodySize " - + "if you need to increase the limit.", - payloadSize, maxPayloadSize)); - } + int frameSize = m.readInt(); + Utils.enforceFrameMax(frameSize, this.framePayloadLimit); - byte[] payload = new byte[payloadSize]; + byte[] payload = new byte[frameSize]; m.readBytes(payload); int frameEndMarker = m.readUnsignedByte(); diff --git a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java index 407c8505c2..0fef5d0a63 100644 --- a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java +++ b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.IntSupplier; /** * A socket-based frame handler. @@ -57,7 +58,8 @@ public class SocketFrameHandler implements FrameHandler { private final DataOutputStream _outputStream; private final Lock _outputStreamLock = new ReentrantLock(); - private volatile int maxInboundMessageBodySize; + private volatile int framePayloadLimit; + private final IntSupplier payloadLimitSupplier; /** Time to linger before closing the socket forcefully. */ public static final int SOCKET_CLOSING_TIMEOUT = 1; @@ -73,13 +75,14 @@ public SocketFrameHandler(Socket socket) throws IOException { * @param socket the socket to use */ public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor, - int maxInboundMessageBodySize) throws IOException { + int frameMax) throws IOException { _socket = socket; _shutdownExecutor = shutdownExecutor; - this.maxInboundMessageBodySize = maxInboundMessageBodySize; + this.setFrameMax(frameMax); _inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); _outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); + this.payloadLimitSupplier = () -> this.framePayloadLimit; } @Override @@ -194,15 +197,19 @@ public void initialize(AMQConnection connection) { } @Override - public void setMaxInboundFramePayloadSize(int maxPayloadSize) { - this.maxInboundMessageBodySize = maxPayloadSize; + public void setFrameMax(int frameMax) { + this.framePayloadLimit = Utils.framePayloadLimit(frameMax); } @Override public Frame readFrame() throws IOException { _inputStreamLock.lock(); try { - return Frame.readFrom(_inputStream, this.maxInboundMessageBodySize); + // we need to check frameMax against the latest value, hence the supplier + // otherwise we can start waiting for a new frame with the current limit + // and the frameMax is changed while we wait, so the next frame is checked against + // a stale value + return Frame.readFrom(_inputStream, this.payloadLimitSupplier); } finally { _inputStreamLock.unlock(); } diff --git a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java index 7a983c9fd0..56de9a417c 100644 --- a/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/SocketFrameHandlerFactory.java @@ -81,7 +81,7 @@ protected Socket createSocket(String connectionName) throws IOException { public FrameHandler create(Socket sock) throws IOException { - return new SocketFrameHandler(sock, this.shutdownExecutor, this.maxInboundMessageBodySize); + return new SocketFrameHandler(sock, this.shutdownExecutor, this.frameMax); } private static void quietTrySocketClose(Socket socket) { diff --git a/src/main/java/com/rabbitmq/client/impl/Utils.java b/src/main/java/com/rabbitmq/client/impl/Utils.java index 86fd7a5d3a..ccbc004300 100644 --- a/src/main/java/com/rabbitmq/client/impl/Utils.java +++ b/src/main/java/com/rabbitmq/client/impl/Utils.java @@ -16,6 +16,8 @@ package com.rabbitmq.client.impl; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.MalformedFrameException; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoopGroup; import io.netty.channel.MultiThreadIoEventLoopGroup; @@ -23,6 +25,8 @@ import java.util.function.Consumer; +import static java.lang.String.format; + final class Utils { @SuppressWarnings("rawtypes") @@ -69,4 +73,29 @@ static ByteBufAllocator byteBufAllocator() { static Consumer noOpConsumer() { return (Consumer) NO_OP_CONSUMER; } + + static int framePayloadLimit(int frameMax) { + if (frameMax <= 0) { + return Integer.MAX_VALUE; + } else if (frameMax < AMQP.FRAME_MIN_SIZE) { + return AMQP.FRAME_MIN_SIZE - AMQCommand.EMPTY_FRAME_SIZE; + } else { + return frameMax - AMQCommand.EMPTY_FRAME_SIZE; + } + } + + static void enforceFrameMax(int framePayloadSize, int framePayloadLimit) throws MalformedFrameException { + if (framePayloadSize < 0 || framePayloadSize > framePayloadLimit) { + throw new MalformedFrameException( + format( + "Frame size is invalid (%d), maximum configured size is %d. " + + "See ConnectionFactory#setMaxInboundMessageBodySize " + + "if you need to increase the limit.", + frameSizeFromPayloadSize(framePayloadSize), frameSizeFromPayloadSize(framePayloadLimit))); + } + } + + private static int frameSizeFromPayloadSize(int limit) { + return limit + AMQCommand.EMPTY_FRAME_SIZE; + } } diff --git a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java index dd7d54025f..cdf1a95f52 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTestSuite.java @@ -76,7 +76,8 @@ IoDeadlockOnConnectionClosing.class, ProtocolVersionMismatch.class, ByteBufferPublishTest.class, - PublishWithByteBufferTest.class + PublishWithByteBufferTest.class, + InboundFrameMax.class, }) public class ClientTestSuite { diff --git a/src/test/java/com/rabbitmq/client/test/InboundFrameMax.java b/src/test/java/com/rabbitmq/client/test/InboundFrameMax.java new file mode 100644 index 0000000000..fa296324f0 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/InboundFrameMax.java @@ -0,0 +1,234 @@ +// Copyright (c) 2026 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.client.test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.impl.AMQCommand; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class InboundFrameMax { + + private static final int FRAME_METHOD = 1; + private static final int FRAME_END = 206; + + @ParameterizedTest + @ValueSource(ints = {8192, 10_000}) + void oversizedFrameShouldPassWhenEqualToLimit(int frameMax) throws Exception { + // just the limit + int openOkPayloadSize = frameMax - AMQCommand.EMPTY_FRAME_SIZE; + doEnforceInboundFrameMax(frameMax, openOkPayloadSize, false); + } + + @ParameterizedTest + @ValueSource(ints = {8192, 10_000}) + void oversizedFrameShouldNotPassWhenAboveLimit(int frameMax) throws Exception { + // just above the limit + int openOkPayloadSize = frameMax - AMQCommand.EMPTY_FRAME_SIZE + 1; + doEnforceInboundFrameMax(frameMax, openOkPayloadSize, true); + } + + private void doEnforceInboundFrameMax(int frameMax, int openOkPayloadSize, boolean shouldFail) + throws Exception { + + CountDownLatch serverDone = new CountDownLatch(1); + AtomicReference serverError = new AtomicReference<>(); + + try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"))) { + int port = server.getLocalPort(); + Thread peer = + new Thread( + () -> runFakeBroker(server, serverDone, serverError, frameMax, openOkPayloadSize), + "fake-amqp-broker"); + peer.setDaemon(true); + peer.start(); + + ConnectionFactory factory = TestUtils.connectionFactory(); + factory.setHost("127.0.0.1"); + factory.setPort(port); + factory.setRequestedFrameMax(frameMax); + factory.setHandshakeTimeout(5000); + factory.setConnectionTimeout(5000); + factory.setRequestedHeartbeat(0); + + if (shouldFail) { + assertThatThrownBy(() -> factory.newConnection()).isInstanceOf(IOException.class); + } else { + try (Connection connection = factory.newConnection()) { + assertThat(connection.getFrameMax()).isEqualTo(frameMax); + } + } + } + assertThat(serverDone.await(5, TimeUnit.SECONDS)).isTrue(); + if (!shouldFail) { + assertThat(serverError.get()).isNull(); + } + } + + private static void runFakeBroker( + ServerSocket server, + CountDownLatch done, + AtomicReference error, + int frameMax, + int openOkPayloadSize) { + try (Socket socket = server.accept()) { + socket.setSoTimeout(5000); + DataInputStream in = new DataInputStream(socket.getInputStream()); + DataOutputStream out = new DataOutputStream(socket.getOutputStream()); + + byte[] header = new byte[8]; + in.readFully(header); + writeMethodFrame(out, startPayload()); + readFrame(in); + writeMethodFrame(out, tunePayload(frameMax)); + readFrame(in); + readFrame(in); + writeOversizedOpenOk(out, openOkPayloadSize); + readFrame(in); + writeMethodFrame(out, closeOkPayload()); + done.countDown(); + } catch (Throwable t) { + error.set(t); + done.countDown(); + } + } + + private static byte[] startPayload() { + Buffer b = new Buffer(); + b.u16(10).u16(10); + b.u8(0).u8(9); + b.u32(0); + b.longstr("PLAIN"); + b.longstr("en_US"); + return b.bytes(); + } + + private static byte[] tunePayload(int frameMax) { + Buffer b = new Buffer(); + b.u16(10).u16(30); + b.u16(0); + b.u32(frameMax); + b.u16(0); + return b.bytes(); + } + + private static byte[] closeOkPayload() { + Buffer b = new Buffer(); + b.u16(10).u16(51); + return b.bytes(); + } + + private static void writeOversizedOpenOk(DataOutputStream out, int size) throws Exception { + byte[] payload = new byte[size]; + payload[0] = 0; + payload[1] = 10; + payload[2] = 0; + payload[3] = 41; + payload[4] = 0; + writeFrame(out, FRAME_METHOD, 0, payload); + } + + private static void writeMethodFrame(DataOutputStream out, byte[] payload) throws Exception { + writeFrame(out, FRAME_METHOD, 0, payload); + } + + private static void writeFrame(DataOutputStream out, int type, int channel, byte[] payload) + throws Exception { + out.writeByte(type); + out.writeShort(channel); + out.writeInt(payload.length); + out.write(payload); + out.writeByte(FRAME_END); + out.flush(); + } + + private static void readFrame(DataInputStream in) throws Exception { + in.readUnsignedByte(); + in.readUnsignedShort(); + int size = in.readInt(); + byte[] payload = new byte[size]; + in.readFully(payload); + int end = in.readUnsignedByte(); + if (end != FRAME_END) { + throw new AssertionError("bad client frame end: " + end); + } + } + + private static final class Buffer { + private byte[] data = new byte[64]; + private int size; + + Buffer u8(int value) { + ensure(1); + data[size++] = (byte) value; + return this; + } + + Buffer u16(int value) { + ensure(2); + data[size++] = (byte) (value >>> 8); + data[size++] = (byte) value; + return this; + } + + Buffer u32(int value) { + ensure(4); + data[size++] = (byte) (value >>> 24); + data[size++] = (byte) (value >>> 16); + data[size++] = (byte) (value >>> 8); + data[size++] = (byte) value; + return this; + } + + Buffer longstr(String value) { + byte[] bytes = value.getBytes(StandardCharsets.UTF_8); + u32(bytes.length); + ensure(bytes.length); + System.arraycopy(bytes, 0, data, size, bytes.length); + size += bytes.length; + return this; + } + + byte[] bytes() { + byte[] copy = new byte[size]; + System.arraycopy(data, 0, copy, 0, size); + return copy; + } + + private void ensure(int count) { + if (size + count <= data.length) { + return; + } + byte[] next = new byte[Math.max(data.length * 2, size + count)]; + System.arraycopy(data, 0, next, 0, size); + data = next; + } + } +} diff --git a/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java b/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java index f11661f0f6..f04b4d73d1 100644 --- a/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java +++ b/src/test/java/com/rabbitmq/client/test/NegotiatedFrameMaxInboundTest.java @@ -69,7 +69,7 @@ private static Frame readSingleFrame(int payloadSize) throws Exception { out.flush(); SocketFrameHandler handler = new SocketFrameHandler(client); - handler.setMaxInboundFramePayloadSize(INBOUND_PAYLOAD_LIMIT); + handler.setFrameMax(INBOUND_PAYLOAD_LIMIT); return handler.readFrame(); } } diff --git a/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java b/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java index 86b9d2d835..e694618524 100644 --- a/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java +++ b/src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java @@ -49,7 +49,7 @@ public static Object[] data() { @Override protected ConnectionFactory newConnectionFactory() { ConnectionFactory connectionFactory = super.newConnectionFactory(); - connectionFactory.setNetworkRecoveryInterval(2); + connectionFactory.setNetworkRecoveryInterval(5); connectionFactory.setTopologyRecoveryRetryHandler( TopologyRecoveryRetryHandlerBuilder.builder() .queueRecoveryRetryCondition(