diff --git a/Dapr.PluggableComponents.AspNetCore/PluggableComponentService.cs b/Dapr.PluggableComponents.AspNetCore/PluggableComponentService.cs index 15822f0..bef40c1 100644 --- a/Dapr.PluggableComponents.AspNetCore/PluggableComponentService.cs +++ b/Dapr.PluggableComponents.AspNetCore/PluggableComponentService.cs @@ -3,72 +3,91 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; -namespace Dapr.PluggableComponents.AspNetCore { +namespace Dapr.PluggableComponents.AspNetCore +{ public class PluggableComponentService { private readonly WebApplication _app; + private readonly WebApplicationBuilder _builder; - public PluggableComponentService(String? socketPath = null, WebApplicationOptions? options = null) + public PluggableComponentService( + String? socketPath = null, + WebApplicationOptions? options = null + ) { - var udsPath = socketPath ?? - Environment.GetEnvironmentVariable(Constants.DaprSocketPathEnvironmentVariable) ?? - "daprcomponent.sock"; - + var udsPath = + socketPath + ?? Environment + .GetEnvironmentVariable(Constants + .DaprSocketPathEnvironmentVariable) + ?? "daprcomponent.sock"; + Console.WriteLine("Starting Dapr pluggable component"); - Console.WriteLine(format: @"Using UNIX socket located at {0}", udsPath); - + Console + .WriteLine(format: @"Using UNIX socket located at {0}", + udsPath); + if (File.Exists(udsPath)) { Console.WriteLine("Removing existing socket"); File.Delete(udsPath); } - - var builder = WebApplication.CreateBuilder(options: options ?? new WebApplicationOptions()); - - builder.WebHost.ConfigureKestrel(options => - { - options.ListenUnixSocket(udsPath); - }); - + + var builder = + WebApplication + .CreateBuilder(options: options + ?? new WebApplicationOptions()); + + builder + .WebHost + .ConfigureKestrel(options => + { + options.ListenUnixSocket(udsPath); + }); + builder.Services.AddGrpc(); + _builder = builder; _app = builder.Build(); } - - public PluggableComponentService WithStateStore() where TService : StateStoreService + + public PluggableComponentService WithStateStore() + where TService : StateStoreService { _app.MapGrpcService(); return this; } - - public PluggableComponentService WithPubSub() where TService : PubSubService + + public PluggableComponentService WithPubSub() + where TService : PubSubService { _app.MapGrpcService(); return this; } - - public PluggableComponentService WithInputBinding() where TService : InputBindingService + + public PluggableComponentService WithInputBinding() + where TService : InputBindingService { _app.MapGrpcService(); return this; } - - public PluggableComponentService WithOutputBinding() where TService : OutputBindingService + + public PluggableComponentService WithOutputBinding() + where TService : OutputBindingService { _app.MapGrpcService(); return this; } - - public PluggableComponentService WithHttpMiddleware() where TService : HttpMiddlewareService + + public PluggableComponentService WithHttpMiddleware() + where TService : HttpMiddlewareService { _app.MapGrpcService(); return this; } - + public void Run(string? url = null) { _app.Run(url); } - - } -} \ No newline at end of file +} diff --git a/Dapr.PluggableComponents.AspNetCore/PluggableComponentServiceBuilder.cs b/Dapr.PluggableComponents.AspNetCore/PluggableComponentServiceBuilder.cs new file mode 100644 index 0000000..329ae13 --- /dev/null +++ b/Dapr.PluggableComponents.AspNetCore/PluggableComponentServiceBuilder.cs @@ -0,0 +1,151 @@ +using Dapr.PluggableComponents.Components; +using Dapr.PluggableComponents.Services; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.PluggableComponents.AspNetCore +{ + public class PluggableComponentServiceBuilder + { + private List> appCallbacks; + private List> builderCallbacks; + + private PluggableComponentServiceBuilder() + { + appCallbacks = new List>(); + builderCallbacks = new List>(); + } + private PluggableComponentServiceBuilder(PluggableComponentServiceBuilder other) : this() + { + this.appCallbacks = new List>(other.appCallbacks); + this.builderCallbacks = new List>(other.builderCallbacks); + } + + public static PluggableComponentServiceBuilder CreateBuilder(string? socketPath = null) + { + var udsPath = + socketPath + ?? Environment + .GetEnvironmentVariable(Constants + .DaprSocketPathEnvironmentVariable) + ?? "daprcomponent.sock"; + + Console.WriteLine("Starting Dapr pluggable component"); + Console + .WriteLine(format: @"Using UNIX socket located at {0}", + udsPath); + + if (File.Exists(udsPath)) + { + Console.WriteLine("Removing existing socket"); + File.Delete(udsPath); + } + return new PluggableComponentServiceBuilder().UseSocket(udsPath).WithBuilderCallback(builder => + { + builder.Services.AddGrpc(); + }); + } + + public PluggableComponentServiceBuilder UseSocket(string socketPath) + { + return this.WithBuilderCallback(builder => + { + builder + .WebHost + .ConfigureKestrel(options => + { + options.ListenUnixSocket(socketPath); + }); + }); + } + + public PluggableComponentServiceBuilder UseStateStore(Func stateStoreFactory) + { + return this.UseServiceFactory(stateStoreFactory); + } + + public PluggableComponentServiceBuilder UseLockStore(Func lockStoreFactory) + { + return this.UseServiceFactory(lockStoreFactory); + } + + public PluggableComponentServiceBuilder UseInputBinding(Func inputBindingFactory) + { + return this.UseServiceFactory(inputBindingFactory); + } + + public PluggableComponentServiceBuilder UseOutputputBinding(Func outputBindingFactory) + { + return this.UseServiceFactory(outputBindingFactory); + } + public PluggableComponentServiceBuilder UseNameResolver(Func nameResolverFactory) + { + return this.UseServiceFactory(nameResolverFactory); + } + public PluggableComponentServiceBuilder UsePubSub(Func pubSubFactory) + { + return this.UseServiceFactory(pubSubFactory); + } + public PluggableComponentServiceBuilder UseHttpMiddleware(Func httpMiddlewareFactory) + { + return this.UseServiceFactory(httpMiddlewareFactory); + } + + public PluggableComponentServiceBuilder UseConfigurationStore(Func configurationStoreFactory) + { + return this.UseServiceFactory(configurationStoreFactory); + } + + public PluggableComponentServiceBuilder UseSecretStore(Func secretStoreFactory) + { + return this.UseServiceFactory(secretStoreFactory); + } + + private PluggableComponentServiceBuilder UseServiceFactory(Func factory) where TImpl : class where TService : class + { + return this.AsScopped(factory).WithAppCallback(WithGrpcService()); + } + + private PluggableComponentServiceBuilder AsScopped(Func factory) where T : class + { + return this.WithBuilderCallback(app => + { + app.Services.AddScoped((_) => factory()); + }); + } + + private PluggableComponentServiceBuilder WithBuilderCallback(Action callback) + { + return new PluggableComponentServiceBuilder(this) + { + builderCallbacks = new List>(this.builderCallbacks) { callback } + }; + } + + private PluggableComponentServiceBuilder WithAppCallback(Action callback) + { + return new PluggableComponentServiceBuilder(this) + { + appCallbacks = new List>(this.appCallbacks) { callback } + }; + } + + private static Action WithGrpcService() where T : class + { + return (app) => + { + app.MapGrpcService(); + }; + } + + public void Run(string? url = null) + { + var builder = WebApplication.CreateBuilder(); + builderCallbacks.ForEach(callback => callback(builder)); + var app = builder.Build(); + appCallbacks.ForEach(callback => callback(app)); + app.Run(url); + } + } +} diff --git a/Dapr.PluggableComponents/Components/IBinding.cs b/Dapr.PluggableComponents/Components/IBinding.cs index b22f0fd..48d90f4 100644 --- a/Dapr.PluggableComponents/Components/IBinding.cs +++ b/Dapr.PluggableComponents/Components/IBinding.cs @@ -1,27 +1,31 @@ -namespace Dapr.PluggableComponents.Components; +namespace Dapr.PluggableComponents.Components; -public struct BindingResult { +public struct BindingResult +{ public string contentType { get; set; } public byte[] data { get; set; } - public Dictionary metadata {get; set; } + public Dictionary metadata { get; set; } } -public interface IInputBinding { +public interface IInputBinding +{ string Name(); void Init(Dictionary properties); - BindingResult Read(); + BindingResult Read(); void Ping(); } -public struct InvokeResult { +public struct InvokeResult +{ public string contentType { get; set; } public byte[] data { get; set; } public Dictionary metadata { get; set; } } -public interface IOutputBinding { +public interface IOutputBinding +{ string Name(); - void Init(Dictionary properties); + void Init(Dictionary properties); InvokeResult Invoke(string operation, byte[] req, Dictionary metadata); - void Ping(); + void Ping(); } diff --git a/Dapr.PluggableComponents/Services/OutputBindingService.cs b/Dapr.PluggableComponents/Services/OutputBindingService.cs index f1d06fe..83b3f68 100644 --- a/Dapr.PluggableComponents/Services/OutputBindingService.cs +++ b/Dapr.PluggableComponents/Services/OutputBindingService.cs @@ -10,33 +10,36 @@ namespace Dapr.PluggableComponents.Services; -public class OutputBindingService : OutputBinding.OutputBindingBase { +public class OutputBindingService : OutputBinding.OutputBindingBase +{ - private readonly ILogger _logger; - private readonly IOutputBinding _backend; + private readonly ILogger _logger; + private readonly IOutputBinding _backend; - public OutputBindingService(ILogger logger, IOutputBinding backend) { - this._logger = logger; + public OutputBindingService(ILogger logger, IOutputBinding backend) + { + this._logger = logger; this._backend = backend; } - public override Task Init(MetadataRequest request, ServerCallContext context) { + public override Task Init(MetadataRequest request, ServerCallContext context) + { this._backend.Init(Utils.ConvertMetadata(request.Properties)); - return Task.FromResult(new Empty()); + return Task.FromResult(new Empty()); } - public override Task Invoke(InvokeRequest request, ServerCallContext context) { + public override Task Invoke(InvokeRequest request, ServerCallContext context) + { _logger.LogDebug("Going to invoke output binding {0}", _backend.Name()); - byte[] data = request.Data.ToArray(); - string operation = request.Operation; + string operation = request.Operation; var metadata = Utils.ConvertMetadata(request.Metadata); var result = _backend.Invoke(operation, data, metadata); - - InvokeResponse resp = new InvokeResponse(); + + InvokeResponse resp = new InvokeResponse(); resp.Data = ByteString.CopyFrom(result.data); - resp.Contenttype = result.contentType; + resp.Contenttype = result.contentType; Utils.MergeDictionaryIntoMetadata(result.metadata, resp.Metadata); return Task.FromResult(resp); diff --git a/Dapr.PluggableComponents/Services/PubSubService.cs b/Dapr.PluggableComponents/Services/PubSubService.cs index 6dad421..55fe4d5 100644 --- a/Dapr.PluggableComponents/Services/PubSubService.cs +++ b/Dapr.PluggableComponents/Services/PubSubService.cs @@ -10,10 +10,10 @@ namespace Dapr.PluggableComponents.Services; public class PubSubService : PubSub.PubSubBase { private readonly ILogger _logger; - private readonly IPubSubComponent _backend; + private readonly IPubSubComponent _backend; public PubSubService(ILogger logger, IPubSubComponent backend) { - this._logger = logger; + this._logger = logger; this._backend = backend; } diff --git a/Dapr.PluggableComponents/Services/StateStoreService.cs b/Dapr.PluggableComponents/Services/StateStoreService.cs index 390b3da..fdc82f4 100644 --- a/Dapr.PluggableComponents/Services/StateStoreService.cs +++ b/Dapr.PluggableComponents/Services/StateStoreService.cs @@ -10,140 +10,132 @@ namespace Dapr.PluggableComponents.Services; public class StateStoreService : StateStore.StateStoreBase { - private readonly ILogger _logger; - private readonly IStateStore _backend; - - public StateStoreService(ILogger logger, IStateStore backend) + private readonly ILogger _logger; + private readonly IStateStore _backend; + + public StateStoreService(ILogger logger, IStateStore backend) + { + this._logger = logger; + this._backend = backend; + } + + public override Task Init(MetadataRequest request, ServerCallContext context) + { + var props = new Dictionary(); + foreach (var k in request.Properties.Keys) { - this._logger = logger; - this._backend = backend; + props[k] = request.Properties[k]; } - - public override Task Init(MetadataRequest request, ServerCallContext context) + _logger.LogInformation("Initializing state store backend"); + _backend.Init(props); + return Task.FromResult(new Empty()); + } + + public override Task Features(Empty request, ServerCallContext context) + { + List unused = _backend.Features(); + var resp = new FeaturesResponse(); + return Task.FromResult(resp); + } + + public override async Task Delete(DeleteRequest request, ServerCallContext context) + { + _logger.LogDebug("Deleting data in store for key {}", request.Key); + await _backend.Delete(request.Key, int.Parse(request.Etag.Value)); + return await base.Delete(request, context); + } + + public override async Task Get(GetRequest request, ServerCallContext context) + { + _logger.LogDebug("Getting data in store for key {}", request.Key); + + var resp = new GetResponse(); + + await _backend.Get(request.Key).ContinueWith(it => { - var props = new Dictionary(); - foreach (var k in request.Properties.Keys) + if (it.Result.HasValue) { - props[k] = request.Properties[k]; + var obj = it.Result; + resp.Data = ByteString.CopyFrom(obj.Value.data); + resp.Etag = new Etag { Value = obj.Value.etag.ToString() }; } - _logger.LogInformation("Initializing state store backend"); - _backend.Init(props); - return Task.FromResult(new Empty()); - } + else + { + resp.Data = ByteString.Empty; + resp.Etag = null; + } + }); - public override Task Features(Empty request, ServerCallContext context) + foreach (var k in request.Metadata.Keys) { - List unused = _backend.Features(); - var resp = new FeaturesResponse(); - return Task.FromResult(resp); + resp.Metadata[k] = request.Metadata[k]; } - public override Task Delete(DeleteRequest request, ServerCallContext context) - { - _logger.LogDebug("Deleting data in store for key {}", request.Key); - _backend.Delete(request.Key, int.Parse(request.Etag.Value)); - return base.Delete(request, context); - } + return resp; + } - public override Task Get(GetRequest request, ServerCallContext context) - { - _logger.LogDebug("Getting data in store for key {}", request.Key); - - var resp = new GetResponse(); - - _backend.Get(request.Key).ContinueWith( it => { - if (it.Result.HasValue) - { - var obj = it.Result; - resp.Data = ByteString.CopyFrom(obj.Value.data); - resp.Etag = new Etag { Value = obj.Value.etag.ToString() }; - } - else - { - resp.Data = ByteString.Empty; - resp.Etag = null; - } - }); - - foreach (var k in request.Metadata.Keys) - { - resp.Metadata[k] = request.Metadata[k]; - } - - return Task.FromResult(resp); - } + public override async Task Set(SetRequest request, ServerCallContext context) + { + _logger.LogDebug("Setting data in store for key {0}", request.Key); - public override Task Set(SetRequest request, ServerCallContext context) - { - _logger.LogDebug("Setting data in store for key {0}", request.Key); - - var obj = new StoreObject { data = request.Value.ToByteArray(), etag = -1 }; + var obj = new StoreObject { data = request.Value.ToByteArray(), etag = -1 }; - _backend.Set(request.Key, obj); - return Task.FromResult(new Empty()); - } + await _backend.Set(request.Key, obj); + return new Empty(); + } - public override Task Ping(Empty request, ServerCallContext context) - { - return Task.FromResult(new Empty()); - } + public override Task Ping(Empty request, ServerCallContext context) + { + return Task.FromResult(new Empty()); + } - public override Task BulkDelete(BulkDeleteRequest request, ServerCallContext context) - { - _logger.LogDebug("Bulk deleting data in store for {} keys", request.Items.Count); + public override async Task BulkDelete(BulkDeleteRequest request, ServerCallContext context) + { + _logger.LogDebug("Bulk deleting data in store for {} keys", request.Items.Count); - foreach (var item in request.Items) - { - _backend.Delete(item.Key, int.Parse(item.Etag.Value)); - } + await Task.WhenAll(request.Items.Select(item => _backend.Delete(item.Key, int.Parse(item.Etag.Value)))); + return new Empty(); + } - return Task.FromResult(new Empty()); - } + public override async Task BulkGet(BulkGetRequest request, ServerCallContext context) + { + _logger.LogDebug("Bulk fetching data in store for {} keys", request.Items.Count); - public override Task BulkGet(BulkGetRequest request, ServerCallContext context) + var response = new BulkGetResponse(); + var responsesTasks = request.Items.Select(async item => { - _logger.LogDebug("Bulk fetching data in store for {} keys", request.Items.Count); - - var response = new BulkGetResponse(); - foreach (var item in request.Items) + var storeObj = await _backend.Get(item.Key); + return storeObj != null ? new BulkStateItem { - _backend.Get(item.Key).ContinueWith( it => { - if (it.Result.HasValue) - { - var obj = it.Result; - response.Items.Add(new BulkStateItem - { - Data = ByteString.CopyFrom(obj.Value.data), - Etag = new Etag { Value = obj.Value.etag.ToString() }, - Key = item.Key, - Error = "none" - }); - } - else - { - response.Items.Add(new BulkStateItem - { - Data = ByteString.Empty, - Etag = new Etag(), - Key = item.Key, - Error = "KeyDoesNotExist" - }); - } - }); - } + Data = ByteString.CopyFrom(storeObj.Value.data), + Etag = new Etag { Value = storeObj.Value.etag.ToString() }, + Key = item.Key, + Error = "none" + } : new BulkStateItem + { + Data = ByteString.Empty, + Etag = new Etag(), + Key = item.Key, + Error = "KeyDoesNotExist" + }; + }); - return Task.FromResult(response); - } + var itemsResponse = await Task.WhenAll(responsesTasks); + response.Items.AddRange(itemsResponse); - public override Task BulkSet(BulkSetRequest request, ServerCallContext context) - { - _logger.LogDebug("Bulk storing data in store for {} keys", request.Items.Count); + return response; + } - foreach (var item in request.Items) - { - Set(item, context); - } + public override async Task BulkSet(BulkSetRequest request, ServerCallContext context) + { + _logger.LogDebug("Bulk storing data in store for {} keys", request.Items.Count); - return Task.FromResult(new Empty()); - } + var setRequests = request.Items.Select(async item => + { + await Set(item, context); + }); + await Task.WhenAll(setRequests); + + return new Empty(); + } } diff --git a/examples/InMemoryComponents/Program.cs b/examples/InMemoryComponents/Program.cs index 2c38edb..aaf9ac9 100644 --- a/examples/InMemoryComponents/Program.cs +++ b/examples/InMemoryComponents/Program.cs @@ -11,14 +11,15 @@ class InMemoryPluggableComponent { static void Main() { - var service = new PluggableComponentService(); - service.WithStateStore(); - service.WithHttpMiddleware(); - service.WithInputBinding(); - service.WithOutputBinding(); - service.WithPubSub(); - - service.Run(); + var builder = PluggableComponentServiceBuilder.CreateBuilder(); + var singletonInMemoryStateStore = new InMemoryStateStore(); + builder + .UseStateStore(() => singletonInMemoryStateStore) + .UseHttpMiddleware(() => new InMemoryHttpMiddleware()) + .UseInputBinding(() => new InMemoryBinding()) + .UseOutputputBinding(() => new InMemoryBinding()) + .UsePubSub(() => new InMemoryPubSubComponent()) + .Run(); } } diff --git a/examples/InMemoryComponents/Properties/launchSettings.json b/examples/InMemoryComponents/Properties/launchSettings.json index cc02b1c..1541d9c 100644 --- a/examples/InMemoryComponents/Properties/launchSettings.json +++ b/examples/InMemoryComponents/Properties/launchSettings.json @@ -7,7 +7,7 @@ "applicationUrl": "http://localhost:5258;https://localhost:7052", "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development", - "DAPR_COMPONENT_SOCKET_PATH": "dotnet-inmemory-components.sock" + "DAPR_COMPONENT_SOCKET_PATH": "/tmp/dapr-state.imdotnet-v1-default.sock" } } }