diff --git a/README.md b/README.md index e50e21e..2bd0953 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ > Note that this is NOT [pyTibber](https://github.com/Danielhiversen/pyTibber) which is the official python wrapper endorsed by Tibber themselves. tibber.py is an unofficial python wrapper package for communication with the [Tibber API](https://developer.tibber.com/). -This package aims to cover all functionalities of the Tibber API in the most beginner-friendly modern Pythonic way. You can read all the capabilites of the API and explore it +This package aims to cover all functionalities of the Tibber API in the most beginner-friendly modern Pythonic way. You can read all the capabilites of the API and explore it with [Tibbers' API explorer](https://developer.tibber.com/explorer). For documentation on how to use tibber.py head over to https://tibberpy.readthedocs.io/en/latest/. Every field of the API types should be found in the corresponding `tibber.type` (e.g. the `size: Int` field of `Home` @@ -21,21 +21,24 @@ docs (located on the right side of the Tibber API explorer). [![Pytest Python 3.7 / 3.11](https://github.com/BeatsuDev/tibber.py/actions/workflows/pytests.yml/badge.svg)](https://github.com/BeatsuDev/tibber.py/actions/workflows/pytests.yml) ![Publish to PyPi status](https://github.com/BeatsuDev/tibber.py/actions/workflows/publish-to-pypi.yml/badge.svg) - - Do you want to ask a question, report an issue, or even showcase your project that uses tibber.py? 🤩
Find out where to post by [checking out this overview](https://github.com/BeatsuDev/tibber.py/discussions/46). - ## Installation + ### Install via pip + ``` python -m pip install tibber.py ``` + ### Requirements + tibber.py depends on `gql`, `gql[aiohttp]`, `gql[websockets]` and `graphql-core`. tibber.py supports Python versions 3.7 and up! ## Examples + ### Getting basic account data + ```python import tibber @@ -56,6 +59,7 @@ print(account.name) ``` ### Getting basic home data + ```python import tibber @@ -75,6 +79,7 @@ print(home.main_fuse_size) # 25 ``` ### Reading historical data + ```python import tibber @@ -99,8 +104,10 @@ for hour in hour_data: ``` ### Reading live measurements + Note how you can register multiple callbacks for the same event. These will be run in asynchronously (at the same time)! + ```python import tibber @@ -115,11 +122,52 @@ async def show_current_power(data): @home.event("live_measurement") async def show_accumulated_cost(data): print(f"{data.accumulated_cost} {data.currency}") - + def when_to_stop(data): return data.power < 1500 # Start the live feed. This runs until data.power is less than 1500. # If a user agent was not defined earlier, this will be required here -home.start_live_feed(user_agent = "UserAgent/0.0.1", exit_condition = when_to_stop) +home.start_live_feed(user_agent = "UserAgent/0.0.1", exit_condition = when_to_stop) +``` + +### Handling errors in a websocket connection + +```python +import tibber + +from tibber.exceptions import ConnectionErrorList +from tibber.exceptions import QueryErrorList +from tibber.exceptions import MalformedQueryException + +account = tibber.Account(tibber.DEMO_TOKEN) +home = account.homes[0] + +@home.event("live_measurement") +async def show_current_power(data): + print(data.power) + + +# Define error handlers: +def connection_error_handler(errors: ConnectionErrorList, additional_data: dict): + for error in errors: + print(error) + + # Returning true will make tibber.py continue retrying the connection + # to the websocket until max retries has been reached. + # Returning False will exit the real time data feed regardless of + # how many connection attempts has already been done. + return True + +def query_error_handler(errors: QueryErrorList, additional_data: dict): + if isinstance(error, MalformedQueryException): + print(data.query) # Prints the query that failed + return False # Exit the real time data feed when a MalformedQueryException happens + return True + +home.start_live_feed( + user_agent = "UserAgent/0.0.1", + on_connection_error = connection_error_handler # Runs when the websocket fails to connect + on_query_error = query_error_handler # Run when the subscription query returns an error +) ``` diff --git a/tests/realtime/test_live_measurements.py b/tests/realtime/test_live_measurements.py index a27378b..f48fbdc 100644 --- a/tests/realtime/test_live_measurements.py +++ b/tests/realtime/test_live_measurements.py @@ -22,12 +22,14 @@ def test_starting_live_feed_with_no_listeners_shows_warning(caplog): home.start_live_feed(f"tibber.py-tests/{__version__}", exit_condition = lambda data: True) assert "The event that was broadcasted has no listeners / callbacks! Nothing was run." in caplog.text + def test_retrieving_live_measurements(): account = tibber.Account(tibber.DEMO_TOKEN) home = account.homes[0] global callback_was_run callback_was_run = False + @home.event("live_measurement") async def callback(data): global callback_was_run @@ -37,31 +39,6 @@ async def callback(data): timestamp = timestamp.replace(tzinfo=None) now = datetime.now().replace(tzinfo=None) assert timestamp > now - timedelta(seconds=30) - assert data.power > 0 - assert data.last_meter_consumption > 0 - assert data.accumulated_consumption > 0 - assert isinstance(data.accumulated_production, (int, float)) - assert data.accumulated_consumption_last_hour > 0 - assert isinstance(data.accumulated_production_last_hour, (int, float)) - assert data.accumulated_cost > 0 - assert isinstance(data.accumulated_reward, (int, float)) - assert data.currency == "SEK" - assert data.min_power >= 0 - assert data.max_power > 0 - assert data.average_power > 0 - assert isinstance(data.power_production, (int, float)) - assert isinstance(data.power_reactive, (int, float)) - assert data.power_production_reactive > 0 - assert isinstance(data.min_power_production, (int, float)) - assert isinstance(data.max_power_production, (int, float)) - assert data.last_meter_production > 0 - assert data.power_factor > 0 - assert data.voltage_phase_1 > 0 - assert data.voltage_phase_2 > 0 - assert data.voltage_phase_3 > 0 - assert data.currentL1 > 0 - assert data.currentL2 > 0 - assert data.currentL3 > 0 # Return immediately after the first callback home.start_live_feed(f"tibber.py-tests/{__version__}", exit_condition = lambda data: True) diff --git a/tibber/types/home.py b/tibber/types/home.py index 468af30..29b399a 100644 --- a/tibber/types/home.py +++ b/tibber/types/home.py @@ -4,12 +4,11 @@ import asyncio import inspect import logging -from typing import TYPE_CHECKING, Callable, Union +import random +import time +from typing import TYPE_CHECKING, Callable -import backoff import gql -import websockets -from gql.transport.exceptions import TransportQueryError from gql.transport.websockets import WebsocketsTransport from graphql import parse @@ -208,8 +207,15 @@ class TibberHome(NonDecoratedTibberHome): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.running = False self._websocket_client = None self._callbacks = {"live_measurement": []} + self._connection_retry_attempts = ( + 0 # The amount of times the websocket connection has been retried. + ) + self._query_retry_attempts = ( + 0 # The amount of times the query has been retried. + ) def event(self, event_to_listen_for) -> Callable: """Returns a decorator that registers the function being @@ -231,7 +237,7 @@ def decorator(callback): ) # If the key is not found - the event is not a valid event! - # Valid events will be added directly to the line where _callbacks is initialized. + # In the future, valid events will be added directly to the line where _callbacks is initialized. try: self._callbacks[event_to_listen_for].append(callback) except KeyError: @@ -247,16 +253,23 @@ def start_live_feed( self, user_agent=None, exit_condition: Callable[[LiveMeasurement], bool] = None, - retries: int = 3, - retry_interval: Union[float, int] = 10, + connection_retries: int = 5, + query_retries: int = 5, + on_connection_error: Callable[[Exception], None] = None, + on_query_error: Callable[[Exception], None] = None, **kwargs, ) -> None: """Creates a websocket and starts pushing data out to registered callbacks. + :param user_agent: The user agent to use when connecting to the websocket. :param exit_condition: A function that takes a LiveMeasurement as input and returns a boolean. If the function returns True, the websocket will be closed. - :param retries: The number of times to retry connecting to the websocket if it fails. - :param retry_interval: The interval in seconds to wait before retrying to connect to the websocket. + :param connection_retries: The number of times to retry connecting to the websocket if it fails. + :param query_retries: The number of times to retry sending a query to the websocket if it fails. + :param on_connection_error: A function that is run when an error occurs while connecting to the websocket. + This will be called every time a connection attempt fails. + :param on_query_error: A function that is run when query to the websocket returns an error. + This will be called every time a query fails. :param kwargs: Additional arguments to pass to the websocket (gql.transport.WebsocketsTransport). """ if not self.features.real_time_consumption_enabled: @@ -269,42 +282,75 @@ def start_live_feed( self.tibber_client.user_agent = user_agent or self.tibber_client.user_agent - # The folllowing code is just to run the websocket loop in the correct loop. + self.running = True + # Keep trying to connect to the websocket until it succeeds or has tried `retries` times + while self._connection_retry_attempts < connection_retries and self.running: + to_sleep = min( + (2**self._connection_retry_attempts - 1) * random.random(), 100 + ) + if self._connection_retry_attempts > 0: + _logger.warning( + f"Retrying to CONNECT in {to_sleep:.1f} seconds. This is retry number {self._connection_retry_attempts}." + ) + time.sleep(to_sleep) + + try: + websocket_loop_coroutine = self.start_websocket_loop( + exit_condition, + query_retries=query_retries, + on_query_error=on_query_error, + **kwargs, + ) + self._run_async_in_correct_event_loop(websocket_loop_coroutine) + except KeyboardInterrupt: + self.running = False + _logger.info("Keyboard interrupt detected. Stopping live feed.") + break + except Exception as e: + self._connection_retry_attempts += 1 + _logger.warning( + "Exception occured when attempting to CONNECT to the websocket!: [" + + e.__class__.__name__ + + "] " + + str(e) + ) + # Raise the exception if no connection error handler is specified. + if on_connection_error: + on_connection_error(e) + + # Show error message if the connection failed too many times. + if self._connection_retry_attempts >= connection_retries: + _logger.error( + f"Could not connect to the websocket after {connection_retries - 1} retries." + ) + + def _run_async_in_correct_event_loop(self, coroutine): try: loop = asyncio.get_running_loop() except RuntimeError: loop = None - try: - if loop and loop.is_running(): - loop.run_until_complete( - self.start_websocket_loop(exit_condition, retries=retries, **kwargs) - ) - else: - asyncio.run( - self.start_websocket_loop(exit_condition, retries=retries, **kwargs) - ) - except KeyboardInterrupt: - _logger.info("Keyboard interrupt detected. Websocket should be closed now.") + if loop and loop.is_running(): + return loop.run_until_complete(coroutine) + else: + return asyncio.run(coroutine) async def start_websocket_loop( self, exit_condition: Callable[[LiveMeasurement], bool] = None, - retries: int = 3, - retry_interval: Union[float, int] = 10, + query_retries: int = 5, + on_query_error: Callable[[Exception], None] = None, **kwargs, ) -> None: - """Starts a websocket to subscribe for live measurements. + """Connects a websocket for live measurements. :param exit_condition: A function that takes a LiveMeasurement as input and returns a boolean. If the function returns True, the websocket will be closed. - :param retries: The number of times to retry connecting to the websocket if it fails. - :param retry_interval: The interval in seconds to wait before retrying to connect to the websocket. + :param query_retries: The number of times to retry connecting to the websocket if it fails. + :param on_query_error: A function that is run when query to the websocket returns an error. + This will be called every time a query fails. :param kwargs: Additional arguments to pass to the websocket (gql.transport.WebsocketsTransport). """ - if retry_interval < 1: - raise ValueError("The retry interval must be at least 1 second.") - # Create the websocket transport = WebsocketsTransport( **kwargs, @@ -314,8 +360,6 @@ async def start_websocket_loop( headers={ "User-Agent": f"{self.tibber_client.user_agent} tibber.py/{__version__}" }, - ping_interval=10, - pong_timeout=10, ) self._websocket_client = gql.Client( @@ -323,48 +367,50 @@ async def start_websocket_loop( fetch_schema_from_transport=True, ) - retry_connect = backoff.on_exception( - backoff.expo, - [ - gql.transport.exceptions.TransportClosed, - websockets.exceptions.ConnectionClosedError, - ], - max_value=100, - max_tries=retries, - on_backoff=lambda details: _logger.warning( - "Retrying to connect with backoff. Running {target} in {wait:.1f} seconds after {tries} tries.".format( - **details - ) - ), - jitter=backoff.full_jitter, - giveup=lambda e: isinstance(e, TransportQueryError) - or isinstance(e, ValueError), + # Connect to the websocket + _logger.debug("connecting to websocket") + session = await self._websocket_client.connect_async() + _logger.info("Connected to websocket.") + + self._connection_retry_attempts = ( + 0 # Connection was successful. Reset the counter. ) - retry_subscribe = backoff.on_exception( - backoff.expo, - Exception, - max_value=100, - max_tries=retries, - on_backoff=lambda details: _logger.warning( - "Retrying to subscribe with backoff. Running {target} in {wait:.1f} seconds after {tries} tries.".format( - **details + # Subscribe to the websocket + while self._query_retry_attempts < query_retries and self.running: + to_sleep = min((2**self._query_retry_attempts - 1) * random.random(), 100) + if self._query_retry_attempts > 0: + _logger.warning( + f"Retrying QUERY in {to_sleep:.1f} seconds. This is retry number {self._query_retry_attempts}." ) - ), - jitter=backoff.full_jitter, - giveup=lambda e: isinstance(e, TransportQueryError) - or isinstance(e, ValueError), - ) + await asyncio.sleep(to_sleep) + try: + await self._run_websocket_loop(session, exit_condition) + except KeyboardInterrupt: + self.running = False + _logger.info("Keyboard interrupt detected. Stopping live feed.") + break + except Exception as e: + self._query_retry_attempts += 1 + _logger.warning( + "Exception occured when attempting to send subscription QUERY!: [" + + e.__class__.__name__ + + "] " + + str(e) + ) + if on_query_error: + on_query_error(e) - _logger.debug("connecting to websocket") - session = await self._websocket_client.connect_async( - reconnecting=True, - retry_connect=retry_connect, - ) - _logger.info("Connected to websocket.") - await retry_subscribe(self.run_websocket_loop)(session, exit_condition) + # If the query fails, we want to close the websocket and reconnect before trying again. + # Next time around, if the query fails again, we will try the query again but with a longer delay. + # (the query retry counter will stay incremented and will not reset until a successful query is made) - async def run_websocket_loop(self, session, exit_condition) -> None: + # TODO: Note to future self: It might not be necessary to retry queries. Retrying the connection should be enough. + break + + await self._websocket_client.close_async() + + async def _run_websocket_loop(self, session, exit_condition) -> None: # Check if real time consumption is enabled _logger.info( "Updating home information to check if real time consumption is enabled." @@ -384,18 +430,18 @@ async def run_websocket_loop(self, session, exit_condition) -> None: _logger.info("Subscribing to websocket.") async for data in session.subscribe(document_node_query): _logger.debug("real time data received.") + self._query_retry_attempts = 0 # Query was successful. Reset the counter. # Returns True if exit condition is met - exit_condition_met = await self.process_websocket_response( + exit_condition_met = await self._process_websocket_response( data, exit_condition=exit_condition ) if exit_condition_met: _logger.info("Exit condition met. The live loop is now exiting.") + self.running = False break - await self.close_websocket_connection() - - async def process_websocket_response( + async def _process_websocket_response( self, data: dict, exit_condition: Callable[[LiveMeasurement], bool] = None ) -> bool: """Processes a response with data from the live data websocket. This function will call all registered callbacks @@ -407,14 +453,14 @@ async def process_websocket_response( # Broadcast the event # TODO: Differentiate between consumption data, production data and other data. cleaned_data = LiveMeasurement(data["liveMeasurement"], self.tibber_client) - await self.broadcast_event("live_measurement", cleaned_data) + await self._broadcast_event("live_measurement", cleaned_data) # Check if the exit condition is met if exit_condition and exit_condition(cleaned_data): return True return False - async def broadcast_event(self, event, data) -> None: + async def _broadcast_event(self, event, data) -> None: if event not in self._callbacks: _logger.warning( f'The event "{event}" was attempted emitted, but does not exist. Nothing was run.' @@ -428,24 +474,6 @@ async def broadcast_event(self, event, data) -> None: await asyncio.gather(*[c(data) for c in self._callbacks[event]]) - async def close_websocket_connection(self) -> None: - _logger.debug("attempting to close websocket connection") - if self.websocket_running: - try: - await self._websocket_client.close_async() - self._websocket_client = None # Dereference for gc - _logger.info("Websocket connection closed.") - except KeyboardInterrupt as e: - _logger.warning( - "Keyboard interrupt detected while closing wbsocket connection. This may cause the websocket to be left open." - ) - raise e - else: - _logger.info( - "The websocket was not running when attempting to close the websocket." - + " The invocation of close_websocket_connection() therefore did nothing..." - ) - @property def websocket_running(self) -> bool: """Returns True if the websocket is running. False otherwise."""