Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion onvif/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,12 @@ async def create_pullpoint_manager(
self,
interval: dt.timedelta,
subscription_lost_callback: Callable[[], None],
topic_filter: str | None = None,
) -> PullPointManager:
"""Create a pullpoint manager."""
manager = PullPointManager(self, interval, subscription_lost_callback)
manager = PullPointManager(
self, interval, subscription_lost_callback, topic_filter
)
await manager.start()
return manager

Expand Down
58 changes: 52 additions & 6 deletions onvif/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from collections.abc import Callable
from typing import TYPE_CHECKING, Any


from lxml.etree import XPath
from zeep.exceptions import Fault, XMLParseError, XMLSyntaxError
from zeep.loader import parse_xml
from zeep.wsdl.bindings.soap import SoapOperation
Expand All @@ -18,9 +20,11 @@

from .settings import DEFAULT_SETTINGS
from .transport import ASYNC_TRANSPORT
from .types import TopicExpression
from .util import normalize_url, stringify_onvif_error
from .wrappers import retry_connection_error


logger = logging.getLogger("onvif")


Expand All @@ -34,6 +38,7 @@
# this value, we will use this value instead to prevent subscribing over and over
# again.
MINIMUM_SUBSCRIPTION_SECONDS = 60.0
MINIMUM_SUBSCRIPTION_INTERVAL = dt.timedelta(seconds=MINIMUM_SUBSCRIPTION_SECONDS)

if TYPE_CHECKING:
from onvif.client import ONVIFCamera, ONVIFService
Expand All @@ -50,7 +55,11 @@ def __init__(
) -> None:
"""Initialize the notification processor."""
self._device = device
self._interval = interval
self._interval = (
interval
if interval <= MINIMUM_SUBSCRIPTION_INTERVAL
else MINIMUM_SUBSCRIPTION_INTERVAL
)
self._subscription: ONVIFService | None = None
self._restart_or_renew_task: asyncio.Task | None = None
self._loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -290,6 +299,35 @@ def process(self, content: bytes) -> Any | None:
class PullPointManager(BaseManager):
"""Manager for PullPoint."""

def __init__(
self,
device: ONVIFCamera,
interval: dt.timedelta,
subscription_lost_callback: Callable[[], None],
topic_filter: TopicExpression | None = None,
) -> None:
"""
Create a Manager for PullPoint
:param device: ONVIFCamera
:param interval: Controls the termination time of the PullPoint session. Minimum value is 60s
:param subscription_lost_callback: Called when a subscroption is lost and cannot be re-established
:param topic_filter: An optional XPATH used to control the topics the subscription will listen to.

:raises XPathSyntaxError: If an invalid, non-None topic_filter is provided

Notes:
If your ONVIFCamera has a FixedTopicSet, you will likely not be able to use wildcards in your topic_filter

Examples:
>>> from datetime import timedelta
>>> PullPointManager(cam, timedelta(seconds=60), lambda: print("Lost connection!"), "tns1:RuleEngine/CellMotionDetector/Motion")

"""
self._topic_filter: str | None = (
XPath(topic_filter).path if topic_filter else None
)
super().__init__(device, interval, subscription_lost_callback)

async def _start(self) -> float:
"""
Start the PullPoint manager.
Expand All @@ -299,13 +337,21 @@ async def _start(self) -> float:
device = self._device
logger.debug("%s: Setup the PullPoint manager", device.host)
events_service = await device.create_events_service()
result = await events_service.CreatePullPointSubscription(
{
"InitialTerminationTime": device.get_next_termination_time(
self._interval

subscription_params = {
"InitialTerminationTime": device.get_next_termination_time(self._interval),
}
# Alternatively, filter could be accepted as an argument
# and we can expect the caller to create the TopicExpression
# this would allow them to control the dialect too?
if self._topic_filter:
subscription_params["Filter"] = {
"_value_1": TopicExpression.from_client(
events_service.zeep_client, self._topic_filter
),
}
)

result = await events_service.CreatePullPointSubscription(subscription_params)
# pylint: disable=protected-access
device.xaddrs[
"http://www.onvif.org/ver10/events/wsdl/PullPointSubscription"
Expand Down
26 changes: 26 additions & 0 deletions onvif/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

from datetime import datetime, timedelta, time
import ciso8601
from zeep.client import Client
from zeep.xsd.types.any import AnyObject
from zeep.xsd.types.complex import ComplexType
from zeep.xsd.types.builtins import DateTime, treat_whitespace, Time
from zeep.xsd.elements import Element

import isodate


Expand Down Expand Up @@ -99,3 +104,24 @@ def pythonvalue(self, value: str) -> time:
if fixed_dt := _try_parse_datetime(f"2024-01-15T{fixed_time}Z"):
return (fixed_dt + timedelta(**offset)).time()
return isodate.parse_time(value)


class TopicExpression(AnyObject):
NAMESPACE = "{http://docs.oasis-open.org/wsn/b-2}"
DIALECT = "http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet"

def __init__(
self, topic_expression_type: ComplexType, value: str, dialect: str = DIALECT
):
expression = Element(f"{self.NAMESPACE}TopicExpression", topic_expression_type)
super().__init__(
expression, topic_expression_type(_value_1=value, Dialect=dialect)
)

@classmethod
def from_client(
cls, client: Client, expression: str, dialect: str = DIALECT
) -> "TopicExpression":
return cls(
client.get_type(f"{cls.NAMESPACE}TopicExpressionType"), expression, dialect
)