Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion tap_exacttarget/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

from tap_exacttarget.state import save_state

from tap_exacttarget.client import get_auth_stub
from tap_exacttarget.client import get_auth_stub, change_retry_count
from tap_exacttarget.client import change_min_retry_delay_seconds
from tap_exacttarget.client import change_max_retry_delay_seconds

from tap_exacttarget.endpoints.campaigns \
import CampaignDataAccessObject
Expand Down Expand Up @@ -88,6 +90,10 @@ def do_sync(args):

auth_stub = get_auth_stub(config)

change_retry_count(config.get('retry_count', 5))
change_min_retry_delay_seconds(config.get('min_retry_delay_seconds', 5))
change_max_retry_delay_seconds(config.get('max_retry_delay_seconds', 600))

stream_accessors = []

subscriber_selected = False
Expand Down
87 changes: 73 additions & 14 deletions tap_exacttarget/client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time
import traceback

import FuelSDK
import singer

Expand All @@ -17,10 +20,75 @@ def _get_response_items(response):
return items


__all__ = ['get_auth_stub', 'request', 'request_from_cursor']
class RetryDecorator(object):
retry_count = 1
min_retry_delay_seconds = 5
max_retry_delay_seconds = 600

def __init__(self, func):
self.func = func

def __call__(self, *args, **kwargs):
retry_number = 0

while retry_number < self.retry_count:
try:
return self.func(*args, **kwargs)
except Exception as e:
LOGGER.error(
u"Error reading data from API on try {}".format(
retry_number + 1))
LOGGER.error(traceback.format_exc())

retry_number += 1
retry_delay = min(
self.min_retry_delay_seconds * retry_number * retry_number,
self.max_retry_delay_seconds)
time.sleep(retry_delay)
continue

raise RuntimeError("Maximum number of retries reached")


@RetryDecorator
def _get_data_from_cursor(cursor):
response = cursor.get()
if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))
return response


@RetryDecorator
def _get_more_results(cursor, batch_size):
response = tap_exacttarget__getMoreResults(
cursor, batch_size=batch_size)
if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))
return response


__all__ = ['change_retry_count',
'change_min_retry_delay_seconds',
'change_max_retry_delay_seconds',
'get_auth_stub',
'request',
'request_from_cursor']


# PUBLIC FUNCTIONS
def change_retry_count(new_retry_count):
RetryDecorator.retry_count = new_retry_count


def change_min_retry_delay_seconds(new_min_retry_delay_seconds):
RetryDecorator.min_retry_delay_seconds = new_min_retry_delay_seconds


def change_max_retry_delay_seconds(new_max_retry_delay_seconds):
RetryDecorator.max_retry_delay_seconds = new_max_retry_delay_seconds


def get_auth_stub(config):
"""
Expand Down Expand Up @@ -108,26 +176,17 @@ def request_from_cursor(name, cursor, batch_size):
to be customized. See tap_exacttarget.endpoints.data_extensions for
an example.
"""
response = cursor.get()

if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))

response = _get_data_from_cursor(cursor)
for item in _get_response_items(response):
yield item

while response.more_results:
LOGGER.info("Getting more results from '{}' endpoint".format(name))

# Override call to getMoreResults to add a batch_size parameter
# response = cursor.getMoreResults()
response = tap_exacttarget__getMoreResults(cursor, batch_size=batch_size)
LOGGER.info("Fetched {} results from '{}' endpoint".format(len(response.results), name))
response = _get_more_results(cursor, batch_size)

if not response.status:
raise RuntimeError("Request failed with '{}'"
.format(response.message))
LOGGER.info("Fetched {} results from '{}' endpoint".format(
len(response.results), name))

for item in _get_response_items(response):
yield item
Expand Down