Skip to content

Commit

Permalink
exposing private properties used by client libraries
Browse files Browse the repository at this point in the history
  • Loading branch information
brentru committed Aug 23, 2019
1 parent 25b344a commit 35ba546
Showing 1 changed file with 88 additions and 89 deletions.
177 changes: 88 additions & 89 deletions adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,33 +115,32 @@ def __init__(self, socket, broker, port=None, username=None,
if port is not None:
self.port = port
# session identifiers
self._user = username
self.user = username
# [MQTT-3.1.3.5]
self._pass = password
if self._pass is not None and len(password.encode('utf-8')) > MQTT_TOPIC_LENGTH_LIMIT:
self.password = password
if self.password is not None and len(password.encode('utf-8')) > MQTT_TOPIC_LENGTH_LIMIT:
raise MMQTTException('Password length is too large.')
if client_id is not None:
# user-defined client_id MAY allow client_id's > 23 bytes or
# non-alpha-numeric characters
self._client_id = client_id
self.client_id = client_id
else:
# assign a unique client_id
self._client_id = 'cpy{0}{1}'.format(microcontroller.cpu.uid[randint(0, 15)],
randint(0, 9))
self.client_id = 'cpy{0}{1}'.format(microcontroller.cpu.uid[randint(0, 15)],
randint(0, 9))
# generated client_id's enforce spec.'s length rules
if len(self._client_id) > 23 or not self._client_id:
if len(self.client_id) > 23 or not self.client_id:
raise ValueError('MQTT Client ID must be between 1 and 23 bytes')
self._logger = None
self.keep_alive = keep_alive
self.user_data = None
self.logger = None
if log is True:
self._logger = logging.getLogger('log')
self._logger.setLevel(logging.INFO)
self.logger = logging.getLogger('log')
self.logger.setLevel(logging.INFO)
self._sock = None
self._is_connected = False
self._msg_size_lim = MQTT_MSG_SZ_LIM
self.packet_id = 0
self._keep_alive = keep_alive
self._pid = 0
self._user_data = None
self._timestamp = 0
# List of subscribed topics, used for tracking
self._subscribed_topics = []
Expand Down Expand Up @@ -177,8 +176,8 @@ def last_will(self, topic=None, message=None, qos=0, retain=False):
raise MMQTTException('Last Will should be defined before connect() is called.')
if qos < 0 or qos > 2:
raise MMQTTException("Invalid QoS level, must be between 0 and 2.")
if self._logger is not None:
self._logger.debug('Setting last will properties')
if self.logger is not None:
self.logger.debug('Setting last will properties')
self._lw_qos = qos
self._lw_topic = topic
self._lw_msg = message
Expand All @@ -190,14 +189,14 @@ def connect(self, clean_session=True):
:param bool clean_session: Establishes a persistent session.
"""
self._set_interface()
if self._logger is not None:
self._logger.debug('Creating new socket')
if self.logger is not None:
self.logger.debug('Creating new socket')
self._sock = self._socket.socket()
self._sock.settimeout(10)
if self.port == 8883:
try:
if self._logger is not None:
self._logger.debug('Attempting to establish secure MQTT connection...')
if self.logger is not None:
self.logger.debug('Attempting to establish secure MQTT connection...')
self._sock.connect((self.broker, self.port), TLS_MODE)
except RuntimeError:
raise MMQTTException("Invalid broker address defined.")
Expand All @@ -207,8 +206,8 @@ def connect(self, clean_session=True):
else:
addr = (self.broker, self.port)
try:
if self._logger is not None:
self._logger.debug('Attempting to establish insecure MQTT connection...')
if self.logger is not None:
self.logger.debug('Attempting to establish insecure MQTT connection...')
#self._sock.connect((self.broker, self.port), TCP_MODE)
self._sock.connect(addr, TCP_MODE)
except RuntimeError as e:
Expand All @@ -223,14 +222,14 @@ def connect(self, clean_session=True):
var_header[6] = clean_session << 1

# Set up variable header and remaining_length
remaining_length = 12 + len(self._client_id)
if self._user is not None:
remaining_length += 2 + len(self._user) + 2 + len(self._pass)
remaining_length = 12 + len(self.client_id)
if self.user is not None:
remaining_length += 2 + len(self.user) + 2 + len(self.password)
var_header[6] |= 0xC0
if self._keep_alive:
assert self._keep_alive < MQTT_TOPIC_LENGTH_LIMIT
var_header[7] |= self._keep_alive >> 8
var_header[8] |= self._keep_alive & 0x00FF
if self.keep_alive:
assert self.keep_alive < MQTT_TOPIC_LENGTH_LIMIT
var_header[7] |= self.keep_alive >> 8
var_header[8] |= self.keep_alive & 0x00FF
if self._lw_topic:
remaining_length += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
Expand All @@ -254,25 +253,25 @@ def connect(self, clean_session=True):
fixed_header.append(remaining_length)
fixed_header.append(0x00)

if self._logger is not None:
self._logger.debug('Sending CONNECT to broker')
self._logger.debug('Fixed Header: {}\nVariable Header: {}'.format(fixed_header,
var_header))
if self.logger is not None:
self.logger.debug('Sending CONNECT to broker')
self.logger.debug('Fixed Header: {}\nVariable Header: {}'.format(fixed_header,
var_header))
self._sock.write(fixed_header)
self._sock.write(var_header)
# [MQTT-3.1.3-4]
self._send_str(self._client_id)
self._send_str(self.client_id)
if self._lw_topic:
# [MQTT-3.1.3-11]
self._send_str(self._lw_topic)
self._send_str(self._lw_msg)
if self._user is None:
self._user = None
if self.user is None:
self.user = None
else:
self._send_str(self._user)
self._send_str(self._pass)
if self._logger is not None:
self._logger.debug('Receiving CONNACK packet from broker')
self._send_str(self.user)
self._send_str(self.password)
if self.logger is not None:
self.logger.debug('Receiving CONNACK packet from broker')
while True:
op = self._wait_for_msg()
if op == 32:
Expand All @@ -283,34 +282,34 @@ def connect(self, clean_session=True):
self._is_connected = True
result = rc[0] & 1
if self.on_connect is not None:
self.on_connect(self, self._user_data, result, rc[2])
self.on_connect(self, self.user_data, result, rc[2])
return result

def disconnect(self):
"""Disconnects the MiniMQTT client from the MQTT broker.
"""
self.is_connected()
if self._logger is not None:
self._logger.debug('Sending DISCONNECT packet to broker')
if self.logger is not None:
self.logger.debug('Sending DISCONNECT packet to broker')
self._sock.write(MQTT_DISCONNECT)
if self._logger is not None:
self._logger.debug('Closing socket')
if self.logger is not None:
self.logger.debug('Closing socket')
self._sock.close()
self._is_connected = False
self._subscribed_topics = None
if self.on_disconnect is not None:
self.on_disconnect(self, self._user_data, 0)
self.on_disconnect(self, self.user_data, 0)

def ping(self):
"""Pings the MQTT Broker to confirm if the broker is alive or if
there is an active network connection.
"""
self.is_connected()
if self._logger is not None:
self._logger.debug('Sending PINGREQ')
if self.logger is not None:
self.logger.debug('Sending PINGREQ')
self._sock.write(MQTT_PINGREQ)
if self._logger is not None:
self._logger.debug('Checking PINGRESP')
if self.logger is not None:
self.logger.debug('Checking PINGRESP')
while True:
op = self._wait_for_msg(0.5)
if op == 208:
Expand Down Expand Up @@ -373,23 +372,23 @@ def publish(self, topic, msg, retain=False, qos=0):
sz >>= 7
i += 1
pkt[i] = sz
if self._logger is not None:
self._logger.debug('Sending PUBLISH\nTopic: {0}\nMsg: {1}\
if self.logger is not None:
self.logger.debug('Sending PUBLISH\nTopic: {0}\nMsg: {1}\
\nQoS: {2}\nRetain? {3}'.format(topic, msg, qos, retain))
self._sock.write(pkt)
self._send_str(topic)
if qos == 0:
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, self._pid)
self.on_publish(self, self.user_data, topic, self._pid)
if qos > 0:
self._pid += 1
pid = self._pid
struct.pack_into("!H", pkt, 0, pid)
self._sock.write(pkt)
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, pid)
if self._logger is not None:
self._logger.debug('Sending PUBACK')
self.on_publish(self, self.user_data, topic, pid)
if self.logger is not None:
self.logger.debug('Sending PUBACK')
self._sock.write(msg)
if qos == 1:
while True:
Expand All @@ -401,12 +400,12 @@ def publish(self, topic, msg, retain=False, qos=0):
rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1]
if pid == rcv_pid:
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, rcv_pid)
self.on_publish(self, self.user_data, topic, rcv_pid)
return
elif qos == 2:
assert 0
if self.on_publish is not None:
self.on_publish(self, self._user_data, topic, rcv_pid)
self.on_publish(self, self.user_data, topic, rcv_pid)

def subscribe(self, topic, qos=0):
"""Subscribes to a topic on the MQTT Broker.
Expand Down Expand Up @@ -466,9 +465,9 @@ def subscribe(self, topic, qos=0):
topic_size = len(t).to_bytes(2, 'big')
qos_byte = q.to_bytes(1, 'big')
packet += topic_size + t + qos_byte
if self._logger is not None:
if self.logger is not None:
for t, q in topics:
self._logger.debug('SUBSCRIBING to topic {0} with QoS {1}'.format(t, q))
self.logger.debug('SUBSCRIBING to topic {0} with QoS {1}'.format(t, q))
self._sock.write(packet)
while True:
op = self._wait_for_msg()
Expand All @@ -479,7 +478,7 @@ def subscribe(self, topic, qos=0):
raise MMQTTException('SUBACK Failure!')
for t, q in topics:
if self.on_subscribe is not None:
self.on_subscribe(self, self._user_data, t, q)
self.on_subscribe(self, self.user_data, t, q)
self._subscribed_topics.append(t)
return

Expand Down Expand Up @@ -521,12 +520,12 @@ def unsubscribe(self, topic):
for t in topics:
topic_size = len(t).to_bytes(2, 'big')
packet += topic_size + t
if self._logger is not None:
if self.logger is not None:
for t in topics:
self._logger.debug('UNSUBSCRIBING from topic {0}.'.format(t))
self.logger.debug('UNSUBSCRIBING from topic {0}.'.format(t))
self._sock.write(packet)
if self._logger is not None:
self._logger.debug('Waiting for UNSUBACK...')
if self.logger is not None:
self.logger.debug('Waiting for UNSUBACK...')
while True:
op = self._wait_for_msg()
if op == 176:
Expand All @@ -536,7 +535,7 @@ def unsubscribe(self, topic):
assert return_code[1] == packet_id_bytes[0] and return_code[2] == packet_id_bytes[1]
for t in topics:
if self.on_unsubscribe is not None:
self.on_unsubscribe(self, self._user_data, t, self._pid)
self.on_unsubscribe(self, self.user_data, t, self._pid)
self._subscribed_topics.remove(t)
return

Expand All @@ -558,12 +557,12 @@ def reconnect_socket(self):
"""Re-establishes the socket's connection with the MQTT broker.
"""
try:
if self._logger is not None:
self._logger.debug("Attempting to reconnect with MQTT Broker...")
if self.logger is not None:
self.logger.debug("Attempting to reconnect with MQTT Broker...")
self.reconnect()
except RuntimeError as err:
if self._logger is not None:
self._logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
if self.logger is not None:
self.logger.debug('Failed to reconnect with MQTT Broker, retrying...', err)
time.sleep(1)
self.reconnect_socket()

Expand All @@ -572,12 +571,12 @@ def reconnect_wifi(self):
"""
while not self.is_wifi_connected:
try:
if self._logger is not None:
self._logger.debug('Connecting to WiFi AP...')
if self.logger is not None:
self.logger.debug('Connecting to WiFi AP...')
self._wifi.connect()
except (RuntimeError, ValueError):
if self._logger is not None:
self._logger.debug('Failed to reset WiFi module, retrying...')
if self.logger is not None:
self.logger.debug('Failed to reset WiFi module, retrying...')
time.sleep(1)
# we just reconnected, is the socket still connected?
if not self.is_sock_connected:
Expand All @@ -587,14 +586,14 @@ def reconnect(self, resub_topics=True):
"""Attempts to reconnect to the MQTT broker.
:param bool resub_topics: Resubscribe to previously subscribed topics.
"""
if self._logger is not None:
self._logger.debug('Attempting to reconnect with MQTT broker')
if self.logger is not None:
self.logger.debug('Attempting to reconnect with MQTT broker')
self.connect()
if self._logger is not None:
self._logger.debug('Reconnected with broker')
if self.logger is not None:
self.logger.debug('Reconnected with broker')
if resub_topics:
if self._logger is not None:
self._logger.debug('Attempting to resubscribe to previously subscribed topics.')
if self.logger is not None:
self.logger.debug('Attempting to resubscribe to previously subscribed topics.')
while self._subscribed_topics:
feed = self._subscribed_topics.pop()
self.subscribe(feed)
Expand Down Expand Up @@ -628,10 +627,10 @@ def loop(self):
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
if current_time - self._timestamp >= self._keep_alive:
if current_time - self._timestamp >= self.keep_alive:
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
if self._logger is not None:
self._logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
if self.logger is not None:
self.logger.debug('KeepAlive period elapsed - requesting a PINGRESP from the server...')
self.ping()
self._timestamp = 0
self._sock.settimeout(0.1)
Expand Down Expand Up @@ -753,22 +752,22 @@ def attach_logger(self, logger_name='log'):
"""Initializes and attaches a logger to the MQTTClient.
:param str logger_name: Name of the logger instance
"""
self._logger = logging.getLogger(logger_name)
self._logger.setLevel(logging.INFO)
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)

def set_logger_level(self, log_level):
"""Sets the level of the logger, if defined during init.
:param string log_level: Level of logging to output to the REPL.
"""
if self._logger is None:
if self.logger is None:
raise MMQTTException('No logger attached - did you create it during initialization?')
if log_level == 'DEBUG':
self._logger.setLevel(logging.DEBUG)
self.logger.setLevel(logging.DEBUG)
elif log_level == 'INFO':
self._logger.setLevel(logging.INFO)
self.logger.setLevel(logging.INFO)
elif log_level == 'WARNING':
self._logger.setLevel(logging.WARNING)
self.logger.setLevel(logging.WARNING)
elif log_level == 'ERROR':
self._logger.setLevel(logging.CRITICIAL)
self.logger.setLevel(logging.CRITICIAL)
else:
raise MMQTTException('Incorrect logging level provided!')

0 comments on commit 35ba546

Please sign in to comment.