Skip to content
157 changes: 157 additions & 0 deletions pyomo/contrib/parmest/parmest.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,92 @@ def _instance_creation_callback(self, experiment_number=None, cb_data=None):
model = self._create_parmest_model(experiment_number)
return model

def _create_scenario_blocks(self):
# Create scenario block structure
# Code is still heavily hypothetical and needs to be thought over and debugged.
# Utility function for _Q_opt_simple
# Make a block of model scenarios, one for each experiment in exp_list

# Create a parent model to hold scenario blocks
model = pyo.ConcreteModel()
model.exp_scenarios = pyo.Block(range(len(self.exp_list)))
for i in range(len(self.exp_list)):
# Create parmest model for experiment i
parmest_model = self._create_parmest_model(i)
# Assign parmest model to block
model.exp_scenarios[i].transfer_attributes_from(parmest_model)

# Make an objective that sums over all scenario blocks
def total_obj(m):
return sum(
block.Total_Cost_Objective for block in m.exp_scenarios.values()
) / len(self.exp_list)

model.Obj = pyo.Objective(rule=total_obj, sense=pyo.minimize)

# Make sure all the parameters are linked across blocks
for name in self.estimator_theta_names:
# Get the variable from the first block
ref_var = getattr(model.exp_scenarios[0], name)
for i in range(1, len(self.exp_list)):
curr_var = getattr(model.exp_scenarios[i], name)
# Constrain current variable to equal reference variable
model.add_component(
f"Link_{name}_Block0_Block{i}",
pyo.Constraint(expr=curr_var == ref_var),
)

# Deactivate the objective in each block to avoid double counting
for i in range(len(self.exp_list)):
model.exp_scenarios[i].Total_Cost_Objective.deactivate()

model.pprint()

return model

# Redesigning simpler version of _Q_opt
# Still work in progress
def _Q_opt_simple(
self,
return_values=None,
bootlist=None,
ThetaVals=None,
solver="ipopt",
calc_cov=NOTSET,
cov_n=NOTSET,
):
'''
Making new version of _Q_opt that uses scenario blocks, similar to DoE.

Steps:
1. Load model - parmest model should be labeled
2. Create scenario blocks (biggest redesign) - clone model to have one per experiment
3. Define objective and constraints for the block
4. Solve the block as a single problem
5. Analyze results and extract parameter estimates

'''

# Create scenario blocks using utility function
model = self._create_scenario_blocks()

solver = SolverFactory('ipopt')
if self.solver_options is not None:
for key in self.solver_options:
solver.options[key] = self.solver_options[key]

solve_result = solver.solve(model, tee=self.tee)
assert_optimal_termination(solve_result)

# Extract objective value
obj_value = pyo.value(model.Obj)
theta_estimates = {}
# Extract theta estimates from first block
for name in self.estimator_theta_names:
theta_estimates[name] = pyo.value(getattr(model.exp_scenarios[0], name))

return obj_value, theta_estimates

def _Q_opt(
self,
ThetaVals=None,
Expand Down Expand Up @@ -1683,6 +1769,77 @@ def theta_est(
cov_n=cov_n,
)

# Replicate of theta_est for testing simplified _Q_opt
# Still work in progress
def theta_est_simple(
self, solver="ipopt", return_values=[], calc_cov=NOTSET, cov_n=NOTSET
):
"""
Parameter estimation using all scenarios in the data

Parameters
----------
solver: str, optional
Currently only "ef_ipopt" is supported. Default is "ef_ipopt".
return_values: list, optional
List of Variable names, used to return values from the model
for data reconciliation
calc_cov: boolean, optional
DEPRECATED.

If True, calculate and return the covariance matrix
(only for "ef_ipopt" solver). Default is NOTSET
cov_n: int, optional
DEPRECATED.

If calc_cov=True, then the user needs to supply the number of datapoints
that are used in the objective function. Default is NOTSET

Returns
-------
obj_val: float
The objective function value
theta_vals: pd.Series
Estimated values for theta
var_values: pd.DataFrame
Variable values for each variable name in
return_values (only for solver='ipopt')
"""
assert isinstance(solver, str)
assert isinstance(return_values, list)
assert (calc_cov is NOTSET) or isinstance(calc_cov, bool)

if calc_cov is not NOTSET:
deprecation_warning(
"theta_est(): `calc_cov` and `cov_n` are deprecated options and "
"will be removed in the future. Please use the `cov_est()` function "
"for covariance calculation.",
version="6.9.5",
)
else:
calc_cov = False

# check if we are using deprecated parmest
if self.pest_deprecated is not None and calc_cov:
return self.pest_deprecated.theta_est(
solver=solver,
return_values=return_values,
calc_cov=calc_cov,
cov_n=cov_n,
)
elif self.pest_deprecated is not None and not calc_cov:
return self.pest_deprecated.theta_est(
solver=solver, return_values=return_values
)

return self._Q_opt_simple(
solver=solver,
return_values=return_values,
bootlist=None,
calc_cov=calc_cov,
cov_n=cov_n,
)

def cov_est(self, method="finite_difference", solver="ipopt", step=1e-3):
"""
Covariance matrix calculation using all scenarios in the data
Expand Down
Loading