From 67461f30294e36c9beeb114857acdc4e5f8a1af8 Mon Sep 17 00:00:00 2001 From: Ryan Lubke Date: Mon, 3 Feb 2025 16:45:38 -0800 Subject: [PATCH] Checkpoint. --- .github/workflows/{node.js.yml => build.yml} | 4 +- .github/workflows/discovery.yml | 68 +++++++ package.json | 1 + src/session.ts | 182 ++++++++++--------- test/{ => discovery}/resolver-tests.js | 42 +++-- 5 files changed, 190 insertions(+), 107 deletions(-) rename .github/workflows/{node.js.yml => build.yml} (95%) create mode 100644 .github/workflows/discovery.yml rename test/{ => discovery}/resolver-tests.js (62%) diff --git a/.github/workflows/node.js.yml b/.github/workflows/build.yml similarity index 95% rename from .github/workflows/node.js.yml rename to .github/workflows/build.yml index af28ca0..927523a 100644 --- a/.github/workflows/node.js.yml +++ b/.github/workflows/build.yml @@ -28,7 +28,7 @@ jobs: fail-fast: false matrix: node-version: [18.x, 19.x, 20.x, 21.x, 22.x, 23.x] - coherence-version: [22.06.11, 24.09.2] + coherence-version: [22.06.11, 14.1.2-0-1, 24.09.2] steps: - uses: actions/checkout@v4 @@ -48,7 +48,7 @@ jobs: - run: COHERENCE_VERSION=${{ matrix.coherence-version }} npm run test-cycle-tls # clean up - name: Archive production artifacts - if: always() + if: failure() uses: actions/upload-artifact@v4 with: name: save-log-file-${{ matrix.node-version }}-${{ matrix.coherence-version }} diff --git a/.github/workflows/discovery.yml b/.github/workflows/discovery.yml new file mode 100644 index 0000000..2651c82 --- /dev/null +++ b/.github/workflows/discovery.yml @@ -0,0 +1,68 @@ +# Copyright 2020, 2023, Oracle Corporation and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at +# https://oss.oracle.com/licenses/upl. + +# --------------------------------------------------------------------------- +# Coherence JavaScript Client GitHub Actions CI build. +# --------------------------------------------------------------------------- + +name: Node.js CI + +on: + schedule: + - cron: "0 5 * * *" + push: + branches-ignore: + - ghpages + pull_request: + types: + - opened + branches: + - 'main' +jobs: + build: + + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + node-version: [18.x, 19.x, 20.x, 21.x, 22.x, 23.x] + coherence-version: [22.06.11, 14.1.2-0-1, 24.09.2] + + steps: + - uses: actions/checkout@v4 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v4 + with: + node-version: ${{ matrix.node-version }} + - run: curl -LO "https://github.com/protocolbuffers/protobuf/releases/download/v22.2/protoc-22.2-linux-x86_64.zip" + - run: unzip protoc-22.2-linux-x86_64.zip -d /tmp/grpc + - run: echo "/tmp/grpc/bin" >> $GITHUB_PATH + - run: npm install + - run: npm run compile + + - name: Run Coherence Server + shell: bash + run: | + export COHERENCE_VERSION=${{ matrix.coherenceVersion }} + curl -sL https://raw.githubusercontent.com/oracle/coherence-cli/main/scripts/install.sh | bash + cohctl version + cohctl set profile grpc-cluster1 -v "-Dcoherence.grpc.server.port=10000" + cohctl create cluster grpc-cluster1 -P grpc-cluster1 -r 1 -v ${{ matrix.coherenceVersion }} -y -a coherence-grpc-proxy + cohctl set profile grpc-cluster2 -v "-Dcoherence.grpc.server.port=10001" + cohctl create cluster grpc-cluster2 -P grpc-cluster2 -r 1 -H 30001 -v ${{ matrix.coherenceVersion }} -y -a coherence-grpc-proxy + sleep 20 + cohctl monitor health -n localhost:7574 -T 40 -w + + - name: Run resolver tests + shell: bash + run: | + npm run test-resolver + + - name: Archive production artifacts + if: failure() + uses: actions/upload-artifact@v4 + with: + name: save-log-file-${{ matrix.node-version }}-${{ matrix.coherence-version }} + path: ~/.cohctl/logs/*.*.log diff --git a/package.json b/package.json index d4470a6..379749c 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "test": "npm run compile && npm exec mocha 'test/**.js' --recursive --exit", "test-cycle": "bin/test-cycle.sh -c", "test-cycle-tls": "bin/test-cycle.sh -s", + "test-resolver": "npm run compile && npm exec mocha test/discovery/resolver-tests.js --recursive --exit", "coh-up": "bin/docker-utils.sh -u", "coh-down": "bin/docker-utils.sh -d", "coverage": "nyc mocha 'test/**.js' --exit", diff --git a/src/session.ts b/src/session.ts index d8410c8..3aee15e 100644 --- a/src/session.ts +++ b/src/session.ts @@ -22,7 +22,7 @@ import {NamedCache, NamedCacheClient, NamedMap} from './named-cache-client' import {util} from './util' import {ConnectivityState} from "@grpc/grpc-js/build/src/connectivity-state" import {registerResolver} from "@grpc/grpc-js/build/src/resolver" -import {Endpoint, TcpSubchannelAddress} from "@grpc/grpc-js/build/src/subchannel-address" +import {Endpoint} from "@grpc/grpc-js/build/src/subchannel-address" import * as net from "node:net" /** @@ -865,6 +865,24 @@ enum ResolveState { LOOKUP, } +class LookupState { + state: ResolveState = ResolveState.INITIAL + query: string + result?: string + channel?: Buffer + resolve?: (value: (PromiseLike | void)) => void; + reject?: (reason?: any) => void + waitResolve: Promise + + constructor(query: string) { + this.query = query + this.waitResolve = new Promise((resolve, reject) => { + this.resolve = resolve + this.reject = reject + }) + } +} + export class CoherenceResolver implements experimental.Resolver { private static readonly MULTIPLEXED_SOCKET = Buffer.from([90, 193, 224, 0]) @@ -900,15 +918,7 @@ export class CoherenceResolver implements experimental.Resolver { private readonly target: experimental.GrpcUri private readonly listener: experimental.ResolverListener - private channel?: Buffer - private resolve?: any - private reject?: any - private lookupResult?: string - private lookupTarget = CoherenceResolver.GRPC_PROXY_LOOKUP - private waitResolve: Promise = new Promise((resolve, reject) => { - this.resolve = resolve - this.reject = reject - }) + private lastResolve?: Endpoint[] constructor ( target: experimental.GrpcUri, @@ -925,23 +935,17 @@ export class CoherenceResolver implements experimental.Resolver { } updateResolution(): void { - let [host, port, clusterName] = this.parseConn() + let [host, port, clusterName] = CoherenceResolver.parseConn(this.target.path) let socket // check for foreign cluster reference if (clusterName.length > 0) { - this.lookupTarget = `${CoherenceResolver.CLUSTER_NS_LOOKUP_PREFIX}${clusterName}${CoherenceResolver.CLUSTER_NS_LOOKUP_SUFFIX}` - socket = this.createSocket() + let clusterNameState: LookupState = this.createForeignLookupState(clusterName) + socket = this.createSocket(clusterNameState) socket.connect(parseInt(port), host) - this.waitResolve.then(() => { - if (this.lookupResult) { - port = this.lookupResult - this.lookupResult = undefined - this.lookupTarget = CoherenceResolver.GRPC_PROXY_LOOKUP - this.waitResolve = new Promise((resolve, reject) => { - this.resolve = resolve - this.reject = reject - }) - this.runLookup(host, parseInt(port)) + clusterNameState.waitResolve.then(() => { + if (clusterNameState.result) { + port = clusterNameState.result + this.runLookup(this.createGrpcLookupState(), host, parseInt(port)) } else { this.listener.onError({ code: status.UNAVAILABLE, @@ -958,18 +962,27 @@ export class CoherenceResolver implements experimental.Resolver { }) }) } else { - this.runLookup(host, parseInt(port)) + this.runLookup(this.createGrpcLookupState(), host, parseInt(port)) } } static getDefaultAuthority(target: experimental.GrpcUri): string { - return target.path + let [host] = this.parseConn(target.path) + return host } // ----- internal --------------------------------------------------------- - parseConn(): [string, string, string] { - let parts: string[] = this.target.path.split(':') + createForeignLookupState(clusterName: string): LookupState { + return new LookupState(`${CoherenceResolver.CLUSTER_NS_LOOKUP_PREFIX}${clusterName}${CoherenceResolver.CLUSTER_NS_LOOKUP_SUFFIX}`) + } + + createGrpcLookupState(): LookupState { + return new LookupState(CoherenceResolver.GRPC_PROXY_LOOKUP) + } + + static parseConn(path: string): [string, string, string] { + let parts: string[] = path.split(':') const host = parts[0] let port = '7574' let clusterName = '' @@ -986,12 +999,14 @@ export class CoherenceResolver implements experimental.Resolver { return [host, port, clusterName] } - private runLookup(host: string, port: number): void { - let socket: net.Socket = this.createSocket() + private runLookup(state: LookupState, host: string, port: number): void { + let socket: net.Socket = this.createSocket(state) socket.connect(port, host) - this.waitResolve.then(() => { - let ep: Endpoint[] = this.parseLookupResult() + state.waitResolve.then(() => { + setTimeout(() => { this.lastResolve = undefined }, 5000) + let ep: Endpoint[] = this.parseLookupResult(state) if (ep.length > 0) { + this.lastResolve = ep this.listener.onSuccessfulResolution(ep, null, null, null, {}) } else { this.listener.onError({ @@ -1003,34 +1018,30 @@ export class CoherenceResolver implements experimental.Resolver { }) } - private parseLookupResult(): Endpoint[] { - try { - if (this.lookupResult) { - const parts: string[] = this.lookupResult.substring(1, this.lookupResult.length - 1).split(', ') - if (parts) { - const iterCnt: number = parts.length / 2 - return Array.from({length: iterCnt}, (_, i) => ({ - addresses: [{ - port: parseInt(parts[i * 2 + 1]), - host: parts[i * 2], - }], - })) - } else { - return [] - } + private parseLookupResult(state: LookupState): Endpoint[] { + if (state.result) { + const parts: string[] = state.result.substring(1, state.result.length - 1).split(', ') + if (parts) { + const iterCnt: number = parts.length / 2 + return Array.from({length: iterCnt}, (_, i) => ({ + addresses: [{ + port: parseInt(parts[i * 2 + 1]), + host: parts[i * 2], + }], + })) + } else { + return [] } - return [] - } finally { - this.lookupResult = undefined } + return [] } - private readResponse(socket: net.Socket): Buffer { - const length: number = this.readPackedInt(socket) + private static readResponse(socket: net.Socket): Buffer { + const length: number = CoherenceResolver.readPackedInt(socket) return socket.read(length) } - private writePackedInt(n: number): Buffer { + private static writePackedInt(n: number): Buffer { let result: Buffer = Buffer.alloc(0) let b = 0 @@ -1051,7 +1062,7 @@ export class CoherenceResolver implements experimental.Resolver { return Buffer.concat([result, Buffer.from([b])]) } - private readPackedInt(socket: net.Socket): number { + private static readPackedInt(socket: net.Socket): number { let bits = 6 let bytes: Buffer = socket.read(1) let byte = bytes[0] @@ -1068,7 +1079,7 @@ export class CoherenceResolver implements experimental.Resolver { return negative ? ~result : result } - private readPackedIntFromBuffer(bytes: Buffer): [number, number] { + private static readPackedIntFromBuffer(bytes: Buffer): [number, number] { let bits = 6 if (bytes.length > 7) { @@ -1087,56 +1098,55 @@ export class CoherenceResolver implements experimental.Resolver { return [0, bits] } - private sendConnectionOpen(socket: net.Socket) { + private static sendConnectionOpen(socket: net.Socket) { socket.write(CoherenceResolver.MULTIPLEXED_SOCKET) socket.write(CoherenceResolver.NAME_SERVICE_SUB_PORT) - socket.write(this.writePackedInt(CoherenceResolver.CONNECTION_OPEN.length)) + socket.write(CoherenceResolver.writePackedInt(CoherenceResolver.CONNECTION_OPEN.length)) socket.write(CoherenceResolver.CONNECTION_OPEN) } - private sendChannelOpen(socket: net.Socket) { - socket.write(this.writePackedInt(CoherenceResolver.CHANNEL_OPEN.length)) + private static sendChannelOpen(socket: net.Socket) { + socket.write(CoherenceResolver.writePackedInt(CoherenceResolver.CHANNEL_OPEN.length)) socket.write(CoherenceResolver.CHANNEL_OPEN) } - private sendLookup(socket: net.Socket) { + private static sendLookup(socket: net.Socket, channel: Buffer, query: string) { const request = Buffer.concat([ - this.channel!, + channel, CoherenceResolver.NS_LOOKUP_REQ_ID, - this.writePackedInt(this.lookupTarget.length), - Buffer.from(this.lookupTarget), + CoherenceResolver.writePackedInt(query.length), + Buffer.from(query), CoherenceResolver.REQ_END_MARKER, ]) - socket.write(this.writePackedInt(request.length)) + socket.write(CoherenceResolver.writePackedInt(request.length)) socket.write(request) } // Read a string from the response - private readString(data: Buffer): string { - let [len, bits] = this.readPackedIntFromBuffer(data) + private static readString(data: Buffer): string { + let [len, bits] = CoherenceResolver.readPackedIntFromBuffer(data) return data.subarray(7 + (bits / 7), 7 + (bits / 7) + len).toString() } - private createSocket(): net.Socket { + private createSocket(state: LookupState): net.Socket { let socket = new net.Socket() - let state: ResolveState = ResolveState.INITIAL socket.setNoDelay(true) socket.setTimeout(10 * 1000) // Set timeout socket.on("connect", () => { - this.sendConnectionOpen(socket) - state = ResolveState.CONNECTION + CoherenceResolver.sendConnectionOpen(socket) + state.state = ResolveState.CONNECTION } ) socket.on('error', err => { - this.reject(err) + state.reject!(err) }) socket.on('readable', () => { - switch (state) { + switch (state.state) { case ResolveState.DONE: return @@ -1144,41 +1154,41 @@ export class CoherenceResolver implements experimental.Resolver { while (socket.read() !== null) {} // consume the open connection response // open the remote channel - this.sendChannelOpen(socket) + CoherenceResolver.sendChannelOpen(socket) // transition read state - state = ResolveState.CHANNEL + state.state = ResolveState.CHANNEL break } case ResolveState.CHANNEL: { - let msg: Buffer = this.readResponse(socket) + let msg: Buffer = CoherenceResolver.readResponse(socket) - this.channel = msg.subarray(8, 8 + msg.length - 9) + state.channel = msg.subarray(8, 8 + msg.length - 9) - if (!this.channel) { - // no channel parsed, mark state as done and manifest as error to caller - state = ResolveState.DONE + if (!state.channel) { + state.state = ResolveState.DONE + state.reject!("Unable to parse channel from response") } - this.sendLookup(socket) + CoherenceResolver.sendLookup(socket, state.channel, state.query) // transition read state - state = ResolveState.LOOKUP + state.state = ResolveState.LOOKUP break } case ResolveState.LOOKUP: { - let msg = this.readResponse(socket) - msg = msg.subarray(this.channel!.length + 1) - this.lookupResult = this.readString(msg) + let msg = CoherenceResolver.readResponse(socket) + msg = msg.subarray(state.channel!.length + 1) + state.result = CoherenceResolver.readString(msg) socket.destroy() - if (this.lookupResult.length > 0) { - this.resolve() // signal caller, resolution completed + if (state.result.length > 0) { + state.resolve!() // signal caller, resolution completed } else { - this.reject("Failure to parse lookup response") + state.reject!("Failure to parse lookup response") } } } diff --git a/test/resolver-tests.js b/test/discovery/resolver-tests.js similarity index 62% rename from test/resolver-tests.js rename to test/discovery/resolver-tests.js index 956d949..237b93f 100644 --- a/test/resolver-tests.js +++ b/test/discovery/resolver-tests.js @@ -12,7 +12,7 @@ const { describe, it } = require('mocha'); describe('CoherenceResolver Test Suite (unit/IT)', () => { describe('A CoherenceResolver', () => { - function createListener(done) { + function createListener(done, expectedPort) { return { onSuccessfulResolution: ( addressList, @@ -24,7 +24,7 @@ describe('CoherenceResolver Test Suite (unit/IT)', () => { try { assert.equal(addressList.length, 1) assert.equal(addressList[0].addresses[0].host, '127.0.0.1') - assert.equal(addressList[0].addresses[0].port, 1408) + assert.equal(addressList[0].addresses[0].port, expectedPort) done() } catch (error) { done(error) @@ -37,38 +37,42 @@ describe('CoherenceResolver Test Suite (unit/IT)', () => { } it('should parse and resolve the connection format \'coherence:[host]\'', (done) => { - const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost'}, createListener(done)) + const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost'}, createListener(done, 10000), null) resolver.updateResolution() }) it('should parse and resolve the connection format \'coherence:[host]:[port]\'', (done) => { - const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost:7574'}, createListener(done), null) + const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost:7574'}, createListener(done, 10000), null) resolver.updateResolution() }) - it('should parse and resolve the connection format \'coherence:[host]:[clusterName]\'', () => { - const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost:test'}, createListener(null), null) - let [host, port, cluster] = resolver.parseConn() - assert.equal(host, 'localhost') - assert.equal(port, '7574') - assert.equal(cluster, 'test') + it('should parse and resolve the connection format \'coherence:[host]:[clusterName]\'', (done) => { + const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost:grpc-cluster2'}, createListener(done, 10001), null) + resolver.updateResolution() }) - it('should parse and resolve the connection format \'coherence:[host]:[port]:[clusterName]\'', () => { - const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost:8888:test'}, createListener(null), null) - let [host, port, cluster] = resolver.parseConn() - assert.equal(host, 'localhost') - assert.equal(port, '8888') - assert.equal(cluster, 'test') + it('should parse and resolve the connection format \'coherence:[host]:[port]:[clusterName]\'', (done) => { + const resolver = new CoherenceResolver({scheme: 'coherence', path: 'localhost:7574:grpc-cluster2'}, createListener(done, 10001), null) + resolver.updateResolution() }) }) describe('A Session', () => { - it(' should be able to resolve the gRPC Proxy', async () => { + it('should be able to resolve the gRPC Proxy', async () => { const session = new Session({address: 'coherence:localhost'}) const cache = session.getCache('test') - await cache.put('a', 'b') - assert.equals(await cache.get('a'), ) + await cache.set('a', 'b') + assert.equal(await cache.get('a'), 'b') + await session.close() + }) + + it('should be able to resolve the gRPC Proxy of a foreign cluster', async () => { + const session = new Session({address: 'coherence:localhost:grpc-cluster2'}) + const cache = session.getCache('test') + await cache.set('a', 'b') + assert.equal(await cache.get('a'), 'b') + console.log("HERE") + await session.close() }) }) }) \ No newline at end of file