diff --git a/.changeset/large-zoos-approve.md b/.changeset/large-zoos-approve.md new file mode 100644 index 00000000..912b9c8b --- /dev/null +++ b/.changeset/large-zoos-approve.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/cloudflare": patch +--- + +add an optional cache for the durable queue diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts index 6322ba96..56a3759b 100644 --- a/examples/e2e/app-router/open-next.config.ts +++ b/examples/e2e/app-router/open-next.config.ts @@ -2,6 +2,7 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare"; import kvIncrementalCache from "@opennextjs/cloudflare/overrides/incremental-cache/kv-incremental-cache"; import shardedTagCache from "@opennextjs/cloudflare/overrides/tag-cache/do-sharded-tag-cache"; import doQueue from "@opennextjs/cloudflare/overrides/queue/do-queue"; +import queueCache from "@opennextjs/cloudflare/overrides/queue/queue-cache"; export default defineCloudflareConfig({ incrementalCache: kvIncrementalCache, @@ -14,5 +15,5 @@ export default defineCloudflareConfig({ numberOfHardReplicas: 2, }, }), - queue: doQueue, + queue: queueCache(doQueue), }); diff --git a/packages/cloudflare/src/api/overrides/queue/do-queue.ts b/packages/cloudflare/src/api/overrides/queue/do-queue.ts index 6f5adf75..4a124c16 100644 --- a/packages/cloudflare/src/api/overrides/queue/do-queue.ts +++ b/packages/cloudflare/src/api/overrides/queue/do-queue.ts @@ -4,7 +4,7 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js"; import { getCloudflareContext } from "../../cloudflare-context"; export default { - name: "do-queue", + name: "durable-queue", send: async (msg: QueueMessage) => { const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE; if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation"); diff --git a/packages/cloudflare/src/api/overrides/queue/queue-cache.spec.ts b/packages/cloudflare/src/api/overrides/queue/queue-cache.spec.ts new file mode 100644 index 00000000..dc839000 --- /dev/null +++ b/packages/cloudflare/src/api/overrides/queue/queue-cache.spec.ts @@ -0,0 +1,112 @@ +import type { Queue } from "@opennextjs/aws/types/overrides"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; + +import queueCache from "./queue-cache"; + +const mockedQueue = { + name: "mocked-queue", + send: vi.fn(), +} satisfies Queue; + +const generateMessage = () => ({ + MessageGroupId: "test", + MessageBody: { + eTag: "test", + url: "test", + host: "test", + lastModified: Date.now(), + }, + MessageDeduplicationId: "test", +}); + +const mockedPut = vi.fn(); +const mockedMatch = vi.fn().mockReturnValue(null); + +describe("queue-cache", () => { + beforeEach(() => { + // @ts-ignore + globalThis.caches = { + open: vi.fn().mockReturnValue({ + put: mockedPut, + match: mockedMatch, + }), + }; + }); + + afterEach(() => { + vi.resetAllMocks(); + }); + test("should send the message to the original queue", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + expect(queue.name).toBe("cached-mocked-queue"); + await queue.send(msg); + expect(mockedQueue.send).toHaveBeenCalledWith(msg); + }); + + test("should use the local cache", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + await queue.send(msg); + + expect(queue.localCache.size).toBe(1); + expect(queue.localCache.has(`queue/test/test`)).toBe(true); + expect(mockedPut).toHaveBeenCalled(); + + const spiedHas = vi.spyOn(queue.localCache, "has"); + await queue.send(msg); + expect(spiedHas).toHaveBeenCalled(); + + expect(mockedQueue.send).toHaveBeenCalledTimes(1); + + expect(mockedMatch).toHaveBeenCalledTimes(1); + }); + + test("should clear the local cache after 5s", async () => { + vi.useFakeTimers(); + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + await queue.send(msg); + expect(queue.localCache.size).toBe(1); + expect(queue.localCache.has(`queue/test/test`)).toBe(true); + + vi.advanceTimersByTime(5001); + const alteredMsg = generateMessage(); + alteredMsg.MessageGroupId = "test2"; + await queue.send(alteredMsg); + expect(queue.localCache.size).toBe(1); + console.log(queue.localCache); + expect(queue.localCache.has(`queue/test2/test`)).toBe(true); + expect(queue.localCache.has(`queue/test/test`)).toBe(false); + vi.useRealTimers(); + }); + + test("should use the regional cache if not in local cache", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + await queue.send(msg); + + expect(mockedMatch).toHaveBeenCalledTimes(1); + expect(mockedPut).toHaveBeenCalledTimes(1); + expect(queue.localCache.size).toBe(1); + expect(queue.localCache.has(`queue/test/test`)).toBe(true); + // We need to delete the local cache to test the regional cache + queue.localCache.delete(`queue/test/test`); + + const spiedHas = vi.spyOn(queue.localCache, "has"); + await queue.send(msg); + expect(spiedHas).toHaveBeenCalled(); + expect(mockedMatch).toHaveBeenCalledTimes(2); + }); + + test("should return early if the message is in the regional cache", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + + mockedMatch.mockReturnValueOnce(new Response(null, { status: 200 })); + + const spiedSend = mockedQueue.send; + await queue.send(msg); + expect(spiedSend).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/cloudflare/src/api/overrides/queue/queue-cache.ts b/packages/cloudflare/src/api/overrides/queue/queue-cache.ts new file mode 100644 index 00000000..b4f4c7a6 --- /dev/null +++ b/packages/cloudflare/src/api/overrides/queue/queue-cache.ts @@ -0,0 +1,114 @@ +import { error } from "@opennextjs/aws/adapters/logger.js"; +import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides"; + +interface QueueCachingOptions { + /** + * The TTL for the regional cache in seconds. + * @default 5 + */ + regionalCacheTtlSec?: number; + + /** + * Whether to wait for the queue ack before returning. + * When set to false, the cache will be populated asap and the queue will be called after. + * When set to true, the cache will be populated only after the queue ack is received. + * @default false + */ + waitForQueueAck?: boolean; +} + +const DEFAULT_QUEUE_CACHE_TTL_SEC = 5; + +class QueueCache implements Queue { + readonly name; + readonly regionalCacheTtlSec: number; + readonly waitForQueueAck: boolean; + cache: Cache | undefined; + localCache: Map<string, number> = new Map(); + + constructor( + private originalQueue: Queue, + options: QueueCachingOptions + ) { + this.name = `cached-${originalQueue.name}`; + this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC; + this.waitForQueueAck = options.waitForQueueAck ?? false; + } + + async send(msg: QueueMessage) { + try { + const isCached = await this.isInCache(msg); + if (isCached) { + return; + } + if (!this.waitForQueueAck) { + await this.putToCache(msg); + } + + await this.originalQueue.send(msg); + if (this.waitForQueueAck) { + await this.putToCache(msg); + } + } catch (e) { + error("Error sending message to queue", e); + } finally { + this.clearLocalCache(); + } + } + + private async getCache() { + if (!this.cache) { + this.cache = await caches.open("durable-queue"); + } + return this.cache; + } + + private getCacheUrlString(msg: QueueMessage) { + return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`; + } + + private getCacheKey(msg: QueueMessage) { + return new Request(new URL(this.getCacheUrlString(msg), "http://local.cache")); + } + + private async putToCache(msg: QueueMessage) { + this.localCache.set(this.getCacheUrlString(msg), Date.now()); + const cacheKey = this.getCacheKey(msg); + const cache = await this.getCache(); + await cache.put( + cacheKey, + new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${this.regionalCacheTtlSec}` } }) + ); + } + + private async isInCache(msg: QueueMessage) { + if (this.localCache.has(this.getCacheUrlString(msg))) { + const insertedAt = this.localCache.get(this.getCacheUrlString(msg))!; + if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) { + return true; + } + this.localCache.delete(this.getCacheUrlString(msg)); + return false; + } + const cacheKey = this.getCacheKey(msg); + const cache = await this.getCache(); + const cachedResponse = await cache.match(cacheKey); + if (cachedResponse) { + return true; + } + } + + /** + * Remove any value older than the TTL from the local cache + */ + private clearLocalCache() { + const now = Date.now(); + for (const [key, value] of this.localCache.entries()) { + if (now - value > this.regionalCacheTtlSec * 1000) { + this.localCache.delete(key); + } + } + } +} + +export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => new QueueCache(originalQueue, opts);