diff --git a/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt b/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt index 0d729185a..45233ec80 100644 --- a/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt +++ b/graphql-dgs-subscription-types/src/main/kotlin/com/netflix/graphql/types/subscription/OperationMessage.kt @@ -1,5 +1,5 @@ /* - * Copyright 2021 Netflix, Inc. + * Copyright 2022 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package com.netflix.graphql.types.subscription import com.fasterxml.jackson.annotation.JsonCreator +import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo @@ -81,6 +82,7 @@ data class SSEDataPayload( val type: String = SSE_GQL_SUBSCRIPTION_DATA ) : MessagePayload +@JsonIgnoreProperties(ignoreUnknown = true) data class QueryPayload( @JsonProperty("variables") val variables: Map? = emptyMap(), @@ -89,7 +91,9 @@ data class QueryPayload( @JsonProperty("operationName") val operationName: String? = null, @JsonProperty("query") - val query: String + val query: String, + @JsonProperty("key") + val key: String = "" ) : MessagePayload data class Error(@JsonProperty val message: String = "") diff --git a/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt b/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt index dbed8eea1..b8875b355 100644 --- a/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt +++ b/graphql-dgs-subscriptions-sse/src/main/kotlin/com/netflix/graphql/dgs/subscriptions/sse/DgsSSESubscriptionHandler.kt @@ -31,7 +31,9 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.http.MediaType import org.springframework.http.codec.ServerSentEvent -import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody import org.springframework.web.bind.annotation.RequestParam import org.springframework.web.bind.annotation.RestController import org.springframework.web.server.ServerErrorException @@ -49,14 +51,24 @@ import com.netflix.graphql.types.subscription.Error as SseError @RestController open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor) { - @RequestMapping("/subscriptions", produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) + @GetMapping("/subscriptions", produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) fun subscriptionWithId(@RequestParam("query") queryBase64: String): Flux> { val query = try { String(Base64.getDecoder().decode(queryBase64), StandardCharsets.UTF_8) } catch (ex: IllegalArgumentException) { throw ServerWebInputException("Error decoding base64-encoded query") } + return handleSubscription(query) + } + + @PostMapping("/subscriptions", produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) + fun subscriptionFromPost( + @RequestBody body: String + ): Flux> { + return handleSubscription(body) + } + private fun handleSubscription(query: String): Flux> { val queryPayload = try { mapper.readValue(query, QueryPayload::class.java) } catch (ex: Exception) { @@ -88,12 +100,17 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor throw ServerErrorException("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?", exc) } - val subscriptionId = UUID.randomUUID().toString() + val subscriptionId = if (queryPayload.key == "") { + UUID.randomUUID().toString() + } else { + queryPayload.key + } return Flux.from(publisher) .map { val payload = SSEDataPayload(data = it.getData(), errors = it.errors, subId = subscriptionId) ServerSentEvent.builder(mapper.writeValueAsString(payload)) .id(UUID.randomUUID().toString()) + .event("next") .build() }.onErrorResume { exc -> logger.warn("An exception occurred on subscription {}", subscriptionId, exc) @@ -102,6 +119,7 @@ open class DgsSSESubscriptionHandler(open val dgsQueryExecutor: DgsQueryExecutor Flux.just( ServerSentEvent.builder(mapper.writeValueAsString(payload)) .id(UUID.randomUUID().toString()) + .event("error") .build() ) }