diff --git a/src/session.ts b/src/session.ts index 733ce56..866b0fb 100644 --- a/src/session.ts +++ b/src/session.ts @@ -20,10 +20,10 @@ import {event} from './events' import {NamedCache, NamedCacheClient, NamedMap} from './named-cache-client' import {util} from './util' -import {ConnectivityState} from "@grpc/grpc-js/build/src/connectivity-state"; -import {registerResolver} from "@grpc/grpc-js/build/src/resolver"; -import {Endpoint} from "@grpc/grpc-js/build/src/subchannel-address"; -import * as net from "node:net"; +import {ConnectivityState} from "@grpc/grpc-js/build/src/connectivity-state" +import {registerResolver} from "@grpc/grpc-js/build/src/resolver" +import {Endpoint, TcpSubchannelAddress} from "@grpc/grpc-js/build/src/subchannel-address" +import * as net from "node:net" /** * Supported {@link Session} options. @@ -32,7 +32,7 @@ export class Options { /** * Regular expression for basic validation of IPv4 address. */ - private static readonly ADDRESS_REGEXP = RegExp('^(coherence:\\S+|\\S+):\\d{1,5}$') + private static readonly ADDRESS_REGEXP = RegExp('^(coherence:\\S+:\\d{1,5}(\\/[a-zA-Z0-9]*)?|\\S+:\\d{1,5})$') /** * Address of the target Coherence cluster. If not explicitly set, this defaults to {@link Session.DEFAULT_ADDRESS}. @@ -85,7 +85,7 @@ export class Options { /** * Flag indicating mutations are no longer allowed. */ - private locked: boolean = false; + private locked: boolean = false /** * Optional TLS configuration. @@ -114,7 +114,7 @@ export class Options { */ set address (address: string) { if (this.locked) { - return; + return } // ensure address is sane if (!Options.ADDRESS_REGEXP.test(address)) { @@ -140,7 +140,7 @@ export class Options { */ set requestTimeoutInMillis (timeout: number) { if (this.locked) { - return; + return } if (timeout <= 0) { timeout = Number.POSITIVE_INFINITY @@ -154,7 +154,7 @@ export class Options { * @return the ready timeout in `milliseconds` */ get readyTimeoutInMillis(): number { - return this._readyTimeoutInMillis; + return this._readyTimeoutInMillis } /** @@ -164,12 +164,12 @@ export class Options { */ set readyTimeoutInMillis(timeout: number) { if (this.locked) { - return; + return } if (timeout <= 0) { timeout = Number.POSITIVE_INFINITY } - this._readyTimeoutInMillis = timeout; + this._readyTimeoutInMillis = timeout } /** @@ -198,7 +198,7 @@ export class Options { * Return the `gRPC` `ChannelOptions`. */ get channelOptions(): { [p: string]: any } { - return this._channelOptions; + return this._channelOptions } /** @@ -210,9 +210,9 @@ export class Options { */ set channelOptions(value: { [p: string]: any }) { if (this.locked) { - return; + return } - this._channelOptions = value; + this._channelOptions = value } /** @@ -250,7 +250,7 @@ export class Options { */ set callOptions (callOptions: () => CallOptions) { if (this.locked) { - return; + return } this._callOptions = callOptions } @@ -269,7 +269,7 @@ export class Options { * @hidden */ lock(): void { - this.locked = true; + this.locked = true this.tls.lock() } @@ -357,7 +357,7 @@ export class TlsOptions { */ set enabled (value: boolean) { if (this.locked) { - return; + return } this._enabled = value } @@ -378,7 +378,7 @@ export class TlsOptions { */ set caCertPath (value: PathLike | undefined) { if (this.locked) { - return; + return } this._caCertPath = value } @@ -399,7 +399,7 @@ export class TlsOptions { */ set clientCertPath (value: PathLike | undefined) { if (this.locked) { - return; + return } this._clientCertPath = value } @@ -420,7 +420,7 @@ export class TlsOptions { */ set clientKeyPath (value: PathLike | undefined) { if (this.locked) { - return; + return } this._clientKeyPath = value } @@ -565,15 +565,15 @@ export class Session // emit the `disconnect` event. // When transitioning from any other state, // other than SHUTDOWN, to READY, emit the 'reconnect' event. - let connected: boolean = false; - let firstConnect: boolean = true; - let lastState: number = 0; + let connected: boolean = false + let firstConnect: boolean = true + let lastState: number = 0 let callback = async () => { - let state = channel.getConnectivityState(false); - lastState = state; + let state = channel.getConnectivityState(false) + lastState = state if (state === ConnectivityState.SHUTDOWN) { // nothing to do - return; + return } else if (state === ConnectivityState.READY) { if (!firstConnect && !connected) { this.emit(event.SessionLifecycleEvent.RECONNECTED) @@ -586,7 +586,7 @@ export class Session } else { if (connected) { this.emit(event.SessionLifecycleEvent.DISCONNECTED) - connected = false; + connected = false } } let deadline = Number.POSITIVE_INFINITY @@ -867,8 +867,8 @@ enum ResolveState { class CoherenceResolver implements experimental.Resolver { - private static readonly MULTIPLEXED_SOCKET = Buffer.from([90, 193, 224, 0]); - private static readonly NAME_SERVICE_SUB_PORT = Buffer.from([0, 0, 0, 3]); + private static readonly MULTIPLEXED_SOCKET = Buffer.from([90, 193, 224, 0]) + private static readonly NAME_SERVICE_SUB_PORT = Buffer.from([0, 0, 0, 3]) private static readonly CONNECTION_OPEN = Buffer.from([ 0, 1, 2, 0, 66, 0, 1, 14, 0, 0, 66, 166, 182, 159, 222, 178, 81, 1, 65, 227, 243, 228, 221, 15, 2, 65, 143, 246, 186, 153, 1, 3, @@ -884,35 +884,42 @@ class CoherenceResolver implements experimental.Resolver { 111, 99, 97, 108, 104, 111, 115, 116, 10, 78, 5, 50, 48, 50, 51, 51, 12, 78, 16, 67, 111, 104, 101, 114, 101, 110, 99, 101, 67, 111, 110, 115, 111, 108, 101, 64, 64, - ]); + ]) private static readonly CHANNEL_OPEN = Buffer.from([ 0, 11, 2, 0, 66, 1, 1, 78, 19, 78, 97, 109, 101, 83, 101, 114, 118, 105, 99, 101, 80, 114, 111, 116, 111, 99, 111, 108, 2, 78, 11, 78, 97, 109, 101, 83, 101, 114, 118, 105, 99, 101, 64, - ]); + ]) - private static readonly GRPC_PROXY_LOOKUP: string = 'NameService/string/$GRPC:GrpcProxy'; + private static readonly NAME_SERVICE: string = 'NameService/string' + private static readonly CLUSTER_NS_LOOKUP_PREFIX: string = `${CoherenceResolver.NAME_SERVICE}/Cluster/foreign/` + private static readonly CLUSTER_NS_LOOKUP_SUFFIX: string = '/NameService/localPort' + private static readonly GRPC_PROXY_LOOKUP: string = `${CoherenceResolver.NAME_SERVICE}/$GRPC:GrpcProxy` - private static readonly NS_LOOKUP_REQ_ID = Buffer.from([1, 1, 0, 66, 0, 1, 78]); - private static readonly REQ_END_MARKER = Buffer.from([64]); + private static readonly NS_LOOKUP_REQ_ID = Buffer.from([1, 1, 0, 66, 0, 1, 78]) + private static readonly REQ_END_MARKER = Buffer.from([64]) - private readonly target: experimental.GrpcUri; - private readonly listener: experimental.ResolverListener; - private readonly channelOptions: ChannelOptions; - private channel?: Buffer; + private readonly target: experimental.GrpcUri + private readonly listener: experimental.ResolverListener + private readonly channelOptions: ChannelOptions + private channel?: Buffer private resolve?: any + private reject?: any private lookupResult?: string - private waitResolve: Promise = new Promise((resolve) => { - this.resolve = resolve; - }); + private host: string = "localhost" + private lookupTarget = CoherenceResolver.GRPC_PROXY_LOOKUP + private waitResolve: Promise = new Promise((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) constructor ( target: experimental.GrpcUri, listener: experimental.ResolverListener, channelOptions: ChannelOptions ) { - this.target = target; + this.target = target this.listener = listener this.channelOptions = channelOptions } @@ -923,25 +930,83 @@ class CoherenceResolver implements experimental.Resolver { } updateResolution(): void { - let socket = this.createSocket(); + let socket = this.createSocket() const [host, port] = this.target.path.split(':') - socket.connect(parseInt(port), host) - this.waitResolve.then(() => this.processLookupResult()) + this.host = host + + // check for foreign cluster reference + let idx: number = this.target.path.indexOf('/') + if (idx !== -1) { + let clusterName: string = this.target.path.substring(idx + 1) + this.lookupTarget = `${CoherenceResolver.CLUSTER_NS_LOOKUP_PREFIX}${clusterName}${CoherenceResolver.CLUSTER_NS_LOOKUP_SUFFIX}` + socket.connect(parseInt(port), host) + this.waitResolve.then(() => { + let ep: Endpoint[] = this.parseLookupResult(true) + if (ep.length > 0) { + let tcpEndPoint = (ep[0].addresses[0] as TcpSubchannelAddress) + this.lookupTarget = CoherenceResolver.GRPC_PROXY_LOOKUP + this.waitResolve = new Promise((resolve) => { + this.resolve = resolve + }) + socket = this.createSocket() + socket.connect(tcpEndPoint.port, tcpEndPoint.host) + this.waitResolve.then(() => { + ep = this.parseLookupResult() + if (ep.length > 0) { + this.listener.onSuccessfulResolution(ep, null, null, null, {}) + } else { + this.listener.onError({ + code: status.UNAVAILABLE, + details: `Unable to resolve an address from ${this.target}`, + metadata: new Metadata(), + }) + } + }) + } else { + this.listener.onError({ + code: status.UNAVAILABLE, + details: `Unable to resolve an address from ${this.target}`, + metadata: new Metadata(), + }) + } + }).catch(reason => { + console.log(`NameService lookup failed: ${reason}`) + this.listener.onError({ + code: status.UNAVAILABLE, + details: reason, + metadata: new Metadata(), + }) + }) + } else { + socket.connect(parseInt(port), host) + this.waitResolve.then(() => { + let ep: Endpoint[] = this.parseLookupResult() + if (ep.length > 0) { + this.listener.onSuccessfulResolution(ep, null, null, null, {}) + } else { + this.listener.onError({ + code: status.UNAVAILABLE, + details: `Unable to resolve an address from ${this.target}`, + metadata: new Metadata(), + }) + } + }) + } } // ----- internal --------------------------------------------------------- private processLookupResult() { if (this.lookupResult) { - const parts: string[] = this.lookupResult.substring(1, this.lookupResult.length - 1).split(', '); + const parts: string[] = this.lookupResult.substring(1, this.lookupResult.length - 1).split(', ') if (parts) { - const iterCnt: number = parts.length / 2; + const iterCnt: number = parts.length / 2 const points: Endpoint[] = Array.from({length: iterCnt}, (_, i) => ({ addresses: [{ port: parseInt(parts[i * 2 + 1]), host: parts[i * 2], }], - })); + })) this.lookupResult = undefined this.listener.onSuccessfulResolution(points, null, null, null, {}) } @@ -954,88 +1019,109 @@ class CoherenceResolver implements experimental.Resolver { } } + private parseLookupResult(portOnly: boolean = false): Endpoint[] { + if (this.lookupResult) { + if (portOnly) { + return [{addresses: [{port: parseInt(this.lookupResult), host: this.host}]}] + } + const parts: string[] = this.lookupResult.substring(1, this.lookupResult.length - 1).split(', ') + if (parts) { + const iterCnt: number = parts.length / 2 + return Array.from({length: iterCnt}, (_, i) => ({ + addresses: [{ + port: parseInt(parts[i * 2 + 1]), + host: parts[i * 2], + }], + })) + } else { + return [] + } + } + return [] + } + private readResponse(socket: net.Socket): Buffer { const length: number = this.readPackedInt(socket) return socket.read(length) } private writePackedInt(n: number): Buffer { - let result: Buffer = Buffer.alloc(0); - let b = 0; + let result: Buffer = Buffer.alloc(0) + let b = 0 if (n < 0) { - b = 0x40; - n = ~n; // bitwise negation + b = 0x40 + n = ~n // bitwise negation } - b |= n & 0x3F; - n >>= 6; + b |= n & 0x3F + n >>= 6 while (n !== 0) { - result = Buffer.concat([result, Buffer.from([b | 0x80])]); - b = n & 0x7F; - n >>= 7; + result = Buffer.concat([result, Buffer.from([b | 0x80])]) + b = n & 0x7F + n >>= 7 } - return Buffer.concat([result, Buffer.from([b])]); + return Buffer.concat([result, Buffer.from([b])]) } private readPackedInt(socket: net.Socket): number { - let bits = 6; - let bytes: Buffer = socket.read(1); - let byte = bytes[0]; + let bits = 6 + let bytes: Buffer = socket.read(1) + let byte = bytes[0] let negative: boolean = (byte & 0x40) !== 0 let result = (byte & 0x3F) while ((byte & 0x80) !== 0) { - bytes = socket.read(1); - byte = bytes[0]; - result |= (byte & 0x7F) << bits; - bits += 7; + bytes = socket.read(1) + byte = bytes[0] + result |= (byte & 0x7F) << bits + bits += 7 } return negative ? ~result : result } private readPackedIntFromBuffer(bytes: Buffer): [number, number] { - let bits = 6; + let bits = 6 if (bytes.length > 7) { - let byte = bytes ? bytes[6] : 0; + let byte = bytes ? bytes[6] : 0 let negative: boolean = (byte & 0x40) !== 0 let result = (byte & 0x3F) while ((byte & 0x80) !== 0) { - byte = bytes ? bytes[7] : 0; - result |= (byte & 0x7F) << bits; - bits += 7; + byte = bytes ? bytes[7] : 0 + result |= (byte & 0x7F) << bits + bits += 7 } - return [negative ? ~result : result, bits]; + return [negative ? ~result : result, bits] } return [0, bits] } private sendConnectionOpen(socket: net.Socket) { - socket.write(CoherenceResolver.MULTIPLEXED_SOCKET); - socket.write(CoherenceResolver.NAME_SERVICE_SUB_PORT); - socket.write(this.writePackedInt(CoherenceResolver.CONNECTION_OPEN.length)); - socket.write(CoherenceResolver.CONNECTION_OPEN); + socket.write(CoherenceResolver.MULTIPLEXED_SOCKET) + socket.write(CoherenceResolver.NAME_SERVICE_SUB_PORT) + socket.write(this.writePackedInt(CoherenceResolver.CONNECTION_OPEN.length)) + socket.write(CoherenceResolver.CONNECTION_OPEN) } private sendChannelOpen(socket: net.Socket) { socket.write(this.writePackedInt(CoherenceResolver.CHANNEL_OPEN.length)) - socket.write(CoherenceResolver.CHANNEL_OPEN); + socket.write(CoherenceResolver.CHANNEL_OPEN) } private sendLookup(socket: net.Socket) { const request = Buffer.concat([ this.channel!, CoherenceResolver.NS_LOOKUP_REQ_ID, - this.writePackedInt(CoherenceResolver.GRPC_PROXY_LOOKUP.length), - Buffer.from(CoherenceResolver.GRPC_PROXY_LOOKUP), + this.writePackedInt(this.lookupTarget.length), + Buffer.from(this.lookupTarget), CoherenceResolver.REQ_END_MARKER, - ]); + ]) socket.write(this.writePackedInt(request.length)) socket.write(request) @@ -1044,26 +1130,30 @@ class CoherenceResolver implements experimental.Resolver { // Read a string from the response private readString(data: Buffer): string { let [len, bits] = this.readPackedIntFromBuffer(data) - return data.subarray(7 + (bits / 7), 7 + (bits / 7) + len).toString(); + return data.subarray(7 + (bits / 7), 7 + (bits / 7) + len).toString() } private createSocket(): net.Socket { - let socket = new net.Socket(); - let state: ResolveState = ResolveState.INITIAL; + let socket = new net.Socket() + let state: ResolveState = ResolveState.INITIAL socket.setNoDelay(true) - socket.setTimeout(10 * 1000); // Set timeout + socket.setTimeout(10 * 1000) // Set timeout socket.on("connect", () => { this.sendConnectionOpen(socket) state = ResolveState.CONNECTION } - ); + ) + + socket.on('error', err => { + this.reject(err) + }) socket.on('readable', () => { switch (state) { case ResolveState.DONE: - return; + return case ResolveState.CONNECTION: { while (socket.read() !== null) {} // consume the open connection response @@ -1072,15 +1162,15 @@ class CoherenceResolver implements experimental.Resolver { this.sendChannelOpen(socket) // transition read state - state = ResolveState.CHANNEL; + state = ResolveState.CHANNEL - break; + break } case ResolveState.CHANNEL: { let msg: Buffer = this.readResponse(socket) - this.channel = msg.subarray(8, 8 + msg.length - 9); + this.channel = msg.subarray(8, 8 + msg.length - 9) if (!this.channel) { // no channel parsed, mark state as done and manifest as error to caller @@ -1090,22 +1180,22 @@ class CoherenceResolver implements experimental.Resolver { this.sendLookup(socket) // transition read state - state = ResolveState.LOOKUP; + state = ResolveState.LOOKUP - break; + break } case ResolveState.LOOKUP: { let msg = this.readResponse(socket) msg = msg.subarray(this.channel!.length + 1) - this.lookupResult = this.readString(msg); + this.lookupResult = this.readString(msg) socket.destroy() this.resolve() // signal caller, resolution completed } } - }); + }) - return socket; + return socket } static getDefaultAuthority(target: experimental.GrpcUri): string { diff --git a/test/session-tests.js b/test/session-tests.js index 70aa529..b967a29 100644 --- a/test/session-tests.js +++ b/test/session-tests.js @@ -75,14 +75,21 @@ describe('Session Tests Suite (unit/IT)', () => { }) it('should have active sessions after getCache() is called', async () => { - const sess = new Session({address: "coherence:127.0.0.1:7574"}) - - sess.getCache('sess-cache') + const sess = new Session({address: "coherence:127.0.0.1:7574/test2"}) + + let c = sess.getCache('sess-cache') + try { + await c.get('a') + } catch (e) { + console.log(`OOPS ${e}`) + await sess.close() + return + } assert.equal(sess.activeCacheCount, 1) assert.equal(sess.activeCaches[0].name, 'sess-cache') await sess.close() - }) + }).timeout(50000000) it('should return the same cache instance upon multiple getCache() invocations for the same cache', async () => { const sess = new Session()