Skip to content

Commit

Permalink
Fix store duplicate chunk and meta per subtask (#2845)
Browse files — browse the repository at this point in the history
  • Loading branch information
fyrestone authored Mar 22, 2022
1 parent 4ab8a46 commit 68d30b5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
25 changes: 14 additions & 11 deletions mars/services/subtask/worker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,20 @@ async def _store_data(self, chunk_graph: ChunkGraph):
]

# store data into storage
puts = []
stored_keys = []
data_key_to_puts = {}
for result_chunk in result_chunks:
data_key = result_chunk.key
if data_key in self._datastore:
# non shuffle op
stored_keys.append(data_key)
result_data = self._datastore[data_key]
# update meta
if not isinstance(result_data, tuple):
result_chunk.params = result_chunk.get_params_from_data(result_data)

# check data_key after update meta
if data_key in data_key_to_puts:
continue
put = self._storage_api.put.delay(data_key, result_data)
puts.append(put)
data_key_to_puts[data_key] = put
else:
assert isinstance(result_chunk.op, MapReduceOperand)
keys = [
Expand All @@ -306,12 +306,15 @@ async def _store_data(self, chunk_graph: ChunkGraph):
if isinstance(store_key, tuple) and store_key[0] == data_key
]
for key in keys:
stored_keys.append(key)
if key in data_key_to_puts:
continue
result_data = self._datastore[key]
put = self._storage_api.put.delay(key, result_data)
puts.append(put)
data_key_to_puts[key] = put
stored_keys = list(data_key_to_puts.keys())
puts = data_key_to_puts.values()
logger.debug(
"Start putting data keys: %s, " "subtask id: %s",
"Start putting data keys: %s, subtask id: %s",
stored_keys,
self.subtask.subtask_id,
)
Expand All @@ -327,20 +330,20 @@ async def _store_data(self, chunk_graph: ChunkGraph):
data_key_to_memory_size[store_key] = store_info.memory_size
data_key_to_object_id[store_key] = store_info.object_id
logger.debug(
"Finish putting data keys: %s, " "subtask id: %s",
"Finish putting data keys: %s, subtask id: %s",
stored_keys,
self.subtask.subtask_id,
)
except asyncio.CancelledError:
logger.debug(
"Cancelling put data keys: %s, " "subtask id: %s",
"Cancelling put data keys: %s, subtask id: %s",
stored_keys,
self.subtask.subtask_id,
)
put_infos.cancel()

logger.debug(
"Cancelled put data keys: %s, " "subtask id: %s",
"Cancelled put data keys: %s, subtask id: %s",
stored_keys,
self.subtask.subtask_id,
)
Expand Down
28 changes: 28 additions & 0 deletions mars/services/subtask/worker/tests/subtask_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,33 @@
from ...worker.processor import SubtaskProcessor


class CheckStorageAPI:
    """Transparent proxy around a storage API that raises when the same
    data key is ``put`` more than once.

    Used by checked subtask processors to verify that a subtask never
    stores duplicate chunks for one data key.
    """

    def __init__(self, storage_api):
        # The wrapped storage API and the set of keys already put through us.
        self._storage_api = storage_api
        self._put_data_keys = set()

    def __getattr__(self, item):
        # Everything except ``put`` is delegated untouched to the real API.
        return getattr(self._storage_api, item)

    @property
    def put(self):
        owner = self
        real_put = self._storage_api.put

        class _PutWrapper:
            """Wraps ``put`` so that ``delay`` records and checks data keys."""

            def delay(self, data_key: str, obj: object, level=None):
                # Fail loudly on a repeated key; otherwise record it and
                # forward the call to the real delayed-put.
                if data_key in owner._put_data_keys:
                    raise Exception(f"Duplicate data put: {data_key}, obj: {obj}")
                owner._put_data_keys.add(data_key)
                return real_put.delay(data_key, obj, level)

            def __getattr__(self, item):
                # Any other attribute comes straight from the real ``put``.
                return getattr(real_put, item)

        return _PutWrapper()


class CheckedSubtaskProcessor(ObjectCheckMixin, SubtaskProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -37,6 +64,7 @@ def __init__(self, *args, **kwargs):
check_options[key] = kwargs.get(key, True)
self._check_options = check_options
self._check_keys = kwargs.get("check_keys")
self._storage_api = CheckStorageAPI(self._storage_api)

def _execute_operand(self, ctx: Dict[str, Any], op: OperandType):
super()._execute_operand(ctx, op)
Expand Down

0 comments on commit 68d30b5

Please sign in to comment.