Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion backend/oko_api/server/oko_api_server.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ ES_USERNAME="username"
ES_PASSWORD="pw"

TYPEFORM_WEBHOOK_SECRET="typeform-webhook-secret"
TELEGRAM_BOT_TOKEN="telegram-bot-token"
TELEGRAM_BOT_TOKEN="telegram-bot-token"

SLACK_WEBHOOK_URL="https://hooks.slack.com/services/..."
KS_NODE_REPORT_PASSWORD="ks-node-report-password"
8 changes: 8 additions & 0 deletions backend/oko_api/server/src/bin/launch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { makeApp } from "@oko-wallet-api/app";
import { ENV_FILE_NAME, envSchema } from "@oko-wallet-api/envs";
import { getCommitHash } from "@oko-wallet-api/git";
import { startKSNodeHealthCheckRuntime } from "@oko-wallet-api/runtime/health_check_node";
import { startKSNodeHeartbeatRuntime } from "@oko-wallet-api/runtime/ks_node_monitor";

async function main() {
console.log("NODE_ENV: %s", process.env.NODE_ENV);
Expand Down Expand Up @@ -59,6 +60,8 @@ async function main() {
encryption_secret: envs.ENCRYPTION_SECRET!,
typeform_webhook_secret: envs.TYPEFORM_WEBHOOK_SECRET!,
telegram_bot_token: envs.TELEGRAM_BOT_TOKEN!,
slack_webhook_url: envs.SLACK_WEBHOOK_URL ?? null,
ks_node_report_password: envs.KS_NODE_REPORT_PASSWORD!,
});

state.logger.info("Running database migrations...");
Expand Down Expand Up @@ -90,6 +93,11 @@ async function main() {
intervalSeconds: 10 * 60, // 10 minutes
});

startKSNodeHeartbeatRuntime(state.db, state.logger, {
intervalSeconds: 60, // 1 minute
slackWebhookUrl: state.slack_webhook_url,
});

startInactiveCustomerUserReminderRuntime(state.db, state.logger, {
intervalSeconds: 60 * 60, // 1 hour
timeUntilInactiveMs: 7 * 24 * 60 * 60 * 1000, // 7 days in milliseconds
Expand Down
3 changes: 3 additions & 0 deletions backend/oko_api/server/src/envs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export const envSchema = z.object({
TYPEFORM_WEBHOOK_SECRET: z.string(),

TELEGRAM_BOT_TOKEN: z.string(),

SLACK_WEBHOOK_URL: z.string().optional(),
KS_NODE_REPORT_PASSWORD: z.string(),
});
68 changes: 68 additions & 0 deletions backend/oko_api/server/src/runtime/ks_node_monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type { Pool } from "pg";
import type { Logger } from "winston";
import {
getLatestKSNodeTelemetries,
getKSNodeByPublicKey,
} from "@oko-wallet/oko-pg-interface/ks_nodes";
import { sendSlackAlert } from "@oko-wallet/tss-api";
import dayjs from "dayjs";

const HEARTBEAT_THRESHOLD_MINUTES = 10;

export function startKSNodeHeartbeatRuntime(
db: Pool,
logger: Logger,
options: { intervalSeconds: number; slackWebhookUrl: string | null },
) {
logger.info("Starting KS Node heartbeat runtime");

const run = async () => {
try {
await checkKSNodeHeartbeats(db, logger, options.slackWebhookUrl);
} catch (err) {
logger.error("KS Node heartbeat runtime error: %s", err);
}
};

run().then();
setInterval(run, options.intervalSeconds * 1000);
}

async function checkKSNodeHeartbeats(
db: Pool,
logger: Logger,
slackWebhookUrl: string | null,
) {
const latestTelemetriesRes = await getLatestKSNodeTelemetries(db);
if (!latestTelemetriesRes.success) {
logger.error(
"Failed to get latest KS node telemetries: %s",
latestTelemetriesRes.err,
);
return;
}

const now = dayjs();
const threshold = now.subtract(HEARTBEAT_THRESHOLD_MINUTES, "minute");

for (const telemetry of latestTelemetriesRes.data) {
const lastUpdate = dayjs(telemetry.created_at);

if (lastUpdate.isBefore(threshold)) {
// Node is unresponsive
const publicKey = telemetry.public_key;

// Get node name
const nodeRes = await getKSNodeByPublicKey(db, publicKey);
const nodeName =
nodeRes.success && nodeRes.data
? `${nodeRes.data.node_name} (${publicKey})`
: publicKey;

await sendSlackAlert(
`[KS Node Alert] Node ${nodeName} has not reported telemetry for over ${HEARTBEAT_THRESHOLD_MINUTES} minutes. Last seen: ${lastUpdate.toISOString()}`,
slackWebhookUrl,
);
}
}
}
4 changes: 4 additions & 0 deletions backend/oko_api_server_state/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ export interface ServerState {
encryption_secret: string;
typeform_webhook_secret: string;
telegram_bot_token: string;
slack_webhook_url: string | null;
ks_node_report_password: string;
server_keypair: EddsaKeypair;
}

Expand Down Expand Up @@ -131,6 +133,8 @@ export interface InitStateArgs {
encryption_secret: string;
typeform_webhook_secret: string;
telegram_bot_token: string;
slack_webhook_url: string | null;
ks_node_report_password: string;
}

async function initializeServerKeypair(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { Knex } from "knex";

export async function up(knex: Knex): Promise<void> {
await knex.schema
.withSchema("public")
.alterTable("key_share_nodes", (table) => {
table.string("public_key", 255).unique();
});

await knex.schema
.withSchema("public")
.createTable("ks_node_telemetry", (table) => {
table
.uuid("log_id")
.notNullable()
.defaultTo(knex.raw("gen_random_uuid()"))
.primary({ constraintName: "ks_node_telemetry_pkey" });
table.string("public_key", 255).notNullable();
table.integer("key_share_count").notNullable();
table.jsonb("payload").notNullable();
table
.timestamp("created_at", { useTz: true })
.notNullable()
.defaultTo(knex.fn.now());

table.index(["public_key"], "idx_ks_node_telemetry_public_key");
table.index(["created_at"], "idx_ks_node_telemetry_created_at");
});
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.withSchema("public").dropTableIfExists("ks_node_telemetry");

await knex.schema
.withSchema("public")
.alterTable("key_share_nodes", (table) => {
table.dropColumn("public_key");
});
}
121 changes: 121 additions & 0 deletions backend/oko_pg_interface/src/ks_nodes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,130 @@ import {
type WalletKSNodeWithNodeNameAndServerUrl,
type WalletKSNodeStatus,
type KSNodeHealthCheck,
type KSNodeTelemetry,
} from "@oko-wallet/oko-types/tss";
import type { WithPagination, WithTime } from "@oko-wallet-types/aux_types";

export async function insertKSNodeTelemetry(
db: Pool | PoolClient,
public_key: string,
key_share_count: number,
payload: any,
): Promise<Result<void, string>> {
const query = `
INSERT INTO ks_node_telemetry (
log_id, public_key, key_share_count,
payload
)
VALUES (
$1, $2, $3,
$4
)
`;

try {
await db.query(query, [uuidv4(), public_key, key_share_count, payload]);

return {
success: true,
data: void 0,
};
} catch (error) {
return {
success: false,
err: String(error),
};
}
}

export async function getLastKSNodeTelemetry(
db: Pool | PoolClient,
public_key: string,
): Promise<Result<KSNodeTelemetry | null, string>> {
const query = `
SELECT *
FROM ks_node_telemetry
WHERE public_key = $1
ORDER BY created_at DESC
LIMIT 1
`;

try {
const result = await db.query(query, [public_key]);
const row = result.rows[0];

if (!row) {
return { success: true, data: null };
}

return {
success: true,
data: {
log_id: row.log_id,
public_key: row.public_key,
key_share_count: row.key_share_count,
payload: row.payload,
created_at: row.created_at,
},
};
} catch (error) {
return {
success: false,
err: String(error),
};
}
}

export async function getLatestKSNodeTelemetries(
db: Pool | PoolClient,
): Promise<Result<{ public_key: string; created_at: Date }[], string>> {
const query = `
SELECT DISTINCT ON (public_key) public_key, created_at
FROM ks_node_telemetry
ORDER BY public_key, created_at DESC
`;

try {
const result = await db.query(query);
return {
success: true,
data: result.rows.map((row) => ({
public_key: row.public_key,
created_at: row.created_at,
})),
};
} catch (error) {
return {
success: false,
err: String(error),
};
}
}

export async function getKSNodeByPublicKey(
db: Pool | PoolClient,
public_key: string,
): Promise<Result<KeyShareNode | null, string>> {
const query = `
SELECT *
FROM key_share_nodes
WHERE public_key = $1 AND deleted_at IS NULL
`;

try {
const result = await db.query<KeyShareNode>(query, [public_key]);
return {
success: true,
data: result.rows[0] || null,
};
} catch (error) {
return {
success: false,
err: String(error),
};
}
}

export async function getKSNodeById(
db: Pool | PoolClient,
nodeId: string,
Expand Down
1 change: 1 addition & 0 deletions backend/openapi/src/tss/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export * from "./sign";
export * from "./tss_session";
export * from "./triples";
export * from "./user";
export * from "./ks_node";
30 changes: 30 additions & 0 deletions backend/openapi/src/tss/ks_node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { z } from "zod";

import { registry } from "../registry";

export const KSNodeTelemetryRequestSchema = registry.register(
"TssKSNodeTelemetryRequest",
z.object({
public_key: z.string().openapi({
description:
"Unique identifier for the key share node telemetry (public key)",
}),
key_share_count: z.number().openapi({
description: "Current number of key shares stored in the node",
}),
payload: z.object({}).loose().openapi({
description:
"Additional telemetry data (e.g., db status, error messages)",
}),
}),
);

export const KSNodeTelemetryResponseSchema = registry.register(
"TssKSNodeTelemetryResponse",
z.object({
success: z.boolean().openapi({
description: "Indicates if the telemetry was successfully processed",
}),
data: z.null().optional(),
}),
);
Loading