Skip to content

Commit

Permalink
Merge pull request #69 from ecmwf-projects/revert-68-adaptor-config-i…
Browse files Browse the repository at this point in the history
…n-a-new-table

Revert "[WIP] Adaptor config in a new table"
  • Loading branch information
francesconazzaro committed Jul 26, 2023
2 parents 9f3ad63 + 7bad3d0 commit ddb03e3
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 427 deletions.
53 changes: 0 additions & 53 deletions alembic/versions/e09564dc7652_add_adaptor_configuration_table.py

This file was deleted.

100 changes: 17 additions & 83 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
"""SQLAlchemy ORM model."""
import datetime
import hashlib
import json
import os
import uuid
from typing import Any
Expand Down Expand Up @@ -31,18 +29,8 @@ class NoResultFound(Exception):
pass


class AdaptorProperties(BaseModel):
"""Adaptor Metadata ORM model."""

__tablename__ = "adaptor_properties"

hash = sa.Column(sa.Text, primary_key=True)
config = sa.Column(JSONB)
form = sa.Column(JSONB)


class SystemRequest(BaseModel):
"""System Request ORM model."""
"""Resource ORM model."""

__tablename__ = "system_requests"

Expand All @@ -66,10 +54,6 @@ class SystemRequest(BaseModel):
updated_at = sa.Column(sa.TIMESTAMP, default=sa.func.now(), onupdate=sa.func.now())
origin = sa.Column(sa.Text, default="ui")
portal = sa.Column(sa.Text)
adaptor_properties_hash = sa.Column(
sa.Text, sa.ForeignKey("adaptor_properties.hash"), nullable=False
)
entry_point = sa.Column(sa.Text)

__table_args__: tuple[sa.ForeignKeyConstraint, dict[None, None]] = (
sa.ForeignKeyConstraint(
Expand All @@ -80,7 +64,6 @@ class SystemRequest(BaseModel):

# joined is temporary
cache_entry = sa.orm.relationship(cacholote.database.CacheEntry, lazy="joined")
adaptor_properties = sa.orm.relationship(AdaptorProperties, lazy="joined")

@property
def age(self):
Expand Down Expand Up @@ -168,15 +151,15 @@ def count_finished_requests_per_user(
def count_requests(
session: sa.orm.Session,
status: str | None = None,
entry_point: str | None = None,
process_id: str | None = None,
user_uid: str | None = None,
) -> int:
"""Count requests."""
statement = session.query(SystemRequest)
if status is not None:
statement = statement.filter(SystemRequest.status == status)
if entry_point is not None:
statement = statement.filter(SystemRequest.entry_point == entry_point)
if process_id is not None:
statement = statement.filter(SystemRequest.process_id == process_id)
if user_uid is not None:
statement = statement.filter(SystemRequest.user_uid == user_uid)
return statement.count()
Expand Down Expand Up @@ -288,16 +271,15 @@ def count_waiting_users_queued(session: sa.orm.Session):
).all()


def count_users(status: str, entry_point: str, session: sa.orm.Session) -> int:
def count_running_users(session: sa.orm.Session) -> list:
"""Users that have running requests, per dataset."""
return (
session.query(SystemRequest.user_uid)
.filter(
SystemRequest.status == status, SystemRequest.entry_point == entry_point
return session.execute(
sa.select(
SystemRequest.process_id, sa.func.count(sa.distinct(SystemRequest.user_uid))
)
.distinct()
.count()
)
.filter(SystemRequest.status == "running")
.group_by(SystemRequest.process_id)
).all()


def set_request_status(
Expand Down Expand Up @@ -348,78 +330,31 @@ def logger_kwargs(request: SystemRequest) -> dict[str, str]:
"finished_at": request.finished_at.isoformat()
if request.finished_at is not None
else None,
"request_kwargs": request.request_body.get("request", {}),
"request_kwargs": request.request_body.get("kwargs", {}).get("request", {}),
"user_request": True,
"process_id": request.process_id,
"resubmit_number": request.request_metadata.get("resubmit_number", 0),
"origin": request.origin,
"entry_point": request.entry_point,
"entry_point": request.request_body.get("entry_point", None),
**request.response_error,
}
return kwargs


def generate_adaptor_properties_hash(
config: dict[str, Any], form: dict[str, Any]
) -> str:
config_form = {"config": config, "form": form}
return hashlib.md5(
json.dumps(config_form, sort_keys=True).encode("utf-8")
).hexdigest()


def get_adaptor_properties(
adaptor_properties_hash: str,
session: sa.orm.Session,
) -> AdaptorProperties | None:
try:
statement = sa.select(AdaptorProperties).where(
AdaptorProperties.hash == adaptor_properties_hash
)
return session.scalars(statement).one()
except sqlalchemy.orm.exc.NoResultFound:
return None


def add_adaptor_properties(
hash: str,
config: dict[str, Any],
form: dict[str, Any],
session: sa.orm.Session,
):
adaptor_properties = AdaptorProperties(hash=hash, config=config, form=form)
session.add(adaptor_properties)


def create_request(
session: sa.orm.Session,
user_uid: str,
setup_code: str,
entry_point: str,
request: dict[str, Any],
kwargs: dict[str, Any],
process_id: str,
portal: str,
adaptor_config: dict[str, Any],
adaptor_form: dict[str, Any],
adaptor_properties_hash: str,
metadata: dict[str, Any] = {},
resources: dict[str, Any] = {},
origin: str = "ui",
request_uid: str | None = None,
) -> dict[str, Any]:
"""Create a request."""
if (
get_adaptor_properties(
adaptor_properties_hash=adaptor_properties_hash, session=session
)
is None
):
add_adaptor_properties(
hash=adaptor_properties_hash,
config=adaptor_config,
form=adaptor_form,
session=session,
)
"""Temporary function to create a request."""
metadata["resources"] = resources
request = SystemRequest(
request_uid=request_uid or str(uuid.uuid4()),
Expand All @@ -428,13 +363,12 @@ def create_request(
status="accepted",
request_body={
"setup_code": setup_code,
"request": request,
"entry_point": entry_point,
"kwargs": kwargs,
},
request_metadata=metadata,
origin=origin,
portal=portal,
adaptor_properties_hash=adaptor_properties_hash,
entry_point=entry_point,
)
session.add(request)
session.commit()
Expand Down
8 changes: 3 additions & 5 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def register_functions(self):
)
expressions.FunctionFactory.FunctionFactory.register_function(
"adaptor",
lambda context, *args: context.request.entry_point,
lambda context, *args: context.request.request_body.get("entry_point", ""),
)
expressions.FunctionFactory.FunctionFactory.register_function(
"userRequestCount",
Expand Down Expand Up @@ -243,10 +243,8 @@ def submit_request(
worker.submit_workflow,
key=request.request_uid,
setup_code=request.request_body.get("setup_code", ""),
entry_point=request.entry_point,
config=request.adaptor_properties.config,
form=request.adaptor_properties.form,
request=request.request_body.get("request", {}),
entry_point=request.request_body.get("entry_point", ""),
kwargs=request.request_body.get("kwargs", {}),
resources=request.request_metadata.get("resources", {}),
metadata=request.request_metadata,
)
Expand Down
Loading

0 comments on commit ddb03e3

Please sign in to comment.