From e4ade680b1c84b458e636b5a85893fe2a1084347 Mon Sep 17 00:00:00 2001 From: Emmanuel Hugonnet Date: Tue, 5 May 2026 12:30:12 +0200 Subject: [PATCH] fix: replace raw SSE string handling with SSEEvent model and SSEParser Introduce SSEEvent (data, id, event type, retry) and a spec-compliant SSEParser with buffer and line-length guards. Update postAsyncSSE and all SSE listeners across JDK, Android, JSONRPC, and REST transports to consume SSEEvent instead of raw strings. Using proper SSE Events Signed-off-by: Emmanuel Hugonnet --- .../transport/jsonrpc/JSONRPCTransport.java | 4 +- .../jsonrpc/sse/SSEEventListener.java | 9 +- .../jsonrpc/sse/SSEEventListenerTest.java | 13 +- .../client/transport/rest/RestTransport.java | 4 +- .../rest/sse/RestSSEEventListener.java | 9 +- .../a2a/client/http/AndroidA2AHttpClient.java | 454 ++++++++-------- .../io/a2a/client/http/A2AHttpClient.java | 4 +- .../io/a2a/client/http/JdkA2AHttpClient.java | 17 +- .../io/a2a/client/http/ServerSentEvent.java | 32 ++ .../client/http/ServerSentEventParser.java | 192 +++++++ .../a2a/client/http/A2ACardResolverTest.java | 2 +- .../http/AbstractA2AHttpClientSSETest.java | 139 ++++- .../http/ServerSentEventParserTest.java | 512 ++++++++++++++++++ .../AbstractA2ARequestHandlerTest.java | 3 +- .../tasks/PushNotificationSenderTest.java | 3 +- .../server/apps/common/TestHttpClient.java | 3 +- 16 files changed, 1135 insertions(+), 265 deletions(-) create mode 100644 http-client/src/main/java/io/a2a/client/http/ServerSentEvent.java create mode 100644 http-client/src/main/java/io/a2a/client/http/ServerSentEventParser.java create mode 100644 http-client/src/test/java/io/a2a/client/http/ServerSentEventParserTest.java diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java index b1c38afcc..3a12cf8a5 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/JSONRPCTransport.java @@ -134,7 +134,7 @@ public void sendMessageStreaming(MessageSendParams request, Consumer sseEventListener.onMessage(msg, ref.get()), + event -> sseEventListener.onMessage(event, ref.get()), throwable -> sseEventListener.onError(throwable, ref.get()), () -> { // Signal normal stream completion to error handler (null error means success) @@ -315,7 +315,7 @@ public void resubscribe(TaskIdParams request, Consumer event try { A2AHttpClient.PostBuilder builder = createPostBuilder(payloadAndHeaders); ref.set(builder.postAsyncSSE( - msg -> sseEventListener.onMessage(msg, ref.get()), + event -> sseEventListener.onMessage(event, ref.get()), throwable -> sseEventListener.onError(throwable, ref.get()), () -> { // Signal normal stream completion to error handler (null error means success) diff --git a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java index 0ab70027a..eef1a5d39 100644 --- a/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java +++ b/client/transport/jsonrpc/src/main/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListener.java @@ -3,6 +3,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; +import io.a2a.client.http.ServerSentEvent; import io.a2a.json.JsonProcessingException; import io.a2a.json.JsonUtil; import io.a2a.spec.JSONRPCError; @@ -25,13 +26,13 @@ public SSEEventListener(Consumer eventHandler, this.errorHandler = errorHandler; } - public void onMessage(String message, Future completableFuture) { + public void onMessage(ServerSentEvent event, Future completableFuture) { try { - handleMessage(JsonParser.parseString(message).getAsJsonObject(), completableFuture); + handleMessage(JsonParser.parseString(event.data()).getAsJsonObject(), completableFuture); } catch (JsonSyntaxException e) { - log.warning("Failed to parse JSON message: " + message); + log.warning("Failed to parse JSON message: " + event.data()); } catch (JsonProcessingException e) { - log.warning("Failed to process JSON message: " + message); + log.warning("Failed to process JSON message: " + event.data()); } } diff --git a/client/transport/jsonrpc/src/test/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListenerTest.java b/client/transport/jsonrpc/src/test/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListenerTest.java index 8c4c1495e..b826d1671 100644 --- a/client/transport/jsonrpc/src/test/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListenerTest.java +++ b/client/transport/jsonrpc/src/test/java/io/a2a/client/transport/jsonrpc/sse/SSEEventListenerTest.java @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import io.a2a.client.http.ServerSentEvent; import io.a2a.client.transport.jsonrpc.JsonStreamingMessages; import io.a2a.spec.Artifact; import io.a2a.spec.JSONRPCError; @@ -43,7 +44,7 @@ public void testOnEventWithTaskResult() throws Exception { JsonStreamingMessages.STREAMING_TASK_EVENT.indexOf("{")); // Call the onEvent method directly - listener.onMessage(eventData, null); + listener.onMessage(new ServerSentEvent(eventData), null); // Verify the event was processed correctly assertNotNull(receivedEvent.get()); @@ -68,7 +69,7 @@ public void testOnEventWithMessageResult() throws Exception { JsonStreamingMessages.STREAMING_MESSAGE_EVENT.indexOf("{")); // Call onEvent method - listener.onMessage(eventData, null); + listener.onMessage(new ServerSentEvent(eventData), null); // Verify the event was processed correctly assertNotNull(receivedEvent.get()); @@ -96,7 +97,7 @@ public void testOnEventWithTaskStatusUpdateEventEvent() throws Exception { JsonStreamingMessages.STREAMING_STATUS_UPDATE_EVENT.indexOf("{")); // Call onEvent method - listener.onMessage(eventData, null); + listener.onMessage(new ServerSentEvent(eventData), null); // Verify the event was processed correctly assertNotNull(receivedEvent.get()); @@ -122,7 +123,7 @@ public void testOnEventWithTaskArtifactUpdateEventEvent() throws Exception { JsonStreamingMessages.STREAMING_ARTIFACT_UPDATE_EVENT.indexOf("{")); // Call onEvent method - listener.onMessage(eventData, null); + listener.onMessage(new ServerSentEvent(eventData), null); // Verify the event was processed correctly assertNotNull(receivedEvent.get()); @@ -154,7 +155,7 @@ public void testOnEventWithError() throws Exception { JsonStreamingMessages.STREAMING_ERROR_EVENT.indexOf("{")); // Call onEvent method - listener.onMessage(eventData, null); + listener.onMessage(new ServerSentEvent(eventData), null); // Verify the error was processed correctly assertNotNull(receivedError.get()); @@ -217,7 +218,7 @@ public void testOnEventWithFinalTaskStatusUpdateEventEventCancels() throws Excep // Call onEvent method CancelCapturingFuture future = new CancelCapturingFuture(); - listener.onMessage(eventData, future); + listener.onMessage(new ServerSentEvent(eventData), future); // Verify the event was processed correctly assertNotNull(receivedEvent.get()); diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java index c7a792527..b30b4c54b 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/RestTransport.java @@ -105,7 +105,7 @@ public void sendMessageStreaming(MessageSendParams messageSendParams, Consumer sseEventListener.onMessage(msg, ref.get()), + event -> sseEventListener.onMessage(event, ref.get()), throwable -> sseEventListener.onError(throwable, ref.get()), () -> { // We don't need to do anything special on completion @@ -294,7 +294,7 @@ public void resubscribe(TaskIdParams request, Consumer event String url = agentUrl + String.format("/v1/tasks/%1s:subscribe", request.id()); A2AHttpClient.PostBuilder postBuilder = createPostBuilder(url, payloadAndHeaders); ref.set(postBuilder.postAsyncSSE( - msg -> sseEventListener.onMessage(msg, ref.get()), + event -> sseEventListener.onMessage(event, ref.get()), throwable -> sseEventListener.onError(throwable, ref.get()), () -> { // We don't need to do anything special on completion diff --git a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/sse/RestSSEEventListener.java b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/sse/RestSSEEventListener.java index d0b130eee..91a3187e8 100644 --- a/client/transport/rest/src/main/java/io/a2a/client/transport/rest/sse/RestSSEEventListener.java +++ b/client/transport/rest/src/main/java/io/a2a/client/transport/rest/sse/RestSSEEventListener.java @@ -11,6 +11,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; +import io.a2a.client.http.ServerSentEvent; import io.a2a.client.transport.rest.RestErrorMapper; import io.a2a.grpc.StreamResponse; import io.a2a.grpc.utils.ProtoUtils; @@ -29,14 +30,14 @@ public RestSSEEventListener(Consumer eventHandler, this.errorHandler = errorHandler; } - public void onMessage(String message, @Nullable Future completableFuture) { + public void onMessage(ServerSentEvent event, @Nullable Future completableFuture) { try { - log.fine("Streaming message received: " + message); + log.fine("Streaming message received: " + event.data()); io.a2a.grpc.StreamResponse.Builder builder = io.a2a.grpc.StreamResponse.newBuilder(); - JsonFormat.parser().merge(message, builder); + JsonFormat.parser().merge(event.data(), builder); handleMessage(builder.build()); } catch (InvalidProtocolBufferException e) { - errorHandler.accept(RestErrorMapper.mapRestError(message, 500)); + errorHandler.accept(RestErrorMapper.mapRestError(event.data(), 500)); } } diff --git a/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClient.java b/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClient.java index c023f083d..1ff7b78ad 100644 --- a/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClient.java +++ b/extras/http-client-android/src/main/java/io/a2a/client/http/AndroidA2AHttpClient.java @@ -24,267 +24,271 @@ import java.util.concurrent.Executors; import java.util.function.Consumer; -/** Android-specific implementation of {@link A2AHttpClient} using {@link HttpURLConnection}. */ +/** + * Android-specific implementation of {@link A2AHttpClient} using {@link HttpURLConnection}. + */ public class AndroidA2AHttpClient implements A2AHttpClient { - private static final Executor NET_EXECUTOR = Executors.newCachedThreadPool(r -> { - Thread t = new Thread(r, "A2A-Android-Net"); - t.setDaemon(true); - return t; - }); - - @Override - public GetBuilder createGet() { - return new AndroidGetBuilder(); - } - - @Override - public PostBuilder createPost() { - return new AndroidPostBuilder(); - } - - @Override - public DeleteBuilder createDelete() { - return new AndroidDeleteBuilder(); - } - - private abstract static class AndroidBuilder> implements Builder { - protected String url = ""; - protected Map headers = new HashMap<>(); + private static final Executor NET_EXECUTOR = Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r, "A2A-Android-Net"); + t.setDaemon(true); + return t; + }); @Override - public T url(String url) { - this.url = url; - return self(); + public GetBuilder createGet() { + return new AndroidGetBuilder(); } @Override - public T addHeader(String name, String value) { - headers.put(name, value); - return self(); + public PostBuilder createPost() { + return new AndroidPostBuilder(); } @Override - public T addHeaders(Map headers) { - if (headers != null) { - this.headers.putAll(headers); - } - return self(); + public DeleteBuilder createDelete() { + return new AndroidDeleteBuilder(); } - @SuppressWarnings("unchecked") - protected T self() { - return (T) this; - } + private abstract static class AndroidBuilder> implements Builder { - protected HttpURLConnection createConnection(String method, boolean isSSE) throws IOException { - URL urlObj; - try { - urlObj = new URI(url).toURL(); - } catch (URISyntaxException e) { - throw new MalformedURLException("Invalid URL: " + url); - } - HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection(); - connection.setRequestMethod(method); - connection.setConnectTimeout(15000); // 15 seconds - connection.setReadTimeout(60000); // 60 seconds - for (Map.Entry header : headers.entrySet()) { - connection.setRequestProperty(header.getKey(), header.getValue()); - } - if (isSSE) { - connection.setRequestProperty("Accept", "text/event-stream"); - } - return connection; - } + protected String url = ""; + protected Map headers = new HashMap<>(); - protected static String readStreamWithLimit(InputStream is) throws IOException { - if (is == null) { - return ""; - } - int maxResponseSize = 10 * 1024 * 1024; // 10 MB - try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { - StringBuilder sb = new StringBuilder(); - String line; - boolean first = true; - while ((line = reader.readLine()) != null) { - if (sb.length() + line.length() > maxResponseSize) { - throw new IOException("Response size exceeds limit"); - } - if (!first) { - sb.append('\n'); - } - sb.append(line); - first = false; + @Override + public T url(String url) { + this.url = url; + return self(); } - return sb.toString(); - } - } - protected A2AHttpResponse execute(HttpURLConnection connection) throws IOException { - int status = connection.getResponseCode(); - String body = ""; - try (InputStream is = - (status >= HTTP_OK && status < HTTP_MULT_CHOICE) - ? connection.getInputStream() - : connection.getErrorStream()) { - body = readStreamWithLimit(is); - } - - if (status == HTTP_UNAUTHORIZED) { - throw new IOException(A2AErrorMessages.AUTHENTICATION_FAILED); - } else if (status == HTTP_FORBIDDEN) { - throw new IOException(A2AErrorMessages.AUTHORIZATION_FAILED); - } - - return new AndroidHttpResponse(status, body); - } + @Override + public T addHeader(String name, String value) { + headers.put(name, value); + return self(); + } + + @Override + public T addHeaders(Map headers) { + if (headers != null) { + this.headers.putAll(headers); + } + return self(); + } - protected void processSSEResponse( - HttpURLConnection connection, - Consumer messageConsumer, - Consumer errorConsumer, - Runnable completeRunnable) { - try { - int status = connection.getResponseCode(); - if (status != HTTP_OK) { - if (status == HTTP_UNAUTHORIZED) { - errorConsumer.accept(new IOException(A2AErrorMessages.AUTHENTICATION_FAILED)); - return; - } else if (status == HTTP_FORBIDDEN) { - errorConsumer.accept(new IOException(A2AErrorMessages.AUTHORIZATION_FAILED)); - return; - } - - String errorBody = ""; - try (InputStream es = connection.getErrorStream()) { - errorBody = readStreamWithLimit(es); - } - errorConsumer.accept( - new IOException("Request failed with status " + status + ":" + errorBody)); - return; + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; } - try (InputStream is = connection.getInputStream(); - BufferedReader reader = - new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { - String line; - while ((line = reader.readLine()) != null) { - if (line.startsWith("data:")) { - String data = line.substring(5).trim(); - if (!data.isEmpty()) { - messageConsumer.accept(data); - } + protected HttpURLConnection createConnection(String method, boolean isSSE) throws IOException { + URL urlObj; + try { + urlObj = new URI(url).toURL(); + } catch (URISyntaxException e) { + throw new MalformedURLException("Invalid URL: " + url); + } + HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection(); + connection.setRequestMethod(method); + connection.setConnectTimeout(15000); // 15 seconds + connection.setReadTimeout(60000); // 60 seconds + for (Map.Entry header : headers.entrySet()) { + connection.setRequestProperty(header.getKey(), header.getValue()); } - } - completeRunnable.run(); + if (isSSE) { + connection.setRequestProperty("Accept", "text/event-stream"); + } + return connection; } - } catch (Exception e) { - errorConsumer.accept(e); - } finally { - connection.disconnect(); - } - } - protected CompletableFuture executeAsyncSSE( - HttpURLConnection connection, - Consumer messageConsumer, - Consumer errorConsumer, - Runnable completeRunnable) { - return CompletableFuture.runAsync( - () -> processSSEResponse(connection, messageConsumer, errorConsumer, completeRunnable), - NET_EXECUTOR); - } - } + protected static String readStreamWithLimit(InputStream is) throws IOException { + if (is == null) { + return ""; + } + int maxResponseSize = 10 * 1024 * 1024; // 10 MB + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + StringBuilder sb = new StringBuilder(); + String line; + boolean first = true; + while ((line = reader.readLine()) != null) { + if (sb.length() + line.length() > maxResponseSize) { + throw new IOException("Response size exceeds limit"); + } + if (!first) { + sb.append('\n'); + } + sb.append(line); + first = false; + } + return sb.toString(); + } + } - private static class AndroidGetBuilder extends AndroidBuilder implements GetBuilder { - @Override - public A2AHttpResponse get() throws IOException { - HttpURLConnection connection = createConnection("GET", false); - try { - return execute(connection); - } catch (IOException e) { - connection.disconnect(); - throw e; - } - } + protected A2AHttpResponse execute(HttpURLConnection connection) throws IOException { + int status = connection.getResponseCode(); + String body = ""; + try (InputStream is + = (status >= HTTP_OK && status < HTTP_MULT_CHOICE) + ? connection.getInputStream() + : connection.getErrorStream()) { + body = readStreamWithLimit(is); + } - @Override - public CompletableFuture getAsyncSSE( - Consumer messageConsumer, - Consumer errorConsumer, - Runnable completeRunnable) - throws IOException { - HttpURLConnection connection = createConnection("GET", true); - return executeAsyncSSE(connection, messageConsumer, errorConsumer, completeRunnable); - } - } + if (status == HTTP_UNAUTHORIZED) { + throw new IOException(A2AErrorMessages.AUTHENTICATION_FAILED); + } else if (status == HTTP_FORBIDDEN) { + throw new IOException(A2AErrorMessages.AUTHORIZATION_FAILED); + } - private static class AndroidPostBuilder extends AndroidBuilder - implements PostBuilder { - private String body = ""; + return new AndroidHttpResponse(status, body); + } - @Override - public PostBuilder body(String body) { - this.body = body; - return this; + protected void processSSEResponse( + HttpURLConnection connection, + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) { + try { + int status = connection.getResponseCode(); + if (status != HTTP_OK) { + if (status == HTTP_UNAUTHORIZED) { + errorConsumer.accept(new IOException(A2AErrorMessages.AUTHENTICATION_FAILED)); + return; + } else if (status == HTTP_FORBIDDEN) { + errorConsumer.accept(new IOException(A2AErrorMessages.AUTHORIZATION_FAILED)); + return; + } + + String errorBody = ""; + try (InputStream es = connection.getErrorStream()) { + errorBody = readStreamWithLimit(es); + } + errorConsumer.accept( + new IOException("Request failed with status " + status + ":" + errorBody)); + return; + } + + ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer); + + try (InputStream is = connection.getInputStream(); BufferedReader reader + = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + sseParser.processLine(line); + } + sseParser.flush(); + completeRunnable.run(); + } + } catch (Exception e) { + errorConsumer.accept(e); + } finally { + connection.disconnect(); + } + } + + protected CompletableFuture executeAsyncSSE( + HttpURLConnection connection, + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) { + return CompletableFuture.runAsync( + () -> processSSEResponse(connection, messageConsumer, errorConsumer, completeRunnable), + NET_EXECUTOR); + } } - @Override - public A2AHttpResponse post() throws IOException { - HttpURLConnection connection = createConnection("POST", false); - connection.setDoOutput(true); - try { - try (OutputStream os = connection.getOutputStream()) { - os.write(body.getBytes(StandardCharsets.UTF_8)); + private static class AndroidGetBuilder extends AndroidBuilder implements GetBuilder { + + @Override + public A2AHttpResponse get() throws IOException { + HttpURLConnection connection = createConnection("GET", false); + try { + return execute(connection); + } catch (IOException e) { + connection.disconnect(); + throw e; + } + } + + @Override + public CompletableFuture getAsyncSSE( + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) + throws IOException, InterruptedException { + HttpURLConnection connection = createConnection("GET", true); + return executeAsyncSSE(connection, messageConsumer, errorConsumer, completeRunnable); } - return execute(connection); - } catch (IOException e) { - connection.disconnect(); - throw e; - } } - @Override - public CompletableFuture postAsyncSSE( - Consumer messageConsumer, - Consumer errorConsumer, - Runnable completeRunnable) - throws IOException { - HttpURLConnection connection = createConnection("POST", true); - connection.setDoOutput(true); - - return CompletableFuture.runAsync( - () -> { + private static class AndroidPostBuilder extends AndroidBuilder + implements PostBuilder { + + private String body = ""; + + @Override + public PostBuilder body(String body) { + this.body = body; + return this; + } + + @Override + public A2AHttpResponse post() throws IOException { + HttpURLConnection connection = createConnection("POST", false); + connection.setDoOutput(true); try { - try (OutputStream os = connection.getOutputStream()) { - os.write(body.getBytes(StandardCharsets.UTF_8)); - } - processSSEResponse(connection, messageConsumer, errorConsumer, completeRunnable); - } catch (Exception e) { - errorConsumer.accept(e); + try (OutputStream os = connection.getOutputStream()) { + os.write(body.getBytes(StandardCharsets.UTF_8)); + } + return execute(connection); + } catch (IOException e) { + connection.disconnect(); + throw e; } - }, NET_EXECUTOR); + } + + @Override + public CompletableFuture postAsyncSSE( + Consumer messageConsumer, + Consumer errorConsumer, + Runnable completeRunnable) + throws IOException, InterruptedException { + HttpURLConnection connection = createConnection("POST", true); + connection.setDoOutput(true); + + return CompletableFuture.runAsync( + () -> { + try { + try (OutputStream os = connection.getOutputStream()) { + os.write(body.getBytes(StandardCharsets.UTF_8)); + } + processSSEResponse(connection, messageConsumer, errorConsumer, completeRunnable); + } catch (Exception e) { + errorConsumer.accept(e); + } + }, NET_EXECUTOR); + } } - } - private static class AndroidDeleteBuilder extends AndroidBuilder - implements DeleteBuilder { - @Override - public A2AHttpResponse delete() throws IOException { - HttpURLConnection connection = createConnection("DELETE", false); - try { - return execute(connection); - } catch (IOException e) { - connection.disconnect(); - throw e; - } + private static class AndroidDeleteBuilder extends AndroidBuilder + implements DeleteBuilder { + + @Override + public A2AHttpResponse delete() throws IOException { + HttpURLConnection connection = createConnection("DELETE", false); + try { + return execute(connection); + } catch (IOException e) { + connection.disconnect(); + throw e; + } + } } - } - private record AndroidHttpResponse(int status, String body) implements A2AHttpResponse { - @Override - public boolean success() { - return status >= HTTP_OK && status < HTTP_MULT_CHOICE; + private record AndroidHttpResponse(int status, String body) implements A2AHttpResponse { + + @Override + public boolean success() { + return status >= HTTP_OK && status < HTTP_MULT_CHOICE; + } } - } } diff --git a/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java b/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java index 52c252a8f..cccfe38ec 100644 --- a/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java +++ b/http-client/src/main/java/io/a2a/client/http/A2AHttpClient.java @@ -22,7 +22,7 @@ interface Builder> { interface GetBuilder extends Builder { A2AHttpResponse get() throws IOException, InterruptedException; CompletableFuture getAsyncSSE( - Consumer messageConsumer, + Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException; } @@ -31,7 +31,7 @@ interface PostBuilder extends Builder { PostBuilder body(String body); A2AHttpResponse post() throws IOException, InterruptedException; CompletableFuture postAsyncSSE( - Consumer messageConsumer, + Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException; } diff --git a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java index 0759ede26..30b7a6cf9 100644 --- a/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java +++ b/http-client/src/main/java/io/a2a/client/http/JdkA2AHttpClient.java @@ -101,10 +101,12 @@ protected void checkAuthErrors(HttpResponse response) throws IOException protected CompletableFuture asyncRequest( HttpRequest request, - Consumer messageConsumer, + Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable ) { + ServerSentEventParser sseParser = new ServerSentEventParser(messageConsumer, errorConsumer); + Flow.Subscriber subscriber = new Flow.Subscriber() { private Flow.@Nullable Subscription subscription; private volatile boolean errorRaised = false; @@ -117,12 +119,8 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(String item) { - // SSE messages sometimes start with "data:". Strip that off - if (item != null && item.startsWith("data:")) { - item = item.substring(5).trim(); - if (!item.isEmpty()) { - messageConsumer.accept(item); - } + if (item != null) { + sseParser.processLine(item); } if (subscription != null) { subscription.request(1); @@ -143,6 +141,7 @@ public void onError(Throwable throwable) { @Override public void onComplete() { if (!errorRaised) { + sseParser.flush(); completeRunnable.run(); } if (subscription != null) { @@ -226,7 +225,7 @@ public A2AHttpResponse get() throws IOException, InterruptedException { @Override public CompletableFuture getAsyncSSE( - Consumer messageConsumer, + Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { HttpRequest request = createRequestBuilder(true) @@ -282,7 +281,7 @@ public A2AHttpResponse post() throws IOException, InterruptedException { @Override public CompletableFuture postAsyncSSE( - Consumer messageConsumer, + Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { HttpRequest request = createRequestBuilder(true) diff --git a/http-client/src/main/java/io/a2a/client/http/ServerSentEvent.java b/http-client/src/main/java/io/a2a/client/http/ServerSentEvent.java new file mode 100644 index 000000000..9b490b4b3 --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/ServerSentEvent.java @@ -0,0 +1,32 @@ +package io.a2a.client.http; + +import io.a2a.util.Assert; +import org.jspecify.annotations.Nullable; + +/** + * Represents a parsed Server-Sent Event (SSE). + *

+ * Each instance carries the fields defined by the SSE specification: + *

    + *
  • {@code data} — the event payload, already concatenated from one or more {@code data:} lines
  • + *
  • {@code eventType} — the event type from the {@code event:} field; never null, defaults to {@code "message"}
  • + *
  • {@code id} — the event ID from the {@code id:} field; null if absent
  • + *
  • {@code retry} — the reconnection interval in milliseconds from the {@code retry:} field; null if absent
  • + *
+ */ +public record ServerSentEvent(String data, String eventType, @Nullable String id, @Nullable Long retry) { + + /** + * Default event type per the SSE specification when no {@code event:} field is present. + */ + public static final String DEFAULT_EVENT_TYPE = "message"; + + public ServerSentEvent { + Assert.checkNotNullParam("data", data); + Assert.checkNotNullParam("eventType", eventType); + } + + public ServerSentEvent(String data) { + this(data, DEFAULT_EVENT_TYPE, null, null); + } +} diff --git a/http-client/src/main/java/io/a2a/client/http/ServerSentEventParser.java b/http-client/src/main/java/io/a2a/client/http/ServerSentEventParser.java new file mode 100644 index 000000000..a4815806b --- /dev/null +++ b/http-client/src/main/java/io/a2a/client/http/ServerSentEventParser.java @@ -0,0 +1,192 @@ +package io.a2a.client.http; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.logging.Logger; +import org.jspecify.annotations.Nullable; + +/** + * Streaming parser for Server-Sent Events (SSE). + *

+ * Feed lines one at a time via {@link #processLine}; call {@link #flush} when the stream ends. + * Not thread-safe — each connection should use its own instance. + */ +public class ServerSentEventParser { + private static final Logger log = Logger.getLogger(ServerSentEventParser.class.getName()); + + private static final int MAX_BUFFER_SIZE = 1000; + private static final int MAX_BUFFER_BYTES = 1024 * 1024; // 1 MB + private static final int MAX_LINE_LENGTH = 65536; // 64 KB + + private final Consumer eventConsumer; + private final @Nullable Consumer errorConsumer; + private final List dataBuffer = new ArrayList<>(); + private int dataBufferBytes = 0; + private @Nullable String eventType; + private @Nullable String currentEventId; + private @Nullable String lastEventId; + private @Nullable Long retry; + // Set when the current event block is corrupt (line too long, buffer overflow). + // All further fields are ignored until the next empty-line boundary. + private boolean skippingCurrentEvent = false; + + public ServerSentEventParser(Consumer eventConsumer) { + this(eventConsumer, null); + } + + public ServerSentEventParser(Consumer eventConsumer, @Nullable Consumer errorConsumer) { + this.eventConsumer = eventConsumer; + this.errorConsumer = errorConsumer; + } + + /** + * Processes a single line from the SSE stream. An empty line dispatches any buffered event. + * A {@code null} line is routed to the error consumer (or logged) without affecting the current event block. + */ + public void processLine(@Nullable String line) { + if (line == null) { + handleError(new IllegalArgumentException("Line cannot be null")); + return; + } + + // Check line length to prevent DoS; corrupt the current event so it is not dispatched + if (line.length() > MAX_LINE_LENGTH) { + handleError(new IllegalArgumentException("Line exceeds maximum length of " + MAX_LINE_LENGTH + " characters")); + skippingCurrentEvent = true; + dataBuffer.clear(); + dataBufferBytes = 0; + return; + } + + if (skippingCurrentEvent && !line.isEmpty()) { + return; + } + + // Empty line - dispatch the buffered event + if (line.isEmpty()) { + dispatchEvent(); + return; + } + + // Comment line - ignore + if (line.startsWith(":")) { + return; + } + + // Parse field and value + int colonIndex = line.indexOf(':'); + if (colonIndex == -1) { + // Field with no value + processField(line, ""); + } else { + String field = line.substring(0, colonIndex); + String value = line.substring(colonIndex + 1); + + // Remove optional leading space from value + if (value.startsWith(" ")) { + value = value.substring(1); + } + + processField(field, value); + } + } + + private void processField(String field, String value) { + switch (field) { + case "data" -> { + // Check line count to prevent DoS; corrupt and skip the rest of this event block + if (dataBuffer.size() >= MAX_BUFFER_SIZE) { + handleError(new IllegalStateException("SSE data buffer exceeded maximum size of " + MAX_BUFFER_SIZE + " lines")); + skippingCurrentEvent = true; + dataBuffer.clear(); + dataBufferBytes = 0; + return; + } + // Check total byte size to prevent OOM on large streams + if (dataBufferBytes + value.length() > MAX_BUFFER_BYTES) { + handleError(new IllegalStateException("SSE data buffer exceeded maximum byte size of " + MAX_BUFFER_BYTES + " bytes")); + skippingCurrentEvent = true; + dataBuffer.clear(); + dataBufferBytes = 0; + return; + } + dataBuffer.add(value); + dataBufferBytes += value.length(); + } + case "event" -> eventType = value; + case "id" -> { + // Per SSE spec: ignore the id field if the value contains a U+0000 NULL character. + // An empty value is valid and clears the last event ID buffer on dispatch. + if (value.indexOf('\0') == -1) { + currentEventId = value; + } + } + case "retry" -> { + // Per SSE spec: ignore the retry field unless the value consists entirely of ASCII digits. + if (!value.isEmpty() && value.chars().allMatch(c -> c >= '0' && c <= '9')) { + try { + retry = Long.parseLong(value); + } catch (NumberFormatException e) { + // Value is all digits but too large for long; log and ignore per spec. + log.fine("Ignoring retry value out of long range: " + value); + } + } else { + log.fine("Ignoring non-digit retry value: " + value); + } + } + default -> { + // Unknown field - ignore per spec + log.fine("Ignoring unknown SSE field: " + field); + } + } + } + + private void dispatchEvent() { + // Per SSE spec: update lastEventId before checking data, so ID-only events (e.g. heartbeats) are tracked + if (currentEventId != null) { + lastEventId = currentEventId; + } + + String data = String.join("\n", dataBuffer); + String type = eventType; + String id = currentEventId; + + // Always reset at event boundary, regardless of whether an event is dispatched + dataBuffer.clear(); + dataBufferBytes = 0; + eventType = null; + skippingCurrentEvent = false; + // currentEventId is NOT reset — it persists per the SSE specification + + // Per SSE spec: don't dispatch if data is empty (also covers limit-violation blocks, whose buffer was cleared) + if (data.isEmpty()) { + return; + } + + eventConsumer.accept(new ServerSentEvent(data, type != null ? type : ServerSentEvent.DEFAULT_EVENT_TYPE, id, retry)); + } + + /** Dispatches any buffered data not yet followed by an empty line. Call when the stream ends. */ + public void flush() { + dispatchEvent(); + } + + /** Returns the last event ID received, or {@code null} if none. */ + public @Nullable String getLastEventId() { + return lastEventId; + } + + /** Returns the reconnection interval in milliseconds from the last {@code retry:} field, or {@code null} if none. */ + public @Nullable Long getRetry() { + return retry; + } + + private void handleError(Throwable error) { + if (errorConsumer != null) { + errorConsumer.accept(error); + } else { + log.warning("SSE parsing error: " + error.getMessage()); + } + } +} diff --git a/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java b/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java index 3acad3b4f..497adc4b2 100644 --- a/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java +++ b/http-client/src/test/java/io/a2a/client/http/A2ACardResolverTest.java @@ -152,7 +152,7 @@ public String body() { } @Override - public CompletableFuture getAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { + public CompletableFuture getAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { return null; } diff --git a/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientSSETest.java b/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientSSETest.java index 2b9a0f349..fc36a3339 100644 --- a/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientSSETest.java +++ b/http-client/src/test/java/io/a2a/client/http/AbstractA2AHttpClientSSETest.java @@ -62,7 +62,7 @@ public void testGetAsyncSSE() throws Exception { client.createGet() .url(getBaseUrl() + "/sse") .getAsyncSSE( - events::add, + event -> events.add(event.data()), error::set, latch::countDown ); @@ -95,7 +95,7 @@ public void testPostAsyncSSE() throws Exception { .url(getBaseUrl() + "/sse") .body("{\"subscribe\":true}") .postAsyncSSE( - events::add, + event -> events.add(event.data()), error::set, latch::countDown ); @@ -114,7 +114,7 @@ public void testSSEDataPrefixStripping() throws Exception { .respond(response() .withStatusCode(200) .withHeader("Content-Type", "text/event-stream") - .withBody("data: content here\n\ndata:no space\n\ndata: extra spaces \n\n")); + .withBody("data: content here\n\ndata:no space\n\ndata: extra spaces \n\n")); CountDownLatch latch = new CountDownLatch(1); List events = new ArrayList<>(); @@ -123,7 +123,7 @@ public void testSSEDataPrefixStripping() throws Exception { client.createGet() .url(getBaseUrl() + "/sse") .getAsyncSSE( - events::add, + event -> events.add(event.data()), error::set, latch::countDown ); @@ -132,7 +132,8 @@ public void testSSEDataPrefixStripping() throws Exception { assertNull(error.get()); assertTrue(events.contains("content here"), "Should have stripped 'data: ' prefix"); assertTrue(events.contains("no space"), "Should handle 'data:' without space"); - assertTrue(events.contains("extra spaces"), "Should trim whitespace"); + // SSE spec: only first space after colon is removed, rest is preserved + assertTrue(events.contains(" extra spaces "), "Should preserve whitespace after first space"); } @Test @@ -209,7 +210,7 @@ public void testSSEEmptyLinesIgnored() throws Exception { client.createGet() .url(getBaseUrl() + "/sse") .getAsyncSSE( - events::add, + event -> events.add(event.data()), error::set, latch::countDown ); @@ -243,7 +244,7 @@ public void testSSEHeaderPropagation() throws Exception { .url(getBaseUrl() + "/sse") .addHeader("Authorization", "Bearer token") .getAsyncSSE( - events::add, + event -> events.add(event.data()), error::set, latch::countDown ); @@ -252,4 +253,128 @@ public void testSSEHeaderPropagation() throws Exception { assertNull(error.get()); assertTrue(events.contains("authenticated")); } + + @Test + public void testSSETypedEvents() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("event: update\ndata: payload1\n\nevent: delete\ndata: payload2\n\ndata: no-type\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE(events::add, error::set, latch::countDown); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertEquals(3, events.size()); + assertEquals("update", events.get(0).eventType()); + assertEquals("payload1", events.get(0).data()); + assertEquals("delete", events.get(1).eventType()); + assertEquals("payload2", events.get(1).data()); + assertEquals("message", events.get(2).eventType(), "Event without 'event:' field should default to 'message'"); + } + + @Test + public void testSSEEventIdAndLastEventId() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("id: 42\ndata: identified\n\ndata: no-id\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE(events::add, error::set, latch::countDown); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertEquals(2, events.size()); + assertEquals("42", events.get(0).id()); + assertEquals("42", events.get(1).id(), "Event without 'id:' field inherits last event ID per SSE spec"); + } + + @Test + public void testSSEMultiLineData() throws Exception { + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: line1\ndata: line2\ndata: line3\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE(events::add, error::set, latch::countDown); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertEquals(1, events.size()); + assertEquals("line1\nline2\nline3", events.get(0).data()); + } + + @Test + public void testSSEStreamEndingWithoutTrailingEmptyLine() throws Exception { + // Stream ends mid-event with no terminal empty line — flush() must dispatch the buffered data + mockServer + .when(request().withMethod("GET").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("data: flushed-event")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createGet() + .url(getBaseUrl() + "/sse") + .getAsyncSSE(events::add, error::set, latch::countDown); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertEquals(1, events.size(), "Buffered event must be dispatched on stream end"); + assertEquals("flushed-event", events.get(0).data()); + } + + @Test + public void testPostSSETypedEvents() throws Exception { + mockServer + .when(request().withMethod("POST").withPath("/sse")) + .respond(response() + .withStatusCode(200) + .withHeader("Content-Type", "text/event-stream") + .withBody("event: result\nid: 99\ndata: done\n\n")); + + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + + client.createPost() + .url(getBaseUrl() + "/sse") + .body("{}") + .postAsyncSSE(events::add, error::set, latch::countDown); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertNull(error.get()); + assertEquals(1, events.size()); + assertEquals("result", events.get(0).eventType()); + assertEquals("99", events.get(0).id()); + assertEquals("done", events.get(0).data()); + } } diff --git a/http-client/src/test/java/io/a2a/client/http/ServerSentEventParserTest.java b/http-client/src/test/java/io/a2a/client/http/ServerSentEventParserTest.java new file mode 100644 index 000000000..456b54bce --- /dev/null +++ b/http-client/src/test/java/io/a2a/client/http/ServerSentEventParserTest.java @@ -0,0 +1,512 @@ +package io.a2a.client.http; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +public class ServerSentEventParserTest { + + @Test + public void testSimpleDataEvent() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data: Hello World"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("Hello World", events.get(0).data()); + assertEquals("message", events.get(0).eventType()); + assertNull(events.get(0).id()); + assertNull(events.get(0).retry()); + } + + @Test + public void testMultiLineDataEvent() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data: First line"); + parser.processLine("data: Second line"); + parser.processLine("data: Third line"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("First line\nSecond line\nThird line", events.get(0).data()); + } + + @Test + public void testEventWithType() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("event: custom"); + parser.processLine("data: Custom event data"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("Custom event data", events.get(0).data()); + assertEquals("custom", events.get(0).eventType()); + } + + @Test + public void testEventWithId() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("id: 123"); + parser.processLine("data: Event with ID"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("Event with ID", events.get(0).data()); + assertEquals("123", events.get(0).id()); + assertEquals("123", parser.getLastEventId()); + } + + @Test + public void testEventWithRetry() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("retry: 5000"); + parser.processLine("data: Event with retry"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("Event with retry", events.get(0).data()); + assertEquals(5000L, events.get(0).retry()); + assertEquals(5000L, parser.getRetry()); + } + + @Test + public void testCompleteEvent() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("event: notification"); + parser.processLine("id: msg-001"); + parser.processLine("retry: 3000"); + parser.processLine("data: Complete event"); + parser.processLine(""); + + assertEquals(1, events.size()); + ServerSentEvent event = events.get(0); + assertEquals("Complete event", event.data()); + assertEquals("notification", event.eventType()); + assertEquals("msg-001", event.id()); + assertEquals(3000L, event.retry()); + } + + @Test + public void testMultipleEvents() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + // First event + parser.processLine("event: type1"); + parser.processLine("data: First"); + parser.processLine(""); + + // Second event + parser.processLine("event: type2"); + parser.processLine("data: Second"); + parser.processLine(""); + + assertEquals(2, events.size()); + assertEquals("First", events.get(0).data()); + assertEquals("type1", events.get(0).eventType()); + assertEquals("Second", events.get(1).data()); + assertEquals("type2", events.get(1).eventType()); + } + + @Test + public void testEmptyIdClearsLastEventId() { + // Per WHATWG SSE spec: id: with an empty value sets lastEventId to "" (clears it). + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("id: initial"); + parser.processLine("data: First"); + parser.processLine(""); + + parser.processLine("id:"); + parser.processLine("data: Second"); + parser.processLine(""); + + assertEquals(2, events.size()); + assertEquals("initial", events.get(0).id()); + assertEquals("", events.get(1).id(), "Empty id: should set currentEventId to empty string"); + assertEquals("", parser.getLastEventId(), "Empty id: should clear lastEventId to empty string"); + } + + @Test + public void testInvalidRetryIsIgnored() { + // Per SSE spec: non-digit retry values are silently ignored; the event is still dispatched. + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + assertDoesNotThrow(() -> parser.processLine("retry: not-a-number")); + assertDoesNotThrow(() -> parser.processLine("retry: +100")); + assertDoesNotThrow(() -> parser.processLine("retry: -1")); + assertDoesNotThrow(() -> parser.processLine("retry: 1.5")); + parser.processLine("data: Test"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertNull(events.get(0).retry(), "Retry should remain null after invalid values"); + } + + @Test + public void testCommentLinesIgnored() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine(": This is a comment"); + parser.processLine("data: Real data"); + parser.processLine(": Another comment"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("Real data", events.get(0).data()); + } + + @Test + public void testDataPrefixStripping() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data: with space"); + parser.processLine(""); + parser.processLine("data:no space"); + parser.processLine(""); + parser.processLine("data: extra spaces "); + parser.processLine(""); + + assertEquals(3, events.size()); + assertEquals("with space", events.get(0).data()); + assertEquals("no space", events.get(1).data()); + // SSE spec: remove only the first space after colon, preserve the rest + assertEquals(" extra spaces ", events.get(2).data()); + } + + @Test + public void testEmptyDataFieldIgnored() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data:"); + parser.processLine(""); + + assertEquals(0, events.size(), "Empty data field should not dispatch event"); + } + + @Test + public void testMultipleEmptyLinesIgnored() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data: first"); + parser.processLine(""); + parser.processLine(""); + parser.processLine(""); + parser.processLine("data: second"); + parser.processLine(""); + + assertEquals(2, events.size()); + assertEquals("first", events.get(0).data()); + assertEquals("second", events.get(1).data()); + } + + @Test + public void testFieldWithoutColon() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data"); + parser.processLine(""); + + assertEquals(0, events.size(), "Field without value should result in empty data"); + } + + @Test + public void testFlush() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data: Unflushed"); + assertEquals(0, events.size(), "Event should not be dispatched yet"); + + parser.flush(); + assertEquals(1, events.size(), "Flush should dispatch buffered event"); + assertEquals("Unflushed", events.get(0).data()); + } + + @Test + public void testNullLineIgnored() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine(null); + parser.processLine("data: Valid"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("Valid", events.get(0).data()); + } + + @Test + public void testEventTypeResetBetweenEvents() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("event: custom"); + parser.processLine("data: First"); + parser.processLine(""); + + parser.processLine("data: Second"); + parser.processLine(""); + + assertEquals(2, events.size()); + assertEquals("custom", events.get(0).eventType()); + assertEquals("message", events.get(1).eventType(), "Event type should reset to 'message' after dispatch"); + } + + @Test + public void testIdPersistsAcrossEvents() { + // Per SSE spec, the "last event ID buffer" is never reset between events; + // it persists until explicitly changed by another id: field. + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("id: 100"); + parser.processLine("data: First"); + parser.processLine(""); + + parser.processLine("data: Second"); + parser.processLine(""); + + assertEquals(2, events.size()); + assertEquals("100", events.get(0).id()); + assertEquals("100", events.get(1).id(), "ID should carry over to subsequent events per SSE spec"); + assertEquals("100", parser.getLastEventId(), "lastEventId should persist"); + } + + @Test + public void testIdWithNullCharacterIsIgnored() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + // id containing U+0000 must be ignored per SSE spec + parser.processLine("id: before"); + parser.processLine("data: First"); + parser.processLine(""); + + parser.processLine("id: invalid\u0000id"); + parser.processLine("data: Second"); + parser.processLine(""); + + assertEquals(2, events.size()); + assertEquals("before", events.get(0).id()); + // The null-containing id is ignored; currentEventId stays "before" (it persists) + assertEquals("before", events.get(1).id()); + // lastEventId should still be "before" since the null id was discarded + assertEquals("before", parser.getLastEventId()); + } + + @Test + public void testRetryPersistsAcrossEvents() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("retry: 2000"); + parser.processLine("data: First"); + parser.processLine(""); + + parser.processLine("data: Second"); + parser.processLine(""); + + assertEquals(2, events.size()); + assertEquals(2000L, events.get(0).retry()); + assertEquals(2000L, events.get(1).retry()); + assertEquals(2000L, parser.getRetry(), "Retry should persist"); + } + + @Test + public void testUnknownFieldIgnored() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("unknown: field"); + parser.processLine("data: Valid"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("Valid", events.get(0).data()); + } + + // --- errorConsumer tests --- + + @Test + public void testErrorConsumerCalledForNullLine() { + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add, error::set); + + parser.processLine(null); + + assertNotNull(error.get(), "errorConsumer should be called for null line"); + assertTrue(error.get() instanceof IllegalArgumentException); + assertEquals(0, events.size(), "No events should be dispatched"); + } + + @Test + public void testErrorConsumerCalledForLineTooLong() { + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add, error::set); + + // Oversized line mid-event: the whole event block is discarded + parser.processLine("data: before overflow"); + String longLine = "data: " + "x".repeat(65537); + parser.processLine(longLine); + // Subsequent lines in the same block are skipped + parser.processLine("data: should be skipped"); + parser.processLine(""); // end of corrupted block — nothing dispatched + + assertNotNull(error.get(), "errorConsumer should be called for oversized line"); + assertTrue(error.get() instanceof IllegalArgumentException); + assertTrue(error.get().getMessage().contains("maximum length")); + assertEquals(0, events.size(), "Corrupted event block must not be dispatched"); + + // Parser recovers cleanly at the next event boundary + parser.processLine("data: recovered"); + parser.processLine(""); + assertEquals(1, events.size(), "Parser should recover after oversized line"); + assertEquals("recovered", events.get(0).data()); + } + + @Test + public void testErrorConsumerCalledForBufferOverflow() { + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add, error::set); + + for (int i = 0; i < 1000; i++) { + parser.processLine("data: line" + i); + } + assertNull(error.get(), "No error expected before limit"); + + parser.processLine("data: overflow"); + assertNotNull(error.get(), "errorConsumer should be called when buffer limit exceeded"); + assertTrue(error.get() instanceof IllegalStateException); + assertTrue(error.get().getMessage().contains("maximum size")); + + // Lines in the same event block after the overflow are skipped + parser.processLine("data: skipped in same block"); + parser.processLine(""); // end of corrupted block — nothing dispatched + assertEquals(0, events.size(), "Corrupted event block must not be dispatched"); + + // Parser recovers cleanly at the next event boundary + parser.processLine("data: recovered"); + parser.processLine(""); + assertEquals(1, events.size(), "Parser should recover after buffer overflow"); + assertEquals("recovered", events.get(0).data()); + } + + @Test + public void testErrorConsumerCalledForBufferByteOverflow() { + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add, error::set); + + // Value is 65530 chars so the full line ("data: " + value = 65536) stays within the per-line + // limit; 17 such lines (17 * 65530 = 1,114,010 bytes) exceed the 1MB buffer byte limit. + String bigValue = "x".repeat(65530); + for (int i = 0; i < 17; i++) { + parser.processLine("data: " + bigValue); + } + + assertNotNull(error.get(), "errorConsumer should be called when byte limit exceeded"); + assertTrue(error.get() instanceof IllegalStateException); + assertTrue(error.get().getMessage().contains("maximum byte size")); + + // Lines in the same event block after the overflow are skipped + parser.processLine("data: skipped in same block"); + parser.processLine(""); // end of corrupted block — nothing dispatched + assertEquals(0, events.size(), "Corrupted event block must not be dispatched"); + + // Parser recovers cleanly at the next event boundary + parser.processLine("data: recovered"); + parser.processLine(""); + assertEquals(1, events.size(), "Parser should recover after byte overflow"); + assertEquals("recovered", events.get(0).data()); + } + + @Test + public void testInvalidRetryDoesNotCallErrorConsumer() { + // Per SSE spec: non-digit retry values are silently ignored, not errors. + List events = new ArrayList<>(); + AtomicReference error = new AtomicReference<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add, error::set); + + parser.processLine("retry: not-a-number"); + parser.processLine("retry: +100"); + parser.processLine("data: Test"); + parser.processLine(""); + + assertNull(error.get(), "errorConsumer must not be called for non-digit retry values"); + assertEquals(1, events.size(), "Event should still be dispatched"); + assertNull(events.get(0).retry(), "Retry should remain null"); + } + + @Test + public void testProcessingContinuesAfterErrorConsumerInvocation() { + List events = new ArrayList<>(); + List errors = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add, errors::add); + + parser.processLine(null); + parser.processLine("data: recovered"); + parser.processLine(""); + + assertEquals(1, errors.size(), "Should have one error from null line"); + assertEquals(1, events.size(), "Should still dispatch the event after error"); + assertEquals("recovered", events.get(0).data()); + } + + @Test + public void testNullLineWithoutErrorConsumerLogsAndContinues() { + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + // Without errorConsumer: null is logged, not thrown + assertDoesNotThrow(() -> parser.processLine(null)); + + parser.processLine("data: still works"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("still works", events.get(0).data()); + } + + @Test + public void testCRLFLineTerminatorsPreservedInValue() { + // SSEParser processes individual lines after line-splitting by the HTTP client. + // Callers (e.g., BufferedReader.readLine()) strip the \r\n terminator before passing + // the line here, so processLine never receives a bare \r from a CRLF stream. + // If a caller passes a line with a trailing \r (e.g., a non-standard source), it is + // preserved in the data value — stripping is the caller's responsibility. + List events = new ArrayList<>(); + ServerSentEventParser parser = new ServerSentEventParser(events::add); + + parser.processLine("data: value\r"); + parser.processLine(""); + + assertEquals(1, events.size()); + assertEquals("value\r", events.get(0).data()); + } +} diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java index 429c27942..32ed93bbb 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java @@ -17,6 +17,7 @@ import io.a2a.client.http.A2AHttpClient; import io.a2a.client.http.A2AHttpResponse; +import io.a2a.client.http.ServerSentEvent; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.events.EventQueue; @@ -229,7 +230,7 @@ public String body() { } @Override - public CompletableFuture postAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { + public CompletableFuture postAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { return null; } diff --git a/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java b/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java index f73c2a7f5..cd42daabf 100644 --- a/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java +++ b/server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java @@ -18,6 +18,7 @@ import io.a2a.client.http.A2AHttpClient; import io.a2a.client.http.A2AHttpResponse; +import io.a2a.client.http.ServerSentEvent; import io.a2a.common.A2AHeaders; import io.a2a.json.JsonProcessingException; import io.a2a.json.JsonUtil; @@ -106,7 +107,7 @@ public String body() { } @Override - public CompletableFuture postAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { + public CompletableFuture postAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { return null; } diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java index 7cc62dcfe..37e0a35e4 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/TestHttpClient.java @@ -13,6 +13,7 @@ import io.a2a.client.http.A2AHttpClient; import io.a2a.client.http.A2AHttpResponse; +import io.a2a.client.http.ServerSentEvent; import io.a2a.json.JsonProcessingException; import io.a2a.spec.Task; import io.a2a.json.JsonUtil; @@ -77,7 +78,7 @@ public String body() { } @Override - public CompletableFuture postAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { + public CompletableFuture postAsyncSSE(Consumer messageConsumer, Consumer errorConsumer, Runnable completeRunnable) throws IOException, InterruptedException { return null; }