From 68d30b5576a14d963a9a39ae6d4baa1cfa52bdc1 Mon Sep 17 00:00:00 2001 From: Liu Bao Date: Tue, 22 Mar 2022 18:39:07 +0800 Subject: [PATCH] Fix store duplicate chunk and meta per subtask (#2845) --- mars/services/subtask/worker/processor.py | 25 +++++++++-------- .../subtask/worker/tests/subtask_processor.py | 28 +++++++++++++++++++ 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py index 4e23991837..c73cbfe9f4 100644 --- a/mars/services/subtask/worker/processor.py +++ b/mars/services/subtask/worker/processor.py @@ -284,20 +284,20 @@ async def _store_data(self, chunk_graph: ChunkGraph): ] # store data into storage - puts = [] - stored_keys = [] + data_key_to_puts = {} for result_chunk in result_chunks: data_key = result_chunk.key if data_key in self._datastore: # non shuffle op - stored_keys.append(data_key) result_data = self._datastore[data_key] # update meta if not isinstance(result_data, tuple): result_chunk.params = result_chunk.get_params_from_data(result_data) - + # check data_key after update meta + if data_key in data_key_to_puts: + continue put = self._storage_api.put.delay(data_key, result_data) - puts.append(put) + data_key_to_puts[data_key] = put else: assert isinstance(result_chunk.op, MapReduceOperand) keys = [ @@ -306,12 +306,15 @@ async def _store_data(self, chunk_graph: ChunkGraph): if isinstance(store_key, tuple) and store_key[0] == data_key ] for key in keys: - stored_keys.append(key) + if key in data_key_to_puts: + continue result_data = self._datastore[key] put = self._storage_api.put.delay(key, result_data) - puts.append(put) + data_key_to_puts[key] = put + stored_keys = list(data_key_to_puts.keys()) + puts = data_key_to_puts.values() logger.debug( - "Start putting data keys: %s, " "subtask id: %s", + "Start putting data keys: %s, subtask id: %s", stored_keys, self.subtask.subtask_id, ) @@ -327,20 +330,20 @@ async def _store_data(self, chunk_graph: ChunkGraph): data_key_to_memory_size[store_key] = store_info.memory_size data_key_to_object_id[store_key] = store_info.object_id logger.debug( - "Finish putting data keys: %s, " "subtask id: %s", + "Finish putting data keys: %s, subtask id: %s", stored_keys, self.subtask.subtask_id, ) except asyncio.CancelledError: logger.debug( - "Cancelling put data keys: %s, " "subtask id: %s", + "Cancelling put data keys: %s, subtask id: %s", stored_keys, self.subtask.subtask_id, ) put_infos.cancel() logger.debug( - "Cancelled put data keys: %s, " "subtask id: %s", + "Cancelled put data keys: %s, subtask id: %s", stored_keys, self.subtask.subtask_id, ) diff --git a/mars/services/subtask/worker/tests/subtask_processor.py b/mars/services/subtask/worker/tests/subtask_processor.py index 9f49ca478e..603caa17bc 100644 --- a/mars/services/subtask/worker/tests/subtask_processor.py +++ b/mars/services/subtask/worker/tests/subtask_processor.py @@ -19,6 +19,33 @@ from ...worker.processor import SubtaskProcessor +class CheckStorageAPI: + def __init__(self, storage_api): + self._storage_api = storage_api + self._put_data_keys = set() + + def __getattr__(self, item): + return getattr(self._storage_api, item) + + @property + def put(self): + owner = self + put = self._storage_api.put + + class _PutWrapper: + def delay(self, data_key: str, obj: object, level=None): + if data_key in owner._put_data_keys: + raise Exception(f"Duplicate data put: {data_key}, obj: {obj}") + else: + owner._put_data_keys.add(data_key) + return put.delay(data_key, obj, level) + + def __getattr__(self, item): + return getattr(put, item) + + return _PutWrapper() + + class CheckedSubtaskProcessor(ObjectCheckMixin, SubtaskProcessor): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -37,6 +64,7 @@ def __init__(self, *args, **kwargs): check_options[key] = kwargs.get(key, True) self._check_options = check_options self._check_keys = kwargs.get("check_keys") + self._storage_api = CheckStorageAPI(self._storage_api) def _execute_operand(self, ctx: Dict[str, Any], op: OperandType): super()._execute_operand(ctx, op)