Skip to content

Commit

Permalink
Merge pull request #68 from ecmwf-projects/adaptor-config-in-a-new-table
Browse files Browse the repository at this point in the history
[WIP] Adaptor config in a new table
  • Loading branch information
francesconazzaro committed Jul 26, 2023
2 parents b981e11 + 9c9acc1 commit 9f3ad63
Show file tree
Hide file tree
Showing 5 changed files with 427 additions and 74 deletions.
53 changes: 53 additions & 0 deletions alembic/versions/e09564dc7652_add_adaptor_configuration_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Add Adaptor configuration table.
Revision ID: e09564dc7652
Revises: cc6cc1cb3529
Create Date: 2023-07-24 10:41:11.679876
"""
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB

from alembic import op

# revision identifiers, used by Alembic.
revision = "e09564dc7652"
down_revision = "cc6cc1cb3529"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.create_table(
"adaptor_properties",
sa.Column("hash", sa.Text, primary_key=True),
sa.Column("config", JSONB),
sa.Column("form", JSONB),
)
op.add_column(
"system_requests",
sa.Column(
"adaptor_properties_hash", sa.Text, sa.ForeignKey("adaptor_properties.hash")
),
)
op.add_column(
"system_requests",
sa.Column("entry_point", sa.Text),
)
op.execute("update system_requests set entry_point=request_body['entry_point']")
op.execute(
"insert into adaptor_properties (hash, config, form) values "
"('098f6bcd4621d373cade4e832627b4f6', '{}', '{}')"
)
op.execute(
"update system_requests set adaptor_properties_hash='098f6bcd4621d373cade4e832627b4f6'"
)


def downgrade() -> None:
op.execute(
"update system_requests set request_body['entry_point']=to_jsonb(\"entry_point\")"
)
op.drop_column("system_requests", "entry_point")
op.drop_column("system_requests", "adaptor_properties_hash")
op.drop_table("adaptor_properties")
100 changes: 83 additions & 17 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""SQLAlchemy ORM model."""
import datetime
import hashlib
import json
import os
import uuid
from typing import Any
Expand Down Expand Up @@ -29,8 +31,18 @@ class NoResultFound(Exception):
pass


class AdaptorProperties(BaseModel):
"""Adaptor Metadata ORM model."""

__tablename__ = "adaptor_properties"

hash = sa.Column(sa.Text, primary_key=True)
config = sa.Column(JSONB)
form = sa.Column(JSONB)


class SystemRequest(BaseModel):
"""Resource ORM model."""
"""System Request ORM model."""

__tablename__ = "system_requests"

Expand All @@ -54,6 +66,10 @@ class SystemRequest(BaseModel):
updated_at = sa.Column(sa.TIMESTAMP, default=sa.func.now(), onupdate=sa.func.now())
origin = sa.Column(sa.Text, default="ui")
portal = sa.Column(sa.Text)
adaptor_properties_hash = sa.Column(
sa.Text, sa.ForeignKey("adaptor_properties.hash"), nullable=False
)
entry_point = sa.Column(sa.Text)

__table_args__: tuple[sa.ForeignKeyConstraint, dict[None, None]] = (
sa.ForeignKeyConstraint(
Expand All @@ -64,6 +80,7 @@ class SystemRequest(BaseModel):

# joined is temporary
cache_entry = sa.orm.relationship(cacholote.database.CacheEntry, lazy="joined")
adaptor_properties = sa.orm.relationship(AdaptorProperties, lazy="joined")

@property
def age(self):
Expand Down Expand Up @@ -151,15 +168,15 @@ def count_finished_requests_per_user(
def count_requests(
session: sa.orm.Session,
status: str | None = None,
process_id: str | None = None,
entry_point: str | None = None,
user_uid: str | None = None,
) -> int:
"""Count requests."""
statement = session.query(SystemRequest)
if status is not None:
statement = statement.filter(SystemRequest.status == status)
if process_id is not None:
statement = statement.filter(SystemRequest.process_id == process_id)
if entry_point is not None:
statement = statement.filter(SystemRequest.entry_point == entry_point)
if user_uid is not None:
statement = statement.filter(SystemRequest.user_uid == user_uid)
return statement.count()
Expand Down Expand Up @@ -271,15 +288,16 @@ def count_waiting_users_queued(session: sa.orm.Session):
).all()


def count_running_users(session: sa.orm.Session) -> list:
def count_users(status: str, entry_point: str, session: sa.orm.Session) -> int:
"""Users that have running requests, per dataset."""
return session.execute(
sa.select(
SystemRequest.process_id, sa.func.count(sa.distinct(SystemRequest.user_uid))
return (
session.query(SystemRequest.user_uid)
.filter(
SystemRequest.status == status, SystemRequest.entry_point == entry_point
)
.filter(SystemRequest.status == "running")
.group_by(SystemRequest.process_id)
).all()
.distinct()
.count()
)


def set_request_status(
Expand Down Expand Up @@ -330,31 +348,78 @@ def logger_kwargs(request: SystemRequest) -> dict[str, str]:
"finished_at": request.finished_at.isoformat()
if request.finished_at is not None
else None,
"request_kwargs": request.request_body.get("kwargs", {}).get("request", {}),
"request_kwargs": request.request_body.get("request", {}),
"user_request": True,
"process_id": request.process_id,
"resubmit_number": request.request_metadata.get("resubmit_number", 0),
"origin": request.origin,
"entry_point": request.request_body.get("entry_point", None),
"entry_point": request.entry_point,
**request.response_error,
}
return kwargs


def generate_adaptor_properties_hash(
config: dict[str, Any], form: dict[str, Any]
) -> str:
config_form = {"config": config, "form": form}
return hashlib.md5(
json.dumps(config_form, sort_keys=True).encode("utf-8")
).hexdigest()


def get_adaptor_properties(
adaptor_properties_hash: str,
session: sa.orm.Session,
) -> AdaptorProperties | None:
try:
statement = sa.select(AdaptorProperties).where(
AdaptorProperties.hash == adaptor_properties_hash
)
return session.scalars(statement).one()
except sqlalchemy.orm.exc.NoResultFound:
return None


def add_adaptor_properties(
hash: str,
config: dict[str, Any],
form: dict[str, Any],
session: sa.orm.Session,
):
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
session.add(adaptor_properties)


def create_request(
session: sa.orm.Session,
user_uid: str,
setup_code: str,
entry_point: str,
kwargs: dict[str, Any],
request: dict[str, Any],
process_id: str,
portal: str,
adaptor_config: dict[str, Any],
adaptor_form: dict[str, Any],
adaptor_properties_hash: str,
metadata: dict[str, Any] = {},
resources: dict[str, Any] = {},
origin: str = "ui",
request_uid: str | None = None,
) -> dict[str, Any]:
"""Temporary function to create a request."""
"""Create a request."""
if (
get_adaptor_properties(
adaptor_properties_hash=adaptor_properties_hash, session=session
)
is None
):
add_adaptor_properties(
hash=adaptor_properties_hash,
config=adaptor_config,
form=adaptor_form,
session=session,
)
metadata["resources"] = resources
request = SystemRequest(
request_uid=request_uid or str(uuid.uuid4()),
Expand All @@ -363,12 +428,13 @@ def create_request(
status="accepted",
request_body={
"setup_code": setup_code,
"entry_point": entry_point,
"kwargs": kwargs,
"request": request,
},
request_metadata=metadata,
origin=origin,
portal=portal,
adaptor_properties_hash=adaptor_properties_hash,
entry_point=entry_point,
)
session.add(request)
session.commit()
Expand Down
8 changes: 5 additions & 3 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def register_functions(self):
)
expressions.FunctionFactory.FunctionFactory.register_function(
"adaptor",
lambda context, *args: context.request.request_body.get("entry_point", ""),
lambda context, *args: context.request.entry_point,
)
expressions.FunctionFactory.FunctionFactory.register_function(
"userRequestCount",
Expand Down Expand Up @@ -243,8 +243,10 @@ def submit_request(
worker.submit_workflow,
key=request.request_uid,
setup_code=request.request_body.get("setup_code", ""),
entry_point=request.request_body.get("entry_point", ""),
kwargs=request.request_body.get("kwargs", {}),
entry_point=request.entry_point,
config=request.adaptor_properties.config,
form=request.adaptor_properties.form,
request=request.request_body.get("request", {}),
resources=request.request_metadata.get("resources", {}),
metadata=request.request_metadata,
)
Expand Down
Loading

0 comments on commit 9f3ad63

Please sign in to comment.