Skip to content

Commit a2d965d

Browse files
authored
allow advisory locks to be disabled (#5793)
1 parent 16e219b commit a2d965d

File tree

4 files changed

+163
-24
lines changed

4 files changed

+163
-24
lines changed

.changeset/big-ghosts-fall.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
allow advisory locks to be disabled

packages/cluster/src/ShardingConfig.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ export class ShardingConfig extends Context.Tag("@effect/cluster/ShardingConfig"
6363
* Shard lock expiration duration.
6464
*/
6565
readonly shardLockExpiration: DurationInput
66+
/**
67+
* Disable the use of advisory locks for shard locking.
68+
*/
69+
readonly shardLockDisableAdvisory: boolean
6670
/**
6771
* Start shutting down as soon as an Entity has started shutting down.
6872
*
@@ -134,6 +138,7 @@ export const defaults: ShardingConfig["Type"] = {
134138
preemptiveShutdown: true,
135139
shardLockRefreshInterval: Duration.seconds(10),
136140
shardLockExpiration: Duration.seconds(35),
141+
shardLockDisableAdvisory: false,
137142
entityMailboxCapacity: 4096,
138143
entityMaxIdleTime: Duration.minutes(1),
139144
entityRegistrationTimeout: Duration.minutes(1),
@@ -206,6 +211,10 @@ export const config: Config.Config<ShardingConfig["Type"]> = Config.all({
206211
Config.withDefault(defaults.shardLockExpiration),
207212
Config.withDescription("Shard lock expiration duration.")
208213
),
214+
shardLockDisableAdvisory: Config.boolean("shardLockDisableAdvisory").pipe(
215+
Config.withDefault(defaults.shardLockDisableAdvisory),
216+
Config.withDescription("Disable the use of advisory locks for shard locking.")
217+
),
209218
entityMailboxCapacity: Config.integer("entityMailboxCapacity").pipe(
210219
Config.withDefault(defaults.entityMailboxCapacity),
211220
Config.withDescription("The default capacity of the mailbox for entities.")

packages/cluster/src/SqlRunnerStorage.ts

Lines changed: 134 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export const make = Effect.fnUntraced(function*(options: {
2525
readonly prefix?: string | undefined
2626
}) {
2727
const config = yield* ShardingConfig.ShardingConfig
28+
const disableAdvisoryLocks = config.shardLockDisableAdvisory
2829
const sql = (yield* SqlClient.SqlClient).withoutTransforms()
2930
const prefix = options?.prefix ?? "cluster"
3031
const table = (name: string) => `${prefix}_${name}`
@@ -138,8 +139,22 @@ export const make = Effect.fnUntraced(function*(options: {
138139
acquired_at DATETIME NOT NULL
139140
)
140141
`,
141-
mysql: () => Effect.void,
142-
pg: () => Effect.void,
142+
mysql: () =>
143+
sql`
144+
CREATE TABLE IF NOT EXISTS ${locksTableSql} (
145+
shard_id VARCHAR(50) PRIMARY KEY,
146+
address VARCHAR(255) NOT NULL,
147+
acquired_at DATETIME NOT NULL
148+
)
149+
`,
150+
pg: () =>
151+
sql`
152+
CREATE TABLE IF NOT EXISTS ${locksTableSql} (
153+
shard_id VARCHAR(50) PRIMARY KEY,
154+
address VARCHAR(255) NOT NULL,
155+
acquired_at TIMESTAMP NOT NULL
156+
)
157+
`,
143158
orElse: () =>
144159
// sqlite
145160
sql`
@@ -232,6 +247,16 @@ export const make = Effect.fnUntraced(function*(options: {
232247
Effect.onError(() => lockConn.unsafeRebuild())
233248
)
234249
}
250+
const execWithLockConnUnprepared = <A>(
251+
effect: Statement.Statement<A>
252+
): Effect.Effect<ReadonlyArray<ReadonlyArray<any>>, SqlError> => {
253+
if (!lockConn) return effect.values
254+
const [query, params] = effect.compile()
255+
return lockConn.await.pipe(
256+
Effect.flatMap(([conn]) => conn.executeUnprepared(query, params, undefined)),
257+
Effect.onError(() => lockConn.unsafeRebuild())
258+
)
259+
}
235260
const execWithLockConnValues = <A>(
236261
effect: Statement.Statement<A>
237262
): Effect.Effect<ReadonlyArray<ReadonlyArray<any>>, SqlError> => {
@@ -244,8 +269,24 @@ export const make = Effect.fnUntraced(function*(options: {
244269
}
245270

246271
const acquireLock = sql.onDialectOrElse({
247-
pg: () =>
248-
Effect.fnUntraced(function*(_address: string, shardIds: ReadonlyArray<string>) {
272+
pg: () => {
273+
if (disableAdvisoryLocks) {
274+
return (address: string, shardIds: ReadonlyArray<string>) => {
275+
const values = shardIds.map((shardId) =>
276+
sql`(${stringLiteral(shardId)}, ${stringLiteral(address)}, ${sqlNow})`
277+
)
278+
return sql`
279+
INSERT INTO ${locksTableSql} (shard_id, address, acquired_at) VALUES ${sql.csv(values)}
280+
ON CONFLICT (shard_id) DO UPDATE
281+
SET address = ${address}, acquired_at = ${sqlNow}
282+
WHERE ${locksTableSql}.address = ${address}
283+
OR ${locksTableSql}.acquired_at < ${lockExpiresAt}
284+
`.pipe(
285+
Effect.andThen(acquiredLocks(address, shardIds))
286+
)
287+
}
288+
}
289+
return Effect.fnUntraced(function*(_address: string, shardIds: ReadonlyArray<string>) {
249290
const [conn, pid] = yield* lockConn!.await
250291
const acquiredShardIds: Array<string> = []
251292
const toAcquire = new Map(shardIds.map((shardId) => [lockNumbers.get(shardId)!, shardId]))
@@ -269,10 +310,26 @@ export const make = Effect.fnUntraced(function*(options: {
269310
}
270311
}
271312
return acquiredShardIds
272-
}, Effect.onError(() => lockConn!.unsafeRebuild())),
313+
}, Effect.onError(() => lockConn!.unsafeRebuild()))
314+
},
273315

274-
mysql: () =>
275-
Effect.fnUntraced(function*(_address: string, shardIds: ReadonlyArray<string>) {
316+
mysql: () => {
317+
if (disableAdvisoryLocks) {
318+
return (address: string, shardIds: ReadonlyArray<string>) => {
319+
const values = shardIds.map((shardId) =>
320+
sql`(${stringLiteral(shardId)}, ${stringLiteral(address)}, ${sqlNow})`
321+
)
322+
return sql`
323+
INSERT INTO ${locksTableSql} (shard_id, address, acquired_at) VALUES ${sql.csv(values)}
324+
ON DUPLICATE KEY UPDATE
325+
address = IF(address = VALUES(address) OR acquired_at < ${lockExpiresAt}, VALUES(address), address),
326+
acquired_at = IF(address = VALUES(address) OR acquired_at < ${lockExpiresAt}, VALUES(acquired_at), acquired_at)
327+
`.unprepared.pipe(
328+
Effect.andThen(acquiredLocks(address, shardIds))
329+
)
330+
}
331+
}
332+
return Effect.fnUntraced(function*(_address: string, shardIds: ReadonlyArray<string>) {
276333
const [conn, pid] = yield* lockConn!.await
277334
const takenLocks = (yield* conn.executeValues(`SELECT ${allMySqlTakenLocks}`, []))[0] as Array<number | null>
278335
const acquiredShardIds: Array<string> = []
@@ -296,7 +353,8 @@ export const make = Effect.fnUntraced(function*(options: {
296353
}
297354
}
298355
return acquiredShardIds
299-
}, Effect.onError(() => lockConn!.unsafeRebuild())),
356+
}, Effect.onError(() => lockConn!.unsafeRebuild()))
357+
},
300358

301359
mssql: () => (address: string, shardIds: ReadonlyArray<string>) => {
302360
const values = shardIds.map((shardId) => sql`(${stringLiteral(shardId)}, ${stringLiteral(address)}, ${sqlNow})`)
@@ -399,8 +457,34 @@ export const make = Effect.fnUntraced(function*(options: {
399457
const stringLiteralArr = (arr: ReadonlyArray<string>) => sql.literal(`(${arr.map(wrapString).join(",")})`)
400458

401459
const refreshShards = sql.onDialectOrElse({
402-
pg: () => acquireLock,
403-
mysql: () => acquireLock,
460+
pg: () => {
461+
if (!disableAdvisoryLocks) return acquireLock
462+
return (address: string, shardIds: ReadonlyArray<string>) =>
463+
sql`
464+
UPDATE ${locksTableSql}
465+
SET acquired_at = ${sqlNow}
466+
WHERE address = ${address} AND shard_id IN ${stringLiteralArr(shardIds)}
467+
RETURNING shard_id
468+
`.pipe(
469+
execWithLockConnValues,
470+
Effect.map((rows) => rows.map((row) => row[0] as string))
471+
)
472+
},
473+
mysql: () => {
474+
if (!disableAdvisoryLocks) return acquireLock
475+
return (address: string, shardIds: ReadonlyArray<string>) => {
476+
const shardIdsStr = stringLiteralArr(shardIds)
477+
return sql<Array<{ shard_id: string }>>`
478+
UPDATE ${locksTableSql}
479+
SET acquired_at = ${sqlNow}
480+
WHERE address = ${address} AND shard_id IN ${shardIdsStr};
481+
SELECT shard_id FROM ${locksTableSql} WHERE address = ${address} AND shard_id IN ${shardIdsStr}
482+
`.pipe(
483+
execWithLockConnUnprepared,
484+
Effect.map((rows) => rows[1].map((row) => row.shard_id))
485+
)
486+
}
487+
},
404488
mssql: () => (address: string, shardIds: ReadonlyArray<string>) =>
405489
sql`
406490
UPDATE ${locksTableSql}
@@ -462,8 +546,14 @@ export const make = Effect.fnUntraced(function*(options: {
462546
),
463547

464548
release: sql.onDialectOrElse({
465-
pg: () =>
466-
Effect.fnUntraced(
549+
pg: () => {
550+
if (disableAdvisoryLocks) {
551+
return (address: string, shardId: string) =>
552+
sql`DELETE FROM ${locksTableSql} WHERE address = ${address} AND shard_id = ${shardId}`.pipe(
553+
PersistenceError.refail
554+
)
555+
}
556+
return Effect.fnUntraced(
467557
function*(_address, shardId) {
468558
const lockNum = lockNumbers.get(shardId)!
469559
for (let i = 0; i < 5; i++) {
@@ -481,9 +571,16 @@ export const make = Effect.fnUntraced(function*(options: {
481571
Effect.onError(() => lockConn!.unsafeRebuild()),
482572
Effect.asVoid,
483573
PersistenceError.refail
484-
),
485-
mysql: () =>
486-
Effect.fnUntraced(
574+
)
575+
},
576+
mysql: () => {
577+
if (disableAdvisoryLocks) {
578+
return (address: string, shardId: string) =>
579+
sql`DELETE FROM ${locksTableSql} WHERE address = ${address} AND shard_id = ${shardId}`.pipe(
580+
PersistenceError.refail
581+
)
582+
}
583+
return Effect.fnUntraced(
487584
function*(_address, shardId) {
488585
const lockName = lockNames.get(shardId)!
489586
while (true) {
@@ -499,28 +596,43 @@ export const make = Effect.fnUntraced(function*(options: {
499596
Effect.onError(() => lockConn!.unsafeRebuild()),
500597
Effect.asVoid,
501598
PersistenceError.refail
502-
),
599+
)
600+
},
503601
orElse: () => (address, shardId) =>
504602
sql`DELETE FROM ${locksTableSql} WHERE address = ${address} AND shard_id = ${shardId}`.pipe(
505603
PersistenceError.refail
506604
)
507605
}),
508606

509607
releaseAll: sql.onDialectOrElse({
510-
pg: () => (_address) =>
511-
sql`SELECT pg_advisory_unlock_all()`.pipe(
608+
pg: () => (address) => {
609+
if (disableAdvisoryLocks) {
610+
return sql`DELETE FROM ${locksTableSql} WHERE address = ${address}`.pipe(
611+
PersistenceError.refail,
612+
withTracerDisabled
613+
)
614+
}
615+
return sql`SELECT pg_advisory_unlock_all()`.pipe(
512616
execWithLockConn,
513617
Effect.asVoid,
514618
PersistenceError.refail,
515619
withTracerDisabled
516-
),
517-
mysql: () => (_address) =>
518-
sql`SELECT RELEASE_ALL_LOCKS()`.pipe(
620+
)
621+
},
622+
mysql: () => (address) => {
623+
if (disableAdvisoryLocks) {
624+
return sql`DELETE FROM ${locksTableSql} WHERE address = ${address}`.pipe(
625+
PersistenceError.refail,
626+
withTracerDisabled
627+
)
628+
}
629+
return sql`SELECT RELEASE_ALL_LOCKS()`.pipe(
519630
execWithLockConn,
520631
Effect.asVoid,
521632
PersistenceError.refail,
522633
withTracerDisabled
523-
),
634+
)
635+
},
524636
orElse: () => (address) =>
525637
sql`DELETE FROM ${locksTableSql} WHERE address = ${address}`.pipe(
526638
PersistenceError.refail,

packages/cluster/test/SqlRunnerStorage.test.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,21 @@ describe("SqlRunnerStorage", () => {
1616
["mysql", Layer.orDie(MysqlContainer.ClientLive)],
1717
["vitess", Layer.orDie(MysqlContainer.ClientLiveVitess)],
1818
["sqlite", Layer.orDie(SqliteLayer)]
19-
] as const).forEach(([label, layer]) => {
20-
it.layer(StorageLive.pipe(Layer.provideMerge(layer), Layer.provide(ShardingConfig.layer())), {
19+
] as const).flatMap(([label, layer]) =>
20+
[
21+
[label, StorageLive.pipe(Layer.provideMerge(layer), Layer.provide(ShardingConfig.layer()))],
22+
[
23+
label + " (no advisory)",
24+
StorageLive.pipe(
25+
Layer.provideMerge(layer),
26+
Layer.provide(ShardingConfig.layer({
27+
shardLockDisableAdvisory: true
28+
}))
29+
)
30+
]
31+
] as const
32+
).forEach(([label, layer]) => {
33+
it.layer(layer, {
2134
timeout: 60000
2235
})(label, (it) => {
2336
it.effect("getRunners", () =>

0 commit comments

Comments
 (0)