From 7d175c0c6bd607801c6a6d78679eb3e0feedb0ec Mon Sep 17 00:00:00 2001 From: slawekrewaj Date: Wed, 18 Sep 2019 13:44:47 +0200 Subject: [PATCH] Adding retries on errors with exponenential backoff --- tap_exacttarget/__init__.py | 8 +++- tap_exacttarget/client.py | 87 +++++++++++++++++++++++++++++++------ 2 files changed, 80 insertions(+), 15 deletions(-) diff --git a/tap_exacttarget/__init__.py b/tap_exacttarget/__init__.py index 66c61a0..16ae22c 100644 --- a/tap_exacttarget/__init__.py +++ b/tap_exacttarget/__init__.py @@ -9,7 +9,9 @@ from tap_exacttarget.state import save_state -from tap_exacttarget.client import get_auth_stub +from tap_exacttarget.client import get_auth_stub, change_retry_count +from tap_exacttarget.client import change_min_retry_delay_seconds +from tap_exacttarget.client import change_max_retry_delay_seconds from tap_exacttarget.endpoints.campaigns \ import CampaignDataAccessObject @@ -88,6 +90,10 @@ def do_sync(args): auth_stub = get_auth_stub(config) + change_retry_count(config.get('retry_count', 5)) + change_min_retry_delay_seconds(config.get('min_retry_delay_seconds', 5)) + change_max_retry_delay_seconds(config.get('max_retry_delay_seconds', 600)) + stream_accessors = [] subscriber_selected = False diff --git a/tap_exacttarget/client.py b/tap_exacttarget/client.py index 08e40a3..2cbabc9 100644 --- a/tap_exacttarget/client.py +++ b/tap_exacttarget/client.py @@ -1,3 +1,6 @@ +import time +import traceback + import FuelSDK import singer @@ -17,10 +20,75 @@ def _get_response_items(response): return items -__all__ = ['get_auth_stub', 'request', 'request_from_cursor'] +class RetryDecorator(object): + retry_count = 1 + min_retry_delay_seconds = 5 + max_retry_delay_seconds = 600 + + def __init__(self, func): + self.func = func + + def __call__(self, *args, **kwargs): + retry_number = 0 + + while retry_number < self.retry_count: + try: + return self.func(*args, **kwargs) + except Exception as e: + LOGGER.error( + u"Error reading data from API on try {}".format( + retry_number + 1)) + LOGGER.error(traceback.format_exc()) + + retry_number += 1 + retry_delay = min( + self.min_retry_delay_seconds * retry_number * retry_number, + self.max_retry_delay_seconds) + time.sleep(retry_delay) + continue + + raise RuntimeError("Maximum number of retries reached") + + +@RetryDecorator +def _get_data_from_cursor(cursor): + response = cursor.get() + if not response.status: + raise RuntimeError("Request failed with '{}'" + .format(response.message)) + return response + + +@RetryDecorator +def _get_more_results(cursor, batch_size): + response = tap_exacttarget__getMoreResults( + cursor, batch_size=batch_size) + if not response.status: + raise RuntimeError("Request failed with '{}'" + .format(response.message)) + return response + + +__all__ = ['change_retry_count', + 'change_min_retry_delay_seconds', + 'change_max_retry_delay_seconds', + 'get_auth_stub', + 'request', + 'request_from_cursor'] # PUBLIC FUNCTIONS +def change_retry_count(new_retry_count): + RetryDecorator.retry_count = new_retry_count + + +def change_min_retry_delay_seconds(new_min_retry_delay_seconds): + RetryDecorator.min_retry_delay_seconds = new_min_retry_delay_seconds + + +def change_max_retry_delay_seconds(new_max_retry_delay_seconds): + RetryDecorator.max_retry_delay_seconds = new_max_retry_delay_seconds + def get_auth_stub(config): """ @@ -108,26 +176,17 @@ def request_from_cursor(name, cursor, batch_size): to be customized. See tap_exacttarget.endpoints.data_extensions for an example. """ - response = cursor.get() - - if not response.status: - raise RuntimeError("Request failed with '{}'" - .format(response.message)) - + response = _get_data_from_cursor(cursor) for item in _get_response_items(response): yield item while response.more_results: LOGGER.info("Getting more results from '{}' endpoint".format(name)) - # Override call to getMoreResults to add a batch_size parameter - # response = cursor.getMoreResults() - response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size) - LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name)) + response = _get_more_results(cursor, batch_size) - if not response.status: - raise RuntimeError("Request failed with '{}'" - .format(response.message)) + LOGGER.info("Fetched {} results from '{}' endpoint".format( + len(response.results), name)) for item in _get_response_items(response): yield item