Skip to content

Feat cache for the queue #496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/large-zoos-approve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

add an optional cache for the durable queue
3 changes: 2 additions & 1 deletion examples/e2e/app-router/open-next.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare";
import kvIncrementalCache from "@opennextjs/cloudflare/overrides/incremental-cache/kv-incremental-cache";
import shardedTagCache from "@opennextjs/cloudflare/overrides/tag-cache/do-sharded-tag-cache";
import doQueue from "@opennextjs/cloudflare/overrides/queue/do-queue";
import queueCache from "@opennextjs/cloudflare/overrides/queue/queue-cache";

export default defineCloudflareConfig({
incrementalCache: kvIncrementalCache,
Expand All @@ -14,5 +15,5 @@ export default defineCloudflareConfig({
numberOfHardReplicas: 2,
},
}),
queue: doQueue,
queue: queueCache(doQueue),
});
2 changes: 1 addition & 1 deletion packages/cloudflare/src/api/overrides/queue/do-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js";
import { getCloudflareContext } from "../../cloudflare-context";

export default {
name: "do-queue",
name: "durable-queue",
send: async (msg: QueueMessage) => {
const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE;
if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation");
Expand Down
112 changes: 112 additions & 0 deletions packages/cloudflare/src/api/overrides/queue/queue-cache.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import type { Queue } from "@opennextjs/aws/types/overrides";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";

import queueCache from "./queue-cache";

const mockedQueue = {
name: "mocked-queue",
send: vi.fn(),
} satisfies Queue;

const generateMessage = () => ({
MessageGroupId: "test",
MessageBody: {
eTag: "test",
url: "test",
host: "test",
lastModified: Date.now(),
},
MessageDeduplicationId: "test",
});

const mockedPut = vi.fn();
const mockedMatch = vi.fn().mockReturnValue(null);

describe("queue-cache", () => {
beforeEach(() => {
// @ts-ignore
globalThis.caches = {
open: vi.fn().mockReturnValue({
put: mockedPut,
match: mockedMatch,
}),
};
});

afterEach(() => {
vi.resetAllMocks();
});
test("should send the message to the original queue", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
expect(queue.name).toBe("cached-mocked-queue");
await queue.send(msg);
expect(mockedQueue.send).toHaveBeenCalledWith(msg);
});

test("should use the local cache", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
await queue.send(msg);

expect(queue.localCache.size).toBe(1);
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
expect(mockedPut).toHaveBeenCalled();

const spiedHas = vi.spyOn(queue.localCache, "has");
await queue.send(msg);
expect(spiedHas).toHaveBeenCalled();

expect(mockedQueue.send).toHaveBeenCalledTimes(1);

expect(mockedMatch).toHaveBeenCalledTimes(1);
});

test("should clear the local cache after 5s", async () => {
vi.useFakeTimers();
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
await queue.send(msg);
expect(queue.localCache.size).toBe(1);
expect(queue.localCache.has(`queue/test/test`)).toBe(true);

vi.advanceTimersByTime(5001);
const alteredMsg = generateMessage();
alteredMsg.MessageGroupId = "test2";
await queue.send(alteredMsg);
expect(queue.localCache.size).toBe(1);
console.log(queue.localCache);
expect(queue.localCache.has(`queue/test2/test`)).toBe(true);
expect(queue.localCache.has(`queue/test/test`)).toBe(false);
vi.useRealTimers();
});

test("should use the regional cache if not in local cache", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});
await queue.send(msg);

expect(mockedMatch).toHaveBeenCalledTimes(1);
expect(mockedPut).toHaveBeenCalledTimes(1);
expect(queue.localCache.size).toBe(1);
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
// We need to delete the local cache to test the regional cache
queue.localCache.delete(`queue/test/test`);

const spiedHas = vi.spyOn(queue.localCache, "has");
await queue.send(msg);
expect(spiedHas).toHaveBeenCalled();
expect(mockedMatch).toHaveBeenCalledTimes(2);
});

test("should return early if the message is in the regional cache", async () => {
const msg = generateMessage();
const queue = queueCache(mockedQueue, {});

mockedMatch.mockReturnValueOnce(new Response(null, { status: 200 }));

const spiedSend = mockedQueue.send;
await queue.send(msg);
expect(spiedSend).not.toHaveBeenCalled();
});
});
114 changes: 114 additions & 0 deletions packages/cloudflare/src/api/overrides/queue/queue-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { error } from "@opennextjs/aws/adapters/logger.js";
import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides";

interface QueueCachingOptions {
/**
* The TTL for the regional cache in seconds.
* @default 5
*/
regionalCacheTtlSec?: number;

/**
* Whether to wait for the queue ack before returning.
* When set to false, the cache will be populated asap and the queue will be called after.
* When set to true, the cache will be populated only after the queue ack is received.
* @default false
*/
waitForQueueAck?: boolean;
}

const DEFAULT_QUEUE_CACHE_TTL_SEC = 5;

class QueueCache implements Queue {
readonly name;
readonly regionalCacheTtlSec: number;
readonly waitForQueueAck: boolean;
cache: Cache | undefined;
localCache: Map<string, number> = new Map();

constructor(
private originalQueue: Queue,
options: QueueCachingOptions
) {
this.name = `cached-${originalQueue.name}`;
this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC;
this.waitForQueueAck = options.waitForQueueAck ?? false;
}

async send(msg: QueueMessage) {
try {
const isCached = await this.isInCache(msg);
if (isCached) {
return;
}
if (!this.waitForQueueAck) {
await this.putToCache(msg);
}

await this.originalQueue.send(msg);
if (this.waitForQueueAck) {
await this.putToCache(msg);
}
} catch (e) {
error("Error sending message to queue", e);
} finally {
this.clearLocalCache();
}
}

private async getCache() {
if (!this.cache) {
this.cache = await caches.open("durable-queue");
}
return this.cache;
}

private getCacheUrlString(msg: QueueMessage) {
return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`;
}

private getCacheKey(msg: QueueMessage) {
return new Request(new URL(this.getCacheUrlString(msg), "http://local.cache"));
}

private async putToCache(msg: QueueMessage) {
this.localCache.set(this.getCacheUrlString(msg), Date.now());
const cacheKey = this.getCacheKey(msg);
const cache = await this.getCache();
await cache.put(
cacheKey,
new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${this.regionalCacheTtlSec}` } })
);
}

private async isInCache(msg: QueueMessage) {
if (this.localCache.has(this.getCacheUrlString(msg))) {
const insertedAt = this.localCache.get(this.getCacheUrlString(msg))!;
if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) {
return true;
}
this.localCache.delete(this.getCacheUrlString(msg));
return false;
}
const cacheKey = this.getCacheKey(msg);
const cache = await this.getCache();
const cachedResponse = await cache.match(cacheKey);
if (cachedResponse) {
return true;
}
}

/**
* Remove any value older than the TTL from the local cache
*/
private clearLocalCache() {
const now = Date.now();
for (const [key, value] of this.localCache.entries()) {
if (now - value > this.regionalCacheTtlSec * 1000) {
this.localCache.delete(key);
}
}
}
}

export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => new QueueCache(originalQueue, opts);