@@ -24,31 +24,12 @@ def __init__(self, host, port=1883, username=None, password=None):
24
24
self .data_sources = []
25
25
self .listeners = {}
26
26
27
+ self .listen_task = None
27
28
async def connect (self ):
28
29
# Create task
29
30
await self .client_handler ()
30
31
31
32
async def client_handler (self ):
32
- logging .info (f"Starting MQTT client to URL: { self .host } " )
33
- reconnect_interval = 5 # [seconds]
34
-
35
- client = Client (self .host , self .port , username = self .username , password = self .password )
36
- while True :
37
- try :
38
- async with client :
39
- await self .listen (client )
40
- except MqttError as error :
41
- logging .error (f'Disconnected from MQTT broker with error: { error } ' )
42
- logging .debug (f'MQTT client disconnected/ended, reconnecting in { reconnect_interval } ...' )
43
- await asyncio .sleep (reconnect_interval )
44
- except (KeyboardInterrupt , asyncio .CancelledError ):
45
- return False
46
- except Exception as error :
47
- logging .error (f'Stopping MQTT client with error: { error } ' )
48
- return False
49
-
50
- async def listen (self , client ):
51
- logging .info (f'MQTT client connected' )
52
33
# Add tasks for each data source handler
53
34
for ds in self .data_sources :
54
35
# Get handlers from data source
@@ -68,19 +49,46 @@ async def listen(self, client):
68
49
logging .debug (f'Creating new prime Listener for topic: { topic } ' )
69
50
self .listeners [topic ] = MQTTListener (topic , funcs )
70
51
71
- async with client .messages () as messages :
72
- # Subscribe to specified topics
73
- for topic , handlers in self .listeners .items ():
74
- await client .subscribe (topic )
75
- # Handle messages that come in
76
- async for message in messages :
77
- topic = message .topic .value
78
- handlers = self .listeners [topic ].handlers
79
- logging .debug (f'Got message for topic: { topic } ' )
80
- for func in handlers :
81
- # Decode to UTF-8
82
- payload = message .payload .decode ()
83
- await func (payload )
52
+ logging .info (f"Starting MQTT client to URL: { self .host } " )
53
+ reconnect_interval = 10 # [seconds]
54
+ loop = asyncio .get_event_loop ()
55
+
56
+ while True :
57
+ try :
58
+ # Listen for MQTT messages in (unawaited) asyncio task
59
+ self .listen_task = loop .create_task (self .listen ())
60
+ await self .listen_task
61
+ except MqttError as error :
62
+ logging .error (f'Disconnected from MQTT broker with error: { error } ' )
63
+ logging .debug (f'MQTT client disconnected/ended, reconnecting in { reconnect_interval } ...' )
64
+ # Cancel task and wait
65
+ self .listen_task .cancel ()
66
+ await asyncio .sleep (reconnect_interval )
67
+ except Exception as error :
68
+ logging .error (f'Stopping MQTT client with error: { error } ' )
69
+ logging .debug (f'MQTT client disconnected/ended, reconnecting in { reconnect_interval } ...' )
70
+ # Cancel task and wait
71
+ self .listen_task .cancel ()
72
+ await asyncio .sleep (reconnect_interval )
73
+ except (KeyboardInterrupt , asyncio .CancelledError ):
74
+ return False
75
+
76
+ async def listen (self ):
77
+ logging .info (f'MQTT client connected' )
78
+ async with Client (self .host , self .port , username = self .username , password = self .password ) as client :
79
+ async with client .messages () as messages :
80
+ # Subscribe to specified topics
81
+ for topic , handlers in self .listeners .items ():
82
+ await client .subscribe (topic )
83
+ # Handle messages that come in
84
+ async for message in messages :
85
+ topic = message .topic .value
86
+ handlers = self .listeners [topic ].handlers
87
+ logging .debug (f'Got message for topic: { topic } ' )
88
+ for func in handlers :
89
+ # Decode to UTF-8
90
+ payload = message .payload .decode ()
91
+ await func (payload )
84
92
85
93
86
94
if __name__ == "__main__" :
0 commit comments