diff --git a/mqtt/MQTT.php b/mqtt/MQTT.php index 5fd0033..3fa83ed 100644 --- a/mqtt/MQTT.php +++ b/mqtt/MQTT.php @@ -1089,52 +1089,59 @@ protected function handle_publish($msgid=0) } } - /** - * Main Loop - * - * @return bool - * @throws \Exception - */ - public function loop() + /** + * Main Loop + * + * @param null|int $handleMessagesCount - if null loop will be infinity. If number loop will be ending after getting $handleMessagesCount + * @return void + * @throws Exception + */ + public function loop($handleMessagesCount = NULL) { - Debug::Log(Debug::DEBUG, 'loop()'); - - while (true) { - # check if any commands awaits or topics to subscribe - if (!$this->cmdstore->countWaits() && empty($this->topics) && empty($this->topics_to_subscribe) && empty($this->subscribe_awaits)) { - Debug::Log(Debug::INFO, "loop(): No tasks, leaving..."); - break; - } - - # Subscribe topics - if (!empty($this->topics_to_subscribe)) { - list($last_subscribe_msgid, $last_subscribe_topics) = $this->do_subscribe(); - $this->subscribe_awaits[$last_subscribe_msgid] = $last_subscribe_topics; - } - # Unsubscribe topics - if (!empty($this->topics_to_unsubscribe)) { - list($last_unsubscribe_msgid, $last_unsubscribe_topics) = $this->do_unsubscribe(); - $this->unsubscribe_awaits[$last_unsubscribe_msgid] = $last_unsubscribe_topics; - } - - try { - # It is the responsibility of the Client to ensure that the interval between Control Packets - # being sent does not exceed the Keep Alive value. In the absence of sending any other Control - # Packets, the Client MUST send a PINGREQ Packet [MQTT-3.1.2-23]. - $this->keepalive(); - - $this->handle_message(); - - } catch (Exception\NetworkError $e) { - Debug::Log(Debug::INFO, 'loop(): Connection lost.'); - $this->reconnect(); - $this->subscribe($this->topics); - } catch (\Exception $e) { - throw $e; - } - } - - return true; + Debug::Log(Debug::DEBUG, 'loop()'); + + $iterations = empty($handleMessagesCount) ? TRUE : $handleMessagesCount + 1; + + while ($iterations) { + # check if any commands awaits or topics to subscribe + if (!$this->cmdstore->countWaits() && empty($this->topics) && empty($this->topics_to_subscribe) && empty($this->subscribe_awaits)) { + Debug::Log(Debug::INFO, "loop(): No tasks, leaving..."); + break; + } + + # Subscribe topics + if (!empty($this->topics_to_subscribe)) { + list($last_subscribe_msgid, $last_subscribe_topics) = $this->do_subscribe(); + $this->subscribe_awaits[$last_subscribe_msgid] = $last_subscribe_topics; + } + # Unsubscribe topics + if (!empty($this->topics_to_unsubscribe)) { + list($last_unsubscribe_msgid, $last_unsubscribe_topics) = $this->do_unsubscribe(); + $this->unsubscribe_awaits[$last_unsubscribe_msgid] = $last_unsubscribe_topics; + } + + try { + # It is the responsibility of the Client to ensure that the interval between Control Packets + # being sent does not exceed the Keep Alive value. In the absence of sending any other Control + # Packets, the Client MUST send a PINGREQ Packet [MQTT-3.1.2-23]. + $this->keepalive(); + + $this->handle_message(); + + } catch (Exception\NetworkError $e) { + Debug::Log(Debug::INFO, 'loop(): Connection lost.'); + $this->reconnect(); + $this->subscribe($this->topics); + } catch (\Exception $e) { + throw $e; + } + + if ($iterations !== TRUE) { + $iterations--; + } + } + + $this->disconnect(); } protected $last_ping_time = 0;