Skip to content

Commit

Permalink
Merge pull request #63 from GSA/add-load-manager
Browse files Browse the repository at this point in the history
Adds load manager module
  • Loading branch information
btylerburton authored May 7, 2024
2 parents 915e4f6 + a311e83 commit efc0960
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 72 deletions.
9 changes: 5 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ node_modules/
# openstack ( s3 mock )
tmp/

# vscode debugger
.vscode/*
!.vscode/launch.json
.env
# env
.env.secret
requirements.txt

# vscode debugger
.vscode/*
!.vscode/launch.json
7 changes: 4 additions & 3 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
from flask_migrate import Migrate

from database.models import db
from app.scripts.load_manager import load_manager

load_dotenv()

DATABASE_URI = os.getenv("DATABASE_URI")


def create_app(testing=False):

app = Flask(__name__, static_url_path="", static_folder="static")

if testing:
Expand All @@ -35,4 +33,7 @@ def create_app(testing=False):

register_routes(app)

with app.app_context():
load_manager()

return app
62 changes: 62 additions & 0 deletions app/scripts/load_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
from database.interface import HarvesterDBInterface
from harvester.utils import CFHandler

# Deployment configuration, read from the process environment when this
# module is imported.
# NOTE(review): if the importing module only calls load_dotenv() after
# importing this one, values that live in a .env file will be None here --
# confirm the import order.
DATABASE_URI = os.getenv("DATABASE_URI")
CF_API_URL = os.getenv("CF_API_URL")
CF_SERVICE_USER = os.getenv("CF_SERVICE_USER")
CF_SERVICE_AUTH = os.getenv("CF_SERVICE_AUTH")
# GUID of the app whose tasks load_manager() lists and starts.
LM_RUNNER_APP_GUID = os.getenv("LM_RUNNER_APP_GUID")
# load_manager() re-reads this variable itself instead of using this constant.
CF_INSTANCE_INDEX = os.getenv("CF_INSTANCE_INDEX")

# Upper bound on concurrently running tasks enforced by load_manager().
LM_MAX_TASKS_COUNT = 3

# Shared singletons, constructed once at import time.
# NOTE(review): CFHandler is built here with whatever credentials were present
# at import; presumably this contacts Cloud Foundry -- confirm before importing
# this module in environments without CF access.
interface = HarvesterDBInterface()
cf_handler = CFHandler(CF_API_URL, CF_SERVICE_USER, CF_SERVICE_AUTH)


def create_task(jobId):
    """Build the keyword arguments describing the task for one harvest job.

    Args:
        jobId: identifier of the harvest job; interpolated into both the
            command line and the task id.

    Returns:
        dict with the keys ``app_guuid``, ``command`` and ``task_id``, in
        that order.
    """
    harvest_command = f"python harvest.py {jobId}"
    task_name = f"harvest-job-{jobId}"
    return dict(
        app_guuid=LM_RUNNER_APP_GUID,
        command=harvest_command,
        task_id=task_name,
    )


def sort_jobs(jobs):
    """Return a new list of jobs ordered by descending ``status`` string.

    Descending string order places ``"pending_manual"`` ahead of
    ``"pending"``. The sort is stable, so jobs with the same status keep
    their relative order, and the input is left unmodified.

    Args:
        jobs: iterable of mappings that each have a ``"status"`` key.

    Returns:
        A new sorted list.
    """

    def status_of(job):
        return job["status"]

    ordered = list(jobs)
    ordered.sort(key=status_of, reverse=True)
    return ordered


def load_manager():
    """Start harvest tasks for pending jobs, limited by the free task slots.

    Does nothing unless the ``CF_INSTANCE_INDEX`` environment variable is
    ``"0"``, so only one app instance schedules work. Otherwise it loads the
    jobs whose status is ``pending`` or ``pending_manual``, counts the tasks
    currently running on the runner app, and starts at most
    ``LM_MAX_TASKS_COUNT - running`` new tasks. Each job that is started is
    then marked ``in_progress`` in the database.

    Returns:
        None.
    """
    # confirm CF_INSTANCE_INDEX == 0 or bail (read at call time, not import time)
    if os.getenv("CF_INSTANCE_INDEX") != "0":
        return

    # filter harvestjobs by pending / pending_manual
    jobs = interface.get_harvest_jobs_by_faceted_filter(
        "status", ["pending", "pending_manual"]
    )
    if not jobs:
        # nothing to schedule, so there is no need to query the task list
        return

    # get current list of all tasks
    current_tasks = cf_handler.get_all_app_tasks(LM_RUNNER_APP_GUID)
    # count the tasks that are currently running
    running_tasks = cf_handler.get_all_running_tasks(current_tasks)

    # confirm running tasks < LM_MAX_TASKS_COUNT or bail; a full runner
    # (running == max) leaves zero slots and must bail as well
    slots = LM_MAX_TASKS_COUNT - running_tasks
    if slots <= 0:
        return

    # sort jobs by pending_manual first, then slice off the jobs to invoke
    jobs_to_invoke = sort_jobs(jobs)[:slots]

    # invoke cf_task with next job(s)
    # then mark that job(s) as running in the DB
    for job in jobs_to_invoke:
        task_contract = create_task(job["id"])
        cf_handler.start_task(**task_contract)
        interface.update_harvest_job(job["id"], {"status": "in_progress"})
82 changes: 23 additions & 59 deletions database/interface.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from sqlalchemy import create_engine, inspect
import os
from sqlalchemy import create_engine, inspect, or_
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import scoped_session, sessionmaker

Expand All @@ -10,7 +11,7 @@
Organization,
)

from . import DATABASE_URI
DATABASE_URI = os.getenv("DATABASE_URI")


class HarvesterDBInterface:
Expand All @@ -32,6 +33,8 @@ def __init__(self, session=None):

@staticmethod
def _to_dict(obj):
if obj is None:
return None
return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}

def add_organization(self, org_data):
Expand All @@ -48,16 +51,10 @@ def add_organization(self, org_data):

def get_all_organizations(self):
orgs = self.db.query(Organization).all()
if orgs is None:
return None
else:
orgs_data = [HarvesterDBInterface._to_dict(org) for org in orgs]
return orgs_data
return [HarvesterDBInterface._to_dict(org) for org in orgs]

def get_organization(self, org_id):
result = self.db.query(Organization).filter_by(id=org_id).first()
if result is None:
return None
return HarvesterDBInterface._to_dict(result)

def update_organization(self, org_id, updates):
Expand Down Expand Up @@ -99,31 +96,17 @@ def add_harvest_source(self, source_data):

def get_all_harvest_sources(self):
harvest_sources = self.db.query(HarvestSource).all()
if harvest_sources is None:
return None
else:
harvest_sources_data = [
HarvesterDBInterface._to_dict(source) for source in harvest_sources
]
return harvest_sources_data
return [HarvesterDBInterface._to_dict(source) for source in harvest_sources]

def get_harvest_source(self, source_id):
result = self.db.query(HarvestSource).filter_by(id=source_id).first()
if result is None:
return None
return HarvesterDBInterface._to_dict(result)

def get_harvest_source_by_org(self, org_id):
harvest_source = (
self.db.query(HarvestSource).filter_by(organization_id=org_id).all()
)
if harvest_source is None:
return None
else:
harvest_source_data = [
HarvesterDBInterface._to_dict(src) for src in harvest_source
]
return harvest_source_data
return [HarvesterDBInterface._to_dict(src) for src in harvest_source]

def update_harvest_source(self, source_id, updates):
try:
Expand Down Expand Up @@ -164,21 +147,24 @@ def add_harvest_job(self, job_data):

def get_harvest_job(self, job_id):
result = self.db.query(HarvestJob).filter_by(id=job_id).first()
if result is None:
return None
return HarvesterDBInterface._to_dict(result)

def get_harvest_jobs_by_filter(self, filter):
    """Return the harvest jobs matching every column/value pair in ``filter``.

    Args:
        filter: mapping of ``HarvestJob`` attribute names to required
            values; it is expanded into ``filter_by`` keyword arguments.

    Returns:
        list of dicts, one per matching job (empty when nothing matches).
    """
    job_query = self.db.query(HarvestJob).filter_by(**filter)
    matching_jobs = job_query.all()
    return [HarvesterDBInterface._to_dict(row) for row in matching_jobs]

def get_harvest_jobs_by_faceted_filter(self, attr, values):
    """Return the harvest jobs whose ``attr`` equals any one of ``values``.

    Args:
        attr: name of the ``HarvestJob`` attribute to compare.
        values: iterable of accepted values; they are OR-ed together.

    Returns:
        list of dicts, one per matching job. An empty ``values`` yields an
        empty list.
    """
    values = list(values)
    if not values:
        # or_() without arguments is deprecated in SQLAlchemy and renders no
        # criteria, so the query would return every job instead of none.
        return []
    query_list = [getattr(HarvestJob, attr) == value for value in values]
    harvest_jobs = self.db.query(HarvestJob).filter(or_(*query_list)).all()
    return [HarvesterDBInterface._to_dict(job) for job in harvest_jobs]

def get_harvest_job_by_source(self, source_id):
harvest_job = (
self.db.query(HarvestJob).filter_by(harvest_source_id=source_id).all()
)
if harvest_job is None:
return None
else:
harvest_job_data = [
HarvesterDBInterface._to_dict(job) for job in harvest_job
]
return harvest_job_data
return [HarvesterDBInterface._to_dict(job) for job in harvest_job]

def update_harvest_job(self, job_id, updates):
try:
Expand Down Expand Up @@ -219,19 +205,11 @@ def add_harvest_error(self, error_data):

def get_harvest_error(self, error_id):
result = self.db.query(HarvestError).filter_by(id=error_id).first()
if result is None:
return None
return HarvesterDBInterface._to_dict(result)

def get_harvest_error_by_job(self, job_id):
harvest_errors = self.db.query(HarvestError).filter_by(harvest_job_id=job_id)
if harvest_errors is None:
return None
else:
harvest_errors_data = [
HarvesterDBInterface._to_dict(err) for err in harvest_errors
]
return harvest_errors_data
return [HarvesterDBInterface._to_dict(err) for err in harvest_errors]

def add_harvest_record(self, record_data):
try:
Expand Down Expand Up @@ -264,31 +242,17 @@ def add_harvest_records(self, records_data: list) -> bool:

def get_harvest_record(self, record_id):
result = self.db.query(HarvestRecord).filter_by(id=record_id).first()
if result is None:
return None
return HarvesterDBInterface._to_dict(result)

def get_harvest_record_by_job(self, job_id):
harvest_records = self.db.query(HarvestRecord).filter_by(harvest_job_id=job_id)
if harvest_records is None:
return None
else:
harvest_records_data = [
HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records
]
return harvest_records_data
return [HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records]

def get_harvest_record_by_source(self, source_id):
harvest_records = self.db.query(HarvestRecord).filter_by(
harvest_source_id=source_id
)
if harvest_records is None:
return None
else:
harvest_records_data = [
HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records
]
return harvest_records_data
return [HarvesterDBInterface._to_dict(rcd) for rcd in harvest_records]

def get_source_by_jobid(self, jobid):
harvest_job = self.db.query(HarvestJob).filter_by(id=jobid).first()
Expand Down
3 changes: 1 addition & 2 deletions harvester/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def get_title_from_fgdc(xml_str: str) -> str:


def parse_args(args):

parser = argparse.ArgumentParser(
prog="Harvest Runner", description="etl harvest sources"
)
Expand Down Expand Up @@ -114,6 +113,7 @@ def __init__(self, url: str, user: str, password: str):
self.url = url
self.user = user
self.password = password
self.setup()

def setup(self):
self.client = CloudFoundryClient(self.url)
Expand All @@ -136,7 +136,6 @@ def get_all_running_tasks(self, tasks):
return sum(1 for _ in filter(lambda task: task["state"] == "RUNNING", tasks))

def read_recent_app_logs(self, app_guuid, task_id=None):
    """Return the app's recent log lines joined by newlines.

    Args:
        app_guuid: key used to look the app up in ``self.client.v2.apps``.
        task_id: optional substring; when given, only log lines containing
            it are kept. When ``None`` (the default) every recent log line
            is returned.

    Returns:
        str with one log entry per line (empty string when there are none).
    """
    app = self.client.v2.apps[app_guuid]
    logs = [str(log) for log in app.recent_logs()]
    if task_id is not None:
        # `None in "<str>"` raises TypeError, so only filter on a real id
        logs = [lg for lg in logs if task_id in lg]
    return "\n".join(logs)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "datagov-harvesting-logic"
version = "0.4.0"
version = "0.4.1"
description = ""
# authors = [
# {name = "Jin Sun", email = "[email protected]"},
Expand Down
19 changes: 18 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,24 @@ def source_data_dcatus_invalid_records_job(
}


@pytest.fixture
def interface_with_multiple_jobs(
    interface,
    source_data_dcatus: dict,
):
    """Return ``interface`` after adding one job per status/source pair.

    Jobs are added status by status, and within each status for source
    ``"1234"`` then ``"abcd"``, giving eight jobs in total.
    """
    for job_status in ("pending", "pending_manual", "in_progress", "complete"):
        for source_id in ("1234", "abcd"):
            interface.add_harvest_job(
                {"status": job_status, "harvest_source_id": source_id}
            )

    return interface


@pytest.fixture
def internal_compare_data(job_data_dcatus: dict) -> dict:
# ruff: noqa: E501
Expand Down Expand Up @@ -249,7 +267,6 @@ def internal_compare_data(job_data_dcatus: dict) -> dict:

@pytest.fixture
def cf_handler() -> CFHandler:

url = os.getenv("CF_API_URL")
user = os.getenv("CF_SERVICE_USER")
password = os.getenv("CF_SERVICE_AUTH")
Expand Down
32 changes: 30 additions & 2 deletions tests/unit/database/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def test_add_harvest_record(
job_data_dcatus,
record_data_dcatus,
):

interface.add_organization(organization_data)
source = interface.add_harvest_source(source_data_dcatus)
harvest_job = interface.add_harvest_job(job_data_dcatus)
Expand All @@ -113,7 +112,6 @@ def test_add_harvest_records(
job_data_dcatus,
record_data_dcatus,
):

interface.add_organization(organization_data)
interface.add_harvest_source(source_data_dcatus)
interface.add_harvest_job(job_data_dcatus)
Expand All @@ -122,3 +120,33 @@ def test_add_harvest_records(
success = interface.add_harvest_records(records)
assert success is True
assert len(interface.get_all_harvest_records()) == 10

def test_add_harvest_job_with_id(self, interface, job_data_dcatus):
    """A job added with an explicit id keeps its id, status and source."""
    created = interface.add_harvest_job(job_data_dcatus)
    for field in ("id", "status", "harvest_source_id"):
        assert getattr(created, field) == job_data_dcatus[field]

def test_add_harvest_job_without_id(self, interface, job_data_dcatus):
    """A job added without an id gets a new, different id assigned."""
    # remove the id from the payload but remember it for comparison
    original_id = job_data_dcatus.pop("id")
    created = interface.add_harvest_job(job_data_dcatus)
    assert created.id
    assert created.id != original_id
    for field in ("status", "harvest_source_id"):
        assert getattr(created, field) == job_data_dcatus[field]

def test_get_harvest_jobs_by_filter(self, interface_with_multiple_jobs):
    """Filtering on status and source together returns exactly one job."""
    criteria = {"status": "pending", "harvest_source_id": "1234"}
    matches = interface_with_multiple_jobs.get_harvest_jobs_by_filter(criteria)
    assert len(matches) == 1
    only_match = matches[0]
    assert only_match["status"] == "pending"
    assert only_match["harvest_source_id"] == "1234"

def test_filter_jobs_by_faceted_filter(self, interface_with_multiple_jobs):
    """The faceted filter returns jobs matching any of the given statuses."""
    accepted_statuses = ["pending", "pending_manual"]
    matches = interface_with_multiple_jobs.get_harvest_jobs_by_faceted_filter(
        "status", accepted_statuses
    )
    assert len(matches) == 4
    pending_count = sum(1 for job in matches if job["status"] == "pending")
    assert pending_count == 2
    source_count = sum(1 for job in matches if job["harvest_source_id"] == "1234")
    assert source_count == 2
Loading

2 comments on commit efc0960

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
harvester
   __init__.py50100% 
   ckan_utils.py11366 95%
   exceptions.py420100% 
   harvest.py2374141 83%
   logger_config.py10100% 
   utils.py791010 87%
TOTAL4775788% 

Tests Skipped Failures Errors Time
51 0 💤 0 ❌ 0 🔥 4.458s ⏱️

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage

Coverage Report
FileStmtsMissCoverMissing
harvester
   __init__.py50100% 
   ckan_utils.py11366 95%
   exceptions.py420100% 
   harvest.py2374141 83%
   logger_config.py10100% 
   utils.py791010 87%
TOTAL4775788% 

Tests Skipped Failures Errors Time
51 0 💤 0 ❌ 0 🔥 5.145s ⏱️

Please sign in to comment.