diff --git a/onvif/client.py b/onvif/client.py index 55df04c..641107e 100644 --- a/onvif/client.py +++ b/onvif/client.py @@ -560,9 +560,12 @@ async def create_pullpoint_manager( self, interval: dt.timedelta, subscription_lost_callback: Callable[[], None], + topic_filter: str | None = None, ) -> PullPointManager: """Create a pullpoint manager.""" - manager = PullPointManager(self, interval, subscription_lost_callback) + manager = PullPointManager( + self, interval, subscription_lost_callback, topic_filter + ) await manager.start() return manager diff --git a/onvif/managers.py b/onvif/managers.py index cc60add..2122b89 100644 --- a/onvif/managers.py +++ b/onvif/managers.py @@ -9,6 +9,8 @@ from collections.abc import Callable from typing import TYPE_CHECKING, Any + +from lxml.etree import XPath from zeep.exceptions import Fault, XMLParseError, XMLSyntaxError from zeep.loader import parse_xml from zeep.wsdl.bindings.soap import SoapOperation @@ -18,9 +20,11 @@ from .settings import DEFAULT_SETTINGS from .transport import ASYNC_TRANSPORT +from .types import TopicExpression from .util import normalize_url, stringify_onvif_error from .wrappers import retry_connection_error + logger = logging.getLogger("onvif") @@ -34,6 +38,7 @@ # this value, we will use this value instead to prevent subscribing over and over # again. MINIMUM_SUBSCRIPTION_SECONDS = 60.0 +MINIMUM_SUBSCRIPTION_INTERVAL = dt.timedelta(seconds=MINIMUM_SUBSCRIPTION_SECONDS) if TYPE_CHECKING: from onvif.client import ONVIFCamera, ONVIFService @@ -50,7 +55,11 @@ def __init__( ) -> None: """Initialize the notification processor.""" self._device = device - self._interval = interval + self._interval = ( + interval + if interval <= MINIMUM_SUBSCRIPTION_INTERVAL + else MINIMUM_SUBSCRIPTION_INTERVAL + ) self._subscription: ONVIFService | None = None self._restart_or_renew_task: asyncio.Task | None = None self._loop = asyncio.get_event_loop() @@ -290,6 +299,35 @@ def process(self, content: bytes) -> Any | None: class PullPointManager(BaseManager): """Manager for PullPoint.""" + def __init__( + self, + device: ONVIFCamera, + interval: dt.timedelta, + subscription_lost_callback: Callable[[], None], + topic_filter: TopicExpression | None = None, + ) -> None: + """ + Create a Manager for PullPoint + :param device: ONVIFCamera + :param interval: Controls the termination time of the PullPoint session. Minimum value is 60s + :param subscription_lost_callback: Called when a subscroption is lost and cannot be re-established + :param topic_filter: An optional XPATH used to control the topics the subscription will listen to. + + :raises XPathSyntaxError: If an invalid, non-None topic_filter is provided + + Notes: + If your ONVIFCamera has a FixedTopicSet, you will likely not be able to use wildcards in your topic_filter + + Examples: + >>> from datetime import timedelta + >>> PullPointManager(cam, timedelta(seconds=60), lambda: print("Lost connection!"), "tns1:RuleEngine/CellMotionDetector/Motion") + + """ + self._topic_filter: str | None = ( + XPath(topic_filter).path if topic_filter else None + ) + super().__init__(device, interval, subscription_lost_callback) + async def _start(self) -> float: """ Start the PullPoint manager. @@ -299,13 +337,21 @@ async def _start(self) -> float: device = self._device logger.debug("%s: Setup the PullPoint manager", device.host) events_service = await device.create_events_service() - result = await events_service.CreatePullPointSubscription( - { - "InitialTerminationTime": device.get_next_termination_time( - self._interval + + subscription_params = { + "InitialTerminationTime": device.get_next_termination_time(self._interval), + } + # Alternatively, filter could be accepted as an argument + # and we can expect the caller to create the TopicExpression + # this would allow them to control the dialect too? + if self._topic_filter: + subscription_params["Filter"] = { + "_value_1": TopicExpression.from_client( + events_service.zeep_client, self._topic_filter ), } - ) + + result = await events_service.CreatePullPointSubscription(subscription_params) # pylint: disable=protected-access device.xaddrs[ "http://www.onvif.org/ver10/events/wsdl/PullPointSubscription" diff --git a/onvif/types.py b/onvif/types.py index c6a0d2f..313f6e9 100644 --- a/onvif/types.py +++ b/onvif/types.py @@ -2,7 +2,12 @@ from datetime import datetime, timedelta, time import ciso8601 +from zeep.client import Client +from zeep.xsd.types.any import AnyObject +from zeep.xsd.types.complex import ComplexType from zeep.xsd.types.builtins import DateTime, treat_whitespace, Time +from zeep.xsd.elements import Element + import isodate @@ -99,3 +104,24 @@ def pythonvalue(self, value: str) -> time: if fixed_dt := _try_parse_datetime(f"2024-01-15T{fixed_time}Z"): return (fixed_dt + timedelta(**offset)).time() return isodate.parse_time(value) + + +class TopicExpression(AnyObject): + NAMESPACE = "{http://docs.oasis-open.org/wsn/b-2}" + DIALECT = "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet" + + def __init__( + self, topic_expression_type: ComplexType, value: str, dialect: str = DIALECT + ): + expression = Element(f"{self.NAMESPACE}TopicExpression", topic_expression_type) + super().__init__( + expression, topic_expression_type(_value_1=value, Dialect=dialect) + ) + + @classmethod + def from_client( + cls, client: Client, expression: str, dialect: str = DIALECT + ) -> "TopicExpression": + return cls( + client.get_type(f"{cls.NAMESPACE}TopicExpressionType"), expression, dialect + )