diff --git a/packages/sources/deutsche-boerse/src/config/index.ts b/packages/sources/deutsche-boerse/src/config/index.ts index ad7cf6e0cd..cb129d4ea0 100644 --- a/packages/sources/deutsche-boerse/src/config/index.ts +++ b/packages/sources/deutsche-boerse/src/config/index.ts @@ -12,4 +12,9 @@ export const config = new AdapterConfig({ type: 'string', default: 'wss://md.deutsche-boerse.com', }, + CACHE_TTL_REFRESH_MS: { + description: 'Interval to refresh the TTL of active subscriptions', + default: 60000, + type: 'number', + }, }) diff --git a/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts b/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts index 551eede4eb..5de1c7903b 100644 --- a/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts +++ b/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts @@ -35,6 +35,31 @@ export class InstrumentQuoteCache { quote.mid = mid.toNumber() quote.quoteProviderTimeUnixMs = providerTime } + addBid(isin: string, bid: number, providerTime: number) { + const quote = this.get(isin) + if (!quote) { + throw new Error(`Cannot add quote for inactive ISIN ${isin}`) + } + if (quote.ask !== undefined) { + const mid = new Decimal(bid).plus(quote.ask).div(2) + quote.mid = mid.toNumber() + } + quote.bid = bid + quote.quoteProviderTimeUnixMs = providerTime + } + addAsk(isin: string, ask: number, providerTime: number) { + const quote = this.get(isin) + if (!quote) { + throw new Error(`Cannot add quote for inactive ISIN ${isin}`) + } + + if (quote.bid !== undefined) { + const mid = new Decimal(quote.bid).plus(ask).div(2) + quote.mid = mid.toNumber() + } + quote.ask = ask + quote.quoteProviderTimeUnixMs = providerTime + } addTrade(isin: string, lastPrice: number, providerTime: number) { const quote = this.get(isin) if (!quote) { diff --git a/packages/sources/deutsche-boerse/src/transport/lwba.ts b/packages/sources/deutsche-boerse/src/transport/lwba.ts index 25d227e6cb..2bf221fc8b 100644 --- a/packages/sources/deutsche-boerse/src/transport/lwba.ts +++ b/packages/sources/deutsche-boerse/src/transport/lwba.ts @@ -1,4 +1,5 @@ import { create, fromBinary, toBinary } from '@bufbuild/protobuf' +import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports' import { makeLogger } from '@chainlink/external-adapter-framework/util' import { BaseEndpointTypes, Market, MARKETS } from '../endpoint/lwba' import { @@ -12,7 +13,8 @@ import { MarketDataSchema, type MarketData } from '../gen/md_cef_pb' import { InstrumentQuoteCache } from './instrument-quote-cache' import { decimalToNumber, - isSingleQuoteFrame, + hasSingleBidFrame, + hasSingleOfferFrame, isSingleTradeFrame, parseIsin, pickProviderTime, @@ -29,16 +31,38 @@ const logger = makeLogger('DeutscheBoerseTransport') export function createLwbaWsTransport() { const cache = new InstrumentQuoteCache() - - return new ProtobufWsTransport({ + let ttlInterval: ReturnType | undefined + const transport = new ProtobufWsTransport({ url: (context) => `${context.adapterSettings.WS_API_ENDPOINT}/stream?format=proto`, options: async (context) => ({ headers: { 'X-API-Key': context.adapterSettings.API_KEY }, followRedirects: true, }), handlers: { - open: () => { + open: async (_connection, context) => { logger.info('LWBA websocket connection established') + + // Clear any previous interval + if (ttlInterval) { + clearInterval(ttlInterval) + ttlInterval = undefined + } + + const doRefresh = async () => { + try { + await updateTTL(transport, context.adapterSettings.CACHE_MAX_AGE) + logger.info( + { refreshMs: context.adapterSettings.CACHE_TTL_REFRESH_MS }, + 'Refreshed TTL for active subscriptions', + ) + } catch (err) { + logger.error({ err }, 'Failed TTL refresh') + } + } + + // Refresh immediately, then every minute + await doRefresh() + ttlInterval = setInterval(doRefresh, context.adapterSettings.CACHE_TTL_REFRESH_MS) }, error: (errorEvent) => { logger.error({ errorEvent }, 'LWBA websocket error') @@ -48,6 +72,10 @@ export function createLwbaWsTransport() { const reason = (closeEvent as any)?.reason const wasClean = (closeEvent as any)?.wasClean logger.info({ code, reason, wasClean }, 'LWBA websocket closed') + if (ttlInterval) { + clearInterval(ttlInterval) + ttlInterval = undefined + } }, message(buf) { logger.debug( @@ -62,6 +90,7 @@ export function createLwbaWsTransport() { if (!sm) { return [] } + transport.lastMessageReceivedAt = Date.now() const decoded = decodeSingleMarketData(sm) if (!decoded) { return [] @@ -138,6 +167,8 @@ export function createLwbaWsTransport() { }, unsubscribeMessage: (p: { market: string; isin: string }) => { + const err = new Error() + console.error(err.stack) cache.deactivate(p.isin) if (cache.isEmpty()) { const req = create(RequestSchema, { @@ -159,6 +190,7 @@ export function createLwbaWsTransport() { }, }, }) + return transport } // --- helpers ----------------------------------------------------------------- @@ -171,6 +203,10 @@ function decodeStreamMessage(buf: Buffer): StreamMessage | null { } } +const updateTTL = async (transport: WebSocketTransport, ttl: number) => { + const params = await transport.subscriptionSet.getAll() + transport.responseCache.writeTTL(transport.name, params, ttl) +} function processMarketData( md: MarketData, cache: InstrumentQuoteCache, @@ -197,23 +233,41 @@ function processMarketData( if (isSingleTradeFrame(dat)) { const latestPrice = decimalToNumber(dat.Px) cache.addTrade(isin, latestPrice, providerTime) - logger.debug( + logger.info( { isin, latestPrice, providerTimeUnixMs: providerTime }, 'Processed single trade frame', ) return { isin, providerTime } } - - if (isSingleQuoteFrame(dat)) { + if (hasSingleBidFrame(dat) && hasSingleOfferFrame(dat)) { const bidPx = decimalToNumber(dat!.Bid!.Px) const askPx = decimalToNumber(dat!.Offer!.Px) cache.addQuote(isin, bidPx, askPx, providerTime) - logger.debug( + logger.info( { isin, bid: bidPx, ask: askPx, mid: (bidPx + askPx) / 2, providerTimeUnixMs: providerTime }, 'Processed single quote frame', ) return { isin, providerTime } } + if (hasSingleBidFrame(dat)) { + const bidPx = decimalToNumber(dat!.Bid!.Px) + cache.addBid(isin, bidPx, providerTime) + logger.info( + { isin, bid: bidPx, providerTimeUnixMs: providerTime }, + 'Processed single bid frame', + ) + return { isin, providerTime } + } + + if (hasSingleOfferFrame(dat)) { + const askPx = decimalToNumber(dat!.Offer!.Px) + cache.addAsk(isin, askPx, providerTime) + logger.info( + { isin, ask: askPx, providerTimeUnixMs: providerTime }, + 'Processed single offer frame', + ) + return { isin, providerTime } + } logger.debug({ isin, keys: Object.keys(dat ?? {}) }, 'Ignoring unsupported market data frame') return null diff --git a/packages/sources/deutsche-boerse/src/transport/proto-utils.ts b/packages/sources/deutsche-boerse/src/transport/proto-utils.ts index 16ef94a227..d85a4404a0 100644 --- a/packages/sources/deutsche-boerse/src/transport/proto-utils.ts +++ b/packages/sources/deutsche-boerse/src/transport/proto-utils.ts @@ -54,7 +54,12 @@ export function isSingleTradeFrame(dat?: DataProto): boolean { return isDecimalPrice(dat?.Px) } -// true if this frame carries only a single best bid/offer (not multui-level) -export function isSingleQuoteFrame(dat?: DataProto): boolean { - return isDecimalPrice(dat?.Bid?.Px) && isDecimalPrice(dat?.Offer?.Px) +// true if this frame carries a single best bid (not multui-level) +export function hasSingleBidFrame(dat?: DataProto): boolean { + return isDecimalPrice(dat?.Bid?.Px) +} + +// true if this frame carries a single best offer (not multui-level) +export function hasSingleOfferFrame(dat?: DataProto): boolean { + return isDecimalPrice(dat?.Offer?.Px) } diff --git a/packages/sources/deutsche-boerse/test/integration/fixtures.ts b/packages/sources/deutsche-boerse/test/integration/fixtures.ts index 86cd697021..b84a4e2ad5 100644 --- a/packages/sources/deutsche-boerse/test/integration/fixtures.ts +++ b/packages/sources/deutsche-boerse/test/integration/fixtures.ts @@ -1,7 +1,5 @@ -// test/integration/fixtures.ts import { create, toBinary } from '@bufbuild/protobuf' import { MockWebsocketServer } from '@chainlink/external-adapter-framework/util/testing-utils' -// ⬅️ don't rely on Any.pack for tests; just set {typeUrl, value} import { Status as ClientStatus, ResponseSchema, diff --git a/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts b/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts index 2b7395d4d0..4c76e03fb8 100644 --- a/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts +++ b/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts @@ -3,6 +3,7 @@ import { InstrumentQuoteCache } from '../../src/transport/instrument-quote-cache describe('InstrumentQuoteCache', () => { const ISIN = 'IE00B53L3W79' const ISIN2 = 'US0000000001' + test('activate/deactivate/has/isEmpty/get', () => { const cache = new InstrumentQuoteCache() expect(cache.isEmpty()).toBe(true) @@ -20,32 +21,65 @@ describe('InstrumentQuoteCache', () => { cache.activate(ISIN) cache.addQuote(ISIN, 100, 102, 1234) - const q = cache.get(ISIN) - expect(q?.bid).toBe(100) - expect(q?.ask).toBe(102) - expect(q?.mid).toBe(101) - expect(q?.quoteProviderTimeUnixMs).toBe(1234) + const q = cache.get(ISIN)! + expect(q.bid).toBe(100) + expect(q.ask).toBe(102) + expect(q.mid).toBe(101) + expect(q.quoteProviderTimeUnixMs).toBe(1234) }) - test('addTrade sets latestPrice and trade time', () => { + test('addBid then addAsk recomputes mid and updates quote time', () => { const cache = new InstrumentQuoteCache() cache.activate(ISIN) - cache.addTrade(ISIN, 99.5, 2222) - const q = cache.get(ISIN) + cache.addBid(ISIN, 100, 1111) // only bid + let q = cache.get(ISIN)! + expect(q.bid).toBe(100) + expect(q.ask).toBeUndefined() + expect(q.mid).toBeUndefined() + expect(q.quoteProviderTimeUnixMs).toBe(1111) - expect(q?.latestPrice).toBe(99.5) - expect(q?.tradeProviderTimeUnixMs).toBe(2222) + cache.addAsk(ISIN, 102, 2222) // now ask arrives + q = cache.get(ISIN)! + expect(q.ask).toBe(102) + expect(q.mid).toBe(101) + expect(q.quoteProviderTimeUnixMs).toBe(2222) }) - test('addQuote without activate throws', () => { + + test('addAsk then addBid recomputes mid and updates quote time', () => { const cache = new InstrumentQuoteCache() - expect(() => cache.addQuote(ISIN, 100, 102, 1234)).toThrow(/inactive isin/i) + cache.activate(ISIN) + + cache.addAsk(ISIN, 50, 3333) + let q = cache.get(ISIN)! + expect(q.ask).toBe(50) + expect(q.mid).toBeUndefined() + + cache.addBid(ISIN, 48, 4444) + q = cache.get(ISIN)! + expect(q.bid).toBe(48) + expect(q.mid).toBe(49) + expect(q.quoteProviderTimeUnixMs).toBe(4444) + }) + + test('addTrade sets latestPrice and trade time', () => { + const cache = new InstrumentQuoteCache() + cache.activate(ISIN) + + cache.addTrade(ISIN, 99.5, 2222) + const q = cache.get(ISIN)! + expect(q.latestPrice).toBe(99.5) + expect(q.tradeProviderTimeUnixMs).toBe(2222) }) - test('addTrade without activate throws', () => { + test('addQuote/addBid/addAsk/addTrade without activate throws', () => { const cache = new InstrumentQuoteCache() + expect(() => cache.addQuote(ISIN, 100, 102, 1234)).toThrow(/inactive isin/i) + expect(() => cache.addBid(ISIN, 100, 1)).toThrow(/inactive isin/i) + expect(() => cache.addAsk(ISIN, 100, 1)).toThrow(/inactive isin/i) expect(() => cache.addTrade(ISIN, 99.5, 2222)).toThrow(/inactive isin/i) }) + test('deactivate then attempt to add -> throws', () => { const cache = new InstrumentQuoteCache() cache.activate(ISIN) @@ -53,6 +87,7 @@ describe('InstrumentQuoteCache', () => { expect(() => cache.addQuote(ISIN, 1, 2, 3)).toThrow(/inactive isin/i) expect(() => cache.addTrade(ISIN, 1, 3)).toThrow(/inactive isin/i) }) + test('mid is computed correctly for equal sides and edge values', () => { const cache = new InstrumentQuoteCache() cache.activate(ISIN) diff --git a/packages/sources/deutsche-boerse/test/unit/lwba.test.ts b/packages/sources/deutsche-boerse/test/unit/lwba.test.ts index e8f939f808..538daf208b 100644 --- a/packages/sources/deutsche-boerse/test/unit/lwba.test.ts +++ b/packages/sources/deutsche-boerse/test/unit/lwba.test.ts @@ -9,30 +9,27 @@ import { type Decimal, type MarketData, } from '../../src/gen/md_cef_pb' -import { createLwbaWsTransport } from '../../src/transport/lwba' +import { createLwbaWsTransport } from '../../src/transport/lwba' // keep your existing path LoggerFactoryProvider.set() const dec = (m: bigint, e: number): Decimal => create(DecimalSchema, { m, e }) - type MarketDataInit = MessageInitShape - const MARKET = 'md-xetraetfetp' as const +const ISIN = 'IE00B53L3W79' +const OTHER = 'US0000000001' function makeStreamBuffer(md: MarketData | MarketDataInit): Buffer { const mdMsg = create(MarketDataSchema, md as MarketDataInit) const anyMsg: Any = anyPack(MarketDataSchema, mdMsg) const sm = create(StreamMessageSchema, { - subs: MARKET, // include market/stream on the frame - messages: [anyMsg], // exactly one Any payload + subs: MARKET, + messages: [anyMsg], }) return Buffer.from(toBinary(StreamMessageSchema, sm)) } -describe('LWBA transport (more integration cases)', () => { - const ISIN = 'IE00B53L3W79' - const OTHER = 'US0000000001' - +describe('LWBA websocket transport', () => { test('message for non-activated instrument returns []', () => { const t = createLwbaWsTransport() as any const md = create(MarketDataSchema, { @@ -43,7 +40,7 @@ describe('LWBA transport (more integration cases)', () => { Tm: 1_000_000n, } as any), } as any) - const out = t.config.handlers.message(makeStreamBuffer(md), {} as any) + const out = t.config.handlers.message(makeStreamBuffer(md)) expect(out).toEqual([]) }) @@ -51,7 +48,6 @@ describe('LWBA transport (more integration cases)', () => { const t = createLwbaWsTransport() as any const first = t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) const second = t.config.builders.subscribeMessage({ market: MARKET, isin: OTHER }) - expect(first).toBeInstanceOf(Uint8Array) expect(second).toBeUndefined() }) @@ -71,12 +67,10 @@ describe('LWBA transport (more integration cases)', () => { test('missing ISIN: handler returns []', () => { const t = createLwbaWsTransport() as any t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) - const md = create(MarketDataSchema, { Dat: create(DataSchema, { Px: dec(100n, 0), Tm: 1_000_000n } as any), } as any) - - const out = t.config.handlers.message(makeStreamBuffer(md), {} as any) + const out = t.config.handlers.message(makeStreamBuffer(md)) expect(out).toEqual([]) }) @@ -91,13 +85,13 @@ describe('LWBA transport (more integration cases)', () => { Tm: 5_000_000n, } as any) const quoteMd = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, Dat: quoteDat } as any) - const quoteRes = t.config.handlers.message(makeStreamBuffer(quoteMd), {} as any) + const quoteRes = t.config.handlers.message(makeStreamBuffer(quoteMd)) expect(quoteRes).toEqual([]) // Trade (now latestPrice arrives) -> should emit with full set const tradeDat = create(DataSchema, { Px: dec(9999n, -2), Tm: 6_000_000n } as any) const tradeMd = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, Dat: tradeDat } as any) - const tradeRes = t.config.handlers.message(makeStreamBuffer(tradeMd), {} as any) + const tradeRes = t.config.handlers.message(makeStreamBuffer(tradeMd)) expect(tradeRes.length).toBe(1) const [entry] = tradeRes @@ -107,12 +101,70 @@ describe('LWBA transport (more integration cases)', () => { expect(d.ask).toBe(101) expect(d.mid).toBe(100.5) expect(d.latestPrice).toBe(99.99) - - // quote time remains; trade time now populated expect(d.quoteProviderIndicatedTimeUnixMs).toBe(5) expect(d.tradeProviderIndicatedTimeUnixMs).toBe(6) - - // providerIndicatedTime is from the emitted (trade) frame expect(entry.response.timestamps.providerIndicatedTimeUnixMs).toBe(6) }) + + test('bid-only then ask-only then trade → emits once both quote & trade are known', () => { + const t = createLwbaWsTransport() as any + t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) + + // bid-only + const bidOnly = create(MarketDataSchema, { + Instrmt: { Sym: ISIN }, + Dat: create(DataSchema, { Bid: { Px: dec(10000n, -2) }, Tm: 10_000_000n } as any), + } as any) + expect(t.config.handlers.message(makeStreamBuffer(bidOnly))).toEqual([]) + + // ask-only + const askOnly = create(MarketDataSchema, { + Instrmt: { Sym: ISIN }, + Dat: create(DataSchema, { Offer: { Px: dec(10200n, -2) }, Tm: 11_000_000n } as any), + } as any) + expect(t.config.handlers.message(makeStreamBuffer(askOnly))).toEqual([]) + + // trade → emit + const trade = create(MarketDataSchema, { + Instrmt: { Sym: ISIN }, + Dat: create(DataSchema, { Px: dec(10100n, -2), Tm: 12_000_000n } as any), + } as any) + const [entry] = t.config.handlers.message(makeStreamBuffer(trade)) + expect(entry.response.data.bid).toBe(100) + expect(entry.response.data.ask).toBe(102) + expect(entry.response.data.mid).toBe(101) + expect(entry.response.data.latestPrice).toBe(101) + expect(entry.response.data.quoteProviderIndicatedTimeUnixMs).toBe(11) + expect(entry.response.data.tradeProviderIndicatedTimeUnixMs).toBe(12) + }) + + test('defensive decoding: bad buffer returns []', () => { + const t = createLwbaWsTransport() as any + const res = t.config.handlers.message(Buffer.from('not-a-protobuf')) + expect(res).toEqual([]) + }) + + test('open() refreshes TTL immediately and on interval', async () => { + jest.useFakeTimers() // modern timers in your Jest config + const t = createLwbaWsTransport() as any + + // stub framework bits + const writeTTL = jest.fn() + t.responseCache = { writeTTL } + t.subscriptionSet = { getAll: jest.fn().mockResolvedValue([]) } + + const ctx = { + adapterSettings: { WS_API_ENDPOINT: 'wss://example', API_KEY: 'key', CACHE_MAX_AGE: 45_000 }, + } as any + + await t.config.handlers.open({}, ctx) + expect(writeTTL).toHaveBeenCalledTimes(1) + + // Advance one full interval AND await the async callback + await jest.advanceTimersByTimeAsync(60_000) + + expect(writeTTL).toHaveBeenCalledTimes(2) + + jest.useRealTimers() + }) }) diff --git a/packages/sources/deutsche-boerse/test/unit/proto-utils.test.ts b/packages/sources/deutsche-boerse/test/unit/proto-utils.test.ts index 4026e2c351..98bad6b13e 100644 --- a/packages/sources/deutsche-boerse/test/unit/proto-utils.test.ts +++ b/packages/sources/deutsche-boerse/test/unit/proto-utils.test.ts @@ -10,7 +10,8 @@ import { import { convertNsToMs, decimalToNumber, - isSingleQuoteFrame, + hasSingleBidFrame, + hasSingleOfferFrame, isSingleTradeFrame, parseIsin, pickProviderTime, @@ -19,29 +20,20 @@ import { describe('proto-utils', () => { const dec = (m: bigint, e: number): Decimal => create(DecimalSchema, { m, e }) - test('decimalToNumber – basic fractional', () => { + test('decimalToNumber – basic fractional and integer scaling', () => { expect(decimalToNumber(dec(123n, -2))).toBeCloseTo(1.23) - }) - - test('decimalToNumber – integer scaling', () => { expect(decimalToNumber(dec(42n, 0))).toBe(42) expect(decimalToNumber(dec(42n, 1))).toBe(420) }) - // Boundary: exactly 15 significant digits passes test('decimalToNumber – exactly 15 significant digits passes', () => { expect(decimalToNumber(dec(999_999_999_999_999n, 0))).toBe(999_999_999_999_999) }) - // Fail: > 15 significant digits (16-digit mantissa) test('decimalToNumber – throws when value has > 15 significant digits', () => { expect(() => decimalToNumber(dec(1_234_567_890_123_456n, 0))).toThrow( /more than 15 significant digits/i, ) - }) - - // Fail: still > 15 significant digits even after scaling (exponent doesn’t reduce sig-digits) - test('decimalToNumber – throws for 16-digit mantissa with negative exponent', () => { expect(() => decimalToNumber(dec(1_234_567_890_123_456n, -5))).toThrow( /more than 15 significant digits/i, ) @@ -52,7 +44,7 @@ describe('proto-utils', () => { expect(convertNsToMs(1_999_999n)).toBe(1) }) - test('getIsin (uses Instrmt.Sym)', () => { + test('parseIsin uses Instrmt.Sym', () => { const md: MarketData = create(MarketDataSchema, { Instrmt: { Sym: 'IE00B53L3W79' as string }, } as any) @@ -64,20 +56,19 @@ describe('proto-utils', () => { expect(pickProviderTime(dat)).toBe(5) }) - test('isSingleTradeFrame', () => { + test('frame guards: trade/bid/offer', () => { const datWithTrade: Data = create(DataSchema, { Px: dec(100n, -2) } as any) - const datNoTrade: Data = create(DataSchema, {} as any) + const datWithBid: Data = create(DataSchema, { Bid: { Px: dec(9999n, -2) } } as any) + const datWithOffer: Data = create(DataSchema, { Offer: { Px: dec(10050n, -2) } } as any) + const datEmpty: Data = create(DataSchema, {} as any) + expect(isSingleTradeFrame(datWithTrade)).toBe(true) - expect(isSingleTradeFrame(datNoTrade)).toBe(false) - }) + expect(isSingleTradeFrame(datEmpty)).toBe(false) - test('isSingleQuoteFrame', () => { - const datWithQuote: Data = create(DataSchema, { - Bid: { Px: dec(10000n, -2) }, - Offer: { Px: dec(10050n, -2) }, - } as any) - const datMissing: Data = create(DataSchema, { Bid: {} } as any) - expect(isSingleQuoteFrame(datWithQuote)).toBe(true) - expect(isSingleQuoteFrame(datMissing)).toBe(false) + expect(hasSingleBidFrame(datWithBid)).toBe(true) + expect(hasSingleBidFrame(datEmpty)).toBe(false) + + expect(hasSingleOfferFrame(datWithOffer)).toBe(true) + expect(hasSingleOfferFrame(datEmpty)).toBe(false) }) })