diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c4201ec --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/env/* +/__pycache__/* diff --git a/ecu-simulator.py b/ecu-simulator.py deleted file mode 100644 index 84b9c9a..0000000 --- a/ecu-simulator.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python - -from __future__ import print_function - -from random import randint - -import logging as log -import getopt, sys - -import can -from can.bus import BusState - -def service1(bus, msg): - if msg.data[2] == 0x00: - log.debug(">> Caps") - msg = can.Message(arbitration_id=0x7e8, - data=[0x06, 0x41, 0x00, 0xBF, 0xDF, 0xB9, 0x91], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x04: - log.debug(">> Calculated engine load") - msg = can.Message(arbitration_id=0x7e8, - data=[0x03, 0x41, 0x04, 0x20], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x05: - log.debug(">> Engine coolant temperature") - msg = can.Message(arbitration_id=0x7e8, - data=[0x03, 0x41, 0x05, randint(88 + 40, 95 + 40)], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x0B: - log.debug(">> Intake manifold absolute pressure") - msg = can.Message(arbitration_id=0x7e8, - data=[0x04, 0x41, 0x0B, randint(10, 40)], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x0C: - log.debug(">> RPM") - msg = can.Message(arbitration_id=0x7e8, - data=[0x04, 0x41, 0x0C, randint(18, 70), randint(0, 255)], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x0D: - log.debug(">> Speed") - msg = can.Message(arbitration_id=0x7e8, - data=[0x03, 0x41, 0x0D, randint(40, 60)], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x0F: - log.debug(">> Intake air temperature") - msg = can.Message(arbitration_id=0x7e8, - data=[0x03, 0x41, 0x0F, randint(60, 64)], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x10: - log.debug(">> MAF air flow rate") - msg = can.Message(arbitration_id=0x7e8, - data=[0x04, 0x41, 0x10, 0x00, 0xFA], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x11: - log.debug(">> Throttle position") - msg = can.Message(arbitration_id=0x7e8, - data=[0x03, 0x41, 0x11, randint(20, 60)], - is_extended_id=False) - bus.send(msg) - elif msg.data[2] == 0x33: - log.debug(">> Absolute Barometric Pressure") - msg = can.Message(arbitration_id=0x7e8, - data=[0x03, 0x41, 0x33, randint(20, 60)], - is_extended_id=False) - bus.send(msg) - else: - log.warning("!!! Service 1, unknown code 0x%02x", msg.data[2]) - - -def receive_all(): - - bus = can.interface.Bus(bustype='socketcan',channel='can0') - #bus = can.interface.Bus(bustype='ixxat', channel=0, bitrate=250000) - #bus = can.interface.Bus(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000) - - #bus.state = BusState.ACTIVE - #bus.state = BusState.PASSIVE - - try: - while True: - msg = bus.recv(1) - if msg is not None: - #print(msg) - if msg.arbitration_id == 0x7df and msg.data[1] == 0x01: - service1(bus, msg) - else: - log.warning("Unknown ID %d or service code 0x%02x", msg.arbitration_id, msg.data[1]) - - except KeyboardInterrupt: - pass - -def usage(): - # DOTO: implement - pass - -def main(): - try: - opts, args = getopt.getopt(sys.argv[1:], "l:v", ["loglevel="]) - except getopt.GetoptError as err: - # print help information and exit: - print(err) # will print something like "option -a not recognized" - usage() - sys.exit(2) - - loglevel = "INFO" - - for o, a in opts: - if o == "-v": - loglevel = "DEBUG" - elif o in ("-l", "--loglevel"): - loglevel = a - elif o in ("-h", "--help"): - usage() - sys.exit() - else: - assert False, "unhandled option" - - numeric_level = getattr(log, loglevel.upper(), None) - if not isinstance(numeric_level, int): - raise ValueError('Invalid log level: %s' % loglevel) - log.basicConfig(level=numeric_level) - receive_all() - -if __name__ == "__main__": - main(); diff --git a/ecuSimulator.py b/ecuSimulator.py new file mode 100755 index 0000000..97d6628 --- /dev/null +++ b/ecuSimulator.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python + +from __future__ import print_function + +from random import randint + +import logging as log +import getopt, sys + +import can +from can.bus import BusState + + +emu_ecu_can_id = 0x7e8 + +class globs: + mil_on = 0 + confirmed_DTCs = 2 + dtcs = [0xc158, 0x0001] + + +def cansend(bus, msg: can.message): + data =msg.data + leng = len(data) + if data[0] == 0: + l = leng -1 + data[0] = l + if len(data)!=8: # padding + data += b"\x00"*(8-len(data)) + # data += b"\xcc"*(8-len(data)) + bus.send(msg) + +def service1(bus, msg :can.message): + pid = msg.data[2] + txmsg = can.Message(arbitration_id= emu_ecu_can_id, + data=[0x00, 0x41, pid], + is_extended_id=False) + # print(hex(msg.arbitration_id), msg.data, pid) + if pid == 0x00: + log.debug(">> Supported PIDs") + if 0: + p0= 0b10110000 # pids 1,3,4 + p1, p2, p3 =0,0,0 + pids = [p0,p1,p2,p3] # big endian + pids = [0xBF, 0xDF, 0xB9, 0x91] + txmsg.data[3:] = pids + cansend(bus, txmsg) + elif pid == 0x01: # added by rundekugel + log.debug(">> Status") + txmsg.data[3:]= bytes([ (globs.mil_on <<7)|globs.confirmed_DTCs, 0,0,0,0]) + cansend(bus, txmsg) + elif pid == 0x04: + log.debug(">> Calculated engine load") + txmsg.data[3:]= bytes([0x20]) + cansend(bus, txmsg) + elif pid == 0x05: + log.debug(">> Engine coolant temperature") + txmsg.data[3:]= bytes([randint(88 + 40, 95 + 40)]) + cansend(bus, txmsg) + elif pid == 0x0B: + log.debug(">> Intake manifold absolute pressure") + txmsg.data[3:]= bytes([randint(10, 40)]) + cansend(bus, txmsg) + elif pid == 0x0C: + log.debug(">> RPM") + txmsg.data[3:]= bytes([ randint(18, 70), randint(0, 255)]) + cansend(bus, txmsg) + elif pid == 0x0D: + log.debug(">> Speed") + txmsg.data[3:]= bytes([ randint(40, 60)]) + cansend(bus, txmsg) + elif pid == 0x0F: + log.debug(">> Intake air temperature") + txmsg.data[3:]= bytes([ randint(60, 64)]) + cansend(bus, txmsg) + elif pid == 0x10: + log.debug(">> MAF air flow rate") + txmsg.data[3:]= bytes([ 0x00, 0xFA]) + cansend(bus, txmsg) + elif pid == 0x11: + log.debug(">> Throttle position") + txmsg.data[3:]= bytes([ randint(20, 60)]) + cansend(bus, txmsg) + elif pid == 0x33: + log.debug(">> Absolute Barometric Pressure") + txmsg.data[3:]= bytes([ randint(20, 60)]) + cansend(bus, txmsg) + else: + log.warning("!!! Service 1, unknown code 0x%02x", pid) + txmsg.data[3:]= bytes([0x02, 0x7f, 0x22, 0x31]) + cansend(bus, txmsg) + +def service3(bus, msg :can.message): + """Show stored Diagnostic Trouble Codes (DTCs)""" + dtc2bytes = [0xc1, 0x15, 0x00, 0x01] + pid = msg.data[2] + print(hex(msg.arbitration_id), msg.data, pid) + txmsg = can.Message(arbitration_id =emu_ecu_can_id, + data=[0x00, 0x43, globs.confirmed_DTCs ] +dtc2bytes , + is_extended_id=False) + cansend(bus, txmsg) + +def service7(bus, msg :can.message): + """Show pending Diagnostic Trouble Codes (detected during current or last driving cycle) """ + pid = msg.data[2] + print(hex(msg.arbitration_id), msg.data, pid) + txmsg = can.Message(arbitration_id =emu_ecu_can_id, + data=[0x00, 0x47, 0, 0,0,0,0], + is_extended_id=False) + cansend(bus, txmsg) + +def service10(bus, msg :can.message): + """send Permanent Diagnostic Trouble Codes (DTCs) (Cleared DTCs) """ + pid = msg.data[2] + print(hex(msg.arbitration_id), msg.data, pid) + txmsg = can.Message(arbitration_id =emu_ecu_can_id, + data=[0x00, 0x4a, 0, 0,0,0,0], + is_extended_id=False) + cansend(bus, txmsg) + + +def service4(bus, msg :can.message): + """ Clear DTCs""" + log.debug(">> Clear DTCs") + pid = msg.data[2] + print(hex(msg.arbitration_id), msg.data, pid) + txmsg = can.Message(arbitration_id =emu_ecu_can_id, + data=[0x00, 0x44, 0], + is_extended_id=False) + cansend(bus, txmsg) + globs.confirmed_DTCs = 0 + + +def receive_all(): + + bus = can.interface.Bus(interface='socketcan',channel='can0') + #bus = can.interface.Bus(bustype='ixxat', channel=0, bitrate=250000) + #bus = can.interface.Bus(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000) + + #bus.state = BusState.ACTIVE + #bus.state = BusState.PASSIVE + + try: + while True: + msg = bus.recv(1) + while 1: + #print(msg) + if msg is None: + break + if msg.arbitration_id in [emu_ecu_can_id]: + log.warning("Other ECU is also responding!") + break + if not msg.arbitration_id in [0x7df, 0x7e0]: + log.warning("Unknown ID %d (=0x%x) "%(msg.arbitration_id,msg.arbitration_id)) + break + if msg.data[1] == 0x01: + service1(bus, msg) + break + if msg.data[1] == 0x03: + service3(bus, msg) + break + if msg.data[1] == 0x04: + service4(bus, msg) + break + if msg.data[1] == 0x07: + service7(bus, msg) + break + if msg.data[1] == 0x0a: + service10(bus, msg) + break + log.warning("Unknown service code 0x%02x", msg.data[1]) + break + except KeyboardInterrupt: + pass + +def usage(): + # DOTO: implement + pass + +def main(): + try: + opts, args = getopt.getopt(sys.argv[1:], "l:v", ["loglevel="]) + except getopt.GetoptError as err: + # print help information and exit: + print(err) # will print something like "option -a not recognized" + usage() + sys.exit(2) + + loglevel = "INFO" + + for o, a in opts: + if o == "-v": + loglevel = "DEBUG" + elif o in ("-l", "--loglevel"): + loglevel = a + elif o in ("-h", "--help"): + usage() + sys.exit() + else: + assert False, "unhandled option" + + numeric_level = getattr(log, loglevel.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError('Invalid log level: %s' % loglevel) + log.basicConfig(level=numeric_level) + receive_all() + +if __name__ == "__main__": + main(); diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..db96487 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +can==0.0.0 +packaging==25.0 +python-can==4.6.1 +typing_extensions==4.15.0 +wrapt==1.17.3 diff --git a/ui.py b/ui.py index 9c35166..2ae5af5 100755 --- a/ui.py +++ b/ui.py @@ -1,5 +1,16 @@ #!/usr/bin/env python3 +""" +this is a ui for ecu-simulator +you have to init a can-socket first + +there are a few options: +-v: verbose +-l: loglevel +-h: this help +""" + + import tkinter as tk from tkinter import filedialog, messagebox import glob @@ -10,10 +21,25 @@ from datetime import datetime import threading import logging as log +import time import can from can.bus import BusState +import ecuSimulator as ecusim + +__version__ = "0.1.0" + +emu_ecu_can_id = 0x7e8 + +class globs: + mil_on = 0 + confirmed_DTCs = 2 + dtcs = [0xc158, 0x0001] + continue_mode = 0 + vinpos=0 + + class Application(tk.Frame): def __init__(self, master=None): super().__init__(master) @@ -207,7 +233,7 @@ def can_connect(self): messagebox.showwarning(message="CAN interface is not available or selected") return - self.bus = can.interface.Bus(bustype='socketcan', channel=self.can_device_var.get()) + self.bus = can.interface.Bus(interface='socketcan', channel=self.can_device_var.get()) if self.bus is None: self.add_log('Bus {:s} cannot be connected'.format(self.can_device_var.get())) return @@ -281,35 +307,38 @@ def on_cb_rpm_auto(self): pass def service1(self, msg): - if msg.data[2] == 0x00: + pid = msg.data[2] + if pid == 0x00: log.debug(">> Caps") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x06, 0x41, 0x00, 0x18, 0x3B, 0x80, 0x00], is_extended_id=False) - self.bus.send(msg) - elif msg.data[2] == 0x04: + self.bus.send(msg) + elif pid == 0x01: # added by rundekugel + ecusim.service1(self.bus, msg) + elif pid == 0x04: log.debug(">> Calculated engine load") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x03, 0x41, 0x04, 0x20], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x05: + elif pid == 0x05: log.debug(">> Engine coolant temperature") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x03, 0x41, 0x05, randint(88 + 40, 95 + 40)], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x0B: + elif pid == 0x0B: log.debug(">> Intake manifold absolute pressure") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x04, 0x41, 0x0B, randint(10, 40)], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x0C: + elif pid == 0x0C: log.debug(">> RPM") if self.rpm_var_auto.get(): - val = randint(self.rpm_var_min.get(), self.rpm_var_max.get()) + val = int(randint(int(self.rpm_var_min.get()), int(self.rpm_var_max.get()))) else: val = self.rpm_var.get() @@ -317,11 +346,11 @@ def service1(self, msg): valA = int(val / 256) valB = int(val - valA*256) - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x04, 0x41, 0x0C, valA, valB], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x0D: + elif pid == 0x0D: log.debug(">> Speed") if self.speed_var_auto.get(): @@ -329,83 +358,120 @@ def service1(self, msg): else: val = self.speed_var.get() - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x03, 0x41, 0x0D, val], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x0F: + elif pid == 0x0F: log.debug(">> Intake air temperature") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x03, 0x41, 0x0F, randint(60, 64)], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x10: + elif pid == 0x10: log.debug(">> MAF air flow rate") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x04, 0x41, 0x10, 0x00, 0xFA], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x11: + elif pid == 0x11: log.debug(">> Throttle position") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x03, 0x41, 0x11, randint(20, 60)], is_extended_id=False) self.bus.send(msg) - elif msg.data[2] == 0x33: + elif pid == 0x33: log.debug(">> Absolute Barometric Pressure") - msg = can.Message(arbitration_id=0x7e8, + msg = can.Message(arbitration_id=emu_ecu_can_id, data=[0x03, 0x41, 0x33, randint(20, 60)], is_extended_id=False) self.bus.send(msg) + elif pid == 0x5c: + log.debug(">> Oil Temp.") + msg = can.Message(arbitration_id=emu_ecu_can_id, + data=[0x03, 0x41, pid, randint(50, 105)], + is_extended_id=False) + self.bus.send(msg) else: - self.add_log('Service 1, unknown PID=0x{:02x}'.format(msg.data[2])) + self.add_log('Service 1, unknown PID=0x{:02x}'.format(pid)) def service9(self, msg): - if msg.data[2] == 0x02: + vin= b"1A2B3C4D5E6F7G8HZ" + + if msg.data[0]==0x30: + log.debug(">> VIN code part2+3") + globs.continue_mode+=1 + txmsg = can.Message(arbitration_id=emu_ecu_can_id, + data= [0x20+globs.continue_mode] +list(vin[globs.vinpos:globs.vinpos+7]), + is_extended_id=False) + globs.vinpos += len(msg.data[1:]) + self.bus.send(txmsg) + globs.continue_mode+=1 + txmsg = can.Message(arbitration_id=emu_ecu_can_id, + data= [0x20+globs.continue_mode] +list(vin[globs.vinpos:globs.vinpos+7]), + is_extended_id=False) + globs.vinpos += len(msg.data[1:]) + self.bus.send(txmsg) + return + if msg.data[2] == 0x00: + log.debug(">> VIN support") + txmsg = can.Message(arbitration_id=emu_ecu_can_id, + data=[0x04, 0x14, 0x00, 0x42, 0x02, 0x00], + is_extended_id=False) + self.bus.send(txmsg) + globs.vinpos=0 + return + if msg.data[2] in [0x02]: log.debug(">> VIN code") - msg = can.Message(arbitration_id=0x7e8, - data=[0x10, 0x14, 0x49, 0x02, 0x01, 0x33, 0x46, 0x41], + txmsg = can.Message(arbitration_id=emu_ecu_can_id, + data=[0x10, 0x14, 0x49, 0x02, 0x01] +list(vin[globs.vinpos:globs.vinpos+3]), is_extended_id=False) - self.bus.send(msg) + self.bus.send(txmsg) + globs.vinpos+=3 + globs.continue_mode = 0 # # XXX: Need to be designed and implemented correct handling for "continue" request: # 7E0 [8] 30 00 00 00 00 00 00 00 # - # Right now we just sending all VIN code, i.e. without hand-shaking - that is not good - # - # Also, here hardcoded VIN of some unknown Ford and it need to be replaced with editable entry - # - msg = can.Message(arbitration_id=0x7e8, - data=[0x21, 0x44, 0x50, 0x34, 0x46, 0x4A, 0x32, 0x42], - is_extended_id=False) - self.bus.send(msg) - msg = can.Message(arbitration_id=0x7e8, - data=[0x22, 0x4D, 0x31, 0x31, 0x33, 0x39, 0x31, 0x33], - is_extended_id=False) - self.bus.send(msg) + return + else: self.add_log('Service 9, unknown PID=0x{:02x}'.format(msg.data[2])) def receive_all(self): self.event.wait() - + while self.can_is_started: msg = self.bus.recv(1) # Just skip 'bad' messages if msg is None: continue - if msg.arbitration_id != 0x7df: + print(msg) + if msg.arbitration_id not in [0x7df, 0x7e0]: self.add_log('Unknown Id 0x{:03x}'.format(msg.arbitration_id)) continue - - if msg.data[1] == 0x01: + serviceid = msg.data[1] + if serviceid == 0x01: self.service1(msg) - elif msg.data[1] == 0x09: + elif serviceid == 0x09: + self.service9(msg) + elif serviceid == 0x04: # delete DTCs + ecusim.service4(self.bus, msg) + elif serviceid == 0x03: # read DTCs + ecusim.service3(self.bus, msg) + elif serviceid == 0x07: + ecusim.service7(self.bus, msg) + elif serviceid == 0x0a: + ecusim.service10(self.bus, msg) + elif msg.arbitration_id == 0x7e0 and msg.data[0] == 0x30: self.service9(msg) else: self.add_log('Service {:d} is not supported'.format(msg.data[1])) +def usage(): + print(__doc__) + if __name__ == "__main__": try: opts, args = getopt.getopt(sys.argv[1:], "l:v", ["loglevel="]) @@ -433,6 +499,9 @@ def receive_all(self): raise ValueError('Invalid log level: %s' % loglevel) log.basicConfig(level=numeric_level) + ecusim.globs = globs + ecusim.emu_ecu_can_id = emu_ecu_can_id + window = tk.Tk() window.title("ECU Simulator") app = Application(master=window)