diff --git a/analysis/mc_validation/README.md b/analysis/mc_validation/README.md index e31ffc4d6..b4a7ce1a4 100644 --- a/analysis/mc_validation/README.md +++ b/analysis/mc_validation/README.md @@ -16,5 +16,8 @@ This directory contains scripts from the validation studies of the FullR2 privat - Should be run on the output of the topeft processor - Was used during the June 2022 MC validation studies (for TOP-22-006 pre approval checks) +* `gen_photon_processor.py`: + - This script produces a pkl file compatible with the datacard maker from nanoGEN files. + diff --git a/analysis/mc_validation/gen_photon_processor.py b/analysis/mc_validation/gen_photon_processor.py new file mode 100644 index 000000000..ab4f9af4b --- /dev/null +++ b/analysis/mc_validation/gen_photon_processor.py @@ -0,0 +1,385 @@ +#!/usr/bin/env python +import copy +import coffea +import numpy as np +import awkward as ak +np.seterr(divide='ignore', invalid='ignore', over='ignore') +from coffea import processor +from coffea.analysis_tools import PackedSelection +import hist + +import topeft.modules.object_selection as te_os +from topcoffea.modules.histEFT import HistEFT +import topcoffea.modules.eft_helper as efth +import topcoffea.modules.event_selection as tc_es +from topcoffea.modules.get_param_from_jsons import GetParam +from topcoffea.modules.paths import topcoffea_path +get_tc_param = GetParam(topcoffea_path("params/params.json")) +def construct_cat_name(chan_str,njet_str=None,flav_str=None): + + # Get the component strings + nlep_str = chan_str.split("_")[0] # Assumes n leps comes first in the str + chan_str = "_".join(chan_str.split("_")[1:]) # The rest of the channel name is everything that comes after nlep + if chan_str == "": chan_str = None # So that we properly skip this in the for loop below + if flav_str is not None: + flav_str = flav_str + if njet_str is not None: + njet_str = njet_str[-2:] # Assumes number of n jets comes at the end of the string + if "j" not in njet_str: + # The njet string should really have a "j" in it + raise Exception(f"Something went wrong while trying to construct the channel name, is \"{njet_str}\" an njet string?") + + # Put the component strings into the channel name + ret_str = nlep_str + for component in [flav_str,chan_str,njet_str]: + if component is None: continue + ret_str = "_".join([ret_str,component]) + return ret_str + + +class AnalysisProcessor(processor.ProcessorABC): + + def __init__(self, samples, wc_names_lst=[], hist_lst=None, ecut_threshold=None, do_errors=False, do_systematics=False, split_by_lepton_flavor=False, skip_signal_regions=False, skip_control_regions=False, dtype=np.float32): + + self._samples = samples + self._wc_names_lst = wc_names_lst + self._dtype = dtype + self._do_systematics = do_systematics # Whether to process systematic samples + self._skip_signal_regions = skip_signal_regions # Whether to skip the SR categories + self._skip_control_regions = skip_control_regions # Whether to skip the CR categories + + # Create the histograms + proc_axis = hist.axis.StrCategory([], name="process", growth=True) + chan_axis = hist.axis.StrCategory([], name="channel", growth=True) + syst_axis = hist.axis.StrCategory([], name="systematic", growth=True) + appl_axis = hist.axis.StrCategory([], name="appl", label=r"AR/SR", growth=True) + self._accumulator = { + "mll_fromzg_e" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(40, 0, 200, name="mll_fromzg_e", label=r"invmass ee from z/gamma"), wc_names=wc_names_lst, rebin=False), + "mll_fromzg_m" : 
HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(40, 0, 200, name="mll_fromzg_m", label=r"invmass mm from z/gamma"), wc_names=wc_names_lst, rebin=False), + "mll_fromzg_t" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(40, 0, 200, name="mll_fromzg_t", label=r"invmass tautau from z/gamma"), wc_names=wc_names_lst, rebin=False), + "mll" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(60, 0, 600, name="mll", label=r"Invmass l0l1"), wc_names=wc_names_lst, rebin=False), + "ht" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(100, 0, 1000, name="ht", label=r"Scalar sum of genjet pt"), wc_names=wc_names_lst, rebin=False), + "ht_clean" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(100, 0, 1000, name="ht_clean", label=r"Scalar sum of clean genjet pt"), wc_names=wc_names_lst, rebin=False), + "tops_pt" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(50, 0, 500, name="tops_pt", label=r"Pt of the sum of the tops"), wc_names=wc_names_lst, rebin=False), + "photon_pt" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Variable([20,35,50,70,100,170,200,250,300], name="photon_pt", label=r"$p_{T}$ $\gamma$ (GeV)"), wc_names=wc_names_lst, rebin=False), + "l0_pt" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(50, 0, 500, name="l0_pt", label=r"Pt of leading lepton"), wc_names=wc_names_lst, rebin=False), + "j0_pt" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(50, 0, 500, name="j0_pt", label=r"Pt of leading jet"), wc_names=wc_names_lst, rebin=False), + "tX_pt" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(40, 0, 400, name="tX_pt", label=r"Pt of the t(t)X system"), wc_names=wc_names_lst, rebin=False), + "njets" : HistEFT(proc_axis, chan_axis, syst_axis, appl_axis, hist.axis.Regular(10, 0, 10, name="njets", label=r"njets"), wc_names=wc_names_lst, rebin=False), + } + + # Set the list of hists to fill + if hist_lst is None: + # If the hist list is none, assume we want to fill all hists + self._hist_lst = list(self._accumulator.keys()) + else: + # Otherwise, just fill the specified subset of hists + for hist_to_include in hist_lst: + if hist_to_include not in self._accumulator.keys(): + raise Exception(f"Error: Cannot specify hist \"{hist_to_include}\", it is not defined in the processor.") + self._hist_lst = hist_lst # Which hists to fill + + # Set the booleans + self._do_errors = do_errors # Whether to calculate and store the w**2 coefficients + + + @property + def accumulator(self): + return self._accumulator + + @property + def columns(self): + return self._columns + + # Main function: run on a given dataset + def process(self, events): + + ### Dataset parameters ### + dataset = events.metadata["dataset"] + + isData = self._samples[dataset]["isData"] + histAxisName = self._samples[dataset]["histAxisName"] + year = self._samples[dataset]["year"] + xsec = self._samples[dataset]["xsec"] + sow = self._samples[dataset]["nSumOfWeights"] + + if isData: raise Exception("Error: This processor is not for data") + + selections = PackedSelection(dtype='uint64') + + ### Get gen particles collection ### + genpart = events.GenPart + genjet = events.GenJet + + + ### Lepton object selection ### + + is_final_mask = genpart.hasFlags(["fromHardProcess","isLastCopy"]) + + gen_top = ak.pad_none(genpart[is_final_mask & (abs(genpart.pdgId) == 6)],2) + + gen_l = genpart[is_final_mask & ((abs(genpart.pdgId) == 
11) | (abs(genpart.pdgId) == 13) | (abs(genpart.pdgId) == 15))] + gen_e = genpart[is_final_mask & (abs(genpart.pdgId) == 11)] + gen_m = genpart[is_final_mask & (abs(genpart.pdgId) == 13)] + gen_t = genpart[is_final_mask & (abs(genpart.pdgId) == 15)] + + gen_p = genpart[is_final_mask & (abs(genpart.pdgId) == 22)] + ######### Systematics ########### + + # Define the lists of systematics we include + wgt_correction_syst_lst = [ + "phoSFUp","phoSFDown" # Exp systs + #"CMS_effUp","CMS_effDown" # Exp systs + ] + + gen_l = gen_l[ak.argsort(gen_l.pt, axis=-1, ascending=False)] + gen_l = ak.pad_none(gen_l,2) + gen_e = gen_e[ak.argsort(gen_e.pt, axis=-1, ascending=False)] + gen_m = gen_m[ak.argsort(gen_m.pt, axis=-1, ascending=False)] + gen_t = gen_t[ak.argsort(gen_t.pt, axis=-1, ascending=False)] + + # Object selection after padding + is_em = (np.abs(gen_l[:,0].pdgId)==11) & (np.abs(gen_l[:,1].pdgId)==13) + is_me = (np.abs(gen_l[:,1].pdgId)==11) & (np.abs(gen_l[:,0].pdgId)==13) + is2lOS = ak.any(gen_l[:,0].pdgId/np.abs(gen_l[:,1].pdgId)+gen_l[:,1].pdgId/np.abs(gen_l[:,1].pdgId)==0) & ((ak.num(gen_e) + ak.num(gen_m))==2) + is2lOS_em = is2lOS & ak.any(is_em | is_me) + + gen_p = ak.fill_none(ak.pad_none(gen_p,1), 0) + gen_p = gen_p[ak.argsort(gen_p.pt, axis=-1, ascending=False)] + + gen_l_from_zg = ak.pad_none(gen_l[(gen_l.distinctParent.pdgId == 23) | (gen_l.distinctParent.pdgId == 22)], 2) + gen_e_from_zg = ak.pad_none(gen_e[(gen_e.distinctParent.pdgId == 23) | (gen_e.distinctParent.pdgId == 22)], 2) + gen_m_from_zg = ak.pad_none(gen_m[(gen_m.distinctParent.pdgId == 23) | (gen_m.distinctParent.pdgId == 22)], 2) + gen_t_from_zg = ak.pad_none(gen_t[(gen_t.distinctParent.pdgId == 23) | (gen_t.distinctParent.pdgId == 22)], 2) + + + # Jet object selection + genjet = genjet[genjet.pt > 30] + is_clean_jet = te_os.isClean(genjet, gen_e, drmin=0.4) & te_os.isClean(genjet, gen_m, drmin=0.4) & te_os.isClean(genjet, gen_t, drmin=0.4) + genjet_clean = genjet[is_clean_jet] + genbjet_clean = genjet[np.abs(genjet.partonFlavour)==5] + njets = ak.num(genjet_clean) + nbjets = ak.num(genbjet_clean) + atleast_1ph = ak.num(gen_p)>0 + + + selections.add("2los_CRtt", is2lOS_em & (nbjets==2) & (~atleast_1ph)) # Explicitly add the em requirement here, so we don't have to rely on running with _split_by_lepton_flavor turned on to enforce this requirement + #selections.add("2los_CRtt", (events.is2l_nozeeveto & charge2l_0 & events.is_em & bmask_exactly2med & pass_trg)) # Explicitly add the em requirement here, so we don't have to rely on running with _split_by_lepton_flavor turned on to enforce this requirement + selections.add("2los_ph", is2lOS & (nbjets==2) & (ak.num(gen_p)>0) & (ak.firsts(gen_p).pt>20)) # Explicitly add the em requirement here, so we don't have to rely on running with _split_by_lepton_flavor turned on to enforce this requirement + #selections.add("2los_sf_ph", (retainedbyOverlap & events.is2l & charge2l_0 & (events.is_ee | events.is_mm) & ~sfosz_2los_ttg_mask & events.mask_SF_Zllgamma & bmask_atleast1med & pass_trg & exactly_1ph)) + sfosz_2los_ttg_mask = tc_es.get_Z_peak_mask(gen_l[:,0:2],pt_window=15.0) + Zllgamma_SF_mask = (abs( (gen_l[:,0] + gen_l[:,1] + gen_p[:,0]).mass -91.2) > 15) + selections.add("2los_sf_Zg_CR_ULttg", (is2lOS_em & ~sfosz_2los_ttg_mask & ~Zllgamma_SF_mask & (nbjets==2) & atleast_1ph)) + #selections.add("2los_sf_Zg_CR_ULttg", (retainedbyOverlap & events.is2l_nozeeveto & charge2l_0 & (events.is_ee | events.is_mm) & ~sfosz_2los_ttg_mask & ~events.mask_SF_Zllgamma & bmask_atleast1med & 
pass_trg & atleast_1ph)) + + + # Njets selection + selections.add("exactly_0j", (njets==0)) + selections.add("exactly_1j", (njets==1)) + selections.add("exactly_2j", (njets==2)) + selections.add("exactly_3j", (njets==3)) + selections.add("exactly_4j", (njets==4)) + selections.add("exactly_5j", (njets==5)) + selections.add("exactly_6j", (njets==6)) + selections.add("atleast_1j", (njets>=1)) + selections.add("atleast_2j", (njets>=2)) + selections.add("atleast_3j", (njets>=3)) + selections.add("atleast_4j", (njets>=4)) + selections.add("atleast_5j", (njets>=5)) + selections.add("atleast_7j", (njets>=7)) + selections.add("atleast_0j", (njets>=0)) + selections.add("atmost_1j" , (njets<=1)) + selections.add("atmost_3j" , (njets<=3)) + + + # AR/SR categories + #selections.add("isSR_2lSS", ( events.is2l_SR) & charge2l_1) + #selections.add("isAR_2lSS", (~events.is2l_SR) & charge2l_1) + #selections.add("isAR_2lSS_OS", ( events.is2l_SR) & charge2l_0) # Sideband for the charge flip + selections.add("isSR_2lOS", is2lOS)#( events.is2l_SR) & charge2l_0) + selections.add("isAR_2lOS", is2lOS)#(~events.is2l_SR) & charge2l_0) + + #selections.add("isSR_3l", events.is3l_SR) + #selections.add("isAR_3l", ~events.is3l_SR) + #selections.add("isSR_4l", events.is4l_SR) + + + ### Get dense axis variables ### + + ht = ak.sum(genjet.pt,axis=-1) + ht_clean = ak.sum(genjet_clean.pt,axis=-1) + + tops_pt = gen_top.sum().pt + + # Pt of the t(t)X system + tX_system = ak.concatenate([gen_top,gen_l_from_zg],axis=1) + tX_pt = tX_system.sum().pt + + # Invmass distributions + mll_e_from_zg = (gen_e_from_zg[:,0] + gen_e_from_zg[:,1]).mass + mll_m_from_zg = (gen_m_from_zg[:,0] + gen_m_from_zg[:,1]).mass + mll_t_from_zg = (gen_t_from_zg[:,0] + gen_t_from_zg[:,1]).mass + mll_l0l1 = (gen_l[:,0] + gen_l[:,1]).mass + + # Dictionary of dense axis values + dense_axis_dict = { + "mll_fromzg_e" : mll_e_from_zg, + "mll_fromzg_m" : mll_m_from_zg, + "mll_fromzg_t" : mll_t_from_zg, + "mll" : mll_l0l1, + "ht" : ht, + "ht_clean" : ht_clean, + "tX_pt" : tX_pt, + "tops_pt" : tops_pt, + "photon_pt" : ak.firsts(gen_p).pt, + "l0_pt" : ak.firsts(gen_l.pt), + "j0_pt" : ak.firsts(genjet.pt), + "njets" : njets, + } + cat_dict = { + "2los_CRtt" : { + "atmost_3j" : { + "lep_chan_lst" : ["2los_ph"], + "lep_flav_lst" : ["em"], + "appl_lst" : ["isSR_2lOS" , "isAR_2lOS"], + }, + }, + } + sr_cat_dict = { + "2los_ph" : { + "atleast_3j" : { + "lep_chan_lst" : ["2los_ph"], + "lep_flav_lst" : ["ee", "mm", "em"], + "appl_lst" : ["isSR_2lOS", "isAR_2lOS"], + }, + }, + } + cr_cat_dict = { + "2los_CR_Zg_ULttg" : { + "atleast_2j" : { + "lep_chan_lst" : ["2los_sf_Zg_CR_ULttg"], + "lep_flav_lst" : ["ee", "mm"], + "appl_lst" : ["isSR_2lOS", "isAR_2lOS"], + }, + }, + } + + + ### Get weights ### + + # Extract the EFT quadratic coefficients and optionally use them to calculate the coefficients on the w**2 quartic function + # eft_coeffs is never Jagged so convert immediately to numpy for ease of use. 
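+        # A brief sketch of the convention assumed here (the usual topcoffea EFT treatment):
+        # each event stores one fit coefficient per (pair of) Wilson coefficients, so that its
+        # weight is the quadratic form
+        #   w(c) = s_0 + sum_i s_i*c_i + sum_{i<=j} s_ij*c_i*c_j,
+        # and HistEFT accumulates these coefficients per bin, so the filled histograms can be
+        # evaluated at any WC point later (e.g. h.eval({}) gives the SM point, all WCs = 0).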
+ eft_coeffs = ak.to_numpy(events["EFTfitCoefficients"]) if hasattr(events, "EFTfitCoefficients") else None + if eft_coeffs is not None: + # Check to see if the ordering of WCs for this sample matches what we want + if self._samples[dataset]["WCnames"] != self._wc_names_lst: + eft_coeffs = efth.remap_coeffs(self._samples[dataset]["WCnames"], self._wc_names_lst, eft_coeffs) + eft_w2_coeffs = efth.calc_w2_coeffs(eft_coeffs,self._dtype) if (self._do_errors and eft_coeffs is not None) else None + + # If this is not an eft sample, get the genWeight + if eft_coeffs is None: genw = events["genWeight"] + else: genw = np.ones_like(events["event"]) + lumi = 1000.0*get_tc_param(f"lumi_{year}") + event_weight = lumi*xsec*genw/sow + + weights_dict = {} + # For both data and MC + weights_obj_base = coffea.analysis_tools.Weights(len(events),storeIndividual=True) + weights_obj_base.add("norm",(xsec/sow)*genw*lumi) + for ch_name in ["2los_CRtt", "2los_ph", "2los_CR_Zg_ULttg"]: + weights_dict[ch_name] = copy.deepcopy(weights_obj_base) + weights_dict[ch_name].add("phoSF", ak.ones_like(ak.firsts(gen_p).pt), np.random.rand(*ak.to_numpy(ak.firsts(gen_p).pt).shape), np.random.rand(*ak.to_numpy(ak.firsts(gen_p).pt).shape)) + #weights_dict[ch_name].add("CMS_eff_g", ak.ones_like(ak.firsts(gen_p).pt), np.random.rand(*ak.to_numpy(ak.firsts(gen_p).pt).shape), np.random.rand(*ak.to_numpy(ak.firsts(gen_p).pt).shape)) + + # Example of reweighting based on Ht + #if "private" in histAxisName: + # ht_sf = get_ht_sf(ht,histAxisName) + # event_weight = event_weight*ht_sf + + + ### Loop over the hists we want to fill ### + + hout = self.accumulator + + for dense_axis_name, dense_axis_vals in dense_axis_dict.items(): + if dense_axis_name not in self._hist_lst: + #print(f"Skipping \"{dense_axis_name}\", it is not in the list of hists to include.") + continue + if not self._skip_signal_regions: + cat_dict.update(sr_cat_dict) + if not self._skip_control_regions: + cat_dict.update(cr_cat_dict) + if (not self._skip_signal_regions and not self._skip_control_regions): + for k in sr_cat_dict: + if k in cr_cat_dict: + raise Exception(f"The key {k} is in both CR and SR dictionaries.") + + # Set up the list of syst wgt variations to loop over + wgt_var_lst = ["nominal"] + if self._do_systematics: + if not isData: + wgt_var_lst = wgt_var_lst + wgt_correction_syst_lst + + for wgt_fluct in wgt_var_lst: + for nlep_cat in cat_dict.keys(): + flav_ch = None + njet_ch = None + njets_any_mask = selections.any(*cat_dict[nlep_cat].keys()) + weights_object = weights_dict[nlep_cat] + if (wgt_fluct == "nominal"): + # In the case of "nominal", or the jet energy systematics, no weight systematic variation is used + weight = weights_object.weight(None) + else: + # Otherwise get the weight from the Weights object + if wgt_fluct in weights_object.variations: + weight = weights_object.weight(wgt_fluct) + else: + # Note in this case there is no up/down fluct for this category, so we don't want to fill a hist for it + continue + # Loop over the njets list for each channel + for njet_val in cat_dict[nlep_cat]: + + # Loop over the appropriate AR and SR for this channel + for appl in cat_dict[nlep_cat][njet_val]["appl_lst"]: + # Loop over the channels in each nlep cat (e.g. 
"3l_m_offZ_1b") + for lep_chan in cat_dict[nlep_cat][njet_val]["lep_chan_lst"]: + + # Loop over the lep flavor list for each channel + for lep_flav in cat_dict[nlep_cat][njet_val]["lep_flav_lst"]: + #cuts_lst = [chan, njet_val] + cuts_lst = [appl,lep_chan] + if dense_axis_name == "njets": + all_cuts_mask = (selections.all(*cuts_lst) & njets_any_mask) + else: + njet_ch = njet_val + all_cuts_mask = (selections.all(*cuts_lst)) + ch_name = construct_cat_name(lep_chan,njet_str=njet_ch,flav_str=flav_ch) + + # Mask out the none values + isnotnone_mask = (ak.fill_none((dense_axis_vals != None),False)) + isnotnone_mask = isnotnone_mask & all_cuts_mask + dense_axis_vals_cut = dense_axis_vals[isnotnone_mask] + event_weight_cut = weight[isnotnone_mask] + eft_coeffs_cut = eft_coeffs + if eft_coeffs is not None: eft_coeffs_cut = eft_coeffs[isnotnone_mask] + #eft_w2_coeffs_cut = eft_w2_coeffs + #if eft_w2_coeffs is not None: eft_w2_coeffs_cut = eft_w2_coeffs[isnotnone_mask] + + # Fill the histos + axes_fill_info_dict = { + dense_axis_name : dense_axis_vals_cut, + "process" : histAxisName, + "appl" : appl, + "channel" : ch_name, + "systematic" : wgt_fluct, + "weight" : event_weight_cut, + "eft_coeff" : eft_coeffs_cut, + #"eft_err_coeff" : eft_w2_coeffs_cut, + } + + hout[dense_axis_name].fill(**axes_fill_info_dict) + + return hout + + def postprocess(self, accumulator): + return accumulator diff --git a/analysis/mc_validation/mc_validation_gen_processor.py b/analysis/mc_validation/mc_validation_gen_processor.py index e0e68c7d4..4c64a7b90 100644 --- a/analysis/mc_validation/mc_validation_gen_processor.py +++ b/analysis/mc_validation/mc_validation_gen_processor.py @@ -2,36 +2,40 @@ import numpy as np import awkward as ak np.seterr(divide='ignore', invalid='ignore', over='ignore') -from coffea import hist, processor +from coffea import processor +import hist -from topcoffea.modules.GetValuesFromJsons import get_lumi -from topcoffea.modules.objects import * -#from topcoffea.modules.corrections import get_ht_sf -from topcoffea.modules.selection import * -from topcoffea.modules.HistEFT import HistEFT +import topeft.modules.object_selection as te_os +from topcoffea.modules.histEFT import HistEFT import topcoffea.modules.eft_helper as efth +from topcoffea.modules.get_param_from_jsons import GetParam +from topcoffea.modules.paths import topcoffea_path +get_tc_param = GetParam(topcoffea_path("params/params.json")) class AnalysisProcessor(processor.ProcessorABC): - def __init__(self, samples, wc_names_lst=[], hist_lst=None, ecut_threshold=None, do_errors=False, do_systematics=False, split_by_lepton_flavor=False, skip_signal_regions=False, skip_control_regions=False, muonSyst='nominal', dtype=np.float32): + def __init__(self, samples, wc_names_lst=[], hist_lst=None, ecut_threshold=None, do_errors=False, do_systematics=False, split_by_lepton_flavor=False, skip_signal_regions=False, skip_control_regions=False, dtype=np.float32): self._samples = samples self._wc_names_lst = wc_names_lst self._dtype = dtype # Create the histograms - self._accumulator = processor.dict_accumulator({ - "mll_fromzg_e" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("mll_fromzg_e", "invmass ee from z/gamma", 40, 0, 200)), - "mll_fromzg_m" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("mll_fromzg_m", "invmass mm from z/gamma", 40, 0, 200)), - "mll_fromzg_t" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("mll_fromzg_t", "invmass tautau from z/gamma", 40, 0, 200)), - 
"mll" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("mll", "Invmass l0l1", 60, 0, 600)), - "ht" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("ht", "Scalar sum of genjet pt", 100, 0, 1000)), - "ht_clean" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("ht_clean", "Scalar sum of clean genjet pt", 100, 0, 1000)), - "tops_pt" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("tops_pt", "Pt of the sum of the tops", 50, 0, 500)), - "tX_pt" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("tX_pt", "Pt of the t(t)X system", 40, 0, 400)), - "njets" : HistEFT("Events", wc_names_lst, hist.Cat("sample", "sample"), hist.Bin("njets", "njets", 10, 0, 10)), - }) + proc_axis = hist.axis.StrCategory([], name="process", growth=True) + self._accumulator = { + "mll_fromzg_e" : HistEFT(proc_axis, hist.axis.Regular(40, 0, 200, name="mll_fromzg_e", label=r"invmass ee from z/gamma"), wc_names=wc_names_lst, rebin=False), + "mll_fromzg_m" : HistEFT(proc_axis, hist.axis.Regular(40, 0, 200, name="mll_fromzg_m", label=r"invmass mm from z/gamma"), wc_names=wc_names_lst, rebin=False), + "mll_fromzg_t" : HistEFT(proc_axis, hist.axis.Regular(40, 0, 200, name="mll_fromzg_t", label=r"invmass tautau from z/gamma"), wc_names=wc_names_lst, rebin=False), + "mll" : HistEFT(proc_axis, hist.axis.Regular(60, 0, 600, name="mll", label=r"Invmass l0l1"), wc_names=wc_names_lst, rebin=False), + "ht" : HistEFT(proc_axis, hist.axis.Regular(100, 0, 1000, name="ht", label=r"Scalar sum of genjet pt"), wc_names=wc_names_lst, rebin=False), + "ht_clean" : HistEFT(proc_axis, hist.axis.Regular(100, 0, 1000, name="ht_clean", label=r"Scalar sum of clean genjet pt"), wc_names=wc_names_lst, rebin=False), + "tops_pt" : HistEFT(proc_axis, hist.axis.Regular(50, 0, 500, name="tops_pt", label=r"Pt of the sum of the tops"), wc_names=wc_names_lst, rebin=False), + "l0_pt" : HistEFT(proc_axis, hist.axis.Regular(50, 0, 500, name="l0_pt", label=r"Pt of leading lepton"), wc_names=wc_names_lst, rebin=False), + "j0_pt" : HistEFT(proc_axis, hist.axis.Regular(50, 0, 500, name="j0_pt", label=r"Pt of leading jet"), wc_names=wc_names_lst, rebin=False), + "tX_pt" : HistEFT(proc_axis, hist.axis.Regular(40, 0, 400, name="tX_pt", label=r"Pt of the t(t)X system"), wc_names=wc_names_lst, rebin=False), + "njets" : HistEFT(proc_axis, hist.axis.Regular(10, 0, 10, name="njets", label=r"njets"), wc_names=wc_names_lst, rebin=False), + } # Set the list of hists to fill if hist_lst is None: @@ -100,7 +104,7 @@ def process(self, events): # Jet object selection genjet = genjet[genjet.pt > 30] - is_clean_jet = isClean(genjet, gen_e, drmin=0.4) & isClean(genjet, gen_m, drmin=0.4) & isClean(genjet, gen_t, drmin=0.4) + is_clean_jet = te_os.isClean(genjet, gen_e, drmin=0.4) & te_os.isClean(genjet, gen_m, drmin=0.4) & te_os.isClean(genjet, gen_t, drmin=0.4) genjet_clean = genjet[is_clean_jet] njets = ak.num(genjet_clean) @@ -132,6 +136,8 @@ def process(self, events): "ht_clean" : ht_clean, "tX_pt" : tX_pt, "tops_pt" : tops_pt, + "l0_pt" : ak.firsts(gen_l.pt), + "j0_pt" : ak.firsts(genjet.pt), "njets" : njets, } @@ -150,7 +156,7 @@ def process(self, events): # If this is not an eft sample, get the genWeight if eft_coeffs is None: genw = events["genWeight"] else: genw = np.ones_like(events["event"]) - lumi = get_lumi(year)*1000.0 + lumi = 1000.0*get_tc_param(f"lumi_{year}") event_weight = lumi*xsec*genw/sow # Example of reweighting based on Ht @@ -161,7 +167,7 @@ def 
process(self, events): ### Loop over the hists we want to fill ### - hout = self.accumulator.identity() + hout = self.accumulator for dense_axis_name, dense_axis_vals in dense_axis_dict.items(): @@ -171,16 +177,16 @@ def process(self, events): event_weight_cut = event_weight[isnotnone_mask] eft_coeffs_cut = eft_coeffs if eft_coeffs is not None: eft_coeffs_cut = eft_coeffs[isnotnone_mask] - eft_w2_coeffs_cut = eft_w2_coeffs - if eft_w2_coeffs is not None: eft_w2_coeffs_cut = eft_w2_coeffs[isnotnone_mask] + #eft_w2_coeffs_cut = eft_w2_coeffs + #if eft_w2_coeffs is not None: eft_w2_coeffs_cut = eft_w2_coeffs[isnotnone_mask] # Fill the histos axes_fill_info_dict = { dense_axis_name : dense_axis_vals_cut, - "sample" : histAxisName, + "process" : histAxisName, "weight" : event_weight_cut, "eft_coeff" : eft_coeffs_cut, - "eft_err_coeff" : eft_w2_coeffs_cut, + #"eft_err_coeff" : eft_w2_coeffs_cut, } hout[dense_axis_name].fill(**axes_fill_info_dict) diff --git a/analysis/mc_validation/run_analysis.py b/analysis/mc_validation/run_analysis.py new file mode 100644 index 000000000..650a8dbac --- /dev/null +++ b/analysis/mc_validation/run_analysis.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python + +import argparse +import json +import time +import cloudpickle +import gzip +import os + +from coffea import processor +from coffea.nanoevents import NanoAODSchema + +import topcoffea.modules.utils as utils +import topcoffea.modules.remote_environment as remote_environment + +from topeft.modules.dataDrivenEstimation import DataDrivenProducer +from topeft.modules.get_renormfact_envelope import get_renormfact_envelope +import mc_validation_gen_processor as gen_processor + +LST_OF_KNOWN_EXECUTORS = ["futures","work_queue"] + +WGT_VAR_LST = [ + "nSumOfWeights_ISRUp", + "nSumOfWeights_ISRDown", + "nSumOfWeights_FSRUp", + "nSumOfWeights_FSRDown", + "nSumOfWeights_renormUp", + "nSumOfWeights_renormDown", + "nSumOfWeights_factUp", + "nSumOfWeights_factDown", + "nSumOfWeights_renormfactUp", + "nSumOfWeights_renormfactDown", +] + +if __name__ == '__main__': + + parser = argparse.ArgumentParser(description='You can customize your run') + parser.add_argument('jsonFiles' , nargs='?', default='', help = 'Json file(s) containing files and metadata') + parser.add_argument('--executor','-x' , default='work_queue', help = 'Which executor to use') + parser.add_argument('--prefix', '-r' , nargs='?', default='', help = 'Prefix or redirector to look for the files') + parser.add_argument('--test','-t' , action='store_true' , help = 'To perform a test, run over a few events in a couple of chunks') + parser.add_argument('--pretend' , action='store_true', help = 'Read json files but, not execute the analysis') + parser.add_argument('--nworkers','-n' , default=8 , help = 'Number of workers') + parser.add_argument('--chunksize','-s' , default=100000, help = 'Number of events per chunk') + parser.add_argument('--nchunks','-c' , default=None, help = 'You can choose to run only a number of chunks') + parser.add_argument('--outname','-o' , default='plotsTopEFT', help = 'Name of the output file with histograms') + parser.add_argument('--outpath','-p' , default='histos', help = 'Name of the output directory') + parser.add_argument('--treename' , default='Events', help = 'Name of the tree inside the files') + parser.add_argument('--do-errors' , action='store_true', help = 'Save the w**2 coefficients') + parser.add_argument('--do-systs', action='store_true', help = 'Compute systematic variations') + parser.add_argument('--split-lep-flavor', 
action='store_true', help = 'Split up categories by lepton flavor') + parser.add_argument('--skip-sr', action='store_true', help = 'Skip all signal region categories') + parser.add_argument('--skip-cr', action='store_true', help = 'Skip all control region categories') + parser.add_argument('--do-np' , action='store_true', help = 'Perform nonprompt estimation on the output hist, and save a new hist with the np contribution included. Note that signal, background and data samples should all be processed together in order for this option to make sense.') + parser.add_argument('--do-renormfact-envelope', action='store_true', help = 'Perform renorm/fact envelope calculation on the output hist (saves the modified with the the same name as the original.') + parser.add_argument('--wc-list', action='extend', nargs='+', help = 'Specify a list of Wilson coefficients to use in filling histograms.') + parser.add_argument('--hist-list', action='extend', nargs='+', help = 'Specify a list of histograms to fill.') + parser.add_argument('--ecut', default=None , help = 'Energy cut threshold i.e. throw out events above this (GeV)') + parser.add_argument('--port', default='9123-9130', help = 'Specify the Work Queue port. An integer PORT or an integer range PORT_MIN-PORT_MAX.') + + + args = parser.parse_args() + jsonFiles = args.jsonFiles + prefix = args.prefix + executor = args.executor + dotest = args.test + nworkers = int(args.nworkers) + chunksize = int(args.chunksize) + nchunks = int(args.nchunks) if not args.nchunks is None else args.nchunks + outname = args.outname + outpath = args.outpath + pretend = args.pretend + treename = args.treename + do_errors = args.do_errors + do_systs = args.do_systs + split_lep_flavor = args.split_lep_flavor + skip_sr = args.skip_sr + skip_cr = args.skip_cr + do_np = args.do_np + do_renormfact_envelope = args.do_renormfact_envelope + wc_lst = args.wc_list if args.wc_list is not None else [] + + # Check if we have valid options + if executor not in LST_OF_KNOWN_EXECUTORS: + raise Exception(f"The \"{executor}\" executor is not known. Please specify an executor from the known executors ({LST_OF_KNOWN_EXECUTORS}). Exiting.") + if do_renormfact_envelope: + if not do_systs: + raise Exception("Error: Cannot specify do_renormfact_envelope if we are not including systematics.") + if not do_np: + raise Exception("Error: Cannot specify do_renormfact_envelope if we have not already done the integration across the appl axis that occurs in the data driven estimator step.") + if dotest: + if executor == "futures": + nchunks = 2 + chunksize = 10000 + nworkers = 1 + print('Running a fast test with %i workers, %i chunks of %i events'%(nworkers, nchunks, chunksize)) + else: + raise Exception(f"The \"test\" option is not set up to work with the {executor} executor. 
Exiting.") + + + # Set the threshold for the ecut (if not applying a cut, should be None) + ecut_threshold = args.ecut + if ecut_threshold is not None: ecut_threshold = float(args.ecut) + + if executor == "work_queue": + # construct wq port range + port = list(map(int, args.port.split('-'))) + if len(port) < 1: + raise ValueError("At least one port value should be specified.") + if len(port) > 2: + raise ValueError("More than one port range was specified.") + if len(port) == 1: + # convert single values into a range of one element + port.append(port[0]) + + # Figure out which hists to include + if args.hist_list == ["ana"]: + # Here we hardcode a list of hists used for the analysis + hist_lst = ["cosZ", "cosq", "azim", "tops_pt", "tX_pt", "njets"] + elif args.hist_list == ["cr"]: + # Here we hardcode a list of hists used for the CRs + hist_lst = ["lj0pt", "ptz", "met", "ljptsum", "l0pt", "l0eta", "l1pt", "l1eta", "j0pt", "j0eta", "njets", "nbtagsl", "invmass"] + else: + # We want to specify a custom list + # If we don't specify this argument, it will be None, and the processor will fill all hists + hist_lst = args.hist_list + + + ### Load samples from json + samplesdict = {} + allInputFiles = [] + + def LoadJsonToSampleName(jsonFile, prefix): + sampleName = jsonFile if not '/' in jsonFile else jsonFile[jsonFile.rfind('/')+1:] + if sampleName.endswith('.json'): sampleName = sampleName[:-5] + with open(jsonFile) as jf: + samplesdict[sampleName] = json.load(jf) + samplesdict[sampleName]['redirector'] = prefix + + if isinstance(jsonFiles, str) and ',' in jsonFiles: + jsonFiles = jsonFiles.replace(' ', '').split(',') + elif isinstance(jsonFiles, str): + jsonFiles = [jsonFiles] + for jsonFile in jsonFiles: + if os.path.isdir(jsonFile): + if not jsonFile.endswith('/'): jsonFile+='/' + for f in os.path.listdir(jsonFile): + if f.endswith('.json'): allInputFiles.append(jsonFile+f) + else: + allInputFiles.append(jsonFile) + + # Read from cfg files + for f in allInputFiles: + if not os.path.isfile(f): + raise Exception(f'[ERROR] Input file {f} not found!') + # This input file is a json file, not a cfg + if f.endswith('.json'): + LoadJsonToSampleName(f, prefix) + # Open cfg files + else: + with open(f) as fin: + print(' >> Reading json from cfg file...') + lines = fin.readlines() + for l in lines: + if '#' in l: + l=l[:l.find('#')] + l = l.replace(' ', '').replace('\n', '') + if l == '': continue + if ',' in l: + l = l.split(',') + for nl in l: + if not os.path.isfile(l): + prefix = nl + else: + LoadJsonToSampleName(nl, prefix) + else: + if not os.path.isfile(l): + prefix = l + else: + LoadJsonToSampleName(l, prefix) + + flist = {} + nevts_total = 0 + for sname in samplesdict.keys(): + redirector = samplesdict[sname]['redirector'] + flist[sname] = [(redirector+f) for f in samplesdict[sname]['files']] + samplesdict[sname]['year'] = samplesdict[sname]['year'] + samplesdict[sname]['xsec'] = float(samplesdict[sname]['xsec']) + samplesdict[sname]['nEvents'] = int(samplesdict[sname]['nEvents']) + nevts_total += samplesdict[sname]['nEvents'] + samplesdict[sname]['nGenEvents'] = int(samplesdict[sname]['nGenEvents']) + samplesdict[sname]['nSumOfWeights'] = float(samplesdict[sname]['nSumOfWeights']) + if not samplesdict[sname]["isData"]: + for wgt_var in WGT_VAR_LST: + # Check that MC samples have all needed weight sums (only needed if doing systs) + if do_systs: + if (wgt_var not in samplesdict[sname]): + raise Exception(f"Missing weight variation \"{wgt_var}\".") + else: + samplesdict[sname][wgt_var] = 
float(samplesdict[sname][wgt_var]) + # Print file info + print('>> '+sname) + print(' - isData? : %s' %('YES' if samplesdict[sname]['isData'] else 'NO')) + print(' - year : %s' %samplesdict[sname]['year']) + print(' - xsec : %f' %samplesdict[sname]['xsec']) + print(' - histAxisName : %s' %samplesdict[sname]['histAxisName']) + print(' - options : %s' %samplesdict[sname]['options']) + print(' - tree : %s' %samplesdict[sname]['treeName']) + print(' - nEvents : %i' %samplesdict[sname]['nEvents']) + print(' - nGenEvents : %i' %samplesdict[sname]['nGenEvents']) + print(' - SumWeights : %i' %samplesdict[sname]['nSumOfWeights']) + if not samplesdict[sname]["isData"]: + for wgt_var in WGT_VAR_LST: + if wgt_var in samplesdict[sname]: + print(f' - {wgt_var}: {samplesdict[sname][wgt_var]}') + print(' - Prefix : %s' %samplesdict[sname]['redirector']) + print(' - nFiles : %i' %len(samplesdict[sname]['files'])) + for fname in samplesdict[sname]['files']: print(' %s'%fname) + + if pretend: + print('pretending...') + exit() + + # Extract the list of all WCs, as long as we haven't already specified one. + if len(wc_lst) == 0: + for k in samplesdict.keys(): + for wc in samplesdict[k]['WCnames']: + if wc not in wc_lst: + wc_lst.append(wc) + + if len(wc_lst) > 0: + # Yes, why not have the output be in correct English? + if len(wc_lst) == 1: + wc_print = wc_lst[0] + elif len(wc_lst) == 2: + wc_print = wc_lst[0] + ' and ' + wc_lst[1] + else: + wc_print = ', '.join(wc_lst[:-1]) + ', and ' + wc_lst[-1] + print('Wilson Coefficients: {}.'.format(wc_print)) + else: + print('No Wilson coefficients specified') + + processor_instance = gen_processor.AnalysisProcessor(samplesdict,wc_lst,hist_lst,ecut_threshold,do_errors,do_systs,split_lep_flavor,skip_sr,skip_cr) + + if executor == "work_queue": + executor_args = { + 'master_name': '{}-workqueue-coffea'.format(os.environ['USER']), + + # find a port to run work queue in this range: + 'port': port, + + 'debug_log': 'debug.log', + 'transactions_log': 'tr.log', + 'stats_log': 'stats.log', + 'tasks_accum_log': 'tasks.log', + + 'environment_file': remote_environment.get_environment( + extra_pip_local = {"topeft": ["topeft", "setup.py"]}, + ), + 'filepath': f'/project01/ndcms/{os.environ["USER"]}', + 'extra_input_files': ["mc_validation_gen_processor.py"], + + 'retries': 5, + + # use mid-range compression for chunks results. 9 is the default for work + # queue in coffea. Valid values are 0 (minimum compression, less memory + # usage) to 16 (maximum compression, more memory usage). + 'compression': 9, + + # automatically find an adequate resource allocation for tasks. + # tasks are first tried using the maximum resources seen of previously ran + # tasks. on resource exhaustion, they are retried with the maximum resource + # values, if specified below. if a maximum is not specified, the task waits + # forever until a larger worker connects. + 'resource_monitor': True, + 'resources_mode': 'auto', + + # this resource values may be omitted when using + # resources_mode: 'auto', but they do make the initial portion + # of a workflow run a little bit faster. + # Rather than using whole workers in the exploratory mode of + # resources_mode: auto, tasks are forever limited to a maximum + # of 8GB of mem and disk. + # + # NOTE: The very first tasks in the exploratory + # mode will use the values specified here, so workers need to be at least + # this large. If left unspecified, tasks will use whole workers in the + # exploratory mode. 
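+            # Nothing will run until Work Queue workers connect to the manager named in
+            # 'master_name' above. One illustrative way to provide them (assuming the
+            # cctools commands are available at the site; the command and resource numbers
+            # below are only an example, not part of this configuration) is something like:
+            #   condor_submit_workers -M ${USER}-workqueue-coffea --cores 4 --memory 16000 --disk 16000 10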
+ # 'cores': 1, + # 'disk': 8000, #MB + # 'memory': 10000, #MB + + # control the size of accumulation tasks. Results are + # accumulated in groups of size chunks_per_accum, keeping at + # most chunks_per_accum at the same time in memory per task. + 'chunks_per_accum': 25, + 'chunks_accum_in_mem': 2, + + # terminate workers on which tasks have been running longer than average. + # This is useful for temporary conditions on worker nodes where a task will + # be finish faster is ran in another worker. + # the time limit is computed by multipliying the average runtime of tasks + # by the value of 'fast_terminate_workers'. Since some tasks can be + # legitimately slow, no task can trigger the termination of workers twice. + # + # warning: small values (e.g. close to 1) may cause the workflow to misbehave, + # as most tasks will be terminated. + # + # Less than 1 disables it. + 'fast_terminate_workers': 0, + + # print messages when tasks are submitted, finished, etc., + # together with their resource allocation and usage. If a task + # fails, its standard output is also printed, so we can turn + # off print_stdout for all tasks. + 'verbose': True, + 'print_stdout': False, + } + + # Run the processor and get the output + tstart = time.time() + + if executor == "futures": + exec_instance = processor.futures_executor(workers=nworkers) + runner = processor.Runner(exec_instance, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks) + elif executor == "work_queue": + executor = processor.WorkQueueExecutor(**executor_args) + runner = processor.Runner(executor, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks, skipbadfiles=False, xrootdtimeout=180) + + output = runner(flist, treename, processor_instance) + + dt = time.time() - tstart + + if executor == "work_queue": + print('Processed {} events in {} seconds ({:.2f} evts/sec).'.format(nevts_total,dt,nevts_total/dt)) + + #nbins = sum(sum(arr.size for arr in h.eval({}).values()) for h in output.values() if isinstance(h, hist.Hist)) + #nfilled = sum(sum(np.sum(arr > 0) for arr in h.eval({}).values()) for h in output.values() if isinstance(h, hist.Hist)) + #print("Filled %.0f bins, nonzero bins: %1.1f %%" % (nbins, 100*nfilled/nbins,)) + + if executor == "futures": + print("Processing time: %1.2f s with %i workers (%.2f s cpu overall)" % (dt, nworkers, dt*nworkers, )) + + # Save the output + if not os.path.isdir(outpath): os.system("mkdir -p %s"%outpath) + out_pkl_file = os.path.join(outpath,outname+".pkl.gz") + print(f"\nSaving output in {out_pkl_file}...") + with gzip.open(out_pkl_file, "wb") as fout: + cloudpickle.dump(output, fout) + print("Done!") + + # Run the data driven estimation, save the output + if do_np: + print("\nDoing the nonprompt estimation...") + out_pkl_file_name_np = os.path.join(outpath,outname+"_np.pkl.gz") + ddp = DataDrivenProducer(out_pkl_file,out_pkl_file_name_np) + print(f"Saving output in {out_pkl_file_name_np}...") + ddp.dumpToPickle() + print("Done!") + # Run the renorm fact envelope calculation + if do_renormfact_envelope: + print("\nDoing the renorm. fact. 
envelope calculation...") + dict_of_histos = utils.get_hist_from_pkl(out_pkl_file_name_np,allow_empty=False) + dict_of_histos_after_applying_envelope = get_renormfact_envelope(dict_of_histos) + utils.dump_to_pkl(out_pkl_file_name_np,dict_of_histos_after_applying_envelope) diff --git a/analysis/mc_validation/run_gen_analysis.py b/analysis/mc_validation/run_gen_analysis.py new file mode 100644 index 000000000..d28565152 --- /dev/null +++ b/analysis/mc_validation/run_gen_analysis.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python + +import argparse +import json +import time +import cloudpickle +import gzip +import os + +from coffea import processor +from coffea.nanoevents import NanoAODSchema + +import topcoffea.modules.utils as utils +import topcoffea.modules.remote_environment as remote_environment + +from topeft.modules.dataDrivenEstimation import DataDrivenProducer +from topeft.modules.get_renormfact_envelope import get_renormfact_envelope +import gen_photon_processor as gen_processor + +LST_OF_KNOWN_EXECUTORS = ["futures","work_queue"] + +WGT_VAR_LST = [ + #"nSumOfWeights_ISRUp", + #"nSumOfWeights_ISRDown", + #"nSumOfWeights_FSRUp", + #"nSumOfWeights_FSRDown", + #"nSumOfWeights_renormUp", + #"nSumOfWeights_renormDown", + #"nSumOfWeights_factUp", + #"nSumOfWeights_factDown", + #"nSumOfWeights_renormfactUp", + #"nSumOfWeights_renormfactDown", +] + +if __name__ == '__main__': + + parser = argparse.ArgumentParser(description='You can customize your run') + parser.add_argument('jsonFiles' , nargs='?', default='', help = 'Json file(s) containing files and metadata') + parser.add_argument('--executor','-x' , default='work_queue', help = 'Which executor to use') + parser.add_argument('--prefix', '-r' , nargs='?', default='', help = 'Prefix or redirector to look for the files') + parser.add_argument('--test','-t' , action='store_true' , help = 'To perform a test, run over a few events in a couple of chunks') + parser.add_argument('--pretend' , action='store_true', help = 'Read json files but, not execute the analysis') + parser.add_argument('--nworkers','-n' , default=8 , help = 'Number of workers') + parser.add_argument('--chunksize','-s' , default=100000, help = 'Number of events per chunk') + parser.add_argument('--nchunks','-c' , default=None, help = 'You can choose to run only a number of chunks') + parser.add_argument('--outname','-o' , default='plotsTopEFT', help = 'Name of the output file with histograms') + parser.add_argument('--outpath','-p' , default='histos', help = 'Name of the output directory') + parser.add_argument('--treename' , default='Events', help = 'Name of the tree inside the files') + parser.add_argument('--do-errors' , action='store_true', help = 'Save the w**2 coefficients') + parser.add_argument('--do-systs', action='store_true', help = 'Compute systematic variations') + parser.add_argument('--split-lep-flavor', action='store_true', help = 'Split up categories by lepton flavor') + parser.add_argument('--skip-sr', action='store_true', help = 'Skip all signal region categories') + parser.add_argument('--skip-cr', action='store_true', help = 'Skip all control region categories') + parser.add_argument('--do-np' , action='store_true', help = 'Perform nonprompt estimation on the output hist, and save a new hist with the np contribution included. 
Note that signal, background and data samples should all be processed together in order for this option to make sense.') + parser.add_argument('--do-renormfact-envelope', action='store_true', help = 'Perform renorm/fact envelope calculation on the output hist (saves the modified with the the same name as the original.') + parser.add_argument('--wc-list', action='extend', nargs='+', help = 'Specify a list of Wilson coefficients to use in filling histograms.') + parser.add_argument('--hist-list', action='extend', nargs='+', help = 'Specify a list of histograms to fill.') + parser.add_argument('--ecut', default=None , help = 'Energy cut threshold i.e. throw out events above this (GeV)') + parser.add_argument('--port', default='9123-9130', help = 'Specify the Work Queue port. An integer PORT or an integer range PORT_MIN-PORT_MAX.') + + + args = parser.parse_args() + jsonFiles = args.jsonFiles + prefix = args.prefix + executor = args.executor + dotest = args.test + nworkers = int(args.nworkers) + chunksize = int(args.chunksize) + nchunks = int(args.nchunks) if not args.nchunks is None else args.nchunks + outname = args.outname + outpath = args.outpath + pretend = args.pretend + treename = args.treename + do_errors = args.do_errors + do_systs = args.do_systs + split_lep_flavor = args.split_lep_flavor + skip_sr = args.skip_sr + skip_cr = args.skip_cr + do_np = args.do_np + do_renormfact_envelope = args.do_renormfact_envelope + wc_lst = args.wc_list if args.wc_list is not None else [] + + # Check if we have valid options + if executor not in LST_OF_KNOWN_EXECUTORS: + raise Exception(f"The \"{executor}\" executor is not known. Please specify an executor from the known executors ({LST_OF_KNOWN_EXECUTORS}). Exiting.") + if do_renormfact_envelope: + if not do_systs: + raise Exception("Error: Cannot specify do_renormfact_envelope if we are not including systematics.") + if not do_np: + raise Exception("Error: Cannot specify do_renormfact_envelope if we have not already done the integration across the appl axis that occurs in the data driven estimator step.") + if dotest: + if executor == "futures": + nchunks = 2 + chunksize = 10000 + nworkers = 1 + print('Running a fast test with %i workers, %i chunks of %i events'%(nworkers, nchunks, chunksize)) + else: + raise Exception(f"The \"test\" option is not set up to work with the {executor} executor. 
Exiting.") + + + # Set the threshold for the ecut (if not applying a cut, should be None) + ecut_threshold = args.ecut + if ecut_threshold is not None: ecut_threshold = float(args.ecut) + + if executor == "work_queue": + # construct wq port range + port = list(map(int, args.port.split('-'))) + if len(port) < 1: + raise ValueError("At least one port value should be specified.") + if len(port) > 2: + raise ValueError("More than one port range was specified.") + if len(port) == 1: + # convert single values into a range of one element + port.append(port[0]) + + # Figure out which hists to include + if args.hist_list == ["ana"]: + # Here we hardcode a list of hists used for the analysis + hist_lst = ["cosZ", "cosq", "azim", "tops_pt", "tX_pt", "njets"] + elif args.hist_list == ["cr"]: + # Here we hardcode a list of hists used for the CRs + hist_lst = ["lj0pt", "ptz", "met", "ljptsum", "l0pt", "l0eta", "l1pt", "l1eta", "j0pt", "j0eta", "njets", "nbtagsl", "invmass"] + else: + # We want to specify a custom list + # If we don't specify this argument, it will be None, and the processor will fill all hists + hist_lst = args.hist_list + + + ### Load samples from json + samplesdict = {} + allInputFiles = [] + + def LoadJsonToSampleName(jsonFile, prefix): + sampleName = jsonFile if not '/' in jsonFile else jsonFile[jsonFile.rfind('/')+1:] + if sampleName.endswith('.json'): sampleName = sampleName[:-5] + with open(jsonFile) as jf: + samplesdict[sampleName] = json.load(jf) + samplesdict[sampleName]['redirector'] = prefix + + if isinstance(jsonFiles, str) and ',' in jsonFiles: + jsonFiles = jsonFiles.replace(' ', '').split(',') + elif isinstance(jsonFiles, str): + jsonFiles = [jsonFiles] + for jsonFile in jsonFiles: + if os.path.isdir(jsonFile): + if not jsonFile.endswith('/'): jsonFile+='/' + for f in os.path.listdir(jsonFile): + if f.endswith('.json'): allInputFiles.append(jsonFile+f) + else: + allInputFiles.append(jsonFile) + + # Read from cfg files + for f in allInputFiles: + if not os.path.isfile(f): + raise Exception(f'[ERROR] Input file {f} not found!') + # This input file is a json file, not a cfg + if f.endswith('.json'): + LoadJsonToSampleName(f, prefix) + # Open cfg files + else: + with open(f) as fin: + print(' >> Reading json from cfg file...') + lines = fin.readlines() + for l in lines: + if '#' in l: + l=l[:l.find('#')] + l = l.replace(' ', '').replace('\n', '') + if l == '': continue + if ',' in l: + l = l.split(',') + for nl in l: + if not os.path.isfile(l): + prefix = nl + else: + LoadJsonToSampleName(nl, prefix) + else: + if not os.path.isfile(l): + prefix = l + else: + LoadJsonToSampleName(l, prefix) + + flist = {} + nevts_total = 0 + for sname in samplesdict.keys(): + redirector = samplesdict[sname]['redirector'] + flist[sname] = [(redirector+f) for f in samplesdict[sname]['files']] + samplesdict[sname]['year'] = samplesdict[sname]['year'] + samplesdict[sname]['xsec'] = float(samplesdict[sname]['xsec']) + samplesdict[sname]['nEvents'] = int(samplesdict[sname]['nEvents']) + nevts_total += samplesdict[sname]['nEvents'] + samplesdict[sname]['nGenEvents'] = int(samplesdict[sname]['nGenEvents']) + samplesdict[sname]['nSumOfWeights'] = float(samplesdict[sname]['nSumOfWeights']) + if not samplesdict[sname]["isData"]: + for wgt_var in WGT_VAR_LST: + # Check that MC samples have all needed weight sums (only needed if doing systs) + if do_systs: + if (wgt_var not in samplesdict[sname]): + raise Exception(f"Missing weight variation \"{wgt_var}\".") + else: + samplesdict[sname][wgt_var] = 
float(samplesdict[sname][wgt_var]) + # Print file info + print('>> '+sname) + print(' - isData? : %s' %('YES' if samplesdict[sname]['isData'] else 'NO')) + print(' - year : %s' %samplesdict[sname]['year']) + print(' - xsec : %f' %samplesdict[sname]['xsec']) + print(' - histAxisName : %s' %samplesdict[sname]['histAxisName']) + print(' - options : %s' %samplesdict[sname]['options']) + print(' - tree : %s' %samplesdict[sname]['treeName']) + print(' - nEvents : %i' %samplesdict[sname]['nEvents']) + print(' - nGenEvents : %i' %samplesdict[sname]['nGenEvents']) + print(' - SumWeights : %i' %samplesdict[sname]['nSumOfWeights']) + if not samplesdict[sname]["isData"]: + for wgt_var in WGT_VAR_LST: + if wgt_var in samplesdict[sname]: + print(f' - {wgt_var}: {samplesdict[sname][wgt_var]}') + print(' - Prefix : %s' %samplesdict[sname]['redirector']) + print(' - nFiles : %i' %len(samplesdict[sname]['files'])) + for fname in samplesdict[sname]['files']: print(' %s'%fname) + + if pretend: + print('pretending...') + exit() + + # Extract the list of all WCs, as long as we haven't already specified one. + if len(wc_lst) == 0: + for k in samplesdict.keys(): + for wc in samplesdict[k]['WCnames']: + if wc not in wc_lst: + wc_lst.append(wc) + + if len(wc_lst) > 0: + # Yes, why not have the output be in correct English? + if len(wc_lst) == 1: + wc_print = wc_lst[0] + elif len(wc_lst) == 2: + wc_print = wc_lst[0] + ' and ' + wc_lst[1] + else: + wc_print = ', '.join(wc_lst[:-1]) + ', and ' + wc_lst[-1] + print('Wilson Coefficients: {}.'.format(wc_print)) + else: + print('No Wilson coefficients specified') + + processor_instance = gen_processor.AnalysisProcessor(samplesdict,wc_lst,hist_lst,ecut_threshold,do_errors,do_systs,split_lep_flavor,skip_sr,skip_cr) + + if executor == "work_queue": + executor_args = { + 'master_name': '{}-workqueue-coffea'.format(os.environ['USER']), + + # find a port to run work queue in this range: + 'port': port, + + 'debug_log': 'debug.log', + 'transactions_log': 'tr.log', + 'stats_log': 'stats.log', + 'tasks_accum_log': 'tasks.log', + + 'environment_file': remote_environment.get_environment( + extra_pip_local = {"topeft": ["topeft", "setup.py"]}, + ), + 'filepath': f'/project01/ndcms/{os.environ["USER"]}', + 'extra_input_files': ["gen_photon_processor.py"], + + 'retries': 5, + + # use mid-range compression for chunks results. 9 is the default for work + # queue in coffea. Valid values are 0 (minimum compression, less memory + # usage) to 16 (maximum compression, more memory usage). + 'compression': 9, + + # automatically find an adequate resource allocation for tasks. + # tasks are first tried using the maximum resources seen of previously ran + # tasks. on resource exhaustion, they are retried with the maximum resource + # values, if specified below. if a maximum is not specified, the task waits + # forever until a larger worker connects. + 'resource_monitor': True, + 'resources_mode': 'auto', + + # this resource values may be omitted when using + # resources_mode: 'auto', but they do make the initial portion + # of a workflow run a little bit faster. + # Rather than using whole workers in the exploratory mode of + # resources_mode: auto, tasks are forever limited to a maximum + # of 8GB of mem and disk. + # + # NOTE: The very first tasks in the exploratory + # mode will use the values specified here, so workers need to be at least + # this large. If left unspecified, tasks will use whole workers in the + # exploratory mode. 
+ # 'cores': 1, + # 'disk': 8000, #MB + # 'memory': 10000, #MB + + # control the size of accumulation tasks. Results are + # accumulated in groups of size chunks_per_accum, keeping at + # most chunks_per_accum at the same time in memory per task. + 'chunks_per_accum': 25, + 'chunks_accum_in_mem': 2, + + # terminate workers on which tasks have been running longer than average. + # This is useful for temporary conditions on worker nodes where a task will + # be finish faster is ran in another worker. + # the time limit is computed by multipliying the average runtime of tasks + # by the value of 'fast_terminate_workers'. Since some tasks can be + # legitimately slow, no task can trigger the termination of workers twice. + # + # warning: small values (e.g. close to 1) may cause the workflow to misbehave, + # as most tasks will be terminated. + # + # Less than 1 disables it. + 'fast_terminate_workers': 0, + + # print messages when tasks are submitted, finished, etc., + # together with their resource allocation and usage. If a task + # fails, its standard output is also printed, so we can turn + # off print_stdout for all tasks. + 'verbose': True, + 'print_stdout': False, + } + + # Run the processor and get the output + tstart = time.time() + + if executor == "futures": + exec_instance = processor.futures_executor(workers=nworkers) + runner = processor.Runner(exec_instance, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks) + elif executor == "work_queue": + executor = processor.WorkQueueExecutor(**executor_args) + runner = processor.Runner(executor, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks, skipbadfiles=False, xrootdtimeout=180) + + output = runner(flist, treename, processor_instance) + + dt = time.time() - tstart + + if executor == "work_queue": + print('Processed {} events in {} seconds ({:.2f} evts/sec).'.format(nevts_total,dt,nevts_total/dt)) + + #nbins = sum(sum(arr.size for arr in h.eval({}).values()) for h in output.values() if isinstance(h, hist.Hist)) + #nfilled = sum(sum(np.sum(arr > 0) for arr in h.eval({}).values()) for h in output.values() if isinstance(h, hist.Hist)) + #print("Filled %.0f bins, nonzero bins: %1.1f %%" % (nbins, 100*nfilled/nbins,)) + + if executor == "futures": + print("Processing time: %1.2f s with %i workers (%.2f s cpu overall)" % (dt, nworkers, dt*nworkers, )) + + # Save the output + if not os.path.isdir(outpath): os.system("mkdir -p %s"%outpath) + out_pkl_file = os.path.join(outpath,outname+".pkl.gz") + print(f"\nSaving output in {out_pkl_file}...") + with gzip.open(out_pkl_file, "wb") as fout: + cloudpickle.dump(output, fout) + print("Done!") + + # Run the data driven estimation, save the output + if do_np: + print("\nDoing the nonprompt estimation...") + out_pkl_file_name_np = os.path.join(outpath,outname+"_np.pkl.gz") + ddp = DataDrivenProducer(out_pkl_file,out_pkl_file_name_np) + print(f"Saving output in {out_pkl_file_name_np}...") + ddp.dumpToPickle() + print("Done!") + # Run the renorm fact envelope calculation + if do_renormfact_envelope: + print("\nDoing the renorm. fact. envelope calculation...") + dict_of_histos = utils.get_hist_from_pkl(out_pkl_file_name_np,allow_empty=False) + dict_of_histos_after_applying_envelope = get_renormfact_envelope(dict_of_histos) + utils.dump_to_pkl(out_pkl_file_name_np,dict_of_histos_after_applying_envelope)
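A minimal sketch for checking the output of these run scripts (the invocation, file path, and output name below are illustrative assumptions; `get_hist_from_pkl` and `eval({})` are the same calls already used in the commented bin-counting lines above):

    # Illustrative only: after something like
    #   python run_gen_analysis.py your_samples.json -x futures -o gen_photon_validation
    # the resulting pkl.gz can be opened and the HistEFT contents inspected like this.
    import topcoffea.modules.utils as utils

    # --outpath defaults to "histos"; the file name matches the --outname used for the run
    histos = utils.get_hist_from_pkl("histos/gen_photon_validation.pkl.gz", allow_empty=False)

    for name, h in histos.items():
        # eval({}) evaluates the stored EFT parameterization with all Wilson coefficients
        # set to zero (the SM point), returning one array of bin contents per sparse category
        vals = h.eval({})
        print(f"{name}: {len(vals)} filled categories")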