diff --git a/arachnado/rpc/data.py b/arachnado/rpc/data.py
index 9294c50..bd67164 100644
--- a/arachnado/rpc/data.py
+++ b/arachnado/rpc/data.py
@@ -226,11 +226,14 @@ def create_subscribtion_to_urls(self, urls):
             jobs_q = self.create_jobs_query(url)
             jobs_ds = yield jobs.storage.fetch(jobs_q)
             job_ids =[x["_id"] for x in jobs_ds]
-            storage.job_ids.update(job_ids)
-            pages_query = storage.create_pages_query(job_ids, last_id)
-            storage.filters.append(pages_query)
-            storage.jobs.append(jobs)
-            jobs_to_subscribe.append([jobs_q, jobs])
+            if job_ids:
+                storage.job_ids.update(job_ids)
+                pages_query = storage.create_pages_query(job_ids, last_id)
+                storage.filters.append(pages_query)
+                storage.jobs.append(jobs)
+                jobs_to_subscribe.append([jobs_q, jobs])
+            else:
+                logger.info("No jobs found for url {}".format(url))
         storage.subscribe_to_pages()
         for jobs_q, jobs in jobs_to_subscribe:
             jobs.subscribe(query=jobs_q)
diff --git a/arachnado/rpc/ws.py b/arachnado/rpc/ws.py
index 420db0d..9a39ccc 100644
--- a/arachnado/rpc/ws.py
+++ b/arachnado/rpc/ws.py
@@ -20,12 +20,12 @@ class RpcWebsocketHandler(ArachnadoRPC, websocket.WebSocketHandler):
 
     """
     def on_message(self, message):
-        try:
-            data = json.loads(message)
-        except (TypeError, ValueError):
-            logger.warn('Invalid message skipped: {!r}'.format(message[:500]))
-            return
-        self.handle_request(json.dumps(data))
+        # try:
+        #     data = json.loads(message)
+        # except (TypeError, ValueError):
+        #     logger.warn('Invalid message skipped: {!r}'.format(message[:500]))
+        #     return
+        self.handle_request(message)
 
     def send_data(self, data):
         self.write_event(data)
diff --git a/docs/json-rpc-api.rst b/docs/json-rpc-api.rst
index ef46b68..abb01d8 100644
--- a/docs/json-rpc-api.rst
+++ b/docs/json-rpc-api.rst
@@ -29,13 +29,14 @@
         "result": ...
     }
 
-Working with jobs
------------------
+Working with jobs and pages
+---------------------------
 
 JSON-RPC API allows to
 
 * get information about scraping jobs;
 * start new crawls;
+* subscribe to crawled pages;
 * subscribe to job updates.
 
 jobs.subscribe
@@ -43,10 +44,20 @@
 
     Parameters:
 
-    * last_id - optional, ObjectID value of a last previously seen job.
-      When passed, only new job data is returned.
-    * query - optional, MongoDB query
-    * fields - optional, ...
+    * last_id - optional, ObjectID value of a last previously seen job;
+      When passed, only new job data is returned;
+    * query - optional, MongoDB query;
+    * fields - optional, set of fields to return.
+
+pages.subscribe
+    Get crawled pages and subscribe to new pages.
+
+    Parameters:
+
+    * last_id - optional, ObjectID value of a last previously seen page.
+      When passed, only new page data is returned;
+    * query - optional, MongoDB query;
+    * fields - optional, set of fields to return.
 
 
 New API
@@ -139,7 +150,7 @@ subscribe_to_pages
     Parameters:
 
     * urls - a dictionary of <url>: <last page id>. Arachnado will create one subscription id for all urls;
-    * url_groups - a dictionary of <group id>: <urls>. Arachnado will create one subscription id for each url group.
+    * url_groups - a dictionary of <group id>: {<url>: <last page id>}. Arachnado will create one subscription id for each url group.
 
     Command example::
 
@@ -196,7 +207,7 @@ set_max_message_size
     Set maximum message size in bytes for websockets channel.
     Messages larger than specified limit are dropped.
     Default value is 2**20.
-    To disable this chack set max size to zero.
+    To disable this check set max size to zero.
 
     Parameters:
     * max_size - maximum message size in bytes.
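A minimal client-side sketch of the pages subscription flow documented above, using the same Tornado websocket client that the tests below use. The host, port, and endpoint path (ws://localhost:8888/ws-pages-data) are assumptions for illustration, not part of this patch; the command shape follows the JSON-RPC 2.0 format shown at the top of docs/json-rpc-api.rst::

    import json

    import tornado.gen
    import tornado.ioloop
    import tornado.websocket


    @tornado.gen.coroutine
    def watch_pages(ws_url):
        # Connect to the pages websocket endpoint and send a
        # subscribe_to_pages command.
        ws_client = yield tornado.websocket.websocket_connect(ws_url)
        command = {
            "id": "example_0",
            "jsonrpc": "2.0",
            "method": "subscribe_to_pages",
            # {url: last seen page id}; None asks for all stored pages.
            "params": {"urls": {"http://example.com": None}},
        }
        ws_client.write_message(json.dumps(command))
        while True:
            response = yield ws_client.read_message()
            if response is None:  # server closed the connection
                break
            print(json.loads(response))

    if __name__ == "__main__":
        # Hypothetical deployment address; adjust host/port/path.
        tornado.ioloop.IOLoop.current().run_sync(
            lambda: watch_pages("ws://localhost:8888/ws-pages-data"))

The first message back acknowledges the subscription (the tests below extract per-group subscription ids from it and treat -1 as failure); subsequent messages are the page documents themselves.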
diff --git a/tests/items.jl b/tests/items.jl
index 54029f6..c39786c 100644
--- a/tests/items.jl
+++ b/tests/items.jl
@@ -1 +1,2 @@
-{"_job_id": "5749d89da8cb9c1f286e3a90","status" : 200, "body" : "", "_type" : "page", "url" : "http://example.com/index.php", "items" : [ ], "headers" : { "Cache-Control" : [ "private, no-cache=\"set-cookie\"" ], "X-Powered-By" : [ "PHP/5.5.9-1ubuntu4.14" ], "Date" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Content-Type" : [ "text/html; charset=UTF-8" ], "Expires" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Vary" : [ "Accept-Encoding" ], "Server" : [ "Apache/2.4.7 (Ubuntu)" ] }, "meta" : { "download_timeout" : 180, "depth" : 2}}
\ No newline at end of file
+{"_job_id": "5749d89da8cb9c1f286e3a90","status" : 200, "body" : "", "_type" : "page", "url" : "http://mysite.com/index.php", "items" : [ ], "headers" : { "Cache-Control" : [ "private, no-cache=\"set-cookie\"" ], "X-Powered-By" : [ "PHP/5.5.9-1ubuntu4.14" ], "Date" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Content-Type" : [ "text/html; charset=UTF-8" ], "Expires" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Vary" : [ "Accept-Encoding" ], "Server" : [ "Apache/2.4.7 (Ubuntu)" ] }, "meta" : { "download_timeout" : 180, "depth" : 2}}
+{"_job_id": "5749d89da8cb9c1f286e3fff","status" : 200, "body" : "", "_type" : "page", "url" : "http://mysite.com", "items" : [ ], "headers" : { "Cache-Control" : [ "private, no-cache=\"set-cookie\"" ], "X-Powered-By" : [ "PHP/5.5.9-1ubuntu4.14" ], "Date" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Content-Type" : [ "text/html; charset=UTF-8" ], "Expires" : [ "Sat, 28 May 2016 17:43:05 GMT" ], "Vary" : [ "Accept-Encoding" ], "Server" : [ "Apache/2.4.7 (Ubuntu)" ] }, "meta" : { "download_timeout" : 180, "depth" : 2}}
\ No newline at end of file
diff --git a/tests/test_data.py b/tests/test_data.py
index 506af55..d60408f 100644
--- a/tests/test_data.py
+++ b/tests/test_data.py
@@ -76,6 +76,28 @@ def test_pages_filter_url_groups(self):
         pages_command = self.get_command("test_pages_0",'subscribe_to_pages', {'url_groups': {1: {url_value: None}}})
         yield self.execute_pages_command(pages_command, wait_result=True, required_url=url_value)
 
+    def test_pages_no_result(self):
+        @tornado.gen.coroutine
+        def f():
+            url_value = 'http://mysite.com'
+            pages_command = self.get_command("test_pages_3",'subscribe_to_pages', {'url_groups': {1: {url_value: None}}})
+            yield self.execute_pages_command(pages_command,
+                                             wait_result=True,
+                                             required_url=url_value,
+                                             max_count=0)
+        self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=3)
+
+    def test_pages_exact_count(self):
+        @tornado.gen.coroutine
+        def f():
+            url_value = 'http://example.com'
+            pages_command = self.get_command("test_pages_4",'subscribe_to_pages', {'url_groups': {1: {url_value: None}}})
+            yield self.execute_pages_command(pages_command,
+                                             wait_result=True,
+                                             required_url=url_value,
+                                             max_count=1)
+        self.assertRaises(TimeoutError, self.io_loop.run_sync, f, timeout=3)
+
     @tornado.testing.gen_test
     def test_pages_no_filter(self):
         pages_command = self.get_command("test_pages_1",'subscribe_to_pages', {})
@@ -96,7 +118,7 @@ def get_command(self, id, method, params):
         }
 
     @tornado.gen.coroutine
-    def execute_pages_command(self, pages_command, wait_result=False, required_url=None):
+    def execute_pages_command(self, pages_command, wait_result=False, required_url=None, max_count=None):
         ws_url = "ws://localhost:" + str(self.get_http_port()) + self.pages_uri
         ws_client = yield tornado.websocket.websocket_connect(ws_url)
         ws_client.write_message(json.dumps(pages_command))
@@ -110,14 +132,21 @@ def execute_pages_command(self, pages_command, wait_result=False, required_url=N
             subs_id = group_sub_ids[group_id]
             self.assertNotEqual(subs_id, -1)
         if wait_result:
-            response = yield ws_client.read_message()
-            json_response = json.loads(response)
-            if json_response is None:
-                self.fail("incorrect response")
+            if max_count is None:
+                response = yield ws_client.read_message()
+                json_response = json.loads(response)
+                if json_response is None:
+                    self.fail("incorrect response")
             else:
-                self.assertTrue('url' in json_response)
-                if required_url:
-                    self.assertTrue(required_url in json_response["url"])
+                cnt = 0
+                while True:
+                    response = yield ws_client.read_message()
+                    json_response = json.loads(response)
+                    if json_response is None:
+                        self.fail("incorrect response")
+                    cnt += 1
+                    if cnt > max_count:
+                        self.fail("max count of pages exceeded")
         yield self.execute_cancel(ws_client, subs_id, True)
 
     @tornado.testing.gen_test
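The two new tests rely on a bounded-wait pattern: the helper keeps reading until more than max_count messages arrive (which fails the test) or IOLoop.run_sync hits its timeout (which is the passing condition, asserted via assertRaises(TimeoutError, ...)). A standalone sketch of that pattern, assuming Tornado 4.x where run_sync raises tornado.ioloop.TimeoutError on timeout; read_message here stands in for the websocket client's read_message and is not part of this patch::

    import tornado.gen
    import tornado.ioloop


    @tornado.gen.coroutine
    def read_at_most(read_message, max_count):
        # Fail as soon as the message count exceeds the limit; otherwise
        # keep waiting so that the surrounding run_sync() times out.
        cnt = 0
        while True:
            yield read_message()
            cnt += 1
            if cnt > max_count:
                raise AssertionError("max count of pages exceeded")


    def expect_at_most(io_loop, read_message, max_count, timeout=3):
        # The timeout signals success: no more than max_count messages
        # arrived before the deadline. An AssertionError raised by
        # read_at_most propagates and fails the caller.
        try:
            io_loop.run_sync(lambda: read_at_most(read_message, max_count),
                             timeout=timeout)
        except tornado.ioloop.TimeoutError:
            pass

With max_count=0 this asserts that a subscription yields no pages at all (exercising the new else branch in data.py), and with max_count=1 that it yields at most one page.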