-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathqueue-cache.ts
114 lines (100 loc) · 3.25 KB
/
queue-cache.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import { error } from "@opennextjs/aws/adapters/logger.js";
import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides";
/**
 * Tuning knobs for the caching layer placed in front of a queue.
 * Every field is optional; omitted fields fall back to their documented default.
 */
interface QueueCachingOptions {
  /**
   * Controls when a message is recorded in the cache relative to the queue call.
   * - `false`: the message is cached first, then handed to the queue.
   * - `true`: the message is cached only once the queue call has completed.
   * @default false
   */
  waitForQueueAck?: boolean;
  /**
   * How long, in seconds, an entry stays valid in the regional cache.
   * @default 5
   */
  regionalCacheTtlSec?: number;
}
/** Fallback cache TTL, in seconds, used when `regionalCacheTtlSec` is not provided. */
const DEFAULT_QUEUE_CACHE_TTL_SEC = 5;
/**
 * Queue wrapper that skips messages which were already sent recently.
 *
 * Two layers are consulted before forwarding a message to the wrapped queue:
 * an in-memory map (`localCache`) and a Cache API cache opened under the name
 * "durable-queue". A message found in either layer within the TTL is dropped.
 */
class QueueCache implements Queue {
  /** Name of this queue: the wrapped queue's name prefixed with `cached-`. */
  readonly name: string;
  /** Lifetime of a cache entry, in seconds. */
  readonly regionalCacheTtlSec: number;
  /** When true, the cache is written only after the wrapped queue's `send` resolves. */
  readonly waitForQueueAck: boolean;
  /** Lazily opened Cache API handle; `undefined` until first use. */
  cache: Cache | undefined;
  /** In-memory entries: cache URL string -> insertion timestamp (ms since epoch). */
  localCache: Map<string, number> = new Map();

  /**
   * @param originalQueue The queue that actually delivers the messages.
   * @param options Caching options; missing fields use their defaults.
   */
  constructor(
    private originalQueue: Queue,
    options: QueueCachingOptions
  ) {
    this.name = `cached-${originalQueue.name}`;
    this.regionalCacheTtlSec = options.regionalCacheTtlSec ?? DEFAULT_QUEUE_CACHE_TTL_SEC;
    this.waitForQueueAck = options.waitForQueueAck ?? false;
  }

  /**
   * Forwards `msg` to the wrapped queue unless it is already cached.
   *
   * Errors from the cache or from the wrapped queue are logged and swallowed,
   * so this method never rejects. Stale in-memory entries are purged on every call.
   *
   * @param msg The message to send.
   */
  async send(msg: QueueMessage): Promise<void> {
    try {
      const isCached = await this.isInCache(msg);
      if (isCached) {
        return;
      }
      if (!this.waitForQueueAck) {
        // Cache first so that concurrent duplicates are dropped as early as possible.
        await this.putToCache(msg);
      }
      await this.originalQueue.send(msg);
      if (this.waitForQueueAck) {
        // Cache only once the wrapped queue has accepted the message.
        await this.putToCache(msg);
      }
    } catch (e) {
      error("Error sending message to queue", e);
    } finally {
      this.clearLocalCache();
    }
  }

  /** Returns the Cache API handle, opening it on first use. */
  private async getCache(): Promise<Cache> {
    if (!this.cache) {
      this.cache = await caches.open("durable-queue");
    }
    return this.cache;
  }

  /** Builds the string identifying `msg` in both cache layers. */
  private getCacheUrlString(msg: QueueMessage): string {
    return `queue/${msg.MessageGroupId}/${msg.MessageDeduplicationId}`;
  }

  /** Builds the `Request` used as the Cache API key for `msg`. */
  private getCacheKey(msg: QueueMessage): Request {
    return new Request(new URL(this.getCacheUrlString(msg), "http://local.cache"));
  }

  /** Records `msg` in the in-memory map, then in the Cache API cache with a `max-age` equal to the TTL. */
  private async putToCache(msg: QueueMessage): Promise<void> {
    this.localCache.set(this.getCacheUrlString(msg), Date.now());
    const cacheKey = this.getCacheKey(msg);
    const cache = await this.getCache();
    await cache.put(
      cacheKey,
      new Response(null, { status: 200, headers: { "Cache-Control": `max-age=${this.regionalCacheTtlSec}` } })
    );
  }

  /**
   * Tells whether `msg` has a live cache entry.
   *
   * The in-memory map is authoritative when it holds an entry for the message:
   * a fresh entry yields `true`; an expired one is removed and yields `false`
   * without consulting the Cache API. Otherwise the Cache API is queried.
   *
   * @returns `true` if the message is cached, `false` otherwise.
   */
  private async isInCache(msg: QueueMessage): Promise<boolean> {
    const localKey = this.getCacheUrlString(msg);
    const insertedAt = this.localCache.get(localKey);
    if (insertedAt !== undefined) {
      if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) {
        return true;
      }
      this.localCache.delete(localKey);
      return false;
    }
    const cache = await this.getCache();
    const cachedResponse = await cache.match(this.getCacheKey(msg));
    // Always resolve to a boolean; a miss must be `false`, not `undefined`.
    return Boolean(cachedResponse);
  }

  /**
   * Removes every in-memory entry whose age has reached the TTL.
   * Uses the same expiry boundary as `isInCache`.
   */
  private clearLocalCache(): void {
    const now = Date.now();
    const ttlMs = this.regionalCacheTtlSec * 1000;
    for (const [key, insertedAt] of this.localCache.entries()) {
      if (now - insertedAt >= ttlMs) {
        this.localCache.delete(key);
      }
    }
  }
}
/**
 * Wraps a queue with the caching layer.
 *
 * @param originalQueue The queue to wrap.
 * @param opts Caching options; defaults to an empty object.
 * @returns A new `QueueCache` built from `originalQueue` and `opts`.
 */
export default (originalQueue: Queue, opts: QueueCachingOptions = {}) => {
  const cachedQueue = new QueueCache(originalQueue, opts);
  return cachedQueue;
};