diff --git a/nanshe/box/spams_sandbox.py b/nanshe/box/spams_sandbox.py index 5a7bbeab..74557f6b 100644 --- a/nanshe/box/spams_sandbox.py +++ b/nanshe/box/spams_sandbox.py @@ -91,12 +91,16 @@ def call_multiprocessing_queue_spams_trainDL(*args, **kwargs): """ # Only necessary for dealing with SPAMS - import multiprocessing + try: + import billiard as multiprocessing + mp = multiprocessing.get_context("spawn") + except ImportError: + import multiprocessing as mp - out_queue = multiprocessing.Queue() + out_queue = mp.Queue() queue_args = (out_queue,) + args - p = multiprocessing.Process( + p = mp.Process( target=run_multiprocessing_queue_spams_trainDL, args=queue_args, kwargs=kwargs @@ -262,7 +266,11 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs): """ # Only necessary for dealing with SPAMS - import multiprocessing + try: + import billiard as multiprocessing + mp = multiprocessing.get_context("spawn") + except ImportError: + import multiprocessing as mp # Just to make sure this exists in the new process. Shouldn't be necessary. import numpy @@ -285,9 +293,11 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs): ) # Create a shared array to contain X - X_array = multiprocessing.Array(X_array_ctype, - X.size, - lock=False) + X_array = mp.Array( + X_array_ctype, + X.size, + lock=False + ) # Copy over the contents of X. X_array_numpy = numpy.frombuffer( @@ -307,7 +317,7 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs): ) # Create a shared array to contain D - D_array = multiprocessing.Array(D_array_ctype, + D_array = mp.Array(D_array_ctype, D.size, lock=False) @@ -329,7 +339,7 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs): ) # Create a shared array to contain the result - result_array = multiprocessing.Array( + result_array = mp.Array( result_array_ctype, numpy.product(result_array_type._shape_), lock=False @@ -343,7 +353,7 @@ def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs): new_args = new_args + ( D_is_arg, D_array_type, D_array, ) - p = multiprocessing.Process( + p = mp.Process( target=run_multiprocessing_array_spams_trainDL, args=new_args, kwargs=kwargs diff --git a/nanshe/learner.py b/nanshe/learner.py index 03f8200c..c11a670d 100644 --- a/nanshe/learner.py +++ b/nanshe/learner.py @@ -28,7 +28,13 @@ import os import json import itertools -import multiprocessing + +try: + import billiard as multiprocessing + mp = multiprocessing.get_context("spawn") +except ImportError: + import multiprocessing as mp + import subprocess import time @@ -78,20 +84,19 @@ def generate_neurons_io_handler(input_filename, parameters_filename JSON filename with parameters. """ + # Extract and validate file extensions. # Parse parameter filename and validate that the name is acceptable parameters_filename_details = pathHelpers.PathComponents( parameters_filename ) - parameters_filename_ext = parameters_filename_details.extension - parameters_filename_ext = parameters_filename_ext.lower().lstrip(os.extsep) # Clean up the extension so it fits the standard. - if (parameters_filename_ext not in ["json"]): + if (parameters_filename_details.extension.lower().lstrip(os.extsep) not in ["json"]): raise Exception( "Parameter file with filename: \"" + parameters_filename + "\"" + " provided with an unknown file extension: \"" + - parameters_filename_ext + "\". If it is a " + + parameters_filename_details.extension + "\". If it is a " + "supported format, please run the given file through " + "nanshe_converter first before proceeding." ) @@ -128,18 +133,17 @@ def generate_neurons_a_block(input_filename, parameters how the run should be configured. """ + # Extract and validate file extensions. # Parse input filename and validate that the name is acceptable input_filename_details = pathHelpers.PathComponents(input_filename) # Clean up the extension so it fits the standard. - input_filename_ext = input_filename_details.extension - input_filename_ext = input_filename_ext.lower().lstrip(os.extsep) - if (input_filename_ext not in ["h5", "hdf5", "he5"]): + if (input_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]): raise Exception( "Input file with filename: \"" + input_filename + "\"" + " provided with an unknown file extension: \"" + - input_filename_ext + "\". If it is a supported " + + input_filename_details.extension + "\". If it is a supported " + "format, please run the given file through " + "nanshe_converter first before proceeding." ) @@ -147,13 +151,11 @@ def generate_neurons_a_block(input_filename, # Parse output filename and validate that the name is acceptable output_filename_details = pathHelpers.PathComponents(output_filename) # Clean up the extension so it fits the standard. - output_filename_ext = output_filename_details.extension - output_filename_ext = output_filename_ext.lower().lstrip(os.extsep) - if (output_filename_ext not in ["h5", "hdf5", "he5"]): + if (output_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]): raise Exception( "Output file with filename: \"" + output_filename + "\"" + " provided with an unknown file extension: \"" + - output_filename_ext + "\". If it is a supported " + + output_filename_details.extension + "\". If it is a supported " + "format, please run the given file through nanshe_converter " + "first before proceeding." ) @@ -226,7 +228,7 @@ def generate_neurons_a_block(input_filename, @prof.log_call(trace_logger) def generate_neurons_blocks(input_filename, output_filename, - num_processes=multiprocessing.cpu_count(), + num_processes=mp.cpu_count(), block_shape=None, num_blocks=None, half_window_shape=None, @@ -235,7 +237,7 @@ def generate_neurons_blocks(input_filename, num_drmaa_cores=16, debug=False, **parameters): - # TODO: Move function into new module with its own command line interface. + # TODO: Move this function into a new module with its own command line interface. # TODO: Heavy refactoring required on this function. # Extract and validate file extensions. @@ -243,27 +245,23 @@ def generate_neurons_blocks(input_filename, # Parse input filename and validate that the name is acceptable input_filename_details = pathHelpers.PathComponents(input_filename) # Clean up the extension so it fits the standard. - input_filename_ext = input_filename_details.extension - input_filename_ext = input_filename_ext.lower().lstrip(os.extsep) - if (input_filename_ext not in ["h5", "hdf5", "he5"]): + if (input_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]): raise Exception( "Input file with filename: \"" + input_filename + "\"" + " provided with an unknown file extension: \"" + - input_filename_ext + "\". If it is a supported " + - "format, please run the given file through " + - "nanshe_converter first before proceeding." + input_filename_details.extension + "\". If it is a supported " + + "format, please run the given file through nanshe_converter " + + "first before proceeding." ) # Parse output filename and validate that the name is acceptable output_filename_details = pathHelpers.PathComponents(output_filename) # Clean up the extension so it fits the standard. - output_filename_ext = output_filename_details.extension - output_filename_ext = output_filename_ext.lower().lstrip(os.extsep) - if (output_filename_ext not in ["h5", "hdf5", "he5"]): + if (output_filename_details.extension.lower().lstrip(os.extsep) not in ["h5", "hdf5", "he5"]): raise Exception( "Output file with filename: \"" + output_filename + "\"" + " provided with an unknown file extension: \"" + - output_filename_ext + "\". If it is a supported " + + output_filename_details.extension + "\". If it is a supported " + "format, please run the given file through nanshe_converter " + "first before proceeding." ) @@ -398,7 +396,8 @@ def generate_neurons_blocks(input_filename, # the f0 calculation. if "extract_f0" in parameters["generate_neurons"]["preprocess_data"]: #assert (parameters["generate_neurons"]["preprocess_data"]["extract_f0"]["half_window_size"] == half_window_shape_array[0]) - assert (parameters["generate_neurons"]["preprocess_data"]["extract_f0"]["half_window_size"] <= half_window_shape_array[0]) + assert (parameters["generate_neurons"]["preprocess_data"][ + "extract_f0"]["half_window_size"] <= half_window_shape_array[0]) # Estimate bounds for each slice. Uses typical python [begin, end) for the # indices. @@ -462,9 +461,8 @@ def generate_neurons_blocks(input_filename, half_border_shape_array, reps_after=2 ) - # Get slice information for the portion within - # `original_images_pared_slices["windowed"]`, which corresponds to - # `original_images_pared_slices["actual"]`. + # Get slice information for the portion within original_images_pared_slices["windowed"], + # which corresponds to original_images_pared_slices["actual"] #original_images_pared_slices["windowed_block_selection"][..., 0] = 0 original_images_pared_slices["windowed_block_selection"][..., 1] = ( original_images_pared_slices["actual"][..., 1] - original_images_pared_slices["actual"][..., 0] @@ -557,8 +555,7 @@ def generate_neurons_blocks(input_filename, block_i = output_group_blocks[i_str] with h5py.File(intermediate_basename_i + os.extsep + "h5", "a") as each_block_file_handle: - # Create a soft link to the original images. But use the - # appropriate type of soft link depending on whether + # Create a soft link to the original images. But use the appropriate type of soft link depending on whether # the input and output file are the same. if "original_images" not in each_block_file_handle: each_block_file_handle["original_images"] = h5py.ExternalLink( @@ -938,10 +935,10 @@ def generate_neurons(original_images, run_stage="all", **parameters): if "dictionary_max_projection" not in generate_neurons.recorders.array_debug_recorder: generate_neurons.recorders.array_debug_recorder["dictionary_max_projection"] = xnumpy.add_singleton_op( - numpy.max, - new_dictionary, - axis=0 - ) + numpy.max, + new_dictionary, + axis=0 + ) if run_stage == "dictionary": return diff --git a/setup.py b/setup.py index f8560ce8..7f089f62 100644 --- a/setup.py +++ b/setup.py @@ -47,6 +47,7 @@ def run_tests(self): "fftw", "setuptools", "psutil", + # "billiard", "numpy", "scipy", "h5py", @@ -66,6 +67,7 @@ def run_tests(self): "fftw", "setuptools", "psutil", + # "billiard", "numpy", "scipy", "h5py", diff --git a/tests/test_nanshe/test_box/test_spams_sandbox.py b/tests/test_nanshe/test_box/test_spams_sandbox.py index ce5d45b4..c2b04bfb 100644 --- a/tests/test_nanshe/test_box/test_spams_sandbox.py +++ b/tests/test_nanshe/test_box/test_spams_sandbox.py @@ -11,7 +11,12 @@ import nose.plugins.attrib import ctypes -import multiprocessing + +try: + import billiard as multiprocessing + mp = multiprocessing.get_context("spawn") +except ImportError: + import multiprocessing as mp try: from queue import Queue @@ -520,7 +525,7 @@ def test_run_multiprocessing_array_spams_trainDL_1(self): g_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( g_array_type._dtype_.type ).type(0)[()])) - g_array = multiprocessing.Array( + g_array = mp.Array( g_array_ctype, numpy.product(g_array_type._shape_), lock=False @@ -541,7 +546,7 @@ def test_run_multiprocessing_array_spams_trainDL_1(self): result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( result_array_type._dtype_.type ).type(0)[()])) - result_array = multiprocessing.Array( + result_array = mp.Array( result_array_ctype, numpy.product(result_array_type._shape_), lock=False @@ -616,7 +621,7 @@ def test_run_multiprocessing_array_spams_trainDL_2(self): g3_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( g3_array_type._dtype_.type ).type(0)[()])) - g3_array = multiprocessing.Array( + g3_array = mp.Array( g3_array_ctype, numpy.product(g3_array_type._shape_), lock=False @@ -637,7 +642,7 @@ def test_run_multiprocessing_array_spams_trainDL_2(self): result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( result_array_type._dtype_.type ).type(0)[()])) - result_array = multiprocessing.Array( + result_array = mp.Array( result_array_ctype, numpy.product(result_array_type._shape_), lock=False @@ -812,7 +817,7 @@ def test_run_multiprocessing_array_spams_trainDL_3(self): g_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( g_array_type._dtype_.type ).type(0)[()])) - g_array = multiprocessing.Array( + g_array = mp.Array( g_array_ctype, numpy.product(g_array_type._shape_), lock=False @@ -833,7 +838,7 @@ def test_run_multiprocessing_array_spams_trainDL_3(self): result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( result_array_type._dtype_.type ).type(0)[()])) - result_array = multiprocessing.Array( + result_array = mp.Array( result_array_ctype, numpy.product(result_array_type._shape_), lock=False @@ -912,7 +917,7 @@ def test_run_multiprocessing_array_spams_trainDL_4(self): g3_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( g3_array_type._dtype_.type ).type(0)[()])) - g3_array = multiprocessing.Array( + g3_array = mp.Array( g3_array_ctype, numpy.product(g3_array_type._shape_), lock=False @@ -933,7 +938,7 @@ def test_run_multiprocessing_array_spams_trainDL_4(self): result_array_ctype = type(numpy.ctypeslib.as_ctypes(numpy.dtype( result_array_type._dtype_.type ).type(0)[()])) - result_array = multiprocessing.Array( + result_array = mp.Array( result_array_ctype, numpy.product(result_array_type._shape_), lock=False