diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4fe44cb..59a1c22 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,8 +5,8 @@ on: branches: [main] jobs: - integration: - name: all tests + sqlite-tests: + name: sqlite tests runs-on: ubuntu-latest steps: - name: Install Nix @@ -16,5 +16,5 @@ jobs: name: lana-ci authToken: ${{ env.CACHIX_AUTH_TOKEN }} - uses: actions/checkout@v3 - - name: Run integration tests - run: nix run .#nextest + - name: Run SQLite tests + run: nix run .#nextest-sqlite diff --git a/.mcp.json b/.mcp.json new file mode 100644 index 0000000..fbe68d3 --- /dev/null +++ b/.mcp.json @@ -0,0 +1,11 @@ +{ + "mcpServers": { + "clat": { + "headers": { + "Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIwMTljZjIyZS03YTRjLTdiZDMtYWNlMS1hNzc5ZjllZTM1OGYiLCJyb2xlIjoiZW5naW5lZXIiLCJwcm9qZWN0IjoiY2FuY2VsLWpvYiIsImlhdCI6MTc3MzU4OTY1OH0.WjJCicO3gcjTVBvUNXVPq7qJ_-4rBLsLJ1LlC7-QUVg" + }, + "type": "http", + "url": "http://127.0.0.1:9111/mcp?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIwMTljZjIyZS03YTRjLTdiZDMtYWNlMS1hNzc5ZjllZTM1OGYiLCJyb2xlIjoiZW5naW5lZXIiLCJwcm9qZWN0IjoiY2FuY2VsLWpvYiIsImlhdCI6MTc3MzU4OTY1OH0.WjJCicO3gcjTVBvUNXVPq7qJ_-4rBLsLJ1LlC7-QUVg" + } + } +} \ No newline at end of file diff --git a/.sqlx/query-019135648f4d3371f724322403863b87a60ad88a44541651afaf9f7ef6288435.json b/.sqlx/query-019135648f4d3371f724322403863b87a60ad88a44541651afaf9f7ef6288435.json deleted file mode 100644 index 0973509..0000000 --- a/.sqlx/query-019135648f4d3371f724322403863b87a60ad88a44541651afaf9f7ef6288435.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO job_executions (id, job_type, queue_id, execute_at, alive_at, created_at)\n VALUES ($1, $2, $3, $4, COALESCE($5, NOW()), COALESCE($5, NOW()))\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Varchar", - "Varchar", - "Timestamptz", - "Timestamptz" - ] - }, - "nullable": [] - }, - "hash": "019135648f4d3371f724322403863b87a60ad88a44541651afaf9f7ef6288435" -} diff --git a/.sqlx/query-0ef759ca363dd6215cacf388c8f2d3a31ee655b14d9f992fd7dcd7ea4559fb49.json b/.sqlx/query-0ef759ca363dd6215cacf388c8f2d3a31ee655b14d9f992fd7dcd7ea4559fb49.json deleted file mode 100644 index 5c6545c..0000000 --- a/.sqlx/query-0ef759ca363dd6215cacf388c8f2d3a31ee655b14d9f992fd7dcd7ea4559fb49.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET state = 'pending',\n execute_at = $1,\n poller_instance_id = NULL\n WHERE poller_instance_id = $2 AND state = 'running'\n RETURNING id as \"id!: JobId\", attempt_index\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id!: JobId", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "attempt_index", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Timestamptz", - "Uuid" - ] - }, - "nullable": [ - false, - false - ] - }, - "hash": "0ef759ca363dd6215cacf388c8f2d3a31ee655b14d9f992fd7dcd7ea4559fb49" -} diff --git a/.sqlx/query-12668646228f369ad4658bf658e75e0be951ea12e9bcb4c016f93df1575bc8eb.json b/.sqlx/query-12668646228f369ad4658bf658e75e0be951ea12e9bcb4c016f93df1575bc8eb.json deleted file mode 100644 index 2712142..0000000 --- a/.sqlx/query-12668646228f369ad4658bf658e75e0be951ea12e9bcb4c016f93df1575bc8eb.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT id FROM jobs WHERE id = $1) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $2 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Uuid", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "12668646228f369ad4658bf658e75e0be951ea12e9bcb4c016f93df1575bc8eb" -} diff --git a/.sqlx/query-3d84eec41b105db35e4672b32d352cdba492d5b44234a941227391ff4ed5c5ce.json b/.sqlx/query-3d84eec41b105db35e4672b32d352cdba492d5b44234a941227391ff4ed5c5ce.json deleted file mode 100644 index b73141b..0000000 --- a/.sqlx/query-3d84eec41b105db35e4672b32d352cdba492d5b44234a941227391ff4ed5c5ce.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT id FROM jobs WHERE (COALESCE(id < $2, true)) ORDER BY id DESC LIMIT $1) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $3 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.id desc, i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int8", - "Uuid", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "3d84eec41b105db35e4672b32d352cdba492d5b44234a941227391ff4ed5c5ce" -} diff --git a/.sqlx/query-4aa09bfdf92c1b10f4b533557db33624c22189d93a1aeb5b5357f3cf7c3df0a0.json b/.sqlx/query-4aa09bfdf92c1b10f4b533557db33624c22189d93a1aeb5b5357f3cf7c3df0a0.json deleted file mode 100644 index 868f4a0..0000000 --- a/.sqlx/query-4aa09bfdf92c1b10f4b533557db33624c22189d93a1aeb5b5357f3cf7c3df0a0.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM job_executions\n WHERE id = $1 AND poller_instance_id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "4aa09bfdf92c1b10f4b533557db33624c22189d93a1aeb5b5357f3cf7c3df0a0" -} diff --git a/.sqlx/query-4ed94ccf82ccc9f8d56e50016d41fede1cef57f46e6b3b07ea414f3066bb911f.json b/.sqlx/query-4ed94ccf82ccc9f8d56e50016d41fede1cef57f46e6b3b07ea414f3066bb911f.json deleted file mode 100644 index 36be033..0000000 --- a/.sqlx/query-4ed94ccf82ccc9f8d56e50016d41fede1cef57f46e6b3b07ea414f3066bb911f.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM job_executions\n WHERE id = $1 AND poller_instance_id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "4ed94ccf82ccc9f8d56e50016d41fede1cef57f46e6b3b07ea414f3066bb911f" -} diff --git a/.sqlx/query-5ea32394be9b2738e7f662c4dcd40a624190a3b6b54f29fd1a54f5ed1bc2170b.json b/.sqlx/query-5ea32394be9b2738e7f662c4dcd40a624190a3b6b54f29fd1a54f5ed1bc2170b.json deleted file mode 100644 index b3d9429..0000000 --- a/.sqlx/query-5ea32394be9b2738e7f662c4dcd40a624190a3b6b54f29fd1a54f5ed1bc2170b.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH eligible AS (\n SELECT id, queue_id, execute_at, execution_state_json, attempt_index\n FROM job_executions\n WHERE state = 'pending'\n AND job_type = ANY($4)\n AND NOT EXISTS (\n SELECT 1 FROM job_executions AS running\n WHERE running.state = 'running'\n AND running.queue_id IS NOT NULL\n AND running.queue_id = job_executions.queue_id\n )\n ),\n min_wait AS (\n SELECT MIN(execute_at) - $2::timestamptz AS wait_time\n FROM eligible\n WHERE execute_at > $2::timestamptz\n ),\n candidates AS (\n SELECT id, execution_state_json AS data_json, attempt_index,\n ROW_NUMBER() OVER (\n PARTITION BY COALESCE(queue_id, id::text)\n ORDER BY execute_at\n ) AS rn\n FROM eligible\n WHERE execute_at <= $2::timestamptz\n ),\n selected_jobs AS (\n SELECT je.id, c.data_json, c.attempt_index\n FROM candidates c\n JOIN job_executions je ON je.id = c.id\n WHERE c.rn = 1\n ORDER BY je.execute_at ASC\n LIMIT $1\n FOR UPDATE OF je\n ),\n updated AS (\n UPDATE job_executions AS je\n SET state = 'running', alive_at = $2, execute_at = NULL, poller_instance_id = $3\n FROM selected_jobs\n WHERE je.id = selected_jobs.id\n RETURNING je.id, selected_jobs.data_json, je.attempt_index\n )\n SELECT * FROM (\n SELECT\n u.id AS \"id?: JobId\",\n u.data_json AS \"data_json?: JsonValue\",\n u.attempt_index AS \"attempt_index?\",\n NULL::INTERVAL AS \"max_wait?: PgInterval\"\n FROM updated u\n UNION ALL\n SELECT\n NULL::UUID AS \"id?: JobId\",\n NULL::JSONB AS \"data_json?: JsonValue\",\n NULL::INT AS \"attempt_index?\",\n mw.wait_time AS \"max_wait?: PgInterval\"\n FROM min_wait mw\n WHERE NOT EXISTS (SELECT 1 FROM updated)\n ) AS result\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id?: JobId", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "data_json?: JsonValue", - "type_info": "Jsonb" - }, - { - "ordinal": 2, - "name": "attempt_index?", - "type_info": "Int4" - }, - { - "ordinal": 3, - "name": "max_wait?: PgInterval", - "type_info": "Interval" - } - ], - "parameters": { - "Left": [ - "Int8", - "Timestamptz", - "Uuid", - "TextArray" - ] - }, - "nullable": [ - null, - null, - null, - null - ] - }, - "hash": "5ea32394be9b2738e7f662c4dcd40a624190a3b6b54f29fd1a54f5ed1bc2170b" -} diff --git a/.sqlx/query-69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4.json b/.sqlx/query-69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4.json deleted file mode 100644 index 9b32f31..0000000 --- a/.sqlx/query-69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM job_executions WHERE id = $1 AND state = 'pending'", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4" -} diff --git a/.sqlx/query-83a57fe9f4eef92fbf7cfb496ae1fb77556e6a2ccc45d4f1380da0695919b330.json b/.sqlx/query-83a57fe9f4eef92fbf7cfb496ae1fb77556e6a2ccc45d4f1380da0695919b330.json deleted file mode 100644 index 45e8560..0000000 --- a/.sqlx/query-83a57fe9f4eef92fbf7cfb496ae1fb77556e6a2ccc45d4f1380da0695919b330.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO job_events (id, recorded_at, sequence, event_type, event) SELECT $1, COALESCE($2, NOW()), ROW_NUMBER() OVER () + $3, unnested.event_type, unnested.event FROM UNNEST($4::TEXT[], $5::JSONB[]) AS unnested(event_type, event) RETURNING recorded_at", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Uuid", - "Timestamptz", - "Int8", - "TextArray", - "JsonbArray" - ] - }, - "nullable": [ - false - ] - }, - "hash": "83a57fe9f4eef92fbf7cfb496ae1fb77556e6a2ccc45d4f1380da0695919b330" -} diff --git a/.sqlx/query-9224c1c459a94c2de75e113a109ea8077430c79520ca26b44f1a7ae74656ad7c.json b/.sqlx/query-9224c1c459a94c2de75e113a109ea8077430c79520ca26b44f1a7ae74656ad7c.json deleted file mode 100644 index 8d42b16..0000000 --- a/.sqlx/query-9224c1c459a94c2de75e113a109ea8077430c79520ca26b44f1a7ae74656ad7c.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET alive_at = $1\n WHERE poller_instance_id = $2 AND state = 'running'\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Timestamptz", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "9224c1c459a94c2de75e113a109ea8077430c79520ca26b44f1a7ae74656ad7c" -} diff --git a/.sqlx/query-97647c873b103f0c060ab3e73e3175cb7288cdc35da02cb706e7cc62936f0523.json b/.sqlx/query-97647c873b103f0c060ab3e73e3175cb7288cdc35da02cb706e7cc62936f0523.json deleted file mode 100644 index f5f9b16..0000000 --- a/.sqlx/query-97647c873b103f0c060ab3e73e3175cb7288cdc35da02cb706e7cc62936f0523.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT id FROM jobs WHERE job_type = $1) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $2 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Text", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "97647c873b103f0c060ab3e73e3175cb7288cdc35da02cb706e7cc62936f0523" -} diff --git a/.sqlx/query-9ab25bea85f68a606013136bf1d70be5173a6b992936215300ad0737c46bfeec.json b/.sqlx/query-9ab25bea85f68a606013136bf1d70be5173a6b992936215300ad0737c46bfeec.json deleted file mode 100644 index 2838a67..0000000 --- a/.sqlx/query-9ab25bea85f68a606013136bf1d70be5173a6b992936215300ad0737c46bfeec.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET state = 'pending', execute_at = $2, attempt_index = $3, poller_instance_id = NULL\n WHERE id = $1 AND poller_instance_id = $4\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Timestamptz", - "Int4", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "9ab25bea85f68a606013136bf1d70be5173a6b992936215300ad0737c46bfeec" -} diff --git a/.sqlx/query-a13e1721858db65497f99095b4190c8169002ab3bf17d088c3af7628bcc5b35d.json b/.sqlx/query-a13e1721858db65497f99095b4190c8169002ab3bf17d088c3af7628bcc5b35d.json deleted file mode 100644 index 9aaecf6..0000000 --- a/.sqlx/query-a13e1721858db65497f99095b4190c8169002ab3bf17d088c3af7628bcc5b35d.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT id FROM jobs WHERE id = ANY($1)) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $2 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "UuidArray", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "a13e1721858db65497f99095b4190c8169002ab3bf17d088c3af7628bcc5b35d" -} diff --git a/.sqlx/query-a68768110ff7b74d09822f7ba11bf7d3c4544d08cbb040662537ca75869724b9.json b/.sqlx/query-a68768110ff7b74d09822f7ba11bf7d3c4544d08cbb040662537ca75869724b9.json deleted file mode 100644 index df2bf0f..0000000 --- a/.sqlx/query-a68768110ff7b74d09822f7ba11bf7d3c4544d08cbb040662537ca75869724b9.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO jobs (id, job_type, unique_per_type, created_at) VALUES ($1, $2, $3, COALESCE($4, NOW()))", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Varchar", - "Bool", - "Timestamptz" - ] - }, - "nullable": [] - }, - "hash": "a68768110ff7b74d09822f7ba11bf7d3c4544d08cbb040662537ca75869724b9" -} diff --git a/.sqlx/query-ae139606654819ffd6895252999151b67d905c5b481f7c7e13cca53cf9b902cd.json b/.sqlx/query-ae139606654819ffd6895252999151b67d905c5b481f7c7e13cca53cf9b902cd.json deleted file mode 100644 index 51540a2..0000000 --- a/.sqlx/query-ae139606654819ffd6895252999151b67d905c5b481f7c7e13cca53cf9b902cd.json +++ /dev/null @@ -1,47 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT id FROM jobs WHERE unique_per_type = $1) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $2 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Bool", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "ae139606654819ffd6895252999151b67d905c5b481f7c7e13cca53cf9b902cd" -} diff --git a/.sqlx/query-d58d1b9373f3485a2a4d74567f8a2f031bea0951a2938bcf0b2960179e66879a.json b/.sqlx/query-d58d1b9373f3485a2a4d74567f8a2f031bea0951a2938bcf0b2960179e66879a.json deleted file mode 100644 index 0ab26ef..0000000 --- a/.sqlx/query-d58d1b9373f3485a2a4d74567f8a2f031bea0951a2938bcf0b2960179e66879a.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT id FROM jobs WHERE (COALESCE(id > $2, true)) ORDER BY id ASC LIMIT $1) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $3 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.id asc, i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int8", - "Uuid", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "d58d1b9373f3485a2a4d74567f8a2f031bea0951a2938bcf0b2960179e66879a" -} diff --git a/.sqlx/query-da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be.json b/.sqlx/query-da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be.json deleted file mode 100644 index 0069d97..0000000 --- a/.sqlx/query-da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET state = 'pending', execute_at = $1, attempt_index = attempt_index + 1, poller_instance_id = NULL\n WHERE state = 'running' AND alive_at < $1::timestamptz\n RETURNING id as id\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "id", - "type_info": "Uuid" - } - ], - "parameters": { - "Left": [ - "Timestamptz" - ] - }, - "nullable": [ - false - ] - }, - "hash": "da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be" -} diff --git a/.sqlx/query-dcc6c8810ed74de9da3aef674c749e4814e2b2eb82675e2226371920e19f1c0d.json b/.sqlx/query-dcc6c8810ed74de9da3aef674c749e4814e2b2eb82675e2226371920e19f1c0d.json deleted file mode 100644 index c683449..0000000 --- a/.sqlx/query-dcc6c8810ed74de9da3aef674c749e4814e2b2eb82675e2226371920e19f1c0d.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT created_at, id FROM jobs WHERE (COALESCE((created_at, id) > ($3, $2), $2 IS NULL)) ORDER BY created_at ASC, id ASC LIMIT $1) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $4 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.created_at asc, i.id asc, i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int8", - "Uuid", - "Timestamptz", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "dcc6c8810ed74de9da3aef674c749e4814e2b2eb82675e2226371920e19f1c0d" -} diff --git a/.sqlx/query-e666d2e28c76cab9ed701d49d72646ee304f6574e4749f2f1878947b89ce6ae3.json b/.sqlx/query-e666d2e28c76cab9ed701d49d72646ee304f6574e4749f2f1878947b89ce6ae3.json deleted file mode 100644 index 893ead8..0000000 --- a/.sqlx/query-e666d2e28c76cab9ed701d49d72646ee304f6574e4749f2f1878947b89ce6ae3.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET execution_state_json = $1\n WHERE id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Jsonb", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "e666d2e28c76cab9ed701d49d72646ee304f6574e4749f2f1878947b89ce6ae3" -} diff --git a/.sqlx/query-e67fb92be8782f30d872b37b7a8efdaae473009d3dc398aacff73a58eb0e0ad7.json b/.sqlx/query-e67fb92be8782f30d872b37b7a8efdaae473009d3dc398aacff73a58eb0e0ad7.json deleted file mode 100644 index edf8a0b..0000000 --- a/.sqlx/query-e67fb92be8782f30d872b37b7a8efdaae473009d3dc398aacff73a58eb0e0ad7.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET state = 'pending', execute_at = $2, attempt_index = 1, poller_instance_id = NULL\n WHERE id = $1 AND poller_instance_id = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Timestamptz", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "e67fb92be8782f30d872b37b7a8efdaae473009d3dc398aacff73a58eb0e0ad7" -} diff --git a/.sqlx/query-fe01de1969256000b4a02e472fbd608a31aa78e73e60e466b7ba47cd9b7a72cd.json b/.sqlx/query-fe01de1969256000b4a02e472fbd608a31aa78e73e60e466b7ba47cd9b7a72cd.json deleted file mode 100644 index 0b09d11..0000000 --- a/.sqlx/query-fe01de1969256000b4a02e472fbd608a31aa78e73e60e466b7ba47cd9b7a72cd.json +++ /dev/null @@ -1,49 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH entities AS (SELECT created_at, id FROM jobs WHERE (COALESCE((created_at, id) < ($3, $2), $2 IS NULL)) ORDER BY created_at DESC, id DESC LIMIT $1) SELECT i.id AS \"entity_id: Repo__Id\", e.sequence, e.event, CASE WHEN $4 THEN e.context ELSE NULL::jsonb END as \"context: es_entity::ContextData\", e.recorded_at FROM entities i JOIN job_events e ON i.id = e.id ORDER BY i.created_at desc, i.id desc, i.id, e.sequence", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "entity_id: Repo__Id", - "type_info": "Uuid" - }, - { - "ordinal": 1, - "name": "sequence", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "event", - "type_info": "Jsonb" - }, - { - "ordinal": 3, - "name": "context: es_entity::ContextData", - "type_info": "Jsonb" - }, - { - "ordinal": 4, - "name": "recorded_at", - "type_info": "Timestamptz" - } - ], - "parameters": { - "Left": [ - "Int8", - "Uuid", - "Timestamptz", - "Bool" - ] - }, - "nullable": [ - false, - false, - false, - null, - false - ] - }, - "hash": "fe01de1969256000b4a02e472fbd608a31aa78e73e60e466b7ba47cd9b7a72cd" -} diff --git a/Cargo.lock b/Cargo.lock index 9f99069..05045b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,13 +456,12 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "es-entity" -version = "0.10.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d99d307ef96f2d79218c8d39a1da3fb5e8f1439a509558fb87aef0721547b416" +version = "0.10.28-dev" +source = "git+https://github.com/GaloyMoney/es-entity.git?branch=task%2Fsqlite-backend-019cdf3d#ff1808d3bf6b6a1df1eb690737b90b26ed8e46b4" dependencies = [ "chrono", "derive_builder", - "es-entity-macros", + "es-entity-macros-sqlite", "im", "opentelemetry", "opentelemetry_sdk", @@ -480,10 +479,9 @@ dependencies = [ ] [[package]] -name = "es-entity-macros" -version = "0.10.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe53aed1d1a568b4a487be142eb084eeb5e36f6375671a64af02340a09a5654" +name = "es-entity-macros-sqlite" +version = "0.10.28-dev" +source = "git+https://github.com/GaloyMoney/es-entity.git?branch=task%2Fsqlite-backend-019cdf3d#ff1808d3bf6b6a1df1eb690737b90b26ed8e46b4" dependencies = [ "convert_case", "darling 0.23.0", @@ -682,19 +680,19 @@ checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 5.3.0", "wasip2", ] [[package]] name = "getrandom" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 6.0.0", "rand_core 0.10.0", "wasip2", "wasip3", @@ -977,9 +975,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.90" +version = "0.3.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14dc6f6450b3f6d4ed5b16327f38fed626d375a886159ca555bd7822c0c3a5a6" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" dependencies = [ "once_cell", "wasm-bindgen", @@ -1002,9 +1000,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.182" +version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" [[package]] name = "libm" @@ -1014,13 +1012,14 @@ checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" [[package]] name = "libredox" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" +checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ "bitflags", "libc", - "redox_syscall 0.7.2", + "plain", + "redox_syscall 0.7.3", ] [[package]] @@ -1029,6 +1028,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ + "cc", "pkg-config", "vcpkg", ] @@ -1135,9 +1135,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.3" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "opentelemetry" @@ -1216,18 +1216,18 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pin-project" -version = "1.1.10" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.10" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", @@ -1236,9 +1236,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" [[package]] name = "pkcs1" @@ -1267,6 +1267,12 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plain" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" + [[package]] name = "pluralizer" version = "0.5.0" @@ -1322,9 +1328,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.44" +version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b2ebcf727b7760c461f091f9f0f539b77b8e87f2fd88131e7f1b433b3cece4" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" dependencies = [ "proc-macro2", ] @@ -1335,6 +1341,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "rand" version = "0.8.5" @@ -1363,7 +1375,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" dependencies = [ "chacha20", - "getrandom 0.4.1", + "getrandom 0.4.2", "rand_core 0.10.0", ] @@ -1431,9 +1443,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d94dd2f7cd932d4dc02cc8b2b50dfd38bd079a4e5d79198b99743d7fcf9a4b4" +checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ "bitflags", ] @@ -1800,12 +1812,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f4aa3ad99f2088c990dfa82d367e19cb29268ed67c574d10d0a4bfe71f07e0" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2179,9 +2191,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" dependencies = [ "proc-macro2", "quote", @@ -2332,7 +2344,7 @@ version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ - "getrandom 0.4.1", + "getrandom 0.4.2", "js-sys", "serde_core", "wasm-bindgen", @@ -2388,9 +2400,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" [[package]] name = "wasm-bindgen" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60722a937f594b7fde9adb894d7c092fc1bb6612897c46368d18e7a20208eff2" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" dependencies = [ "cfg-if", "once_cell", @@ -2401,9 +2413,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac8c6395094b6b91c4af293f4c79371c163f9a6f56184d2c9a85f5a95f3950" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2411,9 +2423,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3fabce6159dc20728033842636887e4877688ae94382766e00b180abac9d60" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" dependencies = [ "bumpalo", "proc-macro2", @@ -2424,9 +2436,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de0e091bdb824da87dc01d967388880d017a0a9bc4f3bdc0d86ee9f9336e3bb5" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" dependencies = [ "unicode-ident", ] @@ -2580,15 +2592,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" -dependencies = [ - "windows-targets 0.53.5", -] - [[package]] name = "windows-sys" version = "0.61.2" @@ -2622,30 +2625,13 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", + "windows_i686_gnullvm", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows-targets" -version = "0.53.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" -dependencies = [ - "windows-link", - "windows_aarch64_gnullvm 0.53.1", - "windows_aarch64_msvc 0.53.1", - "windows_i686_gnu 0.53.1", - "windows_i686_gnullvm 0.53.1", - "windows_i686_msvc 0.53.1", - "windows_x86_64_gnu 0.53.1", - "windows_x86_64_gnullvm 0.53.1", - "windows_x86_64_msvc 0.53.1", -] - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -2658,12 +2644,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -2676,12 +2656,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" - [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -2694,24 +2668,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" - [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -2724,12 +2686,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -2742,12 +2698,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -2760,12 +2710,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -2778,12 +2722,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" - [[package]] name = "wit-bindgen" version = "0.51.0" @@ -2903,18 +2841,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.39" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" +checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.39" +version = "0.8.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" +checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 9b4cd1c..605ec9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ json-schema = ["dep:schemars", "es-entity/json-schema"] tokio-task-names = ["tokio/tracing"] [dependencies] -es-entity = { workspace = true, features = ["tracing-context"] } +es-entity = { git = "https://github.com/GaloyMoney/es-entity.git", branch = "task/sqlite-backend-019cdf3d", default-features = false, features = ["sqlite", "tracing-context"] } async-trait = { workspace = true } derive_builder = { workspace = true } @@ -42,14 +42,12 @@ anyhow = { workspace = true } [workspace.dependencies] -es-entity = "0.10.27" - anyhow = "1.0" async-trait = "0.1" chrono = { version = "0.4", features = ["clock", "serde"], default-features = false } derive_builder = "0.20" tracing = { version = "0.1" } -sqlx = { version = "0.8", features = ["macros", "runtime-tokio-rustls", "postgres", "uuid", "chrono", "json" ] } +sqlx = { version = "0.8", features = ["macros", "runtime-tokio-rustls", "sqlite", "uuid", "chrono", "json" ] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_with = "3.14" diff --git a/flake.nix b/flake.nix index 57836fe..1d21a54 100644 --- a/flake.nix +++ b/flake.nix @@ -46,13 +46,11 @@ rustSource = pkgs.lib.cleanSourceWith { src = craneLib.path ./.; filter = path: type: - (builtins.match ".*\.sqlx/.*" path != null) - || (builtins.match ".*deny\.toml$" path != null) + (builtins.match ".*deny\.toml$" path != null) || (craneLib.filterCargoSources path type); }; commonArgs = { src = rustSource; - SQLX_OFFLINE = "true"; }; cargoArtifacts = craneLib.buildDepsOnly commonArgs; @@ -65,76 +63,59 @@ cargo-deny mdbook bacon - postgresql - docker-compose ytt - podman - podman-compose curl ]; - devEnvVars = rec { - PGDATABASE = "pg"; - PGUSER = "user"; - PGPASSWORD = "password"; - PGHOST = "127.0.0.1"; - DATABASE_URL = "postgres://${PGUSER}:${PGPASSWORD}@${PGHOST}:5432/pg?sslmode=disable"; - PG_CON = "${DATABASE_URL}"; - }; - - podman-runner = pkgs.callPackage ./nix/podman-runner.nix {}; + devEnvVars = {}; nextest-runner = pkgs.writeShellScriptBin "nextest-runner" '' set -e export PATH="${pkgs.lib.makeBinPath [ - podman-runner.podman-compose-runner - pkgs.wait4x - pkgs.sqlx-cli pkgs.cargo-nextest pkgs.coreutils - pkgs.gnumake rustToolchain pkgs.stdenv.cc ]}:$PATH" - export DATABASE_URL="${devEnvVars.DATABASE_URL}" - export PG_CON="${devEnvVars.PG_CON}" - export PGDATABASE="${devEnvVars.PGDATABASE}" - export PGUSER="${devEnvVars.PGUSER}" - export PGPASSWORD="${devEnvVars.PGPASSWORD}" - export PGHOST="${devEnvVars.PGHOST}" + echo "Running nextest..." + cargo nextest run --workspace --verbose - cleanup() { - echo "Stopping deps..." - ${podman-runner.podman-compose-runner}/bin/podman-compose-runner down || true - } + echo "Running doc tests..." + cargo test --doc --workspace - trap cleanup EXIT + echo "Building docs..." + cargo doc --no-deps --workspace - echo "Starting PostgreSQL..." - ${podman-runner.podman-compose-runner}/bin/podman-compose-runner up -d + echo "Tests completed successfully!" + ''; - echo "Waiting for PostgreSQL to be ready..." - ${pkgs.wait4x}/bin/wait4x postgresql "$DATABASE_URL" --timeout 120s + nextest-sqlite-runner = pkgs.writeShellScriptBin "nextest-sqlite-runner" '' + set -e - echo "Running database migrations..." - ${pkgs.sqlx-cli}/bin/sqlx migrate run + export PATH="${pkgs.lib.makeBinPath [ + pkgs.cargo-nextest + pkgs.coreutils + rustToolchain + pkgs.stdenv.cc + ]}:$PATH" - echo "Running nextest..." + echo "Running SQLite tests..." cargo nextest run --workspace --verbose - echo "Running doc tests..." + echo "Running SQLite doc tests..." cargo test --doc --workspace echo "Building docs..." cargo doc --no-deps --workspace - echo "Tests completed successfully!" + echo "SQLite tests completed successfully!" ''; in with pkgs; { packages = { nextest = nextest-runner; + nextest-sqlite = nextest-sqlite-runner; }; checks = { diff --git a/migrations/20250904065521_job_setup.sql b/migrations/20250904065521_job_setup.sql index fd5bd72..fe4914b 100644 --- a/migrations/20250904065521_job_setup.sql +++ b/migrations/20250904065521_job_setup.sql @@ -1,34 +1,32 @@ CREATE TABLE jobs ( - id UUID PRIMARY KEY, - unique_per_type BOOLEAN NOT NULL, - job_type VARCHAR NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + id TEXT PRIMARY KEY NOT NULL, + unique_per_type INTEGER NOT NULL DEFAULT 0, + job_type TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')) ); -CREATE UNIQUE INDEX idx_unique_job_type ON jobs (job_type) WHERE unique_per_type = TRUE; +CREATE UNIQUE INDEX idx_unique_job_type ON jobs (job_type) WHERE unique_per_type = 1; CREATE TABLE job_events ( - id UUID NOT NULL REFERENCES jobs(id), - sequence INT NOT NULL, - event_type VARCHAR NOT NULL, - event JSONB NOT NULL, - context JSONB DEFAULT NULL, - recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + id TEXT NOT NULL REFERENCES jobs(id), + sequence INTEGER NOT NULL, + event_type TEXT NOT NULL, + event TEXT NOT NULL, + context TEXT DEFAULT NULL, + recorded_at TEXT NOT NULL, UNIQUE(id, sequence) ); -CREATE TYPE JobExecutionState AS ENUM ('pending', 'running'); - CREATE TABLE job_executions ( - id UUID REFERENCES jobs(id) NOT NULL UNIQUE, - job_type VARCHAR NOT NULL, - queue_id VARCHAR, - poller_instance_id UUID, - attempt_index INT NOT NULL DEFAULT 1, - state JobExecutionState NOT NULL DEFAULT 'pending', - execution_state_json JSONB, - execute_at TIMESTAMPTZ, - alive_at TIMESTAMPTZ NOT NULL, - created_at TIMESTAMPTZ NOT NULL + id TEXT NOT NULL UNIQUE REFERENCES jobs(id), + job_type TEXT NOT NULL, + queue_id TEXT, + poller_instance_id TEXT, + attempt_index INTEGER NOT NULL DEFAULT 1, + state TEXT NOT NULL DEFAULT 'pending' CHECK(state IN ('pending', 'running')), + execution_state_json TEXT, + execute_at TEXT, + alive_at TEXT NOT NULL, + created_at TEXT NOT NULL ); CREATE INDEX idx_job_executions_poller_instance @@ -50,43 +48,3 @@ CREATE INDEX idx_job_executions_pending_job_type_execute_at CREATE INDEX idx_job_executions_running_queue_id ON job_executions(queue_id) WHERE state = 'running' AND queue_id IS NOT NULL; - -CREATE OR REPLACE FUNCTION notify_job_execution_insert() RETURNS TRIGGER AS $$ -BEGIN - PERFORM pg_notify('job_execution', NEW.job_type); - RETURN NULL; -END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION notify_job_execution_update() RETURNS TRIGGER AS $$ -BEGIN - IF NEW.execute_at IS DISTINCT FROM OLD.execute_at THEN - PERFORM pg_notify('job_execution', NEW.job_type); - END IF; - RETURN NULL; -END; -$$ LANGUAGE plpgsql; - -CREATE TRIGGER job_executions_notify_insert_trigger -AFTER INSERT ON job_executions -FOR EACH ROW -EXECUTE FUNCTION notify_job_execution_insert(); - -CREATE TRIGGER job_executions_notify_update_trigger -AFTER UPDATE ON job_executions -FOR EACH ROW -EXECUTE FUNCTION notify_job_execution_update(); - -CREATE OR REPLACE FUNCTION notify_job_execution_delete() RETURNS TRIGGER AS $$ -BEGIN - IF OLD.queue_id IS NOT NULL THEN - PERFORM pg_notify('job_execution', OLD.job_type); - END IF; - RETURN NULL; -END; -$$ LANGUAGE plpgsql; - -CREATE TRIGGER job_executions_notify_delete_trigger -AFTER DELETE ON job_executions -FOR EACH ROW -EXECUTE FUNCTION notify_job_execution_delete(); diff --git a/src/config.rs b/src/config.rs index a20c0e5..00d18c7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,6 +2,7 @@ use derive_builder::Builder; use es_entity::clock::{Clock, ClockHandle}; +use es_entity::db; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -41,64 +42,21 @@ impl Default for JobPollerConfig { #[builder(build_fn(skip))] /// Configuration consumed by [`Jobs::init`](crate::Jobs::init). /// Build with [`JobSvcConfig::builder`](Self::builder). -/// -/// # Examples -/// -/// Build a configuration that manages its own Postgres pool from a connection string: -/// -/// ```no_run -/// use job::{Jobs, JobSvcConfig}; -/// use job::error::JobError; -/// -/// # async fn run() -> Result<(), JobError> { -/// let config = JobSvcConfig::builder() -/// .pg_con("postgres://postgres:password@localhost/postgres") -/// .build() -/// .unwrap(); -/// -/// let mut jobs = Jobs::init(config).await?; -/// jobs.start_poll().await?; -/// # Ok(()) -/// # } -/// # tokio::runtime::Runtime::new().unwrap().block_on(run()).unwrap(); -/// ``` -/// -/// Reuse an existing `sqlx::PgPool` instead: -/// -/// ```no_run -/// use job::{Jobs, JobSvcConfig}; -/// use job::error::JobError; -/// use sqlx::postgres::PgPoolOptions; -/// -/// # async fn run() -> Result<(), JobError> { -/// let pool = PgPoolOptions::new() -/// .connect_lazy("postgres://postgres:password@localhost/postgres")?; -/// -/// let config = JobSvcConfig::builder() -/// .pool(pool) -/// .exec_migrations(false) // migrations already handled elsewhere -/// .build() -/// .unwrap(); -/// -/// let mut jobs = Jobs::init(config).await?; -/// jobs.start_poll().await?; -/// # Ok(()) -/// # } -/// # tokio::runtime::Runtime::new().unwrap().block_on(run()).unwrap(); -/// ``` pub struct JobSvcConfig { #[builder(setter(into, strip_option), default)] - /// Provide a Postgres connection string used to build an internal pool. Mutually exclusive with `pool`. When set, `exec_migrations` defaults to `true` unless overridden. - pub(super) pg_con: Option, + /// Provide a SQLite connection string used to build an internal pool. Mutually exclusive with `pool`. + /// When set, `exec_migrations` defaults to `true` unless overridden. + pub(super) db_con: Option, #[builder(setter(into, strip_option), default)] /// Override the maximum number of connections the internally managed pool may open. Ignored when `pool` is supplied. pub(super) max_connections: Option, #[builder(default)] - /// Set to `true` to have `Jobs::init` run the embedded database migrations during startup. Defaults to `false`, unless `pg_con` is supplied without an explicit value. + /// Set to `true` to have `Jobs::init` run the embedded database migrations during startup. + /// Defaults to `false`, unless `db_con` is supplied without an explicit value. pub(super) exec_migrations: bool, #[builder(setter(into, strip_option), default)] - /// Inject an existing `sqlx::PgPool` instead of letting the job service build one. Mutually exclusive with `pg_con`. - pub(super) pool: Option, + /// Inject an existing pool instead of letting the job service build one. Mutually exclusive with `db_con`. + pub(super) pool: Option, #[builder(default)] /// Override the defaults that control how the background poller distributes work across processes. pub poller_config: JobPollerConfig, @@ -116,24 +74,24 @@ impl JobSvcConfig { } impl JobSvcConfigBuilder { - /// Validate and construct a [`JobSvcConfig`], ensuring either `pg_con` or `pool` is set. + /// Validate and construct a [`JobSvcConfig`], ensuring either `db_con` or `pool` is set. pub fn build(&mut self) -> Result { // Validate configuration - match (self.pg_con.as_ref(), self.pool.as_ref()) { + match (self.db_con.as_ref(), self.pool.as_ref()) { (None, None) | (Some(None), None) | (None, Some(None)) => { - return Err("One of pg_con or pool must be set".to_string()); + return Err("One of db_con or pool must be set".to_string()); } - (Some(_), Some(_)) => return Err("Only one of pg_con or pool must be set".to_string()), + (Some(_), Some(_)) => return Err("Only one of db_con or pool must be set".to_string()), _ => (), } - // If pg_con is provided and exec_migrations is not explicitly set, default to true - if matches!(self.pg_con.as_ref(), Some(Some(_))) && self.exec_migrations.is_none() { + // If db_con is provided and exec_migrations is not explicitly set, default to true + if matches!(self.db_con.as_ref(), Some(Some(_))) && self.exec_migrations.is_none() { self.exec_migrations = Some(true); } Ok(JobSvcConfig { - pg_con: self.pg_con.clone().flatten(), + db_con: self.db_con.clone().flatten(), max_connections: self.max_connections.flatten(), exec_migrations: self.exec_migrations.unwrap_or(false), pool: self.pool.clone().flatten(), diff --git a/src/current.rs b/src/current.rs index 422c32b..88d0159 100644 --- a/src/current.rs +++ b/src/current.rs @@ -1,8 +1,8 @@ //! Execution-time helpers available to running jobs. use es_entity::clock::ClockHandle; +use es_entity::db; use serde::{Serialize, de::DeserializeOwned}; -use sqlx::PgPool; use super::{JobId, error::JobError}; @@ -10,7 +10,7 @@ use super::{JobId, error::JobError}; pub struct CurrentJob { id: JobId, attempt: u32, - pool: PgPool, + pool: db::Pool, execution_state_json: Option, shutdown_rx: tokio::sync::broadcast::Receiver< tokio::sync::mpsc::Sender>, @@ -22,7 +22,7 @@ impl CurrentJob { pub(super) fn new( id: JobId, attempt: u32, - pool: PgPool, + pool: db::Pool, execution_state: Option, shutdown_rx: tokio::sync::broadcast::Receiver< tokio::sync::mpsc::Sender>, @@ -60,15 +60,17 @@ impl CurrentJob { ) -> Result<(), JobError> { let execution_state_json = serde_json::to_value(execution_state) .map_err(JobError::CouldNotSerializeExecutionState)?; - sqlx::query!( + let json_str = serde_json::to_string(&execution_state_json) + .map_err(JobError::CouldNotSerializeExecutionState)?; + sqlx::query( r#" UPDATE job_executions - SET execution_state_json = $1 - WHERE id = $2 + SET execution_state_json = ?1 + WHERE id = ?2 "#, - execution_state_json, - &self.id as &JobId ) + .bind(&json_str) + .bind(self.id) .execute(op.as_executor()) .await?; self.execution_state_json = Some(execution_state_json); @@ -81,15 +83,17 @@ impl CurrentJob { ) -> Result<(), JobError> { let execution_state_json = serde_json::to_value(execution_state) .map_err(JobError::CouldNotSerializeExecutionState)?; - sqlx::query!( + let json_str = serde_json::to_string(&execution_state_json) + .map_err(JobError::CouldNotSerializeExecutionState)?; + sqlx::query( r#" UPDATE job_executions - SET execution_state_json = $1 - WHERE id = $2 + SET execution_state_json = ?1 + WHERE id = ?2 "#, - execution_state_json, - &self.id as &JobId ) + .bind(&json_str) + .bind(self.id) .execute(&self.pool) .await?; self.execution_state_json = Some(execution_state_json); @@ -100,7 +104,7 @@ impl CurrentJob { &self.id } - pub fn pool(&self) -> &PgPool { + pub fn pool(&self) -> &db::Pool { &self.pool } @@ -126,24 +130,6 @@ impl CurrentJob { /// /// Job runners can use this to detect when the application is shutting down /// and perform cleanup or finish their current work gracefully. - /// - /// # Example - /// - /// ```no_run - /// # use job::CurrentJob; - /// # async fn example(mut current_job: CurrentJob) { - /// tokio::select! { - /// _ = current_job.shutdown_requested() => { - /// // Shutdown requested, exit gracefully - /// return; - /// } - /// result = do_work() => { - /// // Normal work completion - /// } - /// } - /// # } - /// # async fn do_work() {} - /// ``` pub async fn shutdown_requested(&mut self) -> bool { self.shutdown_rx.recv().await.is_ok() } diff --git a/src/dispatcher.rs b/src/dispatcher.rs index ced1fcf..82a654f 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -272,17 +272,17 @@ impl JobDispatcher { self.rescheduled = true; span.record("will_retry", true); - sqlx::query!( + sqlx::query( r#" UPDATE job_executions - SET state = 'pending', execute_at = $2, attempt_index = $3, poller_instance_id = NULL - WHERE id = $1 AND poller_instance_id = $4 + SET state = 'pending', execute_at = ?1, attempt_index = ?2, poller_instance_id = NULL + WHERE id = ?3 AND poller_instance_id = ?4 "#, - id as JobId, - reschedule_at, - next_attempt as i32, - self.instance_id ) + .bind(reschedule_at) + .bind(next_attempt as i32) + .bind(id) + .bind(self.instance_id.to_string()) .execute(op.as_executor()) .await?; } else { @@ -292,14 +292,14 @@ impl JobDispatcher { ); span.record("will_retry", false); - sqlx::query!( + sqlx::query( r#" DELETE FROM job_executions - WHERE id = $1 AND poller_instance_id = $2 + WHERE id = ?1 AND poller_instance_id = ?2 "#, - id as JobId, - self.instance_id ) + .bind(id) + .bind(self.instance_id.to_string()) .execute(op.as_executor()) .await?; } @@ -316,14 +316,14 @@ impl JobDispatcher { id: JobId, ) -> Result<(), JobError> { let mut job = self.repo.find_by_id(&id).await?; - sqlx::query!( + sqlx::query( r#" DELETE FROM job_executions - WHERE id = $1 AND poller_instance_id = $2 + WHERE id = ?1 AND poller_instance_id = ?2 "#, - id as JobId, - self.instance_id ) + .bind(id) + .bind(self.instance_id.to_string()) .execute(op.as_executor()) .await?; job.complete_job(); @@ -340,16 +340,16 @@ impl JobDispatcher { ) -> Result<(), JobError> { self.rescheduled = true; let mut job = self.repo.find_by_id(&id).await?; - sqlx::query!( + sqlx::query( r#" UPDATE job_executions - SET state = 'pending', execute_at = $2, attempt_index = 1, poller_instance_id = NULL - WHERE id = $1 AND poller_instance_id = $3 + SET state = 'pending', execute_at = ?1, attempt_index = 1, poller_instance_id = NULL + WHERE id = ?2 AND poller_instance_id = ?3 "#, - id as JobId, - reschedule_at, - self.instance_id ) + .bind(reschedule_at) + .bind(id) + .bind(self.instance_id.to_string()) .execute(op.as_executor()) .await?; job.reschedule_execution(reschedule_at); diff --git a/src/entity.rs b/src/entity.rs index aa9e43e..65fd1bc 100644 --- a/src/entity.rs +++ b/src/entity.rs @@ -5,7 +5,7 @@ use derive_builder::Builder; use rand::{RngExt, rng}; use serde::{Deserialize, Serialize}; -use std::{borrow::Cow, time::Duration}; +use std::time::Duration; use es_entity::{context::TracingContext, *}; @@ -16,19 +16,19 @@ use crate::{JobId, error::JobError}; #[serde(transparent)] /// Identifier describing a job type or class of work. /// -/// Use `JobType::new` for static name registration. +/// Use `JobType::new` for name registration. /// /// # Examples /// /// ```rust /// use job::JobType; /// -/// const CLEANUP_JOB: JobType = JobType::new("cleanup-job"); +/// let cleanup_job = JobType::new("cleanup-job"); /// ``` -pub struct JobType(Cow<'static, str>); +pub struct JobType(String); impl JobType { - pub const fn new(job_type: &'static str) -> Self { - JobType(Cow::Borrowed(job_type)) + pub fn new(job_type: &str) -> Self { + JobType(job_type.to_string()) } pub fn as_str(&self) -> &str { @@ -37,7 +37,7 @@ impl JobType { #[cfg(test)] pub(crate) fn from_owned(job_type: String) -> Self { - JobType(Cow::Owned(job_type)) + JobType(job_type) } } diff --git a/src/error.rs b/src/error.rs index ae50ecd..cc5c9f3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -54,18 +54,40 @@ impl From> for JobError { impl From for JobError { fn from(error: JobCreateError) -> Self { - match error { + match &error { JobCreateError::ConstraintViolation { column: Some(super::repo::JobColumn::Id), value, .. - } => Self::DuplicateId(value), + } => Self::DuplicateId(value.clone()), JobCreateError::ConstraintViolation { column: Some(super::repo::JobColumn::JobType), value, .. - } => Self::DuplicateUniqueJobType(value), - other => Self::Create(other), + } => Self::DuplicateUniqueJobType(value.clone()), + // SQLite does not return structured constraint names, so we + // use the extended error code and error message to classify. + JobCreateError::ConstraintViolation { + column: None, + inner, + .. + } => { + // Extended error code 1555 = SQLITE_CONSTRAINT_PRIMARYKEY + let is_pk = matches!(inner, sqlx::Error::Database(db_err) if db_err.code().as_deref() == Some("1555")); + if is_pk { + Self::DuplicateId(None) + } else { + let msg = inner.to_string(); + if msg.contains("jobs.id") { + Self::DuplicateId(None) + } else if msg.contains("jobs.job_type") { + Self::DuplicateUniqueJobType(None) + } else { + Self::Create(error) + } + } + } + _ => Self::Create(error), } } } diff --git a/src/lib.rs b/src/lib.rs index 4086013..9fa73cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,205 +1,6 @@ -//! `job` is an async, Postgres-backed job scheduler and runner for Rust +//! `job` is an async, SQLite-backed job scheduler and runner for Rust //! applications. It coordinates distributed workers, tracks job history, and -//! handles retries with predictable backoff. Inspired by earlier systems like -//! [Sidekiq](https://sidekiq.org), it focuses on running your -//! application code asynchronously, outside of request/response paths while keeping business -//! logic in familiar Rust async functions. The crate uses [`sqlx`] for -//! database access and forbids `unsafe`. -//! -//! ## Documentation -//! - [Github repository](https://github.com/GaloyMoney/job) -//! - [Cargo package](https://crates.io/crates/job) -//! -//! ## Highlights -//! - Durable Postgres-backed storage so jobs survive restarts and crashes. -//! - Automatic exponential backoff with jitter, plus opt-in infinite retries. -//! - Concurrency controls that let many worker instances share the workload, -//! configurable through [`JobPollerConfig`]. -//! - Optional at-most-one-per-type queueing via [`JobSpawner::spawn_unique`]. -//! - Built-in migrations that you can run automatically or embed into your own -//! migration workflow. -//! -//! ## Core Concepts -//! - **Jobs service** – [`Jobs`] owns registration, polling, and shutdown. -//! - **Initializer** – [`JobInitializer`] registers a job type and builds a -//! [`JobRunner`] for each execution. Defines the associated `Config` type. -//! - **Spawner** – [`JobSpawner`] is returned from registration and provides -//! type-safe job creation methods. Parameterized by the config type. -//! - **Runner** – [`JobRunner`] performs the work using the provided -//! [`CurrentJob`] context. -//! - **Current job** – [`CurrentJob`] exposes attempt counts, execution state, -//! and access to the Postgres pool during a run. -//! - **Completion** – [`JobCompletion`] returns the outcome: finish, retry, or -//! reschedule at a later time. -//! -//! ## Lifecycle -//! -//! 1. Initialize the service with [`Jobs::init`] -//! 2. Register initializers with [`Jobs::add_initializer`] – returns a [`JobSpawner`] -//! 3. Start polling with [`Jobs::start_poll`] -//! 4. Use spawners to create jobs throughout your application -//! 5. Shut down gracefully with [`Jobs::shutdown`] -//! -//! ## Example -//! -//! ```ignore -//! use async_trait::async_trait; -//! use job::{ -//! CurrentJob, Job, JobCompletion, JobId, JobInitializer, JobRunner, -//! JobSpawner, JobSvcConfig, JobType, Jobs, -//! }; -//! use serde::{Deserialize, Serialize}; -//! -//! // 1. Define your config (serialized to the database) -//! #[derive(Debug, Serialize, Deserialize)] -//! struct MyConfig { -//! value: i32, -//! } -//! -//! // 2. Define your initializer -//! struct MyInitializer; -//! -//! impl JobInitializer for MyInitializer { -//! type Config = MyConfig; -//! -//! fn job_type(&self) -> JobType { -//! JobType::new("my-job") -//! } -//! -//! fn init(&self, job: &Job) -> Result, Box> { -//! let config: MyConfig = job.config()?; -//! Ok(Box::new(MyRunner { value: config.value })) -//! } -//! } -//! -//! // 3. Define your runner -//! struct MyRunner { -//! value: i32, -//! } -//! -//! #[async_trait] -//! impl JobRunner for MyRunner { -//! async fn run( -//! &self, -//! _current_job: CurrentJob, -//! ) -> Result> { -//! println!("Processing value: {}", self.value); -//! Ok(JobCompletion::Complete) -//! } -//! } -//! -//! // 4. Wire it up -//! #[tokio::main] -//! async fn main() -> Result<(), Box> { -//! let config = JobSvcConfig::builder() -//! .pg_con("postgres://user:pass@localhost/db") -//! .build()?; -//! -//! let mut jobs = Jobs::init(config).await?; -//! -//! // Registration returns a type-safe spawner -//! let spawner: JobSpawner = jobs.add_initializer(MyInitializer); -//! -//! jobs.start_poll().await?; -//! -//! // Use the spawner to create jobs -//! spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?; -//! -//! Ok(()) -//! } -//! ``` -//! -//! ## Scheduling -//! -//! Jobs run immediately once a poller claims them. If you need a future start -//! time, schedule it up front with [`JobSpawner::spawn_at_in_op`]. After a -//! run completes, return [`JobCompletion::Complete`] for one-off work or use the -//! `JobCompletion::Reschedule*` variants to book the next execution. -//! -//! ## Retries -//! -//! Retry behaviour comes from [`JobInitializer::retry_on_error_settings`]. Once -//! attempts are exhausted the job is marked as errored and removed from the -//! queue. -//! -//! ```ignore -//! impl JobInitializer for MyInitializer { -//! // ... -//! -//! fn retry_on_error_settings(&self) -> RetrySettings { -//! RetrySettings { -//! n_attempts: Some(5), -//! min_backoff: Duration::from_secs(10), -//! max_backoff: Duration::from_secs(300), -//! ..Default::default() -//! } -//! } -//! } -//! ``` -//! -//! ## Uniqueness -//! -//! For at-most-one semantics, use [`JobSpawner::spawn_unique`]. This method -//! consumes the spawner, enforcing at the type level that only one job of -//! this type can exist: -//! -//! ```ignore -//! let cleanup_spawner = jobs.add_initializer(CleanupInitializer); -//! -//! // Consumes spawner - can't accidentally spawn twice -//! cleanup_spawner.spawn_unique(JobId::new(), CleanupConfig::default()).await?; -//! ``` -//! -//! ## Parameterized Job Types -//! -//! For cases where the job type is configured at runtime (e.g., multi-tenant inboxes), -//! store the job type in your initializer and return it from the instance method: -//! -//! ```ignore -//! struct TenantJobInitializer { -//! job_type: JobType, -//! tenant_id: String, -//! } -//! -//! impl JobInitializer for TenantJobInitializer { -//! type Config = TenantJobConfig; -//! -//! fn job_type(&self) -> JobType { -//! self.job_type.clone() // From instance, not hardcoded -//! } -//! -//! // ... -//! } -//! ``` -//! -//! ## Database migrations -//! -//! See the [setup guide](https://github.com/GaloyMoney/job/blob/main/README.md#setup) -//! for migration options and examples. -//! -//! ## Feature flags -//! -//! - `es-entity` enables advanced integration with the [`es_entity`] crate, -//! allowing runners to finish with `DbOp` handles and enriching tracing/event -//! metadata. -//! -//! ## Testing with simulated time -//! -//! For deterministic testing of time-dependent behavior (e.g., backoff strategies), -//! inject an artificial clock via [`JobSvcConfig::clock`]: -//! -//! ```ignore -//! use job::{JobSvcConfig, ClockHandle, ArtificialClockConfig}; -//! -//! let (clock, controller) = ClockHandle::artificial(ArtificialClockConfig::manual()); -//! let config = JobSvcConfig::builder() -//! .pool(pool) -//! .clock(clock) -//! .build()?; -//! -//! // Advance time deterministically -//! controller.advance(Duration::from_secs(60)).await; -//! ``` +//! handles retries with predictable backoff. #![cfg_attr(feature = "fail-on-warnings", deny(warnings))] #![cfg_attr(feature = "fail-on-warnings", deny(clippy::all))] @@ -256,18 +57,18 @@ pub struct Jobs { impl Jobs { /// Initialize the service using a [`JobSvcConfig`] for connection and runtime settings. pub async fn init(config: JobSvcConfig) -> Result { - let pool = match (config.pool.clone(), config.pg_con.clone()) { + let pool = match (config.pool.clone(), config.db_con.clone()) { (Some(pool), None) => pool, - (None, Some(pg_con)) => { - let mut pool_opts = sqlx::postgres::PgPoolOptions::new(); + (None, Some(db_con)) => { + let mut pool_opts = sqlx::sqlite::SqlitePoolOptions::new(); if let Some(max_connections) = config.max_connections { pool_opts = pool_opts.max_connections(max_connections); } - pool_opts.connect(&pg_con).await? + pool_opts.connect(&db_con).await? } _ => { return Err(JobError::Config( - "One of pg_con or pool must be set".to_string(), + "One of db_con or pool must be set".to_string(), )); } }; @@ -288,150 +89,7 @@ impl Jobs { }) } - /// Start the background poller that fetches and dispatches jobs from Postgres. - /// - /// Call this only after registering every job initializer. The call consumes the internal - /// registry; attempting to register additional initializers or starting the poller again - /// afterwards will panic with `Registry has been consumed by executor`. - /// - /// # Errors - /// - /// Returns [`JobError::Sqlx`] if the poller cannot initialise its database listeners or - /// supporting tasks. - /// - /// # Panics - /// - /// Panics if invoked more than once, or if [`Jobs::add_initializer`] is called after the - /// poller has started. - /// - /// # Examples - /// - /// Register any initializers and then start the poller: - /// - /// ```no_run - /// use job::{ - /// Jobs, JobSvcConfig, Job, JobId, JobInitializer, JobRunner, JobType, JobCompletion, - /// CurrentJob, JobSpawner - /// }; - /// use job::error::JobError; - /// use async_trait::async_trait; - /// use serde::{Serialize, Deserialize}; - /// use sqlx::postgres::PgPoolOptions; - /// use std::error::Error; - /// - /// #[derive(Debug, Serialize, Deserialize)] - /// struct MyConfig { - /// value: i32, - /// } - /// - /// struct MyInitializer; - /// - /// impl JobInitializer for MyInitializer { - /// type Config = MyConfig; - /// - /// fn job_type(&self) -> JobType { - /// JobType::new("example") - /// } - /// - /// fn init(&self, _job: &Job, _: JobSpawner) -> Result, Box> { - /// Ok(Box::new(MyRunner)) - /// } - /// } - /// - /// struct MyRunner; - /// - /// #[async_trait] - /// impl JobRunner for MyRunner { - /// async fn run( - /// &self, - /// _current_job: CurrentJob, - /// ) -> Result> { - /// Ok(JobCompletion::Complete) - /// } - /// } - /// - /// # async fn example() -> Result<(), JobError> { - /// let pool = PgPoolOptions::new() - /// .connect_lazy("postgres://postgres:password@localhost/postgres")?; - /// let config = JobSvcConfig::builder().pool(pool).build().unwrap(); - /// let mut jobs = Jobs::init(config).await?; - /// - /// // Registration returns a type-safe spawner - /// let spawner: JobSpawner = jobs.add_initializer(MyInitializer); - /// - /// jobs.start_poll().await?; - /// - /// // Use the spawner to create jobs - /// spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?; - /// # Ok(()) - /// # } - /// # tokio::runtime::Runtime::new().unwrap().block_on(example()).unwrap(); - /// ``` - /// - /// Calling `start_poll` again, or attempting to register a new initializer afterwards, - /// results in a panic: - /// - /// ```no_run - /// use job::{ - /// Jobs, JobSvcConfig, Job, JobInitializer, JobRunner, JobType, JobCompletion, CurrentJob, - /// JobSpawner, - /// }; - /// use job::error::JobError; - /// use async_trait::async_trait; - /// use serde::{Serialize, Deserialize}; - /// use sqlx::postgres::PgPoolOptions; - /// use std::error::Error; - /// - /// #[derive(Debug, Serialize, Deserialize)] - /// struct MyConfig { - /// value: i32, - /// } - /// - /// struct MyInitializer; - /// - /// impl JobInitializer for MyInitializer { - /// type Config = MyConfig; - /// - /// fn job_type(&self) -> JobType { - /// JobType::new("example") - /// } - /// - /// fn init(&self, _job: &Job, _spawner: JobSpawner) -> Result, Box> { - /// Ok(Box::new(MyRunner)) - /// } - /// } - /// - /// struct MyRunner; - /// - /// #[async_trait] - /// impl JobRunner for MyRunner { - /// async fn run( - /// &self, - /// _current_job: CurrentJob, - /// ) -> Result> { - /// Ok(JobCompletion::Complete) - /// } - /// } - /// - /// # async fn double_start() -> Result<(), JobError> { - /// let pool = PgPoolOptions::new() - /// .connect_lazy("postgres://postgres:password@localhost/postgres")?; - /// let config = JobSvcConfig::builder().pool(pool).build().unwrap(); - /// let mut jobs = Jobs::init(config).await?; - /// - /// let _spawner = jobs.add_initializer(MyInitializer); - /// - /// jobs.start_poll().await?; - /// - /// // Panics with "Registry has been consumed by executor". - /// // jobs.start_poll().await.unwrap(); - /// - /// // Also panics because the registry moved into the poller. - /// // jobs.add_initializer(MyInitializer); - /// # Ok(()) - /// # } - /// # tokio::runtime::Runtime::new().unwrap().block_on(double_start()).unwrap(); - /// ``` + /// Start the background poller that fetches and dispatches jobs. pub async fn start_poll(&mut self) -> Result<(), JobError> { let registry = self .registry @@ -454,13 +112,6 @@ impl Jobs { /// Register a [`JobInitializer`] and return a [`JobSpawner`] for creating jobs. /// - /// # Examples - /// - /// ```ignore - /// let spawner = jobs.add_initializer(MyInitializer); - /// spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?; - /// ``` - /// /// # Panics /// /// Panics if called after [`start_poll`](Self::start_poll). @@ -492,12 +143,11 @@ impl Jobs { let mut job = self.repo.find_by_id(id).await?; if job.cancel().did_execute() { - let result = sqlx::query!( - r#"DELETE FROM job_executions WHERE id = $1 AND state = 'pending'"#, - id as JobId, - ) - .execute(op.as_executor()) - .await?; + let result: sqlx::sqlite::SqliteQueryResult = + sqlx::query("DELETE FROM job_executions WHERE id = ? AND state = 'pending'") + .bind(id.to_string()) + .execute(op.as_executor()) + .await?; if result.rows_affected() == 0 { return Err(JobError::CannotCancelJob); @@ -516,13 +166,6 @@ impl Jobs { } /// Gracefully shut down the job poller. - /// - /// This method is idempotent and can be called multiple times safely. - /// It will send a shutdown signal to all running jobs, wait briefly for them - /// to complete, and reschedule any jobs still running. - /// - /// If not called manually, shutdown will be automatically triggered when the - /// Jobs instance is dropped. #[instrument(name = "job.shutdown", skip(self), err)] pub async fn shutdown(&self) -> Result<(), JobError> { if let Some(handle) = &self.poller_handle { diff --git a/src/poller.rs b/src/poller.rs index f735d25..6cc73e9 100644 --- a/src/poller.rs +++ b/src/poller.rs @@ -1,7 +1,8 @@ use es_entity::AtomicOperation; use es_entity::clock::ClockHandle; +use es_entity::db; use serde_json::Value as JsonValue; -use sqlx::postgres::{PgListener, PgPool, types::PgInterval}; +use sqlx::Row; use tracing::{Span, instrument}; use std::{ @@ -64,7 +65,9 @@ pub(crate) struct JobPollerHandle { clock: ClockHandle, } -const MAX_WAIT: Duration = Duration::from_secs(60); +// Without PgListener (LISTEN/NOTIFY), the poller relies on periodic +// polling. Keep this short so newly inserted jobs are picked up promptly. +const MAX_WAIT: Duration = Duration::from_secs(1); impl JobPoller { pub fn new( @@ -91,7 +94,6 @@ impl JobPoller { } pub async fn start(self) -> Result { - let listener_handle = self.start_listener().await?; let lost_handle = self.start_lost_handler(); let keep_alive_handle = self.start_keep_alive_handler(); let shutdown_tx = self.shutdown_tx.clone(); @@ -103,12 +105,7 @@ impl JobPoller { let executor = Arc::new(self); let handle = OwnedTaskHandle::new(spawn_named_task!( "job-poller-main-loop", - Self::main_loop( - Arc::clone(&executor), - listener_handle, - lost_handle, - keep_alive_handle, - ) + Self::main_loop(Arc::clone(&executor), lost_handle, keep_alive_handle,) )); Ok(JobPollerHandle { poller: executor, @@ -125,7 +122,6 @@ impl JobPoller { async fn main_loop( self: Arc, - _listener_task: OwnedTaskHandle, _lost_task: OwnedTaskHandle, _keep_alive_task: OwnedTaskHandle, ) { @@ -155,6 +151,12 @@ impl JobPoller { result = self.clock.timeout(timeout, self.tracker.notified()) => { woken_up = result.is_ok(); } + // Real-time fallback: ensures the poller wakes even when an + // artificial (manual) clock has already been advanced past + // the timeout target before it was registered. + _ = tokio::time::sleep(MAX_WAIT) => { + woken_up = false; + } } } } @@ -202,32 +204,6 @@ impl JobPoller { Ok(Duration::ZERO) } - async fn start_listener(&self) -> Result { - let mut listener = PgListener::connect_with(self.repo.pool()).await?; - listener.listen("job_execution").await?; - let tracker = self.tracker.clone(); - let supported_job_types = self.registry.registered_job_types(); - Ok(OwnedTaskHandle::new(spawn_named_task!( - "job-poller-listener", - async move { - loop { - match listener.recv().await { - Ok(notification) => { - let job_type = notification.payload(); - // Only wake the tracker if this is a job type we support - if supported_job_types.iter().any(|jt| jt.as_str() == job_type) { - tracker.job_execution_inserted(); - } - } - Err(_) => { - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - } - ))) - } - fn start_lost_handler(&self) -> OwnedTaskHandle { let job_lost_interval = self.config.job_lost_interval; let pool = self.repo.pool().clone(); @@ -246,26 +222,30 @@ impl JobPoller { ); let _guard = span.enter(); - if let Ok(rows) = sqlx::query!( - r#" - UPDATE job_executions - SET state = 'pending', execute_at = $1, attempt_index = attempt_index + 1, poller_instance_id = NULL - WHERE state = 'running' AND alive_at < $1::timestamptz - RETURNING id as id - "#, - check_time, - ) - .fetch_all(&pool) - .await - && !rows.is_empty() - { + if let Ok(rows) = sqlx::query( + r#" + UPDATE job_executions + SET state = 'pending', execute_at = ?1, attempt_index = attempt_index + 1, poller_instance_id = NULL + WHERE state = 'running' AND alive_at < ?1 + RETURNING id + "#, + ) + .bind(check_time) + .fetch_all(&pool) + .await + { + if !rows.is_empty() { Span::current().record("n_lost_jobs", rows.len()); - for row in rows { - tracing::error!(job_id = %row.id, "lost job"); + for row in &rows { + let id: String = row.get("id"); + tracing::error!(job_id = %id, "lost job"); } } else { Span::current().record("n_lost_jobs", 0); } + } else { + Span::current().record("n_lost_jobs", 0); + } } })) } @@ -290,15 +270,15 @@ impl JobPoller { ); let _guard = span.enter(); - let timeout = match sqlx::query!( + let timeout = match sqlx::query( r#" UPDATE job_executions - SET alive_at = $1 - WHERE poller_instance_id = $2 AND state = 'running' + SET alive_at = ?1 + WHERE poller_instance_id = ?2 AND state = 'running' "#, - now, - instance_id, ) + .bind(now) + .bind(instance_id.to_string()) .execute(&pool) .await { @@ -434,7 +414,7 @@ impl JobPoller { #[instrument(name = "job.poll_jobs", level = "debug", skip(pool, supported_job_types, clock), fields(n_jobs_to_poll, instance_id = %instance_id, n_jobs_found = tracing::field::Empty), err)] async fn poll_jobs( - pool: &PgPool, + pool: &db::Pool, n_jobs_to_poll: usize, instance_id: uuid::Uuid, supported_job_types: &[super::entity::JobType], @@ -443,78 +423,114 @@ async fn poll_jobs( let now = clock.now(); Span::current().record("now", tracing::field::display(now)); - let rows = sqlx::query_as!( - JobPollRow, + if supported_job_types.is_empty() { + return Ok(JobPollResult::WaitTillNextJob(MAX_WAIT)); + } + + // Build dynamic IN clause placeholders for supported job types + // Parameters: ?1 = now, ?2 = instance_id, ?3 = limit, ?4.. = job_types + let type_placeholders: Vec = (0..supported_job_types.len()) + .map(|i| format!("?{}", i + 4)) + .collect(); + let in_clause = type_placeholders.join(", "); + + // Step 1: Select and update eligible jobs atomically + let update_query = format!( r#" - WITH eligible AS ( - SELECT id, queue_id, execute_at, execution_state_json, attempt_index + UPDATE job_executions + SET state = 'running', alive_at = ?1, execute_at = NULL, poller_instance_id = ?2 + WHERE id IN ( + SELECT e.id FROM ( + SELECT id, execute_at, + ROW_NUMBER() OVER ( + PARTITION BY COALESCE(queue_id, CAST(id AS TEXT)) + ORDER BY execute_at + ) AS rn + FROM job_executions + WHERE state = 'pending' + AND execute_at <= ?1 + AND job_type IN ({in_clause}) + AND NOT EXISTS ( + SELECT 1 FROM job_executions AS running + WHERE running.state = 'running' + AND running.queue_id IS NOT NULL + AND running.queue_id = job_executions.queue_id + ) + ) e WHERE e.rn = 1 + ORDER BY e.execute_at ASC + LIMIT ?3 + ) + RETURNING id, execution_state_json, attempt_index + "# + ); + + let mut q = sqlx::query(&update_query) + .bind(now) + .bind(instance_id.to_string()) + .bind(n_jobs_to_poll as i32); + for jt in supported_job_types { + q = q.bind(jt.as_str()); + } + let rows = q.fetch_all(pool).await?; + + if rows.is_empty() { + // Step 2: No jobs ready now — find the wait time to the next eligible job + let wait_type_placeholders: Vec = (0..supported_job_types.len()) + .map(|i| format!("?{}", i + 2)) + .collect(); + let wait_in_clause = wait_type_placeholders.join(", "); + let wait_query = format!( + r#" + SELECT MIN(execute_at) as next_execute_at FROM job_executions WHERE state = 'pending' - AND job_type = ANY($4) - AND NOT EXISTS ( - SELECT 1 FROM job_executions AS running - WHERE running.state = 'running' - AND running.queue_id IS NOT NULL - AND running.queue_id = job_executions.queue_id - ) - ), - min_wait AS ( - SELECT MIN(execute_at) - $2::timestamptz AS wait_time - FROM eligible - WHERE execute_at > $2::timestamptz - ), - candidates AS ( - SELECT id, execution_state_json AS data_json, attempt_index, - ROW_NUMBER() OVER ( - PARTITION BY COALESCE(queue_id, id::text) - ORDER BY execute_at - ) AS rn - FROM eligible - WHERE execute_at <= $2::timestamptz - ), - selected_jobs AS ( - SELECT je.id, c.data_json, c.attempt_index - FROM candidates c - JOIN job_executions je ON je.id = c.id - WHERE c.rn = 1 - ORDER BY je.execute_at ASC - LIMIT $1 - FOR UPDATE OF je - ), - updated AS ( - UPDATE job_executions AS je - SET state = 'running', alive_at = $2, execute_at = NULL, poller_instance_id = $3 - FROM selected_jobs - WHERE je.id = selected_jobs.id - RETURNING je.id, selected_jobs.data_json, je.attempt_index - ) - SELECT * FROM ( - SELECT - u.id AS "id?: JobId", - u.data_json AS "data_json?: JsonValue", - u.attempt_index AS "attempt_index?", - NULL::INTERVAL AS "max_wait?: PgInterval" - FROM updated u - UNION ALL - SELECT - NULL::UUID AS "id?: JobId", - NULL::JSONB AS "data_json?: JsonValue", - NULL::INT AS "attempt_index?", - mw.wait_time AS "max_wait?: PgInterval" - FROM min_wait mw - WHERE NOT EXISTS (SELECT 1 FROM updated) - ) AS result - "#, - n_jobs_to_poll as i32, - now, - instance_id, - supported_job_types as _, - ) - .fetch_all(pool) - .await?; + AND job_type IN ({wait_in_clause}) + AND execute_at > ?1 + AND NOT EXISTS ( + SELECT 1 FROM job_executions AS running + WHERE running.state = 'running' + AND running.queue_id IS NOT NULL + AND running.queue_id = job_executions.queue_id + ) + "# + ); + let mut wq = sqlx::query(&wait_query).bind(now); + for jt in supported_job_types { + wq = wq.bind(jt.as_str()); + } + let wait_row = wq.fetch_optional(pool).await?; + + let wait = wait_row + .and_then(|row| { + let next: Option> = row.get("next_execute_at"); + next.and_then(|t| (t - now).to_std().ok()) + }) + .unwrap_or(MAX_WAIT); + + Span::current().record("n_jobs_found", 0); + return Ok(JobPollResult::WaitTillNextJob(wait)); + } Span::current().record("n_jobs_found", rows.len()); - Ok(JobPollResult::from_rows(rows)) + + let jobs = rows + .iter() + .map(|row| { + let id: JobId = row.get("id"); + let data_json_str: Option = row.get("execution_state_json"); + let data_json = data_json_str + .as_deref() + .and_then(|s| serde_json::from_str::(s).ok()); + let attempt_index: i32 = row.get("attempt_index"); + PolledJob { + id, + data_json, + attempt: attempt_index as u32, + } + }) + .collect(); + + Ok(JobPollResult::Jobs(jobs)) } #[derive(Debug)] @@ -523,66 +539,8 @@ enum JobPollResult { WaitTillNextJob(Duration), } -#[derive(Debug)] -struct JobPollRow { - id: Option, - data_json: Option, - attempt_index: Option, - max_wait: Option, -} - -impl JobPollResult { - /// Convert raw query rows into a JobPollResult - pub fn from_rows(rows: Vec) -> Self { - if rows.is_empty() { - JobPollResult::WaitTillNextJob(MAX_WAIT) - } else if rows.len() == 1 && rows[0].id.is_none() { - if let Some(interval) = &rows[0].max_wait { - JobPollResult::WaitTillNextJob(pg_interval_to_duration(interval)) - } else { - JobPollResult::WaitTillNextJob(MAX_WAIT) - } - } else { - let jobs = rows - .into_iter() - .filter_map(|row| { - if let (Some(id), Some(attempt_index)) = (row.id, row.attempt_index) { - Some(PolledJob { - id, - data_json: row.data_json, - attempt: attempt_index as u32, - }) - } else { - None - } - }) - .collect(); - JobPollResult::Jobs(jobs) - } - } -} - -fn pg_interval_to_duration(interval: &PgInterval) -> Duration { - const SECONDS_PER_DAY: u64 = 24 * 60 * 60; - if interval.microseconds < 0 || interval.days < 0 || interval.months < 0 { - Duration::default() - } else { - let days = (interval.days as u64) + (interval.months as u64) * 30; - Duration::from_micros(interval.microseconds as u64) - + Duration::from_secs(days * SECONDS_PER_DAY) - } -} - impl JobPollerHandle { /// Gracefully shut down the job poller. - /// - /// This method is idempotent and can be called multiple times safely. - /// It will: - /// 1. Send shutdown signal to all running job tasks - /// 2. Wait briefly for tasks to complete naturally - /// 3. Reschedule any jobs still running for this instance - /// - /// If not called manually, it will be called automatically when the handle is dropped. pub async fn shutdown(&self) -> Result<(), JobError> { perform_shutdown( self.shutdown_tx.clone(), @@ -700,7 +658,6 @@ async fn perform_shutdown( } } else { // No active subscribers - wait for the shutdown timeout anyway - // to give jobs a chance to complete gracefully tracing::warn!("No active shutdown subscribers, waiting for shutdown timeout"); tokio::time::sleep(shutdown_timeout).await; } @@ -716,18 +673,18 @@ async fn kill_remaining_jobs( ) -> Result<(), JobError> { let mut op = repo.begin_op_with_clock(&clock).await?; let now = clock.now(); - let rows = sqlx::query!( + let rows = sqlx::query( r#" UPDATE job_executions SET state = 'pending', - execute_at = $1, + execute_at = ?1, poller_instance_id = NULL - WHERE poller_instance_id = $2 AND state = 'running' - RETURNING id as "id!: JobId", attempt_index + WHERE poller_instance_id = ?2 AND state = 'running' + RETURNING id, attempt_index "#, - now, - instance_id ) + .bind(now) + .bind(instance_id.to_string()) .fetch_all(op.as_executor()) .await?; @@ -739,8 +696,12 @@ async fn kill_remaining_jobs( } let attempt_map: std::collections::HashMap = rows - .into_iter() - .map(|r| (r.id, r.attempt_index as u32)) + .iter() + .map(|r| { + let id: JobId = r.get("id"); + let attempt_index: i32 = r.get("attempt_index"); + (id, attempt_index as u32) + }) .collect(); let ids: Vec = attempt_map.keys().copied().collect(); diff --git a/src/repo.rs b/src/repo.rs index 5cd84b0..cfd1d0e 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -1,4 +1,4 @@ -use sqlx::PgPool; +use es_entity::db; use es_entity::*; @@ -19,11 +19,11 @@ use crate::JobId; persist_event_context = false )] pub struct JobRepo { - pool: PgPool, + pool: db::Pool, } impl JobRepo { - pub(super) fn new(pool: &PgPool) -> Self { + pub(super) fn new(pool: &db::Pool) -> Self { Self { pool: pool.clone() } } } @@ -33,9 +33,11 @@ mod tests { use super::*; use crate::error::JobError; - pub async fn init_pool() -> anyhow::Result { - let pg_con = std::env::var("PG_CON").unwrap(); - let pool = sqlx::PgPool::connect(&pg_con).await?; + pub async fn init_pool() -> anyhow::Result { + let db_name = uuid::Uuid::now_v7(); + let url = format!("sqlite:file:{db_name}?mode=memory&cache=shared"); + let pool = db::Pool::connect(&url).await?; + sqlx::migrate!().run(&pool).await?; Ok(pool) } @@ -73,7 +75,8 @@ mod tests { .into(); assert!(matches!(err, JobError::DuplicateUniqueJobType(_))); - // Same type same id + // Same type same id — SQLite may report either the PK or unique + // index violation first when both are violated simultaneously. let new_job = NewJob::builder() .id(a_id) .unique_per_type(true) @@ -87,7 +90,13 @@ mod tests { .err() .expect("expected error") .into(); - assert!(matches!(err, JobError::DuplicateId(_))); + assert!( + matches!( + err, + JobError::DuplicateId(_) | JobError::DuplicateUniqueJobType(_) + ), + "Expected DuplicateId or DuplicateUniqueJobType, got: {err:?}" + ); let new_job = NewJob::builder() .id(JobId::new()) diff --git a/src/runner.rs b/src/runner.rs index 4938579..5be8100 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -16,20 +16,6 @@ pub trait JobInitializer: Send + Sync + 'static { type Config: Serialize + DeserializeOwned + Send + Sync; /// Returns the job type identifier. - /// - /// For simple cases, return a constant: - /// ```ignore - /// fn job_type(&self) -> JobType { - /// JobType::new("my-job") - /// } - /// ``` - /// - /// For configured/parameterized initializers, return from instance: - /// ```ignore - /// fn job_type(&self) -> JobType { - /// self.job_type.clone() - /// } - /// ``` fn job_type(&self) -> JobType; /// Retry settings to use when the runner returns an error. @@ -56,14 +42,14 @@ pub enum JobCompletion { /// Job finished and returns an `EsEntity` operation that the job service will commit. CompleteWithOp(es_entity::DbOp<'static>), /// Job finished and returns a transaction that the job service will commit. - CompleteWithTx(sqlx::Transaction<'static, sqlx::Postgres>), + CompleteWithTx(sqlx::Transaction<'static, es_entity::db::Db>), /// Schedule a new run immediately. RescheduleNow, #[cfg(feature = "es-entity")] /// Schedule a new run immediately and return an `EsEntity` operation that the job service will commit. RescheduleNowWithOp(es_entity::DbOp<'static>), /// Schedule a new run immediately and return a transaction that the job service will commit. - RescheduleNowWithTx(sqlx::Transaction<'static, sqlx::Postgres>), + RescheduleNowWithTx(sqlx::Transaction<'static, es_entity::db::Db>), /// Schedule the next run after a delay. RescheduleIn(std::time::Duration), #[cfg(feature = "es-entity")] @@ -71,7 +57,7 @@ pub enum JobCompletion { RescheduleInWithOp(es_entity::DbOp<'static>, std::time::Duration), /// Schedule the next run after a delay and return a transaction that the job service will commit. RescheduleInWithTx( - sqlx::Transaction<'static, sqlx::Postgres>, + sqlx::Transaction<'static, es_entity::db::Db>, std::time::Duration, ), /// Schedule the next run at an exact timestamp. @@ -80,7 +66,7 @@ pub enum JobCompletion { /// Schedule the next run at an exact timestamp and return an `EsEntity` operation that the job service will commit. RescheduleAtWithOp(es_entity::DbOp<'static>, DateTime), /// Schedule the next run at an exact timestamp and return a transaction that the job service will commit. - RescheduleAtWithTx(sqlx::Transaction<'static, sqlx::Postgres>, DateTime), + RescheduleAtWithTx(sqlx::Transaction<'static, es_entity::db::Db>, DateTime), } #[async_trait] @@ -95,8 +81,6 @@ pub trait JobRunner: Send + Sync + 'static { #[derive(Debug, Clone)] /// Controls retry attempt limits, telemetry escalation thresholds, and exponential backoff behaviour. -/// Use [`RetrySettings::n_warn_attempts`] to decide how many failures remain `WARN` events before -/// escalation. Set it to `None` to keep every retry at `WARN`. pub struct RetrySettings { /// Maximum number of consecutive attempts before the job is failed for good. `None` retries /// indefinitely. @@ -105,17 +89,13 @@ pub struct RetrySettings { /// promotes subsequent failures to `ERROR`. `None` disables escalation and keeps every retry /// at `WARN`. pub n_warn_attempts: Option, - /// Smallest backoff duration when rescheduling failures. Acts as the base for exponential - /// backoff growth. + /// Smallest backoff duration when rescheduling failures. pub min_backoff: std::time::Duration, - /// Maximum backoff duration. Once the exponentially increasing delay reaches this value it will - /// stop growing. + /// Maximum backoff duration. pub max_backoff: std::time::Duration, - /// Percentage (0-100) jitter applied to the computed backoff window to avoid thundering herds. + /// Percentage (0-100) jitter applied to the computed backoff window. pub backoff_jitter_pct: u8, - /// Multiplier applied to the previous backoff window. Once the elapsed time since the last - /// scheduled run exceeds `previous_backoff * attempt_reset_after_backoff_multiples`, the job is - /// treated as healthy again and the attempt counter resets to `1`. + /// Multiplier applied to the previous backoff window for attempt reset detection. pub attempt_reset_after_backoff_multiples: u32, } diff --git a/src/spawner.rs b/src/spawner.rs index 2d4031e..4304948 100644 --- a/src/spawner.rs +++ b/src/spawner.rs @@ -14,21 +14,6 @@ use super::{ }; /// Describes a job to be created as part of a bulk [`JobSpawner::spawn_all`] call. -/// -/// Use [`JobSpec::new`] to create a spec with just an id and config, then -/// chain [`JobSpec::schedule_at`] or [`JobSpec::queue_id`] for optional overrides. -/// -/// # Examples -/// -/// ```ignore -/// let specs = vec![ -/// JobSpec::new(JobId::new(), MyConfig { value: 1 }), -/// JobSpec::new(JobId::new(), MyConfig { value: 2 }) -/// .schedule_at(future_time) -/// .queue_id("my-queue"), -/// ]; -/// spawner.spawn_all(specs).await?; -/// ``` pub struct JobSpec { pub id: JobId, pub config: Config, @@ -61,16 +46,6 @@ impl JobSpec { /// /// Returned by [`crate::Jobs::add_initializer`]. The spawner encapsulates the job type /// and provides type-safe job creation methods. -/// -/// # Examples -/// -/// ```ignore -/// // Registration returns a spawner -/// let spawner = jobs.add_initializer(MyInitializer); -/// -/// // Use the spawner to create jobs -/// spawner.spawn(JobId::new(), MyConfig { value: 42 }).await?; -/// ``` #[derive(Clone)] pub struct JobSpawner { repo: Arc, @@ -195,8 +170,6 @@ where } /// Create and spawn a job within a queue as part of an existing atomic operation. - /// - /// At most one job per `queue_id` will run globally at any time. #[instrument( name = "job_spawner.spawn_with_queue_id_in_op", skip(self, op, config, queue_id), @@ -216,8 +189,6 @@ where } /// Create and spawn a job for execution at a specific time within a queue. - /// - /// At most one job per `queue_id` will run globally at any time. #[instrument( name = "job_spawner.spawn_at_with_queue_id", skip(self, config, queue_id), @@ -241,8 +212,6 @@ where /// Create and spawn a job for execution at a specific time within a queue, /// as part of an existing atomic operation. - /// - /// At most one job per `queue_id` will run globally at any time. #[instrument( name = "job_spawner.spawn_at_with_queue_id_in_op", skip(self, op, config, queue_id), @@ -264,7 +233,6 @@ where /// Create and spawn multiple jobs in a single atomic operation. /// /// All jobs are created within a single transaction — either all succeed or all roll back. - /// Each [`JobSpec`] can independently specify `schedule_at` and `queue_id`. #[instrument( name = "job_spawner.spawn_all", skip(self, specs), @@ -279,9 +247,6 @@ where } /// Create and spawn multiple jobs as part of an existing atomic operation. - /// - /// Each [`JobSpec`] can independently specify `schedule_at` and `queue_id`. - /// Internally uses batch inserts for both the job entities and `job_executions` rows. #[instrument( name = "job_spawner.spawn_all_in_op", skip(self, op, specs), @@ -321,23 +286,24 @@ where let mut jobs = self.repo.create_all_in_op(op, new_jobs).await?; - let ids: Vec = jobs.iter().map(|j| j.id).collect(); - sqlx::query( - r#" - INSERT INTO job_executions (id, job_type, queue_id, execute_at, alive_at, created_at) - SELECT unnested.id, $2, unnested.queue_id, unnested.execute_at, - COALESCE($5, NOW()), COALESCE($5, NOW()) - FROM UNNEST($1::uuid[], $3::text[], $4::timestamptz[]) - AS unnested(id, queue_id, execute_at) - "#, - ) - .bind(&ids) - .bind(&self.job_type) - .bind(&queue_ids) - .bind(&schedule_times) - .bind(op.maybe_now()) - .execute(op.as_executor()) - .await?; + // Insert execution rows one at a time (SQLite doesn't support UNNEST) + let now_val = op.maybe_now(); + for (i, job) in jobs.iter().enumerate() { + let now_or_default = now_val.unwrap_or_else(chrono::Utc::now); + sqlx::query( + r#" + INSERT INTO job_executions (id, job_type, queue_id, execute_at, alive_at, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?5) + "#, + ) + .bind(job.id) + .bind(self.job_type.as_str()) + .bind(queue_ids[i].as_deref()) + .bind(schedule_times[i]) + .bind(now_or_default) + .execute(op.as_executor()) + .await?; + } for (job, schedule_at) in jobs.iter_mut().zip(&schedule_times) { job.schedule_execution(*schedule_at); @@ -418,17 +384,18 @@ where schedule_at: DateTime, queue_id: Option<&str>, ) -> Result<(), JobError> { - sqlx::query!( + let now_or_default = op.maybe_now().unwrap_or_else(chrono::Utc::now); + sqlx::query( r#" INSERT INTO job_executions (id, job_type, queue_id, execute_at, alive_at, created_at) - VALUES ($1, $2, $3, $4, COALESCE($5, NOW()), COALESCE($5, NOW())) + VALUES (?1, ?2, ?3, ?4, ?5, ?5) "#, - job.id as JobId, - &job.job_type as &JobType, - queue_id, - schedule_at, - op.maybe_now() ) + .bind(job.id) + .bind(job.job_type.as_str()) + .bind(queue_id) + .bind(schedule_at) + .bind(now_or_default) .execute(op.as_executor()) .await?; job.schedule_execution(schedule_at); diff --git a/src/tracker.rs b/src/tracker.rs index b141865..30d37d6 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -37,14 +37,10 @@ impl JobTracker { self.notify.notified() } - pub fn job_execution_inserted(&self) { - self.notify.notify_one() - } - - pub fn job_completed(&self, rescheduled: bool) { - let n_running_jobs = self.running_jobs.fetch_sub(1, Ordering::SeqCst); - if rescheduled || n_running_jobs == self.min_jobs { - self.notify.notify_one(); - } + pub fn job_completed(&self, _rescheduled: bool) { + self.running_jobs.fetch_sub(1, Ordering::SeqCst); + // Without PgListener (LISTEN/NOTIFY), always wake the poller so it + // can pick up queued jobs that may have been blocked by this one. + self.notify.notify_one(); } } diff --git a/tests/helpers.rs b/tests/helpers.rs index e0ecd2c..87fa4d2 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -1,6 +1,7 @@ -pub async fn init_pool() -> anyhow::Result { - let pg_host = std::env::var("PG_HOST").unwrap_or("localhost".to_string()); - let pg_con = format!("postgres://user:password@{pg_host}:5432/pg"); - let pool = sqlx::PgPool::connect(&pg_con).await?; +pub async fn init_pool() -> anyhow::Result { + let db_name = uuid::Uuid::now_v7(); + let url = format!("sqlite:file:{db_name}?mode=memory&cache=shared"); + let pool = es_entity::db::Pool::connect(&url).await?; + sqlx::migrate!().run(&pool).await?; Ok(pool) }