diff --git a/VERSION b/VERSION index 6c6aa7c..a3df0a6 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.0 \ No newline at end of file +0.8.0 diff --git a/molnsutil/molnsutil.py b/molnsutil/molnsutil.py index 5fed0d3..621c153 100755 --- a/molnsutil/molnsutil.py +++ b/molnsutil/molnsutil.py @@ -31,18 +31,21 @@ def run(seed, number_of_trajectories): from os import environ import logging from boto.s3.connection import S3Connection -logging.basicConfig(filename="boto.log", level=logging.DEBUG) +#logging.basicConfig(filename="boto.log", level=logging.DEBUG) from boto.s3.key import Key import uuid import math import molns_cloudpickle as cloudpickle import random import copy +import inspect import swiftclient.client import IPython.parallel import uuid from IPython.display import HTML, Javascript, display +import os +import sys import itertools @@ -68,7 +71,8 @@ class MolnsUtilStorageException(Exception): # s3.json needs to be created and put in .molns/s3.json in the root of the home directory. import os -def get_s3config(): +def get_persisistent_storage_config(): + """ Return the configuration for the persistent storage. """ try: with open(os.environ['HOME']+'/.molns/s3.json','r') as fh: s3config = json.loads(fh.read()) @@ -82,6 +86,7 @@ def get_s3config(): class LocalStorage(): """ This class provides an abstraction for storing and reading objects on/from the ephemeral storage. """ + def __init__(self, folder_name="/home/ubuntu/localarea"): self.folder_name = folder_name @@ -127,7 +132,7 @@ def delete(self,filename): class S3Provider(): def __init__(self, bucket_name): - s3config = get_s3config() + s3config = get_persisistent_storage_config() self.connection = S3Connection(is_secure=False, calling_format=boto.s3.connection.OrdinaryCallingFormat(), **s3config['credentials'] @@ -189,7 +194,7 @@ def list(self): class SwiftProvider(): def __init__(self, bucket_name): - s3config = get_s3config() + s3config = get_persisistent_storage_config() self.connection = swiftclient.client.Connection(auth_version=2.0,**s3config['credentials']) self.set_bucket(bucket_name) @@ -253,7 +258,7 @@ class PersistentStorage(): """ def __init__(self, bucket_name=None): - s3config = get_s3config() + s3config = get_persisistent_storage_config() if bucket_name is None: # try reading it from the config file try: @@ -397,11 +402,13 @@ def run_ensemble_map_and_aggregate(model_class, parameters, param_set_id, seed_b """ Generate an ensemble, then run the mappers are aggreator. This will not store the results. """ import sys import uuid + import molnsutil.molns_cloudpickle as cp + if aggregator is None: aggregator = builtin_aggregator_list_append # Create the model try: - model_class_cls = cloudpickle.loads(model_class) + model_class_cls = cp.loads(model_class) if parameters is not None: model = model_class_cls(**parameters) else: @@ -459,6 +466,8 @@ def run_ensemble(model_class, parameters, param_set_id, seed_base, number_of_tra import sys import uuid from molnsutil import PersistentStorage, LocalStorage, SharedStorage + import molnsutil.molns_cloudpickle as cp + if storage_mode=="Shared": storage = SharedStorage() @@ -468,7 +477,7 @@ def run_ensemble(model_class, parameters, param_set_id, seed_base, number_of_tra raise MolnsUtilException("Unknown storage type '{0}'".format(storage_mode)) # Create the model try: - model_class_cls = cloudpickle.loads(model_class) + model_class_cls = cp.loads(model_class) if parameters is not None: model = model_class_cls(**parameters) else: @@ -561,34 +570,76 @@ def map_and_aggregate(results, param_set_id, mapper, aggregator=None, cache_resu #return res +############################################################################ class DistributedEnsemble(): """ A class to provide an API for execution of a distributed ensemble. """ - def __init__(self, model_class=None, parameters=None, client=None, num_engines=None): + my_class_name = 'DistributedEnsemble' + + #----------------------------------------------------------------------------------- + @classmethod + def delete(cls, name): + """ Static method to remove the state of a distributed comptutation from the system.""" + # delete realization + try: + with open('.molnsutil/{1}-{0}'.format(name, cls.my_class_name)) as fd: + state = pickle.load(fd) + + if state['storage_mode'] is not None: + if state['storage_mode'] == "Shared": + ss = SharedStorage() + elif state['storage_mode'] == "Persistent": + ss = PersistentStorage() + for param_set_id in state['result_list']: + for filename in state['result_list'][param_set_id]: + try: + ss.delete(filename) + except OSError as e: + pass + # delete database file + os.remove('.molnsutil/{1}-{0}'.format(name, cls.my_class_name)) + except Exception as e: + sys.stderr.write('delete(): {0}'.format(e)) + + #----------------------------------------------------------------------------------- + def __init__(self, name=None, model_class=None, parameters=None, client=None, num_engines=None, ignore_model_mismatch=False): """ Constructor """ - self.my_class_name = 'DistributedEnsemble' + if not isinstance(name, str): + raise MolnsUtilException("name not specified") + self.name = name + if not inspect.isclass(model_class): + raise MolnsUtilException("model_class not a class") self.model_class = cloudpickle.dumps(model_class) - self.parameters = [parameters] - self.number_of_trajectories = 0 - self.seed_base = self.generate_seed_base() - self.storage_mode = None - # A chunk list - self.result_list = {} # Set the Ipython.parallel client self.num_engines = num_engines self._update_client(client) - + + if not self.load_state(ignore_model_mismatch=ignore_model_mismatch): + # Set defaults if state not found + self.parameters = [parameters] + self.number_of_trajectories = 0 + self.seed_base = self.generate_seed_base() + self.storage_mode = None + self.result_list = {} + self.running_MapReduceTask = None + self.running_SimulationTask = None + self.reduced_results = None + self.mapped_results = None + self.number_of_results = 0 + self.step1_complete = False + self.step2_complete = False + self.step3_complete = False + self.mapper_fn = None + self.aggregator_fn = None + self.reducer_fn = None + + #----------------------------------------------------------------------------------- def generate_seed_base(self): """ Create a random number and truncate to 64 bits. """ - x = int(uuid.uuid4()) - if x.bit_length() >= 64: - x = x & ((1<<64)-1) - if x > (1 << 63) -1: - x -= 1 << 64 - return x + return abs(int(random.getrandbits(31))) - #-------------------------- - def save_state(self, name): + #----------------------------------------------------------------------------------- + def save_state(self): """ Serialize the state of the ensemble, for persistence beyond memory.""" state = {} state['model_class'] = self.model_class @@ -597,99 +648,238 @@ def save_state(self, name): state['seed_base'] = self.seed_base state['result_list'] = self.result_list state['storage_mode'] = self.storage_mode + # + state['reduced_results'] = self.reduced_results + state['mapped_results'] = self.mapped_results + state['number_of_results'] = self.number_of_results + state['step1_complete'] = self.step1_complete + state['step2_complete'] = self.step2_complete + state['step3_complete'] = self.step3_complete + state['mapper_fn'] = self.mapper_fn + state['aggregator_fn'] = self.aggregator_fn + state['reducer_fn'] = self.reducer_fn + # + if self.running_MapReduceTask is None: + state['running_MapReduceTask'] = None + else: + state['running_MapReduceTask'] = self.running_MapReduceTask.msg_ids + if self.running_SimulationTask is None: + state['running_SimulationTask'] = None + else: + state['running_SimulationTask'] = self.running_SimulationTask.msg_ids if not os.path.isdir('.molnsutil'): os.makedirs('.molnsutil') - with open('.molnsutil/{1}-{0}'.format(name, self.my_class_name)) as fd: + with open('.molnsutil/{1}-{0}'.format(self.name, self.my_class_name),'w+') as fd: pickle.dump(state, fd) - def load_state(self, name): + #----------------------------------------------------------------------------------- + def load_state(self, ignore_model_mismatch=False): """ Recover the state of an ensemble from a previous save. """ - with open('.molnsutil/{1}-{0}'.format(name, self.my_class_name)) as fd: - state = pickle.load(fd) - if state['model_class'] is not self.model_class: - raise MolnsUtilException("Can only load state of a class that is identical to the original class") - self.parameters = state['parameters'] - self.number_of_trajectories = state['number_of_trajectories'] - self.seed_base = state['seed_base'] - self.result_list = state['result_list'] - self.storage_mode = state['storage_mode'] + try: + with open('.molnsutil/{1}-{0}'.format(self.name, self.my_class_name)) as fd: + state = pickle.load(fd) + if state['model_class'] != self.model_class and not ignore_model_mismatch: + #sys.stderr.write("Error loading saved state\n\n") + #sys.stderr.write("state['model_class']={0}\n\n".format(state['model_class'])) + #sys.stderr.write("self.model_class={0}\n\n".format(self.model_class)) + #sys.stderr.write("state['model_class'] != self.model_class {0}\n\n".format(state['model_class'] != self.model_class)) + #TODO: Minor differences show up in the pickled string, but the classes are still identical. Find a way around this. + raise MolnsUtilException("Can only load state of a class that is identical to the original class. Use '{0}.delete(name=\"{1}\")' to remove previous state. Use the argument 'ignore_model_mismatch=True' to override".format(self.my_class_name, self.name)) + #TODO: Check to be sure the state is sane. Both tasks can not be running, result list and number_of_trajectories should match up, (others?). + + self.parameters = state['parameters'] + self.number_of_trajectories = state['number_of_trajectories'] + self.seed_base = state['seed_base'] + self.result_list = state['result_list'] + self.storage_mode = state['storage_mode'] + # + self.reduced_results = state['reduced_results'] + self.mapped_results = state['mapped_results'] + self.number_of_results = state['number_of_results'] + self.step1_complete = state['step1_complete'] + self.step2_complete = state['step2_complete'] + self.step3_complete = state['step3_complete'] + self.mapper_fn = state['mapper_fn'] + self.aggregator_fn = state['aggregator_fn'] + self.reducer_fn = state['reducer_fn'] + # + if state['running_MapReduceTask'] is None: + self.running_MapReduceTask = None + else: + self.running_MapReduceTask = self.c.get_result(state['running_MapReduceTask']) + if state['running_SimulationTask'] is None: + self.running_SimulationTask = None + else: + self.running_SimulationTask = self.c.get_result(state['running_SimulationTask']) + return True + except IOError as e: + return False - #-------------------------- + #----------------------------------------------------------------------------------- # MAIN FUNCTION - #-------------------------- - def run(self, mapper, aggregator=None, reducer=None, number_of_trajectories=None, chunk_size=None, verbose=True, progress_bar=True, store_realizations=True, storage_mode="Shared", cache_results=False): - """ Main entry point """ - if store_realizations: - if self.storage_mode is None: - if storage_mode != "Persistent" and storage_mode != "Shared": - raise MolnsUtilException("Acceptable values for 'storage_mode' are 'Persistent' or 'Shared'") - self.storage_mode = storage_mode - elif self.storage_mode != storage_mode: - raise MolnsUtilException("Storage mode already set to {0}, can not mix storage modes".format(self.storage_mode)) - # Do we have enough trajectores yet? - if number_of_trajectories is None and self.number_of_trajectories == 0: - raise MolnsUtilException("number_of_trajectories is zero") - # Run simulations - if self.number_of_trajectories < number_of_trajectories: - self.add_realizations( number_of_trajectories - self.number_of_trajectories, chunk_size=chunk_size, verbose=verbose, storage_mode=storage_mode) + #----------------------------------------------------------------------------------- + def run(self, mapper=None, aggregator=None, reducer=None, number_of_trajectories=None, chunk_size=None, verbose=True, progress_bar=True, store_realizations=True, storage_mode="Shared", cache_results=False): + """ Main entry point for executing parallel MOLNs computations. + + Arguments: + mapper [required] Python function that takes as input the simulation result. This function is applied to each simulation trajectory. + aggregator [optional] Python function that aggregates the output of the mapper function on each worker engine. + reducer [optional] Python function that takes as input the output of all the aggregator functions on each worker. One insteance of this function is run for each parameter point. + number_of_trajectories [required] Integer, number of simulation trajectories to execute for each parameter point. + chunk_size [optional] Integer, group a number of trajectories into a block for efficicnet execution. One aggregator will be run for each chunk. + verbose [optional, default True] Print the status of the computation. + progress_bar [optional, default True] Display a javascript progress bar to indicate the progress of the computation. + store_realizations [optional, default True] If set to False, the intermediary results will be deleted as soon as the computation is complete. + storage_mode [required, default 'Shared'] Either 'Shared' or 'Persistent'. Store the intermediary results in the ephemeral shared filesystem ('Shared'), or the cloud object store ('Persistent' e.g. Amazon S3 or OpenStack Swift). + cache_results [optional, experimental, default False] Store the intermediary results in the ephemeral storage on each compute node. + Returns: + The output of the computation is returned. For 'DistributedEnsemble' class, this is the output of the 'reducer' function. For the 'ParameterSweep' class, this is a 'ParameterSweepResultList' object. + Raises: + MolnsUtilException on error. + """ + ##### + # 0. Validate input. + if mapper is None or not hasattr(mapper, '__call__'): + raise MolnsUtilException("mapper function not specified") + if self.storage_mode is None: + if storage_mode != "Persistent" and storage_mode != "Shared": + raise MolnsUtilException("Acceptable values for 'storage_mode' are 'Persistent' or 'Shared'") + self.storage_mode = storage_mode + elif self.storage_mode != storage_mode: + raise MolnsUtilException("Storage mode already set to {0}, can not mix storage modes".format(self.storage_mode)) + if number_of_trajectories is None or number_of_trajectories == 0: + raise MolnsUtilException("'number_of_trajectories' is zero.") + elif not store_realizations and self.number_of_trajectories > 0 and self.number_of_trajectories != number_of_trajectories: + raise MolnsUtilException("'number_of_trajectories' changed since first call. Value can only be changed if store_realizations is True") + #TODO: Fix, if store_realizations is false, number_of_trajectories=0 after success + elif store_realizations and self.number_of_trajectories > 0 and self.number_of_trajectories > number_of_trajectories: + raise MolnsUtilException("'number_of_trajectories' less than first call. Can not reduce number of realizations.") + # Check if the mapper, aggregator, and reducer functions are the same as previously run. If not throw error. Use 'clear_results()' to reset + mapper_fn_pkl = cloudpickle.dumps(mapper) + if self.mapper_fn is not None and self.mapper_fn != mapper_fn_pkl: + raise MolnsUtilException("'mapper' function has changed since results have been computed. Use 'clear_results()' to reset.") + else: + self.mapper_fn = mapper_fn_pkl + aggregator_fn_pkl = cloudpickle.dumps(aggregator) + if self.aggregator_fn is not None and self.aggregator_fn != aggregator_fn_pkl: + raise MolnsUtilException("'aggregator' function has changed since results have been computed. Use 'clear_results()' to reset.") + else: + self.aggregator_fn = aggregator_fn_pkl + reducer_fn_pkl = cloudpickle.dumps(reducer) + if self.reducer_fn is not None and self.reducer_fn != reducer_fn_pkl: + raise MolnsUtilException("'reducer' function has changed since results have been computed. Use 'clear_results()' to reset.") + else: + self.reducer_fn = reducer_fn_pkl + if self.number_of_trajectories > 0 and not store_realizations and self.number_of_trajectories != number_of_trajectories: + raise MolnsUtilException("'number_of_trajectories' is not the same as the original call. It can only be changed if 'store_realizations' is True.") - if chunk_size is None: - chunk_size = self._determine_chunk_size(self.number_of_trajectories) + ##### + # Shortcut, check if computation is complete + if self.step3_complete: if verbose: - print "Running mapper & aggregator on the result objects (number of results={0}, chunk size={1})".format(self.number_of_trajectories*len(self.parameters), chunk_size) - else: - progress_bar=False + print "Using previously computed results." + return self.reduced_results - # chunks per parameter - num_chunks = int(math.ceil(self.number_of_trajectories/float(chunk_size))) - chunks = [chunk_size]*(num_chunks-1) - chunks.append(self.number_of_trajectories-chunk_size*(num_chunks-1)) - # total chunks - pchunks = chunks*len(self.parameters) - num_pchunks = num_chunks*len(self.parameters) - pparams = [] - param_set_ids = [] - presult_list = [] - for id, param in enumerate(self.parameters): - param_set_ids.extend( [id]*num_chunks ) - pparams.extend( [param]*num_chunks ) - for i in range(num_chunks): - presult_list.append( self.result_list[id][i*chunk_size:(i+1)*chunk_size] ) - results = self.lv.map_async(map_and_aggregate, presult_list, param_set_ids, [mapper]*num_pchunks,[aggregator]*num_pchunks,[cache_results]*num_pchunks) + ###### + # 1. Run simulations + #sys.stderr.write("[1] self.number_of_trajectories < number_of_trajectories: {0} {1} {2}\n".format(self.number_of_trajectories,number_of_trajectories,self.number_of_trajectories < number_of_trajectories)) + #sys.stderr.write("[1] (not self.step1_complete): {0} {1}\n".format(self.step1_complete, (not store_realizations))) + if (not self.step1_complete) or (store_realizations == True and self.number_of_trajectories < number_of_trajectories): + if verbose: + print "Step 1: Computing simulation trajectories." + self.add_realizations( number_of_trajectories - self.number_of_trajectories, chunk_size=chunk_size, verbose=verbose, storage_mode=storage_mode, progress_bar=progress_bar) + + self.step1_complete=True else: - # If we don't store the realizations (or use the stored ones) - if chunk_size is None: - chunk_size = self._determine_chunk_size(number_of_trajectories) - if not verbose: - progress_bar=False - else: - print "Generating {0} realizations of the model, running mapper & aggregator (chunk size={1})".format(number_of_trajectories,chunk_size) + if verbose: + print "Step 1: Done. Using previously computed simulation trajectories." + + ###### + # 2. Run Map function for the MapReduce post-processing + if (not self.step2_complete) or (store_realizations == True and self.number_of_results < self.number_of_trajectories): + if verbose: + print "Step 2: Running mapper & aggregator on the result objects (number of results={0}, chunk size={1})".format(self.number_of_trajectories*len(self.parameters), chunk_size) + self.run_mappers(mapper, aggregator, chunk_size=chunk_size, progress_bar=progress_bar, cache_results=cache_results) + self.step2_complete=True + self.step3_complete=False # To ensure that step 3 is always run after step 2 runs (as in the case of a re-execution). + else: + if verbose: + print "Step 2: Done. Using previously computed results." + ###### + # 3. Run Reduce function for the MapReduce post-processing + if not self.step3_complete: + if verbose: + print "Step 3: Running reducer on mapped and aggregated results (size={0})".format(len(self.mapped_results[0])) + if reducer is None: + reducer = builtin_reducer_default + self.reduced_results = self.run_reducer(reducer) + self.step3_complete = True + self.save_state() + else: + if verbose: + print "Step 3: Done. Using previously computed results." + + ###### + # Clean up + if not store_realizations: + self.delete_realizations() + self.delete_results() + self.save_state() + + ###### + # Return results + return self.reduced_results + + #----------------------------------------------------------------------------------- + def run_reducer(self, reducer): + """ Inside the run() function, apply the reducer to all of the map'ped-aggregated result values. """ + return reducer(self.mapped_results[0], parameters=self.parameters[0]) + + #----------------------------------------------------------------------------------- + def run_mappers(self, mapper, aggregator, chunk_size=None, progress_bar=True, cache_results=False): + """ Run the mapper and aggrregator function on each of the simulation trajectories. """ + num_results_to_compute = self.number_of_trajectories - self.number_of_results + if self.running_MapReduceTask is None: + #sys.stderr.write('run_mappers(): Starting MapReduce Task\n') + # If number_of_trajectories > 0 and number_of_results < number_of_trajectories, only run mappers on the 'new' trajectories. + offset = self.number_of_results # chunks per parameter - num_chunks = int(math.ceil(number_of_trajectories/float(chunk_size))) + if chunk_size is None: + chunk_size = self._determine_chunk_size(num_results_to_compute) + #sys.stderr.write('chunk_size={0}, num_results_to_compute={1}\n'.format(chunk_size, num_results_to_compute)) + if chunk_size < 1: chunk_size = 1 + num_chunks = int(math.ceil(num_results_to_compute/float(chunk_size))) chunks = [chunk_size]*(num_chunks-1) - chunks.append(number_of_trajectories-chunk_size*(num_chunks-1)) + chunks.append(num_results_to_compute-chunk_size*(num_chunks-1)) # total chunks pchunks = chunks*len(self.parameters) num_pchunks = num_chunks*len(self.parameters) - pparams = [] + #pparams = [] param_set_ids = [] - for id, param in enumerate(self.parameters): - param_set_ids.extend( [id]*num_chunks ) - pparams.extend( [param]*num_chunks ) + presult_list = [] + try: #'try' is for Debugging + for id, param in enumerate(self.parameters): + param_set_ids.extend( [id]*num_chunks ) + #pparams.extend( [param]*num_chunks ) + for i in range(num_chunks): + presult_list.append( self.result_list[id][(i*chunk_size+offset):((i+1)*chunk_size+offset)] ) + except Exception as e: + #sys.stderr.write('run_mappers(): caught exception while trying start MapReduce Task: {0}\n'.format(e)) + #sys.stderr.write('run_mappers(): self.parameters={0}\n'.format(self.parameters)) + #sys.stderr.write('run_mappers(): self.result_list={0}\n'.format(self.result_list)) + raise - seed_list = [] - for _ in range(len(self.parameters)): - #need to do it this way cause the number of run per chunk might not be even - seed_list.extend(range(self.seed_base, self.seed_base+number_of_trajectories, chunk_size)) - self.seed_base += number_of_trajectories - #def run_ensemble_map_and_aggregate(model_class, parameters, seed_base, number_of_trajectories, mapper, aggregator=None): - results = self.lv.map_async(run_ensemble_map_and_aggregate, [self.model_class]*num_pchunks, pparams, param_set_ids, seed_list, pchunks, [mapper]*num_pchunks, [aggregator]*num_pchunks) + self.running_MapReduceTask = self.lv.map_async(map_and_aggregate, presult_list, param_set_ids, [mapper]*num_pchunks,[aggregator]*num_pchunks,[cache_results]*num_pchunks) + self.save_state() + else: + #sys.stderr.write('run_mappers(): MapReduce Task already running\n') + pass + if progress_bar: - # This should be factored out somehow. divid = str(uuid.uuid4()) pb = HTML("""