Skip to content

Commit b3589df

Browse files
authored
Merge branch 'master' into depfu/update/group/rails-8.0.2.1
2 parents 9cbb15f + 158ab2e commit b3589df

File tree

6 files changed

+140
-1558
lines changed

6 files changed

+140
-1558
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,5 +89,5 @@ workflows:
8989
- build:
9090
matrix:
9191
parameters:
92-
postgresql_image: [ "cimg/postgres:14.19", "cimg/postgres:15.14" ]
92+
postgresql_image: [ "cimg/postgres:14.19", "cimg/postgres:15.14", "cimg/postgres:16.10", "cimg/postgres:17.6", "cimg/postgres:18.0" ]
9393
- docker-build

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ GEM
319319
responders (3.0.1)
320320
actionpack (>= 5.0)
321321
railties (>= 5.0)
322-
rexml (3.3.9)
322+
rexml (3.4.4)
323323
rubocop (1.52.1)
324324
json (~> 2.3)
325325
parallel (~> 1.10)
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
class ReplaceMd5WithHashtextInQueJobNotify < ActiveRecord::Migration[7.0]
2+
# This fixes https://github.com/que-rb/que/pull/437
3+
# Be careful on Que upgrade in case the final fix differed.
4+
5+
def up
6+
Que.transaction do
7+
Que.execute <<~SQL
8+
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
9+
DECLARE
10+
locker_pid integer;
11+
sort_key json;
12+
BEGIN
13+
-- Don't do anything if the job is scheduled for a future time.
14+
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
15+
RETURN null;
16+
END IF;
17+
18+
-- Pick a locker to notify of the job's insertion, weighted by their number
19+
-- of workers. Should bounce pseudorandomly between lockers on each
20+
-- invocation, hence the hashtext-ordering, but still touch each one equally,
21+
-- hence the modulo using the job_id.
22+
SELECT pid
23+
INTO locker_pid
24+
FROM (
25+
SELECT *, last_value(row_number) OVER () + 1 AS count
26+
FROM (
27+
SELECT *, row_number() OVER () - 1 AS row_number
28+
FROM (
29+
SELECT *
30+
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
31+
WHERE
32+
listening AND
33+
queues @> ARRAY[NEW.queue] AND
34+
ql.job_schema_version = NEW.job_schema_version
35+
ORDER BY hashtext(pid::text || id::text)
36+
) t1
37+
) t2
38+
) t3
39+
WHERE NEW.id % count = row_number;
40+
41+
IF locker_pid IS NOT NULL THEN
42+
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
43+
-- rather than throw errors when someone enqueues a big job, just
44+
-- broadcast the most pertinent information, and let the locker query for
45+
-- the record after it's taken the lock. The worker will have to hit the
46+
-- DB in order to make sure the job is still visible anyway.
47+
SELECT row_to_json(t)
48+
INTO sort_key
49+
FROM (
50+
SELECT
51+
'job_available' AS message_type,
52+
NEW.queue AS queue,
53+
NEW.priority AS priority,
54+
NEW.id AS id,
55+
-- Make sure we output timestamps as UTC ISO 8601
56+
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
57+
) t;
58+
59+
PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
60+
END IF;
61+
62+
RETURN null;
63+
END
64+
$$
65+
LANGUAGE plpgsql;
66+
SQL
67+
end
68+
end
69+
70+
def down
71+
Que.transaction do
72+
Que.execute <<~SQL
73+
CREATE OR REPLACE FUNCTION que_job_notify() RETURNS trigger AS $$
74+
DECLARE
75+
locker_pid integer;
76+
sort_key json;
77+
BEGIN
78+
-- Don't do anything if the job is scheduled for a future time.
79+
IF NEW.run_at IS NOT NULL AND NEW.run_at > now() THEN
80+
RETURN null;
81+
END IF;
82+
83+
-- Pick a locker to notify of the job's insertion, weighted by their number
84+
-- of workers. Should bounce pseudorandomly between lockers on each
85+
-- invocation, hence the md5-ordering, but still touch each one equally,
86+
-- hence the modulo using the job_id.
87+
SELECT pid
88+
INTO locker_pid
89+
FROM (
90+
SELECT *, last_value(row_number) OVER () + 1 AS count
91+
FROM (
92+
SELECT *, row_number() OVER () - 1 AS row_number
93+
FROM (
94+
SELECT *
95+
FROM public.que_lockers ql, generate_series(1, ql.worker_count) AS id
96+
WHERE
97+
listening AND
98+
queues @> ARRAY[NEW.queue] AND
99+
ql.job_schema_version = NEW.job_schema_version
100+
ORDER BY md5(pid::text || id::text)
101+
) t1
102+
) t2
103+
) t3
104+
WHERE NEW.id % count = row_number;
105+
106+
IF locker_pid IS NOT NULL THEN
107+
-- There's a size limit to what can be broadcast via LISTEN/NOTIFY, so
108+
-- rather than throw errors when someone enqueues a big job, just
109+
-- broadcast the most pertinent information, and let the locker query for
110+
-- the record after it's taken the lock. The worker will have to hit the
111+
-- DB in order to make sure the job is still visible anyway.
112+
SELECT row_to_json(t)
113+
INTO sort_key
114+
FROM (
115+
SELECT
116+
'job_available' AS message_type,
117+
NEW.queue AS queue,
118+
NEW.priority AS priority,
119+
NEW.id AS id,
120+
-- Make sure we output timestamps as UTC ISO 8601
121+
to_char(NEW.run_at AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"') AS run_at
122+
) t;
123+
124+
PERFORM pg_notify('que_listener_' || locker_pid::text, sort_key::text);
125+
END IF;
126+
127+
RETURN null;
128+
END
129+
$$
130+
LANGUAGE plpgsql;
131+
SQL
132+
end
133+
end
134+
end

0 commit comments

Comments
 (0)