From dee49d55b4cb2c0ba8cd34052eee72bdf9ec2152 Mon Sep 17 00:00:00 2001 From: Pawel Pabich Date: Mon, 10 Mar 2025 13:53:31 +1000 Subject: [PATCH 1/4] Added identity observer --- .../Transport/Protocol/ProtocolFixture.cs | 2 +- .../Transport/SecureClientFixture.cs | 4 +-- .../Transport/SecureListenerFixture.cs | 3 +- source/Halibut/HalibutRuntime.cs | 10 +++++-- source/Halibut/HalibutRuntimeBuilder.cs | 11 ++++++- .../Observability/IIdentityObserver.cs | 23 +++++++++++++++ .../Observability/NoIdentityObserver.cs | 29 +++++++++++++++++++ .../Protocol/MessageExchangeProtocol.cs | 10 ++++++- source/Halibut/Transport/SecureListener.cs | 5 +++- 9 files changed, 87 insertions(+), 10 deletions(-) create mode 100644 source/Halibut/Transport/Observability/IIdentityObserver.cs create mode 100644 source/Halibut/Transport/Observability/NoIdentityObserver.cs diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index 70b11fdd7..f53ef2cff 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -27,7 +27,7 @@ public void SetUp() stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server)); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits); - protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For()); + protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoIdentityObserver.Instance, Substitute.For()); } // TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation diff --git a/source/Halibut.Tests/Transport/SecureClientFixture.cs b/source/Halibut.Tests/Transport/SecureClientFixture.cs index 3a8a83673..866159c7e 100644 --- a/source/Halibut.Tests/Transport/SecureClientFixture.cs +++ b/source/Halibut.Tests/Transport/SecureClientFixture.cs @@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt() var connection = Substitute.For(); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log)); + connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoIdentityObserver.Instance, log)); await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None); } @@ -96,7 +96,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger) { var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger); + return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoIdentityObserver.Instance, logger); } } } \ No newline at end of file diff --git a/source/Halibut.Tests/Transport/SecureListenerFixture.cs b/source/Halibut.Tests/Transport/SecureListenerFixture.cs index 0914aad3b..2fe5f4038 100644 --- a/source/Halibut.Tests/Transport/SecureListenerFixture.cs +++ b/source/Halibut.Tests/Transport/SecureListenerFixture.cs @@ -73,7 +73,8 @@ public async Task SecureListenerDoesNotCreateHundredsOfIoEventsPerSecondOnWindow (_, _) => UnauthorizedClientConnectResponse.BlockConnection, timeoutsAndLimits, new StreamFactory(), - NoOpConnectionsObserver.Instance + NoOpConnectionsObserver.Instance, + NoIdentityObserver.Instance ); var idleAverage = CollectCounterValues(opsPerSec) diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index effb8abc9..7e3f81a9e 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -45,6 +45,7 @@ public class HalibutRuntime : IHalibutRuntime readonly IConnectionsObserver connectionsObserver; readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; readonly IControlMessageObserver controlMessageObserver; + readonly IIdentityObserver identityObserver; internal HalibutRuntime( IServiceFactory serviceFactory, @@ -59,7 +60,8 @@ internal HalibutRuntime( IStreamFactory streamFactory, IRpcObserver rpcObserver, IConnectionsObserver connectionsObserver, - IControlMessageObserver controlMessageObserver) + IControlMessageObserver controlMessageObserver, + IIdentityObserver identityObserver) { this.serverCertificate = serverCertificate; this.trustProvider = trustProvider; @@ -74,6 +76,7 @@ internal HalibutRuntime( TimeoutsAndLimits = halibutTimeoutsAndLimits; this.connectionsObserver = connectionsObserver; this.controlMessageObserver = controlMessageObserver; + this.identityObserver = identityObserver; connectionManager = new ConnectionManagerAsync(); this.tcpConnectionFactory = new TcpConnectionFactory(serverCertificate, TimeoutsAndLimits, streamFactory); @@ -106,7 +109,7 @@ public int Listen(int port) ExchangeProtocolBuilder ExchangeProtocolBuilder() { - return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log); + return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, identityObserver, log); } public int Listen(IPEndPoint endpoint) @@ -122,7 +125,8 @@ public int Listen(IPEndPoint endpoint) HandleUnauthorizedClientConnect, TimeoutsAndLimits, streamFactory, - connectionsObserver); + connectionsObserver, + identityObserver); listeners.DoWithExclusiveAccess(l => { diff --git a/source/Halibut/HalibutRuntimeBuilder.cs b/source/Halibut/HalibutRuntimeBuilder.cs index 377ec3a2e..233c0b182 100644 --- a/source/Halibut/HalibutRuntimeBuilder.cs +++ b/source/Halibut/HalibutRuntimeBuilder.cs @@ -27,6 +27,7 @@ public class HalibutRuntimeBuilder IRpcObserver? rpcObserver; IConnectionsObserver? connectionsObserver; IControlMessageObserver? controlMessageObserver; + IIdentityObserver? identityObserver; public HalibutRuntimeBuilder WithConnectionsObserver(IConnectionsObserver connectionsObserver) { @@ -125,6 +126,12 @@ public HalibutRuntimeBuilder WithRpcObserver(IRpcObserver rpcObserver) return this; } + public HalibutRuntimeBuilder WithIdentityObserver(IIdentityObserver identityObserver) + { + this.identityObserver = identityObserver; + return this; + } + public HalibutRuntime Build() { var halibutTimeoutsAndLimits = this.halibutTimeoutsAndLimits; @@ -157,6 +164,7 @@ public HalibutRuntime Build() var connectionsObserver = this.connectionsObserver ?? NoOpConnectionsObserver.Instance; var rpcObserver = this.rpcObserver ?? new NoRpcObserver(); var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver(); + var identityObserver = this.identityObserver ?? NoIdentityObserver.Instance; var halibutRuntime = new HalibutRuntime( serviceFactory, @@ -171,7 +179,8 @@ public HalibutRuntime Build() streamFactory, rpcObserver, connectionsObserver, - controlMessageObserver); + controlMessageObserver, + identityObserver); if (onUnauthorizedClientConnect is not null) { diff --git a/source/Halibut/Transport/Observability/IIdentityObserver.cs b/source/Halibut/Transport/Observability/IIdentityObserver.cs new file mode 100644 index 000000000..2815b02c0 --- /dev/null +++ b/source/Halibut/Transport/Observability/IIdentityObserver.cs @@ -0,0 +1,23 @@ +// Copyright 2012-2013 Octopus Deploy Pty. Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Halibut.Transport.Protocol; + +namespace Halibut.Transport.Observability +{ + public interface IIdentityObserver + { + void IdentityEstablished(RemoteIdentity identity); + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Observability/NoIdentityObserver.cs b/source/Halibut/Transport/Observability/NoIdentityObserver.cs new file mode 100644 index 000000000..23a5289be --- /dev/null +++ b/source/Halibut/Transport/Observability/NoIdentityObserver.cs @@ -0,0 +1,29 @@ +// Copyright 2012-2013 Octopus Deploy Pty. Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using Halibut.Transport.Protocol; + +namespace Halibut.Transport.Observability +{ + public class NoIdentityObserver : IIdentityObserver + { + static NoIdentityObserver? singleInstance; + public static NoIdentityObserver Instance => singleInstance ??= new NoIdentityObserver(); + + public void IdentityEstablished(RemoteIdentity identity) + { + } + } +} \ No newline at end of file diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index 3f156705d..77a9cb456 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -5,6 +5,7 @@ using Halibut.Diagnostics; using Halibut.Exceptions; using Halibut.ServiceModel; +using Halibut.Transport.Observability; namespace Halibut.Transport.Protocol { @@ -20,15 +21,21 @@ public class MessageExchangeProtocol readonly IMessageExchangeStream stream; readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits; readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; + readonly IIdentityObserver identityObserver; readonly ILog log; bool identified; volatile bool acceptClientRequests = true; - public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log) + public MessageExchangeProtocol(IMessageExchangeStream stream, + HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, + IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, + IIdentityObserver identityObserver, + ILog log) { this.stream = stream; this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter; + this.identityObserver = identityObserver; this.log = log; } @@ -105,6 +112,7 @@ async Task ReceiveAndProcessRequestAsSubscriberAsync(IMessageExchangeStream stre public async Task ExchangeAsServerAsync(Func> incomingRequestProcessor, Func pendingRequests, CancellationToken cancellationToken) { var identity = await GetRemoteIdentityAsync(cancellationToken); + identityObserver.IdentityEstablished(identity); //We might need to limit the connection, so by default, we create an unlimited connection lease var limitedConnectionLease = activeTcpConnectionsLimiter.CreateUnlimitedLease(); diff --git a/source/Halibut/Transport/SecureListener.cs b/source/Halibut/Transport/SecureListener.cs index 59f666f84..e448d1493 100644 --- a/source/Halibut/Transport/SecureListener.cs +++ b/source/Halibut/Transport/SecureListener.cs @@ -48,6 +48,7 @@ public class SecureListener : IAsyncDisposable readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits; readonly IStreamFactory streamFactory; readonly IConnectionsObserver connectionsObserver; + readonly IIdentityObserver identityObserver; ILog log; TcpListener listener; Thread? backgroundThread; @@ -67,7 +68,8 @@ public SecureListener( Func unauthorizedClientConnect, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IStreamFactory streamFactory, - IConnectionsObserver connectionsObserver) + IConnectionsObserver connectionsObserver, + IIdentityObserver identityObserver) { this.endPoint = endPoint; this.serverCertificate = serverCertificate; @@ -81,6 +83,7 @@ public SecureListener( this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.streamFactory = streamFactory; this.connectionsObserver = connectionsObserver; + this.identityObserver = identityObserver; this.cts = new CancellationTokenSource(); this.cancellationToken = cts.Token; From d1e3009106a6cb46647235c0dce35770cfe46acd Mon Sep 17 00:00:00 2001 From: Pawel Pabich Date: Mon, 10 Mar 2025 15:14:51 +1000 Subject: [PATCH 2/4] Added async option --- .../CancelWhenRequestDequeuedPendingRequestQueueFactory.cs | 5 +++++ .../CancelWhenRequestQueuedPendingRequestQueueFactory.cs | 5 +++++ source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs | 6 ++++++ source/Halibut/HalibutRuntime.cs | 2 +- source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs | 2 ++ .../Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs | 6 ++++++ 6 files changed, 25 insertions(+), 1 deletion(-) diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs index 1cb9371d9..524bb5d5a 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs @@ -29,6 +29,11 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied); } + public Task CreateQueueAsync(Uri endpoint) + { + return Task.FromResult(CreateQueue(endpoint)); + } + class Decorator : IPendingRequestQueue { readonly CancellationTokenSource cancellationTokenSource; diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs index 63e0ca75b..c78e98063 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs @@ -28,6 +28,11 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources); } + public Task CreateQueueAsync(Uri endpoint) + { + return Task.FromResult(CreateQueue(endpoint)); + } + class Decorator : IPendingRequestQueue { readonly CancellationTokenSource[] cancellationTokenSources; diff --git a/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs b/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs index c14066816..7a77d693b 100644 --- a/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using Halibut.ServiceModel; namespace Halibut.Tests.Util @@ -16,5 +17,10 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) { return createQueue(endpoint); } + + public Task CreateQueueAsync(Uri endpoint) + { + return Task.FromResult(createQueue(endpoint)); + } } } \ No newline at end of file diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index 7e3f81a9e..0048b0ed9 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -90,7 +90,7 @@ internal HalibutRuntime( IPendingRequestQueue GetQueue(Uri target) { - return queues.GetOrAdd(target, u => queueFactory.CreateQueue(target)); + return queues.GetOrAdd(target, u => queueFactory.CreateQueueAsync(target).Result); } public int Listen() diff --git a/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs b/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs index 8d79150da..5e76de8b4 100644 --- a/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs +++ b/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs @@ -1,9 +1,11 @@ using System; +using System.Threading.Tasks; namespace Halibut.ServiceModel { public interface IPendingRequestQueueFactory { IPendingRequestQueue CreateQueue(Uri endpoint); + Task CreateQueueAsync(Uri endpoint); } } \ No newline at end of file diff --git a/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs b/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs index 3143ef372..98c694c32 100644 --- a/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs +++ b/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using Halibut.Diagnostics; namespace Halibut.ServiceModel @@ -18,5 +19,10 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) { return new PendingRequestQueueAsync(halibutTimeoutsAndLimits, logFactory.ForEndpoint(endpoint)); } + + public Task CreateQueueAsync(Uri endpoint) + { + return Task.FromResult(CreateQueue(endpoint)); + } } } \ No newline at end of file From a12e35b2116d0102d4ac3b2b7c86402246b94949 Mon Sep 17 00:00:00 2001 From: Pawel Pabich Date: Mon, 17 Mar 2025 13:32:44 +1000 Subject: [PATCH 3/4] Observe subscribers --- .../Transport/Protocol/ProtocolFixture.cs | 2 +- .../Transport/SecureClientFixture.cs | 4 ++-- .../Transport/SecureListenerFixture.cs | 2 +- source/Halibut/HalibutRuntime.cs | 10 ++++----- source/Halibut/HalibutRuntimeBuilder.cs | 8 +++---- ...ityObserver.cs => ISubscribersObserver.cs} | 6 ++++-- ...tyObserver.cs => NoSubscribersObserver.cs} | 11 ++++++---- .../Protocol/MessageExchangeProtocol.cs | 21 ++++++++++++------- source/Halibut/Transport/SecureListener.cs | 6 +++--- 9 files changed, 41 insertions(+), 29 deletions(-) rename source/Halibut/Transport/Observability/{IIdentityObserver.cs => ISubscribersObserver.cs} (81%) rename source/Halibut/Transport/Observability/{NoIdentityObserver.cs => NoSubscribersObserver.cs} (67%) diff --git a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs index f53ef2cff..a03e0ef59 100644 --- a/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs +++ b/source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs @@ -27,7 +27,7 @@ public void SetUp() stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server)); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits); - protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoIdentityObserver.Instance, Substitute.For()); + protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoSubscribersObserver.Instance, Substitute.For()); } // TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation diff --git a/source/Halibut.Tests/Transport/SecureClientFixture.cs b/source/Halibut.Tests/Transport/SecureClientFixture.cs index 866159c7e..3ef1af270 100644 --- a/source/Halibut.Tests/Transport/SecureClientFixture.cs +++ b/source/Halibut.Tests/Transport/SecureClientFixture.cs @@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt() var connection = Substitute.For(); var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoIdentityObserver.Instance, log)); + connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoSubscribersObserver.Instance, log)); await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None); } @@ -96,7 +96,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger) { var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build(); var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits); - return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoIdentityObserver.Instance, logger); + return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoSubscribersObserver.Instance, logger); } } } \ No newline at end of file diff --git a/source/Halibut.Tests/Transport/SecureListenerFixture.cs b/source/Halibut.Tests/Transport/SecureListenerFixture.cs index 2fe5f4038..5ff09696b 100644 --- a/source/Halibut.Tests/Transport/SecureListenerFixture.cs +++ b/source/Halibut.Tests/Transport/SecureListenerFixture.cs @@ -74,7 +74,7 @@ public async Task SecureListenerDoesNotCreateHundredsOfIoEventsPerSecondOnWindow timeoutsAndLimits, new StreamFactory(), NoOpConnectionsObserver.Instance, - NoIdentityObserver.Instance + NoSubscribersObserver.Instance ); var idleAverage = CollectCounterValues(opsPerSec) diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index 0048b0ed9..fa497f7c4 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -45,7 +45,7 @@ public class HalibutRuntime : IHalibutRuntime readonly IConnectionsObserver connectionsObserver; readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; readonly IControlMessageObserver controlMessageObserver; - readonly IIdentityObserver identityObserver; + readonly ISubscribersObserver subscribersObserver; internal HalibutRuntime( IServiceFactory serviceFactory, @@ -61,7 +61,7 @@ internal HalibutRuntime( IRpcObserver rpcObserver, IConnectionsObserver connectionsObserver, IControlMessageObserver controlMessageObserver, - IIdentityObserver identityObserver) + ISubscribersObserver subscribersObserver) { this.serverCertificate = serverCertificate; this.trustProvider = trustProvider; @@ -76,7 +76,7 @@ internal HalibutRuntime( TimeoutsAndLimits = halibutTimeoutsAndLimits; this.connectionsObserver = connectionsObserver; this.controlMessageObserver = controlMessageObserver; - this.identityObserver = identityObserver; + this.subscribersObserver = subscribersObserver; connectionManager = new ConnectionManagerAsync(); this.tcpConnectionFactory = new TcpConnectionFactory(serverCertificate, TimeoutsAndLimits, streamFactory); @@ -109,7 +109,7 @@ public int Listen(int port) ExchangeProtocolBuilder ExchangeProtocolBuilder() { - return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, identityObserver, log); + return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, subscribersObserver, log); } public int Listen(IPEndPoint endpoint) @@ -126,7 +126,7 @@ public int Listen(IPEndPoint endpoint) TimeoutsAndLimits, streamFactory, connectionsObserver, - identityObserver); + subscribersObserver); listeners.DoWithExclusiveAccess(l => { diff --git a/source/Halibut/HalibutRuntimeBuilder.cs b/source/Halibut/HalibutRuntimeBuilder.cs index 233c0b182..689bb9e79 100644 --- a/source/Halibut/HalibutRuntimeBuilder.cs +++ b/source/Halibut/HalibutRuntimeBuilder.cs @@ -27,7 +27,7 @@ public class HalibutRuntimeBuilder IRpcObserver? rpcObserver; IConnectionsObserver? connectionsObserver; IControlMessageObserver? controlMessageObserver; - IIdentityObserver? identityObserver; + ISubscribersObserver? identityObserver; public HalibutRuntimeBuilder WithConnectionsObserver(IConnectionsObserver connectionsObserver) { @@ -126,9 +126,9 @@ public HalibutRuntimeBuilder WithRpcObserver(IRpcObserver rpcObserver) return this; } - public HalibutRuntimeBuilder WithIdentityObserver(IIdentityObserver identityObserver) + public HalibutRuntimeBuilder WithIdentityObserver(ISubscribersObserver subscribersObserver) { - this.identityObserver = identityObserver; + this.identityObserver = subscribersObserver; return this; } @@ -164,7 +164,7 @@ public HalibutRuntime Build() var connectionsObserver = this.connectionsObserver ?? NoOpConnectionsObserver.Instance; var rpcObserver = this.rpcObserver ?? new NoRpcObserver(); var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver(); - var identityObserver = this.identityObserver ?? NoIdentityObserver.Instance; + var identityObserver = this.identityObserver ?? NoSubscribersObserver.Instance; var halibutRuntime = new HalibutRuntime( serviceFactory, diff --git a/source/Halibut/Transport/Observability/IIdentityObserver.cs b/source/Halibut/Transport/Observability/ISubscribersObserver.cs similarity index 81% rename from source/Halibut/Transport/Observability/IIdentityObserver.cs rename to source/Halibut/Transport/Observability/ISubscribersObserver.cs index 2815b02c0..ffbf8c4d2 100644 --- a/source/Halibut/Transport/Observability/IIdentityObserver.cs +++ b/source/Halibut/Transport/Observability/ISubscribersObserver.cs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System; using Halibut.Transport.Protocol; namespace Halibut.Transport.Observability { - public interface IIdentityObserver + public interface ISubscribersObserver { - void IdentityEstablished(RemoteIdentity identity); + void SubscriberJoined(Uri subscriptionId); + void SubscriberLeft(Uri subscriptionId); } } \ No newline at end of file diff --git a/source/Halibut/Transport/Observability/NoIdentityObserver.cs b/source/Halibut/Transport/Observability/NoSubscribersObserver.cs similarity index 67% rename from source/Halibut/Transport/Observability/NoIdentityObserver.cs rename to source/Halibut/Transport/Observability/NoSubscribersObserver.cs index 23a5289be..e2aaa2265 100644 --- a/source/Halibut/Transport/Observability/NoIdentityObserver.cs +++ b/source/Halibut/Transport/Observability/NoSubscribersObserver.cs @@ -17,12 +17,15 @@ namespace Halibut.Transport.Observability { - public class NoIdentityObserver : IIdentityObserver + public class NoSubscribersObserver : ISubscribersObserver { - static NoIdentityObserver? singleInstance; - public static NoIdentityObserver Instance => singleInstance ??= new NoIdentityObserver(); + static NoSubscribersObserver? singleInstance; + public static NoSubscribersObserver Instance => singleInstance ??= new NoSubscribersObserver(); + public void SubscriberJoined(Uri subscriptionId) + { + } - public void IdentityEstablished(RemoteIdentity identity) + public void SubscriberLeft(Uri subscriptionId) { } } diff --git a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs index 77a9cb456..8379d05f5 100644 --- a/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs +++ b/source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs @@ -21,7 +21,7 @@ public class MessageExchangeProtocol readonly IMessageExchangeStream stream; readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits; readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter; - readonly IIdentityObserver identityObserver; + readonly ISubscribersObserver subscribersObserver; readonly ILog log; bool identified; volatile bool acceptClientRequests = true; @@ -29,13 +29,13 @@ public class MessageExchangeProtocol public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, - IIdentityObserver identityObserver, + ISubscribersObserver subscribersObserver, ILog log) { this.stream = stream; this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits; this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter; - this.identityObserver = identityObserver; + this.subscribersObserver = subscribersObserver; this.log = log; } @@ -112,7 +112,6 @@ async Task ReceiveAndProcessRequestAsSubscriberAsync(IMessageExchangeStream stre public async Task ExchangeAsServerAsync(Func> incomingRequestProcessor, Func pendingRequests, CancellationToken cancellationToken) { var identity = await GetRemoteIdentityAsync(cancellationToken); - identityObserver.IdentityEstablished(identity); //We might need to limit the connection, so by default, we create an unlimited connection lease var limitedConnectionLease = activeTcpConnectionsLimiter.CreateUnlimitedLease(); @@ -133,9 +132,17 @@ public async Task ExchangeAsServerAsync(Func Date: Fri, 16 May 2025 14:07:40 +1000 Subject: [PATCH 4/4] Async tweaks --- .../CancelWhenRequestDequeuedPendingRequestQueueFactory.cs | 2 +- .../CancelWhenRequestQueuedPendingRequestQueueFactory.cs | 2 +- source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs | 3 ++- source/Halibut/DataStream.cs | 2 +- source/Halibut/HalibutRuntime.cs | 2 +- source/Halibut/HalibutRuntimeBuilder.cs | 2 +- source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs | 3 ++- source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs | 3 ++- 8 files changed, 11 insertions(+), 8 deletions(-) diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs index 524bb5d5a..4043a27ec 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestDequeuedPendingRequestQueueFactory.cs @@ -29,7 +29,7 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSource, shouldCancelOnDequeue, onResponseApplied); } - public Task CreateQueueAsync(Uri endpoint) + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) { return Task.FromResult(CreateQueue(endpoint)); } diff --git a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs index c78e98063..4b03d05d2 100644 --- a/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Support/PendingRequestQueueFactories/CancelWhenRequestQueuedPendingRequestQueueFactory.cs @@ -28,7 +28,7 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return new Decorator(inner.CreateQueue(endpoint), cancellationTokenSources); } - public Task CreateQueueAsync(Uri endpoint) + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) { return Task.FromResult(CreateQueue(endpoint)); } diff --git a/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs b/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs index 7a77d693b..7fb8b44c7 100644 --- a/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs +++ b/source/Halibut.Tests/Util/FuncPendingRequestQueueFactory.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Halibut.ServiceModel; @@ -18,7 +19,7 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return createQueue(endpoint); } - public Task CreateQueueAsync(Uri endpoint) + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) { return Task.FromResult(createQueue(endpoint)); } diff --git a/source/Halibut/DataStream.cs b/source/Halibut/DataStream.cs index 414457fc0..9abf2ee4d 100644 --- a/source/Halibut/DataStream.cs +++ b/source/Halibut/DataStream.cs @@ -10,7 +10,7 @@ namespace Halibut { public class DataStream : IEquatable, IDataStreamInternal { - readonly Func writerAsync; + protected readonly Func writerAsync; IDataStreamReceiver? receiver; [JsonConstructor] diff --git a/source/Halibut/HalibutRuntime.cs b/source/Halibut/HalibutRuntime.cs index fa497f7c4..1e7d5b41d 100644 --- a/source/Halibut/HalibutRuntime.cs +++ b/source/Halibut/HalibutRuntime.cs @@ -90,7 +90,7 @@ internal HalibutRuntime( IPendingRequestQueue GetQueue(Uri target) { - return queues.GetOrAdd(target, u => queueFactory.CreateQueueAsync(target).Result); + return queues.GetOrAdd(target, u => queueFactory.CreateQueue(target)); } public int Listen() diff --git a/source/Halibut/HalibutRuntimeBuilder.cs b/source/Halibut/HalibutRuntimeBuilder.cs index 689bb9e79..c742054ec 100644 --- a/source/Halibut/HalibutRuntimeBuilder.cs +++ b/source/Halibut/HalibutRuntimeBuilder.cs @@ -126,7 +126,7 @@ public HalibutRuntimeBuilder WithRpcObserver(IRpcObserver rpcObserver) return this; } - public HalibutRuntimeBuilder WithIdentityObserver(ISubscribersObserver subscribersObserver) + public HalibutRuntimeBuilder WithSubscribersObserver(ISubscribersObserver subscribersObserver) { this.identityObserver = subscribersObserver; return this; diff --git a/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs b/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs index 5e76de8b4..8874c3355 100644 --- a/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs +++ b/source/Halibut/ServiceModel/IPendingRequestQueueFactory.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; namespace Halibut.ServiceModel @@ -6,6 +7,6 @@ namespace Halibut.ServiceModel public interface IPendingRequestQueueFactory { IPendingRequestQueue CreateQueue(Uri endpoint); - Task CreateQueueAsync(Uri endpoint); + Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs b/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs index 98c694c32..c215543a9 100644 --- a/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs +++ b/source/Halibut/ServiceModel/PendingRequestQueueFactoryAsync.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Halibut.Diagnostics; @@ -20,7 +21,7 @@ public IPendingRequestQueue CreateQueue(Uri endpoint) return new PendingRequestQueueAsync(halibutTimeoutsAndLimits, logFactory.ForEndpoint(endpoint)); } - public Task CreateQueueAsync(Uri endpoint) + public Task CreateQueueAsync(Uri endpoint, CancellationToken cancellationToken) { return Task.FromResult(CreateQueue(endpoint)); }