Skip to content

Commit abca419

Browse files
committed
feat: Add support for idempotent enqueues
1 parent a96c9b5 commit abca419

11 files changed

+225
-349
lines changed

src/drizzle.config.ts drizzle.config.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import type { Config } from "drizzle-kit";
22

33
export default {
4-
schema: "./schema.ts",
5-
out: "./drizzle",
6-
driver: "better-sqlite",
4+
schema: "./src/schema.ts",
5+
out: "./src/drizzle",
6+
dialect: "sqlite",
77
dbCredentials: {
88
url: "data.db",
99
},

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"name": "liteque",
44
"description": "A sqlite-based job queue for Node.js",
55
"author": "Mohamed Bassem <[email protected]>",
6-
"version": "0.1.3",
6+
"version": "0.2.0",
77
"type": "module",
88
"main": "dist/index.js",
99
"types": "dist/index.d.ts",
@@ -32,7 +32,7 @@
3232
"devDependencies": {
3333
"@tsconfig/node21": "^21.0.3",
3434
"@types/better-sqlite3": "^7.6.11",
35-
"drizzle-kit": "^0.20.14",
35+
"drizzle-kit": "^0.24.02",
3636
"typescript": "^5.6.3",
3737
"vitest": "^1.3.1"
3838
},

pnpm-lock.yaml

+11-332
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE `tasks` ADD `idempotencyKey` text;--> statement-breakpoint
2+
CREATE UNIQUE INDEX `tasks_queue_idempotencyKey_unique` ON `tasks` (`queue`,`idempotencyKey`);

src/drizzle/meta/0000_snapshot.json

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
{
2-
"version": "5",
2+
"version": "6",
33
"dialect": "sqlite",
4-
"id": "3094773c-0138-46b2-b617-4b10093b0f53",
5-
"prevId": "00000000-0000-0000-0000-000000000000",
64
"tables": {
75
"tasks": {
86
"name": "tasks",
@@ -123,8 +121,9 @@
123121
},
124122
"enums": {},
125123
"_meta": {
126-
"schemas": {},
127124
"tables": {},
128125
"columns": {}
129-
}
126+
},
127+
"id": "3094773c-0138-46b2-b617-4b10093b0f53",
128+
"prevId": "00000000-0000-0000-0000-000000000000"
130129
}

src/drizzle/meta/0001_snapshot.json

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
{
2+
"version": "6",
3+
"dialect": "sqlite",
4+
"id": "0f918c72-5b27-4e4c-9027-631ead290bf2",
5+
"prevId": "3094773c-0138-46b2-b617-4b10093b0f53",
6+
"tables": {
7+
"tasks": {
8+
"name": "tasks",
9+
"columns": {
10+
"id": {
11+
"name": "id",
12+
"type": "integer",
13+
"primaryKey": true,
14+
"notNull": true,
15+
"autoincrement": true
16+
},
17+
"queue": {
18+
"name": "queue",
19+
"type": "text",
20+
"primaryKey": false,
21+
"notNull": true,
22+
"autoincrement": false
23+
},
24+
"payload": {
25+
"name": "payload",
26+
"type": "text",
27+
"primaryKey": false,
28+
"notNull": true,
29+
"autoincrement": false
30+
},
31+
"createdAt": {
32+
"name": "createdAt",
33+
"type": "integer",
34+
"primaryKey": false,
35+
"notNull": true,
36+
"autoincrement": false
37+
},
38+
"status": {
39+
"name": "status",
40+
"type": "text",
41+
"primaryKey": false,
42+
"notNull": true,
43+
"autoincrement": false,
44+
"default": "'pending'"
45+
},
46+
"expireAt": {
47+
"name": "expireAt",
48+
"type": "integer",
49+
"primaryKey": false,
50+
"notNull": false,
51+
"autoincrement": false
52+
},
53+
"allocationId": {
54+
"name": "allocationId",
55+
"type": "text",
56+
"primaryKey": false,
57+
"notNull": true,
58+
"autoincrement": false
59+
},
60+
"numRunsLeft": {
61+
"name": "numRunsLeft",
62+
"type": "integer",
63+
"primaryKey": false,
64+
"notNull": true,
65+
"autoincrement": false
66+
},
67+
"maxNumRuns": {
68+
"name": "maxNumRuns",
69+
"type": "integer",
70+
"primaryKey": false,
71+
"notNull": true,
72+
"autoincrement": false
73+
},
74+
"idempotencyKey": {
75+
"name": "idempotencyKey",
76+
"type": "text",
77+
"primaryKey": false,
78+
"notNull": false,
79+
"autoincrement": false
80+
}
81+
},
82+
"indexes": {
83+
"tasks_queue_idx": {
84+
"name": "tasks_queue_idx",
85+
"columns": [
86+
"queue"
87+
],
88+
"isUnique": false
89+
},
90+
"tasks_status_idx": {
91+
"name": "tasks_status_idx",
92+
"columns": [
93+
"status"
94+
],
95+
"isUnique": false
96+
},
97+
"tasks_expire_at_idx": {
98+
"name": "tasks_expire_at_idx",
99+
"columns": [
100+
"expireAt"
101+
],
102+
"isUnique": false
103+
},
104+
"tasks_num_runs_left_idx": {
105+
"name": "tasks_num_runs_left_idx",
106+
"columns": [
107+
"numRunsLeft"
108+
],
109+
"isUnique": false
110+
},
111+
"tasks_max_num_runs_idx": {
112+
"name": "tasks_max_num_runs_idx",
113+
"columns": [
114+
"maxNumRuns"
115+
],
116+
"isUnique": false
117+
},
118+
"tasks_allocation_id_idx": {
119+
"name": "tasks_allocation_id_idx",
120+
"columns": [
121+
"allocationId"
122+
],
123+
"isUnique": false
124+
},
125+
"tasks_queue_idempotencyKey_unique": {
126+
"name": "tasks_queue_idempotencyKey_unique",
127+
"columns": [
128+
"queue",
129+
"idempotencyKey"
130+
],
131+
"isUnique": true
132+
}
133+
},
134+
"foreignKeys": {},
135+
"compositePrimaryKeys": {},
136+
"uniqueConstraints": {}
137+
}
138+
},
139+
"enums": {},
140+
"_meta": {
141+
"schemas": {},
142+
"tables": {},
143+
"columns": {}
144+
},
145+
"internal": {
146+
"indexes": {}
147+
}
148+
}

src/drizzle/meta/_journal.json

+7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@
88
"when": 1720992922192,
99
"tag": "0000_wonderful_talisman",
1010
"breakpoints": true
11+
},
12+
{
13+
"idx": 1,
14+
"version": "6",
15+
"when": 1730656128884,
16+
"tag": "0001_wandering_giant_man",
17+
"breakpoints": true
1118
}
1219
]
1320
}

src/options.ts

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ export interface SqliteQueueOptions {
88
};
99
}
1010

11+
export interface EnqueueOptions {
12+
numRetries?: number;
13+
idempotencyKey?: string;
14+
}
15+
1116
export interface RunnerFuncs<T> {
1217
run: (job: DequeuedJob<T>) => Promise<void>;
1318
onComplete?: (job: DequeuedJob<T>) => Promise<void>;

src/queue.ts

+14-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import assert from "node:assert";
22
import { and, asc, count, eq, gt, lt, or } from "drizzle-orm";
33

44
import { buildDBClient } from "./db";
5-
import { SqliteQueueOptions } from "./options";
5+
import { EnqueueOptions, SqliteQueueOptions } from "./options";
66
import { Job, tasksTable } from "./schema";
77

88
// generate random id
@@ -29,19 +29,27 @@ export class SqliteQueue<T> {
2929
return this.queueName;
3030
}
3131

32-
async enqueue(payload: T): Promise<Job> {
33-
const job = await this.db
32+
/**
33+
* Enqueue a job into the queue.
34+
* If a job with the same idempotency key is already enqueued, it will be ignored and undefined will be returned.
35+
*/
36+
async enqueue(payload: T, options?: EnqueueOptions): Promise<Job | undefined> {
37+
const opts = options ?? {};
38+
const numRetries = opts.numRetries ?? this.options.defaultJobArgs.numRetries;
39+
const [job] = await this.db
3440
.insert(tasksTable)
3541
.values({
3642
queue: this.queueName,
3743
payload: JSON.stringify(payload),
38-
numRunsLeft: this.options.defaultJobArgs.numRetries + 1,
39-
maxNumRuns: this.options.defaultJobArgs.numRetries + 1,
44+
numRunsLeft: numRetries + 1,
45+
maxNumRuns: numRetries + 1,
4046
allocationId: generateAllocationId(),
47+
idempotencyKey: opts.idempotencyKey,
4148
})
49+
.onConflictDoNothing({target: [tasksTable.queue, tasksTable.idempotencyKey]})
4250
.returning();
4351

44-
return job[0];
52+
return job;
4553
}
4654

4755
async stats() {

src/runner.test.ts

+26
Original file line numberDiff line numberDiff line change
@@ -437,4 +437,30 @@ describe("SqiteQueueRunner", () => {
437437
expect(results.numCompleted).toEqual(1000);
438438
expect(results.numFailed).toEqual(0);
439439
});
440+
441+
test("idempotency keys", async () => {
442+
const queue = new SqliteQueue<Work>(
443+
"queue1",
444+
buildDBClient(":memory:", true),
445+
{
446+
defaultJobArgs: {
447+
numRetries: 0,
448+
},
449+
},
450+
);
451+
452+
await queue.enqueue({ increment: 1 });
453+
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
454+
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
455+
await queue.enqueue({ increment: 2 }, { idempotencyKey: "2" });
456+
await queue.enqueue({ increment: 3 }, { idempotencyKey: "3" });
457+
458+
expect(await queue.stats()).toEqual({
459+
pending: 3,
460+
running: 0,
461+
pending_retry: 0,
462+
failed: 0,
463+
});
464+
465+
});
440466
});

src/schema.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { index, integer, sqliteTable, text } from "drizzle-orm/sqlite-core";
1+
import { index, integer, sqliteTable, text, unique } from "drizzle-orm/sqlite-core";
22

33
function createdAtField() {
44
return integer("createdAt", { mode: "timestamp" })
@@ -22,6 +22,7 @@ export const tasksTable = sqliteTable(
2222
allocationId: text("allocationId").notNull(),
2323
numRunsLeft: integer("numRunsLeft").notNull(),
2424
maxNumRuns: integer("maxNumRuns").notNull(),
25+
idempotencyKey: text("idempotencyKey"),
2526
},
2627
(tasks) => ({
2728
queueIdx: index("tasks_queue_idx").on(tasks.queue),
@@ -30,6 +31,7 @@ export const tasksTable = sqliteTable(
3031
numRunsLeftIdx: index("tasks_num_runs_left_idx").on(tasks.numRunsLeft),
3132
maxNumRunsIdx: index("tasks_max_num_runs_idx").on(tasks.maxNumRuns),
3233
allocationIdIdx: index("tasks_allocation_id_idx").on(tasks.allocationId),
34+
idempotencyKeyIdx: unique().on(tasks.queue, tasks.idempotencyKey),
3335
}),
3436
);
3537

0 commit comments

Comments
 (0)