
Autopopulate 2.0 #1244


Open: wants to merge 44 commits into base: feat/autopopulate2

Commits (44)
4f01ea3
added JobsConfig
Feb 3, 2023
de4437c
successful prototype for key_source
Feb 10, 2023
8edf179
added `refresh_jobs` and `purge_invalid_jobs`
Feb 10, 2023
fd849bc
Merge branch 'populate_success_count' into autopopulate-2.0
Feb 10, 2023
8692eb4
rename function: `schedule_jobs`
Mar 24, 2023
f38ce9f
implement `schedule_jobs` as part of `populate()`
Mar 24, 2023
ef3adc2
Merge branch 'master' into autopopulate-2.1
Mar 24, 2023
8b9ac0f
remove JobConfigTable and register_key_source
Mar 24, 2023
71b0696
bugfix, add tests
Mar 30, 2023
c13b3e1
bugfix - remove `jobconfig`
Mar 30, 2023
1f3fac1
Merge branch 'master' into autopulate-2.0
ttngu207 Feb 25, 2025
32fbc6d
Merge branch 'master' into autopulate-2.0
ttngu207 May 27, 2025
0d9ec01
chore: minor bugfix
ttngu207 May 28, 2025
4cc170d
chore: code cleanup
ttngu207 May 28, 2025
b7e4d9b
fix: `key` attribute of type `JSON`
ttngu207 May 28, 2025
57c7247
feat: prevent excessive scheduling with `min_scheduling_interval`
ttngu207 May 28, 2025
3f5247b
chore: minor cleanup
ttngu207 May 29, 2025
45b5658
feat: improve logic to prevent excessive scheduling
ttngu207 May 29, 2025
9903e02
chore: minor bugfix
ttngu207 May 29, 2025
872c5dc
chore: tiny bugfix
ttngu207 May 29, 2025
69d8831
fix: fix scheduling logic
ttngu207 May 29, 2025
cc0f398
chore: minor logging tweak
ttngu207 May 29, 2025
2ac9fa2
fix: log run_duration in error jobs
ttngu207 May 29, 2025
db93e0a
fix: improve logic in `purge_invalid_jobs`
ttngu207 May 29, 2025
c9a5750
docs: new `jobs_orchestration.md` docs
ttngu207 May 30, 2025
28df6c2
Update jobs_orchestration.md
ttngu207 May 30, 2025
b0308e2
Update jobs_orchestration.md
ttngu207 May 30, 2025
e9f5377
feat: add `run_metadata` column to `jobs` table
ttngu207 May 30, 2025
eb90d3d
Merge branch 'datajoint:master' into autopopulate-2.0
ttngu207 Jun 7, 2025
e7c8943
fix: improve error handling when `make_fetch` referential integrity f…
ttngu207 Jun 13, 2025
e55bbcb
style: black format
ttngu207 Jun 13, 2025
964743e
style: format
ttngu207 Jun 13, 2025
53e38f7
Merge pull request #1245 from ttngu207/bugfix-three-part-make
dimitri-yatsenko Jun 16, 2025
0cf1ea0
Merge remote-tracking branch 'upstream/master' into autopopulate-2.0
ttngu207 Jul 1, 2025
15f791c
feat: add `_job` hidden column for `Imported` `Computed` tables
ttngu207 Jul 1, 2025
18727e9
chore: rename `purge_valid_jobs` -> `purge_jobs`
ttngu207 Jul 1, 2025
efbb920
feat: insert `_job` metadata upon `make` completion
ttngu207 Jul 1, 2025
918cc9d
chore: minor code optimization in `purge_jobs`
ttngu207 Jul 1, 2025
dcfeaf5
feat: remove logging of `success` jobs in Jobs table
ttngu207 Jul 1, 2025
1f773fa
docs: minor updates
ttngu207 Jul 1, 2025
7184ce5
format: black
ttngu207 Jul 1, 2025
9d3a9e4
chore: remove the optional `purge_jobs` in `schedule_jobs`
ttngu207 Jul 1, 2025
269c4af
chore: code cleanup
ttngu207 Jul 2, 2025
3d7c4ea
fix: update job metadata in the make's transaction
ttngu207 Jul 3, 2025
210 changes: 173 additions & 37 deletions datajoint/autopopulate.py
@@ -8,13 +8,17 @@
import random
import signal
import traceback
import os
import platform

import deepdiff
from tqdm import tqdm

from .errors import DataJointError, LostConnectionError
from .expression import AndList, QueryExpression
from .hash import key_hash
from .settings import config
from .utils import user_choice, to_camel_case

# noinspection PyExceptionInherit,PyCallingNonCallable

@@ -24,14 +28,14 @@
# --- helper functions for multiprocessing --


def _initialize_populate(table, jobs, populate_kwargs):
def _initialize_populate(table, reserve_jobs, populate_kwargs):
"""
Initialize the process for multiprocessing.
Saves the unpickled copy of the table to the current process and reconnects.
"""
process = mp.current_process()
process.table = table
process.jobs = jobs
process.reserve_jobs = reserve_jobs
process.populate_kwargs = populate_kwargs
table.connection.connect() # reconnect

@@ -43,7 +47,9 @@ def _call_populate1(key):
:return: key, error if error, otherwise None
"""
process = mp.current_process()
return process.table._populate1(key, process.jobs, **process.populate_kwargs)
return process.table._populate1(
key, process.reserve_jobs, **process.populate_kwargs
)


class AutoPopulate:
@@ -91,6 +97,7 @@ def _rename_attributes(table, props):
self._key_source = _rename_attributes(*parents[0])
for q in parents[1:]:
self._key_source *= _rename_attributes(*q)

return self._key_source

def make(self, key):
@@ -105,8 +112,8 @@ def make(self, key):
The method can be implemented either as:
(a) Regular method: All three steps are performed in a single database transaction.
The method must return None.
(b) Generator method:
The make method is split into three functions:
- `make_fetch`: Fetches data from the parent tables.
- `make_compute`: Computes secondary attributes based on the fetched data.
- `make_insert`: Inserts the computed data into the current table.
@@ -124,7 +131,7 @@ def make(self, key):
self.make_insert(key, *computed_result)
commit_transaction
</pseudocode>

Importantly, the output of make_fetch is a tuple that serves as the input into `make_compute`.
The output of `make_compute` is a tuple that serves as the input into `make_insert`.

@@ -228,6 +235,7 @@ def populate(
display_progress=False,
processes=1,
make_kwargs=None,
schedule_jobs=True,
):
"""
``table.populate()`` calls ``table.make(key)`` for every primary key in
@@ -249,6 +257,8 @@
to be passed down to each ``make()`` call. Computation arguments should be
specified within the pipeline e.g. using a `dj.Lookup` table.
:type make_kwargs: dict, optional
:param schedule_jobs: if True, run schedule_jobs before doing populate (default: True),
Review comment (Member): Rather than baking this operation into populate, which makes the logic more convoluted, consider making schedule_jobs a separate, explicit process.

only applicable if reserve_jobs is True
:return: a dict with two keys
"success_count": the count of successful ``make()`` calls in this ``populate()`` call
"error_list": the error list that is filled if `suppress_errors` is True
@@ -261,9 +271,9 @@
raise DataJointError(
"The order argument must be one of %s" % str(valid_order)
)
jobs = (
self.connection.schemas[self.target.database].jobs if reserve_jobs else None
)

if schedule_jobs:
self.schedule_jobs(*restrictions)

# define and set up signal handler for SIGTERM:
if reserve_jobs:
@@ -274,19 +284,27 @@ def handler(signum, frame):

old_handler = signal.signal(signal.SIGTERM, handler)

# retrieve `keys` if not provided
if keys is None:
keys = (self._jobs_to_do(restrictions) - self.target).fetch(
"KEY", limit=limit
)

# exclude "error", "ignore" or "reserved" jobs
if reserve_jobs:
exclude_key_hashes = (
jobs
& {"table_name": self.target.table_name}
& 'status in ("error", "ignore", "reserved")'
).fetch("key_hash")
keys = [key for key in keys if key_hash(key) not in exclude_key_hashes]
if reserve_jobs:
keys = (self.jobs & {"status": "scheduled"}).fetch(
"key", order_by="timestamp", limit=limit
)
if restrictions:
# hitting the `key_source` again to apply the restrictions
# this is expensive/suboptimal
keys = (self._jobs_to_do(restrictions) & keys).fetch("KEY")
else:
keys = (self._jobs_to_do(restrictions) - self.target).fetch(
"KEY", limit=limit
)
else:
# exclude "error", "ignore" or "reserved" jobs
if reserve_jobs:
exclude_key_hashes = (
self.jobs & 'status in ("error", "ignore", "reserved")'
).fetch("key_hash")
keys = [key for key in keys if key_hash(key) not in exclude_key_hashes]

if order == "reverse":
keys.reverse()
@@ -316,7 +334,7 @@ def handler(signum, frame):
if display_progress
else keys
):
status = self._populate1(key, jobs, **populate_kwargs)
status = self._populate1(key, reserve_jobs, **populate_kwargs)
if status is True:
success_list.append(1)
elif isinstance(status, tuple):
@@ -329,7 +347,7 @@ def handler(signum, frame):
del self.connection._conn.ctx # SSLContext is not pickleable
with (
mp.Pool(
processes, _initialize_populate, (self, jobs, populate_kwargs)
processes, _initialize_populate, (self, True, populate_kwargs)
) as pool,
(
tqdm(desc="Processes: ", total=nkeys)
@@ -358,11 +376,16 @@ def handler(signum, frame):
}

def _populate1(
self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None
self,
key,
reserve_jobs: bool,
suppress_errors: bool,
return_exception_objects: bool,
make_kwargs: dict = None,
):
"""
populates table for one source key, calling self.make inside a transaction.
:param jobs: the jobs table or None if not reserve_jobs
:param reserve_jobs: if True, reserve jobs to populate in asynchronous fashion
:param key: dict specifying job to populate
:param suppress_errors: bool if errors should be suppressed and returned
:param return_exception_objects: if True, errors must be returned as objects
@@ -372,7 +395,7 @@ def _populate1(
# use the legacy `_make_tuples` callback.
make = self._make_tuples if hasattr(self, "_make_tuples") else self.make

if jobs is not None and not jobs.reserve(
if reserve_jobs and not self._Jobs.reserve(
self.target.table_name, self._job_key(key)
):
return False
@@ -385,12 +408,12 @@ def _populate1(
if key in self.target: # already populated
if not is_generator:
self.connection.cancel_transaction()
if jobs is not None:
jobs.complete(self.target.table_name, self._job_key(key))
self._Jobs.complete(self.target.table_name, self._job_key(key))
return False

logger.debug(f"Making {key} -> {self.target.full_table_name}")
self.__class__._allow_insert = True
make_start = datetime.datetime.utcnow()

try:
if not is_generator:
@@ -412,11 +435,10 @@ def _populate1(
!= deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[
fetched_data
]
): # rollback due to referential integrity fail
self.connection.cancel_transaction()
logger.warning(
f"Referential integrity failed for {key} -> {self.target.full_table_name}")
return False
): # raise error if fetched data has changed
raise DataJointError(
"Referential integrity failed! The `make_fetch` data has changed"
)
gen.send(computed_result) # insert

except (KeyboardInterrupt, SystemExit, Exception) as error:
@@ -431,24 +453,43 @@ def _populate1(
logger.debug(
f"Error making {key} -> {self.target.full_table_name} - {error_message}"
)
if jobs is not None:
if reserve_jobs:
# show error name and error message (if any)
jobs.error(
self._Jobs.error(
self.target.table_name,
self._job_key(key),
error_message=error_message,
error_stack=traceback.format_exc(),
run_duration=(
datetime.datetime.utcnow() - make_start
).total_seconds(),
)
if not suppress_errors or isinstance(error, SystemExit):
raise
else:
logger.error(error)
return key, error if return_exception_objects else error_message
else:
# Update the _job column with the job metadata for newly populated entries
if "_job" in self.target.heading._attributes:
job_metadata = {
"execution_duration": (
datetime.datetime.utcnow() - make_start
).total_seconds(),
"host": platform.node(),
"pid": os.getpid(),
"connection_id": self.connection.connection_id,
"user": self._Jobs._user,
}
for k in (self.target & key).fetch("KEY"):
self.target.update1({**k, "_job": job_metadata})
self.connection.commit_transaction()
self._Jobs.complete(
self.target.table_name,
self._job_key(key),
run_duration=(datetime.datetime.utcnow() - make_start).total_seconds(),
)
logger.debug(f"Success making {key} -> {self.target.full_table_name}")
if jobs is not None:
jobs.complete(self.target.table_name, self._job_key(key))
return True
finally:
self.__class__._allow_insert = False
@@ -475,3 +516,98 @@ def progress(self, *restrictions, display=False):
),
)
return remaining, total

@property
def _Jobs(self):
return self.connection.schemas[self.target.database].jobs

@property
def jobs(self):
return self._Jobs & {"table_name": self.target.table_name}

def schedule_jobs(
self, *restrictions, min_scheduling_interval=None
):
"""
Schedule new jobs for this autopopulate table by finding keys that need computation.

This method implements an optimization strategy to avoid excessive scheduling:
1. First checks if jobs were scheduled recently (within min_scheduling_interval)
2. If recent scheduling event exists, skips scheduling to prevent database load
3. Otherwise, finds keys that need computation and schedules them

Args:
restrictions: a list of restrictions each restrict (table.key_source - target.proj())
min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling.
If None, uses the value from dj.config["min_scheduling_interval"] (default: None)

Returns:
None
"""
__scheduled_event = {
"table_name": self.target.table_name,
"__type__": "jobs scheduling event",
}

if min_scheduling_interval is None:
min_scheduling_interval = config["min_scheduling_interval"]

if min_scheduling_interval > 0:
recent_scheduling_event = (
self._Jobs.proj(
last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())"
)
& {"table_name": f"__{self.target.table_name}__"}
& {"key_hash": key_hash(__scheduled_event)}
& f"last_scheduled <= {min_scheduling_interval}"
)
if recent_scheduling_event:
logger.info(
f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)"
)
return

try:
with self.connection.transaction:
schedule_count = 0
for key in (self._jobs_to_do(restrictions) - self.target).fetch("KEY"):
schedule_count += self._Jobs.schedule(self.target.table_name, key)
except Exception as e:
logger.exception(str(e))
else:
self._Jobs.ignore(
f"__{self.target.table_name}__",
__scheduled_event,
message=f"Jobs scheduling event: {__scheduled_event['table_name']}",
)
logger.info(
f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`"
)

def cleanup_jobs(self):
"""
Check and remove any orphaned/outdated jobs in the JobTable for this autopopulate table.

This method handles two types of orphaned jobs:
1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted)
2. Jobs with "success" status that are no longer in the target table (e.g. entries in target table got deleted)

The method is potentially time-consuming as it needs to:
- Compare all jobs against the current key_source
- For success jobs, verify their existence in the target table
- Delete any jobs that fail these checks

This cleanup should not need to run very often, but helps maintain database consistency.
"""
removed = 0
if len(self.jobs):
keys2do = self._jobs_to_do({}).fetch("KEY")
if len(self.jobs) - len(keys2do) > 0:
for key, job_key in zip(*self.jobs.fetch("KEY", "key")):
if job_key not in keys2do:
(self.jobs & key).delete()
removed += 1

logger.info(
f"{removed} invalid jobs removed for `{to_camel_case(self.target.table_name)}`"
)
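
Putting the new pieces together, end-to-end usage could look roughly like the sketch below. The schema, table, and upstream names are hypothetical; the three methods follow one plausible reading of the three-part make described in the make docstring above, and min_scheduling_interval, schedule_jobs, and the schedule_jobs flag on populate are the additions from this diff, not established DataJoint API.

    import datajoint as dj

    schema = dj.schema("my_pipeline")  # hypothetical schema name

    # Throttle how often schedule_jobs re-scans the key_source (config key used in this PR).
    dj.config["min_scheduling_interval"] = 600  # seconds

    @schema
    class FilteredTrace(dj.Computed):  # hypothetical computed table
        definition = """
        -> RawTrace
        ---
        filtered: longblob
        """

        # Three-part make: fetch and compute run outside the final transaction;
        # only make_insert (plus the re-fetch integrity check) runs inside it.
        def make_fetch(self, key):
            return ((RawTrace & key).fetch1("trace"),)  # must return a tuple

        def make_compute(self, key, trace):
            filtered = trace  # placeholder for the actual long-running computation
            return (filtered,)

        def make_insert(self, key, filtered):
            self.insert1({**key, "filtered": filtered})

    # Schedule jobs explicitly, then let workers consume "scheduled" entries.
    FilteredTrace.schedule_jobs()
    FilteredTrace.populate(reserve_jobs=True, schedule_jobs=False, suppress_errors=True)

With reserve_jobs=True, populate draws its keys from the jobs table's "scheduled" entries rather than re-querying the key_source, per the branch shown earlier in this diff.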
7 changes: 7 additions & 0 deletions datajoint/declare.py
@@ -327,6 +327,13 @@ def declare(full_table_name, definition, context):
for attr in metadata_attr_sql
)

# Add hidden _job column for imported and computed tables
if table_name.startswith(("_", "__")):
# This is an imported or computed table (single or double underscore prefix)
attribute_sql.append(
"`_job` json NULL COMMENT 'Hidden column for job tracking metadata'"
)

if not primary_key:
raise DataJointError("Table must have a primary key")

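
The run metadata that _populate1 writes into this hidden column (execution_duration, host, pid, connection_id, user) is stored as JSON, so it can be inspected after the fact. A minimal sketch, assuming a hypothetical my_pipeline schema with a computed table stored as __filtered_trace and MySQL 8 JSON operators; the hidden column is not part of the DataJoint heading, so this sketch drops to a raw query through the connection object:

    import datajoint as dj

    conn = dj.conn()
    rows = conn.query(
        """
        SELECT `_job`->>'$.execution_duration' AS execution_duration,
               `_job`->>'$.host'               AS host,
               `_job`->>'$.user'               AS job_user
        FROM `my_pipeline`.`__filtered_trace`
        WHERE `_job` IS NOT NULL
        """
    ).fetchall()
    print(rows)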