From 9cc45bec65b78c48744bcaccca3324a9b06ad020 Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Tue, 19 Mar 2024 15:48:19 -0400 Subject: [PATCH 1/8] tweaks for: calling stop_server in parallel, remote DVs with dots in them, remote DVs that don't exist on all ranks (e.g. from parallel group) --- mphys/network/remote_component.py | 5 +++-- mphys/network/server.py | 15 +++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/mphys/network/remote_component.py b/mphys/network/remote_component.py index 6de888bc..50356913 100644 --- a/mphys/network/remote_component.py +++ b/mphys/network/remote_component.py @@ -13,7 +13,8 @@ class RemoteComp(om.ExplicitComponent): """ def stop_server(self): # shortcut for stopping server from top level - self.server_manager.stop_server() + if self.server_manager is not None: + self.server_manager.stop_server() def initialize(self): self.options.declare('run_server_filename', default="mphys_server.py", desc="python file that will launch the Server class") @@ -135,7 +136,7 @@ def _assign_additional_partials_from_remote_output(self, remote_dict, partials): def _create_input_dict_for_server(self, inputs): input_dict = {'design_vars': {}, 'additional_inputs': {}, 'additional_outputs': self.additional_remote_outputs, 'component_name': self.name} for dv in self.design_var_keys: - input_dict['design_vars'][dv.replace('.',self.var_naming_dot_replacement)] = {'val': inputs[dv.replace('.',self.var_naming_dot_replacement)].tolist()} + input_dict['design_vars'][dv] = {'val': inputs[dv.replace('.',self.var_naming_dot_replacement)].tolist()} for input in self.additional_remote_inputs: input_dict['additional_inputs'][input] = {'val': inputs[input.replace('.',self.var_naming_dot_replacement)].tolist()} return input_dict diff --git a/mphys/network/server.py b/mphys/network/server.py index 1fb509bd..3410f51f 100644 --- a/mphys/network/server.py +++ b/mphys/network/server.py @@ -25,7 +25,8 @@ class Server: def __init__(self, get_om_group_function_pointer, ignore_setup_warnings = False, ignore_runtime_warnings = False, - rerun_initial_design = False): + rerun_initial_design = False, + write_n2 = False): self.get_om_group_function_pointer = get_om_group_function_pointer self.ignore_setup_warnings = ignore_setup_warnings @@ -38,6 +39,7 @@ def __init__(self, get_om_group_function_pointer, self.additional_inputs = None self.additional_outputs = None self.design_counter = 0 # more debugging info for client side json dumping + self.write_n2 = write_n2 self._load_the_model() @@ -99,7 +101,7 @@ def _gather_design_inputs_from_om_problem(self, remote_output_dict = {}): design_vars = self.prob.model._design_vars remote_output_dict['design_vars'] = {} for dv in design_vars.keys(): - remote_output_dict['design_vars'][dv] = {'val': self.prob.get_val(dv), + remote_output_dict['design_vars'][dv] = {'val': self.prob.get_val(dv, get_remote=True), 'ref': design_vars[dv]['ref'], 'ref0': design_vars[dv]['ref0'], 'lower': design_vars[dv]['lower'], @@ -117,7 +119,7 @@ def _gather_design_inputs_from_om_problem(self, remote_output_dict = {}): def _gather_additional_inputs_from_om_problem(self, remote_output_dict = {}): remote_output_dict['additional_inputs'] = {} for input in self.additional_inputs: - remote_output_dict['additional_inputs'][input] = {'val': self.prob.get_val(input)} + remote_output_dict['additional_inputs'][input] = {'val': self.prob.get_val(input, get_remote=True)} if hasattr(remote_output_dict['additional_inputs'][input]['val'], 'tolist'): remote_output_dict['additional_inputs'][input]['val'] = remote_output_dict['additional_inputs'][input]['val'].tolist() return remote_output_dict @@ -271,14 +273,14 @@ def _gather_inputs_and_outputs_from_om_problem(self): def _set_design_variables_into_the_server_problem(self, input_dict): design_changed = False for key in input_dict['design_vars'].keys(): - if (self.prob.get_val(key)!=input_dict['design_vars'][key]['val']).any(): + if (self.prob.get_val(key, get_remote=True)!=input_dict['design_vars'][key]['val']).any(): design_changed = True self.prob.set_val(key, input_dict['design_vars'][key]['val']) return design_changed def _set_additional_inputs_into_the_server_problem(self, input_dict, design_changed): for key in input_dict['additional_inputs'].keys(): - if (self.prob.get_val(key)!=input_dict['additional_inputs'][key]['val']).any(): + if (self.prob.get_val(key, get_remote=True)!=input_dict['additional_inputs'][key]['val']).any(): design_changed = True self.prob.set_val(key, input_dict['additional_inputs'][key]['val']) return design_changed @@ -350,4 +352,5 @@ def run(self): self._send_outputs_to_client(output_dict) # write current n2 with values - om.n2(self.prob, show_browser=False, outfile=f"n2_inner_analysis_{input_dict['component_name']}.html") + if self.write_n2: + om.n2(self.prob, show_browser=False, outfile=f"n2_inner_analysis_{input_dict['component_name']}.html") From 60063aa0e8b673e84a99d4c8f5d032157639a13b Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Thu, 21 Mar 2024 12:42:55 -0400 Subject: [PATCH 2/8] add way to restart job if it is no longer running --- mphys/network/remote_component.py | 2 ++ mphys/network/server_manager.py | 5 +++++ mphys/network/zmq_pbs.py | 27 ++++++++++++++++++++++++--- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/mphys/network/remote_component.py b/mphys/network/remote_component.py index 50356913..f0183fe0 100644 --- a/mphys/network/remote_component.py +++ b/mphys/network/remote_component.py @@ -151,6 +151,8 @@ def _is_first_gradient_evaluation(self): return len(self.times_gradient) == 0 def _need_to_restart_server(self, command: str): + if self.server_manager.job_has_expired(): + return True if self._doing_derivative_evaluation(command): if self._is_first_gradient_evaluation() or self.reboot_only_on_function_call: return False diff --git a/mphys/network/server_manager.py b/mphys/network/server_manager.py index dda1c75d..376b34e2 100644 --- a/mphys/network/server_manager.py +++ b/mphys/network/server_manager.py @@ -29,3 +29,8 @@ def enough_time_is_remaining(self, estimated_model_time): How much time the new analysis is estimated to take """ return True + + def job_has_expired(self): + """ + Check if the job has run out of time. + """ diff --git a/mphys/network/zmq_pbs.py b/mphys/network/zmq_pbs.py index 85f646d6..1599a424 100644 --- a/mphys/network/zmq_pbs.py +++ b/mphys/network/zmq_pbs.py @@ -19,6 +19,7 @@ def initialize(self): self.options.declare('port', default=5081, desc="port number for server/client communication") self.options.declare('acceptable_port_range', default=[5081,6000], desc="port range to look through if 'port' is currently busy") self.options.declare('additional_server_args', default="", desc="Optional arguments to give server, in addition to --port ") + self.options.declare('job_expiration_max_restarts', default=None, desc="Optional maximum number of server restarts due to job expiration; unlimited by default") super().initialize() self.server_manager = None # for avoiding reinitialization due to multiple setup calls @@ -40,7 +41,8 @@ def _setup_server_manager(self): component_name=self.name, port=self.options['port'], acceptable_port_range=self.options['acceptable_port_range'], - additional_server_args=self.options['additional_server_args']) + additional_server_args=self.options['additional_server_args'], + job_expiration_max_restarts=self.options['job_expiration_max_restarts']) class MPhysZeroMQServerManager(ServerManager): """ @@ -61,6 +63,8 @@ class MPhysZeroMQServerManager(ServerManager): Range of alternative port numbers if specified port is already in use additional_server_args : str Optional arguments to give server, in addition to --port + job_expiration_max_restarts : int + Optional maximum number of server restarts due to job expiration; unlimited by default """ def __init__(self, pbs: PBS, @@ -68,7 +72,8 @@ def __init__(self, component_name: str, port=5081, acceptable_port_range=[5081,6000], - additional_server_args='' + additional_server_args='', + job_expiration_max_restarts=None ): self.pbs = pbs self.run_server_filename = run_server_filename @@ -76,8 +81,10 @@ def __init__(self, self.port = port self.acceptable_port_range = acceptable_port_range self.additional_server_args = additional_server_args + self.job_expiration_max_restarts = job_expiration_max_restarts self.queue_time_delay = 5 # seconds to wait before rechecking if a job has started self.server_counter = 0 # for saving output of each server to different files + self.job_expiration_restarts = 0 self.start_server() def start_server(self): @@ -87,7 +94,8 @@ def start_server(self): def stop_server(self): print(f'CLIENT (subsystem {self.component_name}): Stopping the remote analysis server', flush=True) - self.socket.send('shutdown|null'.encode()) + if self.job.state=='R': + self.socket.send('shutdown|null'.encode()) self._shutdown_server() self.socket.close() @@ -98,6 +106,19 @@ def enough_time_is_remaining(self, estimated_model_time): else: return estimated_model_time < self.job.walltime_remaining + def job_has_expired(self): + self.job.update_job_state() + if self.job.state!='R': + if self.job_expiration_max_restarts is not None: + if self.job_expiration_restarts+1 > self.job_expiration_max_restarts: + self.stop_server() + raise RuntimeError(f'CLIENT (subsystem {self.component_name}): Reached maximum number of job expiration restarts') + self.job_expiration_restarts += 1 + print(f'CLIENT (subsystem {self.component_name}): Job no longer running; flagging for job restart') + return True + else: + return False + def _port_is_in_use(self, port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: return s.connect_ex(('localhost', port))==0 From 331397f800becaa0052519f3a3e7cb28344d2ea4 Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Thu, 21 Mar 2024 14:01:23 -0400 Subject: [PATCH 3/8] keep track of down time between model evaluations --- docs/basics/remote_components.rst | 1 + mphys/network/remote_component.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/basics/remote_components.rst b/docs/basics/remote_components.rst index 1b2ba290..b772f4da 100644 --- a/docs/basics/remote_components.rst +++ b/docs/basics/remote_components.rst @@ -64,6 +64,7 @@ Troubleshooting =============== The :code:`dump_json` option for :code:`RemoteZeroMQComp` will make the component write input and output JSON files, which contain all data sent to and received from the server. An exception is the :code:`wall_time` entry (given in seconds) in the output JSON file, which is added on the client-side after the server has completed the design evaluation. +Similarly, the :code:`down_time` entry keeps track of the elapsed time between the end of the previous design evaluation and the beginning of the current one. Another entry that is only provided for informational purposes is :code:`design_counter`, which keeps track of how many different designs have been evaluated on the current server. If :code:`dump_separate_json` is set to True, then separate files will be written for each design evaluation. On the server side, an n2 file titled :code:`n2_inner_analysis_.html` will be written after each evaluation. diff --git a/mphys/network/remote_component.py b/mphys/network/remote_component.py index f0183fe0..aec1423b 100644 --- a/mphys/network/remote_component.py +++ b/mphys/network/remote_component.py @@ -44,6 +44,7 @@ def setup(self): self.additional_remote_outputs = self.options['additional_remote_outputs'] self.use_derivative_coloring = self.options['use_derivative_coloring'] self.derivative_coloring_num = 0 + self.last_analysis_completed_time = time.time() # for tracking down time between function/gradient calls if self.dump_separate_json: self.dump_json = True @@ -95,14 +96,16 @@ def evaluate_model(self, remote_input_dict=None, command='initialize'): if self.dump_json: self._dump_json(remote_input_dict, command) + down_time = time.time() - self.last_analysis_completed_time model_start_time = time.time() self._send_inputs_to_server(remote_input_dict, command) remote_output_dict = self._receive_outputs_from_server() model_time_elapsed = time.time() - model_start_time + self.last_analysis_completed_time = time.time() if self.dump_json: - remote_output_dict.update({'wall_time': model_time_elapsed}) + remote_output_dict.update({'wall_time': model_time_elapsed, 'down_time': down_time}) self._dump_json(remote_output_dict, command) if self._doing_derivative_evaluation(command): From b8ff328d63ba873d0526977fa0778c0d2c78f552 Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Thu, 21 Mar 2024 14:14:40 -0400 Subject: [PATCH 4/8] stop_server in example changed from eval command to explicit stop_server commands, now that they work on both ranks in parallel --- .../supersonic_panel/as_opt_remote_parallel.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py index 97534964..0ccfafba 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py @@ -142,5 +142,6 @@ def setup(self): f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') f.write(' ' + '\n') -# shutdown each rank's server -eval(f'prob.model.multipoint.remote_scenario{prob.model.comm.rank}.stop_server()') +# shutdown the servers +prob.model.multipoint.remote_scenario0.stop_server() +prob.model.multipoint.remote_scenario1.stop_server() From 0fc312ab8599149090c4f607c95118601522e201 Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Thu, 21 Mar 2024 16:08:31 -0400 Subject: [PATCH 5/8] replace dummy port forwarding process with dummy zeromq socket, to avoid tons of system log errors --- mphys/network/zmq_pbs.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/mphys/network/zmq_pbs.py b/mphys/network/zmq_pbs.py index 1599a424..a54960f2 100644 --- a/mphys/network/zmq_pbs.py +++ b/mphys/network/zmq_pbs.py @@ -153,11 +153,11 @@ def _launch_job(self): def _wait_for_job_to_start(self): print(f'CLIENT (subsystem {self.component_name}): Waiting for job to start', flush=True) job_submission_time = time.time() - self._setup_placeholder_ssh() + self._setup_dummy_socket() while self.job.state!='R': time.sleep(self.queue_time_delay) self.job.update_job_state() - self._stop_placeholder_ssh() + self._stop_dummy_socket() self.job_start_time = time.time() print(f'CLIENT (subsystem {self.component_name}): Job started (queue wait time: {(time.time()-job_submission_time)/3600} hours)', flush=True) @@ -172,15 +172,14 @@ def _shutdown_server(self): time.sleep(0.1) # prevent full shutdown before job deletion? self.job.qdel() - def _setup_placeholder_ssh(self): - print(f'CLIENT (subsystem {self.component_name}): Starting placeholder process to hold port {self.port} while in queue', flush=True) - ssh_command = f'ssh -4 -o ServerAliveCountMax=40 -o ServerAliveInterval=15 -N -L {self.port}:localhost:{self.port} {socket.gethostname()} &' - self.ssh_proc = subprocess.Popen(ssh_command.split(), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL) + def _setup_dummy_socket(self): + print(f'CLIENT (subsystem {self.component_name}): Starting dummy ZeroMQ socket to hold port {self.port} while in queue', flush=True) + context = zmq.Context() + self.dummy_socket = context.socket(zmq.REP) + self.dummy_socket.bind(f"tcp://*:{self.port}") - def _stop_placeholder_ssh(self): - self.ssh_proc.kill() + def _stop_dummy_socket(self): + self.dummy_socket.close() class MPhysZeroMQServer(Server): """ From 91c8546534d015bee86503ebde77ceaa7473d983 Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Fri, 22 Mar 2024 11:49:23 -0400 Subject: [PATCH 6/8] support running on more than 1 rank --- .../as_opt_remote_parallel.py | 3 - .../supersonic_panel/as_opt_remote_serial.py | 69 ++++++++++--------- mphys/network/remote_component.py | 64 +++++++++-------- mphys/network/zmq_pbs.py | 6 +- 4 files changed, 74 insertions(+), 68 deletions(-) diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py index 0ccfafba..ef3160d4 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py @@ -40,9 +40,6 @@ def setup(self): class TopLevelGroup(om.Group): def setup(self): - if self.comm.size!=2: - raise SystemError('Please launch with 2 processors') - # IVCs that feed into both parallel groups self.add_subsystem('ivc', om.IndepVarComp(), promotes=['*']) diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py index 16a985c3..ba1de172 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py @@ -54,40 +54,41 @@ prob.cleanup() # write out data - cr = om.CaseReader("optimization_history.sql") - driver_cases = cr.list_cases('driver') - - case = cr.get_case(0) - cons = case.get_constraints() - dvs = case.get_design_vars() - objs = case.get_objectives() - - with open("optimization_history.dat","w+") as f: - - for i, k in enumerate(objs.keys()): - f.write('objective: ' + k + '\n') - for j, case_id in enumerate(driver_cases): - f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n') + if prob.model.comm.rank==0: + cr = om.CaseReader("optimization_history.sql") + driver_cases = cr.list_cases('driver') + + case = cr.get_case(0) + cons = case.get_constraints() + dvs = case.get_design_vars() + objs = case.get_objectives() + + with open("optimization_history.dat","w+") as f: + + for i, k in enumerate(objs.keys()): + f.write('objective: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(cons.keys()): + f.write('constraint: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + for i, k in enumerate(dvs.keys()): + f.write('DV: ' + k + '\n') + for j, case_id in enumerate(driver_cases): + f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') + f.write(' ' + '\n') + + f.write('run times, function\n') + for i in range(len(prob.model.remote.times_function)): + f.write(f'{prob.model.remote.times_function[i]}\n') f.write(' ' + '\n') - for i, k in enumerate(cons.keys()): - f.write('constraint: ' + k + '\n') - for j, case_id in enumerate(driver_cases): - f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n') + f.write('run times, gradient\n') + for i in range(len(prob.model.remote.times_gradient)): + f.write(f'{prob.model.remote.times_gradient[i]}\n') f.write(' ' + '\n') - - for i, k in enumerate(dvs.keys()): - f.write('DV: ' + k + '\n') - for j, case_id in enumerate(driver_cases): - f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n') - f.write(' ' + '\n') - - f.write('run times, function\n') - for i in range(len(prob.model.remote.times_function)): - f.write(f'{prob.model.remote.times_function[i]}\n') - f.write(' ' + '\n') - - f.write('run times, gradient\n') - for i in range(len(prob.model.remote.times_gradient)): - f.write(f'{prob.model.remote.times_gradient[i]}\n') - f.write(' ' + '\n') diff --git a/mphys/network/remote_component.py b/mphys/network/remote_component.py index aec1423b..f43268bd 100644 --- a/mphys/network/remote_component.py +++ b/mphys/network/remote_component.py @@ -32,34 +32,36 @@ def initialize(self): self.options.declare('use_derivative_coloring', default=False, types=bool, desc="assign derivative coloring to objective/constraints. Only for cases with parallel servers") def setup(self): - if self.comm.size>1: - raise SystemError('Using Remote Component on more than 1 rank is not supported') - self.time_estimate_multiplier = self.options['time_estimate_multiplier'] - self.time_estimate_buffer = self.options['time_estimate_buffer'] - self.reboot_only_on_function_call = self.options['reboot_only_on_function_call'] - self.dump_json = self.options['dump_json'] - self.dump_separate_json = self.options['dump_separate_json'] self.var_naming_dot_replacement = self.options['var_naming_dot_replacement'] - self.additional_remote_inputs = self.options['additional_remote_inputs'] - self.additional_remote_outputs = self.options['additional_remote_outputs'] self.use_derivative_coloring = self.options['use_derivative_coloring'] self.derivative_coloring_num = 0 - self.last_analysis_completed_time = time.time() # for tracking down time between function/gradient calls - if self.dump_separate_json: - self.dump_json = True - - self._setup_server_manager() - - # for tracking model times, and determining whether to relaunch servers - self.times_function = np.array([]) - self.times_gradient = np.array([]) - # get baseline model - print(f'CLIENT (subsystem {self.name}): Running model from setup to get design problem info', flush=True) - output_dict = self.evaluate_model(command='initialize', - remote_input_dict={'additional_inputs': self.additional_remote_inputs, - 'additional_outputs': self.additional_remote_outputs, - 'component_name': self.name}) + output_dict = None + if self.comm.rank==0: + self.time_estimate_multiplier = self.options['time_estimate_multiplier'] + self.time_estimate_buffer = self.options['time_estimate_buffer'] + self.reboot_only_on_function_call = self.options['reboot_only_on_function_call'] + self.dump_json = self.options['dump_json'] + self.dump_separate_json = self.options['dump_separate_json'] + self.additional_remote_inputs = self.options['additional_remote_inputs'] + self.additional_remote_outputs = self.options['additional_remote_outputs'] + self.last_analysis_completed_time = time.time() # for tracking down time between function/gradient calls + if self.dump_separate_json: + self.dump_json = True + + self._setup_server_manager() + + # for tracking model times, and determining whether to relaunch servers + self.times_function = np.array([]) + self.times_gradient = np.array([]) + + # get baseline model + print(f'CLIENT (subsystem {self.name}): Running model from setup to get design problem info', flush=True) + output_dict = self.evaluate_model(command='initialize', + remote_input_dict={'additional_inputs': self.additional_remote_inputs, + 'additional_outputs': self.additional_remote_outputs, + 'component_name': self.name}) + output_dict = self.comm.bcast(output_dict) self._add_design_inputs_from_baseline_model(output_dict) self._add_objectives_from_baseline_model(output_dict) @@ -71,8 +73,11 @@ def setup(self): self.declare_partials('*', '*') def compute(self,inputs,outputs): - input_dict = self._create_input_dict_for_server(inputs) - remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate') + remote_dict = None + if self.comm.rank==0: + input_dict = self._create_input_dict_for_server(inputs) + remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate') + remote_dict = self.comm.bcast(remote_dict) self._assign_objectives_from_remote_output(remote_dict, outputs) self._assign_constraints_from_remote_output(remote_dict, outputs) @@ -81,8 +86,11 @@ def compute(self,inputs,outputs): def compute_partials(self, inputs, partials): # NOTE: this will not use of and wrt inputs, if given in outer script's compute_totals/check_totals - input_dict = self._create_input_dict_for_server(inputs) - remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate derivatives') + remote_dict = None + if self.comm.rank==0: + input_dict = self._create_input_dict_for_server(inputs) + remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate derivatives') + remote_dict = self.comm.bcast(remote_dict) self._assign_objective_partials_from_remote_output(remote_dict, partials) self._assign_constraint_partials_from_remote_output(remote_dict, partials) diff --git a/mphys/network/zmq_pbs.py b/mphys/network/zmq_pbs.py index a54960f2..bf862ed8 100644 --- a/mphys/network/zmq_pbs.py +++ b/mphys/network/zmq_pbs.py @@ -108,7 +108,9 @@ def enough_time_is_remaining(self, estimated_model_time): def job_has_expired(self): self.job.update_job_state() - if self.job.state!='R': + if self.job.state=='R': + return False + else: if self.job_expiration_max_restarts is not None: if self.job_expiration_restarts+1 > self.job_expiration_max_restarts: self.stop_server() @@ -116,8 +118,6 @@ def job_has_expired(self): self.job_expiration_restarts += 1 print(f'CLIENT (subsystem {self.component_name}): Job no longer running; flagging for job restart') return True - else: - return False def _port_is_in_use(self, port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: From 900ff3c7055d1b1ac6d97cbe4842a8b978dd1c5a Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Thu, 28 Mar 2024 14:05:24 -0400 Subject: [PATCH 7/8] send write_n2 option to server --- mphys/network/zmq_pbs.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mphys/network/zmq_pbs.py b/mphys/network/zmq_pbs.py index bf862ed8..0f823bf1 100644 --- a/mphys/network/zmq_pbs.py +++ b/mphys/network/zmq_pbs.py @@ -188,10 +188,11 @@ class MPhysZeroMQServer(Server): def __init__(self, port, get_om_group_function_pointer, ignore_setup_warnings = False, ignore_runtime_warnings = False, - rerun_initial_design = False): + rerun_initial_design = False, + write_n2 = False): super().__init__(get_om_group_function_pointer, ignore_setup_warnings, - ignore_runtime_warnings, rerun_initial_design) + ignore_runtime_warnings, rerun_initial_design, write_n2) self._setup_zeromq_socket(port) def _setup_zeromq_socket(self, port): From 124c28046d7eaa3958df36d9164b65886ab53af2 Mon Sep 17 00:00:00 2001 From: Andrew Thelen Date: Tue, 22 Oct 2024 10:37:13 -0400 Subject: [PATCH 8/8] fix final reading of sql file for newer openmdao versions, which save it to outputs_dir --- examples/aerostructural/supersonic_panel/as_opt_parallel.py | 2 +- .../aerostructural/supersonic_panel/as_opt_remote_parallel.py | 2 +- .../aerostructural/supersonic_panel/as_opt_remote_serial.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/aerostructural/supersonic_panel/as_opt_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_parallel.py index 0364c38c..dc9ba65a 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_parallel.py +++ b/examples/aerostructural/supersonic_panel/as_opt_parallel.py @@ -173,7 +173,7 @@ def get_model(scenario_names): prob.cleanup() if prob.model.comm.rank==0: # write out data - cr = om.CaseReader("optimization_history.sql") + cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history.sql") driver_cases = cr.list_cases('driver') case = cr.get_case(0) diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py index ef3160d4..7e75921c 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_parallel.py @@ -111,7 +111,7 @@ def setup(self): # write out data if prob.model.comm.rank==0: - cr = om.CaseReader("optimization_history_parallel.sql") + cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history_parallel.sql") driver_cases = cr.list_cases('driver') case = cr.get_case(0) diff --git a/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py index ba1de172..812dc67a 100644 --- a/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py +++ b/examples/aerostructural/supersonic_panel/as_opt_remote_serial.py @@ -55,7 +55,7 @@ # write out data if prob.model.comm.rank==0: - cr = om.CaseReader("optimization_history.sql") + cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history.sql") driver_cases = cr.list_cases('driver') case = cr.get_case(0)