
Commit 79aa9f9
resume jobs after restart
kmike committed May 25, 2016
1 parent 3fbb049 commit 79aa9f9
Showing 6 changed files with 42 additions and 16 deletions.
1 change: 1 addition & 0 deletions arachnado/__main__.py
@@ -97,6 +97,7 @@ def main(port, host, start_manhole, manhole_port, manhole_host, loglevel, opts):
         spider_packages=_parse_spider_packages(spider_packages),
         settings=settings
     )
+    domain_crawlers.resume(job_storage)
 
     cron = Cron(domain_crawlers, site_storage)
     cron.start()
22 changes: 18 additions & 4 deletions arachnado/domain_crawlers.py
@@ -1,13 +1,15 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
 import os
 import uuid
+import warnings
 
-from arachnado.crawler_process import ArachnadoCrawler
+from tornado.ioloop import IOLoop
+from tornado import gen
 from scrapy.settings import Settings
 
 import arachnado.settings
+from arachnado.crawler_process import ArachnadoCrawler
 from arachnado.spider import CrawlWebsiteSpider, ArachnadoSpider
 from arachnado.utils.spiders import get_spider_cls
 
@@ -21,14 +23,26 @@ def __init__(self, crawler_process, spider_packages, settings):
         self.crawler_process = crawler_process
         self.spider_packages = spider_packages
 
-    def start(self, domain, args, settings):
+    def resume(self, job_storage):
+        @gen.coroutine
+        def _resume():
+            query = {"status": {"$in": ["shutdown", "running"]}}
+            for job in (yield job_storage.fetch(query)):
+                if 'options' not in job:
+                    warnings.warn("invalid job without options, can't resume: %s" % job)
+                    continue
+                self.start(**job['options'])
+
+        IOLoop.instance().add_callback(_resume)
+
+    def start(self, domain, args, settings, crawl_id=None):
         """ Create, start and return a crawler for a given domain. """
         spider_cls = get_spider_cls(domain, self.spider_packages,
                                     CrawlWebsiteSpider)
         if not spider_cls:
             return
 
-        crawl_id = uuid.uuid4().hex
+        crawl_id = uuid.uuid4().hex if crawl_id is None else crawl_id
         crawler = self._create_crawler(crawl_id, spider_cls, settings)
         crawler.start_options = dict(
             domain=domain,
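
For context, a runnable sketch of the resume flow this file introduces. FakeJobStorage and FakeDomainCrawlers below are illustrative stand-ins, not Arachnado classes; only the query shape and the start(**options) call mirror the patch above:

from tornado import gen
from tornado.ioloop import IOLoop


class FakeJobStorage(object):
    """Stand-in for the MongoDB-backed job storage; fetch() is a coroutine."""
    def __init__(self, jobs):
        self.jobs = jobs

    @gen.coroutine
    def fetch(self, query=None):
        allowed = query["status"]["$in"] if query else None
        raise gen.Return([
            job for job in self.jobs
            if allowed is None or job.get("status") in allowed
        ])


class FakeDomainCrawlers(object):
    def start(self, domain, args, settings, crawl_id=None):
        print("restarting crawl %s for %s" % (crawl_id, domain))

    @gen.coroutine
    def resume(self, job_storage):
        # Jobs left "running" (crash) or "shutdown" (clean stop) are restarted
        # with the start options stored alongside the job document.
        query = {"status": {"$in": ["shutdown", "running"]}}
        for job in (yield job_storage.fetch(query)):
            if "options" not in job:
                continue  # nothing to restart with
            self.start(**job["options"])


storage = FakeJobStorage([
    {"status": "finished"},  # already done, ignored by the query
    {"status": "shutdown",
     "options": {"domain": "example.com", "args": {}, "settings": {},
                 "crawl_id": "abc123"}},
])
IOLoop.current().run_sync(lambda: FakeDomainCrawlers().resume(storage))
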
12 changes: 8 additions & 4 deletions arachnado/pipelines/mongoexport.py
@@ -9,6 +9,7 @@
 import datetime
 
 from tornado import gen
+from bson.objectid import ObjectId
 import scrapy
 from scrapy.exceptions import NotConfigured
 from scrapy import signals
@@ -74,16 +75,19 @@ def from_crawler(cls, crawler):
     def open_spider(self, spider):
         try:
             yield self.items_col.ensure_index(self.job_id_key)
+            yield self.jobs_col.ensure_index('id', unique=True)
 
-            self.job_id = yield self.jobs_col.insert({
+            job = yield self.jobs_col.find_and_modify({
                 'id': spider.crawl_id,
+            }, {
+                'id': spider.crawl_id,
                 'started_at': datetime.datetime.utcnow(),
                 'status': 'running',
                 'spider': spider.name,
                 'options': getattr(spider.crawler, 'start_options', {}),
-            })
+            }, upsert=True, new=True)
+            self.job_id = str(job['_id'])
             spider.motor_job_id = str(self.job_id)
 
             logger.info("Crawl job generated id: %s", self.job_id,
                         extra={'crawler': self.crawler})
         except Exception:
@@ -109,7 +113,7 @@ def spider_closed(self, spider, reason, **kwargs):
             status = 'shutdown'
 
         yield self.jobs_col.update(
-            {'_id': self.job_id},
+            {'_id': ObjectId(self.job_id)},
             {'$set': {
                 'finished_at': datetime.datetime.utcnow(),
                 'status': status,
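
The key idea in open_spider is that the job document is now keyed by the spider's crawl_id and upserted, so a resumed crawl reuses its existing job record instead of inserting a new one, and the stringified _id is wrapped in ObjectId again when the job is updated at shutdown. A synchronous PyMongo sketch of the same logic (find_one_and_update with ReturnDocument.AFTER is the modern spelling of the find_and_modify(..., upsert=True, new=True) used here; the database name and field values are placeholders, and a local mongod is assumed):

import datetime

from bson.objectid import ObjectId
from pymongo import MongoClient, ReturnDocument

client = MongoClient()                       # assumes a local mongod
jobs_col = client["arachnado_demo"]["jobs"]  # placeholder database/collection

crawl_id = "abc123"
jobs_col.create_index("id", unique=True)     # one job document per crawl id

# Upsert keyed by crawl id: restarting the same crawl updates the existing
# job document instead of creating a duplicate.
job = jobs_col.find_one_and_update(
    {"id": crawl_id},
    {"$set": {
        "id": crawl_id,
        "started_at": datetime.datetime.utcnow(),
        "status": "running",
        "spider": "generic",
        "options": {"domain": "example.com"},
    }},
    upsert=True,
    return_document=ReturnDocument.AFTER,
)
job_id = str(job["_id"])

# Because job_id is kept as a string, it must be wrapped in ObjectId again
# when the document is updated at shutdown.
jobs_col.update_one(
    {"_id": ObjectId(job_id)},
    {"$set": {"finished_at": datetime.datetime.utcnow(),
              "status": "finished"}},
)
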
17 changes: 12 additions & 5 deletions arachnado/spider.py
@@ -77,15 +77,22 @@ def parse_first(self, response):
         if self.domain.startswith("www."):
             allow_domain = allow_domain[len("www."):]
 
-        self.link_extractor = LinkExtractor(
-            allow_domains=[allow_domain],
-            canonicalize=False,
-        )
-        self.get_links = self.link_extractor.extract_links
+        self.state['allow_domain'] = allow_domain
 
         for elem in self.parse(response):
             yield elem
 
+    @property
+    def link_extractor(self):
+        return LinkExtractor(
+            allow_domains=[self.state['allow_domain']],
+            canonicalize=False,
+        )
+
+    @property
+    def get_links(self):
+        return self.link_extractor.extract_links
+
     def parse(self, response):
         if not isinstance(response, HtmlResponse):
             self.logger.info("non-HTML response is skipped: %s" % response.url)
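
The rewrite above keeps the only per-crawl datum, the allowed domain, in self.state, which Scrapy persists between runs when a JOBDIR is configured, and turns link_extractor and get_links into properties rebuilt from that state. A minimal illustrative spider using the same pattern (it is not Arachnado's CrawlWebsiteSpider, just a sketch of the idea):

import scrapy
from scrapy.linkextractors import LinkExtractor


class ResumableSiteSpider(scrapy.Spider):
    name = "resumable_site"

    def __init__(self, domain="example.com", *args, **kwargs):
        super(ResumableSiteSpider, self).__init__(*args, **kwargs)
        self.domain = domain
        self.start_urls = ["http://%s/" % domain]

    def parse(self, response):
        # Remember the allowed domain in spider.state, which Scrapy pickles
        # to disk when the crawl runs with a JOBDIR, so it survives a restart.
        self.state.setdefault("allow_domain", self.domain)
        for link in self.link_extractor.extract_links(response):
            yield scrapy.Request(link.url, callback=self.parse)

    @property
    def link_extractor(self):
        # Rebuilt on demand from persisted state instead of being stored on
        # the instance, mirroring the property introduced in the diff above.
        return LinkExtractor(allow_domains=[self.state["allow_domain"]])

Running it with e.g. scrapy crawl resumable_site -s JOBDIR=crawls/site-1 lets the crawl stop and resume with its allow_domain intact.
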
2 changes: 1 addition & 1 deletion arachnado/static/js/components/KeyValueList.jsx
@@ -101,4 +101,4 @@ var KeyValueRow = React.createClass({
             this.props.onChange(index, key, value);
         }
     }
-})
+});
4 changes: 2 additions & 2 deletions arachnado/storages/mongo.py
@@ -80,12 +80,12 @@ def available_subscriptions(self):
         return list(self.signals.keys())
 
     @coroutine
-    def fetch(self):
+    def fetch(self, query=None):
         if self.fetching:
             return
         self.fetching = True
         docs = []
-        cursor = self.col.find()
+        cursor = self.col.find(query)
         while (yield cursor.fetch_next):
             doc = cursor.next_object()
             docs.append(doc)
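
A note on the default: PyMongo and Motor treat a None filter as "match everything", so fetch() with no arguments behaves exactly as before, while resume() can pass a narrowing query. A quick synchronous sketch (the collection name is a placeholder and a local mongod is assumed):

from pymongo import MongoClient

col = MongoClient()["arachnado_demo"]["jobs"]   # placeholder collection

everything = list(col.find(None))   # same as col.find({}) or col.find()
resumable = list(col.find({"status": {"$in": ["shutdown", "running"]}}))
print(len(everything), len(resumable))
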
