Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweaks for remote components #173

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions docs/basics/remote_components.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Troubleshooting
===============
The :code:`dump_json` option for :code:`RemoteZeroMQComp` will make the component write input and output JSON files, which contain all data sent to and received from the server.
An exception is the :code:`wall_time` entry (given in seconds) in the output JSON file, which is added on the client side after the server has completed the design evaluation.
Similarly, the :code:`down_time` entry (also given in seconds and added on the client side) keeps track of the elapsed time between the end of the previous design evaluation and the beginning of the current one.
Another entry that is only provided for informational purposes is :code:`design_counter`, which keeps track of how many different designs have been evaluated on the current server.
If :code:`dump_separate_json` is set to True, then separate files will be written for each design evaluation.
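On the server side, if the :code:`write_n2` argument of the :code:`Server` class is set to True (it defaults to False), an n2 file titled :code:`n2_inner_analysis_<component name>.html` will be written after each evaluation.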
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def get_model(scenario_names):
prob.cleanup()

if prob.model.comm.rank==0: # write out data
cr = om.CaseReader("optimization_history.sql")
cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history.sql")
driver_cases = cr.list_cases('driver')

case = cr.get_case(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ def setup(self):

class TopLevelGroup(om.Group):
def setup(self):
if self.comm.size!=2:
raise SystemError('Please launch with 2 processors')

# IVCs that feed into both parallel groups
self.add_subsystem('ivc', om.IndepVarComp(), promotes=['*'])

Expand Down Expand Up @@ -114,7 +111,7 @@ def setup(self):

# write out data
if prob.model.comm.rank==0:
cr = om.CaseReader("optimization_history_parallel.sql")
cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history_parallel.sql")
driver_cases = cr.list_cases('driver')

case = cr.get_case(0)
Expand Down Expand Up @@ -142,5 +139,6 @@ def setup(self):
f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n')
f.write(' ' + '\n')

# shutdown each rank's server
eval(f'prob.model.multipoint.remote_scenario{prob.model.comm.rank}.stop_server()')
# shutdown the servers
prob.model.multipoint.remote_scenario0.stop_server()
prob.model.multipoint.remote_scenario1.stop_server()
69 changes: 35 additions & 34 deletions examples/aerostructural/supersonic_panel/as_opt_remote_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,40 +54,41 @@
prob.cleanup()

# write out data
cr = om.CaseReader("optimization_history.sql")
driver_cases = cr.list_cases('driver')

case = cr.get_case(0)
cons = case.get_constraints()
dvs = case.get_design_vars()
objs = case.get_objectives()

with open("optimization_history.dat","w+") as f:

for i, k in enumerate(objs.keys()):
f.write('objective: ' + k + '\n')
for j, case_id in enumerate(driver_cases):
f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n')
if prob.model.comm.rank==0:
cr = om.CaseReader(f"{prob.get_outputs_dir()}/optimization_history.sql")
driver_cases = cr.list_cases('driver')

case = cr.get_case(0)
cons = case.get_constraints()
dvs = case.get_design_vars()
objs = case.get_objectives()

with open("optimization_history.dat","w+") as f:

for i, k in enumerate(objs.keys()):
f.write('objective: ' + k + '\n')
for j, case_id in enumerate(driver_cases):
f.write(str(j) + ' ' + str(cr.get_case(case_id).get_objectives(scaled=False)[k][0]) + '\n')
f.write(' ' + '\n')

for i, k in enumerate(cons.keys()):
f.write('constraint: ' + k + '\n')
for j, case_id in enumerate(driver_cases):
f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n')
f.write(' ' + '\n')

for i, k in enumerate(dvs.keys()):
f.write('DV: ' + k + '\n')
for j, case_id in enumerate(driver_cases):
f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n')
f.write(' ' + '\n')

f.write('run times, function\n')
for i in range(len(prob.model.remote.times_function)):
f.write(f'{prob.model.remote.times_function[i]}\n')
f.write(' ' + '\n')

for i, k in enumerate(cons.keys()):
f.write('constraint: ' + k + '\n')
for j, case_id in enumerate(driver_cases):
f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_constraints(scaled=False)[k])) + '\n')
f.write('run times, gradient\n')
for i in range(len(prob.model.remote.times_gradient)):
f.write(f'{prob.model.remote.times_gradient[i]}\n')
f.write(' ' + '\n')

for i, k in enumerate(dvs.keys()):
f.write('DV: ' + k + '\n')
for j, case_id in enumerate(driver_cases):
f.write(str(j) + ' ' + ' '.join(map(str,cr.get_case(case_id).get_design_vars(scaled=False)[k])) + '\n')
f.write(' ' + '\n')

f.write('run times, function\n')
for i in range(len(prob.model.remote.times_function)):
f.write(f'{prob.model.remote.times_function[i]}\n')
f.write(' ' + '\n')

f.write('run times, gradient\n')
for i in range(len(prob.model.remote.times_gradient)):
f.write(f'{prob.model.remote.times_gradient[i]}\n')
f.write(' ' + '\n')
74 changes: 44 additions & 30 deletions mphys/network/remote_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class RemoteComp(om.ExplicitComponent):
"""
def stop_server(self):
# shortcut for stopping server from top level
self.server_manager.stop_server()
if self.server_manager is not None:
self.server_manager.stop_server()

def initialize(self):
self.options.declare('run_server_filename', default="mphys_server.py", desc="python file that will launch the Server class")
Expand All @@ -31,33 +32,36 @@ def initialize(self):
self.options.declare('use_derivative_coloring', default=False, types=bool, desc="assign derivative coloring to objective/constraints. Only for cases with parallel servers")

def setup(self):
if self.comm.size>1:
raise SystemError('Using Remote Component on more than 1 rank is not supported')
self.time_estimate_multiplier = self.options['time_estimate_multiplier']
self.time_estimate_buffer = self.options['time_estimate_buffer']
self.reboot_only_on_function_call = self.options['reboot_only_on_function_call']
self.dump_json = self.options['dump_json']
self.dump_separate_json = self.options['dump_separate_json']
self.var_naming_dot_replacement = self.options['var_naming_dot_replacement']
self.additional_remote_inputs = self.options['additional_remote_inputs']
self.additional_remote_outputs = self.options['additional_remote_outputs']
self.use_derivative_coloring = self.options['use_derivative_coloring']
self.derivative_coloring_num = 0
if self.dump_separate_json:
self.dump_json = True

self._setup_server_manager()

# for tracking model times, and determining whether to relaunch servers
self.times_function = np.array([])
self.times_gradient = np.array([])

# get baseline model
print(f'CLIENT (subsystem {self.name}): Running model from setup to get design problem info', flush=True)
output_dict = self.evaluate_model(command='initialize',
remote_input_dict={'additional_inputs': self.additional_remote_inputs,
'additional_outputs': self.additional_remote_outputs,
'component_name': self.name})
output_dict = None
if self.comm.rank==0:
self.time_estimate_multiplier = self.options['time_estimate_multiplier']
self.time_estimate_buffer = self.options['time_estimate_buffer']
self.reboot_only_on_function_call = self.options['reboot_only_on_function_call']
self.dump_json = self.options['dump_json']
self.dump_separate_json = self.options['dump_separate_json']
self.additional_remote_inputs = self.options['additional_remote_inputs']
self.additional_remote_outputs = self.options['additional_remote_outputs']
self.last_analysis_completed_time = time.time() # for tracking down time between function/gradient calls
if self.dump_separate_json:
self.dump_json = True

self._setup_server_manager()

# for tracking model times, and determining whether to relaunch servers
self.times_function = np.array([])
self.times_gradient = np.array([])

# get baseline model
print(f'CLIENT (subsystem {self.name}): Running model from setup to get design problem info', flush=True)
output_dict = self.evaluate_model(command='initialize',
remote_input_dict={'additional_inputs': self.additional_remote_inputs,
'additional_outputs': self.additional_remote_outputs,
'component_name': self.name})
output_dict = self.comm.bcast(output_dict)

self._add_design_inputs_from_baseline_model(output_dict)
self._add_objectives_from_baseline_model(output_dict)
Expand All @@ -69,8 +73,11 @@ def setup(self):
self.declare_partials('*', '*')

def compute(self,inputs,outputs):
input_dict = self._create_input_dict_for_server(inputs)
remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate')
remote_dict = None
if self.comm.rank==0:
input_dict = self._create_input_dict_for_server(inputs)
remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate')
remote_dict = self.comm.bcast(remote_dict)

self._assign_objectives_from_remote_output(remote_dict, outputs)
self._assign_constraints_from_remote_output(remote_dict, outputs)
Expand All @@ -79,8 +86,11 @@ def compute(self,inputs,outputs):
def compute_partials(self, inputs, partials):
# NOTE: this does not use the `of` and `wrt` arguments, even if they are given in the outer script's compute_totals/check_totals call

input_dict = self._create_input_dict_for_server(inputs)
remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate derivatives')
remote_dict = None
if self.comm.rank==0:
input_dict = self._create_input_dict_for_server(inputs)
remote_dict = self.evaluate_model(remote_input_dict=input_dict, command='evaluate derivatives')
remote_dict = self.comm.bcast(remote_dict)

self._assign_objective_partials_from_remote_output(remote_dict, partials)
self._assign_constraint_partials_from_remote_output(remote_dict, partials)
Expand All @@ -94,14 +104,16 @@ def evaluate_model(self, remote_input_dict=None, command='initialize'):
if self.dump_json:
self._dump_json(remote_input_dict, command)

down_time = time.time() - self.last_analysis_completed_time
model_start_time = time.time()
self._send_inputs_to_server(remote_input_dict, command)
remote_output_dict = self._receive_outputs_from_server()

model_time_elapsed = time.time() - model_start_time
self.last_analysis_completed_time = time.time()

if self.dump_json:
remote_output_dict.update({'wall_time': model_time_elapsed})
remote_output_dict.update({'wall_time': model_time_elapsed, 'down_time': down_time})
self._dump_json(remote_output_dict, command)

if self._doing_derivative_evaluation(command):
Expand Down Expand Up @@ -135,7 +147,7 @@ def _assign_additional_partials_from_remote_output(self, remote_dict, partials):
def _create_input_dict_for_server(self, inputs):
input_dict = {'design_vars': {}, 'additional_inputs': {}, 'additional_outputs': self.additional_remote_outputs, 'component_name': self.name}
for dv in self.design_var_keys:
input_dict['design_vars'][dv.replace('.',self.var_naming_dot_replacement)] = {'val': inputs[dv.replace('.',self.var_naming_dot_replacement)].tolist()}
input_dict['design_vars'][dv] = {'val': inputs[dv.replace('.',self.var_naming_dot_replacement)].tolist()}
for input in self.additional_remote_inputs:
input_dict['additional_inputs'][input] = {'val': inputs[input.replace('.',self.var_naming_dot_replacement)].tolist()}
return input_dict
Expand All @@ -150,6 +162,8 @@ def _is_first_gradient_evaluation(self):
return len(self.times_gradient) == 0

def _need_to_restart_server(self, command: str):
if self.server_manager.job_has_expired():
return True
if self._doing_derivative_evaluation(command):
if self._is_first_gradient_evaluation() or self.reboot_only_on_function_call:
return False
Expand Down
15 changes: 9 additions & 6 deletions mphys/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class Server:
def __init__(self, get_om_group_function_pointer,
ignore_setup_warnings = False,
ignore_runtime_warnings = False,
rerun_initial_design = False):
rerun_initial_design = False,
write_n2 = False):

self.get_om_group_function_pointer = get_om_group_function_pointer
self.ignore_setup_warnings = ignore_setup_warnings
Expand All @@ -38,6 +39,7 @@ def __init__(self, get_om_group_function_pointer,
self.additional_inputs = None
self.additional_outputs = None
self.design_counter = 0 # more debugging info for client side json dumping
self.write_n2 = write_n2

self._load_the_model()

Expand Down Expand Up @@ -99,7 +101,7 @@ def _gather_design_inputs_from_om_problem(self, remote_output_dict = {}):
design_vars = self.prob.model._design_vars
remote_output_dict['design_vars'] = {}
for dv in design_vars.keys():
remote_output_dict['design_vars'][dv] = {'val': self.prob.get_val(dv),
remote_output_dict['design_vars'][dv] = {'val': self.prob.get_val(dv, get_remote=True),
'ref': design_vars[dv]['ref'],
'ref0': design_vars[dv]['ref0'],
'lower': design_vars[dv]['lower'],
Expand All @@ -117,7 +119,7 @@ def _gather_design_inputs_from_om_problem(self, remote_output_dict = {}):
def _gather_additional_inputs_from_om_problem(self, remote_output_dict = {}):
remote_output_dict['additional_inputs'] = {}
for input in self.additional_inputs:
remote_output_dict['additional_inputs'][input] = {'val': self.prob.get_val(input)}
remote_output_dict['additional_inputs'][input] = {'val': self.prob.get_val(input, get_remote=True)}
if hasattr(remote_output_dict['additional_inputs'][input]['val'], 'tolist'):
remote_output_dict['additional_inputs'][input]['val'] = remote_output_dict['additional_inputs'][input]['val'].tolist()
return remote_output_dict
Expand Down Expand Up @@ -271,14 +273,14 @@ def _gather_inputs_and_outputs_from_om_problem(self):
def _set_design_variables_into_the_server_problem(self, input_dict):
design_changed = False
for key in input_dict['design_vars'].keys():
if (self.prob.get_val(key)!=input_dict['design_vars'][key]['val']).any():
if (self.prob.get_val(key, get_remote=True)!=input_dict['design_vars'][key]['val']).any():
design_changed = True
self.prob.set_val(key, input_dict['design_vars'][key]['val'])
return design_changed

def _set_additional_inputs_into_the_server_problem(self, input_dict, design_changed):
for key in input_dict['additional_inputs'].keys():
if (self.prob.get_val(key)!=input_dict['additional_inputs'][key]['val']).any():
if (self.prob.get_val(key, get_remote=True)!=input_dict['additional_inputs'][key]['val']).any():
design_changed = True
self.prob.set_val(key, input_dict['additional_inputs'][key]['val'])
return design_changed
Expand Down Expand Up @@ -350,4 +352,5 @@ def run(self):
self._send_outputs_to_client(output_dict)

# write current n2 with values
om.n2(self.prob, show_browser=False, outfile=f"n2_inner_analysis_{input_dict['component_name']}.html")
if self.write_n2:
om.n2(self.prob, show_browser=False, outfile=f"n2_inner_analysis_{input_dict['component_name']}.html")
5 changes: 5 additions & 0 deletions mphys/network/server_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,8 @@ def enough_time_is_remaining(self, estimated_model_time):
How much time the new analysis is estimated to take
"""
return True

def job_has_expired(self):
    """
    Check if the job has run out of time.

    Returns
    -------
    bool
        Always False in this implementation, i.e. the job is never
        reported as expired. Presumably managers that track a job time
        limit override this method -- confirm against the subclasses.
    """
    # Return an explicit boolean instead of falling through to None, so the
    # predicate has a well-defined result when used as a truth value.
    return False
Loading
Loading