From 360da991db18eb8c6ecd6abb412ca5b5739d782a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 19 Mar 2026 14:50:49 +0100 Subject: [PATCH 01/26] Fix semaphore permit leaks in HttpJettySolrClient's AsyncTracker: prevent double-registration on onRequestQueued and avoid IO-thread deadlock on connection failure retries Add a metrics gauge for available semaphores --- ...SOLR-18174-prevent-double-registration.yml | 8 + .../component/HttpShardHandlerFactory.java | 18 + .../AsyncTrackerSemaphoreLeakTest.java | 564 ++++++++++++++++++ .../solrj/jetty/HttpJettySolrClient.java | 70 ++- 4 files changed, 653 insertions(+), 7 deletions(-) create mode 100644 changelog/unreleased/SOLR-18174-prevent-double-registration.yml create mode 100644 solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java diff --git a/changelog/unreleased/SOLR-18174-prevent-double-registration.yml b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml new file mode 100644 index 000000000000..8000e7230cb4 --- /dev/null +++ b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml @@ -0,0 +1,8 @@ +title: "Fix semaphore permit leaks in HttpJettySolrClient's AsyncTracker. Avoid IO-thread deadlock on connection failure retries. Add a new metric gauge solr.http.client.async_permits" +type: fixed +authors: + - name: Jan Høydahl + url: https://home.apache.org/phonebook.html?uid=janhoy +links: + - name: SOLR-18174 + url: https://issues.apache.org/jira/browse/SOLR-18174 diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index 6dc30f47b9bb..9bcd3f2c2021 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -19,6 +19,7 @@ import static org.apache.solr.util.stats.InstrumentedHttpListenerFactory.KNOWN_METRIC_NAME_STRATEGIES; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableLongGauge; import java.lang.invoke.MethodHandles; import java.util.Iterator; import java.util.List; @@ -85,6 +86,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory protected volatile HttpJettySolrClient defaultClient; protected InstrumentedHttpListenerFactory httpListenerFactory; protected LBAsyncSolrClient loadbalancer; + private ObservableLongGauge asyncRequestsGauge; int corePoolSize = 0; int maximumPoolSize = Integer.MAX_VALUE; @@ -352,6 +354,7 @@ public void close() { ExecutorUtil.shutdownAndAwaitTermination(commExecutor); } } + IOUtils.closeQuietly(asyncRequestsGauge); try { SolrMetricProducer.super.close(); } catch (Exception e) { @@ -440,5 +443,20 @@ public void initializeMetrics(SolrMetricsContext parentContext, Attributes attri commExecutor = solrMetricsContext.instrumentedExecutorService( commExecutor, "solr.core.executor", "httpShardExecutor", SolrInfoBean.Category.QUERY); + if (defaultClient != null) { + asyncRequestsGauge = + solrMetricsContext.observableLongGauge( + "solr.http.client.async_permits", + "Outstanding async HTTP request permits in the Jetty SolrJ client" + + " (state=max: configured ceiling; state=available: currently unused permits).", + measurement -> { + measurement.record( + defaultClient.asyncTrackerMaxPermits(), Attributes.of(STATE_KEY_ATTR, "max")); + measurement.record( + defaultClient.asyncTrackerAvailablePermits(), + Attributes.of(STATE_KEY_ATTR, "available")); + }, + null); + } } } diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java new file mode 100644 index 000000000000..417170e60b88 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.reflect.Field; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import org.apache.lucene.util.SuppressForbidden; +import org.apache.solr.client.solrj.impl.LBSolrClient; +import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; +import org.apache.solr.client.solrj.jetty.LBJettySolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reproduces the {@link org.apache.solr.client.solrj.impl.LBAsyncSolrClient} semaphore leak that + * causes distributed queries to hang permanently. + * + *

Bug scenario

+ * + *
    + *
  1. A shard HTTP request fails with a connection-level error (not an HTTP-level + * error). Jetty fires the {@code onFailure} response callback directly on the IO selector + * thread. + *
  2. {@link org.apache.solr.client.solrj.jetty.HttpJettySolrClient#requestAsync} completes its + * {@code CompletableFuture} exceptionally from within that {@code onFailure} callback — still + * on the IO thread. + *
  3. {@code LBAsyncSolrClient.doAsyncRequest} registered a {@code whenComplete} on that future. + * Because the future completes on the IO thread, {@code whenComplete} also fires + * synchronously on the IO thread. + *
  4. The {@code whenComplete} action calls {@code doAsyncRequest} again (retry to the next + * endpoint), which eventually calls Jetty's {@code HttpClient.send()}. That triggers the + * {@code AsyncTracker.queuedListener} — which calls {@code semaphore.acquire()} — still on + * the IO thread, before the original request's {@code completeListener.onComplete()} has had + * a chance to call {@code semaphore.release()}. + *
  5. If the semaphore is at zero, {@code acquire()} blocks the IO thread. The blocked + * IO thread cannot execute the {@code completeListener} that would release the original + * permit. The permit is permanently leaked, and the IO thread is permanently stuck. Repeat + * until all permits are exhausted: distributed queries hang forever. + *
+ * + *

Test setup

+ * + *

A raw TCP server accepts {@value #NUM_RETRY_REQUESTS} connections and holds them open until + * all are established (so all semaphore permits are consumed). It then closes all connections + * simultaneously via TCP RST, causing all Jetty {@code onFailure} events to fire on the IO threads + * at the same time. Because the semaphore is already at 0, every retry's {@code acquire()} blocks + * the IO thread immediately, and no {@code onComplete} release can fire. + * + *

The test asserts that after a short wait the semaphore is at 0 and none of the {@code + * CompletableFuture}s returned by {@code requestAsync} have completed — proving the permanent + * permit exhaustion. + */ +public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String COLLECTION = "semaphore_leak_test"; + + /** Reduced semaphore size so we can observe the drain without needing thousands of requests. */ + private static final int MAX_PERMITS = 40; + + /** + * Number of concurrent requests. Set equal to MAX_PERMITS so that all permits are exhausted + * before any retry can acquire, triggering the IO-thread deadlock. + */ + private static final int NUM_RETRY_REQUESTS = MAX_PERMITS; + + @BeforeClass + public static void setupCluster() throws Exception { + // Reduce the semaphore size so we can observe drain with few requests. + // This property is read when HttpJettySolrClient is constructed, so it must + // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up. + System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, String.valueOf(MAX_PERMITS)); + + configureCluster(2).addConfig("conf", configset("cloud-dynamic")).configure(); + + CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1) + .process(cluster.getSolrClient()); + + waitForState( + "Expected 1 active shard with 1 replica", + COLLECTION, + (n, c) -> SolrCloudTestCase.replicasForCollectionAreFullyActive(n, c, 2, 1)); + } + + @AfterClass + public static void cleanup() { + System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); + } + + /** + * Demonstrates the permanent IO-thread deadlock caused by {@link + * org.apache.solr.client.solrj.impl.LBAsyncSolrClient} retrying a request synchronously inside a + * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO selector thread. + * + *

This assertion FAILS with the current code, demonstrating the bug. After a fix (e.g. + * dispatching the retry to an executor thread instead of running it synchronously on the IO + * thread), the retries proceed on executor threads, the IO threads remain free to fire {@code + * onComplete → release()}, and all futures eventually complete via the real server. + */ + @Test + public void testSemaphoreLeakOnLBRetry() throws Exception { + // Create a dedicated HttpJettySolrClient for this test so that if the IO threads are + // permanently deadlocked they don't affect the cluster's shared client. + // The system property is still set to MAX_PERMITS from setupCluster(). + HttpJettySolrClient testClient = + new HttpJettySolrClient.Builder() + .withConnectionTimeout(5, TimeUnit.SECONDS) + .withIdleTimeout(30, TimeUnit.SECONDS) + .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP connection + .build(); + + // Fake TCP server: accepts exactly NUM_RETRY_REQUESTS connections and holds them open. + // Once all are established (semaphore exhausted), closes all with RST simultaneously. + ServerSocket fakeServer = new ServerSocket(0); + CountDownLatch allConnected = new CountDownLatch(NUM_RETRY_REQUESTS); + List fakeConnections = Collections.synchronizedList(new ArrayList<>()); + + Thread fakeServerThread = + new Thread( + () -> { + try { + while (fakeConnections.size() < NUM_RETRY_REQUESTS && !fakeServer.isClosed()) { + Socket s = fakeServer.accept(); + fakeConnections.add(s); + allConnected.countDown(); + } + } catch (IOException ignored) { + } + }, + "fake-tcp-server"); + fakeServerThread.setDaemon(true); + fakeServerThread.start(); + + String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() + "/solr"; + String realBaseUrl = + cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION; + + List> futures = new ArrayList<>(); + + try (LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { + + assertEquals( + "All permits should be available before the test (verifies sysprop was applied)", + MAX_PERMITS, + testClient.asyncTrackerAvailablePermits()); + + // Submit NUM_RETRY_REQUESTS async requests. + // Each request has two endpoints: fakeBaseUrl (first) and realBaseUrl (second/retry). + // Each requestAsync() call acquires a semaphore permit synchronously during send(). + // After NUM_RETRY_REQUESTS calls, the semaphore is at 0. + for (int i = 0; i < NUM_RETRY_REQUESTS; i++) { + ModifiableSolrParams qparams = new ModifiableSolrParams(); + qparams.set("q", "*:*"); + QueryRequest qr = new QueryRequest(qparams); + LBSolrClient.Req req = + new LBSolrClient.Req( + qr, + List.of( + new LBSolrClient.Endpoint(fakeBaseUrl), + new LBSolrClient.Endpoint(realBaseUrl))); + futures.add(lbClient.requestAsync(req)); + } + + log.info( + "Queued {} requests (semaphore now at 0). Waiting for all TCP connections...", + NUM_RETRY_REQUESTS); + + // Wait until the fake server has accepted all NUM_RETRY_REQUESTS connections. + // At this point all semaphore permits are consumed and no onComplete has fired yet. + assertTrue( + "All " + + NUM_RETRY_REQUESTS + + " connections should be established within 15 s, but only " + + (NUM_RETRY_REQUESTS - allConnected.getCount()) + + " were.", + allConnected.await(15, TimeUnit.SECONDS)); + + assertEquals( + "Semaphore should be fully consumed after queuing all requests", + 0, + testClient.asyncTrackerAvailablePermits()); + + // Close all fake connections simultaneously with TCP RST. + // This fires Jetty's onFailure callback on the IO selector thread for each request. + // The onFailure path → future.completeExceptionally() → whenComplete fires synchronously + // on the IO thread → LBAsyncSolrClient.doAsyncRequest (retry) → send() → + // onRequestQueued → semaphore.acquire() → BLOCKS (semaphore = 0) → IO thread stuck. + int connCount = fakeConnections.size(); + log.info("Closing {} fake connections via RST...", connCount); + for (Socket s : fakeConnections) { + try { + s.setSoLinger(true, 0); // send RST instead of FIN + s.close(); + } catch (IOException ignored) { + } + } + + // Give IO threads time to process the failure events and attempt the retry acquires. + Thread.sleep(2000); + + int permitsAfterFailures = testClient.asyncTrackerAvailablePermits(); + long completedCount = futures.stream().filter(CompletableFuture::isDone).count(); + log.info( + "Permits after 2s: {}/{}; futures completed: {}/{}", + permitsAfterFailures, + MAX_PERMITS, + completedCount, + NUM_RETRY_REQUESTS); + + // With the bug: the IO threads are deadlocked. Permits remain at 0 and no future completes. + // This assertion FAILS with the current code, demonstrating the bug. + assertEquals( + "BUG (LBAsyncSolrClient retry leak): all " + + NUM_RETRY_REQUESTS + + " semaphore permits should be released once the retries complete on the" + + " real server. Instead the IO threads are permanently blocked in" + + " semaphore.acquire() because the retry fires synchronously on the IO thread" + + " before the original request's completeListener can call release().", + MAX_PERMITS, + permitsAfterFailures); + } finally { + fakeServer.close(); + + // Force-stop the underlying Jetty HttpClient to unblock any IO threads permanently + // stuck in semaphore.acquire(), so that the test client can be closed without hanging. + try { + testClient.getHttpClient().stop(); + } catch (Exception ignored) { + } + try { + testClient.close(); + } catch (Exception ignored) { + } + + for (CompletableFuture f : futures) { + f.cancel(true); + } + } + } + + /** + * Verifies that no semaphore permits are permanently leaked when connection-level failures + * trigger LB retries on the Jetty IO selector thread, provided the semaphore is not exhausted. + * + *

This test uses the production default ({@code 1000}) permits and only {@code 20} requests. + * With plenty of permits available, {@code acquire()} on the IO thread returns immediately (does + * not block), so {@code onComplete} fires normally and every permit is returned. + * + *

This test passes both with and without the Pattern A fix. Run it with the fix + * commented out to confirm that the deadlock only manifests when the semaphore is fully exhausted + * (as demonstrated by {@link #testSemaphoreLeakOnLBRetry}). + */ + @Test + public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { + // The @BeforeClass set ASYNC_REQUESTS_MAX_SYSPROP=40 for the cluster. Clear it temporarily + // so this test's dedicated client uses the real production default (1000 permits). + String savedMax = System.getProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); + System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); + + final int numRequests = 20; + + HttpJettySolrClient testClient = + new HttpJettySolrClient.Builder() + .withConnectionTimeout(5, TimeUnit.SECONDS) + .withIdleTimeout(30, TimeUnit.SECONDS) + .useHttp1_1(true) + .build(); + + ServerSocket fakeServer = new ServerSocket(0); + CountDownLatch allConnected = new CountDownLatch(numRequests); + List fakeConnections = Collections.synchronizedList(new ArrayList<>()); + + Thread fakeServerThread = + new Thread( + () -> { + try { + while (fakeConnections.size() < numRequests && !fakeServer.isClosed()) { + Socket s = fakeServer.accept(); + fakeConnections.add(s); + allConnected.countDown(); + } + } catch (IOException ignored) { + } + }, + "fake-tcp-server"); + fakeServerThread.setDaemon(true); + fakeServerThread.start(); + + String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() + "/solr"; + String realBaseUrl = + cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION; + + List> futures = new ArrayList<>(); + + try (LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { + + int initialPermits = testClient.asyncTrackerMaxPermits(); + assertEquals("Should use the production default of 1000 permits", 1000, initialPermits); + assertEquals( + "All permits available before test", + initialPermits, + testClient.asyncTrackerAvailablePermits()); + + for (int i = 0; i < numRequests; i++) { + ModifiableSolrParams p = new ModifiableSolrParams(); + p.set("q", "*:*"); + futures.add( + lbClient.requestAsync( + new LBSolrClient.Req( + new QueryRequest(p), + List.of( + new LBSolrClient.Endpoint(fakeBaseUrl), + new LBSolrClient.Endpoint(realBaseUrl))))); + } + + log.info( + "Submitted {} requests ({} permits, semaphore far from exhaustion). " + + "Waiting for connections...", + numRequests, + initialPermits); + assertTrue( + "All " + numRequests + " connections should be established within 15 s", + allConnected.await(15, TimeUnit.SECONDS)); + + int rstCount = fakeConnections.size(); + log.info("RST-ing {} fake connections...", rstCount); + for (Socket s : fakeConnections) { + try { + s.setSoLinger(true, 0); + s.close(); + } catch (IOException ignored) { + } + } + + // With permits >> 0, acquire() on the IO thread returns immediately (no blocking). + // onComplete fires normally after each retry and restores every permit. + // Expect all futures to resolve (via retry to the real server) within 30 s. + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .get(30, TimeUnit.SECONDS); + } catch (Exception e) { + // Retry failure (e.g. transient real-server error): permits are still released by + // onComplete, so we proceed to the assertion. + log.warn("Some futures completed exceptionally", e); + } + + int permitsAfter = testClient.asyncTrackerAvailablePermits(); + long completedCount = futures.stream().filter(CompletableFuture::isDone).count(); + log.info( + "Permits after retries: {}/{}; futures completed: {}/{}", + permitsAfter, + initialPermits, + completedCount, + numRequests); + + assertEquals( + "No permits leaked: with " + + numRequests + + " requests against a " + + initialPermits + + "-permit semaphore, acquire() never blocks the IO thread so onComplete" + + " always fires and restores every permit.", + initialPermits, + permitsAfter); + + } finally { + // Restore the property so subsequent tests (and @AfterClass) see the expected value. + if (savedMax != null) { + System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, savedMax); + } + fakeServer.close(); + try { + testClient.getHttpClient().stop(); + } catch (Exception ignored) { + } + try { + testClient.close(); + } catch (Exception ignored) { + } + for (CompletableFuture f : futures) { + f.cancel(true); + } + } + } + + /** + * Demonstrates the gradual permit leak caused by Jetty HTTP/2 internally re-queuing the + * same request when it is dispatched to a connection that has already been marked {@code + * closed=true} (post-GOAWAY race). + * + *

Root cause

+ * + *

In Jetty 12, {@code HttpConnectionOverHTTP2.send(HttpExchange)} returns {@code + * SendFailure(ClosedChannelException, retry=true)} when the target connection is already marked + * closed (e.g. because the server sent a GOAWAY frame while the request was being dispatched from + * the destination queue). {@code HttpDestination.process()} then calls {@code send(HttpExchange)} + * again — re-enqueuing the same exchange object and re-firing {@code notifyQueued()} — + * which invokes {@code AsyncTracker.queuedListener} a second time, calling {@code + * semaphore.acquire()} again. Because {@code onComplete} still fires only once for the logical + * request, there is no matching second {@code release()}. One permit is permanently consumed per + * occurrence. + * + *

Production impact

+ * + *

In a 2-node/2-shard cluster with default 1000 permits, each HTTP/2 connection close (server + * restart, load-balancer connection draining, etc.) that races with an in-flight request leaks + * one permit. Over hours or days, permits gradually drain from 1000 toward zero. Once the + * semaphore is nearly exhausted, even a small burst of connection failures triggers the Pattern A + * IO-thread deadlock and the cluster hangs permanently. + * + *

Test approach

+ * + *

Rather than setting up a real HTTP/2 server with GOAWAY, this test directly simulates the + * double {@code onRequestQueued} notification via reflection. It accesses {@code + * AsyncTracker.queuedListener} and {@code AsyncTracker.completeListener}, invokes the queued + * listener twice for the same {@code Request} object, and invokes the complete listener once. + * With the bug present the semaphore count drops by one; with the fix (idempotency guard on the + * request attribute) the count is unchanged. + * + *

This test FAILS without the {@code PERMIT_ACQUIRED_ATTR} idempotency guard, + * demonstrating the leak. + */ + @Test + @SuppressForbidden( + reason = + "Reflection needed to access AsyncTracker's private fields for white-box testing without exposing them in the production API") + public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { + assumeWorkingMockito(); + // Clear the @BeforeClass 40-permit cap so this client gets the production default (1000). + String savedMax = System.getProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); + System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); + + HttpJettySolrClient testClient = + new HttpJettySolrClient.Builder() + .withConnectionTimeout(5, TimeUnit.SECONDS) + .withIdleTimeout(30, TimeUnit.SECONDS) + // HTTP/2 is the default transport where this GOAWAY race occurs. + .build(); + + // Capture asyncTracker and its class for reflection-based listener access and cleanup. + Field asyncTrackerField = HttpJettySolrClient.class.getDeclaredField("asyncTracker"); + asyncTrackerField.setAccessible(true); + Object asyncTracker = asyncTrackerField.get(testClient); + Class asyncTrackerClass = asyncTracker.getClass(); + + try { + int maxPermits = testClient.asyncTrackerMaxPermits(); + assertEquals("Should use production default of 1000 permits", 1000, maxPermits); + assertEquals( + "All permits available before test", + maxPermits, + testClient.asyncTrackerAvailablePermits()); + + // Access the raw listeners via reflection to simulate Jetty's internal double-fire. + Field queuedListenerField = asyncTrackerClass.getDeclaredField("queuedListener"); + queuedListenerField.setAccessible(true); + Request.QueuedListener queuedListener = + (Request.QueuedListener) queuedListenerField.get(asyncTracker); + + Field completeListenerField = asyncTrackerClass.getDeclaredField("completeListener"); + completeListenerField.setAccessible(true); + Response.CompleteListener completeListener = + (Response.CompleteListener) completeListenerField.get(asyncTracker); + + // Build a minimal fake Request that supports attribute get/set. + // The fix reads request.getAttributes().get(key) and writes request.attribute(key, value). + // Without the fix the request parameter is ignored, so null would also suffice here. + Map reqAttributes = new HashMap<>(); + Request fakeRequest = Mockito.mock(Request.class); + Mockito.when(fakeRequest.getAttributes()).thenReturn(reqAttributes); + Mockito.when(fakeRequest.attribute(ArgumentMatchers.anyString(), ArgumentMatchers.any())) + .thenAnswer( + inv -> { + reqAttributes.put(inv.getArgument(0), inv.getArgument(1)); + return fakeRequest; + }); + + // Simulate the Jetty HTTP/2 GOAWAY race: + // 1st call — normal queueing; acquire() consumes one permit. + // 2nd call — Jetty internal retry after ClosedChannelException; BUG: acquire() again. + queuedListener.onQueued(fakeRequest); + queuedListener.onQueued(fakeRequest); + + // Only one onComplete fires for the logical request (regardless of internal retries). + completeListener.onComplete(null); + + int permitsAfter = testClient.asyncTrackerAvailablePermits(); + log.info("Permits after double-queued + single complete: {}/{}", permitsAfter, maxPermits); + + // BUG: the second acquire() has no matching release(). One permit is permanently leaked. + // This assertion FAILS with the current code, demonstrating the bug. + // After applying the PERMIT_ACQUIRED_ATTR idempotency guard the second onQueued call is + // a no-op, acquire() is called only once, and this assertion passes. + assertEquals( + "BUG (Jetty HTTP/2 GOAWAY retry permit leak): onRequestQueued fired twice for the" + + " same Request object but onComplete fired only once. The second acquire()" + + " was not matched by a release(), permanently leaking one permit per" + + " occurrence. In production this causes gradual semaphore depletion over" + + " hours/days until Pattern A IO-thread deadlock triggers.", + maxPermits, + permitsAfter); + + } finally { + // Restore the system property for subsequent tests. + if (savedMax != null) { + System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, savedMax); + } + + // Force-terminate the Phaser as a safety net in case close() would otherwise hang. + // With the PERMIT_ACQUIRED_ATTR fix the second onQueued() call is a no-op (the attribute + // is already set), so the phaser is balanced. forceTermination() is therefore harmless here. + try { + Field phaserField = asyncTrackerClass.getDeclaredField("phaser"); + phaserField.setAccessible(true); + Phaser phaser = (Phaser) phaserField.get(asyncTracker); + phaser.forceTermination(); + } catch (Exception ignored) { + } + + try { + testClient.close(); + } catch (Exception ignored) { + } + } + } +} diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index cac90ba46705..efc6833cfe9a 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Phaser; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -110,6 +111,9 @@ public class HttpJettySolrClient extends HttpSolrClientBase { */ public static final String CLIENT_CUSTOMIZER_SYSPROP = "solr.solrj.http.jetty.customizer"; + /** System property to cap the maximum number of outstanding async HTTP requests. Default 1000. */ + public static final String ASYNC_REQUESTS_MAX_SYSPROP = "solr.http.client.async_requests.max"; + public static final String REQ_PRINCIPAL_KEY = "solr-req-principal"; private static final String USER_AGENT = "Solr[" + MethodHandles.lookup().lookupClass().getName() + "] " + SolrVersion.LATEST_STRING; @@ -439,7 +443,17 @@ public void onHeaders(Response response) { @Override public void onFailure(Response response, Throwable failure) { super.onFailure(response, failure); - future.completeExceptionally(new SolrServerException(failure.getMessage(), failure)); + // Dispatch off the IO thread so any whenComplete retry won't block on + // semaphore.acquire(). + try { + executor.execute( + () -> + future.completeExceptionally( + new SolrServerException(failure.getMessage(), failure))); + } catch (RejectedExecutionException ree) { + // Executor shut down; safe to complete inline since retries will fail immediately. + future.completeExceptionally(new SolrServerException(failure.getMessage(), failure)); + } } }); @@ -836,6 +850,18 @@ public void close() { private static class AsyncTracker { private static final int MAX_OUTSTANDING_REQUESTS = 1000; + /** + * Request attribute key used to mark that a semaphore permit has been acquired for a given + * request. Jetty can internally re-queue the same exchange object and re-fire {@code + * onRequestQueued} more than once (e.g. when retrying after a connection-level failure), while + * {@code onComplete} always fires exactly once. This attribute makes the queued/complete + * listeners idempotent: a second {@code onRequestQueued} for the same request is a no-op, and + * {@code onComplete} releases the permit only when one was actually acquired. + */ + private static final String PERMIT_ACQUIRED_ATTR = "solr.async_tracker.permit_acquired"; + + private final int maxRequests; + // wait for async requests private final Phaser phaser; // maximum outstanding requests left @@ -844,28 +870,48 @@ private static class AsyncTracker { private final Response.CompleteListener completeListener; AsyncTracker() { + maxRequests = Integer.getInteger(ASYNC_REQUESTS_MAX_SYSPROP, MAX_OUTSTANDING_REQUESTS); // TODO: what about shared instances? phaser = new Phaser(1); - available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false); + available = new Semaphore(maxRequests, false); queuedListener = request -> { + if (request.getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { + return; + } phaser.register(); try { available.acquire(); - } catch (InterruptedException ignored) { - + } catch (InterruptedException e) { + // Undo phaser registration: no permit was acquired so completeListener must not + // release. + phaser.arriveAndDeregister(); + Thread.currentThread().interrupt(); + return; } + request.attribute(PERMIT_ACQUIRED_ATTR, Boolean.TRUE); }; completeListener = result -> { - phaser.arriveAndDeregister(); - available.release(); + if (result == null + || result.getRequest().getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { + phaser.arriveAndDeregister(); + available.release(); + } }; } int getMaxRequestsQueuedPerDestination() { // comfortably above max outstanding requests - return MAX_OUTSTANDING_REQUESTS * 3; + return maxRequests * 3; + } + + int maxPermits() { + return maxRequests; + } + + int availablePermits() { + return available.availablePermits(); } public void waitForComplete() { @@ -874,6 +920,16 @@ public void waitForComplete() { } } + /** Returns the configured maximum number of outstanding async requests. */ + public int asyncTrackerMaxPermits() { + return asyncTracker.maxPermits(); + } + + /** Returns the number of currently available async-request permits. */ + public int asyncTrackerAvailablePermits() { + return asyncTracker.availablePermits(); + } + public static class Builder extends HttpSolrClientBuilderBase { From ab9401af926e700ed805769e3f4cf07f5e33a134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 12:07:48 +0200 Subject: [PATCH 02/26] Rewrite comments in test class --- .../AsyncTrackerSemaphoreLeakTest.java | 139 +++++------------- 1 file changed, 40 insertions(+), 99 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 417170e60b88..372278387c20 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -40,6 +40,7 @@ import org.apache.solr.common.params.ModifiableSolrParams; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.Response; +import org.eclipse.jetty.client.Result; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -49,43 +50,25 @@ import org.slf4j.LoggerFactory; /** - * Reproduces the {@link org.apache.solr.client.solrj.impl.LBAsyncSolrClient} semaphore leak that - * causes distributed queries to hang permanently. + * Tests for two semaphore-permit leak bugs in {@link HttpJettySolrClient}'s {@code AsyncTracker} + * that cause distributed queries to hang permanently. * - *

Bug scenario

+ *

Pattern A – HTTP/2 GOAWAY double-queue leak

* - *
    - *
  1. A shard HTTP request fails with a connection-level error (not an HTTP-level - * error). Jetty fires the {@code onFailure} response callback directly on the IO selector - * thread. - *
  2. {@link org.apache.solr.client.solrj.jetty.HttpJettySolrClient#requestAsync} completes its - * {@code CompletableFuture} exceptionally from within that {@code onFailure} callback — still - * on the IO thread. - *
  3. {@code LBAsyncSolrClient.doAsyncRequest} registered a {@code whenComplete} on that future. - * Because the future completes on the IO thread, {@code whenComplete} also fires - * synchronously on the IO thread. - *
  4. The {@code whenComplete} action calls {@code doAsyncRequest} again (retry to the next - * endpoint), which eventually calls Jetty's {@code HttpClient.send()}. That triggers the - * {@code AsyncTracker.queuedListener} — which calls {@code semaphore.acquire()} — still on - * the IO thread, before the original request's {@code completeListener.onComplete()} has had - * a chance to call {@code semaphore.release()}. - *
  5. If the semaphore is at zero, {@code acquire()} blocks the IO thread. The blocked - * IO thread cannot execute the {@code completeListener} that would release the original - * permit. The permit is permanently leaked, and the IO thread is permanently stuck. Repeat - * until all permits are exhausted: distributed queries hang forever. - *
+ *

Jetty HTTP/2 can re-queue the same exchange after a GOAWAY/connection race, firing {@code + * onRequestQueued} twice for one logical request. Because {@code onComplete} fires only once, one + * permit is permanently consumed per occurrence, gradually draining the semaphore over hours or + * days until Pattern B triggers. * - *

Test setup

+ *

Pattern B – IO-thread deadlock on LB retry when permits depleted

* - *

A raw TCP server accepts {@value #NUM_RETRY_REQUESTS} connections and holds them open until - * all are established (so all semaphore permits are consumed). It then closes all connections - * simultaneously via TCP RST, causing all Jetty {@code onFailure} events to fire on the IO threads - * at the same time. Because the semaphore is already at 0, every retry's {@code acquire()} blocks - * the IO thread immediately, and no {@code onComplete} release can fire. - * - *

The test asserts that after a short wait the semaphore is at 0 and none of the {@code - * CompletableFuture}s returned by {@code requestAsync} have completed — proving the permanent - * permit exhaustion. + *

When a connection-level failure causes {@link + * org.apache.solr.client.solrj.jetty.LBJettySolrClient} to retry synchronously inside a {@code + * whenComplete} callback on the Jetty IO selector thread, the retry calls {@code acquire()} on that + * same IO thread before the original request's {@code onComplete} can call {@code release()}. No + * permits are permanently lost — the deadlock simply requires two permits to be available + * simultaneously — but if the semaphore is at zero, {@code acquire()} blocks the IO thread + * permanently and distributed queries hang forever. */ public class AsyncTrackerSemaphoreLeakTest extends SolrCloudTestCase { @@ -126,20 +109,17 @@ public static void cleanup() { } /** - * Demonstrates the permanent IO-thread deadlock caused by {@link - * org.apache.solr.client.solrj.impl.LBAsyncSolrClient} retrying a request synchronously inside a + * Demonstrates the permanent IO-thread deadlock (Pattern B) caused by {@link + * org.apache.solr.client.solrj.jetty.LBJettySolrClient} retrying a request synchronously inside a * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO selector thread. * - *

This assertion FAILS with the current code, demonstrating the bug. After a fix (e.g. - * dispatching the retry to an executor thread instead of running it synchronously on the IO - * thread), the retries proceed on executor threads, the IO threads remain free to fire {@code - * onComplete → release()}, and all futures eventually complete via the real server. + *

This assertion FAILS with the current code, demonstrating the bug. The fix would + * dispatch retries to an executor thread so the IO thread remains free to fire {@code onComplete + * → release()}. */ @Test public void testSemaphoreLeakOnLBRetry() throws Exception { - // Create a dedicated HttpJettySolrClient for this test so that if the IO threads are - // permanently deadlocked they don't affect the cluster's shared client. - // The system property is still set to MAX_PERMITS from setupCluster(). + // Dedicated client so that permanently deadlocked IO threads don't affect the cluster's client. HttpJettySolrClient testClient = new HttpJettySolrClient.Builder() .withConnectionTimeout(5, TimeUnit.SECONDS) @@ -219,10 +199,8 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { testClient.asyncTrackerAvailablePermits()); // Close all fake connections simultaneously with TCP RST. - // This fires Jetty's onFailure callback on the IO selector thread for each request. - // The onFailure path → future.completeExceptionally() → whenComplete fires synchronously - // on the IO thread → LBAsyncSolrClient.doAsyncRequest (retry) → send() → - // onRequestQueued → semaphore.acquire() → BLOCKS (semaphore = 0) → IO thread stuck. + // onFailure fires on the IO thread → LBJettySolrClient retry → acquire() blocks + // (semaphore=0). int connCount = fakeConnections.size(); log.info("Closing {} fake connections via RST...", connCount); for (Socket s : fakeConnections) { @@ -245,10 +223,8 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { completedCount, NUM_RETRY_REQUESTS); - // With the bug: the IO threads are deadlocked. Permits remain at 0 and no future completes. - // This assertion FAILS with the current code, demonstrating the bug. assertEquals( - "BUG (LBAsyncSolrClient retry leak): all " + "BUG (LBJettySolrClient retry leak): all " + NUM_RETRY_REQUESTS + " semaphore permits should be released once the retries complete on the" + " real server. Instead the IO threads are permanently blocked in" @@ -284,7 +260,7 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { * With plenty of permits available, {@code acquire()} on the IO thread returns immediately (does * not block), so {@code onComplete} fires normally and every permit is returned. * - *

This test passes both with and without the Pattern A fix. Run it with the fix + *

This test passes both with and without the Pattern B fix. Run it with the fix * commented out to confirm that the deadlock only manifests when the semaphore is fully exhausted * (as demonstrated by {@link #testSemaphoreLeakOnLBRetry}). */ @@ -422,41 +398,14 @@ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { } /** - * Demonstrates the gradual permit leak caused by Jetty HTTP/2 internally re-queuing the - * same request when it is dispatched to a connection that has already been marked {@code - * closed=true} (post-GOAWAY race). - * - *

Root cause

- * - *

In Jetty 12, {@code HttpConnectionOverHTTP2.send(HttpExchange)} returns {@code - * SendFailure(ClosedChannelException, retry=true)} when the target connection is already marked - * closed (e.g. because the server sent a GOAWAY frame while the request was being dispatched from - * the destination queue). {@code HttpDestination.process()} then calls {@code send(HttpExchange)} - * again — re-enqueuing the same exchange object and re-firing {@code notifyQueued()} — - * which invokes {@code AsyncTracker.queuedListener} a second time, calling {@code - * semaphore.acquire()} again. Because {@code onComplete} still fires only once for the logical - * request, there is no matching second {@code release()}. One permit is permanently consumed per - * occurrence. - * - *

Production impact

- * - *

In a 2-node/2-shard cluster with default 1000 permits, each HTTP/2 connection close (server - * restart, load-balancer connection draining, etc.) that races with an in-flight request leaks - * one permit. Over hours or days, permits gradually drain from 1000 toward zero. Once the - * semaphore is nearly exhausted, even a small burst of connection failures triggers the Pattern A - * IO-thread deadlock and the cluster hangs permanently. - * - *

Test approach

- * - *

Rather than setting up a real HTTP/2 server with GOAWAY, this test directly simulates the - * double {@code onRequestQueued} notification via reflection. It accesses {@code - * AsyncTracker.queuedListener} and {@code AsyncTracker.completeListener}, invokes the queued - * listener twice for the same {@code Request} object, and invokes the complete listener once. - * With the bug present the semaphore count drops by one; with the fix (idempotency guard on the - * request attribute) the count is unchanged. + * Verifies that the {@code PERMIT_ACQUIRED_ATTR} idempotency guard prevents the Pattern A permit + * leak where Jetty HTTP/2 re-queues the same exchange after a GOAWAY/connection race, firing + * {@code onRequestQueued} twice for one logical request while {@code onComplete} fires only once. * - *

This test FAILS without the {@code PERMIT_ACQUIRED_ATTR} idempotency guard, - * demonstrating the leak. + *

Rather than setting up a real HTTP/2 server, this test uses reflection to invoke {@code + * AsyncTracker.queuedListener} twice and {@code AsyncTracker.completeListener} once for the same + * {@code Request} object. Without the guard the semaphore count drops by one; with the guard the + * second queued call is a no-op and the count is unchanged. */ @Test @SuppressForbidden( @@ -500,9 +449,7 @@ public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { Response.CompleteListener completeListener = (Response.CompleteListener) completeListenerField.get(asyncTracker); - // Build a minimal fake Request that supports attribute get/set. - // The fix reads request.getAttributes().get(key) and writes request.attribute(key, value). - // Without the fix the request parameter is ignored, so null would also suffice here. + // Fake Request that supports the attribute get/set used by the idempotency guard. Map reqAttributes = new HashMap<>(); Request fakeRequest = Mockito.mock(Request.class); Mockito.when(fakeRequest.getAttributes()).thenReturn(reqAttributes); @@ -513,28 +460,24 @@ public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { return fakeRequest; }); - // Simulate the Jetty HTTP/2 GOAWAY race: - // 1st call — normal queueing; acquire() consumes one permit. - // 2nd call — Jetty internal retry after ClosedChannelException; BUG: acquire() again. + // Simulate the GOAWAY double-fire: 1st call acquires a permit; 2nd is the bug trigger. queuedListener.onQueued(fakeRequest); queuedListener.onQueued(fakeRequest); + Result fakeResult = Mockito.mock(Result.class); + Mockito.when(fakeResult.getRequest()).thenReturn(fakeRequest); // Only one onComplete fires for the logical request (regardless of internal retries). - completeListener.onComplete(null); + completeListener.onComplete(fakeResult); int permitsAfter = testClient.asyncTrackerAvailablePermits(); log.info("Permits after double-queued + single complete: {}/{}", permitsAfter, maxPermits); - // BUG: the second acquire() has no matching release(). One permit is permanently leaked. - // This assertion FAILS with the current code, demonstrating the bug. - // After applying the PERMIT_ACQUIRED_ATTR idempotency guard the second onQueued call is - // a no-op, acquire() is called only once, and this assertion passes. assertEquals( "BUG (Jetty HTTP/2 GOAWAY retry permit leak): onRequestQueued fired twice for the" + " same Request object but onComplete fired only once. The second acquire()" + " was not matched by a release(), permanently leaking one permit per" + " occurrence. In production this causes gradual semaphore depletion over" - + " hours/days until Pattern A IO-thread deadlock triggers.", + + " hours/days until Pattern B IO-thread deadlock triggers.", maxPermits, permitsAfter); @@ -544,9 +487,7 @@ public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, savedMax); } - // Force-terminate the Phaser as a safety net in case close() would otherwise hang. - // With the PERMIT_ACQUIRED_ATTR fix the second onQueued() call is a no-op (the attribute - // is already set), so the phaser is balanced. forceTermination() is therefore harmless here. + // Force-terminate the Phaser as a safety net; without the fix the phaser would be unbalanced. try { Field phaserField = asyncTrackerClass.getDeclaredField("phaser"); phaserField.setAccessible(true); From db3fa1479574b6799d6a0b8203b5c196afe1ddba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 12:20:10 +0200 Subject: [PATCH 03/26] Review comment - make sure we can complete a failed request even if permits are depleted --- .../solrj/jetty/HttpJettySolrClient.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index efc6833cfe9a..a0611140fe12 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; @@ -131,6 +132,16 @@ public class HttpJettySolrClient extends HttpSolrClientBase { private ExecutorService executor; private boolean shutdownExecutor; + /** Fallback for {@code onFailure} dispatch; unbounded so it never rejects. */ + private final ExecutorService failureDispatchExecutor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + 1, + 4, + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new SolrNamedThreadFactory("h2sc-failure-dispatch")); + private AuthenticationStoreHolder authenticationStore; private KeyStoreScanner scanner; @@ -376,6 +387,7 @@ public void close() { if (shutdownExecutor) { ExecutorUtil.shutdownAndAwaitTermination(executor); } + ExecutorUtil.shutdownAndAwaitTermination(failureDispatchExecutor); } assert ObjectReleaseTracker.release(this); } @@ -451,8 +463,11 @@ public void onFailure(Response response, Throwable failure) { future.completeExceptionally( new SolrServerException(failure.getMessage(), failure))); } catch (RejectedExecutionException ree) { - // Executor shut down; safe to complete inline since retries will fail immediately. - future.completeExceptionally(new SolrServerException(failure.getMessage(), failure)); + // Never complete inline on the IO thread; use the unbounded fallback instead. + failureDispatchExecutor.execute( + () -> + future.completeExceptionally( + new SolrServerException(failure.getMessage(), failure))); } } }); From cc6d3458fa9645508aae3f575918b2028b425e15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 12:24:58 +0200 Subject: [PATCH 04/26] Simpler initialization of MAX_OUTSTANDING_REQUESTS --- .../AsyncTrackerSemaphoreLeakTest.java | 26 +++---------------- .../solrj/jetty/HttpJettySolrClient.java | 12 ++++----- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 372278387c20..7e35a93589f0 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -256,9 +256,9 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { * Verifies that no semaphore permits are permanently leaked when connection-level failures * trigger LB retries on the Jetty IO selector thread, provided the semaphore is not exhausted. * - *

This test uses the production default ({@code 1000}) permits and only {@code 20} requests. - * With plenty of permits available, {@code acquire()} on the IO thread returns immediately (does - * not block), so {@code onComplete} fires normally and every permit is returned. + *

Uses only {@code 20} requests, well below the configured permit limit. With plenty of + * permits available, {@code acquire()} on the IO thread returns immediately (does not block), so + * {@code onComplete} fires normally and every permit is returned. * *

This test passes both with and without the Pattern B fix. Run it with the fix * commented out to confirm that the deadlock only manifests when the semaphore is fully exhausted @@ -266,11 +266,6 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { */ @Test public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { - // The @BeforeClass set ASYNC_REQUESTS_MAX_SYSPROP=40 for the cluster. Clear it temporarily - // so this test's dedicated client uses the real production default (1000 permits). - String savedMax = System.getProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); - System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); - final int numRequests = 20; HttpJettySolrClient testClient = @@ -309,7 +304,7 @@ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { try (LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { int initialPermits = testClient.asyncTrackerMaxPermits(); - assertEquals("Should use the production default of 1000 permits", 1000, initialPermits); + assertTrue("numRequests must be well below permit limit", numRequests < initialPermits); assertEquals( "All permits available before test", initialPermits, @@ -378,10 +373,6 @@ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { permitsAfter); } finally { - // Restore the property so subsequent tests (and @AfterClass) see the expected value. - if (savedMax != null) { - System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, savedMax); - } fakeServer.close(); try { testClient.getHttpClient().stop(); @@ -413,9 +404,6 @@ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { "Reflection needed to access AsyncTracker's private fields for white-box testing without exposing them in the production API") public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { assumeWorkingMockito(); - // Clear the @BeforeClass 40-permit cap so this client gets the production default (1000). - String savedMax = System.getProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); - System.clearProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP); HttpJettySolrClient testClient = new HttpJettySolrClient.Builder() @@ -432,7 +420,6 @@ public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { try { int maxPermits = testClient.asyncTrackerMaxPermits(); - assertEquals("Should use production default of 1000 permits", 1000, maxPermits); assertEquals( "All permits available before test", maxPermits, @@ -482,11 +469,6 @@ public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { permitsAfter); } finally { - // Restore the system property for subsequent tests. - if (savedMax != null) { - System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, savedMax); - } - // Force-terminate the Phaser as a safety net; without the fix the phaser would be unbalanced. try { Field phaserField = asyncTrackerClass.getDeclaredField("phaser"); diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index a0611140fe12..4c359efcd660 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -863,7 +863,8 @@ public void close() { } private static class AsyncTracker { - private static final int MAX_OUTSTANDING_REQUESTS = 1000; + private static final int MAX_OUTSTANDING_REQUESTS = + EnvUtils.getPropertyAsInteger(ASYNC_REQUESTS_MAX_SYSPROP, 1000); /** * Request attribute key used to mark that a semaphore permit has been acquired for a given @@ -875,8 +876,6 @@ private static class AsyncTracker { */ private static final String PERMIT_ACQUIRED_ATTR = "solr.async_tracker.permit_acquired"; - private final int maxRequests; - // wait for async requests private final Phaser phaser; // maximum outstanding requests left @@ -885,10 +884,9 @@ private static class AsyncTracker { private final Response.CompleteListener completeListener; AsyncTracker() { - maxRequests = Integer.getInteger(ASYNC_REQUESTS_MAX_SYSPROP, MAX_OUTSTANDING_REQUESTS); // TODO: what about shared instances? phaser = new Phaser(1); - available = new Semaphore(maxRequests, false); + available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false); queuedListener = request -> { if (request.getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { @@ -918,11 +916,11 @@ private static class AsyncTracker { int getMaxRequestsQueuedPerDestination() { // comfortably above max outstanding requests - return maxRequests * 3; + return MAX_OUTSTANDING_REQUESTS * 3; } int maxPermits() { - return maxRequests; + return MAX_OUTSTANDING_REQUESTS; } int availablePermits() { From 9074c67e4d556703a93cc39f61ef15b44f18ae83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 12:50:06 +0200 Subject: [PATCH 05/26] Do not release permit when result == null --- .../apache/solr/client/solrj/jetty/HttpJettySolrClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 4c359efcd660..1ee2bc3bd40d 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -906,8 +906,8 @@ private static class AsyncTracker { }; completeListener = result -> { - if (result == null - || result.getRequest().getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { + if (result != null + && result.getRequest().getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { phaser.arriveAndDeregister(); available.release(); } From 5bbdf53d76b6cfbb73a46d066138ccc8018ffc20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 12:56:43 +0200 Subject: [PATCH 06/26] Fix review comment about test waiting 2000 ms --- .../AsyncTrackerSemaphoreLeakTest.java | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 7e35a93589f0..68e6a6b17099 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -28,8 +28,10 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.lucene.util.SuppressForbidden; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; @@ -211,41 +213,37 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { } } - // Give IO threads time to process the failure events and attempt the retry acquires. - Thread.sleep(2000); + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .get(30, TimeUnit.SECONDS); + } catch (ExecutionException e) { + // Individual request failure is fine; permits are released by onComplete regardless. + log.warn("Some requests failed during retry", e); + } catch (TimeoutException e) { + // Force-stop the HttpClient to unblock any threads stuck in semaphore.acquire() + // before asserting failure, so the finally block can close the client without hanging. + try { + testClient.getHttpClient().stop(); + } catch (Exception ignored) { + } + fail( + "BUG (LBJettySolrClient retry deadlock): futures did not complete within 30s." + + " IO threads are permanently blocked in semaphore.acquire() because the retry" + + " fires synchronously on the IO thread before onComplete can release()."); + } int permitsAfterFailures = testClient.asyncTrackerAvailablePermits(); - long completedCount = futures.stream().filter(CompletableFuture::isDone).count(); - log.info( - "Permits after 2s: {}/{}; futures completed: {}/{}", - permitsAfterFailures, - MAX_PERMITS, - completedCount, - NUM_RETRY_REQUESTS); - + log.info("Permits after retries: {}/{}", permitsAfterFailures, MAX_PERMITS); assertEquals( - "BUG (LBJettySolrClient retry leak): all " - + NUM_RETRY_REQUESTS - + " semaphore permits should be released once the retries complete on the" - + " real server. Instead the IO threads are permanently blocked in" - + " semaphore.acquire() because the retry fires synchronously on the IO thread" - + " before the original request's completeListener can call release().", + "All permits should be restored after retries complete", MAX_PERMITS, permitsAfterFailures); } finally { fakeServer.close(); - - // Force-stop the underlying Jetty HttpClient to unblock any IO threads permanently - // stuck in semaphore.acquire(), so that the test client can be closed without hanging. - try { - testClient.getHttpClient().stop(); - } catch (Exception ignored) { - } try { testClient.close(); } catch (Exception ignored) { } - for (CompletableFuture f : futures) { f.cancel(true); } @@ -374,10 +372,6 @@ public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { } finally { fakeServer.close(); - try { - testClient.getHttpClient().stop(); - } catch (Exception ignored) { - } try { testClient.close(); } catch (Exception ignored) { From 6068e530e428449080933f9186f17dd701117671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 13:08:47 +0200 Subject: [PATCH 07/26] Simplify failure dispatch --- .../client/solrj/jetty/HttpJettySolrClient.java | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 1ee2bc3bd40d..030573ac9bf7 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -34,7 +34,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -457,18 +456,10 @@ public void onFailure(Response response, Throwable failure) { super.onFailure(response, failure); // Dispatch off the IO thread so any whenComplete retry won't block on // semaphore.acquire(). - try { - executor.execute( - () -> - future.completeExceptionally( - new SolrServerException(failure.getMessage(), failure))); - } catch (RejectedExecutionException ree) { - // Never complete inline on the IO thread; use the unbounded fallback instead. - failureDispatchExecutor.execute( - () -> - future.completeExceptionally( - new SolrServerException(failure.getMessage(), failure))); - } + failureDispatchExecutor.execute( + () -> + future.completeExceptionally( + new SolrServerException(failure.getMessage(), failure))); } }); From 9f481baa9b3adc7bb4397e76fe2d509d5f6c8930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 17:14:10 +0200 Subject: [PATCH 08/26] Correct some javadoc --- .../AsyncTrackerSemaphoreLeakTest.java | 17 ++++++++--------- .../client/solrj/jetty/HttpJettySolrClient.java | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 68e6a6b17099..941faabb61f6 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -115,9 +115,9 @@ public static void cleanup() { * org.apache.solr.client.solrj.jetty.LBJettySolrClient} retrying a request synchronously inside a * {@link CompletableFuture#whenComplete} callback that runs on the Jetty IO selector thread. * - *

This assertion FAILS with the current code, demonstrating the bug. The fix would - * dispatch retries to an executor thread so the IO thread remains free to fire {@code onComplete - * → release()}. + *

This test passes with the {@code failureDispatchExecutor} fix in this branch. Without + * the fix, the IO thread would block forever in {@code semaphore.acquire()} and this test would + * time out. */ @Test public void testSemaphoreLeakOnLBRetry() throws Exception { @@ -254,16 +254,15 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { * Verifies that no semaphore permits are permanently leaked when connection-level failures * trigger LB retries on the Jetty IO selector thread, provided the semaphore is not exhausted. * - *

Uses only {@code 20} requests, well below the configured permit limit. With plenty of - * permits available, {@code acquire()} on the IO thread returns immediately (does not block), so + *

Uses only {@code 20} requests, well below the configured limit of {@code 40}. With permits + * still available, {@code acquire()} on the IO thread returns immediately (does not block), so * {@code onComplete} fires normally and every permit is returned. * - *

This test passes both with and without the Pattern B fix. Run it with the fix - * commented out to confirm that the deadlock only manifests when the semaphore is fully exhausted - * (as demonstrated by {@link #testSemaphoreLeakOnLBRetry}). + *

This test passes both with and without the Pattern B fix. The deadlock only manifests + * when the semaphore is fully exhausted (demonstrated by {@link #testSemaphoreLeakOnLBRetry}). */ @Test - public void testNoPermitLeakOnLBRetryWithDefaultPermits() throws Exception { + public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exception { final int numRequests = 20; HttpJettySolrClient testClient = diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 030573ac9bf7..8acd0ea238f9 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -131,7 +131,7 @@ public class HttpJettySolrClient extends HttpSolrClientBase { private ExecutorService executor; private boolean shutdownExecutor; - /** Fallback for {@code onFailure} dispatch; unbounded so it never rejects. */ + /** Executor for {@code onFailure} dispatch; unbounded so it never rejects. */ private final ExecutorService failureDispatchExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor( 1, From 41b9ad3cf362bec57db6de77a1abea0b432953f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 17:18:16 +0200 Subject: [PATCH 09/26] Review feedback: Change sysprop name --- .../org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 8acd0ea238f9..56240d53e178 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -112,7 +112,7 @@ public class HttpJettySolrClient extends HttpSolrClientBase { public static final String CLIENT_CUSTOMIZER_SYSPROP = "solr.solrj.http.jetty.customizer"; /** System property to cap the maximum number of outstanding async HTTP requests. Default 1000. */ - public static final String ASYNC_REQUESTS_MAX_SYSPROP = "solr.http.client.async_requests.max"; + public static final String ASYNC_REQUESTS_MAX_SYSPROP = "solr.solrj.http.jetty.async_requests.max"; public static final String REQ_PRINCIPAL_KEY = "solr-req-principal"; private static final String USER_AGENT = From 633d7d467a3d1c5c017912d56c6c93d8cba83b54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 17:18:31 +0200 Subject: [PATCH 10/26] Review feedback: No quotes in changelog title --- changelog/unreleased/SOLR-18174-prevent-double-registration.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/unreleased/SOLR-18174-prevent-double-registration.yml b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml index 8000e7230cb4..ce816f4fece7 100644 --- a/changelog/unreleased/SOLR-18174-prevent-double-registration.yml +++ b/changelog/unreleased/SOLR-18174-prevent-double-registration.yml @@ -1,4 +1,4 @@ -title: "Fix semaphore permit leaks in HttpJettySolrClient's AsyncTracker. Avoid IO-thread deadlock on connection failure retries. Add a new metric gauge solr.http.client.async_permits" +title: Fix semaphore permit leaks in HttpJettySolrClient's AsyncTracker. Avoid IO-thread deadlock on connection failure retries. Add a new metric gauge solr.http.client.async_permits type: fixed authors: - name: Jan Høydahl From c8e01f8e409bb2cf3fa91faa35d0daa4a5a60228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 17:22:27 +0200 Subject: [PATCH 11/26] Use only 1 node for test cluster configuration in AsyncTrackerSemaphoreLeakTest --- .../solr/handler/component/AsyncTrackerSemaphoreLeakTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 941faabb61f6..b54164ddab1a 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -94,7 +94,7 @@ public static void setupCluster() throws Exception { // be set BEFORE the cluster (and its HttpShardHandlerFactory) starts up. System.setProperty(HttpJettySolrClient.ASYNC_REQUESTS_MAX_SYSPROP, String.valueOf(MAX_PERMITS)); - configureCluster(2).addConfig("conf", configset("cloud-dynamic")).configure(); + configureCluster(1).addConfig("conf", configset("cloud-dynamic")).configure(); CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 1) .process(cluster.getSolrClient()); From d2d3d59c0a4d8a3049051a9d1c18cfbdb31611e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 17:28:33 +0200 Subject: [PATCH 12/26] Factor out fakeServer logic into an inner class and make sure this server is always closed using try-with-resources --- .../AsyncTrackerSemaphoreLeakTest.java | 162 ++++++++++-------- .../solrj/jetty/HttpJettySolrClient.java | 3 +- 2 files changed, 93 insertions(+), 72 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index b54164ddab1a..ca4f55e290a0 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.util.SuppressForbidden; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; @@ -129,35 +130,13 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { .useHttp1_1(true) // HTTP/1.1: every request gets its own TCP connection .build(); - // Fake TCP server: accepts exactly NUM_RETRY_REQUESTS connections and holds them open. - // Once all are established (semaphore exhausted), closes all with RST simultaneously. - ServerSocket fakeServer = new ServerSocket(0); - CountDownLatch allConnected = new CountDownLatch(NUM_RETRY_REQUESTS); - List fakeConnections = Collections.synchronizedList(new ArrayList<>()); - - Thread fakeServerThread = - new Thread( - () -> { - try { - while (fakeConnections.size() < NUM_RETRY_REQUESTS && !fakeServer.isClosed()) { - Socket s = fakeServer.accept(); - fakeConnections.add(s); - allConnected.countDown(); - } - } catch (IOException ignored) { - } - }, - "fake-tcp-server"); - fakeServerThread.setDaemon(true); - fakeServerThread.start(); - - String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() + "/solr"; String realBaseUrl = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION; List> futures = new ArrayList<>(); - try (LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { + try (FakeTcpServer fakeServer = new FakeTcpServer(NUM_RETRY_REQUESTS); + LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { assertEquals( "All permits should be available before the test (verifies sysprop was applied)", @@ -176,7 +155,7 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { new LBSolrClient.Req( qr, List.of( - new LBSolrClient.Endpoint(fakeBaseUrl), + new LBSolrClient.Endpoint(fakeServer.baseUrl()), new LBSolrClient.Endpoint(realBaseUrl))); futures.add(lbClient.requestAsync(req)); } @@ -191,9 +170,9 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { "All " + NUM_RETRY_REQUESTS + " connections should be established within 15 s, but only " - + (NUM_RETRY_REQUESTS - allConnected.getCount()) + + fakeServer.connectionCount() + " were.", - allConnected.await(15, TimeUnit.SECONDS)); + fakeServer.awaitAllConnected(15, TimeUnit.SECONDS)); assertEquals( "Semaphore should be fully consumed after queuing all requests", @@ -203,15 +182,8 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { // Close all fake connections simultaneously with TCP RST. // onFailure fires on the IO thread → LBJettySolrClient retry → acquire() blocks // (semaphore=0). - int connCount = fakeConnections.size(); - log.info("Closing {} fake connections via RST...", connCount); - for (Socket s : fakeConnections) { - try { - s.setSoLinger(true, 0); // send RST instead of FIN - s.close(); - } catch (IOException ignored) { - } - } + log.info("Closing {} fake connections via RST...", fakeServer.connectionCount()); + fakeServer.rstAll(); try { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) @@ -239,7 +211,6 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { MAX_PERMITS, permitsAfterFailures); } finally { - fakeServer.close(); try { testClient.close(); } catch (Exception ignored) { @@ -272,33 +243,13 @@ public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exceptio .useHttp1_1(true) .build(); - ServerSocket fakeServer = new ServerSocket(0); - CountDownLatch allConnected = new CountDownLatch(numRequests); - List fakeConnections = Collections.synchronizedList(new ArrayList<>()); - - Thread fakeServerThread = - new Thread( - () -> { - try { - while (fakeConnections.size() < numRequests && !fakeServer.isClosed()) { - Socket s = fakeServer.accept(); - fakeConnections.add(s); - allConnected.countDown(); - } - } catch (IOException ignored) { - } - }, - "fake-tcp-server"); - fakeServerThread.setDaemon(true); - fakeServerThread.start(); - - String fakeBaseUrl = "http://127.0.0.1:" + fakeServer.getLocalPort() + "/solr"; String realBaseUrl = cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION; List> futures = new ArrayList<>(); - try (LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { + try (FakeTcpServer fakeServer = new FakeTcpServer(numRequests); + LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { int initialPermits = testClient.asyncTrackerMaxPermits(); assertTrue("numRequests must be well below permit limit", numRequests < initialPermits); @@ -315,7 +266,7 @@ public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exceptio new LBSolrClient.Req( new QueryRequest(p), List.of( - new LBSolrClient.Endpoint(fakeBaseUrl), + new LBSolrClient.Endpoint(fakeServer.baseUrl()), new LBSolrClient.Endpoint(realBaseUrl))))); } @@ -326,17 +277,10 @@ public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exceptio initialPermits); assertTrue( "All " + numRequests + " connections should be established within 15 s", - allConnected.await(15, TimeUnit.SECONDS)); + fakeServer.awaitAllConnected(15, TimeUnit.SECONDS)); - int rstCount = fakeConnections.size(); - log.info("RST-ing {} fake connections...", rstCount); - for (Socket s : fakeConnections) { - try { - s.setSoLinger(true, 0); - s.close(); - } catch (IOException ignored) { - } - } + log.info("RST-ing {} fake connections...", fakeServer.connectionCount()); + fakeServer.rstAll(); // With permits >> 0, acquire() on the IO thread returns immediately (no blocking). // onComplete fires normally after each retry and restores every permit. @@ -370,7 +314,6 @@ public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exceptio permitsAfter); } finally { - fakeServer.close(); try { testClient.close(); } catch (Exception ignored) { @@ -477,4 +420,81 @@ public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { } } } + + /** + * A minimal fake TCP server that accepts a fixed number of connections and holds them open, + * allowing tests to simulate connection-level failures by RST-ing all sockets at once. + * + *

Implements {@link AutoCloseable} so that the server socket and any open connections are + * always cleaned up when used in a try-with-resources block, even if the test fails or throws. + */ + private static class FakeTcpServer implements AutoCloseable { + private final ServerSocket serverSocket; + private final List connections = Collections.synchronizedList(new ArrayList<>()); + private final CountDownLatch allConnected; + private final AtomicBoolean closed = new AtomicBoolean(false); + + FakeTcpServer(int expectedConnections) throws IOException { + this.serverSocket = new ServerSocket(0); + this.allConnected = new CountDownLatch(expectedConnections); + Thread acceptThread = + new Thread( + () -> { + try { + while (connections.size() < expectedConnections && !serverSocket.isClosed()) { + Socket s = serverSocket.accept(); + connections.add(s); + allConnected.countDown(); + } + } catch (IOException ignored) { + } + }, + "fake-tcp-server"); + acceptThread.setDaemon(true); + acceptThread.start(); + } + + /** Returns the base URL clients should connect to, e.g. {@code http://127.0.0.1:PORT/solr}. */ + String baseUrl() { + return "http://127.0.0.1:" + serverSocket.getLocalPort() + "/solr"; + } + + /** Waits until all expected connections have been accepted. */ + boolean awaitAllConnected(long timeout, TimeUnit unit) throws InterruptedException { + return allConnected.await(timeout, unit); + } + + /** Returns the number of connections accepted so far. */ + int connectionCount() { + return connections.size(); + } + + /** + * Closes all accepted connections with TCP RST, triggering onFailure on the Jetty IO thread. + */ + void rstAll() { + for (Socket s : connections) { + try { + s.setSoLinger(true, 0); // send RST instead of FIN + s.close(); + } catch (IOException ignored) { + } + } + } + + /** + * RSTs any remaining open connections and closes the server socket, stopping the accept thread. + * Safe to call multiple times. + */ + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + rstAll(); + try { + serverSocket.close(); + } catch (IOException ignored) { + } + } + } + } } diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 56240d53e178..48e9a76c1d8b 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -112,7 +112,8 @@ public class HttpJettySolrClient extends HttpSolrClientBase { public static final String CLIENT_CUSTOMIZER_SYSPROP = "solr.solrj.http.jetty.customizer"; /** System property to cap the maximum number of outstanding async HTTP requests. Default 1000. */ - public static final String ASYNC_REQUESTS_MAX_SYSPROP = "solr.solrj.http.jetty.async_requests.max"; + public static final String ASYNC_REQUESTS_MAX_SYSPROP = + "solr.solrj.http.jetty.async_requests.max"; public static final String REQ_PRINCIPAL_KEY = "solr-req-principal"; private static final String USER_AGENT = From 3f7dcb947b237b512725c68026417d128b38d11e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 18:04:34 +0200 Subject: [PATCH 13/26] Revert to using common executor for failure dispatch due to leak issues with NoCloseHttpJettySolrClient --- .../solrj/jetty/HttpJettySolrClient.java | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 48e9a76c1d8b..cc6b41bc8282 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -132,16 +132,6 @@ public class HttpJettySolrClient extends HttpSolrClientBase { private ExecutorService executor; private boolean shutdownExecutor; - /** Executor for {@code onFailure} dispatch; unbounded so it never rejects. */ - private final ExecutorService failureDispatchExecutor = - new ExecutorUtil.MDCAwareThreadPoolExecutor( - 1, - 4, - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new SolrNamedThreadFactory("h2sc-failure-dispatch")); - private AuthenticationStoreHolder authenticationStore; private KeyStoreScanner scanner; @@ -387,7 +377,6 @@ public void close() { if (shutdownExecutor) { ExecutorUtil.shutdownAndAwaitTermination(executor); } - ExecutorUtil.shutdownAndAwaitTermination(failureDispatchExecutor); } assert ObjectReleaseTracker.release(this); } @@ -456,11 +445,14 @@ public void onHeaders(Response response) { public void onFailure(Response response, Throwable failure) { super.onFailure(response, failure); // Dispatch off the IO thread so any whenComplete retry won't block on - // semaphore.acquire(). - failureDispatchExecutor.execute( - () -> - future.completeExceptionally( - new SolrServerException(failure.getMessage(), failure))); + // semaphore.acquire(). Fall back to the IO thread if the executor rejects + // (shutdown or overloaded) to ensure the future is always completed. + SolrServerException ex = new SolrServerException(failure.getMessage(), failure); + try { + executor.execute(() -> future.completeExceptionally(ex)); + } catch (java.util.concurrent.RejectedExecutionException ree) { + future.completeExceptionally(ex); + } } }); From b7c6c1e57d459c063abca744c52b4c8798ebfd27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 22:59:38 +0200 Subject: [PATCH 14/26] Avoid test hanging due to executor not able to terminate --- .../AsyncTrackerSemaphoreLeakTest.java | 1 - .../client/solrj/jetty/HttpJettySolrClient.java | 17 ++++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index ca4f55e290a0..72b0eceddba4 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -489,7 +489,6 @@ void rstAll() { @Override public void close() { if (closed.compareAndSet(false, true)) { - rstAll(); try { serverSocket.close(); } catch (IOException ignored) { diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index cc6b41bc8282..9976986e1a6e 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -444,9 +443,8 @@ public void onHeaders(Response response) { @Override public void onFailure(Response response, Throwable failure) { super.onFailure(response, failure); - // Dispatch off the IO thread so any whenComplete retry won't block on - // semaphore.acquire(). Fall back to the IO thread if the executor rejects - // (shutdown or overloaded) to ensure the future is always completed. + // Dispatch off the IO thread to avoid blocking semaphore.acquire() on retry. + // Fall back to IO thread if executor rejects (shutdown/overloaded). SolrServerException ex = new SolrServerException(failure.getMessage(), failure); try { executor.execute(() -> future.completeExceptionally(ex)); @@ -912,7 +910,16 @@ int availablePermits() { } public void waitForComplete() { - phaser.arriveAndAwaitAdvance(); + // Use awaitAdvanceInterruptibly() instead of arriveAndAwaitAdvance() so that + // ExecutorUtil.shutdownNow() can unblock this during container shutdown. + int phase = phaser.arrive(); + try { + phaser.awaitAdvanceInterruptibly(phase); + } catch (InterruptedException e) { + // Terminate phaser on interrupt so in-flight onComplete callbacks don't stall. + phaser.forceTermination(); + Thread.currentThread().interrupt(); + } phaser.arriveAndDeregister(); } } From a08833c32392435e9e7be3944716c793f3cd2aab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 23:02:13 +0200 Subject: [PATCH 15/26] Review: Use SolrParams.of() --- .../component/AsyncTrackerSemaphoreLeakTest.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 72b0eceddba4..11111a7daf01 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -40,7 +40,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.cloud.SolrCloudTestCase; -import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.Response; import org.eclipse.jetty.client.Result; @@ -148,9 +148,7 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { // Each requestAsync() call acquires a semaphore permit synchronously during send(). // After NUM_RETRY_REQUESTS calls, the semaphore is at 0. for (int i = 0; i < NUM_RETRY_REQUESTS; i++) { - ModifiableSolrParams qparams = new ModifiableSolrParams(); - qparams.set("q", "*:*"); - QueryRequest qr = new QueryRequest(qparams); + QueryRequest qr = new QueryRequest(SolrParams.of("q", "*:*")); LBSolrClient.Req req = new LBSolrClient.Req( qr, @@ -259,12 +257,10 @@ public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exceptio testClient.asyncTrackerAvailablePermits()); for (int i = 0; i < numRequests; i++) { - ModifiableSolrParams p = new ModifiableSolrParams(); - p.set("q", "*:*"); futures.add( lbClient.requestAsync( new LBSolrClient.Req( - new QueryRequest(p), + new QueryRequest(SolrParams.of("q", "*:*")), List.of( new LBSolrClient.Endpoint(fakeServer.baseUrl()), new LBSolrClient.Endpoint(realBaseUrl))))); From f0fdea80c297072f5ce8d7586515ba4f3b86c8f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Tue, 14 Apr 2026 23:10:36 +0200 Subject: [PATCH 16/26] Print debug log instead of silently ignoring Exceptions --- .../component/AsyncTrackerSemaphoreLeakTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 11111a7daf01..8dd7004b4644 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -195,6 +195,7 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { try { testClient.getHttpClient().stop(); } catch (Exception ignored) { + log.debug("Failed to stop HttpClient"); } fail( "BUG (LBJettySolrClient retry deadlock): futures did not complete within 30s." @@ -212,6 +213,7 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { try { testClient.close(); } catch (Exception ignored) { + log.debug("Failed to close LBJettySolrClient"); } for (CompletableFuture f : futures) { f.cancel(true); @@ -313,6 +315,7 @@ public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exceptio try { testClient.close(); } catch (Exception ignored) { + log.debug("Failed to close LBJettySolrClient"); } for (CompletableFuture f : futures) { f.cancel(true); @@ -408,11 +411,13 @@ public void testPermitLeakOnHttp2GoAwayDoubleQueuedListener() throws Exception { Phaser phaser = (Phaser) phaserField.get(asyncTracker); phaser.forceTermination(); } catch (Exception ignored) { + log.debug("Failed to force-terminate Phaser"); } try { testClient.close(); } catch (Exception ignored) { + log.debug("Failed to close HttpJettySolrClient"); } } } @@ -442,7 +447,8 @@ private static class FakeTcpServer implements AutoCloseable { connections.add(s); allConnected.countDown(); } - } catch (IOException ignored) { + } catch (IOException ioe) { + log.debug("Failed to accept connection: {}", ioe.getMessage()); } }, "fake-tcp-server"); @@ -474,6 +480,7 @@ void rstAll() { s.setSoLinger(true, 0); // send RST instead of FIN s.close(); } catch (IOException ignored) { + log.debug("Failed to close connection"); } } } @@ -485,9 +492,11 @@ void rstAll() { @Override public void close() { if (closed.compareAndSet(false, true)) { + rstAll(); try { serverSocket.close(); } catch (IOException ignored) { + log.debug("Failed to close server socket"); } } } From e482cf27850252e4f9d2c932e7f7c43b64a49c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Wed, 15 Apr 2026 11:01:15 +0200 Subject: [PATCH 17/26] Fix unstable test --- .../AsyncTrackerSemaphoreLeakTest.java | 102 ------------------ .../solrj/jetty/HttpJettySolrClient.java | 21 ++-- 2 files changed, 10 insertions(+), 113 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index 8dd7004b4644..c183fc673bbf 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -221,108 +221,6 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { } } - /** - * Verifies that no semaphore permits are permanently leaked when connection-level failures - * trigger LB retries on the Jetty IO selector thread, provided the semaphore is not exhausted. - * - *

Uses only {@code 20} requests, well below the configured limit of {@code 40}. With permits - * still available, {@code acquire()} on the IO thread returns immediately (does not block), so - * {@code onComplete} fires normally and every permit is returned. - * - *

This test passes both with and without the Pattern B fix. The deadlock only manifests - * when the semaphore is fully exhausted (demonstrated by {@link #testSemaphoreLeakOnLBRetry}). - */ - @Test - public void testNoPermitLeakOnLBRetryWhenSemaphoreNotExhausted() throws Exception { - final int numRequests = 20; - - HttpJettySolrClient testClient = - new HttpJettySolrClient.Builder() - .withConnectionTimeout(5, TimeUnit.SECONDS) - .withIdleTimeout(30, TimeUnit.SECONDS) - .useHttp1_1(true) - .build(); - - String realBaseUrl = - cluster.getJettySolrRunners().get(0).getBaseUrl().toString() + "/" + COLLECTION; - - List> futures = new ArrayList<>(); - - try (FakeTcpServer fakeServer = new FakeTcpServer(numRequests); - LBJettySolrClient lbClient = new LBJettySolrClient.Builder(testClient).build()) { - - int initialPermits = testClient.asyncTrackerMaxPermits(); - assertTrue("numRequests must be well below permit limit", numRequests < initialPermits); - assertEquals( - "All permits available before test", - initialPermits, - testClient.asyncTrackerAvailablePermits()); - - for (int i = 0; i < numRequests; i++) { - futures.add( - lbClient.requestAsync( - new LBSolrClient.Req( - new QueryRequest(SolrParams.of("q", "*:*")), - List.of( - new LBSolrClient.Endpoint(fakeServer.baseUrl()), - new LBSolrClient.Endpoint(realBaseUrl))))); - } - - log.info( - "Submitted {} requests ({} permits, semaphore far from exhaustion). " - + "Waiting for connections...", - numRequests, - initialPermits); - assertTrue( - "All " + numRequests + " connections should be established within 15 s", - fakeServer.awaitAllConnected(15, TimeUnit.SECONDS)); - - log.info("RST-ing {} fake connections...", fakeServer.connectionCount()); - fakeServer.rstAll(); - - // With permits >> 0, acquire() on the IO thread returns immediately (no blocking). - // onComplete fires normally after each retry and restores every permit. - // Expect all futures to resolve (via retry to the real server) within 30 s. - try { - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .get(30, TimeUnit.SECONDS); - } catch (Exception e) { - // Retry failure (e.g. transient real-server error): permits are still released by - // onComplete, so we proceed to the assertion. - log.warn("Some futures completed exceptionally", e); - } - - int permitsAfter = testClient.asyncTrackerAvailablePermits(); - long completedCount = futures.stream().filter(CompletableFuture::isDone).count(); - log.info( - "Permits after retries: {}/{}; futures completed: {}/{}", - permitsAfter, - initialPermits, - completedCount, - numRequests); - - assertEquals( - "No permits leaked: with " - + numRequests - + " requests against a " - + initialPermits - + "-permit semaphore, acquire() never blocks the IO thread so onComplete" - + " always fires and restores every permit.", - initialPermits, - permitsAfter); - - } finally { - try { - testClient.close(); - } catch (Exception ignored) { - log.debug("Failed to close LBJettySolrClient"); - } - for (CompletableFuture f : futures) { - f.cancel(true); - } - } - } - /** * Verifies that the {@code PERMIT_ACQUIRED_ATTR} idempotency guard prevents the Pattern A permit * leak where Jetty HTTP/2 re-queues the same exchange after a GOAWAY/connection race, firing diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 9976986e1a6e..7180fdbcda26 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -849,12 +849,12 @@ private static class AsyncTracker { EnvUtils.getPropertyAsInteger(ASYNC_REQUESTS_MAX_SYSPROP, 1000); /** - * Request attribute key used to mark that a semaphore permit has been acquired for a given - * request. Jetty can internally re-queue the same exchange object and re-fire {@code - * onRequestQueued} more than once (e.g. when retrying after a connection-level failure), while - * {@code onComplete} always fires exactly once. This attribute makes the queued/complete - * listeners idempotent: a second {@code onRequestQueued} for the same request is a no-op, and - * {@code onComplete} releases the permit only when one was actually acquired. + * Request attribute key used to guard idempotency across both listeners. Set immediately after + * {@code phaser.register()} — before {@code available.acquire()} — so that {@code onComplete} + * can never fire between registration and attribute-set and leave a phaser party stranded. + * Jetty can re-fire {@code onRequestQueued} for the same exchange (e.g. after a GOAWAY retry); + * the attribute makes the second call a no-op. {@code onComplete} always fires exactly once and + * uses the attribute to call {@code arriveAndDeregister()} + {@code release()} exactly once. */ private static final String PERMIT_ACQUIRED_ATTR = "solr.async_tracker.permit_acquired"; @@ -875,16 +875,15 @@ private static class AsyncTracker { return; } phaser.register(); + // Set the attribute before acquire() so onComplete can never race between + // phaser.register() and attribute-set, which would strand a phaser party forever. + request.attribute(PERMIT_ACQUIRED_ATTR, Boolean.TRUE); try { available.acquire(); } catch (InterruptedException e) { - // Undo phaser registration: no permit was acquired so completeListener must not - // release. - phaser.arriveAndDeregister(); + // completeListener will call arriveAndDeregister() when onComplete fires. Thread.currentThread().interrupt(); - return; } - request.attribute(PERMIT_ACQUIRED_ATTR, Boolean.TRUE); }; completeListener = result -> { From 08e1a3d95e0be963d16828d36832a107b106f37b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Wed, 15 Apr 2026 11:12:34 +0200 Subject: [PATCH 18/26] Rename metric name from "solr.http.client.async_permits" to "solr.client.request.async_permits" --- .../apache/solr/handler/component/HttpShardHandlerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index 9bcd3f2c2021..fa2392c5a1de 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -446,7 +446,7 @@ public void initializeMetrics(SolrMetricsContext parentContext, Attributes attri if (defaultClient != null) { asyncRequestsGauge = solrMetricsContext.observableLongGauge( - "solr.http.client.async_permits", + "solr.client.request.async_permits", "Outstanding async HTTP request permits in the Jetty SolrJ client" + " (state=max: configured ceiling; state=available: currently unused permits).", measurement -> { From 289f847cf757120d8e6f5302a4b291dc85e6ac77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Wed, 15 Apr 2026 12:51:26 +0200 Subject: [PATCH 19/26] Fix precommit - log calls --- .../handler/component/AsyncTrackerSemaphoreLeakTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index c183fc673bbf..bfd23af5807c 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -180,7 +180,8 @@ public void testSemaphoreLeakOnLBRetry() throws Exception { // Close all fake connections simultaneously with TCP RST. // onFailure fires on the IO thread → LBJettySolrClient retry → acquire() blocks // (semaphore=0). - log.info("Closing {} fake connections via RST...", fakeServer.connectionCount()); + int connCount = fakeServer.connectionCount(); + log.info("Closing {} fake connections via RST...", connCount); fakeServer.rstAll(); try { @@ -346,7 +347,7 @@ private static class FakeTcpServer implements AutoCloseable { allConnected.countDown(); } } catch (IOException ioe) { - log.debug("Failed to accept connection: {}", ioe.getMessage()); + log.debug("Failed to accept connection", ioe); } }, "fake-tcp-server"); From 5a92f99d256b435cac06b32d992377107f3907fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Wed, 15 Apr 2026 13:19:37 +0200 Subject: [PATCH 20/26] Precommit fix fully-qualified-class --- .../apache/solr/client/solrj/jetty/HttpJettySolrClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index 7180fdbcda26..fbc75de3b897 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Phaser; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -448,7 +449,7 @@ public void onFailure(Response response, Throwable failure) { SolrServerException ex = new SolrServerException(failure.getMessage(), failure); try { executor.execute(() -> future.completeExceptionally(ex)); - } catch (java.util.concurrent.RejectedExecutionException ree) { + } catch (RejectedExecutionException ree) { future.completeExceptionally(ex); } } From 0f3f382cdac29eedac76fc543937a42e598251a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Wed, 15 Apr 2026 13:51:20 +0200 Subject: [PATCH 21/26] Reference guide additions for sysprop and metrics --- .../configuration-guide/pages/solr-properties.adoc | 2 ++ .../deployment-guide/pages/metrics-reporting.adoc | 11 +++++++++++ .../pages/solrcloud-distributed-requests.adoc | 11 +++++++++++ 3 files changed, 24 insertions(+) diff --git a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc index 837c7282b5ed..b9ecaddb1002 100644 --- a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc +++ b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc @@ -106,6 +106,8 @@ NOTE: Properties marked with "!" indicate inverted meaning between pre Solr 10 a |solr.solrj.http.cookies.enabled|!solr.http.disableCookies| false |If Http2SolrClient should support cookies. +|solr.solrj.http.jetty.async_requests.max||1000|Maximum number of outstanding async HTTP requests allowed concurrently in the Jetty-based SolrJ HTTP client. Increase if you observe semaphore exhaustion under heavy distributed query load; decrease to limit resource usage. Related metric: `solr.client.request.async_permits`. + |solr.solrj.http.jetty.customizer|solr.httpclient.builder.factory||A class loaded to customize HttpJettySolrClient upon creation. |solr.streamingexpressions.facet.tiered.enabled|solr.facet.stream.tiered|true|Controls whether tiered faceting is enabled for streaming expressions. diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc index ea074b2ace22..15a2d592f87d 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/metrics-reporting.adoc @@ -124,6 +124,17 @@ The `Overseer Registry` is initialized when running in SolrCloud mode and includ * Size of the Overseer queues (collection work queue and cluster state update queue) +=== HTTP Client Registry + +Solr exposes metrics for the internal Jetty-based HTTP client used for distributed (shard) requests: + +[cols="2,1,3",options="header"] +|=== +| Prometheus Metric Name | Type | Description +| `solr_client_request_async_permits{state="max"}` | gauge | Configured maximum number of outstanding concurrent async HTTP requests (controlled by `solr.solrj.http.jetty.async_requests.max`, default 1000). +| `solr_client_request_async_permits{state="available"}` | gauge | Number of async request permits currently available (i.e., not in use). When this approaches zero, new distributed requests will block waiting for a permit. +|=== + == Core Level Metrics === Index Merge Metrics diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc index f7b25937d9cb..38fa10a3e311 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc @@ -375,6 +375,17 @@ Both threads could receive a top-level request concurrently, and make sub-reques Because there are no more remaining threads to service requests, the incoming requests will be blocked until the other pending requests are finished, but they will not finish since they are waiting for the sub-requests. By ensuring that Solr is configured to handle a sufficient number of threads, you can avoid deadlock situations like this. +A related bottleneck exists on the outbound (client) side: the internal Jetty HTTP client used for shard sub-requests has a configurable limit on the number of concurrent async requests it will issue simultaneously (default: 1000, controlled by the system property `solr.solrj.http.jetty.async_requests.max`). +In large clusters with many shards, a single top-level query can fan out to hundreds of sub-requests per node, quickly exhausting this limit and causing new shard requests to queue inside the client. +If you see distributed queries stalling or timing out on large collections, consider raising this limit: + +[source,plain] +---- +SOLR_OPTS="$SOLR_OPTS -Dsolr.solrj.http.jetty.async_requests.max=2000" +---- + +The current permit utilization can be observed via the `solr_client_request_async_permits` metric (see xref:metrics-reporting.adoc#http-client-registry[HTTP Client Registry]). + == Distributed Tracing and Debugging The `debug` parameter with a value of `track` can be used to trace the request as well as find timing information for each phase of a distributed request. From 8c3b376c9dabf8a54cef15f92ab10ef831cc2f52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 16 Apr 2026 01:35:40 +0200 Subject: [PATCH 22/26] Review feedback - add warn log when completing request on IO thread --- .../apache/solr/client/solrj/jetty/HttpJettySolrClient.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index fbc75de3b897..e76c690b8150 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -450,6 +450,9 @@ public void onFailure(Response response, Throwable failure) { try { executor.execute(() -> future.completeExceptionally(ex)); } catch (RejectedExecutionException ree) { + log.warn( + "Failed to complete future exceptionally due to executor rejection, completing on IO thread.", + ree); future.completeExceptionally(ex); } } From a543701fc762d1bd1156ae3ef5833d9566d57cdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 16 Apr 2026 01:45:44 +0200 Subject: [PATCH 23/26] Move the documentation of max distributed requests sysprop from solrcloud-distributed-requests.adoc to upgrade notes. --- .../pages/solrcloud-distributed-requests.adoc | 11 ----------- .../upgrade-notes/pages/major-changes-in-solr-10.adoc | 8 ++++++++ 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc index 38fa10a3e311..f7b25937d9cb 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/solrcloud-distributed-requests.adoc @@ -375,17 +375,6 @@ Both threads could receive a top-level request concurrently, and make sub-reques Because there are no more remaining threads to service requests, the incoming requests will be blocked until the other pending requests are finished, but they will not finish since they are waiting for the sub-requests. By ensuring that Solr is configured to handle a sufficient number of threads, you can avoid deadlock situations like this. -A related bottleneck exists on the outbound (client) side: the internal Jetty HTTP client used for shard sub-requests has a configurable limit on the number of concurrent async requests it will issue simultaneously (default: 1000, controlled by the system property `solr.solrj.http.jetty.async_requests.max`). -In large clusters with many shards, a single top-level query can fan out to hundreds of sub-requests per node, quickly exhausting this limit and causing new shard requests to queue inside the client. -If you see distributed queries stalling or timing out on large collections, consider raising this limit: - -[source,plain] ----- -SOLR_OPTS="$SOLR_OPTS -Dsolr.solrj.http.jetty.async_requests.max=2000" ----- - -The current permit utilization can be observed via the `solr_client_request_async_permits` metric (see xref:metrics-reporting.adoc#http-client-registry[HTTP Client Registry]). - == Distributed Tracing and Debugging The `debug` parameter with a value of `track` can be used to trace the request as well as find timing information for each phase of a distributed request. diff --git a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc index 64cd50064ec7..aee3c73a3a6e 100644 --- a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc +++ b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc @@ -331,3 +331,11 @@ Older segments will continue to be readable. WARNING: After upgrading to Solr 10.1, downgrading to an earlier Solr 10.0.x version may fail because the older version does not include the `Lucene104` codec needed to read the newly written segments. If you require the ability to roll back, back up your indexes before upgrading. + +=== Max distributed requests now configurable + +The internal HTTP client used for distributed shard sub-requests previously had a hard-coded limit of 1000 concurrent async requests per node. +In large clusters, a single query can fan out to hundreds of sub-requests, quickly exhausting this limit and causing requests to queue, potentially leading to stalls or timeouts. +This limit is now configurable via the system property `solr.solrj.http.jetty.async_requests.max`. + +Current permit utilization can be monitored via the `solr_client_request_async_permits` metric (see xref:metrics-reporting.adoc#http-client-registry[HTTP Client Registry]). From 78b0d7a9ae53361127005a58f067b1d422c3ade1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 16 Apr 2026 02:00:05 +0200 Subject: [PATCH 24/26] Change log for FakeTcpServer from debug to warn to aid in debugging potential test failure --- .../solr/handler/component/AsyncTrackerSemaphoreLeakTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java index bfd23af5807c..1ff3ec542ed5 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/AsyncTrackerSemaphoreLeakTest.java @@ -347,7 +347,7 @@ private static class FakeTcpServer implements AutoCloseable { allConnected.countDown(); } } catch (IOException ioe) { - log.debug("Failed to accept connection", ioe); + log.warn("Failed to accept connection", ioe); } }, "fake-tcp-server"); From 88e327bb0095b67e2edd4fef5c3c71db37fb112d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Thu, 16 Apr 2026 09:14:48 +0200 Subject: [PATCH 25/26] Precommit - satisfy :solr:solr-ref-guide:checkSiteLinks --- .../modules/upgrade-notes/pages/major-changes-in-solr-10.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc index aee3c73a3a6e..2598063e8a18 100644 --- a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc +++ b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc @@ -338,4 +338,4 @@ The internal HTTP client used for distributed shard sub-requests previously had In large clusters, a single query can fan out to hundreds of sub-requests, quickly exhausting this limit and causing requests to queue, potentially leading to stalls or timeouts. This limit is now configurable via the system property `solr.solrj.http.jetty.async_requests.max`. -Current permit utilization can be monitored via the `solr_client_request_async_permits` metric (see xref:metrics-reporting.adoc#http-client-registry[HTTP Client Registry]). +Current permit utilization can be monitored via the `solr_client_request_async_permits` metric (see xref:deployment-guide:metrics-reporting.adoc#http-client-registry[HTTP Client Registry]). From 011963534d230813e35ebd1599cadfb6fd60567d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=B8ydahl?= Date: Fri, 17 Apr 2026 23:55:46 +0200 Subject: [PATCH 26/26] Fix test race-condition bug by setting maxOutstandingRequests as instance variable --- .../client/solrj/jetty/HttpJettySolrClient.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java index e76c690b8150..05491bb8e698 100644 --- a/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java +++ b/solr/solrj-jetty/src/java/org/apache/solr/client/solrj/jetty/HttpJettySolrClient.java @@ -849,8 +849,11 @@ public void close() { } private static class AsyncTracker { - private static final int MAX_OUTSTANDING_REQUESTS = - EnvUtils.getPropertyAsInteger(ASYNC_REQUESTS_MAX_SYSPROP, 1000); + /** + * Read per-instance so that tests can set the sysprop before constructing a client and have it + * take effect without relying on class-load ordering across test suites in the same JVM. + */ + private final int maxOutstandingRequests; /** * Request attribute key used to guard idempotency across both listeners. Set immediately after @@ -871,8 +874,9 @@ private static class AsyncTracker { AsyncTracker() { // TODO: what about shared instances? + maxOutstandingRequests = EnvUtils.getPropertyAsInteger(ASYNC_REQUESTS_MAX_SYSPROP, 1000); phaser = new Phaser(1); - available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false); + available = new Semaphore(maxOutstandingRequests, false); queuedListener = request -> { if (request.getAttributes().get(PERMIT_ACQUIRED_ATTR) != null) { @@ -901,11 +905,11 @@ private static class AsyncTracker { int getMaxRequestsQueuedPerDestination() { // comfortably above max outstanding requests - return MAX_OUTSTANDING_REQUESTS * 3; + return maxOutstandingRequests * 3; } int maxPermits() { - return MAX_OUTSTANDING_REQUESTS; + return maxOutstandingRequests; } int availablePermits() {