-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoobDemo.py
2489 lines (2189 loc) · 123 KB
/
oobDemo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python3
# © 2024 Microchip Technology Inc. and its subsidiaries
# Subject to your compliance with these terms, you may use this Microchip software
# and any derivatives exclusively with Microchip products. You are responsible for
# complying with third party license terms applicable to your use of third party
# software (including open source software) that may accompany this Microchip
# software.
# Redistribution of this Microchip software in source or binary form is allowed and
# must include the above terms of use and the following disclaimer with the
# distribution and accompanying materials.
# SOFTWARE IS “AS IS.” NO WARRANTIES, WHETHER EXPRESS, IMPLIED OR STATUTORY, APPLY
# TO THIS SOFTWARE, INCLUDING ANY IMPLIED WARRANTIES OF NON-INFRINGEMENT,
# MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. IN NO EVENT WILL MICROCHIP BE
# LIABLE FOR ANY INDIRECT, SPECIAL, PUNITIVE, INCIDENTAL OR CONSEQUENTIAL LOSS,
# DAMAGE, COST OR EXPENSE OF ANY KIND WHATSOEVER RELATED TO THE SOFTWARE, HOWEVER
# CAUSED, EVEN IF MICROCHIP HAS BEEN ADVISED OF THE POSSIBILITY OR THE DAMAGES ARE
# FORESEEABLE. TO THE FULLEST EXTENT ALLOWED BY LAW, MICROCHIP’S TOTAL LIABILITY ON
# ALL CLAIMS RELATED TO THE SOFTWARE WILL NOT EXCEED AMOUNT OF FEES, IF ANY, YOU
# PAID DIRECTLY TO MICROCHIP FOR THIS SOFTWARE.
import pathlib
try:
import serial
import serial.tools.list_ports
import time
from datetime import datetime
import kbhit
import os
from cloud_config import iot_parameters
from print_utils import *
import re
from time import sleep
import argparse
import pathlib
import random
from argparse import ArgumentParser
from pathvalidate.argparse import validate_filename_arg, validate_filepath_arg
# import atexit
# import json
except ModuleNotFoundError:
print(f'\n\n----------------------------------------------')
print(f' Error! Python module not found.')
print(f' Please run "pip install -r requirements.txt"')
print(f' from the command line. Then try again.')
print(f'----------------------------------------------\n\n')
user_in = input(f'Install required Python modules now? [Y|n] ')
if user_in.upper() == 'Y' or user_in == '':
import os
os.system("pip install -r requirements.txt")
print(f'\n Rerun the demo now...\n\n')
else:
print(f'\n Please manually run "pip install -r requirements.txt" from the command line')
exit(1)
APP_REL_VERSION = "2.5.0" # Application/Demo Version
APP_BUILD = "" # Application/Demo Build version (Git Hash)
#APP Control Constants
EN_LOCAL_TELEMETRY = True # Enables local updates instead of using subscription telemetry
EN_CERT_SUPPORT = True # Set to True to enable cert listing & deletion
# Setting False -> True requires uncommenting the 2
# fields 'device_cert_filename' & 'device_key_filename'
# in the script 'cloud_config.py'
WIFI_TIMEOUT_S = 20 # AT+WSTA=1 command, Wi-Fi connection timeout in seconds
WIFI_TIMEOUT_TLS_S = 45 # AT+WSTA=1 command, TLS broker Wi-Fi connection timeout in seconds
MQTT_TIMEOUT_S = 60 # MQTT server login timeout in seconds
# LOCAL_ECHO - Set to True to display each command char
LOCAL_ECHO = True
BLOCK_PERIODIC_TIME_RESP = True
BANNER_BORDER_LEV_1 = '■' # "─ ━ ═ ■ "
BANNER_BORDER_LEV_2 = '━' # "─ ━ ═ ■ "
BANNER_BORDER_LEV_3 = '─' # "─ ━ ═ ■ "
# Single character keyboard codes for CLI commands
# Always use the UPPERCASE version as the code changes everything to upper case
EN_RAW_CODE_DISPLAY: bool = False # Display new RAW codes by setting bool to True
COUNT_KEY = 67 # 'C' Increment count, one time
BUTTON_KEY = 66 # 'B' Toggle button state 1->0->1, etc 1 time
TEMP_KEY = 84 # 'T' Update temperature by a random temperature delta 1 time
REPORT_RATE_KEY = 73 # 'I' Update telemetry: button, count & temp 10 times (every 2s)
# Each 'I' press changes rate: 2s -> 5s -> 10s ->0s (stopped)
REPORT_RATE_INF_KEY = 9 # 'CTRL+I' All of these keys act just the 'I' key except the updates
# 'CTRL+i' continue forever (not 10 times). Like the 'I' key, each
# 'Tab' key press cycles the rate: 2s -> 5s -> 10s ->0s (stopped)
RESUME_KEY = 82 # 'R' Reconnect Wi-Fi, broker (from the Demo) or RESET & Run (from the CLI)
DISCONN_KEY = 88 # 'X' Disconnect MQTT broker / Wi-Fi key
HELP_KEY = 72 # 'H' Display Help screen for the App or CLI
# Application Configuration File
APP_CONFIG_FILE = "app.cfg" # Default file name for application configuration settings
APP_CMD_LOG_PATH = "logs" # Hardcoded relative log file path
def val_args(args, parser) -> tuple:
""" Validates command line argument(s)
"""
if args.cfg == None:
args.cfg = globals()["APP_CONFIG_FILE"]
else:
try:
file = pathlib.Path(args.cfg)
path = str(file.parent)
if not os.path.exists(path):
os.makedirs(path)
validate_filepath_arg(args.cfg) #, platform="auto")
except OSError as e:
banner(f'ERROR: {e}')
exit()
APP_CONFIG_FILE = args.cfg
return args
def get_args() -> tuple:
""" Retrieves command line arguements and returns the result as a tuple.
"""
# os.system('cls') # Clear terminal screen
description='\n\nPython OOB Demo for "test.mosquitto.org'
script_usage='\n python oobdemo.py [-c <cfg_file>]\n'
parser = argparse.ArgumentParser(description=description, usage=script_usage)
parser.add_argument("-c", "--cfg", metavar='<Path\Filename>', required = False, help="Set an alternate configuration file path/name")
args = parser.parse_args()
args = val_args(args, parser)
return args
# Object read/write configuration json file
ARGS = get_args() # Put command line args into global space
iotp = iot_parameters(ARGS.cfg, False) # Puts config files values into global space var 'iotp'
cert_support = globals()["EN_CERT_SUPPORT"]
print(f'CERT SUPPORT: {cert_support}')
try:
APP_CMD_LOG_FILE = iotp.params["log_filename"]
# Supported part numbers are in a dictionary of tuples.
# The device is the key and the tuple contains all the possible
# device names returned by the AT+GMM command. The identified
# device determines how the script will run.
SUPPORTED_RNS_DICT = {"RNWF02": ("PIC32MZW2", "RNWF02"),
"RNWF11": ("PIC32MZW1", "RNWF11")
}
APP_DISPLAY_LEVEL = int(iotp.params["display_level"])
AT_COMMAND_TIMEOUT = int(iotp.params["at_command_timeout"]) # AT cmds timeout in seconds
except KeyError as e:
banner(f' Error: Configuration parameter {e} missing \n\n'
f'Verify the parameter {e} in "{APP_CONFIG_FILE}"\n'
f' Manually add/edit the parameter OR\n'
f' Delete "{APP_CONFIG_FILE}" to recreate it on the next run', BANNER_BORDER_LEV_3)
exit(1)
TLS_CFG_INDEX = 1 # All AT+TLSC commands can be programmed into 1 of 2 banks, 1 or 2.
# Then AT+MQTT=7,x will set which bank is used for the TLSC commands.
# Use 1 or 2, not 0(AT Spec RNWF11 doc is incorrect)
TLS_CERT_BUILDS = "./Tools/CertificateTool/CertBuilds"
#
# Display states can be set in the configuration file; eg "display_level": "3",
APP_DISPLAY_OFF = 0 # Extra displays off...cleanest output
APP_DISPLAY_STATES = 1 # Display State Banners & lower
APP_DISPLAY_INFO = 2 # Display info and events & lower
APP_DISPLAY_DEMO = 3 # Display 'Demo' IOTC data and lower
APP_DISPLAY_DECODES = 4 # Display Decodes like JSON, CRx & lower
DEMO_LOOP_COUNT = 10 # Number of times to send Telemetry data
#
# Application States
APP_STATE_CLI = 0 # CLI state occurs on fatal error, user 'ESC' OR after the DEMO
APP_STATE_INIT = 1 # Bare minimum commands required to start; RESET, NETIFC, etc
APP_STATE_WIFI_CONNECT = 2 # Connect to Wi-Fi
APP_STATE_MQTT_SETTINGS = 3 # Setup MQTT
APP_STATE_MQTT_CONNECT_BROKER = 4 # Connects to broker
APP_STATE_DEMO = 5 # Demo state application
APP_STATE_NEXT_STATE = 999 # Very high state value will cause jump to next state
APP_STATE_START = APP_STATE_WIFI_CONNECT # Sets the beginning STATE (after APP_STATE_INIT)
APP_SUB_STATE_DEMO_LOOP = 1
# Special states or state value indicating the beginning or completion of a state
APP_STATE_BEGIN = 0
APP_STATE_COMPLETE = 65535
if EN_CERT_SUPPORT:
help_str_cli = f' {APP_STATE_CLI} - APP_STATE_CLI Help\n' \
' H - This help screen\n' \
' X - Disconnect broker or Wi-Fi\n' \
' R - Resume Demo (Full Reset & Run)\n' \
' AT+ - AT command with or w/o the \'AT\' eg: \'AT+GMM\' or \'+GMM\'\n' \
' DIR - List certs & keys. eg: dir [c | k] \n' \
' DEL - Delete certs & keys. eg: del [c | k] <FILENAME>\n' \
' SCAN - Scan & displays Wi-Fi information\n' \
' SYS - Displays network, system, and module info\n' \
' ESC - [ESC] key quit application\n'
help_str_demo = f' {APP_STATE_DEMO} - APP_STATE_IOTC_DEMO\n' \
' H - This help screen\n' \
' B - Toggles Button state \'Button\'(0,1)\n' \
' C - Increment \'Count\'(0->N)\n' \
' T - Update \'Temp\' value (random -5.0 to +5.0 degrees)\n' \
' I - Increment \'Report Rate\' (0s, 2s, 5s, 10s) then...\n' \
' updates all telemetry at the \'reportRate\' 10 times\n' \
' TAB - Same as \'I\' except updates all telemetry FOREVER \n' \
' R - Resume Demo (Wi-Fi, Broker Reconnect)\n' \
' AT+ - AT command with or w/o the \'AT\' eg: \'AT+GMM\' or \'+GMM\'\n' \
' DIR - List certs & keys. eg: dir [c | k] \n' \
' DEL - Delete certs & keys. eg: del [c | k] <FILENAME>\n' \
' SCAN - Scan & displays Wi-Fi information\n' \
' SYS - SYS - Displays network, system, and module info\n' \
' ESC - [ESC] key exit to CLI. [ESC][ESC] exits application\n'
else:
help_str_cli = f' {APP_STATE_CLI} - APP_STATE_CLI Help\n' \
' H - This help screen\n' \
' X - Disconnect broker or Wi-Fi\n' \
' R - Resume Demo (Full Reset & Run)\n' \
' AT+ - AT command with or w/o the \'AT\' eg: \'AT+GMM\' or \'+GMM\'\n' \
' SCAN - Scan & displays Wi-Fi information\n' \
' SYS - Displays network, system, and module info\n' \
' ESC - [ESC] key. Exits application\n'
help_str_demo = f' {APP_STATE_DEMO} - APP_STATE_IOTC_DEMO\n' \
' H - This help screen\n' \
' B - Toggles Button state \'Button\'(0,1)\n' \
' C - Increment \'Count\'(0->N)\n' \
' T - Update \'Temp\' value (random -5.0 to +5.0 degrees)\n' \
' I - Increment \'Report Rate\' (0s, 2s, 5s, 10s) then...\n' \
' updates all telemetry at the \'reportRate\' 10 times\n' \
' TAB - Same as \'I\' except updates all telemetry FOREVER \n' \
' R - Resume Demo (Wi-Fi, Broker Reconnect)\n' \
' AT+ - AT command with or w/o the \'AT\' eg: \'AT+GMM\' or \'+GMM\'\n' \
' SCAN - Scan & displays Wi-Fi information\n' \
' SYS - SYS - Displays network, system, and module info\n' \
' ESC - [ESC] key exits to CLI. [ESC][ESC] exits application\n'
MQTT_MINIMUM_READ_THRESHOLD_SZ = 300
MQTT_FIELDS = ["button", "temp", "count"]
MQTT_IQOS = 0 # IQOS: 0: Message deleted, does not survive failures, no duplicates
# IQOS: 1: Message stored, survives connection loss, duplicates possible
# IQOS: 2: Message stored, survives connection loss, no duplicates
MQTT_IRETAIN = 0 # Keep this as 0 to not retain message on server
WIFI_MAX_SSID_LEN = 32 # RNWF02 v1/2 & RNWF11 max SSID length
WIFI_MAX_PW_LEN = 128 # RNWF02 v1/2 & RNWF11 max passphrase length
WIFI_SECURITY_LIST = [0, *range(2, 9, 1)] # RNWF02 v1/2 & RNWF11 security options 0, 2-9 (1 not supported)
WIFI_SHOW_BLANK_SSID = False # Set to False for release. Blocks reported blank SSID's. Set to True for debug.
# # -----------------------------------------------------------------------------
# APP OS Return/Error codes
APP_RET_OK = 0
APP_RET_COM_NOT_FOUND = 1
APP_RET_COM_BUSY = 2
def detect_port(com_ports: list, supported_pn: dict) -> tuple:
""" Detect the connected COM port by sending +GMM command to each
USB UART port and testing against the supported dict devices.
"""
# Loop through supported part numbers to find the connected device
# supported_pn: type 'SUPPORTED_RNS_DICT' = {"RNWF02": ("PIC32MZW2", "RNWF02"), next device...
for device, val in supported_pn.items():
for signature in val:
#print(f'Signature:{sig} is a {device} device')
for port in com_ports:
rx_data = []
try:
s = serial.Serial(port=port, baudrate=230400, bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
timeout=4.0, write_timeout=4.0, inter_byte_timeout=0.5)
try:
sleep(0.2)
s.write(f'AT+GMM\r\n'.encode('UTF-8'))
sleep(0.2)
while s.in_waiting:
if s.in_waiting == 1:
break
c = s.readline()
c = c.decode('UTF-8').strip('\r\n').replace('"', '').replace('+GMM:', '')
rx_data.append(c)
except UnicodeDecodeError:
pass
try:
if len(rx_data) != 0:
if rx_data[1] == signature and rx_data[2] == 'OK':
s.close()
return device, port
except IndexError:
pass
s.close()
except serial.SerialException:
pass
return "", ""
def find_com_port() -> tuple:
""" Attempts to find a COM port. If found returns a Windows
compatible "COMx" string in a tuple. If not found the
returned string is empty.
"""
ports = serial.tools.list_ports.comports()
usb_com_ports = []
if APP_DISPLAY_LEVEL >= APP_DISPLAY_DECODES:
print(' USB COM Ports Detected\n ----------------------')
for port, desc, hwid in sorted(ports):
if hwid.find("USB ") != -1 and APP_DISPLAY_LEVEL >= APP_DISPLAY_DECODES:
print(" {}: [{}]".format(port, hwid))
# Check for the USB com ports, i.e. not Bluetooth, etc
if hwid.find("USB") != -1:
# Save USB ports in a list so we can find
usb_com_ports.append(port)
for i in range(3):
os.system('cls') # Clear terminal screen
print(f'\n\nDetecting COM ports...({i+1} of 3)')
port, device = detect_port(usb_com_ports, SUPPORTED_RNS_DICT)
if port:
os.system('cls') # Clear terminal screen
break
return port, device
class Polling_KB_CMD_Input:
""" Class is used to poll the keyboard for character and word commands from the user.
"""
def __init__(self) -> None:
self.kb = kbhit.KBHit()
self.input_buf = ""
self.cmd = "" # Word commands
self.key_cmd = "" # Single key commands
# Single key commands from the CLI
self.EXIT_KEY = 27 # ESC
self.key_commands = [COUNT_KEY, BUTTON_KEY, TEMP_KEY, REPORT_RATE_KEY, REPORT_RATE_INF_KEY, RESUME_KEY, DISCONN_KEY, HELP_KEY]
def poll_keyboard(self, enable_key_cmds: bool) -> bool:
""" Routine polls the keyboard for both single key commands and word commands. The parameter 'enable_key_cmds'
is set to 'True' to support both. When passed a 'False', key commands, except for 'ESC', are ignored for
full CLI word commands or text input.
"""
if self.kb.kbhit():
c = self.kb.getch()
# RAW_CODE
# Need the raw code to detect CTRL or SHIFT modifiers
if EN_RAW_CODE_DISPLAY:
print(f'Raw KeyCmd: {str(ord(c))}\n') # Set to string because CTRL+I displays as a Tab!
if c == REPORT_RATE_INF_KEY:
c_upper = REPORT_RATE_INF_KEY
else:
c_upper = c.upper()
if len(c) == 0:
return True
if ord(c) == self.EXIT_KEY: # Single key 'ESC' is always supported
return False
# Limit key_cmd registration to 1st buffer char only
if enable_key_cmds == True: # Single key commands SUPPORTED
if len(self.input_buf) == 0:
for self.key_cmd in self.key_commands:
if self.key_cmd == c_upper or self.key_cmd == ord(c_upper):
break
else:
self.key_cmd = ''
if self.key_cmd:
# print(f'KeyCmd: {self.key_cmd} ')
self.cmd = ''
self.input_buf = ''
return True # Return immediately for processing
if LOCAL_ECHO:
# Handle backspace for end user
if c == '\b':
print(f'\b \b', end='', flush=True)
if len(self.input_buf) > 0:
self.input_buf = self.input_buf.rstrip(self.input_buf[-1])
else:
print(c, end='', flush=True)
self.input_buf += c
if ord(c) == 13: # Carriage Return (CR)
self.cmd = self.input_buf
self.key_cmd = ""
self.input_buf = ""
return True
def cmd_get(self) -> str:
return self.cmd
def cmd_received(self) -> bool:
if self.cmd != "":
return True
else:
return False
def cmd_clear(self) -> None:
self.cmd = ""
def __del__(self) -> None:
self.kb.set_normal_term()
class Delay_Non_Blocking:
def __init__(self) -> None:
self.isStarted = False
self.time_start = 0
def start(self) -> bool:
""" Called to start a non-blocking delay. Time is only reset or started if the timer is currently NOT runnning
:return: isStarted
"""
if self.isStarted == False:
self.time_start = time.time()
self.isStarted = True
return self.isStarted
def stop(self) -> None:
""" Stops the non-blocking delay clock.
"""
self.isStarted = False
self.time_start = 0
def delay_sec_poll(self, delay_sec: int) -> bool:
""" Non-blocking delay class returns True if the passed time (Sec) is exceeded. On a
True return the timer is cancelled.
:param delay_sec: Seconds of delay
:return: True - Time exceeded.
False - Time was NOT exceeded OR timer was NOT started
"""
if time.time() - self.time_start > delay_sec:
self.stop() # self.isStarted = False
return True
else:
return False
class IotCloud:
def __init__(self, port: str, baud: int, model: str) -> None:
""" Primary class to handle the app including Wi-Fi, MQTT,
user interface, etc
"""
self.__version__ = APP_REL_VERSION
# initialize class variables
self.ser_buf = "" # serial buffer for processing messages
# main application state
self.app_state = APP_STATE_INIT # INIT must be first to initialize key variables
self.app_state_prev = -1
self.app_sub_state = 0
self.app_sub_state_check = 0
self.app_check = 0
self.app_wait = False
self.next_sub_state_offset = 1
# firmware Syntax for parse RNFW02v1: '+GMR:"1.0.0 0 630f6fcf [13:57:15 Jun 27 2023]"'
# firmware Syntax for parse RNFW02v2: '+GMR:"2.0.0 0 e41f977cb [16:31:26 Apr 12 2024]"'
# firmware Syntax for parse RNFW11v1: '+GMR:"78de24c4 [09:48:06 Nov 2 2023]"'
self.fw_version = "Not Reported"
self.fw_sec_version = "Not Reported"
self.fw_hash = "Not Reported"
self.fw_datestamp = "Not Reported"
self.dev_model = model
self.dev_com_port = port
self.log_file_handle = "" # Log file handle if created
self.pub_topic = ""
self.pub_payload = ""
# wifi connection related variables
self.wifi_connected = False # Set to True when WiFi connected
self.wifi_ap_sta = '???' # AT+ASSOC; 0=AP, 1=Station
self.wifi_assoc_id = '?' # ASSOC ID
self.wifi_bssid = '00:00:00:00:00:00' # AT+ASSOC; MAC of AP connected to
self.wifi_rssi = '???' # AT+ASSOC; RSSI of AP in dbm
self.wifi_reg_domain = "" # Wi-Fi regulatory domain; eg: 'GEN', 'USA' or 'EMEA'
self.wifi_reg_domain_available = "" # Wi-Fi regulatory domains available in flash 'GEN','USA','EMEA'
# DPS connection variables
self.broker_connected = False # set to True when connected to DPS broker
# IOTC connection variables
self.iotc_topic_index = 1 # tracks how many topics have been subscribed to for
# iotc event call back to adjust the state variable.
# IOTC Demo variables
self.iotc_button = True # Telemetry: "button" toggle reported to cloud initial value
self.iotc_count = 0 # Telemetry: "count" reported to cloud initial value
self.iotc_temp = 78.0 # Telemetry: "temp" reported to cloud initial value
self.telemetry_interval = 0 # Property: Telemetry interval
self.ip_addr_ipv4 = 'n/a' # Property: IP Address reported to cloud
self.ip_addr_ipv6 = 'n/a' # IP Address returned from router
self.mac = '' # MAC address of the module "AT+NETIFC=0, 2" (with colons)
self.telemetry_ints = [0, 1, 2, 5, 10] # Demo state supported telemetry intervals in seconds
self.telemetry_index = 0 # Demo state index to current telemetry interval
self.demo_loops = 0 # Max number of telemetry updates in Demo state
self.last_utc_update = 0 # Update this each time the time signal come in
self.resp_dict = {"button":" ",
"count": " ",
"temp": " "
}
self.reboot_timer = Delay_Non_Blocking()
self.at_quiet_command = False # Disable CLI command output for 1 cmd before reset
self.at_command = "" # The AT command currently being executed
self.at_command_prev = "" # Previously executed AT command
self.at_command_resp = "" # Alt 'response' to use if the command itself isn't the desired response
self.at_command_timer = Delay_Non_Blocking()
self.at_command_timer.stop()
self.at_command_timeout = AT_COMMAND_TIMEOUT # AT command timeout. Commands must complete within this many seconds
self.mqtt_client_id = "" # Client ID = MODEL + last 4 MAC Address bytes. eg: "RNWF02_0D-AA-EF"
# Size of the MQTT Read Threshold buffer if needed "AT+MQTTC=9, 700"
self.mqtt_read_sz = MQTT_MINIMUM_READ_THRESHOLD_SZ
# Read these params from the configuration.cfg file
self.ca_cert_name = iotp.params["ca_cert_name"] # Certificate Authority certificate name
self.mqtt_root_topic = iotp.params["mqtt_root_topic"] # Decoded MQTT root topic taking into account %M, %N variables [Automatic]
self.mqtt_sub_topic = iotp.params["mqtt_sub_topic"] # Decoded MQTT sub-topic taking into account %M, %N variables [OPTIONAL]
self.mqtt_subscription = iotp.params["mqtt_subscription"] # Subscription string read from config. Default is '#' for all sub fields
self.mqtt_field_list = MQTT_FIELDS # Hard coded MQTT fields to use from the constant MQTT_FIELDS above
self.evt_handler = None
self.SER_TIMEOUT = 0.1 # sets how long pySerial will delay waiting for a character
# reading a character a time, no need to wait for long messages
try:
self.ser = serial.Serial(self.dev_com_port, baud, timeout=self.SER_TIMEOUT)
except:
print(f' Serial port open FAILED. Is {self.dev_com_port} in use?')
exit(APP_RET_COM_BUSY)
self.delay = Delay_Non_Blocking()
self.kb = Polling_KB_CMD_Input()
self.open_log() # Start the log file
def get_topic(self, data_field: str = '') -> str:
""" Constructs a MQTT topic path string and returns the result. Supports passing
in the requested data field, or not and supports a blank sub-topic too.
"""
if self.mqtt_sub_topic == '': # If the sub topic path is blank...
if data_field == '':
return f'{self.mqtt_root_topic}'
else:
return f'{self.mqtt_root_topic}/{data_field}'
else: # Else include the sub-topic path too...
if data_field == '':
return f'{self.mqtt_root_topic}/{self.mqtt_root_topic}'
else:
return f'{self.mqtt_root_topic}/{self.mqtt_root_topic}/{data_field}'
def chk_ss(self, set_check = 1) -> bool:
""" Resets and/or returns the internal 'check' variable used by the state machine
as the sub-state value. This replaced all constant value checks in if-else's.
"""
if set_check == APP_STATE_BEGIN:
self.app_sub_state_check = 0
elif set_check == APP_STATE_COMPLETE:
self.app_state += 1
self.app_sub_state = self.app_sub_state_check = APP_STATE_BEGIN
else:
self.app_sub_state_check += set_check
return bool(self.app_sub_state_check == self.app_sub_state)
def get_topic_name(self, topic: str) -> str:
""" Decodes topic string from the config file, encodes the requested
fields and returns the string.
"""
topic = topic.replace("%m", "%M").replace("%M", self.dev_model)
topic = topic.replace("%n", "%N").replace("%N", self.mac.replace(":", "-"))
return topic
def get_topic_path(self) -> str:
""" Returns the correct topic path based on the class variables
'mqtt_root_topic' and 'mqtt_sub_topic'.
"""
if len(self.mqtt_sub_topic):
topic = f'{self.mqtt_root_topic}/{self.mqtt_sub_topic}'
else:
topic = f'{self.mqtt_root_topic}'
return self.get_topic_name(topic)
def set_log_file_name(self) -> tuple:
""" Decodes and sets the global APP_CMD_LOG_FILE
and returns the constructed log 'filename' and the
datetime object for its creation.
"""
file_name = globals()["APP_CMD_LOG_FILE"]
now = datetime.now()
t: str = now.strftime("%H-%M-%S") # Get the Time string
d: str = now.strftime("%b_%d_%Y") # Get the Date string
c: str = str(pathlib.Path(iotp.filename)).split('.')[0] # Get the config filename w/o extension
dir, c = os.path.split(c)
file_name = file_name.replace("%m", "%M").replace("%M", self.dev_model) # Add Module name to log filename
file_name = file_name.replace("%d", "%D").replace("%D", d) # Add Date to log filename
file_name = file_name.replace("%t", "%T").replace("%T", t) # Add Time to log filename
file_name = file_name.replace("%c", "%C").replace("%C", c) # Add Config filename to log filename
globals()["APP_CMD_LOG_FILE"] = file_name
return file_name, now
def open_log(self) -> None:
""" Sets the logfile handle, if applicable and opens the log file for writing
"""
file_name, now = self.set_log_file_name()
if file_name:
try:
logfile = f'{globals()["APP_CMD_LOG_PATH"]}/{file_name}'
directory = os.path.dirname(logfile)
if not os.path.exists(directory):
os.makedirs(directory)
self.log_file_handle = open(f'{logfile}', "w+")
except:
banner(f' ERROR: Log file could not be created or written\n'
f' Verify {APP_CONFIG_FILE} \'log\' syntax\n',
BANNER_BORDER_LEV_1)
# print("\n")
self.log_file_handle = None
pass
else:
self.log_file_handle.write(f'Out-Of-Box MQTT Demonstration Command Log v{self.__version__}\n')
self.log_file_handle.write(f'{"-" * 46}\n')
self.log_file_handle.write(f'Filename: {file_name}\n')
self.log_file_handle.write(f'Config: {ARGS.cfg}\n')
self.log_file_handle.write(f'Created: {now.strftime("%b %d, %Y")} {now.strftime("%H:%M:%S")}\n')
self.log_file_handle.write(f'Model: {self.dev_model}\n')
self.log_file_handle.write(f'COM Port: {self.dev_com_port}\n')
self.log_file_handle.write(f'{"-" * 46}\n\n')
else:
self.log_file_handle = None
def log_state(self, msg: str, border_char: str = '#', single_line: bool = False) -> None:
""" Adds a banner in the log if log is used
"""
if self.log_file_handle:
str_caps = f'{border_char * 4}'
min_msg_len = 34
msg = f' {msg:^{min_msg_len}} '
msg = f'{str_caps} {msg} {str_caps}'
if single_line is True:
self.log_file_handle.write(f'{msg}\n')
else:
self.log_file_handle.write(f'{border_char * len(msg)}\n{msg}\n{border_char * len(msg)}\n')
self.log_file_handle.flush()
def cmd_log(self, msg: str) -> None:
""" Outputs the CMD/RSP strings to the CLI and optional log file.
If the 'self.at_quiet_command' bool is True, screen output is suppressed. Log
output is never suppressed.
"""
# Remove any NULL's returned by the device such as during AT+RST
msg = ''.join(msg.split('\x00'))
# msg = msg.strip('\n')
if "CMD[" in msg or "CRx[" in msg:
if self.at_quiet_command is False:
print(f'{msg}\n', flush=True, end='')
if self.log_file_handle: # Print to LOG file if enabled
self.log_file_handle.write(f'{msg}\n')
self.log_file_handle.flush()
elif "RSP[" in msg: # Print to CLI
if self.at_quiet_command is False:
print(f'{msg}\n', flush=True, end='')
if self.log_file_handle: # Print to LOG file if enabled
self.log_file_handle.write(f'{msg}\n\n')
self.log_file_handle.flush()
else:
if self.at_quiet_command is False:
print(f'{msg}', flush=True, end='')
if self.log_file_handle: # Print to LOG file if enabled
self.log_file_handle.write(f'{msg}\n')
self.log_file_handle.flush()
def wifi_validate(self) -> bool:
""" Tests the 3 Wi-Fi parameters for correctness and returns True if ok, False otherwise.
If this test fails, wifi_ssid, wifi_passphrase & wifi_security are set to an empty strings.
wifi_ssid: < 32 bytes long, spaces allowed
wifi_passphrase: < 128 bytes long, NO spaces. Blank only if security == 0.
wifi_security: Numeric containing 0, 2-9
"""
ret_val = True
# Blank check
if iotp.params["wifi_ssid"] == "" or iotp.params["wifi_security"] == "":
ret_val = False
# Security value check
elif iotp.params["wifi_security"].isnumeric() == False or (int(iotp.params["wifi_security"]) in WIFI_SECURITY_LIST) == False:
ret_val = False
# Passphrase 'blank space' with security set to non-zero
elif (iotp.params["wifi_passphrase"] == '') and int(iotp.params["wifi_security"]) != 0:
ret_val = False
# Passphrase is set but security is non-zero
elif (len(iotp.params["wifi_passphrase"])) and (iotp.params["wifi_security"] == '0' ):
ret_val = False
# Wi-Fi SSID max length check
elif len(iotp.params["wifi_ssid"]) > WIFI_MAX_SSID_LEN:
ret_val = False
# Wi-Fi Passphrase length check
elif len(iotp.params["wifi_passphrase"]) > WIFI_MAX_PW_LEN:
ret_val = False
if ret_val:
return True
iotp.params["wifi_ssid"] = ""
iotp.params["wifi_security"] = ""
iotp.params["wifi_passphrase"] = ""
return False
def is_subscribed_mqtt(self, val_true: any = True, val_false: any =False) -> any:
""" Checks if the 'app.cfg' has a 'mqtt_subscription' has a subscription string set.
If so, it returns 'val_true' otherwise 'val_false' is returned.
"""
if self.mqtt_subscription != '':
return val_true
return val_false
def is_primary_mqtt(self, val_true: any = True, val_false: any =False) -> any:
""" Checks to see if 'this' module is the primary by comparing the 'mqtt_root_topic'
string and 'self.mqtt_client_id'. If they are the same 'val_true' is returned, otherwise val_false
"""
if iotp.params["mqtt_root_topic"] == self.mqtt_client_id:
return val_true
return val_false
def is_tls(self, val_true: any = True, val_false: any = False) -> any:
""" Checks if the connection is to be TLS encrypted. This controls sending TLS and SNTP commands.
Since SNTP is only needed for TLS it grouped with the TLS check. If this test true the 'val_true'
parameter is returned. Otherwise the 'val_false' parameter is returned.
"""
if int(iotp.params["mqtt_broker_port"]) > 7999:
return val_true
return val_false
def is_model(self, model: str, fw_ver: str = "*", val_true: any = True, val_false: any = False) -> any:
""" Verifies device MODEL & Firmware version and returns the value passed as param 3 if true,
or param 4 if false. Parameters 3 & 4 default to boolean, but can be overridden if passed in.
"""
if model == self.dev_model and (fw_ver == "*" or self.fw_version == fw_ver):
return val_true
return val_false
def is_state_demo(self, val_true: any = True, val_false: any = False) -> any:
""" Verifies state machine's state is in the demo. Parameters 2 & 3 default to boolean, but can be overridden if passed in.
"""
if self.app_state == APP_STATE_DEMO:
return val_true
return val_false
def random_delta_temp(self) -> float:
""" Returns a random temperature delta that is guaranteed to be a non-zero value.
"""
delta_temp: float = 0.0
while (delta_temp == 0.0):
delta_temp = round((random.randrange(-50, +50) / 10), 1)
return float(delta_temp)
def hex_rid(self, rid: int = -1) -> str:
""" Converts class 'self.rid' into a hex string w/o '0x' prefix for some MQTT commands.
If passed an int, will set self.rid prior to string conversion
"""
if rid != -1:
self.rid = rid
return f'{self.rid:x}' # Converts dec value passed to a hex w/o '0x'
def set_rid_from_string(self, rid_str: str) -> None:
""" Sets the class 'rid' if found in passed in string. If not found class rid remains unchanged.
The 'rid' value is hex without the prefix '0x' and must increment as such."""
#rid_str = '+MQTTSUBRX:0,0,0,"$iothub/methods/POST/echo/?$rid=1","{\\"echoString\\":\\"hello\\"}"'
if rid_str.find('rid=') != -1:
try:
rid_str = self.substr_swap(rid_str, {'"': ''})
tmp_list = rid_str.split('rid=')
tmp_list = str(tmp_list[1]).split(',')
self.rid = int(tmp_list[0], 16)
except:
# self.cmd_log(f'\nWarning: Invalid "{self.rid = }" detected in "set_rid_from_string()"\n')
pass
# keyboard processing
def kb_data_process(self, received: str) -> bool:
""" Process a passed in str and returns True if an AT+
command or False if its a FS (File System) command.
If a FS command that command is saved in the class.
"""
if received.startswith("AT"):
self.pub_topic = ""
self.sub_topic = ""
return True
return False
def set_sub_state(self, sub_state_offset: int) -> None:
""" Updates the current sub-state by the offset passed.
"""
self.app_sub_state += sub_state_offset
def set_state_direct(self, new_state: int, new_sub_state: int = 0) -> None:
""" Sets the state and optional sub-state directly (not an offset). If the sub_state
is not passed, the default 0 is used showing the state banner.
"""
if APP_STATE_CLI <= new_state <= APP_STATE_DEMO:
self.app_state_prev = self.app_state
self.app_state = new_state
self.app_sub_state = new_sub_state
def cmd_kill(self, exit_to_cli: bool = False) -> None:
""" Called to terminate an outstanding AT+ command and reset
variable to an all a new command execution.
If 'exit_to_cli' is set to 'True', the state machine is directed
to exit to the CLI. If set to 'False', the state machine is not redirected
and allowed to continue execution.
"""
if self.at_command_timer.isStarted:
self.at_command_timer.stop()
self.at_command_prev = self.at_command
self.at_command = ""
self.at_command_resp = ""
self.at_command_timeout = 0
self.app_wait = False
self.at_quiet_command = False
if exit_to_cli:
self.set_state_direct(APP_STATE_CLI, APP_STATE_BEGIN)
def cmd_check(self, terminate: bool = False) -> None:
""" Checks running AT commands against time out period. If timeout is
exceeded, the command is cancelled. To force a command to stop,
pass 'True'. Returns True if command was not terminated and
'False' if it was".
"""
# kill_cmd = terminate
if self.at_command_timer.isStarted and self.at_command == "":
# dbg_banner(f'Command Timer AutoKill - No command outstanding')
self.at_command_timer.stop()
if self.at_command_timer.isStarted and self.at_command:
run_time = time.time() - self.at_command_timer.time_start
if run_time > self.at_command_timeout:
# banner(f' Command "{self.at_command}" timed out after {run_time:.2f}s', "▫")
# print(f'\n')
self.cmd_log(f'Command "{self.at_command}" timed OUT after {run_time:.2f}s')
print(f'\n')
self.cmd_kill(self.is_state_demo(False, True)) # Only exit to CLI if NOT in demo state
err_sig = f'{self.app_state:0>2}:{self.app_sub_state:0>2}'
# Special handling of time critical command calls
if err_sig == '02:12':
self.err_handler(run_time, f'AT+SNTPC=3,"{iotp.params["ntp_server_ip_addr"]}" [ER]:NTP server did not respond @ [{err_sig}]', '01:09')
# other elif go here
else:
pass
# Make sure this is last
# self.set_state_direct(APP_STATE_CLI)
elif terminate:
banner(f' Command \'{self.at_command}\' terminated at {run_time:.2f}s', '▫')
self.cmd_kill(self.is_state_demo(False, True)) # Only exit to CLI if NOT in demo state
def cmd_issue_quiet(self,
command: str,
next_sub_state_offset: int = 0,
alt_resp: str = "",
timeout: int = AT_COMMAND_TIMEOUT) -> None:
self.at_quiet_command = True
self.cmd_issue(command, next_sub_state_offset, alt_resp, timeout)
""" This is an alternate function to submit AT+ commands but blocks CLI screen output.
If a log file is used the command will appear in the logs.
Also the defaults 'next_sub_state_offset' is set to '0' so that it can be used
during startup and not increment the programmed sub-state.
"""
def cmd_issue(self,
command: str,
next_sub_state_offset: int = 1,
alt_resp: str = "OK",
timeout: int = AT_COMMAND_TIMEOUT) -> None:
command = self.substr_swap(command, {'\r': '', '\n': ''})
""" Primary method of sending a 'AT+ command with its command and return displayed and logged."""
# Debug support for NOOP command without any processing
if command == "NOOP":
# self.cmd_log(f'CMD[{self.app_state:0>2}.{self.app_sub_state:0>2}]: NOOP - Internal "No Operation" command')
# self.cmd_log(f'RSP[{self.app_state:0>2}.{self.app_sub_state:0>2}]: NOOP - Complete')
# print("\r")
self.app_sub_state += next_sub_state_offset
return
if self.at_command == "": # If empty, it means no AT commands are 'pending'
self.at_command = command
self.at_command_timeout = timeout
if alt_resp == "":
self.at_command_resp = command # Save cmd to verify returned response
else:
self.at_command_resp = alt_resp # Save the alternate, passed in response instead
self.next_sub_state_offset = next_sub_state_offset
else:
banner(f' AT Command still processing:\n Command Pending: {self.at_command}\n' f' Command Not Sent: {command}')
return
self.cmd_log(f'CMD[{self.app_state:0>2}.{self.app_sub_state:0>2}]: {command.lstrip().strip()}')
# Send the AT command and add required '\r\n'
self.at_command_timer.stop() # Reset the command timer
self.ser.write(bytearray(self.at_command + '\r\n', 'utf-8'))
self.at_command_timer.start() # Start command timer
self.app_wait = True # Enable command wait for completion
def serial_receive(self) -> str:
""" Polled serial receive function. Reads until the prompt is found '>'.
Reads entire message.
"""
read_val = self.ser.read(1)
if read_val != b'':
self.ser_buf = self.ser_buf + read_val.decode('utf8', errors='backslashreplace')
if read_val == b'>':
ret_val = self.ser_buf
self.ser_buf = ""
return ret_val
return ""
# subscribe to MQTT topic
def mqtt_subscribe(self,
topic: str,
iQOS: int,
next_step: int = 1,
alt_resp: str = "",
timeout: int = AT_COMMAND_TIMEOUT):
""" MQTT Subscription function used to construct the subscription string before
sending it to the module for execution.
"""
cmd = "AT+MQTTSUB=" + '\"' + topic + '\",' + str(iQOS)
return self.cmd_issue(cmd, next_step, alt_resp, timeout)
# publish to MQTT topic
def mqtt_publish(self,
iQoS: int,
iRetain: int,
strTopic: str,
strPayload: str,
next_step: int = 1,
alt_resp: str = "",
timeout: int = AT_COMMAND_TIMEOUT):
""" Publishes a MQTT payload to a topic passed in. First parameter is hard coded to '0'
for Duplicate Message == New Message.
"""
try: # try block looks for CR, and removes it if present before joining CMD
loc = strPayload.index('\r')
except ValueError:
pass
else:
strPayload = strPayload[0:loc]
cmd = "AT+MQTTPUB=0," + str(iQoS) + ',' + str(iRetain) + ',\"' + strTopic + '\",\"' + strPayload + '\"'
# dbg_banner(f' MQTTPUB Command:\n {cmd}', '=')
return self.cmd_issue(cmd, next_step, alt_resp, timeout)
def evt_init_error(self) -> None:
self.app_sub_state = APP_STATE_COMPLETE
banner( f' Error: Initialization failure\n'
f'Verify AT commands:\n'
f' \'+ATE1\' - Set local echo\n'
f' \'+WSTAC\' - Wi-Fi config', BANNER_BORDER_LEV_1)
def evt_ntp_received(self, rx_data: str) -> None:
""" Decodes and stores NTP (time) string in class """
# Get the returned UCT(s) from the returned string
# i.e. "+TIME:3896263328\r\n>"
try:
self.last_utc_update = int("".join(filter(str.isdigit, rx_data)))
except:
self.last_utc_update = 1
def user_prompt_int(self, int_min: int, int_max: int, prompt_str: str) -> bool | int:
""" Very simple user input prompt for an integer within a known range
"""
user_int: int = 0