Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,33 +169,27 @@ type GraphQLWebSocketMiddleware<'Root>
let sendMsg = sendMessageViaSocket serializerOptions socket
let rcv () = socket |> rcvMsgViaSocket serializerOptions

let sendOutput id (output : Output) =
match output.TryGetValue "errors" with
| true, theValue ->
// The specification says: "This message terminates the operation and no further messages will be sent."
subscriptions
|> GraphQLSubscriptionsManagement.removeSubscription (id)
sendMsg (Error (id, unbox theValue))
| false, _ -> sendMsg (Next (id, output))
let sendOutput id (output : SubscriptionExecutionResult) =
sendMsg (Next (id, output))

let sendSubscriptionResponseOutput id subscriptionResult =
match subscriptionResult with
| SubscriptionResult output -> output |> sendOutput id
| SubscriptionResult output -> { Data = ValueSome output; Errors = [] } |> sendOutput id
| SubscriptionErrors (output, errors) ->
logger.LogWarning ("Subscription errors: {subscriptionErrors}", (String.Join ('\n', errors |> Seq.map (fun x -> $"- %s{x.Message}"))))
Task.FromResult ()
{ Data = ValueNone; Errors = errors } |> sendOutput id

let sendDeferredResponseOutput id deferredResult =
match deferredResult with
| DeferredResult (obj, path) ->
let output = obj :?> Dictionary<string, obj>
output |> sendOutput id
{ Data = ValueSome output; Errors = [] } |> sendOutput id
| DeferredErrors (obj, errors, _) ->
logger.LogWarning (
"Deferred response errors: {deferredErrors}",
(String.Join ('\n', errors |> Seq.map (fun x -> $"- %s{x.Message}")))
)
Task.FromResult ()
{ Data = ValueNone; Errors = errors } |> sendOutput id

let sendDeferredResultDelayedBy (ct : CancellationToken) (ms : int) id deferredResult : Task = task {
do! Task.Delay (ms, ct)
Expand All @@ -208,16 +202,16 @@ type GraphQLWebSocketMiddleware<'Root>
(subscriptions, socket, observableOutput, serializerOptions)
|> addClientSubscription id sendSubscriptionResponseOutput
| Deferred (data, errors, observableOutput) ->
do! data |> sendOutput id
do! { Data = ValueSome data; Errors = [] } |> sendOutput id
if errors.IsEmpty then
(subscriptions, socket, observableOutput, serializerOptions)
|> addClientSubscription id (sendDeferredResultDelayedBy cancellationToken 5000)
else
()
| Direct (data, _) -> do! data |> sendOutput id
| Direct (data, _) -> do! { Data = ValueSome data; Errors = [] } |> sendOutput id
| RequestError problemDetails ->
logger.LogWarning("Request errors:\n{errors}", problemDetails)

do! { Data = ValueNone; Errors = problemDetails } |> sendOutput id
}

let logMsgReceivedWithOptionalPayload optionalPayload (msgAsStr : string) =
Expand Down Expand Up @@ -265,22 +259,26 @@ type GraphQLWebSocketMiddleware<'Root>
| ValueNone -> do! ServerPong p |> sendMsg
| ClientPong p -> nameof ClientPong |> logMsgReceivedWithOptionalPayload p
| Subscribe (id, query) ->
nameof Subscribe |> logMsgWithIdReceived id
if subscriptions |> GraphQLSubscriptionsManagement.isIdTaken id then
do!
let warningMsg : FormattableString = $"Subscriber for Id = '{id}' already exists"
logger.LogWarning (String.Format (warningMsg.Format, "id"), id)
socket.CloseAsync (
enum CustomWebSocketStatus.SubscriberAlreadyExists,
warningMsg.ToString (),
CancellationToken.None
)
else
let variables = query.Variables |> Skippable.toOption
let! planExecutionResult =
let root = options.RootFactory httpContext
options.SchemaExecutor.AsyncExecute (query.Query, root, ?variables = variables)
do! planExecutionResult |> applyPlanExecutionResult id socket
try
nameof Subscribe |> logMsgWithIdReceived id
if subscriptions |> GraphQLSubscriptionsManagement.isIdTaken id then
do!
let warningMsg : FormattableString = $"Subscriber for Id = '{id}' already exists"
logger.LogWarning (String.Format (warningMsg.Format, "id"), id)
socket.CloseAsync (
enum CustomWebSocketStatus.SubscriberAlreadyExists,
warningMsg.ToString (),
CancellationToken.None
)
else
let variables = query.Variables |> Skippable.toOption
let! planExecutionResult =
let root = options.RootFactory httpContext
options.SchemaExecutor.AsyncExecute (query.Query, root, ?variables = variables)
do! planExecutionResult |> applyPlanExecutionResult id socket
with ex ->
logger.LogError (ex, "Unexpected error during subscription with id '{id}'", id)
do! sendMsg (Error (id, [new Shared.NameValueLookup ([ ("subscription", "Unexpected error during subscription" :> obj) ])]))
| ClientComplete id ->
"ClientComplete" |> logMsgWithIdReceived id
subscriptions
Expand Down
40 changes: 20 additions & 20 deletions src/FSharp.Data.GraphQL.Server.AspNetCore/StartupExtensions.fs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ module ServiceCollectionExtensions =
executorFactory : Func<IServiceProvider, Executor<'Root>>,
rootFactory : HttpContext -> 'Root,
[<Optional>] additionalConverters : JsonConverter seq,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointUrl : string,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointPath : string,
[<Optional>] configure : Func<GraphQLOptions<'Root>, GraphQLOptions<'Root>>
) =

Expand All @@ -56,7 +56,7 @@ module ServiceCollectionExtensions =

let getOptions sp =
let executor = executorFactory.Invoke sp
let options = createStandardOptions executor rootFactory additionalConverters webSocketEndpointUrl
let options = createStandardOptions executor rootFactory additionalConverters webSocketEndpointPath
match configure with
| null -> options
| _ -> configure.Invoke options
Expand Down Expand Up @@ -99,10 +99,10 @@ module ServiceCollectionExtensions =
executorFactory : Func<IServiceProvider, Executor<'Root>>,
rootFactory : HttpContext -> 'Root,
[<Optional>] additionalConverters : JsonConverter seq,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointUrl : string,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointPath : string,
[<Optional>] configure : Func<GraphQLOptions<'Root>, GraphQLOptions<'Root>>
) =
services.AddGraphQL<'Root, DefaultGraphQLRequestHandler<'Root>> (executorFactory, rootFactory, additionalConverters, webSocketEndpointUrl, configure)
services.AddGraphQL<'Root, DefaultGraphQLRequestHandler<'Root>> (executorFactory, rootFactory, additionalConverters, webSocketEndpointPath, configure)

/// <summary>
/// Adds GraphQL options and services to the service collection. Requires an executor instance to be provided.
Expand All @@ -118,7 +118,7 @@ module ServiceCollectionExtensions =
rootFactory : HttpContext -> 'Root,
[<Optional>] additionalConverters : JsonConverter seq
) =
services.AddGraphQL<'Root, 'Handler> ((fun _ -> executor), rootFactory, additionalConverters, null, null)
services.AddGraphQL<'Root, 'Handler> ((fun _ -> executor), rootFactory, additionalConverters, configure = null)

/// <summary>
/// Adds GraphQL options and services to the service collection. Requires an executor instance to be provided.
Expand All @@ -134,7 +134,7 @@ module ServiceCollectionExtensions =
rootFactory : HttpContext -> 'Root,
[<Optional>] additionalConverters : JsonConverter seq
) =
services.AddGraphQL<'Root> ((fun _ -> executor), rootFactory, additionalConverters, null, null)
services.AddGraphQL<'Root> ((fun _ -> executor), rootFactory, additionalConverters, configure = null)

/// <summary>
/// Adds GraphQL options and services to the service collection. Requires an executor instance to be provided.
Expand All @@ -148,10 +148,10 @@ module ServiceCollectionExtensions =
(
executor : Executor<'Root>,
rootFactory : HttpContext -> 'Root,
webSocketEndpointUrl : string,
webSocketEndpointPath : string,
[<Optional>] additionalConverters : JsonConverter seq
) =
services.AddGraphQL<'Root, 'Handler> ((fun _ -> executor), rootFactory, additionalConverters, webSocketEndpointUrl, null)
services.AddGraphQL<'Root, 'Handler> ((fun _ -> executor), rootFactory, additionalConverters, webSocketEndpointPath, null)

/// <summary>
/// Adds GraphQL options and services to the service collection. Requires an executor instance to be provided.
Expand All @@ -165,10 +165,10 @@ module ServiceCollectionExtensions =
(
executor : Executor<'Root>,
rootFactory : HttpContext -> 'Root,
webSocketEndpointUrl : string,
webSocketEndpointPath : string,
[<Optional>] additionalConverters : JsonConverter seq
) =
services.AddGraphQL<'Root> ((fun _ -> executor), rootFactory, additionalConverters, webSocketEndpointUrl, null)
services.AddGraphQL<'Root> ((fun _ -> executor), rootFactory, additionalConverters, webSocketEndpointPath, null)

/// <summary>
/// Adds GraphQL options and services to the service collection. Requires an executor instance to be provided.
Expand All @@ -185,7 +185,7 @@ module ServiceCollectionExtensions =
configure : Func<GraphQLOptions<'Root>, GraphQLOptions<'Root>>,
[<Optional>] additionalConverters : JsonConverter seq
) =
services.AddGraphQL<'Root, 'Handler> ((fun _ -> executor), rootFactory, additionalConverters, null, configure)
services.AddGraphQL<'Root, 'Handler> ((fun _ -> executor), rootFactory, additionalConverters, configure = configure)

/// <summary>
/// Adds GraphQL options and services to the service collection. Requires an executor instance to be provided.
Expand All @@ -202,7 +202,7 @@ module ServiceCollectionExtensions =
configure : Func<GraphQLOptions<'Root>, GraphQLOptions<'Root>>,
[<Optional>] additionalConverters : JsonConverter seq
) =
services.AddGraphQL<'Root> ((fun _ -> executor), rootFactory, additionalConverters, null, configure)
services.AddGraphQL<'Root> ((fun _ -> executor), rootFactory, additionalConverters, configure = configure)

/// <summary>
/// Adds GraphQL options and services to the service collection. It gets the executor from the service provider.
Expand All @@ -221,7 +221,7 @@ module ServiceCollectionExtensions =
[<Optional>] additionalConverters : JsonConverter seq
) =
let getExecutorService (sp : IServiceProvider) = sp.GetRequiredService<Executor<'Root>>()
services.AddGraphQL<'Root, 'Handler> (getExecutorService, rootFactory, additionalConverters, null, null)
services.AddGraphQL<'Root, 'Handler> (getExecutorService, rootFactory, additionalConverters, configure = null)

/// <summary>
/// Adds GraphQL options and services to the service collection. It gets the executor from the service provider.
Expand All @@ -240,7 +240,7 @@ module ServiceCollectionExtensions =
[<Optional>] additionalConverters : JsonConverter seq
) =
let getExecutorService (sp : IServiceProvider) = sp.GetRequiredService<Executor<'Root>>()
services.AddGraphQL<'Root> (getExecutorService, rootFactory, additionalConverters, null, null)
services.AddGraphQL<'Root> (getExecutorService, rootFactory, additionalConverters, configure = null)

/// <summary>
/// Adds GraphQL options and services to the service collection. It gets the executor from the service provider.
Expand All @@ -256,11 +256,11 @@ module ServiceCollectionExtensions =
member services.AddGraphQL<'Root, 'Handler when 'Handler :> GraphQLRequestHandler<'Root> and 'Handler : not struct>
(
rootFactory : HttpContext -> 'Root,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointUrl : string,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointPath : string,
[<Optional>] additionalConverters : JsonConverter seq
) =
let getExecutorService (sp : IServiceProvider) = sp.GetRequiredService<Executor<'Root>>()
services.AddGraphQL<'Root, 'Handler> (getExecutorService, rootFactory, additionalConverters, webSocketEndpointUrl, null)
services.AddGraphQL<'Root, 'Handler> (getExecutorService, rootFactory, additionalConverters, webSocketEndpointPath, null)

/// <summary>
/// Adds GraphQL options and services to the service collection. It gets the executor from the service provider.
Expand All @@ -276,11 +276,11 @@ module ServiceCollectionExtensions =
member services.AddGraphQL<'Root>
(
rootFactory : HttpContext -> 'Root,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointUrl : string,
[<Optional; DefaultParameterValue (GraphQLOptionsDefaults.WebSocketEndpoint)>] webSocketEndpointPath : string,
[<Optional>] additionalConverters : JsonConverter seq
) =
let getExecutorService (sp : IServiceProvider) = sp.GetRequiredService<Executor<'Root>>()
services.AddGraphQL<'Root> (getExecutorService, rootFactory, additionalConverters, webSocketEndpointUrl, null)
services.AddGraphQL<'Root> (getExecutorService, rootFactory, additionalConverters, webSocketEndpointPath, null)

/// <summary>
/// Adds GraphQL options and services to the service collection. It gets the executor from the service provider.
Expand All @@ -300,7 +300,7 @@ module ServiceCollectionExtensions =
[<Optional>] additionalConverters : JsonConverter seq
) =
let getExecutorService (sp : IServiceProvider) = sp.GetRequiredService<Executor<'Root>>()
services.AddGraphQL<'Root, 'Handler> (getExecutorService, rootFactory, additionalConverters, null, configure)
services.AddGraphQL<'Root, 'Handler> (getExecutorService, rootFactory, additionalConverters, configure = null)

/// <summary>
/// Adds GraphQL options and services to the service collection. It gets the executor from the service provider.
Expand All @@ -320,7 +320,7 @@ module ServiceCollectionExtensions =
[<Optional>] additionalConverters : JsonConverter seq
) =
let getExecutorService (sp : IServiceProvider) = sp.GetRequiredService<Executor<'Root>>()
services.AddGraphQL<'Root> (getExecutorService, rootFactory, additionalConverters, null, configure)
services.AddGraphQL<'Root> (getExecutorService, rootFactory, additionalConverters, configure = configure)


[<AutoOpen; Extension>]
Expand Down
7 changes: 5 additions & 2 deletions src/FSharp.Data.GraphQL.Shared/WebSockets.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace FSharp.Data.GraphQL.Shared.WebSockets
open System
open System.Collections.Generic
open System.Text.Json
open FSharp.Data.GraphQL
open FSharp.Data.GraphQL.Shared

type InvalidWebsocketMessageException (explanation : string) =
Expand All @@ -15,8 +16,10 @@ type SubscriptionsDict = IDictionary<SubscriptionId, SubscriptionUnsubscriber *

type RawMessage = { Id : string voption; Type : string; Payload : JsonDocument voption }

type SubscriptionExecutionResult = { Data : Output voption; Errors : GQLProblemDetails list }

type ServerRawPayload =
| ExecutionResult of Output
| ExecutionResult of SubscriptionExecutionResult
| ErrorMessages of NameValueLookup list
| CustomResponse of JsonDocument

Expand All @@ -35,7 +38,7 @@ type ServerMessage =
| ConnectionAck
| ServerPing
| ServerPong of JsonDocument voption
| Next of id : string * payload : Output
| Next of id : string * payload : SubscriptionExecutionResult
| Error of id : string * err : NameValueLookup list
| Complete of id : string

Expand Down