diff --git a/README.md b/README.md index dab5ae6..eb2004c 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,7 @@ When there are database DDL changes, use following to do the database update: ```bash app_name: harvesting-logic database_name: harvesting-logic-db - route-external: harvester-dev-datagov.app.cloud.gov + route_external: harvester-dev-datagov.app.cloud.gov ``` 2. Deploy the application using Cloud Foundry's `cf push` command with the variable file: diff --git a/app/forms.py b/app/forms.py index 281d1cd..6d0ef98 100644 --- a/app/forms.py +++ b/app/forms.py @@ -35,6 +35,11 @@ class HarvestSourceForm(FlaskForm): choices=["manual", "daily", "weekly", "biweekly", "monthly"], validators=[DataRequired()], ) + size = SelectField( + "Size", + choices=["small", "medium", "large"], + validators=[DataRequired()], + ) schema_type = SelectField( "Schema Type", choices=["strict", "other"], validators=[DataRequired()] ) diff --git a/app/routes.py b/app/routes.py index 9246326..ba6362b 100644 --- a/app/routes.py +++ b/app/routes.py @@ -431,9 +431,7 @@ def add_harvest_source(): @mod.route("/harvest_source/", methods=["GET"]) def view_harvest_source_data(source_id: str): source = db.get_harvest_source(source_id) - jobs = db.get_all_harvest_jobs_by_filter( - {"harvest_source_id": source.id, "status": "complete"} - ) + jobs = db.get_all_harvest_jobs_by_filter({"harvest_source_id": source.id}) next_job = "N/A" future_jobs = db.get_new_harvest_jobs_by_source_in_future(source.id) if len(future_jobs): @@ -623,9 +621,9 @@ def get_harvest_errors_by_job(job_id, error_type): try: match error_type: case "job": - return db.get_harvest_job_errors_by_job(job_id) + return db._to_dict(db.get_harvest_job_errors_by_job(job_id)) case "record": - return db.get_harvest_record_errors_by_job(job_id) + return db._to_dict(db.get_harvest_record_errors_by_job(job_id)) except Exception: return "Please provide correct job_id" diff --git a/app/scripts/load_manager.py b/app/scripts/load_manager.py 
index be49331..fe0bb30 100644 --- a/app/scripts/load_manager.py +++ b/app/scripts/load_manager.py @@ -18,6 +18,11 @@ logger = logging.getLogger("harvest_admin") +TASK_SIZE_ENUM = { + "medium": (2048, 768), + "large": (4096, 1536), +} + def create_cf_handler(): # check for correct env vars to init CFHandler @@ -27,7 +32,7 @@ def create_cf_handler(): return CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH) -def create_task(job_id, cf_handler=None): +def create_task(job_id, size, cf_handler=None): task_contract = { "app_guuid": HARVEST_RUNNER_APP_GUID, "command": f"python harvester/harvest.py {job_id}", @@ -36,6 +41,10 @@ def create_task(job_id, cf_handler=None): if cf_handler is None: cf_handler = create_cf_handler() + if size != "small" and size in TASK_SIZE_ENUM: + task_contract["memory_in_mb"] = TASK_SIZE_ENUM[size][0] + task_contract["disk_in_mb"] = TASK_SIZE_ENUM[size][1] + cf_handler.start_task(**task_contract) updated_job = interface.update_harvest_job(job_id, {"status": "in_progress"}) message = f"Updated job {updated_job.id} to in_progress" @@ -63,7 +72,7 @@ def trigger_manual_job(source_id): logger.info( f"Created new manual harvest job: for {job_data.harvest_source_id}." ) - return create_task(job_data.id) + return create_task(job_data.id, source.size) def schedule_first_job(source_id): @@ -121,5 +130,6 @@ def load_manager(): # then mark that job(s) as running in the DB logger.info("Load Manager :: Updated Harvest Jobs") for job in jobs[:slots]: - create_task(job.id, cf_handler) + source = interface.get_harvest_source(job.harvest_source_id) + create_task(job.id, source.size, cf_handler) schedule_next_job(job.harvest_source_id) diff --git a/app/templates/view_source_data.html b/app/templates/view_source_data.html index 99d70b2..e74fcc4 100644 --- a/app/templates/view_source_data.html +++ b/app/templates/view_source_data.html @@ -71,6 +71,7 @@

Harvest Jobs

Id + Status Date Created Date Finished Records Added @@ -85,6 +86,7 @@

Harvest Jobs

{{job.id}} + {{job.status}} {{job.date_created}} {{job.date_finished}} {{job.records_added}} diff --git a/database/models.py b/database/models.py index 0972239..8a37455 100644 --- a/database/models.py +++ b/database/models.py @@ -52,6 +52,7 @@ class HarvestSource(db.Model): index=True, ) url = db.Column(db.String, nullable=False, unique=True) + size = db.Column(Enum("small", "medium", "large", name="size")) schema_type = db.Column(db.String, nullable=False) source_type = db.Column(db.String, nullable=False) status = db.Column(db.String) diff --git a/harvester/exceptions.py b/harvester/exceptions.py index a5824af..a569eef 100644 --- a/harvester/exceptions.py +++ b/harvester/exceptions.py @@ -29,6 +29,9 @@ def __init__(self, msg, harvest_job_id): self.db_interface.add_harvest_job_error(error_data) self.db_interface.update_harvest_job(harvest_job_id, job_status) + self.log_err() + + def log_err(self): self.logger.critical(self.msg, exc_info=True) @@ -65,11 +68,15 @@ def __init__(self, msg, harvest_job_id, harvest_record_id): self.db_interface.add_harvest_record_error(error_data) self.db_interface.update_harvest_record(harvest_record_id, {"status": "error"}) + self.log_err() + + def log_err(self): self.logger.error(self.msg, exc_info=True) class ValidationException(HarvestNonCriticalException): - pass + def log_err(self): + pass class TranformationException(HarvestNonCriticalException): diff --git a/harvester/harvest.py b/harvester/harvest.py index dceeffa..c7de97d 100644 --- a/harvester/harvest.py +++ b/harvester/harvest.py @@ -221,8 +221,8 @@ def compare(self) -> None: def get_record_changes(self) -> None: """determine which records needs to be updated, deleted, or created""" logger.info(f"getting records changes for {self.name} using {self.url}") - self.prepare_internal_data() self.prepare_external_data() + self.prepare_internal_data() self.compare() def write_compare_to_db(self) -> dict: @@ -319,18 +319,21 @@ def report(self) -> None: None: 0, } 
actual_results_status = {"success": 0, "error": 0, None: 0} - validity = {"valid": 0, "invalid": 0} + validity = {"valid": 0, "invalid": 0, "ignored": 0} for record_id, record in self.external_records.items(): # action - actual_results_action[record.action] += 1 + if record.status != "error": + actual_results_action[record.action] += 1 # status actual_results_status[record.status] += 1 # validity if record.valid: validity["valid"] += 1 - else: + elif record.valid is False: validity["invalid"] += 1 + else: + validity["ignored"] += 1 # what actually happened? logger.info("actual actions completed") @@ -338,7 +341,7 @@ def report(self) -> None: # what actually happened? logger.info("actual status completed") - logger.info(actual_results_action) + logger.info(actual_results_status) # what's our record validity count? logger.info("validity of the records") diff --git a/harvester/lib/cf_handler.py b/harvester/lib/cf_handler.py index d0aa419..be0020f 100644 --- a/harvester/lib/cf_handler.py +++ b/harvester/lib/cf_handler.py @@ -14,8 +14,10 @@ def setup(self): self.client.init_with_user_credentials(self.user, self.password) self.task_mgr = TaskManager(self.url, self.client) - def start_task(self, app_guuid, command, task_id): - return self.task_mgr.create(app_guuid, command, task_id) + def start_task(self, app_guuid, command, task_id, memory_in_mb=512, disk_in_mb=512): + return self.task_mgr.create( + app_guuid, command, task_id, memory_in_mb, disk_in_mb + ) def stop_task(self, task_id): return self.task_mgr.cancel(task_id) diff --git a/manifest.yml b/manifest.yml index f39b1cd..98c9641 100644 --- a/manifest.yml +++ b/manifest.yml @@ -4,7 +4,7 @@ applications: buildpacks: - python_buildpack routes: - - route: ((route-external)) + - route: ((route_external)) services: - ((app_name))-db - ((app_name))-secrets diff --git a/tests/conftest.py b/tests/conftest.py index 864963a..12ba78a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -104,6 +104,7 @@ def 
source_data_dcatus(organization_data: dict) -> dict: "notification_emails": "email@example.com", "organization_id": organization_data["id"], "frequency": "daily", + "size": "small", "url": f"{HARVEST_SOURCE_URL}/dcatus/dcatus.json", "schema_type": "type1", "source_type": "dcatus", @@ -119,6 +120,7 @@ def source_data_dcatus_2(organization_data: dict) -> dict: "notification_emails": "email@example.com", "organization_id": organization_data["id"], "frequency": "daily", + "size": "medium", "url": f"{HARVEST_SOURCE_URL}/dcatus/dcatus_2.json", "schema_type": "type1", "source_type": "dcatus", @@ -134,6 +136,7 @@ def source_data_dcatus_same_title(organization_data: dict) -> dict: "notification_emails": "email@example.com", "organization_id": organization_data["id"], "frequency": "daily", + "size": "small", "url": f"{HARVEST_SOURCE_URL}/dcatus/dcatus_same_title.json", "schema_type": "type1", "source_type": "dcatus", @@ -154,6 +157,7 @@ def source_data_waf(organization_data: dict) -> dict: "notification_emails": "wafl@example.com", "organization_id": organization_data["id"], "frequency": "daily", + "size": "small", "url": f"{HARVEST_SOURCE_URL}/waf/", "schema_type": "type1", "source_type": "waf", diff --git a/vars.development.yml b/vars.development.yml index d3b1aa3..77c95c6 100644 --- a/vars.development.yml +++ b/vars.development.yml @@ -1,5 +1,5 @@ app_name: datagov-harvest -route-external: datagov-harvest-admin-dev.app.cloud.gov +route_external: datagov-harvest-admin-dev.app.cloud.gov CF_API_URL: https://api.fr.cloud.gov CKAN_API_URL: https://catalog-next-dev-admin-datagov.app.cloud.gov HARVEST_RUNNER_APP_GUID: e6a8bba8-ed6d-4200-8280-67b46cebdc63 diff --git a/vars.prod.yml b/vars.prod.yml index 5990141..fd800a5 100644 --- a/vars.prod.yml +++ b/vars.prod.yml @@ -1,5 +1,5 @@ app_name: datagov-harvest -route-external: datagov-harvest-admin-prod.app.cloud.gov +route_external: datagov-harvest-admin-prod.app.cloud.gov CF_API_URL: https://api.fr.cloud.gov CKAN_API_URL: 
https://catalog.data.gov HARVEST_RUNNER_APP_GUID: null diff --git a/vars.staging.yml b/vars.staging.yml index 15b5620..3960ecd 100644 --- a/vars.staging.yml +++ b/vars.staging.yml @@ -1,5 +1,5 @@ app_name: datagov-harvest -route-external: datagov-harvest-admin-stage.app.cloud.gov +route_external: datagov-harvest-admin-stage.app.cloud.gov CF_API_URL: https://api.fr.cloud.gov CKAN_API_URL: https://catalog-stage.data.gov HARVEST_RUNNER_APP_GUID: null