Skip to content
30 changes: 20 additions & 10 deletions nanshe/box/spams_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,16 @@ def call_multiprocessing_queue_spams_trainDL(*args, **kwargs):
"""

# Only necessary for dealing with SPAMS
import multiprocessing
try:
import billiard as multiprocessing
mp = multiprocessing.get_context("spawn")
except ImportError:
import multiprocessing as mp

out_queue = multiprocessing.Queue()
out_queue = mp.Queue()

queue_args = (out_queue,) + args
p = multiprocessing.Process(
p = mp.Process(
target=run_multiprocessing_queue_spams_trainDL,
args=queue_args,
kwargs=kwargs
Expand Down Expand Up @@ -262,7 +266,11 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs):
"""

# Only necessary for dealing with SPAMS
import multiprocessing
try:
import billiard as multiprocessing
mp = multiprocessing.get_context("spawn")
except ImportError:
import multiprocessing as mp

# Just to make sure this exists in the new process. Shouldn't be necessary.
import numpy
Expand All @@ -285,9 +293,11 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs):
)

# Create a shared array to contain X
X_array = multiprocessing.Array(X_array_ctype,
X.size,
lock=False)
X_array = mp.Array(
X_array_ctype,
X.size,
lock=False
)

# Copy over the contents of X.
X_array_numpy = numpy.frombuffer(
Expand All @@ -307,7 +317,7 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs):
)

# Create a shared array to contain D
D_array = multiprocessing.Array(D_array_ctype,
D_array = mp.Array(D_array_ctype,
D.size,
lock=False)

Expand All @@ -329,7 +339,7 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs):
)

# Create a shared array to contain the result
result_array = multiprocessing.Array(
result_array = mp.Array(
result_array_ctype,
numpy.product(result_array_type._shape_),
lock=False
Expand All @@ -343,7 +353,7 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs):
new_args = new_args + (
D_is_arg, D_array_type, D_array,
)
p = multiprocessing.Process(
p = mp.Process(
target=run_multiprocessing_array_spams_trainDL,
args=new_args,
kwargs=kwargs
Expand Down
67 changes: 32 additions & 35 deletions nanshe/learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
import os
import json
import itertools
import multiprocessing

try:
import billiard as multiprocessing
mp = multiprocessing.get_context("spawn")
except ImportError:
import multiprocessing as mp

import subprocess
import time

Expand Down Expand Up @@ -78,20 +84,19 @@ def generate_neurons_io_handler(input_filename,
parameters_filename JSON filename with parameters.
"""


# Extract and validate file extensions.

# Parse parameter filename and validate that the name is acceptable
parameters_filename_details = pathHelpers.PathComponents(
parameters_filename
)
parameters_filename_ext = parameters_filename_details.extension
parameters_filename_ext = parameters_filename_ext.lower().lstrip(os.extsep)
# Clean up the extension so it fits the standard.
if (parameters_filename_ext not in ["json"]):
if (parameters_filename_details.extension.lower().lstrip(os.extsep) not in ["json"]):
raise Exception(
"Parameter file with filename: \"" + parameters_filename + "\"" +
" provided with an unknown file extension: \"" +
parameters_filename_ext + "\". If it is a " +
parameters_filename_details.extension + "\". If it is a " +
"supported format, please run the given file through " +
"nanshe_converter first before proceeding."
)
Expand Down Expand Up @@ -128,32 +133,29 @@ def generate_neurons_a_block(input_filename,
parameters how the run should be configured.
"""


# Extract and validate file extensions.

# Parse input filename and validate that the name is acceptable
input_filename_details = pathHelpers.PathComponents(input_filename)
# Clean up the extension so it fits the standard.
input_filename_ext = input_filename_details.extension
input_filename_ext = input_filename_ext.lower().lstrip(os.extsep)
if (input_filename_ext not in ["h5", "hdf5", "he5"]):
if (input_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]):
raise Exception(
"Input file with filename: \"" + input_filename + "\"" +
" provided with an unknown file extension: \"" +
input_filename_ext + "\". If it is a supported " +
input_filename_details.extension + "\". If it is a supported " +
"format, please run the given file through " +
"nanshe_converter first before proceeding."
)

# Parse output filename and validate that the name is acceptable
output_filename_details = pathHelpers.PathComponents(output_filename)
# Clean up the extension so it fits the standard.
output_filename_ext = output_filename_details.extension
output_filename_ext = output_filename_ext.lower().lstrip(os.extsep)
if (output_filename_ext not in ["h5", "hdf5", "he5"]):
if (output_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]):
raise Exception(
"Output file with filename: \"" + output_filename + "\"" +
" provided with an unknown file extension: \"" +
output_filename_ext + "\". If it is a supported " +
output_filename_details.extension + "\". If it is a supported " +
"format, please run the given file through nanshe_converter " +
"first before proceeding."
)
Expand Down Expand Up @@ -226,7 +228,7 @@ def generate_neurons_a_block(input_filename,
@prof.log_call(trace_logger)
def generate_neurons_blocks(input_filename,
output_filename,
num_processes=multiprocessing.cpu_count(),
num_processes=mp.cpu_count(),
block_shape=None,
num_blocks=None,
half_window_shape=None,
Expand All @@ -235,35 +237,31 @@ def generate_neurons_blocks(input_filename,
num_drmaa_cores=16,
debug=False,
**parameters):
# TODO: Move function into new module with its own command line interface.
# TODO: Move this function into a new module with its own command line interface.
# TODO: Heavy refactoring required on this function.

# Extract and validate file extensions.

# Parse input filename and validate that the name is acceptable
input_filename_details = pathHelpers.PathComponents(input_filename)
# Clean up the extension so it fits the standard.
input_filename_ext = input_filename_details.extension
input_filename_ext = input_filename_ext.lower().lstrip(os.extsep)
if (input_filename_ext not in ["h5", "hdf5", "he5"]):
if (input_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]):
raise Exception(
"Input file with filename: \"" + input_filename + "\"" +
" provided with an unknown file extension: \"" +
input_filename_ext + "\". If it is a supported " +
"format, please run the given file through " +
"nanshe_converter first before proceeding."
input_filename_details.extension + "\". If it is a supported " +
"format, please run the given file through nanshe_converter " +
"first before proceeding."
)

# Parse output filename and validate that the name is acceptable
output_filename_details = pathHelpers.PathComponents(output_filename)
# Clean up the extension so it fits the standard.
output_filename_ext = output_filename_details.extension
output_filename_ext = output_filename_ext.lower().lstrip(os.extsep)
if (output_filename_ext not in ["h5", "hdf5", "he5"]):
if (output_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]):
raise Exception(
"Output file with filename: \"" + output_filename + "\"" +
" provided with an unknown file extension: \"" +
output_filename_ext + "\". If it is a supported " +
output_filename_details.extension + "\". If it is a supported " +
"format, please run the given file through nanshe_converter " +
"first before proceeding."
)
Expand Down Expand Up @@ -398,7 +396,8 @@ def generate_neurons_blocks(input_filename,
# the f0 calculation.
if "extract_f0" in parameters["generate_neurons"]["preprocess_data"]:
#assert (parameters["generate_neurons"]["preprocess_data"]["extract_f0"]["half_window_size"] == half_window_shape_array[0])
assert (parameters["generate_neurons"]["preprocess_data"]["extract_f0"]["half_window_size"] <= half_window_shape_array[0])
assert (parameters["generate_neurons"]["preprocess_data"][
"extract_f0"]["half_window_size"] <= half_window_shape_array[0])

# Estimate bounds for each slice. Uses typical python [begin, end) for the
# indices.
Expand Down Expand Up @@ -462,9 +461,8 @@ def generate_neurons_blocks(input_filename,
half_border_shape_array, reps_after=2
)

# Get slice information for the portion within
# `original_images_pared_slices["windowed"]`, which corresponds to
# `original_images_pared_slices["actual"]`.
# Get slice information for the portion within original_images_pared_slices["windowed"],
# which corresponds to original_images_pared_slices["actual"]
#original_images_pared_slices["windowed_block_selection"][..., 0] = 0
original_images_pared_slices["windowed_block_selection"][..., 1] = (
original_images_pared_slices["actual"][..., 1] - original_images_pared_slices["actual"][..., 0]
Expand Down Expand Up @@ -557,8 +555,7 @@ def generate_neurons_blocks(input_filename,
block_i = output_group_blocks[i_str]

with h5py.File(intermediate_basename_i + os.extsep + "h5", "a") as each_block_file_handle:
# Create a soft link to the original images. But use the
# appropriate type of soft link depending on whether
# Create a soft link to the original images. But use the appropriate type of soft link depending on whether
# the input and output file are the same.
if "original_images" not in each_block_file_handle:
each_block_file_handle["original_images"] = h5py.ExternalLink(
Expand Down Expand Up @@ -938,10 +935,10 @@ def generate_neurons(original_images, run_stage="all", **parameters):

if "dictionary_max_projection" not in generate_neurons.recorders.array_debug_recorder:
generate_neurons.recorders.array_debug_recorder["dictionary_max_projection"] = xnumpy.add_singleton_op(
numpy.max,
new_dictionary,
axis=0
)
numpy.max,
new_dictionary,
axis=0
)

if run_stage == "dictionary":
return
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def run_tests(self):
"fftw",
"setuptools",
"psutil",
# "billiard",
"numpy",
"scipy",
"h5py",
Expand All @@ -66,6 +67,7 @@ def run_tests(self):
"fftw",
"setuptools",
"psutil",
# "billiard",
"numpy",
"scipy",
"h5py",
Expand Down
23 changes: 14 additions & 9 deletions tests/test_nanshe/test_box/test_spams_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
import nose.plugins.attrib

import ctypes
import multiprocessing

try:
import billiard as multiprocessing
mp = multiprocessing.get_context("spawn")
except ImportError:
import multiprocessing as mp

try:
from queue import Queue
Expand Down Expand Up @@ -520,7 +525,7 @@ def test_run_multiprocessing_array_spams_trainDL_1(self):
g_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
g_array_type._dtype_.type
).type(0)[()]))
g_array = multiprocessing.Array(
g_array = mp.Array(
g_array_ctype,
numpy.product(g_array_type._shape_),
lock=False
Expand All @@ -541,7 +546,7 @@ def test_run_multiprocessing_array_spams_trainDL_1(self):
result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
result_array_type._dtype_.type
).type(0)[()]))
result_array = multiprocessing.Array(
result_array = mp.Array(
result_array_ctype,
numpy.product(result_array_type._shape_),
lock=False
Expand Down Expand Up @@ -616,7 +621,7 @@ def test_run_multiprocessing_array_spams_trainDL_2(self):
g3_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
g3_array_type._dtype_.type
).type(0)[()]))
g3_array = multiprocessing.Array(
g3_array = mp.Array(
g3_array_ctype,
numpy.product(g3_array_type._shape_),
lock=False
Expand All @@ -637,7 +642,7 @@ def test_run_multiprocessing_array_spams_trainDL_2(self):
result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
result_array_type._dtype_.type
).type(0)[()]))
result_array = multiprocessing.Array(
result_array = mp.Array(
result_array_ctype,
numpy.product(result_array_type._shape_),
lock=False
Expand Down Expand Up @@ -812,7 +817,7 @@ def test_run_multiprocessing_array_spams_trainDL_3(self):
g_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
g_array_type._dtype_.type
).type(0)[()]))
g_array = multiprocessing.Array(
g_array = mp.Array(
g_array_ctype,
numpy.product(g_array_type._shape_),
lock=False
Expand All @@ -833,7 +838,7 @@ def test_run_multiprocessing_array_spams_trainDL_3(self):
result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
result_array_type._dtype_.type
).type(0)[()]))
result_array = multiprocessing.Array(
result_array = mp.Array(
result_array_ctype,
numpy.product(result_array_type._shape_),
lock=False
Expand Down Expand Up @@ -912,7 +917,7 @@ def test_run_multiprocessing_array_spams_trainDL_4(self):
g3_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
g3_array_type._dtype_.type
).type(0)[()]))
g3_array = multiprocessing.Array(
g3_array = mp.Array(
g3_array_ctype,
numpy.product(g3_array_type._shape_),
lock=False
Expand All @@ -933,7 +938,7 @@ def test_run_multiprocessing_array_spams_trainDL_4(self):
result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype(
result_array_type._dtype_.type
).type(0)[()]))
result_array = multiprocessing.Array(
result_array = mp.Array(
result_array_ctype,
numpy.product(result_array_type._shape_),
lock=False
Expand Down