Skip to content

Commit 11c9578

Browse files
committed
ruff
1 parent 5debc66 commit 11c9578

File tree

8 files changed

+41
-28
lines changed

8 files changed

+41
-28
lines changed

scheduler/helpers/queues/getters.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
from typing import List, Set
22

3-
from scheduler.types import ConnectionErrorTypes, BrokerMetaData
3+
from scheduler.types import ConnectionErrorTypes, BrokerMetaData, Broker
44
from scheduler.redis_models.worker import WorkerModel
55
from scheduler.settings import (
66
SCHEDULER_CONFIG,
77
get_queue_names,
88
get_queue_configuration,
99
QueueConfiguration,
1010
logger,
11-
Broker,
1211
)
1312
from .queue_logic import Queue
1413

@@ -22,9 +21,6 @@ class QueueConnectionDiscrepancyError(Exception):
2221

2322
def _get_connection(config: QueueConfiguration, use_strict_broker=False):
2423
"""Returns a Broker connection to use based on parameters in SCHEDULER_QUEUES"""
25-
"""
26-
Returns a redis connection from a connection config
27-
"""
2824
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
2925
import fakeredis
3026

scheduler/models/task.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
from scheduler.helpers.queues import Queue
2020
from scheduler.helpers.queues import get_queue
2121
from scheduler.redis_models import JobModel
22-
from scheduler.settings import logger, get_queue_names, SCHEDULER_CONFIG
22+
from scheduler.settings import logger, get_queue_names
23+
from scheduler import settings
2324
from scheduler.types import ConnectionType, TASK_TYPES
2425
from .args import TaskArg, TaskKwarg
2526
from ..helpers import utils
@@ -402,17 +403,18 @@ def clean_queue(self):
402403
)
403404

404405
def clean_interval_unit(self):
405-
if SCHEDULER_CONFIG.SCHEDULER_INTERVAL > self.interval_seconds():
406+
config = settings.SCHEDULER_CONFIG
407+
if config.SCHEDULER_INTERVAL > self.interval_seconds():
406408
raise ValidationError(
407409
_("Job interval is set lower than %(queue)r queue's interval. minimum interval is %(interval)"),
408410
code="invalid",
409-
params={"queue": self.queue, "interval": SCHEDULER_CONFIG.SCHEDULER_INTERVAL},
411+
params={"queue": self.queue, "interval": config.SCHEDULER_INTERVAL},
410412
)
411-
if self.interval_seconds() % SCHEDULER_CONFIG.SCHEDULER_INTERVAL:
413+
if self.interval_seconds() % config.SCHEDULER_INTERVAL:
412414
raise ValidationError(
413415
_("Job interval is not a multiple of rq_scheduler's interval frequency: %(interval)ss"),
414416
code="invalid",
415-
params={"interval": SCHEDULER_CONFIG.SCHEDULER_INTERVAL},
417+
params={"interval": config.SCHEDULER_INTERVAL},
416418
)
417419

418420
def clean_result_ttl(self) -> None:

scheduler/tests/test_mgmt_commands/test_scheduler_stats.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from django.core.management import call_command
2-
from django.test import TestCase
2+
from django.test import TestCase, override_settings
33

44
from scheduler.tests import test_settings # noqa
5+
from scheduler.types import SchedulerConfiguration
56

67

78
class SchedulerStatsTest(TestCase):
9+
@override_settings(SCHEDULER_CONFIG=SchedulerConfiguration(SCHEDULER_INTERVAL=1))
810
def test_scheduler_stats__does_not_fail(self):
911
call_command("scheduler_stats", "-j")
1012
call_command("scheduler_stats", "-y")

scheduler/tests/test_mgmt_commands/test_scheduler_worker.py

+7
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
class SchedulerWorkerTestCase(TestCase):
1212
def test_scheduler_worker__no_queues_params(self):
13+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 1
1314
queue = get_queue("default")
1415

1516
# enqueue some jobs that will fail
@@ -25,6 +26,7 @@ def test_scheduler_worker__no_queues_params(self):
2526
for job_name in job_names:
2627
job = JobModel.get(name=job_name, connection=queue.connection)
2728
self.assertTrue(job.is_failed)
29+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 10
2830

2931
def test_scheduler_worker__run_jobs(self):
3032
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 1
@@ -43,8 +45,10 @@ def test_scheduler_worker__run_jobs(self):
4345
for job_name in job_names:
4446
job = JobModel.get(name=job_name, connection=queue.connection)
4547
self.assertTrue(job.is_failed)
48+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 10
4649

4750
def test_scheduler_worker__worker_with_two_queues(self):
51+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 1
4852
queue = get_queue("default")
4953
queue2 = get_queue("django_tasks_scheduler_test")
5054

@@ -63,8 +67,10 @@ def test_scheduler_worker__worker_with_two_queues(self):
6367
for job_name in job_names:
6468
job = JobModel.get(name=job_name, connection=queue.connection)
6569
self.assertTrue(job.is_failed)
70+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 10
6671

6772
def test_scheduler_worker__worker_with_one_queue__does_not_perform_other_queue_job(self):
73+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 1
6874
queue = get_queue("default")
6975
queue2 = get_queue("django_tasks_scheduler_test")
7076

@@ -80,3 +86,4 @@ def test_scheduler_worker__worker_with_one_queue__does_not_perform_other_queue_j
8086
other_job = JobModel.get(other_job.name, connection=queue.connection)
8187

8288
self.assertTrue(other_job.is_queued, f"Expected other job to be queued but status={other_job.status}")
89+
SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 10

scheduler/tests/test_task_types/test_repeatable_task.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from scheduler.tests.test_task_types.test_task_model import BaseTestCases
1010
from scheduler.tests.testtools import task_factory, _get_task_scheduled_job_from_registry
1111
from scheduler.models import TaskType
12+
from scheduler.types import SchedulerConfiguration
1213

1314

1415
class TestRepeatableTask(BaseTestCases.TestSchedulableTask):
@@ -41,7 +42,7 @@ def test_clean_seconds(self):
4142
job.interval_unit = "seconds"
4243
self.assertIsNone(job.clean())
4344

44-
@override_settings(SCHEDULER_CONFIG={"SCHEDULER_INTERVAL": 10})
45+
@override_settings(SCHEDULER_CONFIG=SchedulerConfiguration(SCHEDULER_INTERVAL=10))
4546
def test_clean_too_frequent(self):
4647
job = task_factory(self.task_type)
4748
job.queue = self.queue_name
@@ -52,6 +53,7 @@ def test_clean_too_frequent(self):
5253
with self.assertRaises(ValidationError):
5354
job.clean_interval_unit()
5455

56+
@override_settings(SCHEDULER_CONFIG=SchedulerConfiguration(SCHEDULER_INTERVAL=10))
5557
def test_clean_not_multiple(self):
5658
job = task_factory(self.task_type)
5759
job.queue = self.queue_name

scheduler/views/queue_views.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414
from scheduler.types import ConnectionErrorTypes, ResponseErrorTypes
1515
from scheduler.helpers.queues import Queue, get_all_workers
1616
from scheduler.redis_models import JobModel, JobNamesRegistry, WorkerModel
17-
from scheduler.settings import SCHEDULER_CONFIG, get_queue_names, logger
17+
from scheduler.settings import get_queue_names, logger
18+
from scheduler import settings
1819
from scheduler.views.helpers import get_queue
1920
from scheduler.worker.commands import StopJobCommand, send_command
2021

2122

2223
def _get_registry_job_list(queue: Queue, registry: JobNamesRegistry, page: int) -> Tuple[List[JobModel], int, range]:
23-
items_per_page = SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE
24+
items_per_page = settings.SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE
2425
num_jobs = registry.count(queue.connection)
2526
job_list = list()
2627

@@ -85,7 +86,7 @@ def queue_workers(request: HttpRequest, queue_name: str) -> HttpResponse:
8586

8687
def stats_json(request: HttpRequest) -> Union[JsonResponse, HttpResponseNotFound]:
8788
auth_token = request.headers.get("Authorization")
88-
token_validation_func = SCHEDULER_CONFIG.TOKEN_VALIDATION_METHOD
89+
token_validation_func = settings.SCHEDULER_CONFIG.TOKEN_VALIDATION_METHOD
8990
if request.user.is_staff or (token_validation_func and auth_token and token_validation_func(auth_token)):
9091
return JsonResponse(get_statistics())
9192

scheduler/worker/scheduler.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class SchedulerStatus(str, Enum):
2424

2525

2626
def _reschedule_tasks():
27-
enabled_jobs = Task.objects.filter(enabled=True)
27+
enabled_jobs = list(Task.objects.filter(enabled=True))
2828
for item in enabled_jobs:
2929
logger.debug(f"Rescheduling {str(item)}")
3030
item.save()
@@ -48,7 +48,7 @@ def __init__(
4848
self.connection = connection
4949
self.interval = interval
5050
self._stop_requested = False
51-
self._status = SchedulerStatus.STOPPED
51+
self.status = SchedulerStatus.STOPPED
5252
self._thread = None
5353
self._pid: Optional[int] = None
5454
self.worker_name = worker_name
@@ -92,7 +92,7 @@ def start(self) -> None:
9292
locks = self._acquire_locks()
9393
if len(locks) == 0:
9494
return
95-
self._status = SchedulerStatus.STARTED
95+
self.status = SchedulerStatus.STARTED
9696
self._thread = Thread(target=run_scheduler, args=(self,), name="scheduler-thread")
9797
self._thread.start()
9898

@@ -117,7 +117,7 @@ def stop(self):
117117
f"[Scheduler {self.worker_name}/{self.pid}] Stopping scheduler, releasing locks for {', '.join(self._locks.keys())}..."
118118
)
119119
self.release_locks()
120-
self._status = SchedulerStatus.STOPPED
120+
self.status = SchedulerStatus.STOPPED
121121

122122
def release_locks(self):
123123
"""Release acquired locks"""
@@ -145,7 +145,7 @@ def work(self) -> None:
145145

146146
def enqueue_scheduled_jobs(self) -> None:
147147
"""Enqueue jobs whose timestamp is in the past"""
148-
self._status = SchedulerStatus.WORKING
148+
self.status = SchedulerStatus.WORKING
149149
_reschedule_tasks()
150150

151151
for registry in self._scheduled_job_registries:
@@ -163,7 +163,7 @@ def enqueue_scheduled_jobs(self) -> None:
163163
if job is not None:
164164
queue.enqueue_job(job, connection=pipeline, at_front=bool(job.at_front))
165165
pipeline.execute()
166-
self._status = SchedulerStatus.STARTED
166+
self.status = SchedulerStatus.STARTED
167167

168168

169169
def run_scheduler(scheduler: WorkerScheduler):

scheduler/worker/worker.py

+10-7
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
ResponseErrorTypes,
3232
)
3333
from .commands import WorkerCommandsChannelListener
34-
from .scheduler import WorkerScheduler
34+
from .scheduler import WorkerScheduler, SchedulerStatus
3535
from ..redis_models.lock import QueueLock
3636
from ..redis_models.worker import WorkerStatus
3737

@@ -337,8 +337,8 @@ def bootstrap(self):
337337
self.scheduler.start()
338338
self._model.has_scheduler = True
339339
self._model.save(connection=self.connection)
340-
if self.burst and self.with_scheduler:
341-
self.scheduler.stop()
340+
if self.with_scheduler and self.burst:
341+
self.scheduler.request_stop_and_wait()
342342
self._model.has_scheduler = False
343343
self._model.save(connection=self.connection)
344344
qnames = [queue.name for queue in self.queues]
@@ -371,16 +371,19 @@ def run_maintenance_tasks(self):
371371
1. Check if scheduler should be started.
372372
2. Cleaning registries
373373
"""
374-
if self.with_scheduler and not self._model.has_scheduler:
374+
self.clean_registries()
375+
if not self.with_scheduler:
376+
return
377+
if self.scheduler is None:
375378
self.scheduler = WorkerScheduler(self.queues, worker_name=self.name, connection=self.connection)
379+
if self.scheduler.status == SchedulerStatus.STOPPED:
376380
self.scheduler.start()
377381
self._model.has_scheduler = True
378382
self._model.save(connection=self.connection)
379-
if self.burst and self.with_scheduler:
380-
self.scheduler.stop()
383+
if self.burst:
384+
self.scheduler.request_stop_and_wait()
381385
self._model.has_scheduler = False
382386
self._model.save(connection=self.connection)
383-
self.clean_registries()
384387

385388
def dequeue_job_and_maintain_ttl(
386389
self, timeout: Optional[int], max_idle_time: Optional[int] = None

0 commit comments

Comments
 (0)