diff --git a/AUTHORS.md b/AUTHORS.md index eff33fa..17fb3a8 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -4,3 +4,4 @@ * Reuben Balik * Erik Larson * Jeremy Tidemann +* Parker LeBlanc diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e53c30..70eee24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # What's New in Hologram Python SDK +## v0.10.2 +2025-10-07 Hologram +* Add support for Quectel EC-25 and EG25 + ## v0.10.0 2023-09-05 Hologram * targets python version 3.9 diff --git a/Hologram/Network/Cellular.py b/Hologram/Network/Cellular.py index b9c1d96..fc63c16 100644 --- a/Hologram/Network/Cellular.py +++ b/Hologram/Network/Cellular.py @@ -11,7 +11,18 @@ from Hologram.Event import Event from Exceptions.HologramError import NetworkError from Hologram.Network.Route import Route -from Hologram.Network.Modem import Modem, E303, MS2131, E372, BG96, EC21, Nova_U201, NovaM, DriverLoader +from Hologram.Network.Modem import ( + Modem, + E303, + MS2131, + E372, + BG96, + EC21, + EC25, + Nova_U201, + NovaM, + DriverLoader, +) from Hologram.Network import Network, NetworkScope import time from typing import Union @@ -24,19 +35,21 @@ CLOUD_ERR_SIGNAL = 5 CLOUD_ERR_CONNECT = 12 -DEFAULT_CELLULAR_TIMEOUT = 200 # slightly more than 3 mins +DEFAULT_CELLULAR_TIMEOUT = 200 # slightly more than 3 mins + class Cellular(Network): _modemHandlers = { - 'e303': E303.E303, - 'ms2131': MS2131.MS2131, - 'e372': E372.E372, - 'bg96': BG96.BG96, - 'ec21': EC21.EC21, - 'nova': Nova_U201.Nova_U201, - 'novam': NovaM.NovaM, - '': Modem + "e303": E303.E303, + "ms2131": MS2131.MS2131, + "e372": E372.E372, + "bg96": BG96.BG96, + "ec21": EC21.EC21, + "ec25": EC25.EC25, + "nova": Nova_U201.Nova_U201, + "novam": NovaM.NovaM, + "": Modem, } def __init__(self, event=Event()): @@ -46,35 +59,34 @@ def __init__(self, event=Event()): self._route = Route() self.__receive_port = None - def autodetect_modem(self): # scan for a modem and set it if found first_modem_handler = Cellular._scan_and_select_first_supported_modem() if first_modem_handler is None: - raise NetworkError('Modem not detected') + raise NetworkError("Modem not detected") self.modem = first_modem_handler(event=self.event) def load_modem_drivers(self): self._load_modem_drivers() - def getConnectionStatus(self): return self._connection_status def is_connected(self): return self._connection_status == CLOUD_CONNECTED or self.modem.is_connected() - def connect(self, timeout = DEFAULT_CELLULAR_TIMEOUT): - self.logger.info('Connecting to cell network with timeout of %s seconds', timeout) + def connect(self, timeout=DEFAULT_CELLULAR_TIMEOUT): + self.logger.info( + "Connecting to cell network with timeout of %s seconds", timeout + ) success = False try: - success = self.modem.connect(timeout = timeout) + success = self.modem.connect(timeout=timeout) except KeyboardInterrupt as e: pass - if success: - self.logger.info('Successfully connected to cell network') + self.logger.info("Successfully connected to cell network") # Disable at sockets mode since we're already establishing PPP. # This call is needed in certain modems that have limited interfaces to work with. time.sleep(2) @@ -82,34 +94,34 @@ def connect(self, timeout = DEFAULT_CELLULAR_TIMEOUT): self.disable_at_sockets_mode() self.__configure_routing() self._connection_status = CLOUD_CONNECTED - self.event.broadcast('cellular.connected') + self.event.broadcast("cellular.connected") super().connect() else: - self.logger.info('Failed to connect to cell network') + self.logger.info("Failed to connect to cell network") return success def disconnect(self): - self.logger.info('Disconnecting from cell network') + self.logger.info("Disconnecting from cell network") self.__remove_routing() success = self.modem.disconnect() if success: - self.logger.info('Successfully disconnected from cell network') + self.logger.info("Successfully disconnected from cell network") self.enable_at_sockets_mode() self._connection_status = CLOUD_DISCONNECTED - self.event.broadcast('cellular.disconnected') + self.event.broadcast("cellular.disconnected") super().disconnect() else: - self.logger.info('Failed to disconnect from cell network') + self.logger.info("Failed to disconnect from cell network") return success def reconnect(self): - self.logger.info('Reconnecting to cell network') + self.logger.info("Reconnecting to cell network") success = self.disconnect() if success == False: - self.logger.info('Failed to disconnect from cell network') + self.logger.info("Failed to disconnect from cell network") return False return self.connect() @@ -136,7 +148,9 @@ def send_message(self, data): def open_receive_socket(self, receive_port): self.__receive_port = receive_port - self.event.subscribe('cellular.forced_disconnect', self.__reconnect_after_forced_disconnect) + self.event.subscribe( + "cellular.forced_disconnect", self.__reconnect_after_forced_disconnect + ) return self.modem.open_receive_socket(receive_port) def pop_received_message(self): @@ -167,40 +181,42 @@ def __reconnect_and_receive(self): self.open_receive_socket(self.__receive_port) def __reconnect_after_forced_disconnect(self): - self.logger.info('Reconnecting after forced disconnect...') + self.logger.info("Reconnecting after forced disconnect...") time.sleep(5) # uBlox takes some time to update internal state after disconnect self.__reconnect_and_receive() while not self.is_connected(): - self.logger.info('Reconnect failed. Retrying in 5 seconds...') + self.logger.info("Reconnect failed. Retrying in 5 seconds...") time.sleep(5) self.__reconnect_and_receive() - self.logger.info('Ready to receive data on port %s', self.__receive_port) + self.logger.info("Ready to receive data on port %s", self.__receive_port) def __configure_routing(self): # maybe we don't have to tear down the routes but we probably should - self.logger.info('Adding routes to Hologram cloud') - self._route.add('10.176.0.0/16', self.localIPAddress) - self._route.add('10.254.0.0/16', self.localIPAddress) + self.logger.info("Adding routes to Hologram cloud") + self._route.add("10.176.0.0/16", self.localIPAddress) + self._route.add("10.254.0.0/16", self.localIPAddress) if self.scope == NetworkScope.SYSTEM: - self.logger.info('Adding system-wide default route to cellular interface') + self.logger.info("Adding system-wide default route to cellular interface") self._route.add_default(self.localIPAddress) def __remove_routing(self): - self.logger.info('Removing routes to Hologram cloud') + self.logger.info("Removing routes to Hologram cloud") if self.localIPAddress: - self._route.delete('10.176.0.0/16', self.localIPAddress) - self._route.delete('10.254.0.0/16', self.localIPAddress) + self._route.delete("10.176.0.0/16", self.localIPAddress) + self._route.delete("10.254.0.0/16", self.localIPAddress) if self.scope == NetworkScope.SYSTEM: - self.logger.info('Removing system-wide default route to cellular interface') + self.logger.info( + "Removing system-wide default route to cellular interface" + ) self._route.delete_default(self.localIPAddress) def _load_modem_drivers(self): dl = DriverLoader.DriverLoader() - for (modemName, modemHandler) in self._modemHandlers.items(): + for modemName, modemHandler in self._modemHandlers.items(): module = modemHandler.module if module: if not dl.is_module_loaded(module): - self.logger.info('Loading module %s for %s', module, modemName) + self.logger.info("Loading module %s for %s", module, modemName) dl.load_module(module) syspath = modemHandler.syspath if syspath: @@ -208,16 +224,14 @@ def _load_modem_drivers(self): for vid_pid in usb_ids: dl.force_driver_for_device(syspath, vid_pid[0], vid_pid[1]) - @staticmethod def _scan_and_select_first_supported_modem() -> Union[Modem, None]: - for (_, modemHandler) in Cellular._modemHandlers.items(): + for _, modemHandler in Cellular._modemHandlers.items(): modem_exists = Cellular._does_modem_exist_for_handler(modemHandler) if modem_exists: return modemHandler return None - @staticmethod def _does_modem_exist_for_handler(modemHandler): usb_ids = modemHandler.usb_ids @@ -232,12 +246,14 @@ def _does_modem_exist_for_handler(modemHandler): def scan_for_all_usable_modems() -> list[Modem]: modems = [] unique_imeis = set() - for (_, modemHandler) in Cellular._modemHandlers.items(): + for _, modemHandler in Cellular._modemHandlers.items(): modem_exists = Cellular._does_modem_exist_for_handler(modemHandler) if modem_exists: try: test_handler = modemHandler() - usable_ports = test_handler.detect_usable_serial_port(stop_on_first=False) + usable_ports = test_handler.detect_usable_serial_port( + stop_on_first=False + ) for port in usable_ports: modem = modemHandler(device_name=port) imei = modem.imei @@ -249,8 +265,6 @@ def scan_for_all_usable_modems() -> list[Modem]: pass return modems - - @property def modem(self): return self._modem diff --git a/Hologram/Network/Modem/EC25.py b/Hologram/Network/Modem/EC25.py new file mode 100644 index 0000000..4f5e4f7 --- /dev/null +++ b/Hologram/Network/Modem/EC25.py @@ -0,0 +1,36 @@ +# EC25.py - Hologram Python SDK Quectel EC25 modem interface +# +# Author: Hologram +# +# Copyright 2025 - Hologram (Konekt, Inc.) +# +# +# LICENSE: Distributed under the terms of the MIT License +# + +from Hologram.Network.Modem.Quectel import Quectel +from UtilClasses import ModemResult + +DEFAULT_EC21_TIMEOUT = 200 + + +class EC25(Quectel): + usb_ids = [("2c7c", "0125")] + + def connect(self, timeout=DEFAULT_EC21_TIMEOUT): + success = super().connect(timeout) + return success + + def _tear_down_pdp_context(self): + if not self._is_pdp_context_active(): + return True + self.logger.info("Tearing down PDP context") + ok, _ = self.set("+QIDEACT", "1", timeout=30) + if ok != ModemResult.OK: + self.logger.error("PDP Context tear down failed") + else: + self.logger.info("PDP context deactivated") + + @property + def description(self): + return "Quectel EC25" diff --git a/version.txt b/version.txt index 5712157..5eef0f1 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.10.1 +0.10.2