Skip to content

Commit

Permalink
Merge pull request #74 from ecmwf-projects/set-request-status-to-dism…
Browse files Browse the repository at this point in the history
…issed

Set request status to dismissed
  • Loading branch information
francesconazzaro authored Aug 2, 2023
2 parents fe07308 + 63c30cc commit 2138186
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ def downgrade() -> None:
op.execute(
"update system_requests set request_body['entry_point']=to_jsonb(\"entry_point\")"
)
op.execute(
"update system_requests set request_body['kwargs']=to_jsonb({})"
)
op.execute("update system_requests set request_body['kwargs']=to_jsonb({})")
op.execute(
"update system_requests set request_body['kwargs']['request']=to_jsonb(request_body['request'])"
)
Expand Down
6 changes: 2 additions & 4 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,11 @@ def get_request_result(


def delete_request(
request_uid: str,
request: SystemRequest,
session: sa.orm.Session,
) -> SystemRequest:
request = get_request(request_uid, session)
) -> None:
session.delete(request)
session.commit()
return request


def init_database(connection_string: str, force: bool = False) -> sa.engine.Engine:
Expand Down
8 changes: 7 additions & 1 deletion cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,15 @@ def sync_database(self, session: sa.orm.Session) -> None:
If the task is not in the dask scheduler, it is re-queued.
"""
statement = sa.select(db.SystemRequest).where(
db.SystemRequest.status == "running"
db.SystemRequest.status.in_(("running", "dismissed"))
)
for request in session.scalars(statement):
# the retrieve API set the status to "dismissed", here the broker deletes the request
# this is to better control the status of the QoS
if request.status == "dismissed":
db.delete_request(request=request, session=session)
self.qos.notify_end_of_request(request, session)
continue
# if request is in futures, go on
if request.request_uid in self.futures:
continue
Expand Down
4 changes: 1 addition & 3 deletions tests/test_02_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,7 @@ def test_delete_request(session_obj: sa.orm.sessionmaker) -> None:
session.add(adaptor_properties)
session.add(request)
session.commit()
request = db.delete_request(request_uid, session=session)
assert request.request_uid == request_uid
assert request.status == "dismissed"
db.delete_request(request, session=session)
with pytest.raises(db.NoResultFound):
with session_obj() as session:
request = db.get_request(request_uid, session=session)
Expand Down
12 changes: 12 additions & 0 deletions tests/test_20_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

import distributed
import pytest
import pytest_mock
import sqlalchemy as sa

Expand Down Expand Up @@ -59,6 +60,7 @@ def test_broker_sync_database(
in_futures_request_uid = str(uuid.uuid4())
in_dask_request_uid = str(uuid.uuid4())
lost_request_uid = str(uuid.uuid4())
dismissed_request_uid = str(uuid.uuid4())
adaptor_metadata = mock_config()
in_futures_request = mock_system_request(
request_uid=in_futures_request_uid,
Expand All @@ -75,11 +77,17 @@ def test_broker_sync_database(
status="running",
adaptor_properties_hash=adaptor_metadata.hash,
)
dismissed_request = mock_system_request(
request_uid=dismissed_request_uid,
status="dismissed",
adaptor_properties_hash=adaptor_metadata.hash,
)
with session_obj() as session:
session.add(adaptor_metadata)
session.add(in_futures_request)
session.add(in_dask_request)
session.add(lost_request)
session.add(dismissed_request)
session.commit()

def mock_get_tasks() -> dict[str, str]:
Expand Down Expand Up @@ -110,3 +118,7 @@ def mock_get_tasks() -> dict[str, str]:
output_request = session.scalars(statement).first()
assert output_request.status == "accepted"
assert output_request.request_metadata.get("resubmit_number") == 1

with pytest.raises(db.NoResultFound):
with session_obj() as session:
db.get_request(dismissed_request_uid, session=session)

0 comments on commit 2138186

Please sign in to comment.