Skip to content

Commit

Permalink
add high priority user + qa
Browse files Browse the repository at this point in the history
  • Loading branch information
francesconazzaro committed Sep 19, 2024
1 parent 85ffc5d commit 5e7b14f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 16 deletions.
12 changes: 6 additions & 6 deletions cads_broker/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,14 @@ def decrement_qos_rule_running(
return None, None


def get_cost_per_user(
def get_users_queue_from_processing_time(
session: sa.orm.Session,
) -> list[tuple[str, int]]:
"""Get the cost of the user's requests.
"""
statement = sa.text("select user_uid, sum(EXTRACT(EPOCH FROM (coalesce(finished_at, now()) - coalesce(started_at, coalesce(finished_at, now()))))::integer) as cost" \
" from system_requests where (finished_at > (now() - interval '24h') " \
" or status='running') or (status='accepted') group by user_uid order by cost")
"""Build the queue of the users from the processing time."""
statement = sa.text("select user_uid, sum(EXTRACT(EPOCH FROM (coalesce(finished_at, now()) -" \
" coalesce(started_at, coalesce(finished_at, now()))))::integer) as cost" \
" from system_requests where finished_at > (now() - interval '24h') " \
" or status='running' or status='accepted' group by user_uid order by cost")
return session.execute(statement).all()


Expand Down
26 changes: 18 additions & 8 deletions cads_broker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@

WORKERS_MULTIPLIER = float(os.getenv("WORKERS_MULTIPLIER", 1))
ONE_SECOND = datetime.timedelta(seconds=1)
HIGH_PRIORITY_USER_UID = os.getenv(
"HIGH_PRIORITY_USER_UID", "8d8ee054-6a09-4da8-a5be-d5dff52bbc5f"
)
BROKER_PRIORITY_ALGORITHM = os.getenv("BROKER_PRIORITY_ALGORITHM", "legacy")


@cachetools.cached( # type: ignore
Expand Down Expand Up @@ -311,9 +315,9 @@ def set_request_error_status(
session=session,
)
requeue = False
if requeue and request.request_metadata.get("resubmit_number", 0) < os.getenv(
"BROKER_REQUEUE_LIMIT", 3
):
if requeue and request.request_metadata.get(
"resubmit_number", 0
) < os.getenv("BROKER_REQUEUE_LIMIT", 3):
logger.info("worker killed: re-queueing", job_id=request_uid)
db.requeue_request(request=request, session=session)
self.queue.add(request_uid, request)
Expand Down Expand Up @@ -570,17 +574,23 @@ def submit_requests(
candidates: Iterable[db.SystemRequest],
) -> None:
"""Check the qos rules and submit the requests to the dask scheduler."""
if os.getenv("BROKER_EXPERIMENTAL_PRIORITY_ALGORITHM", True):
if "BROKER_PRIORITY_ALGORITHM" == "processing_time":
user_requests: dict[str, list[db.SystemRequest]] = {}
for request in candidates:
user_requests.setdefault(request.user_uid, []).append(request)
# all running or started_at within 24h
user_queue = db.get_cost_per_user(session=session_write)
# FIXME: this is a temporary solution to prioritize subrequests from the high priority user
users_queue = [
(HIGH_PRIORITY_USER_UID, 0)
] + db.get_users_queue_from_processing_time(session=session_write)
requests_counter = 0
for user_uid, _ in user_queue:
for user_uid, _ in users_queue:
if user_uid not in user_requests:
continue
request = sorted(user_requests[user_uid], key=lambda x: x.created_at)[0]
request = sorted(
user_requests[user_uid],
key=lambda candidate: self.qos.priority(candidate, session_write),
reverse=True,
)[0]
if self.qos.can_run(
request, session=session_write, scheduler=self.internal_scheduler
):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_02_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ def test_get_request(session_obj: sa.orm.sessionmaker) -> None:
assert request.request_uid == request_uid


def test_get_cost_per_user(session_obj: sa.orm.sessionmaker) -> None:
def test_get_users_queue_per_cost(session_obj: sa.orm.sessionmaker) -> None:
adaptor_properties = mock_config()
request_1 = mock_system_request(
status="successful", adaptor_properties_hash=adaptor_properties.hash,
Expand Down Expand Up @@ -975,7 +975,7 @@ def test_get_cost_per_user(session_obj: sa.orm.sessionmaker) -> None:
session.add(request_6)
session.commit()
with session_obj() as session:
users_cost = db.get_cost_per_user(session)
users_cost = db.get_users_queue_from_processing_time(session)
assert users_cost[0] == ("user3", 0)
assert users_cost[1] == ("user1", 15 * 60 * 60)
assert users_cost[2] == ("user2", 30 * 60 * 60)
Expand Down

0 comments on commit 5e7b14f

Please sign in to comment.