Skip to content

Commit

Permalink
Initial version of storagestream inheritance
Browse files Browse the repository at this point in the history
    * Allow python programs to inherit from 'StorageStream' instead of using
      the '@stream' directive.
  • Loading branch information
Juanjo Costa authored and yolandab committed Feb 8, 2023
1 parent b42a234 commit 3f4ed8b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 5 deletions.
3 changes: 2 additions & 1 deletion hecuba_py/hecuba/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ def __init__(self):
from .storageobj import StorageObj
from .hdict import StorageDict
from .hnumpy import StorageNumpy
from .storagestream import StorageStream
from .hfilter import hfilter

if not filter == hfilter:
Expand Down Expand Up @@ -394,4 +395,4 @@ def _intercept_numpy_method(method_name):
_intercept_numpy_method('array_equal')
_intercept_numpy_method('concatenate')

__all__ = ['StorageObj', 'StorageDict', 'StorageNumpy', 'Parser']
__all__ = ['StorageObj', 'StorageDict', 'StorageNumpy', 'StorageStream', 'Parser']
3 changes: 1 addition & 2 deletions hecuba_py/hecuba/hdict.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def __init__(self, name=None, primary_keys=None, columns=None, indexed_on=None,
kwargs: other parameters
"""

super().__init__((), name=name, storage_id=storage_id, **kwargs)
super(StorageDict, self).__init__(name=name, storage_id=storage_id, **kwargs)
log.debug("CREATE StorageDict(%s,%s)", primary_keys, columns)

'''
Expand All @@ -276,7 +276,6 @@ def __init__(self, name=None, primary_keys=None, columns=None, indexed_on=None,
self._primary_keys = self._persistent_props['primary_keys']
self._columns = self._persistent_props['columns']
self._indexed_on = self._persistent_props.get('indexed_on', indexed_on)
self._stream_enabled = self._persistent_props.get('stream', False)

# Field '_istorage_metas' will be set if it exists in HECUBA.istorage
initialized = (getattr(self, '_istorage_metas', None) is not None)
Expand Down
4 changes: 2 additions & 2 deletions hecuba_py/hecuba/hnumpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ def _initialize_existing_object(cls, name, storage_id):
raise TypeError(" WARNING: recovered 'view_serialization' has unexpected type ", type(myview))


IStorage.__init__(obj, name=name, storage_id=storage_id, tokens=tokens)
super().__init__(obj, name=name, storage_id=storage_id, tokens=tokens)

obj._numpy_full_loaded = False
obj._hcache = result[1]
Expand Down Expand Up @@ -379,7 +379,7 @@ def __new__(cls, input_array=None, name=None, storage_id=None, block_id=None, **
else:
obj = np.asfortranarray(input_array.copy()).view(cls) #to set the fortran contiguous flag it is necessary to do the copy before
log.debug("Created ARROW")
IStorage.__init__(obj, name=name, storage_id=storage_id, kwargs=kwargs)
super(StorageNumpy, obj).__init__(name=name, storage_id=storage_id, kwargs=kwargs)

if name or storage_id: # The object needs to be persisted
load_data= (input_array is None) and (config.load_on_demand == False)
Expand Down
10 changes: 10 additions & 0 deletions hecuba_py/hecuba/storagestream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from . import log
from .IStorage import IStorage

class StorageStream(IStorage):
def __init__(self, *args, **kwargs):
#log.debug("StorageStream: __init__")
print("StorageStream: __init__ {}".format(kwargs), flush=True)
self._stream_enabled = True
super().__init__(*args, **kwargs)
print("StorageStream: _stream_enabled {}".format(self._stream_enabled), flush=True)
15 changes: 15 additions & 0 deletions hecuba_py/tests/withcassandra/storagenumpy_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,21 @@ def test_arrow_access(self):
z = s[:, 49]
self.assertTrue(np.array_equal(z, n[:,49]))

def test_subclass(self):
from hecuba import StorageStream
n = np.arange(50*50).reshape(50,50)
class X(StorageNumpy, StorageStream):
pass

x = X(n, "test_subclass")
self.assertTrue(x._stream_enabled)

class X(StorageStream, StorageNumpy):
pass

x = X(n, "test_subclass2")
self.assertTrue(x._stream_enabled)


if __name__ == '__main__':
unittest.main()

0 comments on commit 3f4ed8b

Please sign in to comment.