diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index d5cabe06..d2dd8b56 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -8,6 +8,8 @@ import random import signal import traceback +import os +import platform import deepdiff from tqdm import tqdm @@ -15,6 +17,8 @@ from .errors import DataJointError, LostConnectionError from .expression import AndList, QueryExpression from .hash import key_hash +from .settings import config +from .utils import user_choice, to_camel_case # noinspection PyExceptionInherit,PyCallingNonCallable @@ -24,14 +28,14 @@ # --- helper functions for multiprocessing -- -def _initialize_populate(table, jobs, populate_kwargs): +def _initialize_populate(table, reserve_jobs, populate_kwargs): """ Initialize the process for multiprocessing. Saves the unpickled copy of the table to the current process and reconnects. """ process = mp.current_process() process.table = table - process.jobs = jobs + process.reserve_jobs = reserve_jobs process.populate_kwargs = populate_kwargs table.connection.connect() # reconnect @@ -43,7 +47,9 @@ def _call_populate1(key): :return: key, error if error, otherwise None """ process = mp.current_process() - return process.table._populate1(key, process.jobs, **process.populate_kwargs) + return process.table._populate1( + key, process.reserve_jobs, **process.populate_kwargs + ) class AutoPopulate: @@ -91,6 +97,7 @@ def _rename_attributes(table, props): self._key_source = _rename_attributes(*parents[0]) for q in parents[1:]: self._key_source *= _rename_attributes(*q) + return self._key_source def make(self, key): @@ -105,8 +112,8 @@ def make(self, key): The method can be implemented either as: (a) Regular method: All three steps are performed in a single database transaction. The method must return None. - (b) Generator method: - The make method is split into three functions: + (b) Generator method: + The make method is split into three functions: - `make_fetch`: Fetches data from the parent tables. - `make_compute`: Computes secondary attributes based on the fetched data. - `make_insert`: Inserts the computed data into the current table. @@ -124,7 +131,7 @@ def make(self, key): self.make_insert(key, *computed_result) commit_transaction - + Importantly, the output of make_fetch is a tuple that serves as the input into `make_compute`. The output of `make_compute` is a tuple that serves as the input into `make_insert`. @@ -228,6 +235,7 @@ def populate( display_progress=False, processes=1, make_kwargs=None, + schedule_jobs=True, ): """ ``table.populate()`` calls ``table.make(key)`` for every primary key in @@ -249,6 +257,8 @@ def populate( to be passed down to each ``make()`` call. Computation arguments should be specified within the pipeline e.g. using a `dj.Lookup` table. 
:type make_kwargs: dict, optional + :param schedule_jobs: if True, run schedule_jobs before doing populate (default: True), + only applicable if reserved_jobs is True :return: a dict with two keys "success_count": the count of successful ``make()`` calls in this ``populate()`` call "error_list": the error list that is filled if `suppress_errors` is True @@ -261,9 +271,9 @@ def populate( raise DataJointError( "The order argument must be one of %s" % str(valid_order) ) - jobs = ( - self.connection.schemas[self.target.database].jobs if reserve_jobs else None - ) + + if schedule_jobs: + self.schedule_jobs(*restrictions) # define and set up signal handler for SIGTERM: if reserve_jobs: @@ -274,19 +284,27 @@ def handler(signum, frame): old_handler = signal.signal(signal.SIGTERM, handler) + # retrieve `keys` if not provided if keys is None: - keys = (self._jobs_to_do(restrictions) - self.target).fetch( - "KEY", limit=limit - ) - - # exclude "error", "ignore" or "reserved" jobs - if reserve_jobs: - exclude_key_hashes = ( - jobs - & {"table_name": self.target.table_name} - & 'status in ("error", "ignore", "reserved")' - ).fetch("key_hash") - keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] + if reserve_jobs: + keys = (self.jobs & {"status": "scheduled"}).fetch( + "key", order_by="timestamp", limit=limit + ) + if restrictions: + # hitting the `key_source` again to apply the restrictions + # this is expensive/suboptimal + keys = (self._jobs_to_do(restrictions) & keys).fetch("KEY") + else: + keys = (self._jobs_to_do(restrictions) - self.target).fetch( + "KEY", limit=limit + ) + else: + # exclude "error", "ignore" or "reserved" jobs + if reserve_jobs: + exclude_key_hashes = ( + self.jobs & 'status in ("error", "ignore", "reserved")' + ).fetch("key_hash") + keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] if order == "reverse": keys.reverse() @@ -316,7 +334,7 @@ def handler(signum, frame): if display_progress else keys ): - status = self._populate1(key, jobs, **populate_kwargs) + status = self._populate1(key, reserve_jobs, **populate_kwargs) if status is True: success_list.append(1) elif isinstance(status, tuple): @@ -329,7 +347,7 @@ def handler(signum, frame): del self.connection._conn.ctx # SSLContext is not pickleable with ( mp.Pool( - processes, _initialize_populate, (self, jobs, populate_kwargs) + processes, _initialize_populate, (self, True, populate_kwargs) ) as pool, ( tqdm(desc="Processes: ", total=nkeys) @@ -358,11 +376,16 @@ def handler(signum, frame): } def _populate1( - self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None + self, + key, + reserve_jobs: bool, + suppress_errors: bool, + return_exception_objects: bool, + make_kwargs: dict = None, ): """ populates table for one source key, calling self.make inside a transaction. - :param jobs: the jobs table or None if not reserve_jobs + :param reserve_jobs: if True, reserve jobs to populate in asynchronous fashion :param key: dict specifying job to populate :param suppress_errors: bool if errors should be suppressed and returned :param return_exception_objects: if True, errors must be returned as objects @@ -372,7 +395,7 @@ def _populate1( # use the legacy `_make_tuples` callback. 
make = self._make_tuples if hasattr(self, "_make_tuples") else self.make - if jobs is not None and not jobs.reserve( + if reserve_jobs and not self._Jobs.reserve( self.target.table_name, self._job_key(key) ): return False @@ -385,12 +408,12 @@ def _populate1( if key in self.target: # already populated if not is_generator: self.connection.cancel_transaction() - if jobs is not None: - jobs.complete(self.target.table_name, self._job_key(key)) + self._Jobs.complete(self.target.table_name, self._job_key(key)) return False logger.debug(f"Making {key} -> {self.target.full_table_name}") self.__class__._allow_insert = True + make_start = datetime.datetime.utcnow() try: if not is_generator: @@ -412,11 +435,10 @@ def _populate1( != deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[ fetched_data ] - ): # rollback due to referential integrity fail - self.connection.cancel_transaction() - logger.warning( - f"Referential integrity failed for {key} -> {self.target.full_table_name}") - return False + ): # raise error if fetched data has changed + raise DataJointError( + "Referential integrity failed! The `make_fetch` data has changed" + ) gen.send(computed_result) # insert except (KeyboardInterrupt, SystemExit, Exception) as error: @@ -431,13 +453,16 @@ def _populate1( logger.debug( f"Error making {key} -> {self.target.full_table_name} - {error_message}" ) - if jobs is not None: + if reserve_jobs: # show error name and error message (if any) - jobs.error( + self._Jobs.error( self.target.table_name, self._job_key(key), error_message=error_message, error_stack=traceback.format_exc(), + run_duration=( + datetime.datetime.utcnow() - make_start + ).total_seconds(), ) if not suppress_errors or isinstance(error, SystemExit): raise @@ -445,10 +470,26 @@ def _populate1( logger.error(error) return key, error if return_exception_objects else error_message else: + # Update the _job column with the job metadata for newly populated entries + if "_job" in self.target.heading._attributes: + job_metadata = { + "execution_duration": ( + datetime.datetime.utcnow() - make_start + ).total_seconds(), + "host": platform.node(), + "pid": os.getpid(), + "connection_id": self.connection.connection_id, + "user": self._Jobs._user, + } + for k in (self.target & key).fetch("KEY"): + self.target.update1({**k, "_job": job_metadata}) self.connection.commit_transaction() + self._Jobs.complete( + self.target.table_name, + self._job_key(key), + run_duration=(datetime.datetime.utcnow() - make_start).total_seconds(), + ) logger.debug(f"Success making {key} -> {self.target.full_table_name}") - if jobs is not None: - jobs.complete(self.target.table_name, self._job_key(key)) return True finally: self.__class__._allow_insert = False @@ -475,3 +516,98 @@ def progress(self, *restrictions, display=False): ), ) return remaining, total + + @property + def _Jobs(self): + return self.connection.schemas[self.target.database].jobs + + @property + def jobs(self): + return self._Jobs & {"table_name": self.target.table_name} + + def schedule_jobs( + self, *restrictions, min_scheduling_interval=None + ): + """ + Schedule new jobs for this autopopulate table by finding keys that need computation. + + This method implements an optimization strategy to avoid excessive scheduling: + 1. First checks if jobs were scheduled recently (within min_scheduling_interval) + 2. If recent scheduling event exists, skips scheduling to prevent database load + 3. 
Otherwise, finds keys that need computation and schedules them + + Args: + restrictions: a list of restrictions each restrict (table.key_source - target.proj()) + min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling. + If None, uses the value from dj.config["min_scheduling_interval"] (default: None) + + Returns: + None + """ + __scheduled_event = { + "table_name": self.target.table_name, + "__type__": "jobs scheduling event", + } + + if min_scheduling_interval is None: + min_scheduling_interval = config["min_scheduling_interval"] + + if min_scheduling_interval > 0: + recent_scheduling_event = ( + self._Jobs.proj( + last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())" + ) + & {"table_name": f"__{self.target.table_name}__"} + & {"key_hash": key_hash(__scheduled_event)} + & f"last_scheduled <= {min_scheduling_interval}" + ) + if recent_scheduling_event: + logger.info( + f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)" + ) + return + + try: + with self.connection.transaction: + schedule_count = 0 + for key in (self._jobs_to_do(restrictions) - self.target).fetch("KEY"): + schedule_count += self._Jobs.schedule(self.target.table_name, key) + except Exception as e: + logger.exception(str(e)) + else: + self._Jobs.ignore( + f"__{self.target.table_name}__", + __scheduled_event, + message=f"Jobs scheduling event: {__scheduled_event['table_name']}", + ) + logger.info( + f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`" + ) + + def cleanup_jobs(self): + """ + Check and remove any orphaned/outdated jobs in the JobTable for this autopopulate table. + + This method handles two types of orphaned jobs: + 1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted) + 2. Jobs with "success" status that are no longer in the target table (e.g. entries in target table got deleted) + + The method is potentially time-consuming as it needs to: + - Compare all jobs against the current key_source + - For success jobs, verify their existence in the target table + - Delete any jobs that fail these checks + + This cleanup should not need to run very often, but helps maintain database consistency. 
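+
+        Example (an illustrative sketch; ``MyComputedTable`` is a placeholder for any
+        autopopulated table in your pipeline):
+
+            # run occasionally, e.g. from a nightly maintenance task or after
+            # large deletions in upstream or target tables
+            MyComputedTable().cleanup_jobs()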
+ """ + removed = 0 + if len(self.jobs): + keys2do = self._jobs_to_do({}).fetch("KEY") + if len(self.jobs) - len(keys2do) > 0: + for key, job_key in zip(*self.jobs.fetch("KEY", "key")): + if job_key not in keys2do: + (self.jobs & key).delete() + removed += 1 + + logger.info( + f"{removed} invalid jobs removed for `{to_camel_case(self.target.table_name)}`" + ) diff --git a/datajoint/declare.py b/datajoint/declare.py index 30447679..240174be 100644 --- a/datajoint/declare.py +++ b/datajoint/declare.py @@ -327,6 +327,13 @@ def declare(full_table_name, definition, context): for attr in metadata_attr_sql ) + # Add hidden _job column for imported and computed tables + if table_name.startswith(("_", "__")): + # This is an imported or computed table (single or double underscore prefix) + attribute_sql.append( + "`_job` json NULL COMMENT 'Hidden column for job tracking metadata'" + ) + if not primary_key: raise DataJointError("Table must have a primary key") diff --git a/datajoint/jobs.py b/datajoint/jobs.py index d6b31e13..e12716f9 100644 --- a/datajoint/jobs.py +++ b/datajoint/jobs.py @@ -1,5 +1,8 @@ import os +import datetime import platform +import json +from typing import Dict, Any, Union from .errors import DuplicateError from .hash import key_hash @@ -30,15 +33,20 @@ def __init__(self, conn, database): table_name :varchar(255) # className of the table key_hash :char(32) # key hash --- - status :enum('reserved','error','ignore') # if tuple is missing, the job is available - key=null :blob # structure containing the key + status :enum('reserved','error','ignore','scheduled','success') + key=null :json # structure containing the key for querying error_message="" :varchar({error_message_length}) # error message returned if failed error_stack=null :mediumblob # error stack if failed user="" :varchar(255) # database user host="" :varchar(255) # system hostname pid=0 :int unsigned # system process id - connection_id = 0 : bigint unsigned # connection_id() - timestamp=CURRENT_TIMESTAMP :timestamp # automatic timestamp + connection_id = 0 : bigint unsigned # connection_id() + timestamp :timestamp # timestamp of the job status change or scheduled time + run_duration=null : float # run duration in seconds + run_metadata=null :json # metadata about the run (e.g. code version, environment info) + index(table_name, status) + index(status) + index(timestamp) # for ordering jobs """.format( database=database, error_message_length=ERROR_MESSAGE_LENGTH ) @@ -62,33 +70,83 @@ def drop(self): """bypass interactive prompts and dependencies""" self.drop_quick() + def schedule(self, table_name, key, seconds_delay=0, force=False): + """ + Schedule a job for computation in the DataJoint pipeline. + + This method manages job scheduling with the following key behaviors: + 1. Creates a new job entry if one doesn't exist + 2. Updates existing jobs based on their current status: + - Allows rescheduling if job is in error/ignore status and force=True + - Prevents rescheduling if job is already scheduled/reserved/success + 3. Records job metadata including host, process ID, and user info + 4. 
Supports delayed execution through seconds_delay parameter + + Args: + table_name: Full table name in format `database`.`table_name` + key: Dictionary containing the job's primary key + seconds_delay: Optional delay in seconds before job execution (default: 0) + force: If True, allows rescheduling jobs in error/ignore status (default: False) + + Returns: + bool: True if job was successfully scheduled, False if job already exists with incompatible status + """ + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + if self & job_key: + current_status = (self & job_key).fetch1("status") + if current_status in ("scheduled", "reserved") or ( + current_status in ("error", "ignore") and not force + ): + return False + + job = dict( + job_key, + status="scheduled", + host=platform.node(), + pid=os.getpid(), + connection_id=self.connection.connection_id, + key=_jsonify(key), + user=self._user, + timestamp=datetime.datetime.utcnow() + + datetime.timedelta(seconds=seconds_delay), + ) + + with config(enable_python_native_blobs=True): + self.insert1(job, replace=True, ignore_extra_fields=True) + + return True + def reserve(self, table_name, key): """ - Reserve a job for computation. When a job is reserved, the job table contains an entry for the - job key, identified by its hash. When jobs are completed, the entry is removed. + Reserve a job for computation. :param table_name: `database`.`table_name` :param key: the dict of the job's primary key :return: True if reserved job successfully. False = the jobs is already taken """ + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + if self & job_key: + current_status = (self & job_key).fetch1("status") + if current_status != "scheduled": + return False + job = dict( - table_name=table_name, - key_hash=key_hash(key), + job_key, status="reserved", host=platform.node(), pid=os.getpid(), connection_id=self.connection.connection_id, - key=key, + key=_jsonify(key), user=self._user, + timestamp=datetime.datetime.utcnow(), ) - try: - with config(enable_python_native_blobs=True): - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: - return False + + with config(enable_python_native_blobs=True): + self.insert1(job, replace=True, ignore_extra_fields=True) + return True - def ignore(self, table_name, key): + def ignore(self, table_name, key, message=""): """ Set a job to be ignored for computation. When a job is ignored, the job table contains an entry for the job key, identified by its hash, with status "ignore". @@ -98,46 +156,66 @@ def ignore(self, table_name, key): Table name (str) - `database`.`table_name` key: The dict of the job's primary key + message: + The optional message for why the key is to be ignored Returns: True if ignore job successfully. 
False = the jobs is already taken """ + job_key = dict(table_name=table_name, key_hash=key_hash(key)) + if self & job_key: + current_status = (self & job_key).fetch1("status") + if current_status not in ("scheduled", "ignore"): + return False + job = dict( - table_name=table_name, - key_hash=key_hash(key), + job_key, status="ignore", host=platform.node(), pid=os.getpid(), connection_id=self.connection.connection_id, - key=key, + key=_jsonify(key), + error_message=message, user=self._user, + timestamp=datetime.datetime.utcnow(), ) - try: - with config(enable_python_native_blobs=True): - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: - return False + + with config(enable_python_native_blobs=True): + self.insert1(job, replace=True, ignore_extra_fields=True) + return True def complete(self, table_name, key): """ Log a completed job. When a job is completed, its reservation entry is deleted. - :param table_name: `database`.`table_name` - :param key: the dict of the job's primary key + Args: + table_name: `database`.`table_name` + key: the dict of the job's primary key """ job_key = dict(table_name=table_name, key_hash=key_hash(key)) - (self & job_key).delete_quick() - - def error(self, table_name, key, error_message, error_stack=None): + (self & job_key).delete() + + def error( + self, + table_name, + key, + error_message, + error_stack=None, + run_duration=None, + run_metadata=None, + ): """ Log an error message. The job reservation is replaced with an error entry. if an error occurs, leave an entry describing the problem - :param table_name: `database`.`table_name` - :param key: the dict of the job's primary key - :param error_message: string error message - :param error_stack: stack trace + Args: + table_name: `database`.`table_name` + key: the dict of the job's primary key + error_message: string error message + error_stack: stack trace + run_duration: duration in second of the job run + run_metadata: dict containing metadata about the run (e.g. code version, environment info) """ if len(error_message) > ERROR_MESSAGE_LENGTH: error_message = ( @@ -154,10 +232,21 @@ def error(self, table_name, key, error_message, error_stack=None): pid=os.getpid(), connection_id=self.connection.connection_id, user=self._user, - key=key, + key=_jsonify(key), error_message=error_message, error_stack=error_stack, + run_duration=run_duration, + run_metadata=_jsonify(run_metadata) if run_metadata else None, + timestamp=datetime.datetime.utcnow(), ), replace=True, ignore_extra_fields=True, ) + + +def _jsonify(key: Dict[str, Any]) -> Dict[str, Any]: + """ + Ensure the key is JSON serializable by converting to JSON and back. + Uses str() as fallback for any non-serializable objects. + """ + return json.loads(json.dumps(key, default=str)) diff --git a/datajoint/settings.py b/datajoint/settings.py index 30b206f9..162ba188 100644 --- a/datajoint/settings.py +++ b/datajoint/settings.py @@ -51,6 +51,8 @@ "add_hidden_timestamp": False, # file size limit for when to disable checksums "filepath_checksum_size_limit": None, + # minimum time in seconds between job scheduling operations + "min_scheduling_interval": 5, } ) diff --git a/docs/jobs_orchestration.md b/docs/jobs_orchestration.md new file mode 100644 index 00000000..12628084 --- /dev/null +++ b/docs/jobs_orchestration.md @@ -0,0 +1,133 @@ +# DataJoint Jobs Orchestration mechanism + +This document describes the behavior and mechanism of DataJoint's jobs reservation and execution system. 
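+
+As a quick orientation, the sketch below walks through the full cycle covered in the
+sections that follow: scheduling, reserved execution, inspection, and cleanup. It is an
+illustrative sketch rather than a prescribed recipe; the schema name and the
+`Recording`/`Detection` tables are hypothetical stand-ins for any pipeline with an
+autopopulated (Imported or Computed) table.
+
+```python
+import datajoint as dj
+
+schema = dj.schema("my_pipeline")  # hypothetical schema name
+
+
+@schema
+class Recording(dj.Manual):  # hypothetical upstream table
+    definition = """
+    recording_id : int
+    """
+
+
+@schema
+class Detection(dj.Computed):  # hypothetical computed table
+    definition = """
+    -> Recording
+    ---
+    n_events : int
+    """
+
+    def make(self, key):
+        # placeholder computation for a single key
+        self.insert1(dict(key, n_events=0))
+
+
+Recording().insert([{"recording_id": 1}, {"recording_id": 2}], skip_duplicates=True)
+
+# 1. Schedule: add keys missing from the target to the ~jobs table as "scheduled".
+#    populate() also performs this step automatically unless schedule_jobs=False.
+Detection().schedule_jobs()
+
+# 2. Execute: with reserve_jobs=True, each worker reserves a "scheduled" job before
+#    calling make(), so multiple workers can safely share the same queue.
+Detection().populate(reserve_jobs=True, suppress_errors=True)
+
+# 3. Inspect: the per-table view of the jobs table can be queried like any table.
+print((Detection().jobs & 'status = "error"').fetch("key", "error_message"))
+
+# 4. Clean up: occasionally remove jobs orphaned by upstream or target deletions.
+Detection().cleanup_jobs()
+```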
+ +## Jobs Table Structure + +The jobs table (`~jobs`) is a system table that tracks the state and execution of jobs in the DataJoint pipeline. It has the following key fields: + +- `table_name`: The full table name being populated +- `key_hash`: A hash of the job's primary key +- `status`: Current job status, one of: + - `scheduled`: Job is queued for execution + - `reserved`: Job is currently being processed + - `error`: Job failed with an error + - `ignore`: Job is marked to be ignored + - `success`: Job completed successfully +- `key`: JSON structure containing the job's primary key (query-able) +- `error_message`: Error message if job failed +- `error_stack`: Stack trace if job failed +- `user`: Database user who created the job +- `host`: System hostname where job was created +- `pid`: Process ID of the job +- `connection_id`: Database connection ID +- `timestamp`: When the job status was last changed +- `run_duration`: How long the job took to execute (in seconds) +- `run_metadata`: JSON structure containing metadata about the run (e.g. code version, environment info, system state) + +## Job Scheduling Process + +The `schedule_jobs` method implements an optimization strategy to prevent excessive scheduling: + +1. **Rate Limiting**: + - Uses `min_scheduling_interval` (configurable via `dj.config["min_scheduling_interval"]`) + - Default interval is 5 seconds + - Can be overridden per call + +2. **Scheduling Logic**: + - Checks for recent scheduling events within the interval + - Skips scheduling if recent events exist + - Otherwise, finds keys that need computation by: + 1. Querying the `key_source` to get all possible keys + 2. Excluding keys that already exist in the target table + 3. Excluding keys that are already in the jobs table with incompatible status + (i.e., `scheduled`, `reserved`, or `success`) + - Schedules each valid key as a new job + - Records scheduling events for rate limiting + +3. **Job States**: + - New jobs start as `scheduled` + - Jobs can be rescheduled if in `error` or `ignore` state (with `force=True`) + - Prevents rescheduling if job is `scheduled`, `reserved`, or `success` + +## Populate Process Flow + +The `populate()` method orchestrates the job execution process: + +1. **Initialization**: + - Optionally schedules new jobs (controlled by `schedule_jobs` parameter) + +2. **Job Selection**: + - If `reserve_jobs=True`: + - Fetches `scheduled` jobs from the jobs table + - Applies any restrictions to the job set + - Attempts to reserve each job before processing + - Skips jobs that cannot be reserved (already taken by another process) + - If `reserve_jobs=False`: + - Uses traditional direct computation approach + +3. **Execution**: + - Processes jobs in specified order (`original`, `reverse`, or `random`) + - Supports single or multi-process execution + - For reserved jobs: + - Updates job status to `reserved` during processing + - Records execution metrics (duration, version) + - On successful completion: remove job from the jobs table + - On error: update job status to `error` + - Records errors and execution metrics + +4. **Cleanup**: + - Optionally clean up orphaned/outdated jobs + +## Job Cleanup Process + +The `cleanup_jobs` method maintains database consistency by removing orphaned jobs: + +1. **Orphaned Success Jobs**: + - Identifies jobs marked as `success` but not present in the target table + - These typically occur when target table entries are deleted + +2. 
**Orphaned Incomplete Jobs**: + - Identifies jobs in `scheduled`/`error`/`ignore` state that are no longer in the `key_source` + - These typically occur when upstream table entries are deleted + +3. **Cleanup Characteristics**: + - Potentially time-consuming operation + - Should not need to run frequently + - Helps maintain database consistency + +## Jobs Table Maintenance + +The "freshness" and consistency of the jobs table depends on regular maintenance through two key operations: + +1. **Scheduling Updates** (`schedule_jobs`): + - Adds new jobs to the table + - Should be run frequently enough to keep up with new data + - Rate-limited by `min_scheduling_interval` to prevent overload + - Example: Run every few minutes in a cron job for active pipelines + - Event-driven approach: `inserts` in upstream tables auto trigger this step + +2. **Cleanup** (`cleanup_jobs`): + - Removes orphaned or outdated jobs + - Should be run periodically to maintain consistency + - More resource-intensive than scheduling + - Example: Run daily during low-activity periods + - Event-driven approach: `deletes` in upstream or target tables auto trigger this step + +The balance between these operations affects: +- How quickly new jobs are discovered and scheduled +- How long orphaned jobs remain in the table +- Database size and query performance +- Overall system responsiveness + +Recommended maintenance schedule: +```python +# Example: Run scheduling frequently +dj.config["min_scheduling_interval"] = 300 # 5 minutes + +# Example: Run cleanup daily +# (implement as a cron job or scheduled task) +def daily_cleanup(): + for table in your_pipeline_tables: + table.cleanup_jobs() +``` \ No newline at end of file diff --git a/tests/test_declare.py b/tests/test_declare.py index 82802193..6dc726e3 100644 --- a/tests/test_declare.py +++ b/tests/test_declare.py @@ -427,3 +427,40 @@ def test_add_hidden_timestamp_disabled(disable_add_hidden_timestamp, schema_any) ), msg assert not any(a.is_hidden for a in Experiment().heading._attributes.values()), msg assert not any(a.is_hidden for a in Experiment().heading.attributes.values()), msg + + +def test_hidden_job_column_for_imported_computed_tables(schema_any): + """Test that hidden _job column is added to imported and computed tables but not manual/lookup tables""" + + # Manual and Lookup tables should NOT have _job column + manual_attrs = Image().heading._attributes + lookup_attrs = Subject().heading._attributes + + assert not any( + a.name == "_job" for a in manual_attrs.values() + ), "Manual table should not have _job column" + assert not any( + a.name == "_job" for a in lookup_attrs.values() + ), "Lookup table should not have _job column" + + # Imported and Computed tables SHOULD have _job column + imported_attrs = Experiment().heading._attributes + computed_attrs = SigIntTable().heading._attributes + + assert any( + a.name == "_job" for a in imported_attrs.values() + ), "Imported table should have _job column" + assert any( + a.name == "_job" for a in computed_attrs.values() + ), "Computed table should have _job column" + + # Verify the _job column is hidden and has correct type + imported_job_attr = next(a for a in imported_attrs.values() if a.name == "_job") + computed_job_attr = next(a for a in computed_attrs.values() if a.name == "_job") + + assert imported_job_attr.is_hidden, "_job column should be hidden" + assert computed_job_attr.is_hidden, "_job column should be hidden" + assert "json" in imported_job_attr.sql.lower(), "_job column should be JSON type" + assert 
"json" in computed_job_attr.sql.lower(), "_job column should be JSON type" + assert "null" in imported_job_attr.sql.lower(), "_job column should be nullable" + assert "null" in computed_job_attr.sql.lower(), "_job column should be nullable"