Skip to content

Commit

Permalink
Data API bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zuev committed Aug 9, 2016
1 parent 9a5181e commit 87304b8
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 28 deletions.
13 changes: 8 additions & 5 deletions arachnado/rpc/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,14 @@ def create_subscribtion_to_urls(self, urls):
jobs_q = self.create_jobs_query(url)
jobs_ds = yield jobs.storage.fetch(jobs_q)
job_ids =[x["_id"] for x in jobs_ds]
storage.job_ids.update(job_ids)
pages_query = storage.create_pages_query(job_ids, last_id)
storage.filters.append(pages_query)
storage.jobs.append(jobs)
jobs_to_subscribe.append([jobs_q, jobs])
if job_ids:
storage.job_ids.update(job_ids)
pages_query = storage.create_pages_query(job_ids, last_id)
storage.filters.append(pages_query)
storage.jobs.append(jobs)
jobs_to_subscribe.append([jobs_q, jobs])
else:
logger.info("No jobs found for url {}".format(url))
storage.subscribe_to_pages()
for jobs_q, jobs in jobs_to_subscribe:
jobs.subscribe(query=jobs_q)
Expand Down
12 changes: 6 additions & 6 deletions arachnado/rpc/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ class RpcWebsocketHandler(ArachnadoRPC, websocket.WebSocketHandler):
"""

def on_message(self, message):
    """Handle an incoming websocket frame carrying a JSON-RPC request.

    The raw ``message`` text is validated as JSON first; invalid frames
    are logged and dropped instead of being passed downstream.  The
    *original* message string is forwarded (not a ``json.dumps``
    re-serialization), so formatting details such as large integers or
    key order survive intact.
    """
    try:
        json.loads(message)
    except (TypeError, ValueError):
        logger.warn('Invalid message skipped: {!r}'.format(message[:500]))
        return
    # Dispatch exactly once, with the untouched client payload.
    self.handle_request(message)

def send_data(self, data):
self.write_event(data)
Expand Down
27 changes: 19 additions & 8 deletions docs/json-rpc-api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,35 @@ JSON-RPC responses::
"result": ...
}

Working with jobs
-----------------
Working with jobs and pages
---------------------------

JSON-RPC API allows to

* get information about scraping jobs;
* start new crawls;
* subscribe to crawled pages;
* subscribe to job updates.

jobs.subscribe
Get information about jobs and subscribe for new jobs.

Parameters:

* last_id - optional, ObjectID value of a last previously seen job.
When passed, only new job data is returned.
* query - optional, MongoDB query
* fields - optional, ...
* last_id - optional, ObjectID value of a last previously seen job;
When passed, only new job data is returned;
* query - optional, MongoDB query;
* fields - optional, set of fields to return.

pages.subscribe
Get crawled pages and subscribe for new pages.

Parameters:

* last_id - optional, ObjectID value of a last previously seen page.
  When passed, only new page data is returned;
* query - optional, MongoDB query;
* fields - optional, set of fields to return.


New API
Expand Down Expand Up @@ -139,7 +150,7 @@ subscribe_to_pages
Parameters:

* urls - a dictionary of <url>:<last seen page id> pairs. Arachnado will create one subscription id for all urls;
* url_groups - a dictionary of <url group id>: <dictionary like urls param>. Arachnado will create one subscription id for each url group.
* url_groups - a dictionary of <url group id>: {<url>:<last seen page id pairs>}. Arachnado will create one subscription id for each url group.

Command example::

Expand Down Expand Up @@ -196,7 +207,7 @@ set_max_message_size
Set maximum message size in bytes for websockets channel.
Messages larger than specified limit are dropped.
Default value is 2**20.
To disable this chack set max size to zero.
To disable this check set max size to zero.
Parameters:
* max_size - maximum message size in bytes.

Expand Down
3 changes: 2 additions & 1 deletion tests/items.jl
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
{"_job_id": "5749d89da8cb9c1f286e3a90","status" : 200, "body" : "<html><head></head><body></body></html>", "_type" : "page", "url" : "http://example.com/index.php", "items" : [ ], "headers" : { "Cache-Control" : [ "private, no-cache=\"set-cookie\"" ], "X-Powered-By" : [ "PHP/5.5.9-1ubuntu4.14" ], "Date" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Content-Type" : [ "text/html; charset=UTF-8" ], "Expires" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Vary" : [ "Accept-Encoding" ], "Server" : [ "Apache/2.4.7 (Ubuntu)" ] }, "meta" : { "download_timeout" : 180, "depth" : 2}}
{"_job_id": "5749d89da8cb9c1f286e3a90","status" : 200, "body" : "<html><head></head><body></body></html>", "_type" : "page", "url" : "http://mysite.com/index.php", "items" : [ ], "headers" : { "Cache-Control" : [ "private, no-cache=\"set-cookie\"" ], "X-Powered-By" : [ "PHP/5.5.9-1ubuntu4.14" ], "Date" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Content-Type" : [ "text/html; charset=UTF-8" ], "Expires" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Vary" : [ "Accept-Encoding" ], "Server" : [ "Apache/2.4.7 (Ubuntu)" ] }, "meta" : { "download_timeout" : 180, "depth" : 2}}
{"_job_id": "5749d89da8cb9c1f286e3fff","status" : 200, "body" : "<html><head></head><body></body></html>", "_type" : "page", "url" : "http://mysite.com", "items" : [ ], "headers" : { "Cache-Control" : [ "private, no-cache=\"set-cookie\"" ], "X-Powered-By" : [ "PHP/5.5.9-1ubuntu4.14" ], "Date" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Content-Type" : [ "text/html; charset=UTF-8" ], "Expires" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Vary" : [ "Accept-Encoding" ], "Server" : [ "Apache/2.4.7 (Ubuntu)" ] }, "meta" : { "download_timeout" : 180, "depth" : 2}}
45 changes: 37 additions & 8 deletions tests/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ def test_pages_filter_url_groups(self):
pages_command = self.get_command("test_pages_0",'subscribe_to_pages', {'url_groups': {1: {url_value: None}}})
yield self.execute_pages_command(pages_command, wait_result=True, required_url=url_value)

def test_pages_no_result(self):
    # Subscribing to a URL group whose URL has no crawled pages should
    # produce no page messages at all.  The coroutine therefore never
    # completes within the 3s window, and the run_sync timeout
    # (TimeoutError) is the *success* condition asserted below.
    @tornado.gen.coroutine
    def f():
        url_value = 'http://mysite.com'
        pages_command = self.get_command("test_pages_3",'subscribe_to_pages', {'url_groups': {1: {url_value: None}}})
        # max_count=0: execute_pages_command fails the test if any page
        # message arrives for this subscription.
        yield self.execute_pages_command(pages_command,
                                         wait_result=True,
                                         required_url=url_value,
                                         max_count=0)
    self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=3)

def test_pages_exact_count(self):
    # The fixture data (tests/items.jl) contains exactly one stored page
    # for example.com, so at most one page message may be delivered.
    # execute_pages_command fails fast if more than max_count messages
    # arrive; otherwise it keeps reading until run_sync's 3s timeout
    # fires — so the expected TimeoutError is the success condition.
    @tornado.gen.coroutine
    def f():
        url_value = 'http://example.com'
        pages_command = self.get_command("test_pages_4",'subscribe_to_pages', {'url_groups': {1: {url_value: None}}})
        yield self.execute_pages_command(pages_command,
                                         wait_result=True,
                                         required_url=url_value,
                                         max_count=1)
    self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=3)

@tornado.testing.gen_test
def test_pages_no_filter(self):
pages_command = self.get_command("test_pages_1",'subscribe_to_pages', {})
Expand All @@ -96,7 +118,7 @@ def get_command(self, id, method, params):
}

@tornado.gen.coroutine
def execute_pages_command(self, pages_command, wait_result=False, required_url=None):
def execute_pages_command(self, pages_command, wait_result=False, required_url=None, max_count=None):
ws_url = "ws://localhost:" + str(self.get_http_port()) + self.pages_uri
ws_client = yield tornado.websocket.websocket_connect(ws_url)
ws_client.write_message(json.dumps(pages_command))
Expand All @@ -110,14 +132,21 @@ def execute_pages_command(self, pages_command, wait_result=False, required_url=N
subs_id = group_sub_ids[group_id]
self.assertNotEqual(subs_id, -1)
if wait_result:
response = yield ws_client.read_message()
json_response = json.loads(response)
if json_response is None:
self.fail("incorrect response")
if max_count is None:
response = yield ws_client.read_message()
json_response = json.loads(response)
if json_response is None:
self.fail("incorrect response")
else:
self.assertTrue('url' in json_response)
if required_url:
self.assertTrue(required_url in json_response["url"])
cnt = 0
while True:
response = yield ws_client.read_message()
json_response = json.loads(response)
if json_response is None:
self.fail("incorrect response")
cnt += 1
if cnt > max_count:
self.fail("max count of pages exceeded")
yield self.execute_cancel(ws_client, subs_id, True)

@tornado.testing.gen_test
Expand Down

0 comments on commit 87304b8

Please sign in to comment.