Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,28 @@ notifications:


install:
# See Dockerfile
#- pip install git+https://github.com/HASTE-project/HarmonicPE.git@master
#- pip install git+https://github.com/HASTE-project/windowed-conformal-model.git@master
#- pip install git+https://github.com/HASTE-project/[email protected]
# See Dockerfile

# Checkout and install Harmonic PE (latest):
- git clone https://github.com/HASTE-project/HarmonicPE.git
- cd HarmonicPE
- git checkout master
- pip3 install .
- cd ..

# Check out and install the Windowed Conformal Model (latest):
# This needs to use -e for the .npy files.
- git clone https://github.com/HASTE-project/windowed-conformal-model.git
- cd windowed-conformal-model
- git checkout master
- pip3 install -e .
- cd ..

- pip install .

# Tests broken on Travis
script:
- pytest test2
- pytest tests

before_deploy:
- docker build --no-cache=true -t "benblamey/haste-image-proc:latest" .
Expand Down
24 changes: 9 additions & 15 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,28 @@ FROM python:3.6.3
WORKDIR /app

# GOTCHA: git clone gets cached - use --no-cache !!
# https://github.com/moby/moby/issues/1996
# https://stackoverflow.com/questions/36996046/

# Install dependencies not on PyPI:

# Checkout and install Harmonic PE (latest):
RUN git clone https://github.com/HASTE-project/HarmonicPE.git;cd /app/HarmonicPE;git checkout master;pip3 install .

# Checkout and install the Haste Storage Client (specific version):
#RUN git clone https://github.com/HASTE-project/HasteStorageClient.git;cd /app/HasteStorageClient;git checkout v0.8;pip3 install -e .

# Check out and install the Windowed Conformal Model (latest):
# This needs to use -e for the npy files.
# This needs to use -e for the .npy files.
RUN git clone https://github.com/HASTE-project/windowed-conformal-model.git;cd /app/windowed-conformal-model;git checkout master;pip3 install -e .

# Install packages for image analysis
# TODO: use setup.py for dependencies.
#RUN pip3 install numpy
#RUN pip3 install Pillow
#RUN pip3 install scikit-image

# TODO investigate installing scikit-sparse here?
#RUN conda install -c conda-forge scikit-sparse

# Make port 80 available (required for the listening daemon)
EXPOSE 80

COPY haste_processing_node /app/haste_processing_node

# Install dependencies from setup.py:

COPY setup.py /app/setup.py
RUN pip3 install /app

CMD ["python", "-m", "haste_processing_node.function"]
# Make port 80 available (required for the listening daemon)
EXPOSE 80

CMD ["python", "-m", "haste_processing_node.function"]
37 changes: 20 additions & 17 deletions haste_processing_node/function.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from harmonicPE.daemon import listen_for_tasks

from haste_processing_node.image_analysis.image_analysis import extract_features
from .haste_storage_client_cache import get_storage_client
from .haste_storage_client_cache import get_storage_client_az_lnp, get_storage_client_vironova
from .simulator_messages import split_metadata_and_data

# This constant seems to be specific to the microscope/AZN
GREEN_COLOR_CHANNEL = 2
AZN_LNP_GREEN_COLOR_CHANNEL = 2


# TODO: This will break on MACOSX (see HIO code for fix)
Expand All @@ -17,20 +17,30 @@ def process_data(message_bytes):

metadata, image_bytes = split_metadata_and_data(message_bytes)

extracted_features = extract_image_features(metadata, image_bytes)

# TODO: rename to 'course_features' or 'features_level_0' ?
metadata['extracted_features'] = extracted_features

# metadata['containerID'] = hostname

# Get a storage client for the cache, and use it to save the blob and all metadata:
stream_id = metadata.pop('stream_id') # Identifies the data in storage - across all processing nodes. Required.
timestamp_cloud_edge = metadata.pop('timestamp') # Required
location = metadata.pop('location', None) # Optional
substream_id = metadata.pop('substream_id', None) # Optional

haste_storage_client = get_storage_client(stream_id)
if metadata.get('tag', None) == 'vironova':
# Vironova image stream
extracted_features = extract_features(image_bytes)
haste_storage_client = get_storage_client_vironova(stream_id)

else:
# Assume AZ LNP dataset (from simulator)
# TODO: simulator should send a 'tag'
# Use a client with the conformal prediction
haste_storage_client = get_storage_client_az_lnp(stream_id)
if metadata.get('color_channel', None) == AZN_LNP_GREEN_COLOR_CHANNEL:
extracted_features = extract_features(image_bytes)
else:
extracted_features = {}

# TODO: rename to 'course_features' or 'features_level_0' ?
metadata['extracted_features'] = extracted_features
# metadata['containerID'] = hostname

haste_storage_client.save(timestamp_cloud_edge,
location,
Expand All @@ -41,13 +51,6 @@ def process_data(message_bytes):
print('saved to storage!', flush=True)


def extract_image_features(metadata, image_bytes):
    """Extract features when the frame is of interest, else return no features.

    A frame is interesting when it is an AZN green-channel image, or when it
    is tagged as coming from the 'vironova' stream.
    """
    is_green_channel = metadata.get('color_channel', None) == GREEN_COLOR_CHANNEL
    is_vironova = metadata.get('tag', None) == 'vironova'
    if not (is_green_channel or is_vironova):
        return {}
    return extract_features(image_bytes)


# TODO: add toy example with local image to run extraction locally.


Expand Down
33 changes: 28 additions & 5 deletions haste_processing_node/haste_storage_client_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import os.path
import urllib.request

haste_storage_clients = {}
haste_storage_clients_az_lnp = {}
haste_storage_clients_vironova = {}


def __get_magic_haste_client_config_from_server(host):
Expand Down Expand Up @@ -39,8 +40,9 @@ def __get_haste_storage_client_config():
print('failed reading config from all locations', flush=True)


def get_storage_client(stream_id):
if stream_id not in haste_storage_clients:
def get_storage_client_az_lnp(stream_id):
    # For the AZ LNP dataset (from the simulator) -- not the Vironova stream.
if stream_id not in haste_storage_clients_az_lnp:
haste_storage_client_config = __get_haste_storage_client_config()

model = ConformalInterestingnessModel()
Expand All @@ -52,11 +54,32 @@ def get_storage_client(stream_id):

print('creating client for stream ID: ' + stream_id, flush=True)

haste_storage_clients[stream_id] = client
haste_storage_clients_az_lnp[stream_id] = client

# TODO: only cache N clients.

return haste_storage_clients[stream_id]
return haste_storage_clients_az_lnp[stream_id]


def get_storage_client_vironova(stream_id):
    # For the Vironova dataset, streamed from the microscope.
    #
    # Returns a (cached) HasteStorageClient for the given stream ID.
    # No interestingness model is used: interestingness defaults to 1.0,
    # so every blob matches the (0.0, 1.0) policy and goes to OS_SWIFT_STORAGE.
    if stream_id not in haste_storage_clients_vironova:
        haste_storage_client_config = __get_haste_storage_client_config()

        # No model -- interestingness defaults to 1.0 for every blob.
        model = None

        client = HasteStorageClient(stream_id,
                                    config=haste_storage_client_config,
                                    interestingness_model=model,
                                    storage_policy=[(0.0, 1.0, OS_SWIFT_STORAGE)])  # discard blobs which don't match the policy.

        print('creating client for stream ID: ' + stream_id, flush=True)

        # BUG FIX: store in the vironova cache. Previously the client was
        # written to (and read from) haste_storage_clients_az_lnp, so the
        # membership test above never found it -- a new client was created
        # on every call, and the az_lnp cache was polluted.
        haste_storage_clients_vironova[stream_id] = client

        # TODO: only cache N clients.

    return haste_storage_clients_vironova[stream_id]


if __name__ == '__main__':
Expand Down
3 changes: 3 additions & 0 deletions test2/test_foo2.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@

# The tests don't work on TravisCI -- so use this dummy test instead.

def test_foo():
    """Placeholder test so the CI run has at least one passing test."""
    # The real test suite does not run on TravisCI -- see the repo's CI config.
    assert True
5 changes: 2 additions & 3 deletions tests/test_function.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from haste_processing_node.function import process_data, extract_image_features
from haste_processing_node.function import process_data, extract_features
import pickle
import time

Expand Down Expand Up @@ -29,6 +29,5 @@ def test_extract_image_features():
image_bytes = bytes(fh.read())
fh.close()

extracted_features = extract_image_features({'location': (12.34, 56.78),
'image_length_bytes': len(image_bytes)}, image_bytes)
extracted_features = extract_features(image_bytes)
print(extracted_features)