Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
* Reuben Balik <[email protected]>
* Erik Larson <[email protected]>
* Jeremy Tidemann <[email protected]>
* Parker LeBlanc <[email protected]>
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# What's New in Hologram Python SDK

## v0.10.2
2025-10-07 Hologram <[email protected]>
* Add support for Quectel EC-25 and EG25

## v0.10.0
2023-09-05 Hologram <[email protected]>
* targets python version 3.9
Expand Down
108 changes: 61 additions & 47 deletions Hologram/Network/Cellular.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,18 @@
from Hologram.Event import Event
from Exceptions.HologramError import NetworkError
from Hologram.Network.Route import Route
from Hologram.Network.Modem import Modem, E303, MS2131, E372, BG96, EC21, Nova_U201, NovaM, DriverLoader
from Hologram.Network.Modem import (
Modem,
E303,
MS2131,
E372,
BG96,
EC21,
EC25,
Nova_U201,
NovaM,
DriverLoader,
)
from Hologram.Network import Network, NetworkScope
import time
from typing import Union
Expand All @@ -24,19 +35,21 @@
CLOUD_ERR_SIGNAL = 5
CLOUD_ERR_CONNECT = 12

DEFAULT_CELLULAR_TIMEOUT = 200 # slightly more than 3 mins
DEFAULT_CELLULAR_TIMEOUT = 200 # slightly more than 3 mins


class Cellular(Network):

_modemHandlers = {
'e303': E303.E303,
'ms2131': MS2131.MS2131,
'e372': E372.E372,
'bg96': BG96.BG96,
'ec21': EC21.EC21,
'nova': Nova_U201.Nova_U201,
'novam': NovaM.NovaM,
'': Modem
"e303": E303.E303,
"ms2131": MS2131.MS2131,
"e372": E372.E372,
"bg96": BG96.BG96,
"ec21": EC21.EC21,
"ec25": EC25.EC25,
"nova": Nova_U201.Nova_U201,
"novam": NovaM.NovaM,
"": Modem,
}

def __init__(self, event=Event()):
Expand All @@ -46,70 +59,69 @@ def __init__(self, event=Event()):
self._route = Route()
self.__receive_port = None


def autodetect_modem(self):
# scan for a modem and set it if found
first_modem_handler = Cellular._scan_and_select_first_supported_modem()
if first_modem_handler is None:
raise NetworkError('Modem not detected')
raise NetworkError("Modem not detected")
self.modem = first_modem_handler(event=self.event)

def load_modem_drivers(self):
self._load_modem_drivers()


def getConnectionStatus(self):
return self._connection_status

def is_connected(self):
return self._connection_status == CLOUD_CONNECTED or self.modem.is_connected()

def connect(self, timeout = DEFAULT_CELLULAR_TIMEOUT):
self.logger.info('Connecting to cell network with timeout of %s seconds', timeout)
def connect(self, timeout=DEFAULT_CELLULAR_TIMEOUT):
self.logger.info(
"Connecting to cell network with timeout of %s seconds", timeout
)
success = False
try:
success = self.modem.connect(timeout = timeout)
success = self.modem.connect(timeout=timeout)
except KeyboardInterrupt as e:
pass


if success:
self.logger.info('Successfully connected to cell network')
self.logger.info("Successfully connected to cell network")
# Disable at sockets mode since we're already establishing PPP.
# This call is needed in certain modems that have limited interfaces to work with.
time.sleep(2)
# give the device a little time to enumerate
self.disable_at_sockets_mode()
self.__configure_routing()
self._connection_status = CLOUD_CONNECTED
self.event.broadcast('cellular.connected')
self.event.broadcast("cellular.connected")
super().connect()
else:
self.logger.info('Failed to connect to cell network')
self.logger.info("Failed to connect to cell network")

return success

def disconnect(self):
self.logger.info('Disconnecting from cell network')
self.logger.info("Disconnecting from cell network")
self.__remove_routing()
success = self.modem.disconnect()
if success:
self.logger.info('Successfully disconnected from cell network')
self.logger.info("Successfully disconnected from cell network")
self.enable_at_sockets_mode()
self._connection_status = CLOUD_DISCONNECTED
self.event.broadcast('cellular.disconnected')
self.event.broadcast("cellular.disconnected")
super().disconnect()
else:
self.logger.info('Failed to disconnect from cell network')
self.logger.info("Failed to disconnect from cell network")

return success

def reconnect(self):
self.logger.info('Reconnecting to cell network')
self.logger.info("Reconnecting to cell network")
success = self.disconnect()

if success == False:
self.logger.info('Failed to disconnect from cell network')
self.logger.info("Failed to disconnect from cell network")
return False

return self.connect()
Expand All @@ -136,7 +148,9 @@ def send_message(self, data):

def open_receive_socket(self, receive_port):
self.__receive_port = receive_port
self.event.subscribe('cellular.forced_disconnect', self.__reconnect_after_forced_disconnect)
self.event.subscribe(
"cellular.forced_disconnect", self.__reconnect_after_forced_disconnect
)
return self.modem.open_receive_socket(receive_port)

def pop_received_message(self):
Expand Down Expand Up @@ -167,57 +181,57 @@ def __reconnect_and_receive(self):
self.open_receive_socket(self.__receive_port)

def __reconnect_after_forced_disconnect(self):
self.logger.info('Reconnecting after forced disconnect...')
self.logger.info("Reconnecting after forced disconnect...")
time.sleep(5) # uBlox takes some time to update internal state after disconnect
self.__reconnect_and_receive()
while not self.is_connected():
self.logger.info('Reconnect failed. Retrying in 5 seconds...')
self.logger.info("Reconnect failed. Retrying in 5 seconds...")
time.sleep(5)
self.__reconnect_and_receive()
self.logger.info('Ready to receive data on port %s', self.__receive_port)
self.logger.info("Ready to receive data on port %s", self.__receive_port)

def __configure_routing(self):
# maybe we don't have to tear down the routes but we probably should
self.logger.info('Adding routes to Hologram cloud')
self._route.add('10.176.0.0/16', self.localIPAddress)
self._route.add('10.254.0.0/16', self.localIPAddress)
self.logger.info("Adding routes to Hologram cloud")
self._route.add("10.176.0.0/16", self.localIPAddress)
self._route.add("10.254.0.0/16", self.localIPAddress)
if self.scope == NetworkScope.SYSTEM:
self.logger.info('Adding system-wide default route to cellular interface')
self.logger.info("Adding system-wide default route to cellular interface")
self._route.add_default(self.localIPAddress)

def __remove_routing(self):
self.logger.info('Removing routes to Hologram cloud')
self.logger.info("Removing routes to Hologram cloud")
if self.localIPAddress:
self._route.delete('10.176.0.0/16', self.localIPAddress)
self._route.delete('10.254.0.0/16', self.localIPAddress)
self._route.delete("10.176.0.0/16", self.localIPAddress)
self._route.delete("10.254.0.0/16", self.localIPAddress)
if self.scope == NetworkScope.SYSTEM:
self.logger.info('Removing system-wide default route to cellular interface')
self.logger.info(
"Removing system-wide default route to cellular interface"
)
self._route.delete_default(self.localIPAddress)

def _load_modem_drivers(self):
dl = DriverLoader.DriverLoader()
for (modemName, modemHandler) in self._modemHandlers.items():
for modemName, modemHandler in self._modemHandlers.items():
module = modemHandler.module
if module:
if not dl.is_module_loaded(module):
self.logger.info('Loading module %s for %s', module, modemName)
self.logger.info("Loading module %s for %s", module, modemName)
dl.load_module(module)
syspath = modemHandler.syspath
if syspath:
usb_ids = modemHandler.usb_ids
for vid_pid in usb_ids:
dl.force_driver_for_device(syspath, vid_pid[0], vid_pid[1])


@staticmethod
def _scan_and_select_first_supported_modem() -> Union[Modem, None]:
for (_, modemHandler) in Cellular._modemHandlers.items():
for _, modemHandler in Cellular._modemHandlers.items():
modem_exists = Cellular._does_modem_exist_for_handler(modemHandler)
if modem_exists:
return modemHandler
return None


@staticmethod
def _does_modem_exist_for_handler(modemHandler):
usb_ids = modemHandler.usb_ids
Expand All @@ -232,12 +246,14 @@ def _does_modem_exist_for_handler(modemHandler):
def scan_for_all_usable_modems() -> list[Modem]:
modems = []
unique_imeis = set()
for (_, modemHandler) in Cellular._modemHandlers.items():
for _, modemHandler in Cellular._modemHandlers.items():
modem_exists = Cellular._does_modem_exist_for_handler(modemHandler)
if modem_exists:
try:
test_handler = modemHandler()
usable_ports = test_handler.detect_usable_serial_port(stop_on_first=False)
usable_ports = test_handler.detect_usable_serial_port(
stop_on_first=False
)
for port in usable_ports:
modem = modemHandler(device_name=port)
imei = modem.imei
Expand All @@ -249,8 +265,6 @@ def scan_for_all_usable_modems() -> list[Modem]:
pass
return modems



@property
def modem(self):
return self._modem
Expand Down
36 changes: 36 additions & 0 deletions Hologram/Network/Modem/EC25.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# EC25.py - Hologram Python SDK Quectel EC25 modem interface
#
# Author: Hologram <[email protected]>
#
# Copyright 2025 - Hologram (Konekt, Inc.)
#
#
# LICENSE: Distributed under the terms of the MIT License
#

from Hologram.Network.Modem.Quectel import Quectel
from UtilClasses import ModemResult

DEFAULT_EC21_TIMEOUT = 200


class EC25(Quectel):
usb_ids = [("2c7c", "0125")]

def connect(self, timeout=DEFAULT_EC21_TIMEOUT):
success = super().connect(timeout)
return success

def _tear_down_pdp_context(self):
if not self._is_pdp_context_active():
return True
self.logger.info("Tearing down PDP context")
ok, _ = self.set("+QIDEACT", "1", timeout=30)
if ok != ModemResult.OK:
self.logger.error("PDP Context tear down failed")
else:
self.logger.info("PDP context deactivated")

@property
def description(self):
return "Quectel EC25"
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.10.1
0.10.2