-
Notifications
You must be signed in to change notification settings - Fork 96
feat: Support multiple payload types in push notifications #493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,15 @@ | |
| import static io.a2a.client.http.A2AHttpClient.APPLICATION_JSON; | ||
| import static io.a2a.client.http.A2AHttpClient.CONTENT_TYPE; | ||
| import static io.a2a.common.A2AHeaders.X_A2A_NOTIFICATION_TOKEN; | ||
| import static io.a2a.spec.Message.MESSAGE; | ||
| import static io.a2a.spec.Task.TASK; | ||
| import static io.a2a.spec.TaskArtifactUpdateEvent.ARTIFACT_UPDATE; | ||
| import static io.a2a.spec.TaskStatusUpdateEvent.STATUS_UPDATE; | ||
|
|
||
| import io.a2a.spec.Message; | ||
| import io.a2a.spec.StreamingEventKind; | ||
| import io.a2a.spec.TaskArtifactUpdateEvent; | ||
| import io.a2a.spec.TaskStatusUpdateEvent; | ||
| import jakarta.enterprise.context.ApplicationScoped; | ||
| import jakarta.inject.Inject; | ||
|
|
||
|
|
@@ -42,34 +51,45 @@ public BasePushNotificationSender(PushNotificationConfigStore configStore, A2AHt | |
| } | ||
|
|
||
| @Override | ||
| public void sendNotification(Task task) { | ||
| List<PushNotificationConfig> pushConfigs = configStore.getInfo(task.getId()); | ||
| public void sendNotification(StreamingEventKind kind) { | ||
| String taskId = switch (kind.getKind()) { | ||
| case TASK -> ((Task) kind).getId(); | ||
| case MESSAGE -> ((Message)kind).getTaskId(); | ||
| case STATUS_UPDATE -> ((TaskStatusUpdateEvent)kind).getTaskId(); | ||
| case ARTIFACT_UPDATE -> ((TaskArtifactUpdateEvent)kind).getTaskId(); | ||
| default -> null; | ||
| }; | ||
|
Comment on lines
+55
to
+61
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this project uses a modern Java version (as indicated by the use of String taskId = switch (kind) {
case Task t -> t.getId();
case Message m -> m.getTaskId();
case TaskStatusUpdateEvent e -> e.getTaskId();
case TaskArtifactUpdateEvent e -> e.getTaskId();
}; |
||
| if (taskId == null) { | ||
| return; | ||
| } | ||
|
|
||
| List<PushNotificationConfig> pushConfigs = configStore.getInfo(taskId); | ||
| if (pushConfigs == null || pushConfigs.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| List<CompletableFuture<Boolean>> dispatchResults = pushConfigs | ||
| .stream() | ||
| .map(pushConfig -> dispatch(task, pushConfig)) | ||
| .map(pushConfig -> dispatch(kind, pushConfig)) | ||
| .toList(); | ||
| CompletableFuture<Void> allFutures = CompletableFuture.allOf(dispatchResults.toArray(new CompletableFuture[0])); | ||
| CompletableFuture<Boolean> dispatchResult = allFutures.thenApply(v -> dispatchResults.stream() | ||
| .allMatch(CompletableFuture::join)); | ||
| try { | ||
| boolean allSent = dispatchResult.get(); | ||
| if (! allSent) { | ||
| LOGGER.warn("Some push notifications failed to send for taskId: " + task.getId()); | ||
| if (!allSent) { | ||
| LOGGER.warn("Some push notifications failed to send for taskId: " + taskId); | ||
| } | ||
| } catch (InterruptedException | ExecutionException e) { | ||
| LOGGER.warn("Some push notifications failed to send for taskId " + task.getId() + ": {}", e.getMessage(), e); | ||
| LOGGER.warn("Some push notifications failed to send for taskId " + taskId + ": {}", e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| private CompletableFuture<Boolean> dispatch(Task task, PushNotificationConfig pushInfo) { | ||
| return CompletableFuture.supplyAsync(() -> dispatchNotification(task, pushInfo)); | ||
| private CompletableFuture<Boolean> dispatch(StreamingEventKind kind, PushNotificationConfig pushInfo) { | ||
| return CompletableFuture.supplyAsync(() -> dispatchNotification(kind, pushInfo)); | ||
| } | ||
|
|
||
| private boolean dispatchNotification(Task task, PushNotificationConfig pushInfo) { | ||
| private boolean dispatchNotification(StreamingEventKind kind, PushNotificationConfig pushInfo) { | ||
| String url = pushInfo.url(); | ||
| String token = pushInfo.token(); | ||
|
|
||
|
|
@@ -80,7 +100,7 @@ private boolean dispatchNotification(Task task, PushNotificationConfig pushInfo) | |
|
|
||
| String body; | ||
| try { | ||
| body = Utils.OBJECT_MAPPER.writeValueAsString(task); | ||
| body = Utils.OBJECT_MAPPER.writeValueAsString(kind); | ||
| } catch (JsonProcessingException e) { | ||
| LOGGER.debug("Error writing value as string: {}", e.getMessage(), e); | ||
| return false; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,15 @@ | ||
| package io.a2a.server.tasks; | ||
|
|
||
| import io.a2a.spec.Task; | ||
| import io.a2a.spec.StreamingEventKind; | ||
|
|
||
| /** | ||
| * Interface for sending push notifications for tasks. | ||
| */ | ||
| public interface PushNotificationSender { | ||
|
|
||
| /** | ||
| * Sends a push notification containing the latest task state. | ||
| * @param task the task | ||
| * Sends a push notification with a payload related to the task. | ||
| * @param kind the payload to push | ||
| */ | ||
| void sendNotification(Task task); | ||
| void sendNotification(StreamingEventKind kind); | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,6 +16,10 @@ | |||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||||
| import java.util.function.Consumer; | ||||||||||||||
|
|
||||||||||||||
| import io.a2a.spec.Message; | ||||||||||||||
| import io.a2a.spec.Part; | ||||||||||||||
| import io.a2a.spec.StreamingEventKind; | ||||||||||||||
| import io.a2a.spec.TextPart; | ||||||||||||||
| import org.junit.jupiter.api.BeforeEach; | ||||||||||||||
| import org.junit.jupiter.api.Test; | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -67,6 +71,7 @@ class TestPostBuilder implements A2AHttpClient.PostBuilder { | |||||||||||||
| @Override | ||||||||||||||
| public PostBuilder body(String body) { | ||||||||||||||
| this.body = body; | ||||||||||||||
| System.out.println("body = " + body); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||
| return this; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -80,6 +85,7 @@ public A2AHttpResponse post() throws IOException, InterruptedException { | |||||||||||||
| Task task = Utils.OBJECT_MAPPER.readValue(body, Task.TYPE_REFERENCE); | ||||||||||||||
| tasks.add(task); | ||||||||||||||
|
Comment on lines
85
to
86
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The mock HTTP client is hardcoded to deserialize the request body into a
Suggested change
|
||||||||||||||
| urls.add(url); | ||||||||||||||
| System.out.println(requestHeaders); | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||
| headers.add(new java.util.HashMap<>(requestHeaders)); | ||||||||||||||
|
|
||||||||||||||
| return new A2AHttpResponse() { | ||||||||||||||
|
|
@@ -95,7 +101,7 @@ public boolean success() { | |||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public String body() { | ||||||||||||||
| return ""; | ||||||||||||||
| return body; | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
| } finally { | ||||||||||||||
|
|
@@ -316,4 +322,45 @@ public void testSendNotificationHttpError() { | |||||||||||||
| // Verify no tasks were successfully processed due to the error | ||||||||||||||
| assertEquals(0, testHttpClient.tasks.size()); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Test | ||||||||||||||
| public void testSendNotificationWithMessage() throws InterruptedException { | ||||||||||||||
| String taskId = "task_send_notification_with_message"; | ||||||||||||||
| Task taskData = createSampleTask(taskId, TaskState.COMPLETED); | ||||||||||||||
| PushNotificationConfig config = createSamplePushConfig("http://notify.me/here", "cfg1", "unique_token"); | ||||||||||||||
|
|
||||||||||||||
| // Set up the configuration in the store | ||||||||||||||
| configStore.setInfo(taskId, config); | ||||||||||||||
|
|
||||||||||||||
| // Set up latch to wait for async completion | ||||||||||||||
| testHttpClient.latch = new CountDownLatch(1); | ||||||||||||||
|
|
||||||||||||||
| Message message = new Message.Builder() | ||||||||||||||
| .taskId(taskId) | ||||||||||||||
| .messageId("task_push_notification_message") | ||||||||||||||
| .parts(Collections.singletonList(new TextPart("Message for task " + taskId))) | ||||||||||||||
| .role(Message.Role.USER) | ||||||||||||||
| .build(); | ||||||||||||||
| sender.sendNotification(message); | ||||||||||||||
|
|
||||||||||||||
| // Wait for the async operation to complete | ||||||||||||||
| assertTrue(testHttpClient.latch.await(5, TimeUnit.SECONDS), "HTTP call should complete within 5 seconds"); | ||||||||||||||
|
|
||||||||||||||
| // Verify the task was sent via HTTP | ||||||||||||||
| assertEquals(1, testHttpClient.tasks.size()); | ||||||||||||||
| Task sentTask = testHttpClient.tasks.get(0); | ||||||||||||||
| assertEquals(taskData.getId(), sentTask.getId()); | ||||||||||||||
|
Comment on lines
+350
to
+352
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These assertions are incorrect. You are sending a |
||||||||||||||
|
|
||||||||||||||
| // Verify that the X-A2A-Notification-Token header is sent with the correct token | ||||||||||||||
| assertEquals(1, testHttpClient.headers.size()); | ||||||||||||||
| Map<String, String> sentHeaders = testHttpClient.headers.get(0); | ||||||||||||||
| assertEquals(2, sentHeaders.size()); | ||||||||||||||
| assertTrue(sentHeaders.containsKey(A2AHeaders.X_A2A_NOTIFICATION_TOKEN)); | ||||||||||||||
| assertEquals(config.token(), sentHeaders.get(A2AHeaders.X_A2A_NOTIFICATION_TOKEN)); | ||||||||||||||
| // Content-Type header should always be present | ||||||||||||||
| assertTrue(sentHeaders.containsKey(CONTENT_TYPE)); | ||||||||||||||
| assertEquals(APPLICATION_JSON, sentHeaders.get(CONTENT_TYPE)); | ||||||||||||||
|
|
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| } | ||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The type of the
capturedvariable is incorrect.mockPushNotificationSender.getCapturedEvents()returns aQueue<StreamingEventKind>, not aQueue<Task>. This will cause a compilation error. You need to update the variable type and add a cast to accessTask-specific methods. It's also safer to check the type before casting.