Skip to content

Commit 7b9958b

Browse files
Merge pull request #39 from tensorzero/viraj/atomic-checkpoint
2 parents e6eb86c + 3b1b099 commit 7b9958b

File tree

1 file changed

+15
-24
lines changed

1 file changed

+15
-24
lines changed

src/postgres/migrations/20251202002136_initial_setup.sql

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -999,8 +999,6 @@ as $$
999999
declare
10001000
v_now timestamptz := durable.current_time();
10011001
v_new_attempt integer;
1002-
v_existing_attempt integer;
1003-
v_existing_owner uuid;
10041002
v_task_state text;
10051003
begin
10061004
if p_step_name is null or length(trim(p_step_name)) = 0 then
@@ -1042,29 +1040,22 @@ begin
10421040
end if;
10431041

10441042
execute format(
1045-
'select c.owner_run_id,
1046-
r.attempt
1047-
from durable.%I c
1048-
left join durable.%I r on r.run_id = c.owner_run_id
1049-
where c.task_id = $1
1050-
and c.checkpoint_name = $2',
1043+
'insert into durable.%I (task_id, checkpoint_name, state, owner_run_id, updated_at)
1044+
values ($1, $2, $3, $4, $5)
1045+
on conflict (task_id, checkpoint_name)
1046+
do update set state = excluded.state,
1047+
owner_run_id = excluded.owner_run_id,
1048+
updated_at = excluded.updated_at
1049+
where $6 >= coalesce(
1050+
(select r.attempt
1051+
from durable.%I r
1052+
where r.run_id = durable.%I.owner_run_id),
1053+
$6
1054+
)',
10511055
'c_' || p_queue_name,
1052-
'r_' || p_queue_name
1053-
)
1054-
into v_existing_owner, v_existing_attempt
1055-
using p_task_id, p_step_name;
1056-
1057-
if v_existing_owner is null or v_existing_attempt is null or v_new_attempt >= v_existing_attempt then
1058-
execute format(
1059-
'insert into durable.%I (task_id, checkpoint_name, state, owner_run_id, updated_at)
1060-
values ($1, $2, $3, $4, $5)
1061-
on conflict (task_id, checkpoint_name)
1062-
do update set state = excluded.state,
1063-
owner_run_id = excluded.owner_run_id,
1064-
updated_at = excluded.updated_at',
1065-
'c_' || p_queue_name
1066-
) using p_task_id, p_step_name, p_state, p_owner_run, v_now;
1067-
end if;
1056+
'r_' || p_queue_name,
1057+
'c_' || p_queue_name
1058+
) using p_task_id, p_step_name, p_state, p_owner_run, v_now, v_new_attempt;
10681059
end;
10691060
$$;
10701061

0 commit comments

Comments
 (0)