Skip to content

Drakkar-Software/Whistlers

Repository files navigation

Whistlers

Whistlers

Message-queue → destination bridge
Subscribe to queue topics and forward messages to Firebase, ClickHouse, PostgreSQL, or S3 — in real time.

npm version license


Packages

Package Description
@drakkar.software/whistlers Core library — adapters, bridge, config

Quick Start

import {
  Whistler,
  NatsQueueAdapter,
  FirebaseDestination,
  createConfig,
} from "@drakkar.software/whistlers"
import admin from "firebase-admin"

admin.initializeApp({ credential: admin.credential.applicationDefault() })

const config = createConfig({
  subscriptions: [
    {
      name: "orders",
      topics: ["orders.*"],
      group: "whistlers",
      notification: { title: "New order", body: "An order just came in" },
      dataFields: ["id", "status"],
    },
  ],
})

const whistler = new Whistler({
  queue: new NatsQueueAdapter({ servers: "nats://localhost:4222" }),
  destination: new FirebaseDestination(),
  config,
  logger: {
    info: (msg, ...args) => console.log("[info]", msg, ...args),
    warn: (msg, ...args) => console.warn("[warn]", msg, ...args),
    error: (msg, ...args) => console.error("[error]", msg, ...args),
  },
  onError: (err, ctx) =>
    console.error("Failed to forward", ctx.message.topic, err),
})

await whistler.start()
// ...
await whistler.stop()

Configuration

import { createConfig } from "@drakkar.software/whistlers"

const config = createConfig({
  subscriptions: [
    {
      name: "orders",
      topics: ["orders.*"],
      group: "whistlers",
      destinationTopic: "orders",
      notification: { title: "New order", body: "An order was placed" },
      dataFields: ["id", "status"],
    },
  ],
})

createConfig validates the config and throws a descriptive error if it is invalid.

To load config from a JSON string (e.g. a file read from disk), use parseConfigJson:

import { parseConfigJson } from "@drakkar.software/whistlers"
import { readFileSync } from "fs"

const config = parseConfigJson(readFileSync("whistlers.json", "utf8"))

parseConfigJson throws a descriptive error for invalid JSON or a structurally invalid config.

Subscription fields

Field Type Required Description
name string Unique identifier for this subscription
topics string[] Queue-native topic patterns (see Topic Matching)
group string Consumer group name (NATS queue group / MQTT shared subscription)
destinationTopic string Destination topic name. Defaults to the sanitized source topic (. and /-). Call sanitizeTopic(topic) for custom transformations.
notification { title?, body? } Static notification content passed through to the destination
dataFields string[] Top-level payload fields to forward as string key/value pairs

Namespaces

Namespaces group subscriptions under a named key. Each namespace prefixes its subscriptions' destination topics with {name}- and attaches the namespace name as namespace on the OutgoingNotification so destination adapters can segment traffic.

import type { NamespaceConfig } from "@drakkar.software/whistlers"

function makeTenantNamespace(tenant: string): NamespaceConfig {
  return {
    subscriptions: [
      {
        name: "orders",
        topics: ["orders.*"],
        group: `whistlers-${tenant}`,
        destinationTopic: "orders",   // becomes `{tenant}-orders` at runtime
        notification: { title: "Order update" },
        dataFields: ["id", "status"],
      },
    ],
  }
}

const config = createConfig({
  subscriptions: [
    // root subscriptions — no prefix applied
    { name: "system-alerts", topics: ["system.alerts.>"], destinationTopic: "system-alerts" },
  ],
  namespaces: {
    acme:   makeTenantNamespace("acme"),
    globex: makeTenantNamespace("globex"),
  },
})

A message on orders.created is forwarded twice: once as topic acme-orders (with namespace: "acme") and once as globex-orders (with namespace: "globex"). Mobile clients subscribe to the FCM topic for their tenant.

Rules:

  • Namespace keys must match [a-zA-Z0-9_-]+.
  • Each namespace must contain at least one subscription.
  • Subscription names must be unique within a namespace (root is its own scope — the same name may repeat across scopes).
  • The {namespace}- prefix is applied to destinationTopic (explicit or source-derived). Source topic patterns are not modified.

The namespace field is available on OutgoingNotification for all destination adapters:

new FirebaseDestination({
  format: (n) => ({
    data: { namespace: n.namespace ?? "global" },
  }),
})

Queue Adapters

Adapter Description
NatsQueueAdapter NATS Core with queue group support
MqttQueueAdapter MQTT v3/v5 with shared subscription support

NATS

new NatsQueueAdapter({ servers: "nats://localhost:4222" })
// multiple servers
new NatsQueueAdapter({ servers: ["nats://n1:4222", "nats://n2:4222"] })

Wildcard syntax: orders.* (single token), events.> (all remaining tokens).

When group is set, Whistlers subscribes with a queue group so only one instance in the group processes each message — useful for running multiple Whistlers instances without duplicate deliveries.

MQTT

new MqttQueueAdapter({ url: "mqtt://localhost:1883" })
// with client options
new MqttQueueAdapter({
  url: "mqtts://broker.example.com",
  options: { clientId: "whistlers-1", username: "user", password: "pass" },
})

Wildcard syntax: orders/+ (single level), events/# (all levels).

When group is set, Whistlers uses a shared subscription ($share/{group}/topic) so the broker delivers each message to exactly one subscriber in the group.

Destination Adapters

Adapter Peer dependency What it does
FirebaseDestination firebase-admin Sends FCM push notifications
ClickHouseDestination @clickhouse/client Inserts rows into a ClickHouse table
PostgresDestination pg Inserts rows into a PostgreSQL table
S3Destination @aws-sdk/client-s3 Writes notification JSON objects to S3
SSEDestination none (built-in) Runs an HTTP server and streams notifications to connected SSE clients
NamespaceRoutingDestination none Routes each notification to a per-namespace destination (e.g. one Firebase project per namespace)

Each adapter is an optional peer dependency — install only what you use. SSEDestination uses Node's built-in node:http, so it needs no extra dependency.

Firebase

// uses the default Firebase app (must call admin.initializeApp() first)
new FirebaseDestination()

// supply a specific app instance
new FirebaseDestination({ app: myFirebaseApp })

// custom FCM message — return any FCM fields (notification, data, android, apns, etc.)
new FirebaseDestination({
  format: (n) => ({
    notification: { title: n.notification?.title, body: String(n.rawPayload) },
    data: { id: String((n.rawPayload as Record<string, unknown>)["id"]) },
    android: { priority: "high" },
  }),
})

topic is set from the subscription config and cannot be overridden by format.

To target a combination of topics instead of a single one, return a non-empty condition from format — an FCM condition expression (boolean over up to 5 topics). When present it is sent as condition and the topic is omitted (FCM accepts one or the other, never both); an absent/empty condition falls back to the normal topic send:

// deliver to everyone subscribed to the space topic EXCEPT the author's own devices
new FirebaseDestination({
  format: (n) => {
    const authorId = (n.rawPayload as { identity?: string }).identity
    return {
      notification: { title: "New message" },
      ...(authorId
        ? { condition: `'${n.topic}' in topics && !('user-${authorId}' in topics)` }
        : {}), // no identity → plain topic send (backward-compatible)
    }
  },
})
pnpm add firebase-admin

Multiple messages per event

Return an array from format to send several FCM messages for a single event. Each element is addressed independently (the same condition/topic rule applies per element), so you can fan out, e.g., a notification placeholder the OS shows immediately plus a data-only message that wakes the app to replace it with richer content — both carrying the same exclusion condition:

new FirebaseDestination({
  format: (n) => [
    // 1. Placeholder — OS-displayed even if the app can't run (force-quit / Doze).
    {
      notification: { title: "New message", body: "New message in another room" },
      android: { notification: { tag: String((n.rawPayload as Record<string, unknown>)["roomId"]) } },
    },
    // 2. Data-only upgrade — wakes the background handler to show the real content.
    {
      data: { roomId: String((n.rawPayload as Record<string, unknown>)["roomId"]) },
      android: { priority: "high" },
    },
  ],
})

A single object (the common case) is unchanged — exactly one messaging.send. An array goes through messaging.sendEach (one batch round trip); an empty array sends nothing. By default a partial batch failure resolves (a delivered message isn't undone by a sibling's failure) — pass multiSendFailure: "throw" to reject if any message fails; a batch where every message fails always rejects.

Ordering caveat: FCM does not guarantee delivery order between the messages in a batch. In the placeholder/upgrade pattern the OS-rendered placeholder almost always lands before the slower data-driven upgrade, but the reverse race is possible — align the placeholder's android.notification.tag with the client's replacement id and have the client cancel-then-replace to make the swap deterministic.

One Firebase project per namespace

To send each namespace through its own Firebase project (a separate service-account key), give each project its own firebase-admin app and route between them with NamespaceRoutingDestination:

import { initializeApp, applicationDefault, cert } from "firebase-admin/app"
import { FirebaseDestination, NamespaceRoutingDestination } from "@drakkar.software/whistlers"

// default app — handles root (non-namespaced) subscriptions
initializeApp({ credential: applicationDefault() })

// one named app per namespace, each pointing at that namespace's Firebase project
const acmeApp   = initializeApp({ credential: cert("/secrets/acme-sa.json")   }, "acme")
const globexApp = initializeApp({ credential: cert("/secrets/globex-sa.json") }, "globex")

const destination = new NamespaceRoutingDestination({
  routes: {
    acme:   new FirebaseDestination({ app: acmeApp }),
    globex: new FirebaseDestination({ app: globexApp }),
  },
  default: new FirebaseDestination(), // root + any unrouted namespace
})

NamespaceRoutingDestination dispatches each notification by its namespace. A notification with no matching route — root subscriptions (namespace undefined) or an unknown namespace — goes to default; if default is omitted, send() throws (surfaced through onError) rather than dropping the message. close() closes every wrapped adapter (deduplicated by identity). It is destination-agnostic — the routes can be any DestinationAdapter, not just Firebase.

See examples/ts/nats-namespaces-multi-firebase.ts.

From JSON config / the bundled server

When running the bundled server (bin/server.ts) with DESTINATION_TYPE=firebase, add a firebaseCredentials path to any namespace and the server wires the routing for you — it initializes a dedicated app per namespace and falls back to Application Default Credentials for root subscriptions and namespaces without their own key:

{
  "version": 1,
  "subscriptions": [
    { "name": "system-alerts", "topics": ["system.alerts.>"], "destinationTopic": "system-alerts" }
  ],
  "namespaces": {
    "acme": {
      "firebaseCredentials": "/secrets/acme-sa.json",
      "subscriptions": [{ "name": "orders", "topics": ["orders.*"], "destinationTopic": "orders" }]
    },
    "globex": {
      "firebaseCredentials": "/secrets/globex-sa.json",
      "subscriptions": [{ "name": "orders", "topics": ["orders.*"], "destinationTopic": "orders" }]
    }
  }
}

firebaseCredentials is a path to a service-account JSON key file — never inline the credentials. It is read only by the bundled server's Firebase destination; the Whistler bridge and other destination types ignore it.

The default app (Application Default Credentials, e.g. GOOGLE_APPLICATION_CREDENTIALS) is still required whenever something falls through to it — i.e. there are root subscriptions or any namespace without firebaseCredentials. If every namespace has its own key and there are no root subscriptions, the server skips the default app entirely, so no ADC is needed.

ClickHouse

Insert each notification as a row. Default schema:

CREATE TABLE notifications (
    topic         String,
    source_topic  String,
    notification  Nullable(String),
    data          Nullable(String),
    raw_payload   String,
    received_at   DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY received_at;
new ClickHouseDestination({
  url: "http://localhost:8123",
  database: "default",
  table: "notifications",
  username: "default",  // optional
  password: "",         // optional
})

// custom row shape
new ClickHouseDestination({
  url: "http://localhost:8123",
  database: "default",
  table: "events",
  format: (n) => ({
    topic: n.topic,
    payload: JSON.stringify(n.rawPayload),
    received_at: new Date().toISOString(),
  }),
})
pnpm add @clickhouse/client

PostgreSQL

Insert each notification as a row. Default schema:

CREATE TABLE notifications (
    id           BIGSERIAL PRIMARY KEY,
    topic        TEXT NOT NULL,
    source_topic TEXT NOT NULL,
    notification JSONB,
    data         JSONB,
    raw_payload  JSONB NOT NULL,
    received_at  TIMESTAMPTZ DEFAULT NOW()
);
new PostgresDestination({
  connectionString: "postgresql://user:pass@host:5432/db",
  table: "notifications",
})

// custom row shape — keys become column names, values become query parameters
// supply your own timestamp when using format (the default query uses SQL NOW())
new PostgresDestination({
  connectionString: "postgresql://user:pass@host:5432/db",
  table: "events",
  format: (n) => ({
    topic: n.topic,
    payload: JSON.stringify(n.rawPayload),
    created_at: new Date().toISOString(),
  }),
})
pnpm add pg

S3

Write each notification as a JSON object. Keys follow the pattern {prefix}{topic}/{uuid}.json (default prefix: whistlers/).

// uses the AWS credential chain (env vars, IAM role, instance profile, etc.)
new S3Destination({ bucket: "my-bucket" })

// custom region and key prefix
new S3Destination({ bucket: "my-bucket", region: "eu-west-1", prefix: "events/" })

// pre-configured client — useful for LocalStack, MinIO, or custom endpoints
import { S3Client } from "@aws-sdk/client-s3"
new S3Destination({
  bucket: "my-bucket",
  client: new S3Client({ endpoint: "http://localhost:4566", region: "us-east-1" }),
})

// custom body — return an object (JSON-serialised, ContentType: application/json, key ends .json)
// or a string (used as-is, ContentType: text/plain, no .json extension)
new S3Destination({
  bucket: "my-bucket",
  format: (n) => ({ topic: n.topic, payload: n.rawPayload }),
})
pnpm add @aws-sdk/client-s3

Server-Sent Events (SSE)

Run an HTTP server that streams each notification to connected clients as a Server-Sent Events feed. Unlike the other destinations, the SSE server must be started before the bridge so clients can connect first — call listen() (or inject your own http.Server).

// start the server before whistler.start(); use 0 for an ephemeral port
const sse = new SSEDestination({ path: "/events" })
await sse.listen(8080)
new Whistler({ queue, destination: sse, config })

Clients connect with any SSE client and optionally filter by topic with the ?topic= query parameter (repeatable; omit it to receive every topic):

curl -N "http://localhost:8080/events?topic=orders"

Each event defaults to event: <topic>, an id: of a random UUID, and a data: payload of the JSON notification (topic, sourceTopic, notification, data, rawPayload). A format callback customises the event — return a string (used as the data: payload as-is), an object (JSON-serialised into data:), or an SSEEventInit for full control of data/event/id/retry:

new SSEDestination({
  path: "/events",
  heartbeatMs: 15000, // keep-alive comment interval (0 disables)
  headers: { "Access-Control-Allow-Origin": "*" }, // extra response headers, e.g. CORS
  format: (n) => ({ event: n.topic, data: { id: n.data?.["id"], payload: n.rawPayload } }),
})

// mount onto an existing server instead of calling listen() — its lifecycle stays yours
// (close() detaches the handler and ends client streams but never closes your server)
new SSEDestination({ server: myHttpServer })

The bundled bin/server.ts can run this destination directly: set DESTINATION_TYPE=sse (optionally SSE_PORT, default 8080, and SSE_PATH, default /events).

Topic Matching

Each adapter implements queue-native wildcard semantics:

Adapter Single-level wildcard Multi-level wildcard
NATS * > (must be last token)
MQTT + # (must be last level)

A message arriving on orders.created matches the pattern orders.* (NATS). A message on sensors/temp/zone1 matches sensors/# (MQTT). Multiple subscriptions can match the same message — each fires independently.

Error Handling

Destination errors (e.g. connection failure, quota exceeded) are caught per-message. The bridge keeps running.

const whistler = new Whistler({
  // ...
  logger: {
    info: console.log,
    warn: console.warn,
    error: console.error,
  },
  onError: (err, { message, subscription, namespace }) => {
    // called after the logger, with the raw error and context
    metrics.increment("whistlers.forward_error", {
      topic: message.topic,
      namespace: namespace ?? "root",
    })
  },
})

If onError is omitted, errors are only logged (when a logger is provided).

Testing

The package exports MemoryQueueAdapter, MemoryDestination, and CustomQueueAdapter for use in your own test suites.

MemoryQueueAdapter

Inject messages directly with simulate():

import {
  MemoryQueueAdapter,
  MemoryDestination,
  Whistler,
  createConfig,
} from "@drakkar.software/whistlers"

const queue = new MemoryQueueAdapter()
const dest = new MemoryDestination()
const whistler = new Whistler({ queue, destination: dest, config })

await whistler.start()
await queue.simulate({ topic: "orders.created", payload: '{"id":"1"}', timestamp: Date.now() })

console.log(dest.sent[0]) // OutgoingNotification

CustomQueueAdapter

Plug in callbacks to observe or control what the bridge subscribes to:

import { CustomQueueAdapter } from "@drakkar.software/whistlers"
import type { TopicSubscription } from "@drakkar.software/whistlers"

const queue = new CustomQueueAdapter({
  onSubscribe: async (subs: TopicSubscription[]) => {
    console.log("subscribed to", subs)
  },
})

Deployment (Ansible)

An Ansible role is included under infra/ansible/roles/whistlers. It installs Node.js and pnpm, clones this repository, builds it, and runs the standalone server as a systemd service.

Using the role from another repository

Add a requirements.yml to your playbook repo pointing at this repository:

# requirements.yml
roles:
  - name: whistlers
    src: https://github.com/Drakkar-Software/Whistlers.git
    scm: git
    version: main          # pin to a tag or commit SHA in production
    src_path: infra/ansible/roles/whistlers

Install the role before running your playbook:

ansible-galaxy role install -r requirements.yml

Then reference it by name in your playbook:

- name: Deploy Whistlers
  hosts: whistlers_servers
  become: true
  vars:
    whistlers_queue_type: nats
    whistlers_queue_url: "nats://localhost:4222"
    whistlers_subscriptions:
      - name: orders
        topics: ["orders.*"]
        notification: { title: "New order", body: "An order was placed" }
        dataFields: ["id", "status"]
  roles:
    - whistlers

Running the bundled example playbook

The repository ships infra/ansible/site.yml as a ready-to-use example:

ansible-playbook -i inventory.ini infra/ansible/site.yml

Role variables

Variable Default Description
whistlers_queue_type nats nats or mqtt
whistlers_queue_url nats://localhost:4222 Broker URL
whistlers_firebase_credentials_path /etc/whistlers/service-account.json Path to the Firebase service-account on the target host
whistlers_subscriptions [] List of subscription objects (same schema as the JSON config)
whistlers_namespaces {} Map of namespace name → { subscriptions: [], firebaseCredentials?: "" } objects. Optional firebaseCredentials is a path (on the target host) to that namespace's Firebase service-account key, routing it to its own project.
whistlers_version main Git branch, tag, or commit to deploy
whistlers_install_dir /opt/whistlers Where the repo is cloned

The service-account JSON must be placed on the target host before running the playbook (or provisioned separately via Vault / a secrets manager).

About

Message-queue to push-notification bridge

Resources

Stars

Watchers

Forks

Contributors

Languages