diff --git a/README.md b/README.md index 25be579..08f0e63 100644 --- a/README.md +++ b/README.md @@ -5,15 +5,20 @@ Usage is pretty simple: ```python import tweetstream -tweetstream.TWITTER_APP_USER = "username" -tweetstream.TWITTER_APP_PASSWORD = "password" def callback(message): # this will be called every message print message -stream = tweetstream.TweetStream() -stream.fetch("/1/statuses/filter.json?track=foobar", callback=callback) +configuration = { + "twitter_consumer_key": "key", + "twitter_consumer_secret": "secret", + "twitter_access_token": "token", + "twitter_access_token_secret": "secret", +} + +stream = tweetstream.TweetStream(configuration) +stream.fetch("/1.1/statuses/filter.json?track=foobar", callback=callback) # if you aren't on a running ioloop... from tornado.ioloop import IOLoop @@ -24,5 +29,3 @@ The constructor takes two optional arguments, `ioloop` and `clean`. The `ioloop` argument just lets you specify a specific loop to run on, and `clean` is just a boolean (False by default) that will strip out basic data from the twitter message payload. - -TODO: Implement OAuth header instead of Basic Auth. diff --git a/requirements.txt b/requirements.txt index 9c4806d..c36be0a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ argparse==1.2.1 httplib2==0.7.4 nose==1.1.2 -oauth2==1.5.211 +oauthlib==0.6.3 tornado==2.3 wsgiref==0.1.2 diff --git a/setup.py b/setup.py index 9a95f4b..6a99a02 100644 --- a/setup.py +++ b/setup.py @@ -24,5 +24,5 @@ url = "http://github.com/joshmarshall/tweetstream", license = "http://www.apache.org/licenses/LICENSE-2.0", py_modules=['tweetstream',], - install_requires=['tornado', "oauth2"] + install_requires=['tornado', "oauthlib"] ) diff --git a/tests.py b/tests.py index d08a277..ee273ef 100644 --- a/tests.py +++ b/tests.py @@ -1,3 +1,5 @@ +from __future__ import print_function + from tornado.testing import AsyncTestCase import tweetstream import logging @@ -28,7 +30,7 @@ def test_twitter_stream(self): result = {} def error_callback(error): result["error"] = error - print error + print(error) self.stop() configuration = { diff --git a/tweetstream.py b/tweetstream.py index 5ca2e79..9102eab 100644 --- a/tweetstream.py +++ b/tweetstream.py @@ -31,13 +31,24 @@ def callback(message): basic data from the twitter message payload. """ +from __future__ import print_function + from tornado.iostream import IOStream, SSLIOStream from tornado.ioloop import IOLoop import json import socket import time -import oauth2 -import urlparse +import oauthlib.oauth1 +import logging +from datetime import datetime, timedelta + +try: + # python 3 + from urllib.parse import urlparse, parse_qs, urlencode +except ImportError: + # python 2 + from urlparse import urlparse, parse_qs + from urllib import urlencode class MissingConfiguration(Exception): """Raised when a configuration value is not found.""" @@ -52,23 +63,34 @@ def __init__(self, configuration, ioloop=None, clean=False): self._ioloop = ioloop or IOLoop.instance() self._callback = None self._error_callback = None + self._rate_limited_callback = None + self._keep_alive_callback = None self._clean_message = clean self._configuration = configuration - consumer_key = self._get_configuration_key("twitter_consumer_key") - consumer_secret = self._get_configuration_key( - "twitter_consumer_secret") - self._consumer = oauth2.Consumer( - key=consumer_key, secret=consumer_secret) + self._consumer_key = self._get_configuration_key("twitter_consumer_key") + self._consumer_secret = self._get_configuration_key("twitter_consumer_secret") access_token = self._get_configuration_key("twitter_access_token") - access_secret = self._get_configuration_key( - "twitter_access_token_secret") - self._token = oauth2.Token(key=access_token, secret=access_secret) - self._twitter_stream_host = self._get_configuration_key( - "twitter_stream_host", "stream.twitter.com") - self._twitter_stream_scheme = self._get_configuration_key( - "twitter_stream_scheme", "https") - self._twitter_stream_port = self._get_configuration_key( - "twitter_stream_port", 443) + access_secret = self._get_configuration_key("twitter_access_token_secret") + self.set_token(access_token, access_secret) + self._twitter_stream_host = self._get_configuration_key("twitter_stream_host", "stream.twitter.com") + self._twitter_stream_scheme = self._get_configuration_key("twitter_stream_scheme", "https") + self._twitter_stream_port = self._get_configuration_key("twitter_stream_port", 443) + self._twitter_stream = None + self._stream_restart_scheduled = False + self._stream_restart_in_process = False + self._stream_restart_time = None + self._retry_delay = 5 + self._retry_rate_limited_delay = 60 + self._retry_before_established_delay = .25 + self._timeout_handle = None + self._stall_timeout_handle = None + self._current_iostream = None + self._partial_tweet = '' + + def set_token(self, access_token, access_secret): + self._oauth_client = oauthlib.oauth1.Client( + self._consumer_key, client_secret=self._consumer_secret, + resource_owner_key=access_token, resource_owner_secret=access_secret) def _get_configuration_key(self, key, default=None): """ @@ -85,11 +107,13 @@ def set_error_callback(self, error_callback): """Pretty self explanatory.""" self._error_callback = error_callback - def fetch(self, path, method="GET", callback=None): + def fetch(self, path, method="GET", callback=None, rate_limited_callback=None, keep_alive_callback=None): """ Opens the request """ - parts = urlparse.urlparse(path) + parts = urlparse(path) self._method = method self._callback = callback + self._rate_limited_callback = rate_limited_callback + self._keep_alive_callback = keep_alive_callback self._path = parts.path self._full_path = self._path self._parameters = {} @@ -99,10 +123,10 @@ def fetch(self, path, method="GET", callback=None): # throwing away empty or extra query arguments self._parameters = dict([ (key, value[0]) for key, value in - urlparse.parse_qs(parts.query).iteritems() + parse_qs(parts.query).items() if value ]) - self.open_twitter_stream() + self.schedule_restart() def on_error(self, error): """ Just a wrapper for the error callback """ @@ -112,54 +136,83 @@ def on_error(self, error): raise error def open_twitter_stream(self): + logging.info("open_twitter_stream") + + self._stream_restart_scheduled = False + + if self._stream_restart_time is not None and datetime.now() - self._stream_restart_time < timedelta(seconds=5): + self.schedule_restart(5) + return + + self._stream_restart_time = datetime.now() + self._stream_restart_in_process = True + + if self._twitter_stream: + self.close() + """ Creates the client and watches stream """ address_info = socket.getaddrinfo(self._twitter_stream_host, self._twitter_stream_port, socket.AF_INET, socket.SOCK_STREAM, 0, 0) af, socktype, proto = address_info[0][:3] socket_address = address_info[0][-1] + logging.info('socket address:' + str(socket_address)) sock = socket.socket(af, socktype, proto) stream_class = IOStream if self._twitter_stream_scheme == "https": stream_class = SSLIOStream self._twitter_stream = stream_class(sock, io_loop=self._ioloop) - self._twitter_stream.connect(socket_address, self.on_connect) - - def on_connect(self): - parameters = { - "oauth_token": self._token.key, - "oauth_consumer_key": self._consumer.key, - "oauth_version": "1.0", - "oauth_nonce": oauth2.generate_nonce(), - "oauth_timestamp": int(time.time()) - } - parameters.update(self._parameters) - request = oauth2.Request( - method="GET", - url="%s://%s%s" % ( - self._twitter_stream_scheme, - self._twitter_stream_host, - self._path), - parameters=parameters) - signature_method = oauth2.SignatureMethod_HMAC_SHA1() - request.sign_request(signature_method, self._consumer, self._token) - headers = request.to_header() + self._twitter_stream.set_close_callback(self.close_before_established_callback) + self._current_iostream = str(self._twitter_stream) + id = self._current_iostream + self._twitter_stream.connect(socket_address, lambda: self.on_connect(id)) + + def on_connect(self, id): + if id != self._current_iostream: + return + url = "%s://%s%s?%s" % ( + self._twitter_stream_scheme, + self._twitter_stream_host, + self._path, + urlencode(self._parameters)) + uri, headers, body = self._oauth_client.sign(url) headers["Host"] = self._twitter_stream_host headers["User-Agent"] = "TweetStream" headers["Accept"] = "*/*" + # headers["Accept-Encoding"] = "deflate, gzip" request = ["GET %s HTTP/1.1" % self._full_path] - for key, value in headers.iteritems(): + for key, value in headers.items(): request.append("%s: %s" % (key, value)) request = "\r\n".join(request) + "\r\n\r\n" - self._twitter_stream.write(str(request)) - self._twitter_stream.read_until("\r\n\r\n", self.on_headers) + self._twitter_stream.write(request.encode()) + self._twitter_stream.read_until(b"\r\n\r\n", lambda response: self.on_headers(response, id)) - def on_headers(self, response): + def on_headers(self, response, id): + if id != self._current_iostream: + return """ Starts monitoring for results. """ + response = response.decode(encoding='UTF-8') + self._twitter_stream.set_close_callback(lambda: None) status_line = response.splitlines()[0] response_code = status_line.replace("HTTP/1.1", "") response_code = int(response_code.split()[0].strip()) if response_code != 200: + if response_code == 420: + logging.error('stream connect being rate limited. next try in %s seconds' % self._retry_rate_limited_delay) + self._stream_restart_in_process = False + self.schedule_restart(self._retry_rate_limited_delay) + self._retry_rate_limited_delay *= 2 + if self._rate_limited_callback: + self._rate_limited_callback() + return + elif response_code not in [401, 403, 404, 406, 413]: + logging.error('stream connect failed with response_code: %s. next try in %s seconds' % (response_code, self._retry_delay)) + self._stream_restart_in_process = False + self.schedule_restart(self._retry_delay) + if self._retry_delay < 320: + self._retry_delay *= 2 + return + exception_string = "Could not connect: %s\n%s" % ( status_line, response) headers = dict([ @@ -175,34 +228,62 @@ def get_error_body(content): return self._twitter_stream.read_bytes( content_length, get_error_body) - self.wait_for_message() + logging.info("stream connection established") + self._stream_restart_in_process = False + self._retry_delay = 5 + self._retry_rate_limited_delay = 60 + self._retry_before_established_delay = .25 - def wait_for_message(self): + self._twitter_stream.set_close_callback(self.close_callback) + self.set_stall_timeout() + + self.wait_for_message(id) + + def wait_for_message(self, id): """ Throw a read event on the stack. """ - self._twitter_stream.read_until("\r\n", self.on_result) + if self._twitter_stream.closed(): + logging.error("stream closed by remote host") + return + + self._twitter_stream.read_until(b"\r\n", lambda response: self.on_result(response, id)) - def on_result(self, response): + def on_result(self, response, id): + if id != self._current_iostream: + return """ Gets length of next message and reads it """ + response = response.decode(encoding='UTF-8') if (response.strip() == ""): - return self.wait_for_message() + return self.wait_for_message(id) + # logging.info(response) length = int(response.strip(), 16) - self._twitter_stream.read_bytes(length, self.parse_json) + self._twitter_stream.read_bytes(length, lambda response: self.parse_json(response, id)) - def parse_json(self, response): + def parse_json(self, response, id): + if id != self._current_iostream: + return + response = response.decode(encoding='UTF-8') + # if ord(response[-2]) != 13 or ord(response[-1]) != 10: + self.set_stall_timeout() """ Checks JSON message """ if not response.strip(): # Empty line, happens sometimes for keep alive - return self.wait_for_message() + if self._keep_alive_callback: + self._keep_alive_callback() + return self.wait_for_message(id) try: response = json.loads(response) + self._partial_tweet = '' except ValueError: - print "Invalid response:" - print response - return self.wait_for_message() + self._partial_tweet += response + try: + response = json.loads(self._partial_tweet) + self._partial_tweet = '' + except ValueError: + return self.wait_for_message(id) - self.parse_response(response) + self.parse_response(response, id) - def parse_response(self, response): + def parse_response(self, response, id): """ Parse the twitter message """ if self._clean_message: try: @@ -210,9 +291,9 @@ def parse_response(self, response): name = response["user"]["name"] username = response["user"]["screen_name"] avatar = response["user"]["profile_image_url_https"] - except KeyError, exc: - print "Invalid tweet structure, missing %s" % exc - return self.wait_for_message() + except KeyError as exc: + print("Invalid tweet structure, missing %s" % exc) + return self.wait_for_message(id) response = { "type": "tweet", @@ -224,4 +305,59 @@ def parse_response(self, response): } if self._callback: self._callback(response) - self.wait_for_message() + self.wait_for_message(id) + + def schedule_restart(self, seconds=0): + if not self._stream_restart_scheduled and not self._stream_restart_in_process: + self._stream_restart_scheduled = True + if seconds == 0: + self.open_twitter_stream() + else: + self.add_timeout(seconds, self.open_twitter_stream) + + def set_stall_timeout(self): + self.remove_stall_timeout() + self._stall_timeout_handle = self._ioloop.add_timeout(timedelta(seconds=30), self.stall_callback) + + def remove_stall_timeout(self): + if self._stall_timeout_handle: + self._ioloop.remove_timeout(self._stall_timeout_handle) + self._stall_timeout_handle = None + + def add_timeout(self, seconds, callback): + self.remove_timeout() + self._timeout_handle = self._ioloop.add_timeout(timedelta(seconds=seconds), callback) + + def remove_timeout(self): + if self._timeout_handle: + self._ioloop.remove_timeout(self._timeout_handle) + self._timeout_handle = None + + def close_helper(self): + self.remove_timeout() + self.remove_stall_timeout() + self._twitter_stream.close() + + def close(self): + if self._twitter_stream: + self._twitter_stream.set_close_callback(lambda: None) + self.close_helper() + + def close_before_established_callback(self): + logging.error('stream closed before establishing a connection') + self.close() + self._stream_restart_in_process = False + self.schedule_restart(self._retry_before_established_delay) + if self._retry_before_established_delay < 16: + self._retry_before_established_delay += .25 + + def close_callback(self): + logging.error('close callback') + self.close_helper() + self.schedule_restart() + + def stall_callback(self): + logging.error('stream stalled. restarting') + self.close() + self.schedule_restart() +