diff --git a/nilmtk_contrib/disaggregate/afhmm.py b/nilmtk_contrib/disaggregate/afhmm.py index ad16433..9b8e486 100644 --- a/nilmtk_contrib/disaggregate/afhmm.py +++ b/nilmtk_contrib/disaggregate/afhmm.py @@ -1,223 +1,182 @@ -from collections import Counter, OrderedDict +import cvxpy as cvx import math -import pandas as pd +import multiprocessing +import nilmtk.docinherit import numpy as np -from nilmtk.disaggregate import Disaggregator -import cvxpy as cvx +import pandas as pd + +from collections import Counter, OrderedDict from hmmlearn import hmm -from multiprocessing import Process, Manager +from nilmtk.disaggregate import Disaggregator -class AFHMM(Disaggregator): +class AFHMM(Disaggregator): + """ + Additive Factorial Hidden Markov Model (without Signal Aggregate Constraints) + See: http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf + """ def __init__(self, params): - self.model = [] - self.MODEL_NAME = 'AFHMM' + self.MODEL_NAME = 'AFHMM' self.models = [] - self.num_appliances = 0 - self.appliances = [] + self.means_vector = OrderedDict() + self.pi_s_vector = OrderedDict() + self.transmat_vector = OrderedDict() self.signal_aggregates = OrderedDict() - self.time_period = 720 - self.time_period = params.get('time_period', self.time_period) - self.default_num_states = params.get('default_num_states',2) - self.save_model_path = params.get('save-model-path', None) - self.load_model_path = params.get('pretrained-model-path',None) - self.chunk_wise_training = False + self.time_period = params.get("time_period", 720) + self.default_num_states = params.get("default_num_states", 2) + self.save_model_path = params.get("save-model-path", None) + self.load_model_path = params.get("pretrained-model-path", None) + self.chunk_wise_training = params.get("chunk_wise_training", False) if self.load_model_path: self.load_model(self.load_model_path) - + @nilmtk.docinherit.doc_inherit def partial_fit(self, train_main, train_appliances, **load_kwargs): - - self.models = [] - self.num_appliances = 0 - self.appliances = [] - train_main = pd.concat(train_main, axis=0) - train_app_tmp = [] - for app_name, df_list in train_appliances: - df_list = pd.concat(df_list, axis=0) - train_app_tmp.append((app_name,df_list)) - - # All the initializations required by the model - train_appliances = train_app_tmp - learnt_model = OrderedDict() - means_vector = [] - one_hot_states_vector = [] - pi_s_vector = [] - transmat_vector = [] - states_vector = [] - train_main = train_main.values.flatten().reshape((-1,1)) - - for appliance_name, power in train_appliances: - #print (appliance_name) - # Learning the pi's and transistion probabliites for each appliance using a simple HMM - self.appliances.append(appliance_name) - X = power.values.reshape((-1,1)) - learnt_model[appliance_name] = hmm.GaussianHMM(self.default_num_states, "full") - # Fit - learnt_model[appliance_name].fit(X) - means = learnt_model[appliance_name].means_.flatten().reshape((-1,1)) - states = learnt_model[appliance_name].predict(X) - transmat = learnt_model[appliance_name].transmat_ + """ + train_main: pd.DataFrame It will contain the mains reading. + train_appliances: list of tuples [('appliance1', [ df1 ]),('appliance2', [ df2 ]),...] + """ + for appli_name, df_list in train_appliances: + # Compute model parameters for this chunk. + app_df = pd.concat(df_list, axis=0) + X = app_df.values.reshape(( -1, 1 )) + learnt_model = hmm.GaussianHMM(self.default_num_states, "full") + learnt_model.fit(X) + means = learnt_model.means_.flatten().reshape(( -1, 1 )) + states = learnt_model.predict(X) + transmat = learnt_model.transmat_.T counter = Counter(states.flatten()) - total = 0 - keys = list(counter.keys()) - keys.sort() - - for i in keys: - total+=counter[i] - pi = [] - - for i in keys: - pi.append(counter[i]/total) - pi = np.array(pi) - nb_classes = self.default_num_states - targets = states.reshape(-1) - means_vector.append(means) - pi_s_vector.append(pi) - transmat_vector.append(transmat.T) - states_vector.append(states) - self.num_appliances+=1 - self.signal_aggregates[appliance_name] = (np.mean(X)*self.time_period).reshape((-1,)) - - self.means_vector = means_vector - self.pi_s_vector = pi_s_vector - self.means_vector = means_vector - self.transmat_vector = transmat_vector - print ("Finished Training") - - def disaggregate_thread(self, test_mains,index,d): - - # A threads that does disaggregation - - means_vector = self.means_vector - pi_s_vector = self.pi_s_vector - means_vector = self.means_vector - transmat_vector = self.transmat_vector - - sigma = 100*np.ones((len(test_mains),1)) - flag = 0 - - for epoch in range(6): - # The alernative Minimization - if epoch%2==1: - usage = np.zeros((len(test_mains))) - for appliance_id in range(self.num_appliances): - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - usage+=app_usage - sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) - sigma = np.where(sigma<1,1,sigma) + total = sum(counter.values()) + pi = np.array([ v/total for v in counter.values() ]) + sigagg = (np.mean(X) * self.time_period).reshape(( -1, )) + # Merge with previous values. + # Hypothesis 1: chunk size is constant. (mean of means) + # Hypothesis 2: if the appliance is already registered in + # self.means_vector, then it is also known in all other dicts. + if appli_name in self.means_vector: + self.means_vector[appli_name] = (self.means_vector[appli_name] + means) / 2 + self.pi_s_vector[appli_name] = (self.pi_s_vector[appli_name] + pi) / 2 + self.transmat_vector[appli_name] = (self.transmat_vector[appli_name] + transmat) / 2 + self.signal_aggregates[appli_name] = (self.signal_aggregates[appli_name] + sigagg) / 2 else: - - if flag==0: - constraints = [] - cvx_state_vectors = [] - cvx_variable_matrices = [] - delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') - for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable(shape=(len(test_mains), self.default_num_states), name='state_vec-%s'%(appliance_id)) - cvx_state_vectors.append(state_vector) - # Enforcing that their values are ranged - constraints+=[cvx_state_vectors[appliance_id]>=0] - constraints+=[cvx_state_vectors[appliance_id]<=1] - # Enforcing that sum of states equals 1 - for t in range(len(test_mains)): # 6c - constraints+=[cvx.sum(cvx_state_vectors[appliance_id][t])==1] - # Creating Variable matrices for every appliance - appliance_variable_matrix = [] - for t in range(len(test_mains)): - matrix = cvx.Variable(shape=(self.default_num_states, self.default_num_states), name='variable_matrix-%s-%d'%(appliance_id,t)) - appliance_variable_matrix.append(matrix) - cvx_variable_matrices.append(appliance_variable_matrix) - # Enforcing that their values are ranged - for t in range(len(test_mains)): - constraints+=[cvx_variable_matrices[appliance_id][t]>=0] - constraints+=[cvx_variable_matrices[appliance_id][t]<=1] - # Constraint 6e - for t in range(0,len(test_mains)): # 6e - for i in range(self.default_num_states): - constraints+=[cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i]] - # Constraint 6d - for t in range(1,len(test_mains)): # 6d - for i in range(self.default_num_states): - constraints+=[cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i]] - - - - total_observed_reading = np.zeros((test_mains.shape)) - # TOtal observed reading equals the sum of each appliance - for appliance_id in range(self.num_appliances): - total_observed_reading+=cvx_state_vectors[appliance_id]@means_vector[appliance_id] - - # Loss function to be minimized - - term_1 = 0 - term_2 = 0 - for appliance_id in range(self.num_appliances): - # First loop is over appliances - variable_matrix = cvx_variable_matrices[appliance_id] - transmat = transmat_vector[appliance_id] - # Next loop is over different time-stamps - for matrix in variable_matrix: - term_1-=cvx.sum(cvx.multiply(matrix,np.log(transmat))) - one_hot_states = cvx_state_vectors[appliance_id] - pi = pi_s_vector[appliance_id] - # The expression involving start states - first_one_hot_states = one_hot_states[0] - term_2-= cvx.sum(cvx.multiply(first_one_hot_states,np.log(pi))) - flag = 1 - - expression = 0 + self.means_vector[appli_name] = means + self.pi_s_vector[appli_name] = pi + self.transmat_vector[appli_name] = transmat + self.signal_aggregates[appli_name] = sigagg + + print ("{}: Finished training".format(self.MODEL_NAME)) + + def setup_cvx_constraints(self, n_samples, n_appliances): + cvx_state_vectors = [ + cvx.Variable( + shape=( n_samples, self.default_num_states ), + name="state_vec-{}".format(i) + ) + for i in range(n_appliances) + ] + constraints = [] + for stv in cvx_state_vectors: + # State vector values are ranged. + constraints += [ stv >= 0, stv <= 1 ] + # Sum of states equals 1. + for t in range(n_samples): + constraints.append(cvx.sum(stv[t]) == 1) + # Create variable matrices for each appliance, for each sample. + cvx_variable_matrices = [ + [ + cvx.Variable( + shape=( self.default_num_states, self.default_num_states ), + name="variable_matrix-{}-{}".format(i, t) + ) + for t in range(n_samples) + ] + for i in range(n_appliances) + ] + for i, appli_varmats in enumerate(cvx_variable_matrices): + for t, varmat in enumerate(appli_varmats): + # Assign range constraints to variable matrices. + constraints += [ varmat >= 0, varmat <= 1 ] + # Assign equality constraints with state vectors. + constraints += [ + cvx.sum(varmat[l]) == cvx_state_vectors[i][t-1][l] + for l in range(self.default_num_states) + ] + constraints += [ + cvx.sum((varmat.T)[l]) == cvx_state_vectors[i][t][l] + for l in range(self.default_num_states) + ] + + return cvx_state_vectors, constraints, cvx_variable_matrices + + def disaggregate_thread(self, test_mains): + n_epochs = 6 # don't put in __init__, those are inference epochs! + n_samples = len(test_mains) + sigma = 100*np.ones(( n_samples, 1 )) + cvx_state_vectors, constraints, cvx_varmats = self.setup_cvx_constraints( + n_samples, len(self.means_vector)) + # Preparing first terms of the objective function. + term_1 = 0 + term_2 = 0 + total_appli_energy = np.zeros_like(test_mains) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + total_appli_energy += cvx_state_vectors[i]@means + appli_varmats = cvx_varmats[i] + transmat = self.transmat_vector[appli_name] + for varmat in appli_varmats: + term_1 -= cvx.sum(cvx.multiply(varmat, np.log(transmat))) + + first_hot_state = cvx_state_vectors[i][0] + transition_p = self.pi_s_vector[appli_name] + term_2 -= cvx.sum(cvx.multiply(first_hot_state, np.log(transition_p))) + + for epoch in range(n_epochs): + if epoch % 2: + # Alernative minimization on odd epochs. + usage = np.zeros(( n_samples, )) + for i, (appli_name, means) in enumerate(self.means_vector.items()): + usage += np.sum(s_[i]@means, axis=1) + sigma = (test_mains.flatten() - usage.flatten()).reshape(( -1, 1 )) + sigma = np.where(sigma < 1, 1, sigma) + else: + # Primary minimization on even epochs. term_3 = 0 term_4 = 0 - - - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) - term_3+= .5 * (np.log(sigma[t]**2)) - expression = term_1 + term_2 + term_3 + term_4 - expression = cvx.Minimize(expression) - prob = cvx.Problem(expression, constraints,) - prob.solve(solver=cvx.SCS,verbose=False,warm_start=True) - s_ = [i.value for i in cvx_state_vectors] + for t in range(n_samples): + term_3 += .5 * (np.log(sigma[t]**2)) + term_4 += .5 * ((test_mains[t][0] - total_appli_energy[t][0])**2 / (sigma[t]**2)) + + objective = cvx.Minimize(term_1 + term_2 + term_3 + term_4) + prob = cvx.Problem(objective, constraints) + prob.solve(solver=cvx.SCS, verbose=False, warm_start=True) + s_ = [ + np.zeros((n_samples, self.default_num_states)) if i.value is None + else i.value + for i in cvx_state_vectors + ] prediction_dict = {} - for appliance_id in range(self.num_appliances): - app_name = self.appliances[appliance_id] - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - prediction_dict[app_name] = app_usage.flatten() + for i, (appli_name, means) in enumerate(self.means_vector.items()): + app_usage = np.sum(s_[i]@means, axis=1) + prediction_dict[appli_name] = app_usage.flatten() - # Store the result in the index corresponding to the thread. + return pd.DataFrame(prediction_dict, dtype="float32") - d[index] = pd.DataFrame(prediction_dict,dtype='float32') - - def disaggregate_chunk(self, test_mains_list): - - # Sistributes the test mains across multiple threads and runs them in parallel - manager = Manager() - d = manager.dict() - + @nilmtk.docinherit.doc_inherit + def disaggregate_chunk(self, test_mains): + # Distributes the test mains across multiple threads and disaggregate in parallel + # Use all available CPUs except one for the OS. + n_workers = max(( 1, multiprocessing.cpu_count() - 1 )) predictions_lst = [] - for test_mains in test_mains_list: - test_mains_big = test_mains.values.flatten().reshape((-1,1)) - self.arr_of_results = [] - threads = [] - for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))): - test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period] - t = Process(target=self.disaggregate_thread, args=(test_mains,test_block,d)) - threads.append(t) - - for t in threads: - t.start() - - for t in threads: - t.join() - - for i in range(len(threads)): - self.arr_of_results.append(d[i]) - prediction = pd.concat(self.arr_of_results,axis=0) - predictions_lst.append(prediction) - - return predictions_lst - + with multiprocessing.Pool(n_workers) as workers: + for mains_df in test_mains: + mains_vect = mains_df.values.flatten().reshape(( -1, 1 )) + n_blocks = int(math.ceil(len(mains_vect)/self.time_period)) + blocks = [ + mains_vect[b * self.time_period:(b + 1) * self.time_period] + for b in range(n_blocks) + ] + res_arr = workers.map(self.disaggregate_thread, blocks) + predictions_lst.append(pd.concat(res_arr, axis=0)) + return predictions_lst diff --git a/nilmtk_contrib/disaggregate/afhmm_sac.py b/nilmtk_contrib/disaggregate/afhmm_sac.py index 1e87b27..e1abf80 100644 --- a/nilmtk_contrib/disaggregate/afhmm_sac.py +++ b/nilmtk_contrib/disaggregate/afhmm_sac.py @@ -1,292 +1,24 @@ -from collections import Counter, OrderedDict -import math -import pandas as pd -import numpy as np -from nilmtk.disaggregate import Disaggregator import cvxpy as cvx -from hmmlearn import hmm -from multiprocessing import Process, Manager -class AFHMM_SAC(Disaggregator): - """1 dimensional baseline Mean algorithm.""" +from nilmtk_contrib.disaggregate import AFHMM + +class AFHMM_SAC(AFHMM): + """ + Additive Factorial Hidden Markov Model with Signal Aggregate Constraints + See: http://papers.nips.cc/paper/5526-signal-aggregate-constraints-in-additive-factorial-hmms-with-application-to-energy-disaggregation.pdf + """ def __init__(self, params): - self.model = [] - self.MIN_CHUNK_LENGTH = 100 + super().__init__(params) self.MODEL_NAME = 'AFHMM_SAC' - self.default_num_states = 2 - self.models = [] - self.num_appliances = 0 - self.appliances = [] - self.time_period = 720 - self.signal_aggregates = OrderedDict() - self.time_period = params.get('time_period', self.time_period) - self.default_num_states = params.get('default_num_states',2) - self.save_model_path = params.get('save-model-path', None) - self.load_model_path = params.get('pretrained-model-path',None) - self.chunk_wise_training = False - if self.load_model_path: - self.load_model(self.load_model_path) - - - - def partial_fit(self, train_main, train_appliances, **load_kwargs): - - self.models = [] - self.num_appliances = 0 - self.appliances = [] - ''' - train_main :- pd.DataFrame It will contain the mains reading. - train_appliances :- list of tuples [('appliance1',df1),('appliance2',df2),...] - ''' - train_main = pd.concat(train_main, axis=0) - train_app_tmp = [] - - for app_name, df_list in train_appliances: - df_list = pd.concat(df_list, axis=0) - train_app_tmp.append((app_name,df_list)) - - train_appliances = train_app_tmp - - - learnt_model = OrderedDict() - - means_vector = [] - - one_hot_states_vector = [] - - pi_s_vector = [] - - transmat_vector = [] - - states_vector = [] - - train_main = train_main.values.flatten().reshape((-1,1)) - - for appliance_name, power in train_appliances: - #print (appliance_name) - self.appliances.append(appliance_name) - - X = power.values.reshape((-1,1)) - - learnt_model[appliance_name] = hmm.GaussianHMM(self.default_num_states, "full") - # Fit - learnt_model[appliance_name].fit(X) - - means = learnt_model[appliance_name].means_.flatten().reshape((-1,1)) - states = learnt_model[appliance_name].predict(X) - transmat = learnt_model[appliance_name].transmat_ - counter = Counter(states.flatten()) - total = 0 - keys = list(counter.keys()) - keys.sort() - - for i in keys: - total+=counter[i] - - pi = [] - - for i in keys: - pi.append(counter[i]/total) - - pi = np.array(pi) - - nb_classes = self.default_num_states - targets = states.reshape(-1) - - means_vector.append(means) - pi_s_vector.append(pi) - transmat_vector.append(transmat.T) - states_vector.append(states) - self.num_appliances+=1 - self.signal_aggregates[appliance_name] = (np.mean(X)*self.time_period).reshape((-1,)) - - self.means_vector = means_vector - self.pi_s_vector = pi_s_vector - self.means_vector = means_vector - self.transmat_vector = transmat_vector - -# print(transmat_vector) -# print (means_vector) -# print (states_vector) -# print (pi_s_vector) - print ("Finished Training") -# print (self.signal_aggregates) -# print (np.log(transmat)) -# print(pi) -# print (np.log(pi)) - #print (np.sum(transmat_vector[0],axis=1)) - #print (np.sum(transmat_vector[0],axis=0)) - #print (states.shape) - #print (one_hot_targets.shape) - - # one_hot_states_vector = np.array(one_hot_states_vector) - - # # print (transmat_vector[0]) - # # print (np.sum(transmat_vector[0],axis=0)) - # # print (np.sum(transmat_vector[0],axis=1)) - # appliance_variable_matrix = [] - - # #print (len(states_vector)) - # #variable_matrix = np.zeros((len(appliance_states),self.default_num_states,self.default_num_states)) - - # for appliance_states in states_vector: - - # variable_matrix = np.zeros((len(appliance_states),self.default_num_states,self.default_num_states)) - - # for i in range(1,len(appliance_states)): - # current_state = appliance_states[i] - # previous_state = appliance_states[i-1] - # variable_matrix[i,current_state, previous_state] = 1 - # appliance_variable_matrix.append(variable_matrix) - - # appliance_variable_matrix = np.array(appliance_variable_matrix) - # term_1_list = [] - - # term_2_list = [] - - - def disaggregate_thread(self, test_mains,index,d): - means_vector = self.means_vector - pi_s_vector = self.pi_s_vector - means_vector = self.means_vector - transmat_vector = self.transmat_vector - sigma = 100*np.ones((len(test_mains),1)) - flag = 0 - for epoch in range(6): - if epoch%2==1: - # The alernative Minimization - usage = np.zeros((len(test_mains))) - for appliance_id in range(self.num_appliances): - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - usage+=app_usage - sigma = (test_mains.flatten() - usage.flatten()).reshape((-1,1)) - sigma = np.where(sigma<1,1,sigma) - else: - - if flag==0: - constraints = [] - cvx_state_vectors = [] - cvx_variable_matrices = [] - delta = cvx.Variable(shape=(len(test_mains),1), name='delta_t') - - for appliance_id in range(self.num_appliances): - state_vector = cvx.Variable(shape=(len(test_mains), self.default_num_states), name='state_vec-%s'%(appliance_id)) - cvx_state_vectors.append(state_vector) - # Enforcing that their values are ranged - constraints+=[cvx_state_vectors[appliance_id]>=0] - constraints+=[cvx_state_vectors[appliance_id]<=1] - # Enforcing that sum of states equals 1 - for t in range(len(test_mains)): # 6c - constraints+=[cvx.sum(cvx_state_vectors[appliance_id][t])==1] - # Creating Variable matrices for every appliance - appliance_variable_matrix = [] - for t in range(len(test_mains)): - matrix = cvx.Variable(shape=(self.default_num_states, self.default_num_states), name='variable_matrix-%s-%d'%(appliance_id,t)) - appliance_variable_matrix.append(matrix) - cvx_variable_matrices.append(appliance_variable_matrix) - # Enforcing that their values are ranged - for t in range(len(test_mains)): - constraints+=[cvx_variable_matrices[appliance_id][t]>=0] - constraints+=[cvx_variable_matrices[appliance_id][t]<=1] - # Constraint 6e - for t in range(0,len(test_mains)): # 6e - for i in range(self.default_num_states): - constraints+=[cvx.sum(((cvx_variable_matrices[appliance_id][t]).T)[i]) == cvx_state_vectors[appliance_id][t][i]] - # Constraint 6d - for t in range(1,len(test_mains)): # 6d - for i in range(self.default_num_states): - constraints+=[cvx.sum(cvx_variable_matrices[appliance_id][t][i]) == cvx_state_vectors[appliance_id][t-1][i]] - - - for appliance_id in range(self.num_appliances): - appliance_usage = cvx_state_vectors[appliance_id]@means_vector[appliance_id] - total_appliance_usage = cvx.sum(appliance_usage) - constraints+=[total_appliance_usage <= self.signal_aggregates[self.appliances[appliance_id]]] - - - # Second order cone constraints - - total_observed_reading = np.zeros((test_mains.shape)) - #print (len(cvx_state_vectors)) - for appliance_id in range(self.num_appliances): - total_observed_reading+=cvx_state_vectors[appliance_id]@means_vector[appliance_id] - flag=1 - - - term_1 = 0 - term_2 = 0 - - for appliance_id in range(self.num_appliances): - # First loop is over appliances - variable_matrix = cvx_variable_matrices[appliance_id] - transmat = transmat_vector[appliance_id] - # Next loop is over different time-stamps - for matrix in variable_matrix: - term_1-=cvx.sum(cvx.multiply(matrix,np.log(transmat))) - one_hot_states = cvx_state_vectors[appliance_id] - pi = pi_s_vector[appliance_id] - # The expression involving start states - first_one_hot_states = one_hot_states[0] - term_2-= cvx.sum(cvx.multiply(first_one_hot_states,np.log(pi))) - - flag = 1 - - expression = 0 - term_3 = 0 - term_4 = 0 - - for t in range(len(test_mains)): - term_4+= .5 * ((test_mains[t][0] - total_observed_reading[t][0])**2 / (sigma[t]**2)) - term_3+= .5 * (np.log(sigma[t]**2)) - expression = term_1 + term_2 + term_3 + term_4 - expression = cvx.Minimize(expression) - prob = cvx.Problem(expression, constraints) - - prob.solve(solver=cvx.SCS,verbose=False, warm_start=True) - s_ = [i.value for i in cvx_state_vectors] - - prediction_dict = {} - for appliance_id in range(self.num_appliances): - app_name = self.appliances[appliance_id] - app_usage= np.sum(s_[appliance_id]@means_vector[appliance_id],axis=1) - prediction_dict[app_name] = app_usage.flatten() - - d[index] = pd.DataFrame(prediction_dict,dtype='float32') - - - - - - - - def disaggregate_chunk(self, test_mains_list): - - # Sistributes the test mains across multiple threads and runs them in parallel - manager = Manager() - d = manager.dict() - - predictions_lst = [] - for test_mains in test_mains_list: - test_mains_big = test_mains.values.flatten().reshape((-1,1)) - self.arr_of_results = [] - threads = [] - for test_block in range(int(math.ceil(len(test_mains_big)/self.time_period))): - test_mains = test_mains_big[test_block*(self.time_period):(test_block+1)*self.time_period] - t = Process(target=self.disaggregate_thread, args=(test_mains,test_block,d)) - threads.append(t) - - for t in threads: - t.start() - for t in threads: - t.join() + def setup_cvx_constraints(self, n_samples, n_appliances): + cvx_state_vectors, constraints, cvx_variable_matrices = super().setup_cvx_constraints(n_samples, n_appliances) + # Constraints on signal aggregates. + for i, (appli_name, signal_aggregate) in enumerate(self.signal_aggregates.items()): + appliance_usage = cvx_state_vectors[i]@self.means_vector[appli_name] + total_appliance_usage = cvx.sum(appliance_usage) + constraints.append(total_appliance_usage <= signal_aggregate) - for i in range(len(threads)): - self.arr_of_results.append(d[i]) - prediction = pd.concat(self.arr_of_results,axis=0) - predictions_lst.append(prediction) - - return predictions_lst + return cvx_state_vectors, constraints, cvx_variable_matrices - \ No newline at end of file