Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/sources/deutsche-boerse/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@ export const config = new AdapterConfig({
type: 'string',
default: 'wss://md.deutsche-boerse.com',
},
CACHE_TTL_REFRESH_MS: {
description: 'Interval to refresh the TTL of active subscriptions',
default: 60000,
type: 'number',
},
})
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@ export class InstrumentQuoteCache {
quote.mid = mid.toNumber()
quote.quoteProviderTimeUnixMs = providerTime
}
addBid(isin: string, bid: number, providerTime: number) {
const quote = this.get(isin)
if (!quote) {
throw new Error(`Cannot add quote for inactive ISIN ${isin}`)
}
if (quote.ask !== undefined) {
const mid = new Decimal(bid).plus(quote.ask).div(2)
quote.mid = mid.toNumber()
}
quote.bid = bid
quote.quoteProviderTimeUnixMs = providerTime
}
addAsk(isin: string, ask: number, providerTime: number) {
const quote = this.get(isin)
if (!quote) {
throw new Error(`Cannot add quote for inactive ISIN ${isin}`)
}

if (quote.bid !== undefined) {
const mid = new Decimal(quote.bid).plus(ask).div(2)
quote.mid = mid.toNumber()
}
quote.ask = ask
quote.quoteProviderTimeUnixMs = providerTime
}
addTrade(isin: string, lastPrice: number, providerTime: number) {
const quote = this.get(isin)
if (!quote) {
Expand Down
70 changes: 62 additions & 8 deletions packages/sources/deutsche-boerse/src/transport/lwba.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { create, fromBinary, toBinary } from '@bufbuild/protobuf'
import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports'
import { makeLogger } from '@chainlink/external-adapter-framework/util'
import { BaseEndpointTypes, Market, MARKETS } from '../endpoint/lwba'
import {
Expand All @@ -12,7 +13,8 @@ import { MarketDataSchema, type MarketData } from '../gen/md_cef_pb'
import { InstrumentQuoteCache } from './instrument-quote-cache'
import {
decimalToNumber,
isSingleQuoteFrame,
hasSingleBidFrame,
hasSingleOfferFrame,
isSingleTradeFrame,
parseIsin,
pickProviderTime,
Expand All @@ -29,16 +31,38 @@ const logger = makeLogger('DeutscheBoerseTransport')

export function createLwbaWsTransport() {
const cache = new InstrumentQuoteCache()

return new ProtobufWsTransport<WsTransportTypes>({
let ttlInterval: ReturnType<typeof setInterval> | undefined
const transport = new ProtobufWsTransport<WsTransportTypes>({
url: (context) => `${context.adapterSettings.WS_API_ENDPOINT}/stream?format=proto`,
options: async (context) => ({
headers: { 'X-API-Key': context.adapterSettings.API_KEY },
followRedirects: true,
}),
handlers: {
open: () => {
open: async (_connection, context) => {
logger.info('LWBA websocket connection established')

// Clear any previous interval
if (ttlInterval) {
clearInterval(ttlInterval)
ttlInterval = undefined
}

const doRefresh = async () => {
try {
await updateTTL(transport, context.adapterSettings.CACHE_MAX_AGE)
logger.info(
{ refreshMs: context.adapterSettings.CACHE_TTL_REFRESH_MS },
'Refreshed TTL for active subscriptions',
)
} catch (err) {
logger.error({ err }, 'Failed TTL refresh')
}
}

// Refresh immediately, then every minute
await doRefresh()
ttlInterval = setInterval(doRefresh, context.adapterSettings.CACHE_TTL_REFRESH_MS)
},
error: (errorEvent) => {
logger.error({ errorEvent }, 'LWBA websocket error')
Expand All @@ -48,6 +72,10 @@ export function createLwbaWsTransport() {
const reason = (closeEvent as any)?.reason
const wasClean = (closeEvent as any)?.wasClean
logger.info({ code, reason, wasClean }, 'LWBA websocket closed')
if (ttlInterval) {
clearInterval(ttlInterval)
ttlInterval = undefined
}
},
message(buf) {
logger.debug(
Expand All @@ -62,6 +90,7 @@ export function createLwbaWsTransport() {
if (!sm) {
return []
}
transport.lastMessageReceivedAt = Date.now()
const decoded = decodeSingleMarketData(sm)
if (!decoded) {
return []
Expand Down Expand Up @@ -138,6 +167,8 @@ export function createLwbaWsTransport() {
},

unsubscribeMessage: (p: { market: string; isin: string }) => {
const err = new Error()
console.error(err.stack)
cache.deactivate(p.isin)
if (cache.isEmpty()) {
const req = create(RequestSchema, {
Expand All @@ -159,6 +190,7 @@ export function createLwbaWsTransport() {
},
},
})
return transport
}

// --- helpers -----------------------------------------------------------------
Expand All @@ -171,6 +203,10 @@ function decodeStreamMessage(buf: Buffer): StreamMessage | null {
}
}

const updateTTL = async (transport: WebSocketTransport<WsTransportTypes>, ttl: number) => {
const params = await transport.subscriptionSet.getAll()
transport.responseCache.writeTTL(transport.name, params, ttl)
}
function processMarketData(
md: MarketData,
cache: InstrumentQuoteCache,
Expand All @@ -197,23 +233,41 @@ function processMarketData(
if (isSingleTradeFrame(dat)) {
const latestPrice = decimalToNumber(dat.Px)
cache.addTrade(isin, latestPrice, providerTime)
logger.debug(
logger.info(
{ isin, latestPrice, providerTimeUnixMs: providerTime },
'Processed single trade frame',
)
return { isin, providerTime }
}

if (isSingleQuoteFrame(dat)) {
if (hasSingleBidFrame(dat) && hasSingleOfferFrame(dat)) {
const bidPx = decimalToNumber(dat!.Bid!.Px)
const askPx = decimalToNumber(dat!.Offer!.Px)
cache.addQuote(isin, bidPx, askPx, providerTime)
logger.debug(
logger.info(
{ isin, bid: bidPx, ask: askPx, mid: (bidPx + askPx) / 2, providerTimeUnixMs: providerTime },
'Processed single quote frame',
)
return { isin, providerTime }
}
if (hasSingleBidFrame(dat)) {
const bidPx = decimalToNumber(dat!.Bid!.Px)
cache.addBid(isin, bidPx, providerTime)
logger.info(
{ isin, bid: bidPx, providerTimeUnixMs: providerTime },
'Processed single bid frame',
)
return { isin, providerTime }
}

if (hasSingleOfferFrame(dat)) {
const askPx = decimalToNumber(dat!.Offer!.Px)
cache.addAsk(isin, askPx, providerTime)
logger.info(
{ isin, ask: askPx, providerTimeUnixMs: providerTime },
'Processed single offer frame',
)
return { isin, providerTime }
}

logger.debug({ isin, keys: Object.keys(dat ?? {}) }, 'Ignoring unsupported market data frame')
return null
Expand Down
11 changes: 8 additions & 3 deletions packages/sources/deutsche-boerse/src/transport/proto-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ export function isSingleTradeFrame(dat?: DataProto): boolean {
return isDecimalPrice(dat?.Px)
}

// true if this frame carries only a single best bid/offer (not multui-level)
export function isSingleQuoteFrame(dat?: DataProto): boolean {
return isDecimalPrice(dat?.Bid?.Px) && isDecimalPrice(dat?.Offer?.Px)
// true if this frame carries a single best bid (not multui-level)
export function hasSingleBidFrame(dat?: DataProto): boolean {
return isDecimalPrice(dat?.Bid?.Px)
}

// true if this frame carries a single best offer (not multui-level)
export function hasSingleOfferFrame(dat?: DataProto): boolean {
return isDecimalPrice(dat?.Offer?.Px)
}
2 changes: 0 additions & 2 deletions packages/sources/deutsche-boerse/test/integration/fixtures.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
// test/integration/fixtures.ts
import { create, toBinary } from '@bufbuild/protobuf'
import { MockWebsocketServer } from '@chainlink/external-adapter-framework/util/testing-utils'
// ⬅️ don't rely on Any.pack for tests; just set {typeUrl, value}
import {
Status as ClientStatus,
ResponseSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { InstrumentQuoteCache } from '../../src/transport/instrument-quote-cache
describe('InstrumentQuoteCache', () => {
const ISIN = 'IE00B53L3W79'
const ISIN2 = 'US0000000001'

test('activate/deactivate/has/isEmpty/get', () => {
const cache = new InstrumentQuoteCache()
expect(cache.isEmpty()).toBe(true)
Expand All @@ -20,39 +21,73 @@ describe('InstrumentQuoteCache', () => {
cache.activate(ISIN)

cache.addQuote(ISIN, 100, 102, 1234)
const q = cache.get(ISIN)
expect(q?.bid).toBe(100)
expect(q?.ask).toBe(102)
expect(q?.mid).toBe(101)
expect(q?.quoteProviderTimeUnixMs).toBe(1234)
const q = cache.get(ISIN)!
expect(q.bid).toBe(100)
expect(q.ask).toBe(102)
expect(q.mid).toBe(101)
expect(q.quoteProviderTimeUnixMs).toBe(1234)
})

test('addTrade sets latestPrice and trade time', () => {
test('addBid then addAsk recomputes mid and updates quote time', () => {
const cache = new InstrumentQuoteCache()
cache.activate(ISIN)

cache.addTrade(ISIN, 99.5, 2222)
const q = cache.get(ISIN)
cache.addBid(ISIN, 100, 1111) // only bid
let q = cache.get(ISIN)!
expect(q.bid).toBe(100)
expect(q.ask).toBeUndefined()
expect(q.mid).toBeUndefined()
expect(q.quoteProviderTimeUnixMs).toBe(1111)

expect(q?.latestPrice).toBe(99.5)
expect(q?.tradeProviderTimeUnixMs).toBe(2222)
cache.addAsk(ISIN, 102, 2222) // now ask arrives
q = cache.get(ISIN)!
expect(q.ask).toBe(102)
expect(q.mid).toBe(101)
expect(q.quoteProviderTimeUnixMs).toBe(2222)
})
test('addQuote without activate throws', () => {

test('addAsk then addBid recomputes mid and updates quote time', () => {
const cache = new InstrumentQuoteCache()
expect(() => cache.addQuote(ISIN, 100, 102, 1234)).toThrow(/inactive isin/i)
cache.activate(ISIN)

cache.addAsk(ISIN, 50, 3333)
let q = cache.get(ISIN)!
expect(q.ask).toBe(50)
expect(q.mid).toBeUndefined()

cache.addBid(ISIN, 48, 4444)
q = cache.get(ISIN)!
expect(q.bid).toBe(48)
expect(q.mid).toBe(49)
expect(q.quoteProviderTimeUnixMs).toBe(4444)
})

test('addTrade sets latestPrice and trade time', () => {
const cache = new InstrumentQuoteCache()
cache.activate(ISIN)

cache.addTrade(ISIN, 99.5, 2222)
const q = cache.get(ISIN)!
expect(q.latestPrice).toBe(99.5)
expect(q.tradeProviderTimeUnixMs).toBe(2222)
})

test('addTrade without activate throws', () => {
test('addQuote/addBid/addAsk/addTrade without activate throws', () => {
const cache = new InstrumentQuoteCache()
expect(() => cache.addQuote(ISIN, 100, 102, 1234)).toThrow(/inactive isin/i)
expect(() => cache.addBid(ISIN, 100, 1)).toThrow(/inactive isin/i)
expect(() => cache.addAsk(ISIN, 100, 1)).toThrow(/inactive isin/i)
expect(() => cache.addTrade(ISIN, 99.5, 2222)).toThrow(/inactive isin/i)
})

test('deactivate then attempt to add -> throws', () => {
const cache = new InstrumentQuoteCache()
cache.activate(ISIN)
cache.deactivate(ISIN)
expect(() => cache.addQuote(ISIN, 1, 2, 3)).toThrow(/inactive isin/i)
expect(() => cache.addTrade(ISIN, 1, 3)).toThrow(/inactive isin/i)
})

test('mid is computed correctly for equal sides and edge values', () => {
const cache = new InstrumentQuoteCache()
cache.activate(ISIN)
Expand Down
Loading
Loading