From a4ef4ea927e376debca1e36d2bc0b5e3bb9343b1 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Tue, 10 Dec 2013 15:18:35 -0500 Subject: [PATCH 01/16] implemented appropriately delayed retries and stall timeout as recommended by the Twitter API docs --- tweetstream.py | 112 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) diff --git a/tweetstream.py b/tweetstream.py index 5ca2e79..b6c61f6 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -38,6 +38,8 @@ def callback(message): import time import oauth2 import urlparse +import logging +from datetime import datetime, timedelta class MissingConfiguration(Exception): """Raised when a configuration value is not found.""" @@ -69,6 +71,15 @@ def __init__(self, configuration, ioloop=None, clean=False): "twitter_stream_scheme", "https") self._twitter_stream_port = self._get_configuration_key( "twitter_stream_port", 443) + self._twitter_stream = None + self._stream_restart_scheduled = False + self._stream_restart_in_process = False + self._stream_restart_time = None + self._retry_delay = 5 + self._retry_rate_limited_delay = 60 + self._retry_before_established_delay = .25 + self._timeout_handle = None + self._stall_timeout_handle = None def _get_configuration_key(self, key, default=None): """ @@ -102,7 +113,7 @@ def fetch(self, path, method="GET", callback=None): urlparse.parse_qs(parts.query).iteritems() if value ]) - self.open_twitter_stream() + self.schedule_restart() def on_error(self, error): """ Just a wrapper for the error callback """ @@ -112,6 +123,20 @@ def on_error(self, error): raise error def open_twitter_stream(self): + logging.info("open_twitter_stream") + + self._stream_restart_scheduled = False + + if self._stream_restart_time is not None and datetime.now() - self._stream_restart_time < timedelta(seconds=5): + self.schedule_restart(5) + return + + self._stream_restart_time = datetime.now() + self._stream_restart_in_process = True + + if self._twitter_stream: + self.close() + """ Creates the client and watches stream """ address_info = socket.getaddrinfo(self._twitter_stream_host, self._twitter_stream_port, socket.AF_INET, socket.SOCK_STREAM, @@ -123,6 +148,7 @@ def open_twitter_stream(self): if self._twitter_stream_scheme == "https": stream_class = SSLIOStream self._twitter_stream = stream_class(sock, io_loop=self._ioloop) + self._twitter_stream.set_close_callback(self.close_before_established_callback) self._twitter_stream.connect(socket_address, self.on_connect) def on_connect(self): @@ -147,6 +173,7 @@ def on_connect(self): headers["Host"] = self._twitter_stream_host headers["User-Agent"] = "TweetStream" headers["Accept"] = "*/*" + # headers["Accept-Encoding"] = "deflate, gzip" request = ["GET %s HTTP/1.1" % self._full_path] for key, value in headers.iteritems(): request.append("%s: %s" % (key, value)) @@ -156,10 +183,25 @@ def on_connect(self): def on_headers(self, response): """ Starts monitoring for results. """ + self._twitter_stream.set_close_callback(lambda: None) status_line = response.splitlines()[0] response_code = status_line.replace("HTTP/1.1", "") response_code = int(response_code.split()[0].strip()) if response_code != 200: + if response_code == 420: + logging.error('stream connect being rate limited. next try in %s seconds' % self._retry_rate_limited_delay) + self._stream_restart_in_process = False + self.schedule_restart(self._retry_rate_limited_delay) + self._retry_rate_limited_delay *= 2 + return + elif response_code not in [401, 403, 404, 406, 413]: + logging.error('stream connect failed with response_code: %s. next try in %s seconds' % (response_code, self._retry_delay)) + self._stream_restart_in_process = False + self.schedule_restart(self._retry_delay) + if self._retry_delay < 320: + self._retry_delay *= 2 + return + exception_string = "Could not connect: %s\n%s" % ( status_line, response) headers = dict([ @@ -175,10 +217,23 @@ def get_error_body(content): return self._twitter_stream.read_bytes( content_length, get_error_body) + logging.info("stream connection established") + self._stream_restart_in_process = False + self._retry_delay = 5 + self._retry_rate_limited_delay = 60 + self._retry_before_established_delay = .25 + + self._twitter_stream.set_close_callback(self.close_callback) + self.set_stall_timeout() + self.wait_for_message() def wait_for_message(self): """ Throw a read event on the stack. """ + if self._twitter_stream.closed(): + logging.error("stream closed by remote host") + return + self._twitter_stream.read_until("\r\n", self.on_result) def on_result(self, response): @@ -189,6 +244,7 @@ def on_result(self, response): self._twitter_stream.read_bytes(length, self.parse_json) def parse_json(self, response): + self.set_stall_timeout() """ Checks JSON message """ if not response.strip(): # Empty line, happens sometimes for keep alive @@ -225,3 +281,57 @@ def parse_response(self, response): if self._callback: self._callback(response) self.wait_for_message() + + def schedule_restart(self, seconds=0): + if not self._stream_restart_scheduled and not self._stream_restart_in_process: + self._stream_restart_scheduled = True + if seconds == 0: + self.open_twitter_stream() + else: + self.add_timeout(seconds, self.open_twitter_stream) + + def set_stall_timeout(self): + self.remove_stall_timeout() + self._stall_timeout_handle = IOLoop.current().add_timeout(timedelta(seconds=90), self.stall_callback) + + def remove_stall_timeout(self): + if self._stall_timeout_handle: + IOLoop.current().remove_timeout(self._stall_timeout_handle) + self._stall_timeout_handle = None + + def add_timeout(self, seconds, callback): + self.remove_timeout() + self._timeout_handle = IOLoop.current().add_timeout(timedelta(seconds=seconds), callback) + + def remove_timeout(self): + if self._timeout_handle: + IOLoop.current().remove_timeout(self._timeout_handle) + self._timeout_handle = None + + def close_helper(self): + self.remove_timeout() + self.remove_stall_timeout() + self._twitter_stream.close() + + def close(self): + self._twitter_stream.set_close_callback(lambda: None) + self.close_helper() + + def close_before_established_callback(self): + logging.error('stream closed before establishing a connection') + self.close() + self._stream_restart_in_process = False + self.schedule_restart(self._retry_before_established_delay) + if self._retry_before_established_delay < 16: + self._retry_before_established_delay += .25 + + def close_callback(self): + logging.error('close callback') + self.close_helper() + self.schedule_restart() + + def stall_callback(self): + logging.error('stream stalled. restarting') + self.close() + self.schedule_restart() + From 48161be10fd37a582297c54b420040dcd7f06cb0 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Tue, 10 Dec 2013 15:34:26 -0500 Subject: [PATCH 02/16] using self._ioloop instead of IOLoop.current() --- tweetstream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tweetstream.py b/tweetstream.py index b6c61f6..a69776f 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -292,20 +292,20 @@ def schedule_restart(self, seconds=0): def set_stall_timeout(self): self.remove_stall_timeout() - self._stall_timeout_handle = IOLoop.current().add_timeout(timedelta(seconds=90), self.stall_callback) + self._stall_timeout_handle = self._ioloop.add_timeout(timedelta(seconds=90), self.stall_callback) def remove_stall_timeout(self): if self._stall_timeout_handle: - IOLoop.current().remove_timeout(self._stall_timeout_handle) + self._ioloop.remove_timeout(self._stall_timeout_handle) self._stall_timeout_handle = None def add_timeout(self, seconds, callback): self.remove_timeout() - self._timeout_handle = IOLoop.current().add_timeout(timedelta(seconds=seconds), callback) + self._timeout_handle = self._ioloop.add_timeout(timedelta(seconds=seconds), callback) def remove_timeout(self): if self._timeout_handle: - IOLoop.current().remove_timeout(self._timeout_handle) + self._ioloop.remove_timeout(self._timeout_handle) self._timeout_handle = None def close_helper(self): From 2774052dd58a8a55ba5fcbfef7b7db90d3af639a Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Tue, 10 Dec 2013 15:35:07 -0500 Subject: [PATCH 03/16] updated readme --- README.md | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 25be579..08f0e63 100644 --- a/README.md +++ b/README.md @@ -5,15 +5,20 @@ Usage is pretty simple: ```python import tweetstream -tweetstream.TWITTER_APP_USER = "username" -tweetstream.TWITTER_APP_PASSWORD = "password" def callback(message): # this will be called every message print message -stream = tweetstream.TweetStream() -stream.fetch("/1/statuses/filter.json?track=foobar", callback=callback) +configuration = { + "twitter_consumer_key": "key", + "twitter_consumer_secret": "secret", + "twitter_access_token": "token", + "twitter_access_token_secret": "secret", +} + +stream = tweetstream.TweetStream(configuration) +stream.fetch("/1.1/statuses/filter.json?track=foobar", callback=callback) # if you aren't on a running ioloop... from tornado.ioloop import IOLoop @@ -24,5 +29,3 @@ The constructor takes two optional arguments, `ioloop` and `clean`. The `ioloop` argument just lets you specify a specific loop to run on, and `clean` is just a boolean (False by default) that will strip out basic data from the twitter message payload. - -TODO: Implement OAuth header instead of Basic Auth. From 2d68945998b9736f120574b1a11f2adda43b5ca0 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Sun, 23 Feb 2014 23:57:50 -0500 Subject: [PATCH 04/16] work around for this issue in tornado which causes callbacks to execute even after closing the stream: https://github.com/facebook/tornado/issues/987 --- tweetstream.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/tweetstream.py b/tweetstream.py index a69776f..1c24377 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -80,6 +80,7 @@ def __init__(self, configuration, ioloop=None, clean=False): self._retry_before_established_delay = .25 self._timeout_handle = None self._stall_timeout_handle = None + self._current_iostream = None def _get_configuration_key(self, key, default=None): """ @@ -149,9 +150,12 @@ def open_twitter_stream(self): stream_class = SSLIOStream self._twitter_stream = stream_class(sock, io_loop=self._ioloop) self._twitter_stream.set_close_callback(self.close_before_established_callback) - self._twitter_stream.connect(socket_address, self.on_connect) + self._current_iostream = str(self._twitter_stream) + self._twitter_stream.connect(socket_address, lambda: self.on_connect(self._current_iostream)) - def on_connect(self): + def on_connect(self, id): + if id != self._current_iostream: + return parameters = { "oauth_token": self._token.key, "oauth_consumer_key": self._consumer.key, @@ -179,9 +183,11 @@ def on_connect(self): request.append("%s: %s" % (key, value)) request = "\r\n".join(request) + "\r\n\r\n" self._twitter_stream.write(str(request)) - self._twitter_stream.read_until("\r\n\r\n", self.on_headers) + self._twitter_stream.read_until("\r\n\r\n", lambda response: self.on_headers(response, id)) - def on_headers(self, response): + def on_headers(self, response, id): + if id != self._current_iostream: + return """ Starts monitoring for results. """ self._twitter_stream.set_close_callback(lambda: None) status_line = response.splitlines()[0] @@ -226,39 +232,45 @@ def get_error_body(content): self._twitter_stream.set_close_callback(self.close_callback) self.set_stall_timeout() - self.wait_for_message() + self.wait_for_message(id) - def wait_for_message(self): + def wait_for_message(self, id): """ Throw a read event on the stack. """ if self._twitter_stream.closed(): logging.error("stream closed by remote host") return - self._twitter_stream.read_until("\r\n", self.on_result) + self._twitter_stream.read_until("\r\n", lambda response: self.on_result(response, id)) - def on_result(self, response): + def on_result(self, response, id): + if id != self._current_iostream: + return """ Gets length of next message and reads it """ if (response.strip() == ""): - return self.wait_for_message() + return self.wait_for_message(id) length = int(response.strip(), 16) - self._twitter_stream.read_bytes(length, self.parse_json) + self._twitter_stream.read_bytes(length, lambda response: self.parse_json(response, id)) - def parse_json(self, response): + def parse_json(self, response, id): + if id != self._current_iostream: + return self.set_stall_timeout() """ Checks JSON message """ if not response.strip(): # Empty line, happens sometimes for keep alive - return self.wait_for_message() + return self.wait_for_message(id) try: response = json.loads(response) except ValueError: print "Invalid response:" print response - return self.wait_for_message() + return self.wait_for_message(id) - self.parse_response(response) + self.parse_response(response, id) - def parse_response(self, response): + def parse_response(self, response, id): + if id != self._current_iostream: + return """ Parse the twitter message """ if self._clean_message: try: @@ -268,7 +280,7 @@ def parse_response(self, response): avatar = response["user"]["profile_image_url_https"] except KeyError, exc: print "Invalid tweet structure, missing %s" % exc - return self.wait_for_message() + return self.wait_for_message(id) response = { "type": "tweet", @@ -280,7 +292,7 @@ def parse_response(self, response): } if self._callback: self._callback(response) - self.wait_for_message() + self.wait_for_message(id) def schedule_restart(self, seconds=0): if not self._stream_restart_scheduled and not self._stream_restart_in_process: From b4dafea5ec0e9c7abe237f740eb40e9c49ca5e6b Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Sat, 1 Mar 2014 02:58:33 -0500 Subject: [PATCH 05/16] possible fix for partial tweets in chunked encoding --- tweetstream.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tweetstream.py b/tweetstream.py index 1c24377..0d55f07 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -81,6 +81,7 @@ def __init__(self, configuration, ioloop=None, clean=False): self._timeout_handle = None self._stall_timeout_handle = None self._current_iostream = None + self._partial_tweet = '' def _get_configuration_key(self, key, default=None): """ @@ -144,6 +145,7 @@ def open_twitter_stream(self): 0, 0) af, socktype, proto = address_info[0][:3] socket_address = address_info[0][-1] + logging.info('socket address:' + str(socket_address)) sock = socket.socket(af, socktype, proto) stream_class = IOStream if self._twitter_stream_scheme == "https": @@ -248,12 +250,14 @@ def on_result(self, response, id): """ Gets length of next message and reads it """ if (response.strip() == ""): return self.wait_for_message(id) + # logging.info(response) length = int(response.strip(), 16) self._twitter_stream.read_bytes(length, lambda response: self.parse_json(response, id)) def parse_json(self, response, id): if id != self._current_iostream: return + # if ord(response[-2]) != 13 or ord(response[-1]) != 10: self.set_stall_timeout() """ Checks JSON message """ if not response.strip(): @@ -261,10 +265,20 @@ def parse_json(self, response, id): return self.wait_for_message(id) try: response = json.loads(response) + self._partial_tweet = '' except ValueError: - print "Invalid response:" - print response - return self.wait_for_message(id) + # logging.info(response) + logging.info('_partial_tweet?') + # logging.info("Invalid response:") + # logging.info(response) + self._partial_tweet += response.strip() + try: + response = json.loads(self._partial_tweet) + logging.info('_partial_tweet success!') + self._partial_tweet = '' + except ValueError: + logging.info('_partial_tweet second fail') + return self.wait_for_message(id) self.parse_response(response, id) From 15bc107347ae6cf8153fa7d47d527a88034676d8 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Fri, 28 Mar 2014 14:13:16 -0400 Subject: [PATCH 06/16] removed partial tweet logging --- tweetstream.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tweetstream.py b/tweetstream.py index 0d55f07..4a4c5ee 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -267,17 +267,11 @@ def parse_json(self, response, id): response = json.loads(response) self._partial_tweet = '' except ValueError: - # logging.info(response) - logging.info('_partial_tweet?') - # logging.info("Invalid response:") - # logging.info(response) self._partial_tweet += response.strip() try: response = json.loads(self._partial_tweet) - logging.info('_partial_tweet success!') self._partial_tweet = '' except ValueError: - logging.info('_partial_tweet second fail') return self.wait_for_message(id) self.parse_response(response, id) From 432daecf10097f81fd3ec9111765baa91bcd5b37 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Fri, 28 Mar 2014 14:17:34 -0400 Subject: [PATCH 07/16] don't do anything on close if stream isn't opened yet --- tweetstream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tweetstream.py b/tweetstream.py index 4a4c5ee..a9520db 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -334,8 +334,9 @@ def close_helper(self): self._twitter_stream.close() def close(self): - self._twitter_stream.set_close_callback(lambda: None) - self.close_helper() + if self._twitter_stream: + self._twitter_stream.set_close_callback(lambda: None) + self.close_helper() def close_before_established_callback(self): logging.error('stream closed before establishing a connection') From 742e228764e0f3c9d12a13263656ead71c883a3f Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Mon, 31 Mar 2014 14:53:00 -0400 Subject: [PATCH 08/16] fixed a bug which caused an "Already reading" exception --- tweetstream.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tweetstream.py b/tweetstream.py index a9520db..15f7fb8 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -153,7 +153,8 @@ def open_twitter_stream(self): self._twitter_stream = stream_class(sock, io_loop=self._ioloop) self._twitter_stream.set_close_callback(self.close_before_established_callback) self._current_iostream = str(self._twitter_stream) - self._twitter_stream.connect(socket_address, lambda: self.on_connect(self._current_iostream)) + id = self._current_iostream + self._twitter_stream.connect(socket_address, lambda: self.on_connect(id)) def on_connect(self, id): if id != self._current_iostream: @@ -277,8 +278,6 @@ def parse_json(self, response, id): self.parse_response(response, id) def parse_response(self, response, id): - if id != self._current_iostream: - return """ Parse the twitter message """ if self._clean_message: try: From db858ef7e85cf0c72a9fc752d5ac4a968ce827bf Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Mon, 7 Apr 2014 17:33:42 -0400 Subject: [PATCH 09/16] hopefully fixing an issue where spaces were getting removed in dates --- tweetstream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tweetstream.py b/tweetstream.py index 15f7fb8..2ac170e 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -268,7 +268,7 @@ def parse_json(self, response, id): response = json.loads(response) self._partial_tweet = '' except ValueError: - self._partial_tweet += response.strip() + self._partial_tweet += response try: response = json.loads(self._partial_tweet) self._partial_tweet = '' From dfe0c9608914b363db85a73a03e96c5a70ef1b3b Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Thu, 1 May 2014 19:12:04 -0400 Subject: [PATCH 10/16] implemented a callback for when the stream gets rate limited --- tweetstream.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tweetstream.py b/tweetstream.py index 2ac170e..9f3bf17 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -54,6 +54,7 @@ def __init__(self, configuration, ioloop=None, clean=False): self._ioloop = ioloop or IOLoop.instance() self._callback = None self._error_callback = None + self._rate_limited_callback = None self._clean_message = clean self._configuration = configuration consumer_key = self._get_configuration_key("twitter_consumer_key") @@ -98,11 +99,12 @@ def set_error_callback(self, error_callback): """Pretty self explanatory.""" self._error_callback = error_callback - def fetch(self, path, method="GET", callback=None): + def fetch(self, path, method="GET", callback=None, rate_limited_callback=None): """ Opens the request """ parts = urlparse.urlparse(path) self._method = method self._callback = callback + self._rate_limited_callback = rate_limited_callback self._path = parts.path self._full_path = self._path self._parameters = {} @@ -202,6 +204,8 @@ def on_headers(self, response, id): self._stream_restart_in_process = False self.schedule_restart(self._retry_rate_limited_delay) self._retry_rate_limited_delay *= 2 + if self._rate_limited_callback: + self._rate_limited_callback() return elif response_code not in [401, 403, 404, 406, 413]: logging.error('stream connect failed with response_code: %s. next try in %s seconds' % (response_code, self._retry_delay)) From ba0537276711727b0714292af2b1147a0d62ec38 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Mon, 5 May 2014 02:36:05 -0400 Subject: [PATCH 11/16] allowing access token to be changed after instantiation of TweetStream object --- tweetstream.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tweetstream.py b/tweetstream.py index 9f3bf17..4423ac7 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -65,7 +65,7 @@ def __init__(self, configuration, ioloop=None, clean=False): access_token = self._get_configuration_key("twitter_access_token") access_secret = self._get_configuration_key( "twitter_access_token_secret") - self._token = oauth2.Token(key=access_token, secret=access_secret) + self.set_token(access_token, access_secret) self._twitter_stream_host = self._get_configuration_key( "twitter_stream_host", "stream.twitter.com") self._twitter_stream_scheme = self._get_configuration_key( @@ -84,6 +84,9 @@ def __init__(self, configuration, ioloop=None, clean=False): self._current_iostream = None self._partial_tweet = '' + def set_token(self, access_token, access_secret): + self._token = oauth2.Token(key=access_token, secret=access_secret) + def _get_configuration_key(self, key, default=None): """ Retrieve a configuration option, raising an exception if no From 3ed8d522298ab00ceb34a819484595cea1ea56ff Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Sun, 17 Aug 2014 02:17:13 -0400 Subject: [PATCH 12/16] switched to using oauthlib in preparation for supporting python 3 --- requirements.txt | 2 +- setup.py | 2 +- tweetstream.py | 50 ++++++++++++++++-------------------------------- 3 files changed, 19 insertions(+), 35 deletions(-) diff --git a/requirements.txt b/requirements.txt index 9c4806d..c36be0a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ argparse==1.2.1 httplib2==0.7.4 nose==1.1.2 -oauth2==1.5.211 +oauthlib==0.6.3 tornado==2.3 wsgiref==0.1.2 diff --git a/setup.py b/setup.py index 9a95f4b..6a99a02 100644 --- a/setup.py +++ b/setup.py @@ -24,5 +24,5 @@ url = "http://github.com/joshmarshall/tweetstream", license = "http://www.apache.org/licenses/LICENSE-2.0", py_modules=['tweetstream',], - install_requires=['tornado', "oauth2"] + install_requires=['tornado', "oauthlib"] ) diff --git a/tweetstream.py b/tweetstream.py index 4423ac7..6965b9a 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -36,7 +36,8 @@ def callback(message): import json import socket import time -import oauth2 +import oauthlib.oauth1 +import urllib import urlparse import logging from datetime import datetime, timedelta @@ -57,21 +58,14 @@ def __init__(self, configuration, ioloop=None, clean=False): self._rate_limited_callback = None self._clean_message = clean self._configuration = configuration - consumer_key = self._get_configuration_key("twitter_consumer_key") - consumer_secret = self._get_configuration_key( - "twitter_consumer_secret") - self._consumer = oauth2.Consumer( - key=consumer_key, secret=consumer_secret) + self._consumer_key = self._get_configuration_key("twitter_consumer_key") + self._consumer_secret = self._get_configuration_key("twitter_consumer_secret") access_token = self._get_configuration_key("twitter_access_token") - access_secret = self._get_configuration_key( - "twitter_access_token_secret") + access_secret = self._get_configuration_key("twitter_access_token_secret") self.set_token(access_token, access_secret) - self._twitter_stream_host = self._get_configuration_key( - "twitter_stream_host", "stream.twitter.com") - self._twitter_stream_scheme = self._get_configuration_key( - "twitter_stream_scheme", "https") - self._twitter_stream_port = self._get_configuration_key( - "twitter_stream_port", 443) + self._twitter_stream_host = self._get_configuration_key("twitter_stream_host", "stream.twitter.com") + self._twitter_stream_scheme = self._get_configuration_key("twitter_stream_scheme", "https") + self._twitter_stream_port = self._get_configuration_key("twitter_stream_port", 443) self._twitter_stream = None self._stream_restart_scheduled = False self._stream_restart_in_process = False @@ -85,7 +79,9 @@ def __init__(self, configuration, ioloop=None, clean=False): self._partial_tweet = '' def set_token(self, access_token, access_secret): - self._token = oauth2.Token(key=access_token, secret=access_secret) + self._oauth_client = oauthlib.oauth1.Client( + self._consumer_key, client_secret=self._consumer_secret, + resource_owner_key=access_token, resource_owner_secret=access_secret) def _get_configuration_key(self, key, default=None): """ @@ -164,24 +160,12 @@ def open_twitter_stream(self): def on_connect(self, id): if id != self._current_iostream: return - parameters = { - "oauth_token": self._token.key, - "oauth_consumer_key": self._consumer.key, - "oauth_version": "1.0", - "oauth_nonce": oauth2.generate_nonce(), - "oauth_timestamp": int(time.time()) - } - parameters.update(self._parameters) - request = oauth2.Request( - method="GET", - url="%s://%s%s" % ( - self._twitter_stream_scheme, - self._twitter_stream_host, - self._path), - parameters=parameters) - signature_method = oauth2.SignatureMethod_HMAC_SHA1() - request.sign_request(signature_method, self._consumer, self._token) - headers = request.to_header() + url = "%s://%s%s?%s" % ( + self._twitter_stream_scheme, + self._twitter_stream_host, + self._path, + urllib.urlencode(self._parameters)) + uri, headers, body = self._oauth_client.sign(url) headers["Host"] = self._twitter_stream_host headers["User-Agent"] = "TweetStream" headers["Accept"] = "*/*" From b8b74c4da84588709b2c6bf3343dd2f679803b5a Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Sun, 17 Aug 2014 03:59:57 -0400 Subject: [PATCH 13/16] support for python 3 (and python 2) --- tweetstream.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/tweetstream.py b/tweetstream.py index 6965b9a..36d4a3e 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -31,17 +31,25 @@ def callback(message): basic data from the twitter message payload. """ +from __future__ import print_function + from tornado.iostream import IOStream, SSLIOStream from tornado.ioloop import IOLoop import json import socket import time import oauthlib.oauth1 -import urllib -import urlparse import logging from datetime import datetime, timedelta +try: + # python 3 + from urllib.parse import urlparse, parse_qs, urlencode +except ImportError: + # python 2 + from urlparse import urlparse, parse_qs + from urllib import urlencode + class MissingConfiguration(Exception): """Raised when a configuration value is not found.""" pass @@ -100,7 +108,7 @@ def set_error_callback(self, error_callback): def fetch(self, path, method="GET", callback=None, rate_limited_callback=None): """ Opens the request """ - parts = urlparse.urlparse(path) + parts = urlparse(path) self._method = method self._callback = callback self._rate_limited_callback = rate_limited_callback @@ -113,7 +121,7 @@ def fetch(self, path, method="GET", callback=None, rate_limited_callback=None): # throwing away empty or extra query arguments self._parameters = dict([ (key, value[0]) for key, value in - urlparse.parse_qs(parts.query).iteritems() + parse_qs(parts.query).items() if value ]) self.schedule_restart() @@ -164,23 +172,24 @@ def on_connect(self, id): self._twitter_stream_scheme, self._twitter_stream_host, self._path, - urllib.urlencode(self._parameters)) + urlencode(self._parameters)) uri, headers, body = self._oauth_client.sign(url) headers["Host"] = self._twitter_stream_host headers["User-Agent"] = "TweetStream" headers["Accept"] = "*/*" # headers["Accept-Encoding"] = "deflate, gzip" request = ["GET %s HTTP/1.1" % self._full_path] - for key, value in headers.iteritems(): + for key, value in headers.items(): request.append("%s: %s" % (key, value)) request = "\r\n".join(request) + "\r\n\r\n" - self._twitter_stream.write(str(request)) - self._twitter_stream.read_until("\r\n\r\n", lambda response: self.on_headers(response, id)) + self._twitter_stream.write(request.encode()) + self._twitter_stream.read_until(b"\r\n\r\n", lambda response: self.on_headers(response, id)) def on_headers(self, response, id): if id != self._current_iostream: return """ Starts monitoring for results. """ + response = response.decode(encoding='UTF-8') self._twitter_stream.set_close_callback(lambda: None) status_line = response.splitlines()[0] response_code = status_line.replace("HTTP/1.1", "") @@ -234,12 +243,13 @@ def wait_for_message(self, id): logging.error("stream closed by remote host") return - self._twitter_stream.read_until("\r\n", lambda response: self.on_result(response, id)) + self._twitter_stream.read_until(b"\r\n", lambda response: self.on_result(response, id)) def on_result(self, response, id): if id != self._current_iostream: return """ Gets length of next message and reads it """ + response = response.decode(encoding='UTF-8') if (response.strip() == ""): return self.wait_for_message(id) # logging.info(response) @@ -249,6 +259,7 @@ def on_result(self, response, id): def parse_json(self, response, id): if id != self._current_iostream: return + response = response.decode(encoding='UTF-8') # if ord(response[-2]) != 13 or ord(response[-1]) != 10: self.set_stall_timeout() """ Checks JSON message """ @@ -276,8 +287,8 @@ def parse_response(self, response, id): name = response["user"]["name"] username = response["user"]["screen_name"] avatar = response["user"]["profile_image_url_https"] - except KeyError, exc: - print "Invalid tweet structure, missing %s" % exc + except KeyError as exc: + print("Invalid tweet structure, missing %s" % exc) return self.wait_for_message(id) response = { From 6fea8701fbcf25f54be1bbbcc7ff8b2ba1d47b07 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Mon, 18 Aug 2014 00:31:52 -0400 Subject: [PATCH 14/16] update tests to support python 3 (and python 2) --- tests.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests.py b/tests.py index d08a277..ee273ef 100644 --- a/tests.py +++ b/tests.py @@ -1,3 +1,5 @@ +from __future__ import print_function + from tornado.testing import AsyncTestCase import tweetstream import logging @@ -28,7 +30,7 @@ def test_twitter_stream(self): result = {} def error_callback(error): result["error"] = error - print error + print(error) self.stop() configuration = { From 5870981e2584722f373510409ec075c6a222f6f9 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Mon, 18 Aug 2014 03:35:47 -0400 Subject: [PATCH 15/16] added keep_alive_callback --- tweetstream.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tweetstream.py b/tweetstream.py index 36d4a3e..70e36c1 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -64,6 +64,7 @@ def __init__(self, configuration, ioloop=None, clean=False): self._callback = None self._error_callback = None self._rate_limited_callback = None + self._keep_alive_callback = None self._clean_message = clean self._configuration = configuration self._consumer_key = self._get_configuration_key("twitter_consumer_key") @@ -106,12 +107,13 @@ def set_error_callback(self, error_callback): """Pretty self explanatory.""" self._error_callback = error_callback - def fetch(self, path, method="GET", callback=None, rate_limited_callback=None): + def fetch(self, path, method="GET", callback=None, rate_limited_callback=None, keep_alive_callback=None): """ Opens the request """ parts = urlparse(path) self._method = method self._callback = callback self._rate_limited_callback = rate_limited_callback + self._keep_alive_callback = keep_alive_callback self._path = parts.path self._full_path = self._path self._parameters = {} @@ -265,6 +267,8 @@ def parse_json(self, response, id): """ Checks JSON message """ if not response.strip(): # Empty line, happens sometimes for keep alive + if self._keep_alive_callback: + self._keep_alive_callback() return self.wait_for_message(id) try: response = json.loads(response) From 503d4d5fb6cf4833d41cd3a62cceebbd18c70954 Mon Sep 17 00:00:00 2001 From: Kyle Mulka Date: Thu, 6 Dec 2018 00:32:40 -0500 Subject: [PATCH 16/16] reduce stall timeout reduce stall timeout to 30 seconds, per Twitter's documentation --- tweetstream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tweetstream.py b/tweetstream.py index 70e36c1..9102eab 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -317,7 +317,7 @@ def schedule_restart(self, seconds=0): def set_stall_timeout(self): self.remove_stall_timeout() - self._stall_timeout_handle = self._ioloop.add_timeout(timedelta(seconds=90), self.stall_callback) + self._stall_timeout_handle = self._ioloop.add_timeout(timedelta(seconds=30), self.stall_callback) def remove_stall_timeout(self): if self._stall_timeout_handle: