From 340e9ea34f5c8213b68876e812f7b2ab1302de7c Mon Sep 17 00:00:00 2001 From: matt bark Date: Fri, 22 Aug 2025 15:58:31 -0400 Subject: [PATCH 1/4] Introduce TopicFilter support Adds support for TopicFilter --- onvif/client.py | 3 ++- onvif/managers.py | 59 ++++++++++++++++++++++++++++++++++++++++++----- onvif/types.py | 19 +++++++++++++++ 3 files changed, 74 insertions(+), 7 deletions(-) diff --git a/onvif/client.py b/onvif/client.py index 55df04c..f0e5081 100644 --- a/onvif/client.py +++ b/onvif/client.py @@ -560,9 +560,10 @@ async def create_pullpoint_manager( self, interval: dt.timedelta, subscription_lost_callback: Callable[[], None], + topic_filter: str | None = None, ) -> PullPointManager: """Create a pullpoint manager.""" - manager = PullPointManager(self, interval, subscription_lost_callback) + manager = PullPointManager(self, interval, subscription_lost_callback, topic_filter) await manager.start() return manager diff --git a/onvif/managers.py b/onvif/managers.py index cc60add..0cf3105 100644 --- a/onvif/managers.py +++ b/onvif/managers.py @@ -5,22 +5,29 @@ import asyncio import datetime as dt import logging + from abc import abstractmethod from collections.abc import Callable from typing import TYPE_CHECKING, Any + +from lxml.etree import XPath, XPathSyntaxError +from zeep.client import Client from zeep.exceptions import Fault, XMLParseError, XMLSyntaxError from zeep.loader import parse_xml from zeep.wsdl.bindings.soap import SoapOperation +from zeep.xsd import Element, AnyObject import aiohttp from onvif.exceptions import ONVIFError from .settings import DEFAULT_SETTINGS from .transport import ASYNC_TRANSPORT +from .types import TopicExpression from .util import normalize_url, stringify_onvif_error from .wrappers import retry_connection_error + logger = logging.getLogger("onvif") @@ -34,6 +41,7 @@ # this value, we will use this value instead to prevent subscribing over and over # again. MINIMUM_SUBSCRIPTION_SECONDS = 60.0 +MINIMUM_SUBSCRIPTION_INTERVAL = dt.timedelta(seconds=MINIMUM_SUBSCRIPTION_SECONDS) if TYPE_CHECKING: from onvif.client import ONVIFCamera, ONVIFService @@ -50,7 +58,7 @@ def __init__( ) -> None: """Initialize the notification processor.""" self._device = device - self._interval = interval + self._interval = interval if interval <= MINIMUM_SUBSCRIPTION_INTERVAL else MINIMUM_SUBSCRIPTION_INTERVAL self._subscription: ONVIFService | None = None self._restart_or_renew_task: asyncio.Task | None = None self._loop = asyncio.get_event_loop() @@ -290,6 +298,33 @@ def process(self, content: bytes) -> Any | None: class PullPointManager(BaseManager): """Manager for PullPoint.""" + def __init__( + self, + device: ONVIFCamera, + interval: dt.timedelta, + subscription_lost_callback: Callable[[], None], + topic_filter: TopicExpression | None = None, + ) -> None: + """ + Create a Manager for PullPoint + :param device: ONVIFCamera + :param interval: Controls the termination time of the PullPoint session. Minimum value is 60s + :param subscription_lost_callback: Called when a subscroption is lost and cannot be re-established + :param topic_filter: An optional XPATH used to control the topics the subscription will listen to. + + :raises XPathSyntaxError: If an invalid, non-None topic_filter is provided + + Notes: + If your ONVIFCamera has a FixedTopicSet, you will likely not be able to use wildcards in your topic_filter + + Examples: + >>> from datetime import timedelta + >>> PullPointManager(cam, timedelta(seconds=60), lambda: print("Lost connection!"), "tns1:RuleEngine/CellMotionDetector/Motion") + + """ + self._topic_filter: str | None = XPath(topic_filter).path if topic_filter else None + super().__init__(device, interval, subscription_lost_callback) + async def _start(self) -> float: """ Start the PullPoint manager. @@ -299,14 +334,25 @@ async def _start(self) -> float: device = self._device logger.debug("%s: Setup the PullPoint manager", device.host) events_service = await device.create_events_service() - result = await events_service.CreatePullPointSubscription( - { - "InitialTerminationTime": device.get_next_termination_time( - self._interval - ), + + subscription_params = { + "InitialTerminationTime": device.get_next_termination_time( + self._interval + ), + } + # Alternatively, filter could be accepted as an argument + # and we can expect the caller to create the TopicExpression + # this would allow them to control the dialect too? + if self._topic_filter: + subscription_params["Filter"] = { + "_value_1": TopicExpression.from_client(events_service.zeep_client, self._topic_filter), } + + result = await events_service.CreatePullPointSubscription( + subscription_params ) # pylint: disable=protected-access + device.xaddrs[ "http://www.onvif.org/ver10/events/wsdl/PullPointSubscription" ] = normalize_url(result.SubscriptionReference.Address._value_1) @@ -314,6 +360,7 @@ async def _start(self) -> float: self._subscription = await device.create_subscription_service( "PullPointSubscription" ) + # Create the service that will be used to pull messages from the device. self._service = await device.create_pullpoint_service() if device.has_broken_relative_time( diff --git a/onvif/types.py b/onvif/types.py index c6a0d2f..c7245be 100644 --- a/onvif/types.py +++ b/onvif/types.py @@ -2,7 +2,12 @@ from datetime import datetime, timedelta, time import ciso8601 +from zeep.client import Client +from zeep.xsd.types.any import AnyObject +from zeep.xsd.types.complex import ComplexType from zeep.xsd.types.builtins import DateTime, treat_whitespace, Time +from zeep.xsd.elements import Element + import isodate @@ -99,3 +104,17 @@ def pythonvalue(self, value: str) -> time: if fixed_dt := _try_parse_datetime(f"2024-01-15T{fixed_time}Z"): return (fixed_dt + timedelta(**offset)).time() return isodate.parse_time(value) + + +class TopicExpression(AnyObject): + NAMESPACE = "{http://docs.oasis-open.org/wsn/b-2}" + DIALECT = "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet" + + def __init__(self, topic_expression_type: ComplexType, value: str, dialect: str = DIALECT): + expression = Element(f"{self.NAMESPACE}TopicExpression", topic_expression_type) + super().__init__(expression, topic_expression_type(_value_1=value, Dialect=dialect)) + + @classmethod + def from_client(cls, client: Client, expression: str, dialect: str = DIALECT) -> "TopicExpression": + return cls(client.get_type(f"{cls.NAMESPACE}TopicExpressionType"), expression, dialect) + \ No newline at end of file From 5c5aad30b9b2a64b4c98eb40a8024386f89ea474 Mon Sep 17 00:00:00 2001 From: matt bark Date: Fri, 22 Aug 2025 17:41:47 -0400 Subject: [PATCH 2/4] Update managers.py --- onvif/managers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/onvif/managers.py b/onvif/managers.py index 0cf3105..c652750 100644 --- a/onvif/managers.py +++ b/onvif/managers.py @@ -5,7 +5,6 @@ import asyncio import datetime as dt import logging - from abc import abstractmethod from collections.abc import Callable from typing import TYPE_CHECKING, Any From 2893a078ed6524de959e36346a0023cd7b12dab8 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 22 Aug 2025 21:43:28 +0000 Subject: [PATCH 3/4] chore(pre-commit.ci): auto fixes --- onvif/client.py | 4 +++- onvif/managers.py | 36 +++++++++++++++++++----------------- onvif/types.py | 19 +++++++++++++------ 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/onvif/client.py b/onvif/client.py index f0e5081..641107e 100644 --- a/onvif/client.py +++ b/onvif/client.py @@ -563,7 +563,9 @@ async def create_pullpoint_manager( topic_filter: str | None = None, ) -> PullPointManager: """Create a pullpoint manager.""" - manager = PullPointManager(self, interval, subscription_lost_callback, topic_filter) + manager = PullPointManager( + self, interval, subscription_lost_callback, topic_filter + ) await manager.start() return manager diff --git a/onvif/managers.py b/onvif/managers.py index c652750..bf344bc 100644 --- a/onvif/managers.py +++ b/onvif/managers.py @@ -10,12 +10,10 @@ from typing import TYPE_CHECKING, Any -from lxml.etree import XPath, XPathSyntaxError -from zeep.client import Client +from lxml.etree import XPath from zeep.exceptions import Fault, XMLParseError, XMLSyntaxError from zeep.loader import parse_xml from zeep.wsdl.bindings.soap import SoapOperation -from zeep.xsd import Element, AnyObject import aiohttp from onvif.exceptions import ONVIFError @@ -57,7 +55,11 @@ def __init__( ) -> None: """Initialize the notification processor.""" self._device = device - self._interval = interval if interval <= MINIMUM_SUBSCRIPTION_INTERVAL else MINIMUM_SUBSCRIPTION_INTERVAL + self._interval = ( + interval + if interval <= MINIMUM_SUBSCRIPTION_INTERVAL + else MINIMUM_SUBSCRIPTION_INTERVAL + ) self._subscription: ONVIFService | None = None self._restart_or_renew_task: asyncio.Task | None = None self._loop = asyncio.get_event_loop() @@ -298,11 +300,11 @@ class PullPointManager(BaseManager): """Manager for PullPoint.""" def __init__( - self, - device: ONVIFCamera, - interval: dt.timedelta, - subscription_lost_callback: Callable[[], None], - topic_filter: TopicExpression | None = None, + self, + device: ONVIFCamera, + interval: dt.timedelta, + subscription_lost_callback: Callable[[], None], + topic_filter: TopicExpression | None = None, ) -> None: """ Create a Manager for PullPoint @@ -321,7 +323,9 @@ def __init__( >>> PullPointManager(cam, timedelta(seconds=60), lambda: print("Lost connection!"), "tns1:RuleEngine/CellMotionDetector/Motion") """ - self._topic_filter: str | None = XPath(topic_filter).path if topic_filter else None + self._topic_filter: str | None = ( + XPath(topic_filter).path if topic_filter else None + ) super().__init__(device, interval, subscription_lost_callback) async def _start(self) -> float: @@ -335,21 +339,19 @@ async def _start(self) -> float: events_service = await device.create_events_service() subscription_params = { - "InitialTerminationTime": device.get_next_termination_time( - self._interval - ), + "InitialTerminationTime": device.get_next_termination_time(self._interval), } # Alternatively, filter could be accepted as an argument # and we can expect the caller to create the TopicExpression # this would allow them to control the dialect too? if self._topic_filter: subscription_params["Filter"] = { - "_value_1": TopicExpression.from_client(events_service.zeep_client, self._topic_filter), + "_value_1": TopicExpression.from_client( + events_service.zeep_client, self._topic_filter + ), } - result = await events_service.CreatePullPointSubscription( - subscription_params - ) + result = await events_service.CreatePullPointSubscription(subscription_params) # pylint: disable=protected-access device.xaddrs[ diff --git a/onvif/types.py b/onvif/types.py index c7245be..313f6e9 100644 --- a/onvif/types.py +++ b/onvif/types.py @@ -110,11 +110,18 @@ class TopicExpression(AnyObject): NAMESPACE = "{http://docs.oasis-open.org/wsn/b-2}" DIALECT = "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet" - def __init__(self, topic_expression_type: ComplexType, value: str, dialect: str = DIALECT): + def __init__( + self, topic_expression_type: ComplexType, value: str, dialect: str = DIALECT + ): expression = Element(f"{self.NAMESPACE}TopicExpression", topic_expression_type) - super().__init__(expression, topic_expression_type(_value_1=value, Dialect=dialect)) - + super().__init__( + expression, topic_expression_type(_value_1=value, Dialect=dialect) + ) + @classmethod - def from_client(cls, client: Client, expression: str, dialect: str = DIALECT) -> "TopicExpression": - return cls(client.get_type(f"{cls.NAMESPACE}TopicExpressionType"), expression, dialect) - \ No newline at end of file + def from_client( + cls, client: Client, expression: str, dialect: str = DIALECT + ) -> "TopicExpression": + return cls( + client.get_type(f"{cls.NAMESPACE}TopicExpressionType"), expression, dialect + ) From c8e60156ebb1457e4765d1840ad0ce7683e0dafd Mon Sep 17 00:00:00 2001 From: matt bark Date: Fri, 22 Aug 2025 17:44:14 -0400 Subject: [PATCH 4/4] remove unused imports, fix whitespace --- onvif/managers.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/onvif/managers.py b/onvif/managers.py index c652750..501ed01 100644 --- a/onvif/managers.py +++ b/onvif/managers.py @@ -10,12 +10,10 @@ from typing import TYPE_CHECKING, Any -from lxml.etree import XPath, XPathSyntaxError -from zeep.client import Client +from lxml.etree import XPath from zeep.exceptions import Fault, XMLParseError, XMLSyntaxError from zeep.loader import parse_xml from zeep.wsdl.bindings.soap import SoapOperation -from zeep.xsd import Element, AnyObject import aiohttp from onvif.exceptions import ONVIFError @@ -346,12 +344,10 @@ async def _start(self) -> float: subscription_params["Filter"] = { "_value_1": TopicExpression.from_client(events_service.zeep_client, self._topic_filter), } - result = await events_service.CreatePullPointSubscription( subscription_params ) # pylint: disable=protected-access - device.xaddrs[ "http://www.onvif.org/ver10/events/wsdl/PullPointSubscription" ] = normalize_url(result.SubscriptionReference.Address._value_1) @@ -359,7 +355,6 @@ async def _start(self) -> float: self._subscription = await device.create_subscription_service( "PullPointSubscription" ) - # Create the service that will be used to pull messages from the device. self._service = await device.create_pullpoint_service() if device.has_broken_relative_time(