
Commit 7a8c5ee

Merge branch 'main' into ilongin/1519-flush-insert-buffer-based-on-time
2 parents: b30110e + 0007b0d

8 files changed (+244, -34 lines)


docs/commands/job/run.md

Lines changed: 3 additions & 1 deletion
@@ -14,7 +14,7 @@ usage: datachain job run [-h] [-v] [-q] [--team TEAM] [--env-file ENV_FILE]
                          [--req-file REQ_FILE] [--req REQ [REQ ...]]
                          [--priority PRIORITY]
                          [--start-time START_TIME] [--cron CRON]
-                         [--no-wait]
+                         [--no-wait] [--ignore-checkpoints]
                          file
 ```

@@ -43,6 +43,7 @@ This command runs a job in Studio using the specified query file. You can config
 * `--start-time START_TIME` - Time to schedule the task in YYYY-MM-DDTHH:mm format or natural language.
 * `--cron CRON` - Cron expression for the cron task.
 * `--no-wait` - Do not wait for the job to finish.
+* `--ignore-checkpoints` - Ignore existing checkpoints and run from scratch.
 * `-h`, `--help` - Show the help message and exit.
 * `-v`, `--verbose` - Be verbose.
 * `-q`, `--quiet` - Be quiet.

@@ -156,6 +157,7 @@ datachain job run query.py --no-wait
 
 ## Notes
 
+* **Checkpoints**: Running the same script multiple times via `datachain job run` automatically links jobs together, enabling checkpoint reuse. If a previous run of the same script (by absolute path) exists, DataChain will resume from where it left off.
 * Closing the logs command (e.g., with Ctrl+C) will only stop displaying the logs but will not cancel the job execution
 * To cancel a running job, use the `datachain job cancel` command
 * The job will continue running in Studio even after you stop viewing the logs

docs/guide/checkpoints.md

Lines changed: 13 additions & 2 deletions
@@ -20,13 +20,24 @@ This means that if your script creates multiple datasets and fails partway throu
 
 ### Studio Runs
 
-When running jobs on Studio, the checkpoint workflow is managed through the UI:
+#### Using `datachain job run` CLI
+
+When you run `datachain job run my_script.py`, DataChain automatically:
+
+1. **Links jobs** by finding previous runs of the same script (by absolute path) that were also executed in Studio
+2. **Passes checkpoint context** to Studio, enabling checkpoint reuse across runs
+
+This means running the same script multiple times via `datachain job run` will automatically benefit from checkpoints without any additional configuration.
+
+#### Using Studio UI
+
+When triggering jobs through the Studio interface:
 
 1. **Job execution** is triggered using the Run button in the Studio interface
 2. **Checkpoint control** is explicit - you choose between:
    - **Run from scratch**: Ignores any existing checkpoints and recreates all datasets
    - **Continue from last checkpoint**: Resumes from the last successful checkpoint, skipping already-completed stages
-3. **Parent-child job linking** is handled automatically by the system - no need for script path matching or job name conventions
+3. **Parent-child job linking** is handled automatically by the system
 4. **Checkpoint behavior** during execution is the same as local runs: datasets are saved at each `.save()` call and can be reused on retry
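The "links jobs" step in the new docs maps directly onto the `get_last_job_by_name(..., is_remote_execution=True)` lookup added below in `src/datachain/studio.py` and `metastore.py`. A minimal sketch of that idea with an in-memory stand-in (the `JobRecord` class and `find_rerun_parent` helper are illustrative, not DataChain APIs):

```python
import os
from dataclasses import dataclass


@dataclass
class JobRecord:  # hypothetical stand-in for datachain.job.Job
    id: str
    name: str  # absolute script path for CLI-submitted jobs
    is_remote_execution: bool


def find_rerun_parent(jobs: list[JobRecord], script: str) -> str | None:
    """Return the id of the newest remote job with the same absolute path."""
    path = os.path.abspath(script)
    # Jobs are assumed newest-last here; the real query orders by created_at DESC.
    for job in reversed(jobs):
        if job.name == path and job.is_remote_execution:
            return job.id
    return None


history = [
    JobRecord("j1", os.path.abspath("my_script.py"), is_remote_execution=True),
    JobRecord("j2", os.path.abspath("other.py"), is_remote_execution=True),
]
assert find_rerun_parent(history, "my_script.py") == "j1"
```

Filtering on `is_remote_execution` keeps local runs of the same script from being picked up as Studio parents.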

src/datachain/cli/parser/job.py

Lines changed: 5 additions & 0 deletions
@@ -122,6 +122,11 @@ def add_jobs_parser(subparsers, parent_parser) -> None:
         action="store_true",
         help="Do not wait for the job to finish",
     )
+    studio_run_parser.add_argument(
+        "--ignore-checkpoints",
+        action="store_true",
+        help="Ignore existing checkpoints and run from scratch",
+    )
 
     studio_ls_help = "List jobs in Studio"
     studio_ls_description = "List jobs in Studio."
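The new flag is plain argparse; a self-contained sketch of the same `store_true` pattern (the parser built here is illustrative, not DataChain's actual parser tree):

```python
import argparse

parser = argparse.ArgumentParser(prog="datachain job run")
parser.add_argument(
    "--ignore-checkpoints",
    action="store_true",
    help="Ignore existing checkpoints and run from scratch",
)

# argparse converts the dashed flag to an underscored attribute, which is
# how studio.py later reads it as args.ignore_checkpoints.
args = parser.parse_args(["--ignore-checkpoints"])
assert args.ignore_checkpoints is True
assert parser.parse_args([]).ignore_checkpoints is False
```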

src/datachain/data_storage/metastore.py

Lines changed: 33 additions & 18 deletions
@@ -474,6 +474,8 @@ def create_job(
         parent_job_id: str | None = None,
         rerun_from_job_id: str | None = None,
         run_group_id: str | None = None,
+        is_remote_execution: bool = False,
+        job_id: str | None = None,
     ) -> str:
         """
         Creates a new job.

@@ -511,7 +513,9 @@ def get_job_status(self, job_id: str) -> JobStatus | None:
         """Returns the status of the given job."""
 
     @abstractmethod
-    def get_last_job_by_name(self, name: str, conn=None) -> "Job | None":
+    def get_last_job_by_name(
+        self, name: str, is_remote_execution: bool = False, conn=None
+    ) -> "Job | None":
         """Returns the last job with the given name, ordered by created_at."""
 
     #

@@ -1877,6 +1881,7 @@ def _jobs_columns() -> "list[SchemaItem]":
         Column("parent_job_id", Text, nullable=True),
         Column("rerun_from_job_id", Text, nullable=True),
         Column("run_group_id", Text, nullable=True),
+        Column("is_remote_execution", Boolean, nullable=False, default=False),
         Index("idx_jobs_parent_job_id", "parent_job_id"),
         Index("idx_jobs_rerun_from_job_id", "rerun_from_job_id"),
         Index("idx_jobs_run_group_id", "run_group_id"),

@@ -1918,10 +1923,13 @@ def list_jobs_by_ids(self, ids: list[str], conn=None) -> Iterator["Job"]:
         query = self._jobs_query().where(self._jobs.c.id.in_(ids))
         yield from self._parse_jobs(self.db.execute(query, conn=conn))
 
-    def get_last_job_by_name(self, name: str, conn=None) -> "Job | None":
+    def get_last_job_by_name(
+        self, name: str, is_remote_execution: bool = False, conn=None
+    ) -> "Job | None":
         query = (
             self._jobs_query()
             .where(self._jobs.c.name == name)
+            .where(self._jobs.c.is_remote_execution == is_remote_execution)
             .order_by(self._jobs.c.created_at.desc())
             .limit(1)
         )

@@ -1942,29 +1950,35 @@ def create_job(
         parent_job_id: str | None = None,
         rerun_from_job_id: str | None = None,
         run_group_id: str | None = None,
+        is_remote_execution: bool = False,
+        job_id: str | None = None,
         conn: Any = None,
     ) -> str:
         """
         Creates a new job.
         Returns the job id.
+
+        Args:
+            job_id: If provided, uses this ID instead of generating a new one.
+                Used for saving Studio jobs locally with their original IDs.
         """
-        job_id = str(uuid4())
-
-        # Validate run_group_id and rerun_from_job_id consistency
-        if rerun_from_job_id:
-            # Rerun job: run_group_id should be provided by caller
-            # If run_group_id is None, parent is a legacy job without run_group_id
-            # In this case, treat current job as first job in a new chain
-            # and break the link to the legacy parent
-            if run_group_id is None:
+        if job_id is None:
+            job_id = str(uuid4())
+            # Validate run_group_id and rerun_from_job_id consistency for local jobs
+            if rerun_from_job_id:
+                # Rerun job: run_group_id should be provided by caller
+                # If run_group_id is None, parent is a legacy job without run_group_id
+                # In this case, treat current job as first job in a new chain
+                # and break the link to the legacy parent
+                if run_group_id is None:
+                    run_group_id = job_id
+                    rerun_from_job_id = None
+            else:
+                assert run_group_id is None, (
+                    "run_group_id should not be provided when rerun_from_job_id"
+                    " is not set"
+                )
                 run_group_id = job_id
-                rerun_from_job_id = None
-        else:
-            # First job: run_group_id should not be provided (we set it here)
-            assert run_group_id is None, (
-                "run_group_id should not be provided when rerun_from_job_id is not set"
-            )
-            run_group_id = job_id
 
         self.db.execute(
             self._jobs_insert().values(

@@ -1983,6 +1997,7 @@ def create_job(
                 parent_job_id=parent_job_id,
                 rerun_from_job_id=rerun_from_job_id,
                 run_group_id=run_group_id,
+                is_remote_execution=is_remote_execution,
             ),
             conn=conn,
         )
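The reshuffled branching is easier to follow in isolation. A sketch of the same ID resolution, stripped of the SQLAlchemy insert (the `resolve_job_ids` helper is hypothetical):

```python
from uuid import uuid4


def resolve_job_ids(
    rerun_from_job_id: str | None = None,
    run_group_id: str | None = None,
    job_id: str | None = None,
) -> tuple[str, str | None, str | None]:
    """Returns (job_id, rerun_from_job_id, run_group_id), mirroring create_job."""
    if job_id is not None:
        # Studio-originated job: keep the caller's IDs verbatim.
        return job_id, rerun_from_job_id, run_group_id
    job_id = str(uuid4())
    if rerun_from_job_id:
        if run_group_id is None:
            # Legacy parent without a group: start a fresh chain, break the link.
            return job_id, None, job_id
        return job_id, rerun_from_job_id, run_group_id
    assert run_group_id is None, "run_group_id requires rerun_from_job_id"
    # First job in a chain: the group is keyed by the job's own ID.
    return job_id, None, job_id


# First run starts its own group...
jid, rerun, group = resolve_job_ids()
assert rerun is None and group == jid
# ...and a rerun of a legacy (group-less) parent breaks the link.
jid2, rerun2, group2 = resolve_job_ids(rerun_from_job_id=jid)
assert rerun2 is None and group2 == jid2
```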

src/datachain/job.py

Lines changed: 4 additions & 1 deletion
@@ -26,9 +26,10 @@ class Job:
     parent_job_id: str | None = None
     rerun_from_job_id: str | None = None
     run_group_id: str | None = None
+    is_remote_execution: bool = False
 
     @classmethod
-    def parse(
+    def parse(  # noqa: PLR0913
         cls,
         id: str | uuid.UUID,
         name: str,

@@ -46,6 +47,7 @@ def parse(
         parent_job_id: str | None,
         rerun_from_job_id: str | None,
         run_group_id: str | None,
+        is_remote_execution: bool = False,
     ) -> "Job":
         return cls(
             str(id),

@@ -64,4 +66,5 @@ def parse(
             str(parent_job_id) if parent_job_id else None,
             str(rerun_from_job_id) if rerun_from_job_id else None,
             str(run_group_id) if run_group_id else None,
+            is_remote_execution,
         )
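Appending `is_remote_execution` as the last field with a default keeps existing `parse` call sites working unchanged; a trimmed sketch of that backward-compatibility pattern (fields reduced for brevity):

```python
from dataclasses import dataclass


@dataclass
class Job:  # heavily trimmed; the real class carries many more fields
    id: str
    name: str
    run_group_id: str | None = None
    is_remote_execution: bool = False  # new trailing field, defaulted

    @classmethod
    def parse(cls, id, name, run_group_id, is_remote_execution=False) -> "Job":
        return cls(str(id), name, run_group_id, is_remote_execution)


# Old-style callers that omit the flag still parse, defaulting to a local job.
assert Job.parse(1, "job", None).is_remote_execution is False
assert Job.parse(2, "job", None, True).is_remote_execution is True
```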

src/datachain/remote/studio.py

Lines changed: 4 additions & 0 deletions
@@ -452,6 +452,8 @@ def create_job(
         environment: str | None = None,
         workers: int | None = None,
         query_name: str | None = None,
+        rerun_from_job_id: str | None = None,
+        reset: bool = False,
         files: list[str] | None = None,
         python_version: str | None = None,
         requirements: str | None = None,

@@ -468,6 +470,8 @@ def create_job(
             "environment": environment,
             "workers": workers,
             "query_name": query_name,
+            "rerun_from_job_id": rerun_from_job_id,
+            "reset": reset,
             "files": files,
             "python_version": python_version,
             "requirements": requirements,

src/datachain/studio.py

Lines changed: 40 additions & 3 deletions
@@ -9,8 +9,9 @@
 import requests
 import tabulate
 
+from datachain.catalog import get_catalog
 from datachain.config import Config, ConfigLevel
-from datachain.data_storage.job import JobStatus
+from datachain.data_storage.job import JobQueryType, JobStatus
 from datachain.dataset import (
     QUERY_DATASET_PREFIX,
     parse_dataset_name,

@@ -58,6 +59,7 @@ def process_jobs_args(args: "Namespace"):
             args.cron,
             args.no_wait,
             args.credentials_name,
+            args.ignore_checkpoints,
         )
 
     if args.cmd == "cancel":

@@ -422,7 +424,7 @@ async def _run():
     return exit_code_by_status.get(final_status.upper(), 0) if final_status else 0
 
 
-def create_job(
+def create_job(  # noqa: PLR0913
     query_file: str,
     team_name: str | None,
     env_file: str | None = None,

@@ -439,7 +441,10 @@ def create_job(
     cron: str | None = None,
     no_wait: bool | None = False,
     credentials_name: str | None = None,
+    ignore_checkpoints: bool = False,
 ):
+    catalog = get_catalog()
+
     query_type = "PYTHON" if query_file.endswith(".py") else "SHELL"
     with open(query_file) as f:
         query = f.read()

@@ -455,6 +460,15 @@ def create_job(
         with open(req_file) as f:
             requirements = f.read() + "\n" + requirements
 
+    script_path = os.path.abspath(query_file)
+
+    rerun_from_job_id = None
+    rerun_from_job = catalog.metastore.get_last_job_by_name(
+        script_path, is_remote_execution=True
+    )
+    if rerun_from_job:
+        rerun_from_job_id = rerun_from_job.id
+
     client = StudioClient(team=team_name)
     file_ids = upload_files(client, files) if files else []

@@ -469,6 +483,8 @@ def create_job(
         environment=environment,
         workers=workers,
         query_name=os.path.basename(query_file),
+        rerun_from_job_id=rerun_from_job_id,
+        reset=ignore_checkpoints,
         files=file_ids,
         python_version=python_version,
         repository=repository,

@@ -486,13 +502,34 @@ def create_job(
         raise DataChainError("Failed to create job")
 
     job_id = response.data.get("id")
+    job_data = response.data
+
+    query_type_value = (
+        JobQueryType.PYTHON if query_type == "PYTHON" else JobQueryType.SHELL
+    )
+    catalog.metastore.create_job(
+        name=script_path,  # Use local script path, not Studio's query_name
+        query=query,
+        query_type=query_type_value,
+        status=JobStatus.CREATED,
+        workers=job_data.get("workers", 0),
+        python_version=job_data.get("python_version"),
+        params=job_data.get("params", {}),
+        parent_job_id=job_data.get("parent_job_id"),
+        rerun_from_job_id=job_data.get("rerun_from_job_id"),
+        run_group_id=job_data.get("run_group_id"),
+        is_remote_execution=True,
+        job_id=str(job_id),  # Use Studio's job ID
+    )
+
+    catalog.close()
 
     if parsed_start_time or cron:
         print(f"Job {job_id} is scheduled as a task in Studio.")
         return 0
 
     print(f"Job {job_id} created")
-    print("Open the job in Studio at", response.data.get("url"))
+    print("Open the job in Studio at", job_data.get("url"))
     print("=" * 40)
 
     return 0 if no_wait else show_logs_from_client(client, job_id)
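Taken together, `create_job` now runs three steps: look up the previous remote run of the script, submit with that checkpoint context, and mirror Studio's job record locally under Studio's own ID so the next run can find it. A condensed, runnable sketch with in-memory stand-ins (`FakeMetastore` and `submit_to_studio` are illustrative, not real APIs):

```python
import os
from uuid import uuid4


class FakeMetastore:
    """In-memory stand-in for catalog.metastore (illustrative only)."""

    def __init__(self):
        self.jobs: list[dict] = []

    def get_last_job_by_name(self, name, is_remote_execution=False):
        matches = [
            j for j in self.jobs
            if j["name"] == name and j["is_remote_execution"] == is_remote_execution
        ]
        return matches[-1] if matches else None  # jobs are appended newest-last

    def create_job(self, **job):
        self.jobs.append(job)


def submit_to_studio(rerun_from_job_id, reset):
    # Stand-in for StudioClient.create_job; Studio assigns the job ID.
    return {"id": str(uuid4()), "rerun_from_job_id": rerun_from_job_id, "reset": reset}


def run_job(metastore, query_file, ignore_checkpoints=False):
    script_path = os.path.abspath(query_file)
    # 1. Find the previous Studio run of this exact script, if any.
    parent = metastore.get_last_job_by_name(script_path, is_remote_execution=True)
    # 2. Submit with the checkpoint context; reset=True skips checkpoint reuse.
    job = submit_to_studio(parent["id"] if parent else None, ignore_checkpoints)
    # 3. Mirror the job locally under Studio's ID so the next run finds it.
    metastore.create_job(name=script_path, is_remote_execution=True, id=job["id"])
    return job


ms = FakeMetastore()
first = run_job(ms, "my_script.py")
second = run_job(ms, "my_script.py")
assert second["rerun_from_job_id"] == first["id"]  # consecutive runs are chained
```

Storing the local mirror under Studio's job ID rather than a fresh UUID is what makes step 1 of the next invocation resolve to a job Studio actually knows about.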
