-
Notifications
You must be signed in to change notification settings - Fork 562
Pyomo. DoE: Sensitivity initialization #3639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 13 commits
a4a1ade
58c3663
26f8cd3
4dab155
55bfeae
f8105ea
0eb9791
66cdd0c
d75e667
e5b2e73
13848fa
bd53282
e40c6bc
75acdd2
0c87484
7cdba5b
6db9e66
f12d908
fda812e
ccfca8c
043e7de
33e1bd0
c95f41d
e9e878d
caf6bae
dd1fdcd
553ab82
a063c33
9af160d
1f849a0
da91f2f
fd8d3ea
0ee6856
01fc6ed
ab7f31f
c6e81ea
aa3961f
555a307
40ac36c
b2ed61d
daf01c7
8664ea6
9393dde
b27d89d
964446b
39a3d2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,6 +45,17 @@ | |
| from pyomo.contrib.sensitivity_toolbox.sens import get_dsdp | ||
|
|
||
| import pyomo.environ as pyo | ||
| from pyomo.contrib.doe.utils import ( | ||
| generate_snake_zigzag_pattern, | ||
| ) # , compute_FIM_metrics | ||
|
|
||
| from pyomo.contrib.doe.utils_updated import compute_FIM_metrics | ||
|
|
||
| """ | ||
| utils_updated.py is the utils.py from my open PR # 3525. | ||
| When that PR is merged, compute_FIM_metrics will be imported from utils.py and this | ||
| import will be removed. | ||
| """ | ||
|
|
||
| from pyomo.opt import SolverStatus | ||
|
|
||
|
|
@@ -1617,6 +1628,221 @@ def compute_FIM_full_factorial( | |
|
|
||
| return self.fim_factorial_results | ||
|
|
||
| def compute_FIM_factorial( | ||
| self, | ||
| model=None, | ||
| design_values: dict = None, | ||
| method="sequential", | ||
| change_one_design_at_a_time=True, | ||
| file_name: str = None, | ||
| ): | ||
| """Will run a simulation-based factorial exploration of the experimental input | ||
| space (i.e., a ``grid search`` or ``parameter sweep``) to understand how the | ||
|
||
| FIM metrics change as a function of the experimental design space. This method | ||
| can be used for both full factorial and fractional factorial designs. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| model : DoE model, optional | ||
| The model to perform the full factorial exploration on. Default: None | ||
| design_values : dict, | ||
| dict of lists, of the form {"var_name": [var_values]}. Default: None. | ||
| The `design_values` should have the key(s) passed as strings that is a | ||
| subset of the `experiment_inputs`. If one or more design variables are not | ||
| to be changed, then they should not be passed in the `design_values` | ||
| dictionary, but if they are passed in the dictionary, then they must be a | ||
| list of floats. For example, if our experiment has 4 design variables | ||
| (i.e., `experiment_inputs`): model.x1, model.x2, model.x3, and model.x4, | ||
| their values may be passed as, design_values= {"x1": [1, 2, 3], "x3": [7], | ||
| "x4": [-10, 20]}. In this case, x2 will not be changed and will be fixed at | ||
| the value in the model. | ||
| method : str, optional | ||
| string to specify which method should be used. options are ``kaug`` and | ||
| ``sequential`. Default: "sequential" | ||
| change_one_design_at_a_time : bool, optional | ||
| If True, will generate a snake-like zigzag combination of the design values | ||
| thatchanges only one of the design variables at a time. This combination | ||
| may help with the convergence in some scenarios. If False, will | ||
| generate a regular nested for loop that can change from one to all the | ||
| design variables at a time. Default: True | ||
| file_name : str, optional | ||
| if provided, will save the results to a json file. Default: None | ||
|
|
||
| Returns | ||
| ------- | ||
| factorial_results: dict | ||
| A dictionary containing the results of the factorial design with the | ||
| following keys: | ||
| - keys of model's experiment_inputs | ||
| - "log10(D-opt)": list of D-optimality values | ||
| - "log10(A-opt)": list of A-optimality values | ||
| - "log10(E-opt)": list of E-optimality values | ||
| - "log10(ME-opt)": list of ME-optimality values | ||
| - "solve_time": list of solve times | ||
| - "total_points": total number of points in the factorial design | ||
| - "success_count": number of successful runs | ||
| - "failure_count": number of failed runs | ||
| - "FIM_all": list of all FIMs computed for each point in the factorial | ||
| design. | ||
|
|
||
| Raises | ||
| ------ | ||
| ValueError | ||
| If the design_values' keys are not a subset of the model's | ||
| `experiment_inputs` keys or if the design_values are not provided. | ||
| """ | ||
|
|
||
| # Start timer | ||
| sp_timer = TicTocTimer() | ||
| sp_timer.tic(msg=None) | ||
| self.logger.info("Beginning Factorial Design.") | ||
|
|
||
| # Make new model for factorial design | ||
| self.factorial_model = self.experiment.get_labeled_model( | ||
| **self.get_labeled_model_args | ||
| ).clone() | ||
| model = self.factorial_model | ||
|
|
||
| if not design_values: | ||
| raise ValueError( | ||
| "design_values must be provided as a dictionary of lists " | ||
| "in the form {<'var_name'>: [<var_values>]}." | ||
| ) | ||
|
|
||
| # Check whether the design_ranges keys are in the experiment_inputs | ||
| design_keys = set(design_values.keys()) | ||
| map_keys = set([k.name for k in model.experiment_inputs.keys()]) | ||
| if not design_keys.issubset(map_keys): | ||
| incorrect_given_keys = design_keys - map_keys | ||
| suggested_keys = map_keys - design_keys | ||
| raise ValueError( | ||
| f"design_values keys: {incorrect_given_keys} are incorrect." | ||
| f"The keys should be from the following keys: {suggested_keys}." | ||
| ) | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it make sense to split the function here? That way, we can do extensive testing on the results of |
||
| # Get the design map keys that match the design_values keys | ||
| design_map_keys = [ | ||
| k for k in model.experiment_inputs.keys() if k.name in design_values.keys() | ||
| ] | ||
| # This ensures that the order of the design_values keys matches the order of the | ||
| # design_map_keys so that design_point can be constructed correctly in the loop. | ||
| des_ranges = [design_values[k.name] for k in design_map_keys] | ||
| if change_one_design_at_a_time: | ||
| factorial_points = generate_snake_zigzag_pattern(*des_ranges) | ||
|
||
| else: | ||
| factorial_points = product(*des_ranges) | ||
|
|
||
| factorial_points_list = list(factorial_points) | ||
|
|
||
| factorial_results = {k.name: [] for k in model.experiment_inputs.keys()} | ||
| factorial_results.update( | ||
| { | ||
| "log10(D-opt)": [], | ||
| "log10(A-opt)": [], | ||
| "log10(E-opt)": [], | ||
| "log10(ME-opt)": [], | ||
| "eigval_min": [], | ||
| "eigval_max": [], | ||
| "det_FIM": [], | ||
| "trace_FIM": [], | ||
| "solve_time": [], | ||
| } | ||
| ) | ||
|
|
||
| success_count = 0 | ||
| failure_count = 0 | ||
| total_points = len(factorial_points_list) | ||
|
|
||
| # save all the FIMs for each point in the factorial design | ||
| self.n_parameters = len(model.unknown_parameters) | ||
| FIM_all = np.zeros((total_points, self.n_parameters, self.n_parameters)) | ||
|
|
||
| time_set = [] | ||
| curr_point = 1 # Initial current point | ||
| for design_point in factorial_points_list: | ||
| # Fix design variables at fixed experimental design point | ||
| for i in range(len(design_point)): | ||
| design_map_keys[i].fix(design_point[i]) | ||
|
|
||
| # Timing and logging objects | ||
| self.logger.info(f"=======Iteration Number: {curr_point} =======") | ||
| iter_timer = TicTocTimer() | ||
| iter_timer.tic(msg=None) | ||
|
|
||
| try: | ||
| curr_point = success_count + failure_count + 1 | ||
| self.logger.info(f"This is run {curr_point} out of {total_points}.") | ||
| self.compute_FIM(model=model, method=method) | ||
| success_count += 1 | ||
| # iteration time | ||
| iter_t = iter_timer.toc(msg=None) | ||
| time_set.append(iter_t) | ||
|
|
||
| # More logging | ||
| self.logger.info( | ||
| f"The code has run for {round(sum(time_set), 2)} seconds." | ||
| ) | ||
| self.logger.info( | ||
| "Estimated remaining time: %s seconds", | ||
| round( | ||
| sum(time_set) / (curr_point) * (total_points - curr_point + 1), | ||
| 2, | ||
| ), | ||
| ) | ||
| except: | ||
| self.logger.warning( | ||
| ":::::::::::Warning: Cannot converge this run.::::::::::::" | ||
| ) | ||
| failure_count += 1 | ||
| self.logger.warning("failed count:", failure_count) | ||
|
|
||
| self._computed_FIM = np.zeros(self.prior_FIM.shape) | ||
|
|
||
| iter_t = iter_timer.toc(msg=None) | ||
| time_set.append(iter_t) | ||
|
|
||
| FIM = self._computed_FIM | ||
|
|
||
| # Save FIM for the current design point | ||
| FIM_all[curr_point - 1, :, :] = FIM | ||
|
|
||
| # Compute and record metrics on FIM | ||
| det_FIM, trace_FIM, E_vals, E_vecs, D_opt, A_opt, E_opt, ME_opt = ( | ||
| compute_FIM_metrics(FIM) | ||
| ) | ||
|
|
||
| for k in model.experiment_inputs.keys(): | ||
| factorial_results[k.name].append(pyo.value(k)) | ||
|
|
||
| factorial_results["log10(D-opt)"].append(D_opt) | ||
| factorial_results["log10(A-opt)"].append(A_opt) | ||
| factorial_results["log10(E-opt)"].append(E_opt) | ||
| factorial_results["log10(ME-opt)"].append(ME_opt) | ||
| factorial_results["eigval_min"].append(np.min(E_vals)) | ||
| factorial_results["eigval_max"].append(np.max(E_vals)) | ||
| factorial_results["det_FIM"].append(det_FIM) | ||
| factorial_results["trace_FIM"].append(trace_FIM) | ||
| factorial_results["solve_time"].append(time_set[-1]) | ||
|
|
||
| factorial_results.update( | ||
| { | ||
| "total_points": total_points, | ||
| "success_counts": success_count, | ||
| "failure_counts": failure_count, | ||
| "FIM_all": FIM_all.tolist(), # Save all FIMs | ||
| } | ||
| ) | ||
|
|
||
| self.factorial_results = factorial_results | ||
|
|
||
| # Save the results to a json file based on the file_name provided | ||
| if file_name is not None: | ||
| with open(file_name + ".json", "w") as f: | ||
| json.dump(self.factorial_results, f, indent=4) | ||
| self.logger.info(f"Results saved to {file_name}.json.") | ||
|
|
||
| return self.factorial_results | ||
|
|
||
| # TODO: Overhaul plotting functions to not use strings | ||
| # TODO: Make the plotting functionalities work for >2 design features | ||
| def draw_factorial_figure( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,3 +100,43 @@ def rescale_FIM(FIM, param_vals): | |
| # pass # ToDo: Write error for suffix keys that aren't ParamData or VarData | ||
| # | ||
| # return param_list | ||
|
|
||
|
|
||
| def generate_snake_zigzag_pattern(*lists): | ||
|
||
| """ | ||
| Generates a multi-dimensional zigzag pattern for an arbitrary number of lists. | ||
| This pattern is useful for generating patterns for sensitivity analysis when we want | ||
| to change one variable at a time. This function uses recursion and acts as a generator. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| *lists: A variable number of 1D arraylike arguments. | ||
|
|
||
| Yields | ||
| ------ | ||
| A tuple representing points in the snake-like zigzag pattern. | ||
| """ | ||
|
|
||
| # The main logic is in a nested recursive helper function. | ||
| def _generate_recursive(depth, index_sum): | ||
| # Base case: If we've processed all lists, we're at the end of a path. | ||
| if depth == len(lists): | ||
| yield () | ||
| return | ||
|
|
||
| current_list = lists[depth] | ||
|
|
||
| # Determine the iteration direction based on the sum of parent indices. | ||
| # This is the mathematical rule for the zigzag. | ||
| is_forward = index_sum % 2 == 0 | ||
| iterable = current_list if is_forward else reversed(current_list) | ||
|
|
||
| # Enumerate to get the index `i` for the *next* recursive call's sum. | ||
| for i, value in enumerate(iterable): | ||
| # Recur for the next list, updating the index_sum. | ||
| for sub_pattern in _generate_recursive(depth + 1, index_sum + i): | ||
| # Prepend the current value to the results from deeper levels. | ||
| yield (value,) + sub_pattern | ||
|
|
||
| # Start the recursion at the first list (depth 0) with an initial sum of 0. | ||
| yield from _generate_recursive(depth=0, index_sum=0) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How are we going to test this function? IMO, the next step is to develop some tests for this. That way, as you try to run the sensitivity analysis for an example, you are least know this part of the code is correct.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @adowling2 , I did check it manually before. I have recently added a test for this in |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this function already in Pyomo.DoE somewhere else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I created this method. I do not see the same name anywhere else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adowling2, we have
compute_FIM_full_factorial(). I have not changed that method, rather I have added a new method. Maybe we can show a deprecation warning forcompute_FIM_full_factorial()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, a depreciation warning sounds reasonable.