From e1fbd35122d7f4465539bdb71a1fa713ffa0140d Mon Sep 17 00:00:00 2001 From: Tobias Burnesson Date: Wed, 3 Mar 2021 14:00:29 +0100 Subject: [PATCH] Not all environments support /dev/shm. It then crashes when there is an attempt to run Queue or JoinableQueue. It would be a good thing to be able to run local_search sequentially instead in such environments. It seems suitable to use number_of_cpus == 1 as the condition to do so. --- hypermapper/local_search.py | 140 ++++++++++++++++++++++++++++-------- 1 file changed, 111 insertions(+), 29 deletions(-) diff --git a/hypermapper/local_search.py b/hypermapper/local_search.py index 87cd0a4..a692c6c 100644 --- a/hypermapper/local_search.py +++ b/hypermapper/local_search.py @@ -616,6 +616,69 @@ def parallel_optimization_function( for key, column in col_of_keys.items() } + def sequential_local_search( + input_list, + output_list, + input_params, + param_space, + optimization_function_parameters, + proc, + ): + sys.stdout.write_to_logfile("Running sequential_local_search \n") + while len(input_list) > 0: + config = input_list.pop(0) + if config is None: + break + iteration_data_array = {} + configuration = config["config"] + idx = config["idx"] + sys.stdout.write_to_logfile("Starting local search on configuration: " + str(configuration) + "\n") + while configuration is not None: + neighbors = get_neighbors(configuration, param_space) + neighbors = data_tuples_to_dict_list(neighbors, input_params) + function_values, feasibility_indicators = optimization_function( + configurations=neighbors, **optimization_function_parameters + ) + function_values_size = len(function_values) + new_data_array = concatenate_list_of_dictionaries( + neighbors[:function_values_size] + ) + new_data_array[scalarization_key] = function_values + if enable_feasible_predictor: + new_data_array[feasible_parameter] = feasibility_indicators + iteration_data_array = concatenate_data_dictionaries( + iteration_data_array, new_data_array + ) + """ + The out of budget stopping criterion is removed for now, since it's not deemed relevant for this algorithm! + """ + if enable_feasible_predictor: + best_neighbor = get_min_feasible_configurations( + new_data_array, 1, scalarization_key, feasible_parameter + ) + else: + best_neighbor = get_min_configurations( + new_data_array, 1, scalarization_key + ) + + for key in configuration: + configuration[key] = [configuration[key]] + if are_configurations_equal(best_neighbor, configuration, input_params): + sys.stdout.write_to_logfile("Local minimum found: " + str(best_neighbor) + "\n") + output_list.append( + {"data_array": iteration_data_array} + ) + configuration = None + else: + sys.stdout.write_to_logfile(( + "Replacing configuration by best neighbor: " + + str(best_neighbor) + + "\n" + )) + configuration = { + key: value[0] for key, value in best_neighbor.items() + } + def parallel_multistart_local_search( input_queue, output_queue, @@ -692,33 +755,53 @@ def parallel_multistart_local_search( + str(number_of_configurations) + "\n" ) - input_queue = JoinableQueue() - output_queue = Queue() - # puts each configuration in a queue to be evaluated in parallel - for idx in range(number_of_configurations): - input_queue.put( - { - "config": get_single_configuration(local_search_configurations, idx), - "idx": idx, - } - ) - sys.stdout.write_to_logfile((f"{idx}, \n")) - - for i in range(number_of_cpus): - input_queue.put(None) + result_array = {} if number_of_cpus == 1: - parallel_multistart_local_search( - input_queue, - output_queue, + input_list = [] + output_list = [] + # puts each configuration in a list to be evaluated in sequence + for idx in range(number_of_configurations): + input_list.append( + { + "config": get_single_configuration(local_search_configurations, idx), + "idx": idx, + } + ) + sys.stdout.write_to_logfile((f"{idx}, \n")) + + sequential_local_search( + input_list, + output_list, input_params, param_space, optimization_function_parameters, 0, ) - input_queue.join() + + for i in range(number_of_configurations): + result = output_list.pop(0) + result_array = concatenate_data_dictionaries(result_array, result["data_array"]) + data_array = concatenate_data_dictionaries(result_array, data_array) else: + + input_queue = JoinableQueue() + output_queue = Queue() + # puts each configuration in a queue to be evaluated in parallel + for idx in range(number_of_configurations): + input_queue.put( + { + "config": get_single_configuration(local_search_configurations, idx), + "idx": idx, + } + ) + sys.stdout.write_to_logfile((f"{idx}, \n")) + + for i in range(number_of_cpus): + input_queue.put(None) + + processes = [ Process( target=parallel_multistart_local_search, @@ -739,19 +822,18 @@ def parallel_multistart_local_search( process.start() input_queue.join() - result_array = {} - for i in range(number_of_configurations): - result = output_queue.get() - sys.stdout.write_to_logfile(result["logstring"]) - result_array = concatenate_data_dictionaries(result_array, result["data_array"]) - data_array = concatenate_data_dictionaries(result_array, data_array) + for i in range(number_of_configurations): + result = output_queue.get() + sys.stdout.write_to_logfile(result["logstring"]) + result_array = concatenate_data_dictionaries(result_array, result["data_array"]) + data_array = concatenate_data_dictionaries(result_array, data_array) - input_queue.close() - output_queue.close() + input_queue.close() + output_queue.close() - if number_of_cpus != 1: - for i in range(len(processes)): - processes[i].join() + if number_of_cpus != 1: + for i in range(len(processes)): + processes[i].join() local_search_time = datetime.datetime.now() sys.stdout.write_to_logfile(