Skip to content

Commit 2198b56

Browse files
committed
feat: moved rpcServerAgent into NodeConnectionManager
The agent level RPC is not fully managed by the Nodes domain. [ci skip]
1 parent b0239ae commit 2198b56

File tree

4 files changed

+64
-57
lines changed

4 files changed

+64
-57
lines changed

src/PolykeyAgent.ts

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import * as utils from './utils';
3636
import * as keysUtils from './keys/utils';
3737
import * as keysEvents from './keys/events';
3838
import * as nodesUtils from './nodes/utils';
39-
import * as nodesEvents from './nodes/events';
4039
import * as workersUtils from './workers/utils';
4140
import TaskManager from './tasks/TaskManager';
4241
import { serverManifest as clientServerManifest } from './client/handlers';
@@ -423,28 +422,6 @@ class PolykeyAgent {
423422
pingTimeoutTimeTime: optionsDefaulted.client.keepAliveTimeoutTime,
424423
logger: logger.getChild('WebSocketServer'),
425424
});
426-
rpcServerAgent = await RPCServer.createRPCServer({
427-
manifest: agentServerManifest({
428-
acl: acl,
429-
db: db,
430-
keyRing: keyRing,
431-
logger: logger,
432-
nodeConnectionManager: nodeConnectionManager,
433-
nodeGraph: nodeGraph,
434-
nodeManager: nodeManager,
435-
notificationsManager: notificationsManager,
436-
sigchain: sigchain,
437-
vaultManager: vaultManager,
438-
}),
439-
middlewareFactory: rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
440-
undefined,
441-
optionsDefaulted.rpc.parserBufferSize,
442-
),
443-
sensitive: true,
444-
handlerTimeoutTime: optionsDefaulted.rpc.callTimeoutTime,
445-
handlerTimeoutGraceTime: optionsDefaulted.rpc.callTimeoutTime + 2000,
446-
logger: logger.getChild(RPCServer.name + 'Agent'),
447-
});
448425
} catch (e) {
449426
logger.warn(`Failed Creating ${this.name}`);
450427
await rpcServerAgent?.destroy({ force: true });
@@ -487,7 +464,6 @@ class PolykeyAgent {
487464
sessionManager,
488465
rpcServerClient,
489466
webSocketServerClient,
490-
rpcServerAgent,
491467
fs,
492468
logger,
493469
});
@@ -534,11 +510,6 @@ class PolykeyAgent {
534510
public readonly rpcServerAgent: RPCServer;
535511
protected workerManager: PolykeyWorkerManagerInterface | undefined;
536512

537-
protected handleEventNodeStream = (e: nodesEvents.EventNodeConnectionStream) => {
538-
const stream = e.detail;
539-
this.rpcServerAgent.handleStream(stream);
540-
};
541-
542513
protected handleEventCertManagerCertChange = async (
543514
evt: keysEvents.EventCertManagerCertChange,
544515
) => {
@@ -580,7 +551,6 @@ class PolykeyAgent {
580551
sessionManager,
581552
rpcServerClient,
582553
webSocketServerClient,
583-
rpcServerAgent,
584554
fs,
585555
logger,
586556
}: {
@@ -604,7 +574,6 @@ class PolykeyAgent {
604574
sessionManager: SessionManager;
605575
rpcServerClient: RPCServer;
606576
webSocketServerClient: WebSocketServer;
607-
rpcServerAgent: RPCServer;
608577
fs: FileSystem;
609578
logger: Logger;
610579
}) {
@@ -629,7 +598,6 @@ class PolykeyAgent {
629598
this.sessionManager = sessionManager;
630599
this.rpcServerClient = rpcServerClient;
631600
this.webSocketServerClient = webSocketServerClient;
632-
this.rpcServerAgent = rpcServerAgent;
633601
this.fs = fs;
634602
}
635603

@@ -713,14 +681,22 @@ class PolykeyAgent {
713681
this.rpcServerClient.handleStream(streamPair),
714682
});
715683
await this.nodeManager.start();
716-
this.nodeConnectionManager.addEventListener(
717-
nodesEvents.EventNodeConnectionStream.name,
718-
this.handleEventNodeStream,
719-
);
720684
await this.nodeConnectionManager.start({
721685
host: optionsDefaulted.agentServiceHost,
722686
port: optionsDefaulted.agentServicePort,
723687
ipv6Only: optionsDefaulted.ipv6Only,
688+
manifest: agentServerManifest({
689+
acl: this.acl,
690+
db: this.db,
691+
keyRing: this.keyRing,
692+
logger: this.logger,
693+
nodeConnectionManager: this.nodeConnectionManager,
694+
nodeGraph: this.nodeGraph,
695+
nodeManager: this.nodeManager,
696+
notificationsManager: this.notificationsManager,
697+
sigchain: this.sigchain,
698+
vaultManager: this.vaultManager,
699+
}),
724700
});
725701
await this.nodeGraph.start({ fresh });
726702
await this.nodeManager.syncNodeGraph(false);
@@ -764,10 +740,6 @@ class PolykeyAgent {
764740
await this.discovery?.stop();
765741
await this.nodeGraph?.stop();
766742
await this.nodeConnectionManager?.stop();
767-
this.nodeConnectionManager.removeEventListener(
768-
nodesEvents.EventNodeConnectionStream.name,
769-
this.handleEventNodeStream,
770-
);
771743
await this.nodeManager?.stop();
772744
await this.webSocketServerClient.stop(true);
773745
await this.identitiesManager?.stop();
@@ -804,10 +776,6 @@ class PolykeyAgent {
804776
await this.vaultManager.stop();
805777
await this.discovery.stop();
806778
await this.nodeConnectionManager.stop();
807-
this.nodeConnectionManager.removeEventListener(
808-
nodesEvents.EventNodeConnectionStream.name,
809-
this.handleEventNodeStream,
810-
);
811779
await this.nodeGraph.stop();
812780
await this.nodeManager.stop();
813781
await this.webSocketServerClient.stop(true);

src/nodes/NodeConnectionManager.ts

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import type {
1414
} from './types';
1515
import type KeyRing from '../keys/KeyRing';
1616
import type { Key, CertificatePEM } from '../keys/types';
17-
import type { ConnectionData, Host, Hostname, Port } from '../network/types';
18-
import type { TLSConfig } from '../network/types';
17+
import type { ConnectionData, Host, Hostname, Port, TLSConfig } from '../network/types';
18+
import type { ServerManifest } from '../rpc/types';
1919
import type { HolePunchRelayMessage } from './agent/types';
2020
import Logger from '@matrixai/logger';
2121
import { withF } from '@matrixai/resources';
@@ -37,6 +37,8 @@ import manifestClientAgent from './agent/callers';
3737
import * as utils from '../utils';
3838
import config from '../config';
3939
import { running, status } from "@matrixai/async-init";
40+
import RPCServer from '../rpc/RPCServer';
41+
import * as rpcUtilsMiddleware from '../rpc/utils/middleware';
4042

4143
type ManifestClientAgent = typeof manifestClientAgent;
4244

@@ -109,6 +111,16 @@ class NodeConnectionManager {
109111
*/
110112
public readonly connectionHolePunchIntervalTime: number;
111113

114+
/**
115+
* Max parse buffer size before RPC parser throws an parse error.
116+
*/
117+
public readonly rpcParserBufferSize: number;
118+
119+
/**
120+
* default timeout for RPC handlers
121+
*/
122+
public readonly rpcCallTimeoutTime: number;
123+
112124
protected logger: Logger;
113125
protected keyRing: KeyRing;
114126
protected nodeGraph: NodeGraph;
@@ -134,6 +146,8 @@ class NodeConnectionManager {
134146

135147
protected connectionLocks: LockBox<Lock> = new LockBox();
136148

149+
protected rpcServer?: RPCServer;
150+
137151
/**
138152
* Dispatches a `EventNodeConnectionManagerClose` in response to any `NodeConnectionManager`
139153
* error event. Will trigger stop of the `NodeConnectionManager` via the
@@ -157,6 +171,11 @@ class NodeConnectionManager {
157171
}
158172
}
159173

174+
protected handleEventNodeConnectionStream = async (e: nodesEvents.EventNodeConnectionStream) => {
175+
const stream = e.detail;
176+
this.rpcServer!.handleStream(stream);
177+
}
178+
160179
/**
161180
* redispatches `QUICSOcket` or `QUICServer` error events as `NodeConnectionManager` error events.
162181
* This should trigger the destruction of the `NodeConnection` through the
@@ -245,6 +264,8 @@ class NodeConnectionManager {
245264
.clientKeepAliveIntervalTime,
246265
connectionHolePunchIntervalTime: config.defaultsSystem
247266
.nodesConnectionHolePunchIntervalTime,
267+
rpcParserBufferSize: config.defaultsSystem.rpcParserBufferSize,
268+
rpcCallTimeoutTime: config.defaultsSystem.rpcCallTimeoutTime,
248269
})
249270
this.logger = logger ?? new Logger(this.constructor.name);
250271
this.keyRing = keyRing;
@@ -261,6 +282,8 @@ class NodeConnectionManager {
261282
this.connectionKeepAliveTimeoutTime = optionsDefaulted.connectionKeepAliveTimeoutTime;
262283
this.connectionKeepAliveIntervalTime = optionsDefaulted.connectionKeepAliveIntervalTime;
263284
this.connectionHolePunchIntervalTime = optionsDefaulted.connectionHolePunchIntervalTime;
285+
this.rpcParserBufferSize = optionsDefaulted.rpcParserBufferSize;
286+
this.rpcCallTimeoutTime = optionsDefaulted.rpcCallTimeoutTime;
264287
// Note that all buffers allocated for crypto operations is using
265288
// `allocUnsafeSlow`. Which ensures that the underlying `ArrayBuffer`
266289
// is not shared. Also, all node buffers satisfy the `ArrayBuffer` interface.
@@ -343,18 +366,34 @@ class NodeConnectionManager {
343366
port = 0 as Port,
344367
reuseAddr = false,
345368
ipv6Only = false,
369+
manifest = {},
346370
}: {
347371
host?: Host;
348372
port?: Port;
349373
reuseAddr?: boolean;
350374
ipv6Only?: boolean;
375+
manifest?: ServerManifest;
351376
}) {
352377
const address = networkUtils.buildAddress(host, port);
353378
this.logger.info(`Start ${this.constructor.name} on ${address}`);
354379

355380
// We should expect that seed nodes are already in the node manager
356381
// It should not be managed here!
357382

383+
// setting up RPCServer
384+
this.rpcServer = await RPCServer.createRPCServer({
385+
manifest,
386+
middlewareFactory: rpcUtilsMiddleware.defaultServerMiddlewareWrapper(
387+
undefined,
388+
this.rpcParserBufferSize,
389+
),
390+
sensitive: true,
391+
handlerTimeoutTime: this.rpcCallTimeoutTime,
392+
handlerTimeoutGraceTime: this.rpcCallTimeoutTime + 2000,
393+
logger: this.logger.getChild(RPCServer.name),
394+
});
395+
396+
// Setting up QUICSocket
358397
await this.quicSocket.start({
359398
host,
360399
port,
@@ -397,7 +436,6 @@ class NodeConnectionManager {
397436
EventAll.name,
398437
this.handleEventAll,
399438
);
400-
401439
this.logger.info(`Started ${this.constructor.name}`);
402440
}
403441

@@ -453,6 +491,7 @@ class NodeConnectionManager {
453491
await Promise.all(destroyProms);
454492
await this.quicServer.stop({ force: true });
455493
await this.quicSocket.stop({ force: true });
494+
await this.rpcServer?.destroy({ force: true, reason: Error('TMP NCM stopping')}); // TODO
456495
this.logger.info(`Stopped ${this.constructor.name}`);
457496
}
458497

@@ -928,6 +967,10 @@ class NodeConnectionManager {
928967
// Check if exists in map, this should never happen but better safe than sorry.
929968
if (this.connections.has(nodeIdString)) utils.never();
930969
// Setting up events
970+
nodeConnection.addEventListener(
971+
nodesEvents.EventNodeConnectionStream.name,
972+
this.handleEventNodeConnectionStream,
973+
)
931974
nodeConnection.addEventListener(
932975
EventAll.name,
933976
this.handleEventAll,
@@ -941,6 +984,10 @@ class NodeConnectionManager {
941984
// Already locked so already destroying
942985
if (this.connectionLocks.isLocked(nodeIdString)) return;
943986
await this.destroyConnection(nodeId);
987+
nodeConnection.removeEventListener(
988+
nodesEvents.EventNodeConnectionStream.name,
989+
this.handleEventNodeConnectionStream,
990+
)
944991
nodeConnection.removeEventListener(
945992
EventAll.name,
946993
this.handleEventAll,

src/nodes/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type NodesOptions = {
3333
connectionKeepAliveTimeoutTime: number;
3434
connectionKeepAliveIntervalTime: number;
3535
connectionHolePunchIntervalTime: number;
36+
rpcParserBufferSize: number;
37+
rpcCallTimeoutTime: number;
3638
};
3739

3840
export type {

tests/nodes/NodeConnectionManager.lifecycle.test.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import NodeConnectionManager from '@/nodes/NodeConnectionManager';
1414
import { promise, sleep } from '@/utils';
1515
import * as nodesErrors from '@/nodes/errors';
1616
import NodeConnection from '@/nodes/NodeConnection';
17-
import RPCServer from '@/rpc/RPCServer';
1817
import * as tlsUtils from '../utils/tls';
1918

2019
describe(`${NodeConnectionManager.name} lifecycle test`, () => {
@@ -35,7 +34,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => {
3534
let serverNodeIdEncoded: NodeIdEncoded;
3635
let keyRingPeer: KeyRing;
3736
let nodeConnectionManagerPeer: NodeConnectionManager;
38-
let rpcServer: RPCServer;
3937
let serverAddress: NodeAddress;
4038

4139
let keyRing: KeyRing;
@@ -76,13 +74,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => {
7674
await nodeConnectionManagerPeer.start({
7775
host: localHost,
7876
})
79-
rpcServer = await RPCServer.createRPCServer({
80-
handlerTimeoutGraceTime: 1000,
81-
handlerTimeoutTime: 5000,
82-
logger: logger.getChild(`${RPCServer.name}`),
83-
manifest: {}, // TODO: test server manifest
84-
sensitive: false,
85-
});
8677

8778
// Setting up client dependencies
8879
const keysPath = path.join(dataDir, 'keys');
@@ -121,7 +112,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => {
121112
await keyRing.stop();
122113
await keyRing.destroy();
123114

124-
await rpcServer.destroy({ force: true });
125115
await nodeConnectionManagerPeer.stop();
126116
});
127117

0 commit comments

Comments
 (0)