From 2dc337e22f8c863cbd03d101d0828410e5c8c20a Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas <nicodorseuil@yahoo.fr> Date: Mon, 24 Mar 2025 19:54:02 +0100 Subject: [PATCH 1/7] queue cache --- examples/e2e/app-router/open-next.config.ts | 2 +- .../src/api/overrides/queue/do-queue.ts | 66 +++++++++++++++---- 2 files changed, 55 insertions(+), 13 deletions(-) diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts index 6322ba96..ad6952a3 100644 --- a/examples/e2e/app-router/open-next.config.ts +++ b/examples/e2e/app-router/open-next.config.ts @@ -14,5 +14,5 @@ export default defineCloudflareConfig({ numberOfHardReplicas: 2, }, }), - queue: doQueue, + queue: doQueue({ enableRegionalCache: true, regionalCacheTtlSec: 5 }), }); diff --git a/packages/cloudflare/src/api/overrides/queue/do-queue.ts b/packages/cloudflare/src/api/overrides/queue/do-queue.ts index 6f5adf75..7993b4ed 100644 --- a/packages/cloudflare/src/api/overrides/queue/do-queue.ts +++ b/packages/cloudflare/src/api/overrides/queue/do-queue.ts @@ -3,16 +3,58 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js"; import { getCloudflareContext } from "../../cloudflare-context"; -export default { - name: "do-queue", - send: async (msg: QueueMessage) => { - const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE; - if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation"); +interface DurableQueueOptions { + /** + * Enables a regional cache for the queue. + * When enabled, the first request to the queue is cached for `regionalCacheTtlSec` seconds. + * Subsequent similar requests during this period will bypass processing and use the cached result. + * **Note:** Ensure the `MAX_REVALIDATE_CONCURRENCY` environment variable is appropriately increased before enabling this feature. + * In case of an error, cache revalidation may be delayed by up to `regionalCacheTtlSec` seconds. + * @default false + */ + enableRegionalCache?: boolean; + /** + * The TTL for the regional cache in seconds. + * @default 5 + */ + regionalCacheTtlSec?: number; +} - const id = durableObject.idFromName(msg.MessageGroupId); - const stub = durableObject.get(id); - await stub.revalidate({ - ...msg, - }); - }, -} satisfies Queue; +const DEFAULT_QUEUE_CACHE_TTL_SEC = 5; + +function getCacheKey(msg: QueueMessage) { + return new Request( + new URL(`queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`, "http://local.cache") + ); +} + +export default ({enableRegionalCache, regionalCacheTtlSec}: DurableQueueOptions = {}) => { + return { + name: "durable-queue", + send: async (msg: QueueMessage) => { + const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE; + if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation"); + + if(enableRegionalCache) { + const cacheKey = getCacheKey(msg); + const cache = await caches.open("durable-queue"); + const cachedResponse = await cache.match(cacheKey); + if(cachedResponse) { + return; + } + + // Here we cache the first request to the queue for `regionalCacheTtlSec` seconds + // We want to do it as soon as possible so that subsequent requests can use the cached response + // TODO: Do we really want to cache this before sending the message to the queue? It could be an option to cache it after the message is sent + await cache.put(cacheKey, new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC}` } })); + + } + + const id = durableObject.idFromName(msg.MessageGroupId); + const stub = durableObject.get(id); + await stub.revalidate({ + ...msg, + }); + }, + } satisfies Queue; +} From cd4a0038e6f4a4e5ed8d4ce00b32e88b68f8e224 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas <nicodorseuil@yahoo.fr> Date: Tue, 25 Mar 2025 13:13:48 +0100 Subject: [PATCH 2/7] moved cache out of the durable queue --- .../src/api/overrides/queue/do-queue.ts | 66 +++---------- packages/cloudflare/src/api/queue-cache.ts | 93 +++++++++++++++++++ 2 files changed, 105 insertions(+), 54 deletions(-) create mode 100644 packages/cloudflare/src/api/queue-cache.ts diff --git a/packages/cloudflare/src/api/overrides/queue/do-queue.ts b/packages/cloudflare/src/api/overrides/queue/do-queue.ts index 7993b4ed..4a124c16 100644 --- a/packages/cloudflare/src/api/overrides/queue/do-queue.ts +++ b/packages/cloudflare/src/api/overrides/queue/do-queue.ts @@ -3,58 +3,16 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js"; import { getCloudflareContext } from "../../cloudflare-context"; -interface DurableQueueOptions { - /** - * Enables a regional cache for the queue. - * When enabled, the first request to the queue is cached for `regionalCacheTtlSec` seconds. - * Subsequent similar requests during this period will bypass processing and use the cached result. - * **Note:** Ensure the `MAX_REVALIDATE_CONCURRENCY` environment variable is appropriately increased before enabling this feature. - * In case of an error, cache revalidation may be delayed by up to `regionalCacheTtlSec` seconds. - * @default false - */ - enableRegionalCache?: boolean; - /** - * The TTL for the regional cache in seconds. - * @default 5 - */ - regionalCacheTtlSec?: number; -} +export default { + name: "durable-queue", + send: async (msg: QueueMessage) => { + const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE; + if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation"); -const DEFAULT_QUEUE_CACHE_TTL_SEC = 5; - -function getCacheKey(msg: QueueMessage) { - return new Request( - new URL(`queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`, "http://local.cache") - ); -} - -export default ({enableRegionalCache, regionalCacheTtlSec}: DurableQueueOptions = {}) => { - return { - name: "durable-queue", - send: async (msg: QueueMessage) => { - const durableObject = getCloudflareContext().env.NEXT_CACHE_DO_QUEUE; - if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation"); - - if(enableRegionalCache) { - const cacheKey = getCacheKey(msg); - const cache = await caches.open("durable-queue"); - const cachedResponse = await cache.match(cacheKey); - if(cachedResponse) { - return; - } - - // Here we cache the first request to the queue for `regionalCacheTtlSec` seconds - // We want to do it as soon as possible so that subsequent requests can use the cached response - // TODO: Do we really want to cache this before sending the message to the queue? It could be an option to cache it after the message is sent - await cache.put(cacheKey, new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC}` } })); - - } - - const id = durableObject.idFromName(msg.MessageGroupId); - const stub = durableObject.get(id); - await stub.revalidate({ - ...msg, - }); - }, - } satisfies Queue; -} + const id = durableObject.idFromName(msg.MessageGroupId); + const stub = durableObject.get(id); + await stub.revalidate({ + ...msg, + }); + }, +} satisfies Queue; diff --git a/packages/cloudflare/src/api/queue-cache.ts b/packages/cloudflare/src/api/queue-cache.ts new file mode 100644 index 00000000..d9b4864e --- /dev/null +++ b/packages/cloudflare/src/api/queue-cache.ts @@ -0,0 +1,93 @@ +import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides"; + +interface QueueCachingOptions { + /** + * Enables a regional cache for the queue. + * When enabled, the first request to the queue is cached for `regionalCacheTtlSec` seconds. + * Subsequent similar requests during this period will bypass processing and use the cached result. + * **Note:** Ensure the `MAX_REVALIDATE_CONCURRENCY` environment variable is appropriately increased before enabling this feature. + * In case of an error, cache revalidation may be delayed by up to `regionalCacheTtlSec` seconds. + * @default false + */ + enableRegionalCache?: boolean; + /** + * The TTL for the regional cache in seconds. + * @default 5 + */ + regionalCacheTtlSec?: number; + + /** + * Whether to wait for the queue ack before returning. + * When set to false, the cache will be populated asap and the queue will be called after. + * When set to true, the cache will be populated only after the queue ack is received. + * @default false + */ + waitForQueueAck?: boolean; +} + +const DEFAULT_QUEUE_CACHE_TTL_SEC = 5; + +class QueueCache implements Queue { + readonly name; + readonly enableRegionalCache: boolean; + readonly regionalCacheTtlSec: number; + readonly waitForQueueAck: boolean; + cache: Cache | undefined; + + constructor( + private originalQueue: Queue, + options: QueueCachingOptions + ) { + this.name = `cached-${originalQueue.name}`; + this.enableRegionalCache = options.enableRegionalCache ?? false; + this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC; + this.waitForQueueAck = options.waitForQueueAck ?? false; + } + + async send(msg: QueueMessage) { + if (this.enableRegionalCache) { + const isCached = await this.isInCache(msg); + if (isCached) { + return; + } + if (!this.waitForQueueAck) { + await this.putToCache(msg); + } + } + + await this.originalQueue.send(msg); + if (this.waitForQueueAck) { + await this.putToCache(msg); + } + } + + private async getCache() { + if (!this.cache) { + this.cache = await caches.open("durable-queue"); + } + return this.cache; + } + + private getCacheKey(msg: QueueMessage) { + return new Request( + new URL(`queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`, "http://local.cache") + ); + } + + private async putToCache(msg: QueueMessage) { + const cacheKey = this.getCacheKey(msg); + const cache = await this.getCache(); + await cache.put( + cacheKey, + new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${this.regionalCacheTtlSec}` } }) + ); + } + + private async isInCache(msg: QueueMessage) { + const cacheKey = this.getCacheKey(msg); + const cache = await this.getCache(); + return await cache.match(cacheKey); + } +} + +export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => new QueueCache(originalQueue, opts); From 9fce8529c839cd18112dd65e2ab3e06d91ad232c Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas <nicodorseuil@yahoo.fr> Date: Tue, 25 Mar 2025 13:57:08 +0100 Subject: [PATCH 3/7] local queue cache --- packages/cloudflare/src/api/queue-cache.ts | 54 ++++++++++++++-------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/packages/cloudflare/src/api/queue-cache.ts b/packages/cloudflare/src/api/queue-cache.ts index d9b4864e..ae9a68f3 100644 --- a/packages/cloudflare/src/api/queue-cache.ts +++ b/packages/cloudflare/src/api/queue-cache.ts @@ -1,15 +1,7 @@ +import { error } from "@opennextjs/aws/adapters/logger.js"; import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides"; interface QueueCachingOptions { - /** - * Enables a regional cache for the queue. - * When enabled, the first request to the queue is cached for `regionalCacheTtlSec` seconds. - * Subsequent similar requests during this period will bypass processing and use the cached result. - * **Note:** Ensure the `MAX_REVALIDATE_CONCURRENCY` environment variable is appropriately increased before enabling this feature. - * In case of an error, cache revalidation may be delayed by up to `regionalCacheTtlSec` seconds. - * @default false - */ - enableRegionalCache?: boolean; /** * The TTL for the regional cache in seconds. * @default 5 @@ -29,23 +21,22 @@ const DEFAULT_QUEUE_CACHE_TTL_SEC = 5; class QueueCache implements Queue { readonly name; - readonly enableRegionalCache: boolean; readonly regionalCacheTtlSec: number; readonly waitForQueueAck: boolean; cache: Cache | undefined; + localCache: Map<string, number> = new Map(); constructor( private originalQueue: Queue, options: QueueCachingOptions ) { this.name = `cached-${originalQueue.name}`; - this.enableRegionalCache = options.enableRegionalCache ?? false; this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC; this.waitForQueueAck = options.waitForQueueAck ?? false; } async send(msg: QueueMessage) { - if (this.enableRegionalCache) { + try { const isCached = await this.isInCache(msg); if (isCached) { return; @@ -53,11 +44,13 @@ class QueueCache implements Queue { if (!this.waitForQueueAck) { await this.putToCache(msg); } - } - await this.originalQueue.send(msg); - if (this.waitForQueueAck) { - await this.putToCache(msg); + await this.originalQueue.send(msg); + if (this.waitForQueueAck) { + await this.putToCache(msg); + } + } catch (e) { + error("Error sending message to queue", e); } } @@ -68,13 +61,16 @@ class QueueCache implements Queue { return this.cache; } + private getCacheUrlString(msg: QueueMessage) { + return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`; + } + private getCacheKey(msg: QueueMessage) { - return new Request( - new URL(`queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`, "http://local.cache") - ); + return new Request(new URL(this.getCacheUrlString(msg), "http://local.cache")); } private async putToCache(msg: QueueMessage) { + this.localCache.set(this.getCacheUrlString(msg), Date.now()); const cacheKey = this.getCacheKey(msg); const cache = await this.getCache(); await cache.put( @@ -84,9 +80,27 @@ class QueueCache implements Queue { } private async isInCache(msg: QueueMessage) { + if (this.localCache.has(this.getCacheUrlString(msg))) { + return true; + } const cacheKey = this.getCacheKey(msg); const cache = await this.getCache(); - return await cache.match(cacheKey); + const cachedResponse = await cache.match(cacheKey); + if (cachedResponse) { + return true; + } + } + + /** + * Remove any value older than the TTL from the local cache + */ + private clearLocalCache() { + const now = Date.now(); + for (const [key, value] of this.localCache.entries()) { + if (now - value > this.regionalCacheTtlSec * 1000) { + this.localCache.delete(key); + } + } } } From 3a4d04cf39562f0869b1bf5b41addcafae3c1e95 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas <nicodorseuil@yahoo.fr> Date: Tue, 25 Mar 2025 14:42:57 +0100 Subject: [PATCH 4/7] add local in memory cache --- .../cloudflare/src/api/queue-cache.spec.ts | 112 ++++++++++++++++++ packages/cloudflare/src/api/queue-cache.ts | 9 +- 2 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 packages/cloudflare/src/api/queue-cache.spec.ts diff --git a/packages/cloudflare/src/api/queue-cache.spec.ts b/packages/cloudflare/src/api/queue-cache.spec.ts new file mode 100644 index 00000000..dc839000 --- /dev/null +++ b/packages/cloudflare/src/api/queue-cache.spec.ts @@ -0,0 +1,112 @@ +import type { Queue } from "@opennextjs/aws/types/overrides"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; + +import queueCache from "./queue-cache"; + +const mockedQueue = { + name: "mocked-queue", + send: vi.fn(), +} satisfies Queue; + +const generateMessage = () => ({ + MessageGroupId: "test", + MessageBody: { + eTag: "test", + url: "test", + host: "test", + lastModified: Date.now(), + }, + MessageDeduplicationId: "test", +}); + +const mockedPut = vi.fn(); +const mockedMatch = vi.fn().mockReturnValue(null); + +describe("queue-cache", () => { + beforeEach(() => { + // @ts-ignore + globalThis.caches = { + open: vi.fn().mockReturnValue({ + put: mockedPut, + match: mockedMatch, + }), + }; + }); + + afterEach(() => { + vi.resetAllMocks(); + }); + test("should send the message to the original queue", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + expect(queue.name).toBe("cached-mocked-queue"); + await queue.send(msg); + expect(mockedQueue.send).toHaveBeenCalledWith(msg); + }); + + test("should use the local cache", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + await queue.send(msg); + + expect(queue.localCache.size).toBe(1); + expect(queue.localCache.has(`queue/test/test`)).toBe(true); + expect(mockedPut).toHaveBeenCalled(); + + const spiedHas = vi.spyOn(queue.localCache, "has"); + await queue.send(msg); + expect(spiedHas).toHaveBeenCalled(); + + expect(mockedQueue.send).toHaveBeenCalledTimes(1); + + expect(mockedMatch).toHaveBeenCalledTimes(1); + }); + + test("should clear the local cache after 5s", async () => { + vi.useFakeTimers(); + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + await queue.send(msg); + expect(queue.localCache.size).toBe(1); + expect(queue.localCache.has(`queue/test/test`)).toBe(true); + + vi.advanceTimersByTime(5001); + const alteredMsg = generateMessage(); + alteredMsg.MessageGroupId = "test2"; + await queue.send(alteredMsg); + expect(queue.localCache.size).toBe(1); + console.log(queue.localCache); + expect(queue.localCache.has(`queue/test2/test`)).toBe(true); + expect(queue.localCache.has(`queue/test/test`)).toBe(false); + vi.useRealTimers(); + }); + + test("should use the regional cache if not in local cache", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + await queue.send(msg); + + expect(mockedMatch).toHaveBeenCalledTimes(1); + expect(mockedPut).toHaveBeenCalledTimes(1); + expect(queue.localCache.size).toBe(1); + expect(queue.localCache.has(`queue/test/test`)).toBe(true); + // We need to delete the local cache to test the regional cache + queue.localCache.delete(`queue/test/test`); + + const spiedHas = vi.spyOn(queue.localCache, "has"); + await queue.send(msg); + expect(spiedHas).toHaveBeenCalled(); + expect(mockedMatch).toHaveBeenCalledTimes(2); + }); + + test("should return early if the message is in the regional cache", async () => { + const msg = generateMessage(); + const queue = queueCache(mockedQueue, {}); + + mockedMatch.mockReturnValueOnce(new Response(null, { status: 200 })); + + const spiedSend = mockedQueue.send; + await queue.send(msg); + expect(spiedSend).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/cloudflare/src/api/queue-cache.ts b/packages/cloudflare/src/api/queue-cache.ts index ae9a68f3..b4f4c7a6 100644 --- a/packages/cloudflare/src/api/queue-cache.ts +++ b/packages/cloudflare/src/api/queue-cache.ts @@ -51,6 +51,8 @@ class QueueCache implements Queue { } } catch (e) { error("Error sending message to queue", e); + } finally { + this.clearLocalCache(); } } @@ -81,7 +83,12 @@ class QueueCache implements Queue { private async isInCache(msg: QueueMessage) { if (this.localCache.has(this.getCacheUrlString(msg))) { - return true; + const insertedAt = this.localCache.get(this.getCacheUrlString(msg))!; + if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) { + return true; + } + this.localCache.delete(this.getCacheUrlString(msg)); + return false; } const cacheKey = this.getCacheKey(msg); const cache = await this.getCache(); From 3f0bfd1a04ec6179fe2fa74d97bb679c3f41a4d2 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas <nicodorseuil@yahoo.fr> Date: Tue, 25 Mar 2025 14:49:18 +0100 Subject: [PATCH 5/7] fix rebase --- examples/e2e/app-router/open-next.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts index ad6952a3..d67e3769 100644 --- a/examples/e2e/app-router/open-next.config.ts +++ b/examples/e2e/app-router/open-next.config.ts @@ -14,5 +14,5 @@ export default defineCloudflareConfig({ numberOfHardReplicas: 2, }, }), - queue: doQueue({ enableRegionalCache: true, regionalCacheTtlSec: 5 }), + queue: queueCache(doQueue), }); From 209a47f7b0587698d90cb2ea6fc5c1c6cc55afca Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas <nicodorseuil@yahoo.fr> Date: Tue, 25 Mar 2025 17:44:40 +0100 Subject: [PATCH 6/7] changeset --- .changeset/large-zoos-approve.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/large-zoos-approve.md diff --git a/.changeset/large-zoos-approve.md b/.changeset/large-zoos-approve.md new file mode 100644 index 00000000..912b9c8b --- /dev/null +++ b/.changeset/large-zoos-approve.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/cloudflare": patch +--- + +add an optional cache for the durable queue From a3d5d7e13178fc9d9733ed76a7aa568104579133 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas <nicodorseuil@yahoo.fr> Date: Fri, 28 Mar 2025 18:16:03 +0100 Subject: [PATCH 7/7] fix rebase --- examples/e2e/app-router/open-next.config.ts | 1 + .../cloudflare/src/api/{ => overrides/queue}/queue-cache.spec.ts | 0 packages/cloudflare/src/api/{ => overrides/queue}/queue-cache.ts | 0 3 files changed, 1 insertion(+) rename packages/cloudflare/src/api/{ => overrides/queue}/queue-cache.spec.ts (100%) rename packages/cloudflare/src/api/{ => overrides/queue}/queue-cache.ts (100%) diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts index d67e3769..56a3759b 100644 --- a/examples/e2e/app-router/open-next.config.ts +++ b/examples/e2e/app-router/open-next.config.ts @@ -2,6 +2,7 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare"; import kvIncrementalCache from "@opennextjs/cloudflare/overrides/incremental-cache/kv-incremental-cache"; import shardedTagCache from "@opennextjs/cloudflare/overrides/tag-cache/do-sharded-tag-cache"; import doQueue from "@opennextjs/cloudflare/overrides/queue/do-queue"; +import queueCache from "@opennextjs/cloudflare/overrides/queue/queue-cache"; export default defineCloudflareConfig({ incrementalCache: kvIncrementalCache, diff --git a/packages/cloudflare/src/api/queue-cache.spec.ts b/packages/cloudflare/src/api/overrides/queue/queue-cache.spec.ts similarity index 100% rename from packages/cloudflare/src/api/queue-cache.spec.ts rename to packages/cloudflare/src/api/overrides/queue/queue-cache.spec.ts diff --git a/packages/cloudflare/src/api/queue-cache.ts b/packages/cloudflare/src/api/overrides/queue/queue-cache.ts similarity index 100% rename from packages/cloudflare/src/api/queue-cache.ts rename to packages/cloudflare/src/api/overrides/queue/queue-cache.ts