Skip to content

Commit 755c1c2

Browse files
committed
Added identity observer
1 parent 8213168 commit 755c1c2

File tree

9 files changed

+87
-10
lines changed

9 files changed

+87
-10
lines changed

source/Halibut.Tests/Transport/Protocol/ProtocolFixture.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public void SetUp()
2727
stream.SetRemoteIdentity(new RemoteIdentity(RemoteIdentityType.Server));
2828
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
2929
var activeConnectionsLimiter = new ActiveTcpConnectionsLimiter(limits);
30-
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, Substitute.For<ILog>());
30+
protocol = new MessageExchangeProtocol(stream, new HalibutTimeoutsAndLimitsForTestsBuilder().Build(), activeConnectionsLimiter, NoIdentityObserver.Instance, Substitute.For<ILog>());
3131
}
3232

3333
// TODO - ASYNC ME UP! ExchangeAsClientAsync cancellation

source/Halibut.Tests/Transport/SecureClientFixture.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task SecureClientClearsPoolWhenAllConnectionsCorrupt()
6767
var connection = Substitute.For<IConnection>();
6868
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
6969
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
70-
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, log));
70+
connection.Protocol.Returns(new MessageExchangeProtocol(stream, limits, activeConnectionLimiter, NoIdentityObserver.Instance, log));
7171

7272
await connectionManager.ReleaseConnectionAsync(endpoint, connection, CancellationToken.None);
7373
}
@@ -96,7 +96,7 @@ static MessageExchangeProtocol GetProtocol(Stream stream, ILog logger)
9696
{
9797
var limits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();
9898
var activeConnectionLimiter = new ActiveTcpConnectionsLimiter(limits);
99-
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, logger);
99+
return new MessageExchangeProtocol(new MessageExchangeStream(stream, new MessageSerializerBuilder(new LogFactory()).Build(), new NoOpControlMessageObserver(), limits, logger), limits, activeConnectionLimiter, NoIdentityObserver.Instance, logger);
100100
}
101101
}
102102
}

source/Halibut.Tests/Transport/SecureListenerFixture.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public async Task SecureListenerDoesNotCreateHundredsOfIoEventsPerSecondOnWindow
7373
(_, _) => UnauthorizedClientConnectResponse.BlockConnection,
7474
timeoutsAndLimits,
7575
new StreamFactory(),
76-
NoOpConnectionsObserver.Instance
76+
NoOpConnectionsObserver.Instance,
77+
NoIdentityObserver.Instance
7778
);
7879

7980
var idleAverage = CollectCounterValues(opsPerSec)

source/Halibut/HalibutRuntime.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class HalibutRuntime : IHalibutRuntime
4545
readonly IConnectionsObserver connectionsObserver;
4646
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
4747
readonly IControlMessageObserver controlMessageObserver;
48+
readonly IIdentityObserver identityObserver;
4849

4950
internal HalibutRuntime(
5051
IServiceFactory serviceFactory,
@@ -59,7 +60,8 @@ internal HalibutRuntime(
5960
IStreamFactory streamFactory,
6061
IRpcObserver rpcObserver,
6162
IConnectionsObserver connectionsObserver,
62-
IControlMessageObserver controlMessageObserver)
63+
IControlMessageObserver controlMessageObserver,
64+
IIdentityObserver identityObserver)
6365
{
6466
this.serverCertificate = serverCertificate;
6567
this.trustProvider = trustProvider;
@@ -74,6 +76,7 @@ internal HalibutRuntime(
7476
TimeoutsAndLimits = halibutTimeoutsAndLimits;
7577
this.connectionsObserver = connectionsObserver;
7678
this.controlMessageObserver = controlMessageObserver;
79+
this.identityObserver = identityObserver;
7780

7881
connectionManager = new ConnectionManagerAsync();
7982
this.tcpConnectionFactory = new TcpConnectionFactory(serverCertificate, TimeoutsAndLimits, streamFactory);
@@ -106,7 +109,7 @@ public int Listen(int port)
106109

107110
ExchangeProtocolBuilder ExchangeProtocolBuilder()
108111
{
109-
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, log);
112+
return (stream, log) => new MessageExchangeProtocol(new MessageExchangeStream(stream, messageSerializer, controlMessageObserver, TimeoutsAndLimits, log), TimeoutsAndLimits, activeTcpConnectionsLimiter, identityObserver, log);
110113
}
111114

112115
public int Listen(IPEndPoint endpoint)
@@ -122,7 +125,8 @@ public int Listen(IPEndPoint endpoint)
122125
HandleUnauthorizedClientConnect,
123126
TimeoutsAndLimits,
124127
streamFactory,
125-
connectionsObserver);
128+
connectionsObserver,
129+
identityObserver);
126130

127131
listeners.DoWithExclusiveAccess(l =>
128132
{

source/Halibut/HalibutRuntimeBuilder.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class HalibutRuntimeBuilder
2727
IRpcObserver? rpcObserver;
2828
IConnectionsObserver? connectionsObserver;
2929
IControlMessageObserver? controlMessageObserver;
30+
IIdentityObserver? identityObserver;
3031

3132
public HalibutRuntimeBuilder WithConnectionsObserver(IConnectionsObserver connectionsObserver)
3233
{
@@ -125,6 +126,12 @@ public HalibutRuntimeBuilder WithRpcObserver(IRpcObserver rpcObserver)
125126
return this;
126127
}
127128

129+
public HalibutRuntimeBuilder WithIdentityObserver(IIdentityObserver identityObserver)
130+
{
131+
this.identityObserver = identityObserver;
132+
return this;
133+
}
134+
128135
public HalibutRuntime Build()
129136
{
130137
var halibutTimeoutsAndLimits = this.halibutTimeoutsAndLimits;
@@ -157,6 +164,7 @@ public HalibutRuntime Build()
157164
var connectionsObserver = this.connectionsObserver ?? NoOpConnectionsObserver.Instance;
158165
var rpcObserver = this.rpcObserver ?? new NoRpcObserver();
159166
var controlMessageObserver = this.controlMessageObserver ?? new NoOpControlMessageObserver();
167+
var identityObserver = this.identityObserver ?? NoIdentityObserver.Instance;
160168

161169
var halibutRuntime = new HalibutRuntime(
162170
serviceFactory,
@@ -171,7 +179,8 @@ public HalibutRuntime Build()
171179
streamFactory,
172180
rpcObserver,
173181
connectionsObserver,
174-
controlMessageObserver);
182+
controlMessageObserver,
183+
identityObserver);
175184

176185
if (onUnauthorizedClientConnect is not null)
177186
{
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Halibut.Transport.Protocol;
16+
17+
namespace Halibut.Transport.Observability
18+
{
19+
public interface IIdentityObserver
20+
{
21+
void IdentityEstablished(RemoteIdentity identity);
22+
}
23+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2012-2013 Octopus Deploy Pty. Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using System;
16+
using Halibut.Transport.Protocol;
17+
18+
namespace Halibut.Transport.Observability
19+
{
20+
public class NoIdentityObserver : IIdentityObserver
21+
{
22+
static NoIdentityObserver? singleInstance;
23+
public static NoIdentityObserver Instance => singleInstance ??= new NoIdentityObserver();
24+
25+
public void IdentityEstablished(RemoteIdentity identity)
26+
{
27+
}
28+
}
29+
}

source/Halibut/Transport/Protocol/MessageExchangeProtocol.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Halibut.Diagnostics;
66
using Halibut.Exceptions;
77
using Halibut.ServiceModel;
8+
using Halibut.Transport.Observability;
89

910
namespace Halibut.Transport.Protocol
1011
{
@@ -20,15 +21,21 @@ public class MessageExchangeProtocol
2021
readonly IMessageExchangeStream stream;
2122
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
2223
readonly IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter;
24+
readonly IIdentityObserver identityObserver;
2325
readonly ILog log;
2426
bool identified;
2527
volatile bool acceptClientRequests = true;
2628

27-
public MessageExchangeProtocol(IMessageExchangeStream stream, HalibutTimeoutsAndLimits halibutTimeoutsAndLimits, IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter, ILog log)
29+
public MessageExchangeProtocol(IMessageExchangeStream stream,
30+
HalibutTimeoutsAndLimits halibutTimeoutsAndLimits,
31+
IActiveTcpConnectionsLimiter activeTcpConnectionsLimiter,
32+
IIdentityObserver identityObserver,
33+
ILog log)
2834
{
2935
this.stream = stream;
3036
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
3137
this.activeTcpConnectionsLimiter = activeTcpConnectionsLimiter;
38+
this.identityObserver = identityObserver;
3239
this.log = log;
3340
}
3441

@@ -105,6 +112,7 @@ async Task ReceiveAndProcessRequestAsSubscriberAsync(IMessageExchangeStream stre
105112
public async Task ExchangeAsServerAsync(Func<RequestMessage, Task<ResponseMessage>> incomingRequestProcessor, Func<RemoteIdentity, IPendingRequestQueue> pendingRequests, CancellationToken cancellationToken)
106113
{
107114
var identity = await GetRemoteIdentityAsync(cancellationToken);
115+
identityObserver.IdentityEstablished(identity);
108116

109117
//We might need to limit the connection, so by default, we create an unlimited connection lease
110118
var limitedConnectionLease = activeTcpConnectionsLimiter.CreateUnlimitedLease();

source/Halibut/Transport/SecureListener.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class SecureListener : IAsyncDisposable
4848
readonly HalibutTimeoutsAndLimits halibutTimeoutsAndLimits;
4949
readonly IStreamFactory streamFactory;
5050
readonly IConnectionsObserver connectionsObserver;
51+
readonly IIdentityObserver identityObserver;
5152
ILog log;
5253
TcpListener listener;
5354
Thread? backgroundThread;
@@ -67,7 +68,8 @@ public SecureListener(
6768
Func<string, string, UnauthorizedClientConnectResponse> unauthorizedClientConnect,
6869
HalibutTimeoutsAndLimits halibutTimeoutsAndLimits,
6970
IStreamFactory streamFactory,
70-
IConnectionsObserver connectionsObserver)
71+
IConnectionsObserver connectionsObserver,
72+
IIdentityObserver identityObserver)
7173
{
7274
this.endPoint = endPoint;
7375
this.serverCertificate = serverCertificate;
@@ -81,6 +83,7 @@ public SecureListener(
8183
this.halibutTimeoutsAndLimits = halibutTimeoutsAndLimits;
8284
this.streamFactory = streamFactory;
8385
this.connectionsObserver = connectionsObserver;
86+
this.identityObserver = identityObserver;
8487
this.cts = new CancellationTokenSource();
8588
this.cancellationToken = cts.Token;
8689

0 commit comments

Comments
 (0)