Queueing tasks from synchronous code #82
-
|
Hi, On our IoT platform, we have a MQTT listener which enqueues job so that the relevant workers handle messages on their own. mqtt listener code is synchronous. Previously, to send it to arq, my previous developer inserted the task signature directly into the redis queue so that it can be run by arq later. Should I do the same or is there a better way to do this ? I was surprised to see that When I use it, it returns a Task but on worker side, it's never triggered. Enqueued task Task(args=('flowbox/init', None, b'<REDACTED>'), id='f459552d9e634a5986abb8c19523d3d7', _after=None, after=[], delay=None, schedule=None, priority=None, _triggers=None)I will try to use stubs as you recommend but it requires me to run async code block in a sync code. |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 4 replies
-
|
Side question : if I customise the name of the task, does it change the way we have to call the task or is it used only for logging purposes ? Should we use |
Beta Was this translation helpful? Give feedback.
-
|
Worker is defined as follow: # worker_mqtt.py
worker_mqtt = get_worker(settings.WORKER_MQTT_QUEUE)
@worker_mqtt.task()
async def mqtt_listener_callback_task(mqtt_generic_topic, flowbox_board_id, payload):
... some code ...
@worker_mqtt.cron("* * * * *", name="send_heartbeat", ttl=0)
async def send_heartbeat_mqtt() -> None:
await send_heartbeat_worker(settings.WORKER_MQTT_QUEUE)It is executed as follow: And on backend (fastapi) side: # mqttlistener.py
def enqueue_task(generic_topic, board_id, payload):
if "telemetry" in generic_topic:
worker_mqtt_telemetry = get_worker(settings.WORKER_MQTT_TELEMETRY_QUEUE)
t = worker_mqtt_telemetry.enqueue_unsafe(
"mqtt_listener_telemetry_callback_task",
generic_topic,
board_id,
payload,
)
else:
worker_mqtt = get_worker(settings.WORKER_MQTT_QUEUE)
t = worker_mqtt.enqueue_unsafe(
"mqtt_listener_callback_task",
generic_topic,
board_id,
payload,
)
logger.info(f"Enqueued task {t}")I don't get what is blocking from working... |
Beta Was this translation helpful? Give feedback.
-
|
Looks like you figured it out! The way enqueuing works is a bit quirky but it allows for cool things like the |
Beta Was this translation helpful? Give feedback.
Ok got it - the point was mentionned indeed that for
enqueuefunction:So the working way for me was the addition of
loop.rununtilcomplete()available in the code of my mqtt listener.