|
| 1 | +import os |
| 2 | +from pickle import FALSE |
| 3 | +import sys |
| 4 | +import time |
| 5 | +import serial |
| 6 | +from queue import Queue |
| 7 | +try: |
| 8 | + from aceinna.models import WebserverArgs |
| 9 | + from aceinna.core.driver import (Driver, DriverEvents) |
| 10 | + from aceinna.framework import AppLogger |
| 11 | + from aceinna.framework.utils import resource |
| 12 | + from aceinna.framework.context import APP_CONTEXT |
| 13 | + from aceinna.framework.utils import helper |
| 14 | + |
| 15 | +except: # pylint: disable=bare-except |
| 16 | + print('load package from local') |
| 17 | + sys.path.append('./src') |
| 18 | + from aceinna.models import WebserverArgs |
| 19 | + from aceinna.core.driver import (Driver, DriverEvents) |
| 20 | + from aceinna.framework import AppLogger |
| 21 | + from aceinna.framework.utils import resource |
| 22 | + from aceinna.framework.context import APP_CONTEXT |
| 23 | + from aceinna.framework.utils import helper |
| 24 | + |
| 25 | +LOOP_TIMES = 100 |
| 26 | + |
| 27 | +PORT = "/dev/cu.usbserial-143300" |
| 28 | + |
| 29 | +BAUDRATE = 115200 |
| 30 | + |
| 31 | + |
| 32 | +def _match(result, check_data): |
| 33 | + check_data_len = len(check_data) |
| 34 | + result_len = len(result) |
| 35 | + |
| 36 | + if result_len < check_data_len: |
| 37 | + return False |
| 38 | + |
| 39 | + for m in range(result_len): |
| 40 | + is_diff = False |
| 41 | + |
| 42 | + if m + check_data_len > result_len: |
| 43 | + return False |
| 44 | + |
| 45 | + for n in range(check_data_len): |
| 46 | + if result[m+n] != check_data[n]: |
| 47 | + is_diff = True |
| 48 | + break |
| 49 | + |
| 50 | + if not is_diff: |
| 51 | + return True |
| 52 | + |
| 53 | + return False |
| 54 | + |
| 55 | + |
| 56 | +def read_until(serial_port: serial.Serial, check_data, read_times=1000, read_len=None): |
| 57 | + is_match = False |
| 58 | + |
| 59 | + while read_times > 0: |
| 60 | + if read_len: |
| 61 | + result = serial_port.read(read_len) |
| 62 | + else: |
| 63 | + result = serial_port.read_all() |
| 64 | + if len(result) > 0: |
| 65 | + is_match = _match(result, check_data) |
| 66 | + break |
| 67 | + |
| 68 | + time.sleep(0.01) |
| 69 | + read_times -= 1 |
| 70 | + |
| 71 | + return is_match |
| 72 | + |
| 73 | + |
| 74 | +def send_JS(serial_port: serial.Serial): |
| 75 | + cmd = helper.build_bootloader_input_packet('JS') |
| 76 | + serial_port.write(cmd) |
| 77 | + time.sleep(0.5) |
| 78 | + |
| 79 | + response = helper.read_untils_have_data( |
| 80 | + serial_port, 'JS', read_length=1000, read_timeout=5) |
| 81 | + |
| 82 | + return True |
| 83 | + |
| 84 | + |
| 85 | +def send_sync(serial_port): |
| 86 | + sync = [0xfd, 0xc6, 0x49, 0x28] |
| 87 | + expect_response = [0x3A, 0x54, 0x2C, 0xA6] |
| 88 | + retry_times = 10 |
| 89 | + is_matched = False |
| 90 | + |
| 91 | + for i in range(retry_times): |
| 92 | + serial_port.write(sync) |
| 93 | + serial_port.write(sync) |
| 94 | + serial_port.write(sync) |
| 95 | + time.sleep(0.5) |
| 96 | + |
| 97 | + is_matched = read_until(serial_port, expect_response, 100) |
| 98 | + if is_matched: |
| 99 | + break |
| 100 | + |
| 101 | + return is_matched |
| 102 | + |
| 103 | + |
| 104 | +def send_JG(serial_port): |
| 105 | + cmd = helper.build_bootloader_input_packet('JG') |
| 106 | + serial_port.write(cmd) |
| 107 | + time.sleep(0.5) |
| 108 | + response = helper.read_untils_have_data(serial_port, 'JG') |
| 109 | + return True if response is not None else False |
| 110 | + |
| 111 | + |
| 112 | +class TestApp: |
| 113 | + def __init__(self): |
| 114 | + pass |
| 115 | + |
| 116 | + def send_commands(self, serial_port): |
| 117 | + # send JS |
| 118 | + if not send_JS(serial_port): |
| 119 | + print('send JS failed') |
| 120 | + return False |
| 121 | + # send sync |
| 122 | + if not send_sync(serial_port): |
| 123 | + print('send sync failed') |
| 124 | + return False |
| 125 | + # send JG |
| 126 | + if not send_JG(serial_port): |
| 127 | + print('send JG failed') |
| 128 | + return False |
| 129 | + |
| 130 | + return True |
| 131 | + |
| 132 | + def start(self): |
| 133 | + # set a loop time |
| 134 | + loop = 0 |
| 135 | + while loop < LOOP_TIMES: |
| 136 | + print('Loop: {0}/{1}'.format(loop+1, LOOP_TIMES)) |
| 137 | + # connect to device |
| 138 | + serial_port = serial.Serial(port=PORT, baudrate=BAUDRATE, timeout=0.1) |
| 139 | + #serial_port.open() |
| 140 | + |
| 141 | + if not serial_port.is_open: |
| 142 | + print('Cannot open serial port') |
| 143 | + return |
| 144 | + |
| 145 | + try: |
| 146 | + result = self.send_commands(serial_port) |
| 147 | + if not result: |
| 148 | + raise Exception('commands failed') |
| 149 | + except Exception as ex: |
| 150 | + print(ex) |
| 151 | + break |
| 152 | + |
| 153 | + serial_port.close() |
| 154 | + time.sleep(20) |
| 155 | + |
| 156 | + loop += 1 |
| 157 | + |
| 158 | +if __name__ == '__main__': |
| 159 | + TestApp().start() |
0 commit comments