Skip to content

Commit

Permalink
Implement storage lib of Arrow plasma as well as disk (#1904)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekaisheng authored Feb 1, 2021
1 parent b271cdd commit b190fcd
Show file tree
Hide file tree
Showing 15 changed files with 1,028 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ if [ -n "$WITH_CYTHON" ]; then
fi
if [ -z "$NO_COMMON_TESTS" ]; then
mkdir -p build
pytest $PYTEST_CONFIG --cov-config .coveragerc-threaded mars/tensor mars/dataframe mars/web mars/learn mars/remote
pytest $PYTEST_CONFIG --cov-config .coveragerc-threaded mars/tensor mars/dataframe mars/web \
mars/learn mars/remote mars/storage
mv .coverage build/.coverage.tensor.file
pytest $PYTEST_CONFIG --cov-config .coveragerc --forked --ignore mars/tensor --ignore mars/dataframe \
--ignore mars/learn --ignore mars/remote mars
Expand Down
145 changes: 145 additions & 0 deletions mars/aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import functools
from concurrent.futures import Executor
from typing import Any, Type


def _make_delegate_method(attr):
    """Build an async method that runs ``attr`` of the wrapped file in an executor.

    The returned coroutine function forwards ``*args``/``**kwargs`` to the
    underlying synchronous attribute and awaits it on ``self._loop`` using
    ``self._executor`` so the event loop is never blocked.
    """
    async def _delegated(self, *args, **kwargs):
        bound = functools.partial(getattr(self._file, attr), *args, **kwargs)
        return await self._loop.run_in_executor(self._executor, bound)

    return _delegated


def _make_proxy_method(attr):
    """Build a plain synchronous method that forwards calls of ``attr``
    straight to the wrapped file object (no executor involved)."""
    def _forward(self, *args, **kwargs):
        target = getattr(self._file, attr)
        return target(*args, **kwargs)

    return _forward


def _make_proxy_property(attr):
    """Build a read-only property exposing ``attr`` of the wrapped file object."""
    def _getter(self):
        return getattr(self._file, attr)

    return property(_getter)


def delegate_to_executor(*attrs):
    """Class decorator: for each name in ``attrs``, attach an async method
    that delegates the wrapped file's attribute to the executor."""
    def decorator(cls: Type):
        for name in attrs:
            setattr(cls, name, _make_delegate_method(name))
        return cls

    return decorator


def proxy_method_directly(*attrs):
    """Class decorator: for each name in ``attrs``, attach a synchronous
    method that forwards calls directly to the wrapped file object."""
    def decorator(cls: Type):
        for name in attrs:
            setattr(cls, name, _make_proxy_method(name))
        return cls

    return decorator


def proxy_property_directly(*attrs):
    """Class decorator: for each name in ``attrs``, attach a read-only
    property exposing the wrapped file object's attribute."""
    def decorator(cls):
        for name in attrs:
            setattr(cls, name, _make_proxy_property(name))
        return cls

    return decorator


class AioBase:
    """Base wrapper pairing a synchronous file-like object with an event loop
    and an optional executor, so subclasses can delegate blocking calls.

    Parameters
    ----------
    file : Any
        The synchronous object to wrap; if already an ``AioBase``, it is
        unwrapped so nesting never stacks wrappers.
    loop : asyncio.AbstractEventLoop, optional
        Event loop used to schedule executor calls; defaults to the current
        thread's event loop.
    executor : Executor, optional
        Executor passed to ``loop.run_in_executor``; ``None`` means the
        loop's default executor.
    """
    def __init__(self,
                 file: Any,
                 loop: asyncio.AbstractEventLoop = None,
                 executor: Executor = None):
        # Fall back to the current thread's event loop when none is given.
        if loop is None:
            loop = asyncio.get_event_loop()
        # Unwrap nested Aio wrappers so we always hold the raw file object.
        if isinstance(file, AioBase):
            file = file._file

        self._file = file
        self._loop = loop
        self._executor = executor


@delegate_to_executor(
    "close", "flush", "isatty", "read", "read1", "readinto",
    "readline", "readlines", "seek", "seekable", "tell",
    "truncate", "writable", "write", "writelines",
)
@proxy_method_directly("fileno", "readable")
@proxy_property_directly("closed", "name", "mode")
class AioFileObject(AioBase):
    """Async wrapper around a file object.

    Blocking I/O methods are delegated to an executor; cheap calls and
    attributes are proxied directly.  Supports ``async for`` line iteration
    and ``async with`` context management.
    """

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Mimic plain-file iteration: yield lines until an empty read."""
        line = await self.readline()
        if not line:
            raise StopAsyncIteration
        return line

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Close via the executor, then drop the reference to the raw file.
        await self.close()
        self._file = None


@delegate_to_executor(
    "cat", "ls", "delete", "disk_usage", "stat", "rm", "mv",
    "rename", "mkdir", "exists", "isdir", "isfile",
    "read_parquet", "walk",
)
@proxy_property_directly("pathsep")
class AioFilesystem(AioBase):
    """Async wrapper around a filesystem-like object.

    Blocking filesystem operations run in an executor; ``open`` returns the
    opened file wrapped as an :class:`AioFileObject`.
    """

    async def open(self, *args, **kwargs):
        """Open a file via the executor and wrap it for async use."""
        opener = functools.partial(self._file.open, *args, **kwargs)
        raw_file = await self._loop.run_in_executor(self._executor, opener)
        return AioFileObject(raw_file)
2 changes: 1 addition & 1 deletion mars/learn/cluster/tests/test_k_means.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def testKMeansFitPredict(self):
# using more than one thread, the absolute values of the labels can be
# different between the 2 strategies but they should correspond to the same
# clustering.
self.assertEqual(v_measure_score(labels_1, labels_2), 1)
self.assertAlmostEqual(v_measure_score(labels_1, labels_2), 1)

def testTransform(self):
centers = np.array([
Expand Down
1 change: 1 addition & 0 deletions mars/serialization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from .core import serialize, deserialize
from .core import AioSerializer, AioDeserializer

from . import arrow, cuda, numpy, scipy, mars_objects
del arrow, cuda, numpy, scipy, mars_objects
70 changes: 69 additions & 1 deletion mars/serialization/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
import sys
from typing import Dict, List
from io import BytesIO
from typing import Any, Dict, List

from ..utils import TypeDispatcher

Expand Down Expand Up @@ -212,3 +214,69 @@ def serialize(obj):
def deserialize(header: Dict, buffers: List):
    """Reconstruct an object from a header dict and its buffer list.

    The ``'serializer'`` key recorded at serialization time selects the
    registered serializer; note the key is popped, so the header is mutated.
    """
    serializer_name = header.pop('serializer')
    return _deserializers[serializer_name].deserialize(header, buffers)


DEFAULT_SERIALIZATION_VERSION = 0
BUFFER_SIZES_NAME = 'buf_sizes'


class AioSerializer:
    """Serialize an object into a list of buffers preceded by a binary header.

    Layout of the first (header) buffer:
    1 byte version | 8 bytes little-endian header length |
    2 bytes little-endian compression flag | pickled header dict.
    """

    def __init__(self,
                 obj: Any,
                 compress=0):
        self._obj = obj
        self._compress = compress

    def _get_buffers(self):
        headers, buffers = serialize(self._obj)

        # Record each buffer's size in the header so the reader knows
        # exactly how many bytes to consume per buffer.
        headers[BUFFER_SIZES_NAME] = [getattr(b, 'nbytes', len(b))
                                      for b in buffers]
        pickled_header = pickle.dumps(headers)

        # Assemble the fixed-layout header buffer.
        meta_io = BytesIO()
        meta_io.write(struct.pack('B', DEFAULT_SERIALIZATION_VERSION))  # version
        meta_io.write(struct.pack('<Q', len(pickled_header)))           # header length
        meta_io.write(struct.pack('<H', self._compress))                # compression
        meta_io.write(pickled_header)

        return [meta_io.getbuffer()] + list(buffers)

    async def run(self):
        return self._get_buffers()


class AioDeserializer:
    """Deserialize an object from an async file-like source written by
    :class:`AioSerializer`.

    NOTE(review): every ``read(n)`` here is assumed to return exactly ``n``
    bytes; a short read would corrupt parsing — confirm against the file
    wrapper's semantics.
    """

    def __init__(self, file):
        self._file = file

    async def _get_obj(self):
        # Fixed prefix: 1 byte version + 8 bytes header length + 2 bytes compression.
        prefix = bytes(await self._file.read(11))
        version = struct.unpack('B', prefix[:1])[0]
        # Only the default serialization version exists so far.
        assert version == DEFAULT_SERIALIZATION_VERSION
        header_length = struct.unpack('<Q', prefix[1:9])[0]
        # Compression flag is parsed but currently unused.
        _ = struct.unpack('<H', prefix[9:])[0]
        # Pickled header dict follows the fixed prefix.
        header = pickle.loads(await self._file.read(header_length))
        # Sizes recorded at write time tell us how much to read per buffer.
        sizes = header.pop(BUFFER_SIZES_NAME)
        buffers = [await self._file.read(s) for s in sizes]

        return deserialize(header, buffers)

    async def run(self):
        return await self._get_obj()
19 changes: 19 additions & 0 deletions mars/storage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2020 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from .filesystem import FileSystemStorage
from .plasma import PlasmaStorage
Loading

0 comments on commit b190fcd

Please sign in to comment.