Merge pull request TeamHG-Memex#24 from TeamHG-Memex/pages_api_docs
[wip] docs & cleanups
zergey authored Jul 7, 2016
2 parents d0c3cb7 + b79fcdd commit b5a9fa0
Showing 39 changed files with 1,233 additions and 172 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -20,3 +20,4 @@ bot_spiders/
.coverage.*
htmlcov/
.scrapy
docs/_build
4 changes: 3 additions & 1 deletion Dockerfile
@@ -44,7 +44,9 @@ RUN pip3 install .

# use e.g. -v /path/to/my/arachnado/config.conf:/etc/arachnado.conf
# docker run option to override arachnado parameters
VOLUME /etc/arachnado.conf
# The VOLUME is not exposed here because Docker assumes that volumes are folders
# (unless the file really exists), so this can cause problems in docker-compose
# later (see https://github.com/docker/docker/issues/21702#issuecomment-221987049)

# this folder is added to PYTHONPATH, so modules from there are available
# for spider_packages Arachnado option
14 changes: 7 additions & 7 deletions README.rst
@@ -13,7 +13,7 @@ License is MIT.
Install
-------

Arachnado requires Python 2.7.
Arachnado requires Python 2.7 or Python 3.5.
To install Arachnado use pip::

pip install arachnado
@@ -41,13 +41,13 @@ the server::
For available options check
https://github.com/TeamHG-Memex/arachnado/blob/master/arachnado/config/defaults.conf.

Test
-----------
To start unit tests for API:
Tests
-----

To run tests make sure tox_ is installed, then
execute ``tox`` command from the source root.

python -m tornado.test.runtests tests.test_data
or
python3 -m tornado.test.runtests tests.test_data
.. _tox: https://testrun.org/tox/latest/

Development
-----------
8 changes: 5 additions & 3 deletions arachnado/__main__.py
@@ -73,8 +73,8 @@ def main(port, host, start_manhole, manhole_port, manhole_host, loglevel, opts):
jobs_uri = _getval(storage_opts, 'jobs_uri_env', 'jobs_uri')
sites_uri = _getval(storage_opts, 'sites_uri_env', 'sites_uri')

settings.update({k: v for k, v in opts['arachnado.scrapy'].items()
if k.isupper()})
scrapy_opts = opts['arachnado.scrapy']
settings.update({k: v for k, v in scrapy_opts.items() if k.isupper()})

settings.update({
'MONGO_EXPORT_ENABLED': storage_opts['enabled'],
@@ -91,10 +91,12 @@ def main(port, host, start_manhole, manhole_port, manhole_host, loglevel, opts):
site_checker_crawler = get_site_checker_crawler(site_storage)
crawler_process.crawl(site_checker_crawler)

spider_packages = opts['arachnado.scrapy']['spider_packages']
spider_packages = scrapy_opts['spider_packages']
default_spider_name = scrapy_opts['default_spider_name']
domain_crawlers = DomainCrawlers(
crawler_process=crawler_process,
spider_packages=_parse_spider_packages(spider_packages),
default_spider_name=default_spider_name,
settings=settings
)
domain_crawlers.resume(job_storage)
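
A quick illustration of the uppercase-key filter above: only Scrapy-style (uppercase) options from the ``[arachnado.scrapy]`` config section are forwarded to the crawler settings, while lowercase keys stay Arachnado-specific. Minimal sketch with hypothetical option values:

    # Sketch of the key filter used in __main__.py; option values are made up.
    scrapy_opts = {
        'DEPTH_LIMIT': '10',              # uppercase -> forwarded to Scrapy
        'spider_packages': 'my.spiders',  # lowercase -> Arachnado-only option
        'default_spider_name': 'generic',
    }

    settings = {}
    settings.update({k: v for k, v in scrapy_opts.items() if k.isupper()})
    assert settings == {'DEPTH_LIMIT': '10'}
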
9 changes: 8 additions & 1 deletion arachnado/config/defaults.conf
@@ -5,7 +5,8 @@
[arachnado]
; General Arachnado server options.

; Event loop to use. Allowed values are "twisted", "tornado" and "auto".
; Event loop to use. Allowed values are
; "twisted", "tornado" and "auto".
reactor = auto

; Host/port to listen to
@@ -31,6 +32,12 @@ DEPTH_LIMIT = 10
; Packages to load spiders from (separated by whitespace)
spider_packages =

; Name of the default spider. It is used for crawling if
; no custom spider is specified or detected. It should support
; API similar to arachnado.spider.CrawlWebsiteSpider
; (which is the default here).
default_spider_name = generic

[arachnado.storage]
; Where to store crawled items and job information.
; Currently only MongoDB is supported (mongodb:// URIs).
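
To illustrate the new ``default_spider_name`` option above: the named spider is looked up in ``spider_packages`` (plus ``arachnado.spider``) and used when no site-specific spider applies. A hypothetical custom default spider that keeps the expected API by subclassing Arachnado's own CrawlWebsiteSpider (the module path and the settings override are assumptions):

    # Hypothetical module, e.g. mybots/spiders.py, listed in spider_packages.
    from arachnado.spider import CrawlWebsiteSpider


    class MyDefaultSpider(CrawlWebsiteSpider):
        # Selecting this spider would require ``default_spider_name = mybot``.
        name = 'mybot'

        # Hypothetical per-spider override of a Scrapy setting.
        custom_settings = {'DEPTH_LIMIT': 3}
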
8 changes: 7 additions & 1 deletion arachnado/crawler_process.py
@@ -300,7 +300,13 @@ def _downloader_stats(cls, crawler):

@classmethod
def _request_info(cls, request):
return {'url': request.url, 'method': request.method}
info = {'url': request.url, 'method': request.method}
if 'splash' in request.meta:
splash_args = request.meta['splash'].get('args', {})
if 'url' in splash_args:
info['url'] = splash_args['url']
info['method'] = splash_args.get('http_method', 'GET')
return info

@classmethod
def _slot_info(cls, key, slot):
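
The splash branch above exists because, for requests routed through Splash (scrapy-splash), ``request.url`` points at the Splash endpoint while the real target URL and HTTP method live in ``request.meta['splash']['args']``. A minimal sketch with a stand-in request object (the Splash endpoint URL is hypothetical):

    from types import SimpleNamespace

    # Stand-in for a Request rewritten by scrapy-splash.
    request = SimpleNamespace(
        url='http://localhost:8050/render.html',  # hypothetical Splash endpoint
        method='POST',
        meta={'splash': {'args': {'url': 'http://example.com/page',
                                  'http_method': 'GET'}}},
    )

    info = {'url': request.url, 'method': request.method}
    if 'splash' in request.meta:
        splash_args = request.meta['splash'].get('args', {})
        if 'url' in splash_args:
            info['url'] = splash_args['url']
            info['method'] = splash_args.get('http_method', 'GET')

    assert info == {'url': 'http://example.com/page', 'method': 'GET'}
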
8 changes: 3 additions & 5 deletions arachnado/cron.py
@@ -17,7 +17,7 @@ def __init__(self, domain_crawlers, site_storage):
self.waiting_calls = {}
self.domain_crawlers = domain_crawlers
self.site_storage = site_storage
self.site_storage.subscribe(self.site_storage.available_subscriptions,
self.site_storage.subscribe(self.site_storage.available_events,
self.rerun)

def start(self):
@@ -96,10 +96,8 @@ def start_crawl(self, id_):
args = _key_value_to_dict(site.get('args', []))
settings = _key_value_to_dict(site.get('settings', []))

if not site.get('engine'):
site['engine'] = 'generic'

if site['engine'] == 'generic':
# checking for == 'generic' to be backwards compatible
if not site.get('engine') or site['engine'] == 'generic':
url = site['url']
else:
url = 'spider://' + site['engine']
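
To spell out the backwards-compatible engine handling above: sites without an ``engine`` value (or with the legacy ``generic`` value) are crawled by their URL, while any other engine name becomes a ``spider://`` target for the matching spider. A small sketch with hypothetical site documents:

    def crawl_target(site):
        # Mirrors the branch in start_crawl(): a missing engine or the legacy
        # 'generic' value means "crawl the site URL with the default spider".
        if not site.get('engine') or site['engine'] == 'generic':
            return site['url']
        return 'spider://' + site['engine']

    assert crawl_target({'url': 'http://example.com'}) == 'http://example.com'
    assert crawl_target({'url': 'http://example.com', 'engine': 'generic'}) == 'http://example.com'
    assert crawl_target({'url': 'http://example.com', 'engine': 'mybot'}) == 'spider://mybot'
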
12 changes: 8 additions & 4 deletions arachnado/domain_crawlers.py
@@ -11,17 +11,19 @@
import arachnado.settings
from arachnado.crawler_process import ArachnadoCrawler
from arachnado.spider import CrawlWebsiteSpider, ArachnadoSpider
from arachnado.utils.spiders import get_spider_cls
from arachnado.utils.spiders import get_spider_cls, find_spider_cls


class DomainCrawlers(object):
"""
Helper class to create and start crawlers.
"""
def __init__(self, crawler_process, spider_packages, settings):
def __init__(self, crawler_process, spider_packages, default_spider_name,
settings):
self.settings = get_settings(settings)
self.crawler_process = crawler_process
self.spider_packages = spider_packages
self.default_spider_name = default_spider_name

def resume(self, job_storage):
@gen.coroutine
@@ -37,8 +39,10 @@ def _resume():

def start(self, domain, args, settings, crawl_id=None):
""" Create, start and return a crawler for a given domain. """
spider_cls = get_spider_cls(domain, self.spider_packages,
CrawlWebsiteSpider)
default_cls = find_spider_cls(
self.default_spider_name,
self.spider_packages + ['arachnado.spider'])
spider_cls = get_spider_cls(domain, self.spider_packages, default_cls)
if not spider_cls:
return

1 change: 1 addition & 0 deletions arachnado/downloadermiddlewares/proxyfromsettings.py
@@ -11,6 +11,7 @@ def from_crawler(cls, crawler):

def __init__(self, settings):
self.proxies = {}
self.auth_encoding = settings.get('HTTPPROXY_AUTH_ENCODING')
proxies = [
('http', settings.get('HTTP_PROXY')),
('https', settings.get('HTTPS_PROXY')),
7 changes: 7 additions & 0 deletions arachnado/handlers.py
@@ -31,14 +31,19 @@ def get_application(crawler_process, domain_crawlers,
debug = opts['arachnado']['debug']

handlers = [
# UI
url(r"/", Index, context, name="index"),
url(r"/help", Help, context, name="help"),

# simple API used by UI
url(r"/crawler/start", StartCrawler, context, name="start"),
url(r"/crawler/stop", StopCrawler, context, name="stop"),
url(r"/crawler/pause", PauseCrawler, context, name="pause"),
url(r"/crawler/resume", ResumeCrawler, context, name="resume"),
url(r"/crawler/status", CrawlerStatus, context, name="status"),
url(r"/ws-updates", Monitor, context, name="ws-updates"),

# RPC API
url(r"/ws-rpc", RpcWebsocketHandler, context, name="ws-rpc"),
url(r"/rpc", RpcHttpHandler, context, name="rpc"),
url(r"/ws-pages-data", PagesDataRpcWebsocketHandler, context, name="ws-pages-data"),
@@ -148,6 +153,8 @@ def control_job(self, job_id):

class CrawlerStatus(BaseRequestHandler):
""" Status for one or more jobs. """
# FIXME: does it work? Can we remove it? It is not used
# by Arachnado UI.
def get(self):
crawl_ids_arg = self.get_argument('crawl_ids', '')

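
For orientation, the grouped handlers above form a small HTTP API used by the Arachnado UI, alongside the JSON-RPC endpoints. A hedged sketch of starting a crawl over HTTP (the ``domain`` form field, the port and the response shape are assumptions, not taken from this diff):

    from urllib.parse import urlencode

    from tornado.httpclient import HTTPClient

    # Hypothetical call to the StartCrawler handler; the field name and port
    # are assumptions based on the UI and the default config.
    client = HTTPClient()
    response = client.fetch(
        'http://localhost:8888/crawler/start',
        method='POST',
        body=urlencode({'domain': 'example.com'}),
    )
    print(response.code, response.body)
    client.close()
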
4 changes: 2 additions & 2 deletions arachnado/manhole.py
@@ -3,13 +3,13 @@
An interactive Python interpreter available through telnet.
"""
from __future__ import absolute_import
from twisted.conch.manhole import ColoredManhole
from twisted.conch.insults import insults
from twisted.conch.telnet import TelnetTransport, TelnetBootstrapProtocol
from twisted.internet import protocol


def start(port=None, host=None, telnet_vars=None):
from twisted.conch.manhole import ColoredManhole
from twisted.conch.insults import insults
from twisted.internet import reactor

port = int(port) if port else 6023
4 changes: 2 additions & 2 deletions arachnado/pipelines/mongoexport.py
@@ -85,9 +85,9 @@ def from_crawler(cls, crawler):
def get_spider_urls(cls, spider):
options = getattr(spider.crawler, 'start_options', None)
if options and "domain" in options:
return options["domain"]
return [options["domain"]]
else:
return " ".join(spider.start_urls)
return spider.start_urls

@tt_coroutine
def open_spider(self, spider):
18 changes: 14 additions & 4 deletions arachnado/rpc/__init__.py
@@ -13,12 +13,22 @@

class ArachnadoRPC(object):
""" Base class for all Arachnado RPC resources.
Use it as a mixin for tornado.web.RequestHandler subclasses.
It provides :meth:`handle_request` method which handles
Jobs, Sites and Pages RPC requests.
"""
rpc_objects = tuple()

def initialize(self, *args, **kwargs):
jobs = Jobs(self, *args, **kwargs)
sites = Sites(self, *args, **kwargs)
pages = Pages(self, *args, **kwargs)
self.rpc_objects = [jobs, sites, pages]

self.dispatcher = Dispatcher()
self.dispatcher.add_object(Jobs(self, *args, **kwargs))
self.dispatcher.add_object(Sites(self, *args, **kwargs))
self.dispatcher.add_object(Pages(self, *args, **kwargs))
for obj in self.rpc_objects:
self.dispatcher.add_object(obj)

def handle_request(self, body):
response = JSONRPCResponseManager.handle(body, self.dispatcher)
@@ -45,4 +55,4 @@ def post(self):

def send_data(self, data):
self.write(json_encode(data))
self.finish()
self.finish()
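
To make the dispatch above concrete: ``handle_request`` takes a JSON-RPC 2.0 payload and routes its method to one of the registered Jobs, Sites or Pages objects via the json-rpc ``Dispatcher``. A hedged sketch of such a payload (the method name and params are assumptions; the real names come from the Jobs/Sites/Pages classes, which are not shown in this diff):

    import json

    # Hypothetical JSON-RPC 2.0 request body as passed to handle_request();
    # 'jobs.subscribe' is an assumed method name, not taken from this diff.
    request_body = json.dumps({
        "jsonrpc": "2.0",
        "method": "jobs.subscribe",
        "params": {"include": [], "exclude": []},
        "id": 1,
    })

    # ArachnadoRPC.handle_request(request_body) would run this through the
    # Dispatcher and send the JSON-RPC response back to the client.
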
45 changes: 20 additions & 25 deletions arachnado/rpc/data.py
@@ -1,11 +1,6 @@
import logging

from arachnado.utils.misc import json_encode
# A little monkey patching to have custom types encoded right
# from jsonrpclib import jsonrpc
# jsonrpc.jdumps = json_encode
# import tornadorpc
import json
from collections import deque
from tornado import gen
import tornado.ioloop
from bson.objectid import ObjectId
@@ -18,15 +13,14 @@

from arachnado.crawler_process import agg_stats_changed, CrawlerProcessSignals as CPS
from arachnado.rpc.ws import RpcWebsocketHandler
from arachnado.utils.misc import json_encode

logger = logging.getLogger(__name__)
# tornadorpc.config.verbose = True
# tornadorpc.config.short_errors = True


class DataRpcWebsocketHandler(RpcWebsocketHandler):
""" basic class for Data API handlers"""
stored_data = []
stored_data = deque()
delay_mode = False
event_types = []
data_hb = None
@@ -52,15 +46,15 @@ def init_hb(self, update_delay):
)
self.data_hb.start()

def add_storage(self, mongo_q, storage):
self.dispatcher.add_object(storage)
def add_storage_wrapper(self, mongo_q, storage_wrapper):
self.dispatcher.add_object(storage_wrapper)
new_id = str(len(self.storages))
self.storages[new_id] = {
"storage": storage,
"storage": storage_wrapper,
"job_ids": set([])
}
storage.handler_id = new_id
storage.subscribe(query=mongo_q)
storage_wrapper.handler_id = new_id
storage_wrapper.subscribe(query=mongo_q)
return new_id

def cancel_subscription(self, subscription_id):
@@ -79,8 +73,6 @@ def initialize(self, *args, **kwargs):
self.dispatcher["cancel_subscription"] = self.cancel_subscription

def on_close(self):
# import traceback
# traceback.print_stack()
logger.info("connection closed")
for storage in self.storages.values():
storage["storage"]._on_close()
@@ -98,9 +90,8 @@ def on_spider_closed(self, spider):
self.write_event("jobs:state", job)

def send_updates(self):
logger.debug("send_updates: {}".format(len(self.stored_data)))
while len(self.stored_data):
item = self.stored_data.pop(0)
item = self.stored_data.popleft()
return self._send_event(item["event"], item["data"])


@@ -113,7 +104,7 @@ def subscribe_to_jobs(self, include=[], exclude=[], update_delay=0):
mongo_q = self.create_jobs_query(include=include, exclude=exclude)
self.init_hb(update_delay)
return { "datatype": "job_subscription_id",
"id": self.add_storage(mongo_q, storage=self.create_jobs_storage_link())
"id": self.add_storage_wrapper(mongo_q, storage_wrapper=self.create_jobs_storage_link())
}

@gen.coroutine
@@ -128,8 +119,8 @@ def write_event(self, event, data, handler_id=None):
if event == 'stats:changed':
if len(data) > 1:
job_id = data[0]
# dumps for back compatibility
event_data = {"stats": json.dumps(data[1]),
# two fields with same content for back compatibility
event_data = {"stats": data[1],
"stats_dict": data[1],
}
# same as crawl_id
@@ -146,6 +137,12 @@ def write_event(self, event, data, handler_id=None):
allowed = allowed or job_id in storage["job_ids"]
if not allowed:
return
if 'stats' in event_data:
if not isinstance(event_data['stats'], dict):
try:
event_data['stats'] = json.loads(event_data['stats'])
except Exception as ex:
logger.warning("Invalid stats field in job {}".format(event_data.get("_id", "MISSING MONGO ID")))
if event in self.event_types and self.delay_mode:
self.stored_data.append({"event":event, "data":event_data})
else:
@@ -173,8 +170,6 @@ def create_jobs_storage_link(self):
return jobs

def on_close(self):
# import traceback
# traceback.print_stack()
logger.info("connection closed")
if self.cp:
self.cp.signals.disconnect(self.on_stats_changed, agg_stats_changed)
@@ -206,12 +201,12 @@ def subscribe_to_pages(self, site_ids={}, update_delay=0, mode="urls"):
}
if mode == "urls":
mongo_q = self.create_pages_query(site_ids=site_ids)
result["single_subscription_id"] = self.add_storage(mongo_q, storage=self.create_pages_storage_link())
result["single_subscription_id"] = self.add_storage_wrapper(mongo_q, storage_wrapper=self.create_pages_storage_link())
elif mode == "ids":
res = {}
for site_id in site_ids:
mongo_q = self.create_pages_query(site_ids=site_ids[site_id])
res[site_id] = self.add_storage(mongo_q, storage=self.create_pages_storage_link())
res[site_id] = self.add_storage_wrapper(mongo_q, storage_wrapper=self.create_pages_storage_link())
result["id"] = res
return result

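
To illustrate the pages subscription API above: a client subscribes over the ``/ws-pages-data`` websocket either with one combined query (``mode="urls"``) or with per-site queries (``mode="ids"``), receives subscription ids back, and can cancel them later via ``cancel_subscription``. A hedged sketch of the request payload (assuming ``subscribe_to_pages`` is exposed under its plain name on the websocket dispatcher; the shape of ``site_ids`` is not shown in this diff):

    import json

    # Hypothetical JSON-RPC payload for a pages subscription; parameter
    # names come from subscribe_to_pages() above, values are placeholders.
    subscribe_request = json.dumps({
        "jsonrpc": "2.0",
        "method": "subscribe_to_pages",
        "params": {
            "site_ids": {},      # site filter; exact shape not shown in this diff
            "update_delay": 0,   # 0 = push updates immediately (delay batching off)
            "mode": "urls",
        },
        "id": 2,
    })
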