Skip to content

Commit

Permalink
try to fix
Browse files Browse the repository at this point in the history
  • Loading branch information
francesconazzaro committed May 28, 2024
1 parent 69042a4 commit 5a63887
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
14 changes: 14 additions & 0 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,20 @@ def set_request_cache_id(request_uid: str, cache_id: int, session: sa.orm.Sessio
return request


def set_successful_request(
request_uid: str,
session: sa.orm.Session,
) -> SystemRequest:
statement = sa.select(SystemRequest).where(SystemRequest.request_uid == request_uid)
request = session.scalars(statement).one()
if request.status == "successful":
return
request.status = "successful"
request.finished_at = sa.func.now()
session.commit()
return request


def set_request_status(
request_uid: str,
status: str,
Expand Down
20 changes: 10 additions & 10 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,14 +330,14 @@ def sync_database(self, session: sa.orm.Session) -> None:
# if the task is in memory and it is not in the futures
# it means that the task has been lost by the broker (broker has been restarted)
# the task is successful.
request = db.set_request_status(
request = db.set_successful_request(
request_uid=request.request_uid,
status="successful",
session=session,
)
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
)
if request:
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
)
elif state == "erred":
exception = pickle.loads(task["exception"])
self.set_request_error_status(
Expand Down Expand Up @@ -406,9 +406,8 @@ def on_future_done(self, future: distributed.Future) -> None:
with self.session_maker_write() as session:
if future.status == "finished":
# the result is updated in the database by the worker
request = db.set_request_status(
request = db.set_successful_request(
request_uid=future.key,
status="successful",
session=session,
)
elif future.status == "error":
Expand All @@ -434,9 +433,10 @@ def on_future_done(self, future: distributed.Future) -> None:
# if the dask status is cancelled, the qos has already been reset by sync_database
return
self.futures.pop(future.key, None)
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
)
if request:
self.qos.notify_end_of_request(
request, session, scheduler=self.internal_scheduler
)
logger.info(
"job has finished",
dask_status=future.status,
Expand Down

0 comments on commit 5a63887

Please sign in to comment.