43 changes: 28 additions & 15 deletions analysis/topeft_run2/analysis_processor.py
@@ -228,7 +228,7 @@ def process(self, events):

# Extract the EFT quadratic coefficients and optionally use them to calculate the coefficients on the w**2 quartic function
# eft_coeffs is never Jagged so convert immediately to numpy for ease of use.
eft_coeffs = ak.to_numpy(events["EFTfitCoefficients"]) if hasattr(events, "EFTfitCoefficients") else None
eft_coeffs = ak.flatten(events["EFTfitCoefficients"]) if hasattr(events, "EFTfitCoefficients") else None
if eft_coeffs is not None:
# Check to see if the ordering of WCs for this sample matches what we want
if self._samples[dataset]["WCnames"] != self._wc_names_lst:
@@ -246,7 +246,8 @@ def process(self, events):

################### Muon selection ####################

mu["pt"] = ApplyRochesterCorrections(year, mu, isData) # Need to apply corrections before doing muon selection
#TODO Enable when rnd(shape) is sorted out
#mu["pt"] = ApplyRochesterCorrections(year, mu, isData) # Need to apply corrections before doing muon selection
mu["isPres"] = te_os.isPresMuon(mu.dxy, mu.dz, mu.sip3d, mu.eta, mu.pt, mu.miniPFRelIso_all)
mu["isLooseM"] = te_os.isLooseMuon(mu.miniPFRelIso_all,mu.sip3d,mu.looseId)
mu["isFO"] = te_os.isFOMuon(mu.pt, mu.conept, mu.btagDeepFlavB, mu.mvaTTHUL, mu.jetRelIso, year)
@@ -305,12 +306,12 @@ def process(self, events):
# We only calculate these values if not isData
# Note: add() will generally modify up/down weights, so if these are needed for any reason after this point, we should instead pass copies to add()
# Note: Here we will add to the weights object the SFs that do not depend on any of the forthcoming loops
weights_obj_base = coffea.analysis_tools.Weights(len(events),storeIndividual=True)
weights_obj_base = coffea.analysis_tools.Weights(None,storeIndividual=True)
if not isData:

# If this is not an EFT sample, get the genWeight
if eft_coeffs is None: genw = events["genWeight"]
else: genw= np.ones_like(events["event"])
else: genw= ak.ones_like(events["event"])

# Normalize by (xsec/sow)*genw where genw is 1 for EFT samples
# Note that for theory systs, will need to multiply by sow/sow_wgtUP to get (xsec/sow_wgtUp)*genw and same for Down
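
With delayed (dask-awkward) inputs the Weights object is constructed with size None; a minimal sketch of that construction together with the (xsec/sow)*genw normalization described above, using hypothetical xsec/sow values and a stand-in genWeight array:

import awkward as ak
import dask_awkward as dak
from coffea.analysis_tools import Weights

xsec, sow = 0.5, 1.0e6                             # hypothetical cross section and sum-of-weights
genw = dak.from_awkward(ak.Array([1.0] * 10), 1)   # stand-in for events["genWeight"] (1 for EFT samples)

weights = Weights(None, storeIndividual=True)      # size=None is what delayed (dask-awkward) inputs require
weights.add("norm", genw * (xsec / sow))           # (xsec/sow)*genw, as in the comment above
total = weights.weight()                           # delayed array; total.compute() materializes it
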
@@ -344,15 +345,15 @@ def process(self, events):
for syst_var in syst_var_list:
# Make a copy of the base weights object, so that each time through the loop we do not double count systs
# In this loop over systs that impact kinematics, we will add to the weights objects the SFs that depend on the object kinematics
weights_obj_base_for_kinematic_syst = copy.deepcopy(weights_obj_base)
Contributor Author: Need to verify this didn't change the results.

weights_obj_base_for_kinematic_syst = copy.copy(weights_obj_base)
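
The concern above is that copy.copy shares the underlying containers between the base Weights object and the per-systematic copy, while copy.deepcopy duplicates them; whether the shared state matters depends on how Weights mutates its internals, which is what needs verifying. A small illustration of the general difference, using a plain dict as a stand-in rather than the Weights class:

import copy

base = {"weights": {"norm": [1.0, 2.0]}}
shallow = copy.copy(base)      # shares the inner dict with base
deep = copy.deepcopy(base)     # fully independent

shallow["weights"]["trigSF"] = [0.9, 1.1]
deep["weights"]["lepSF"] = [1.0, 1.0]

print("trigSF" in base["weights"])  # True: the shallow copy's change is visible in base
print("lepSF" in base["weights"])   # False: the deep copy is isolated
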

#################### Jets ####################

# Jet cleaning, before any jet selection
#vetos_tocleanjets = ak.with_name( ak.concatenate([tau, l_fo], axis=1), "PtEtaPhiMCandidate")
vetos_tocleanjets = ak.with_name( l_fo, "PtEtaPhiMCandidate")
tmp = ak.cartesian([ak.local_index(jets.pt), vetos_tocleanjets.jetIdx], nested=True)
cleanedJets = jets[~ak.any(tmp.slot0 == tmp.slot1, axis=-1)] # this line should go before *any selection*, otherwise lep.jetIdx is not aligned with the jet index
tmp = ak.local_index(jets.pt) == vetos_tocleanjets.jetIdx
cleanedJets = jets[~ak.any(tmp, axis=-1)] # this line should go before *any selection*, otherwise lep.jetIdx is not aligned with the jet index

# Selecting jets and cleaning them
jetptname = "pt_nom" if hasattr(cleanedJets, "pt_nom") else "pt"
@@ -363,11 +364,12 @@ def process(self, events):
cleanedJets["mass_raw"] = (1 - cleanedJets.rawFactor)*cleanedJets.mass
cleanedJets["pt_gen"] =ak.values_astype(ak.fill_none(cleanedJets.matched_gen.pt, 0), np.float32)
cleanedJets["rho"] = ak.broadcast_arrays(events.fixedGridRhoFastjetAll, cleanedJets.pt)[0]
events_cache = events.caches[0]
cleanedJets = ApplyJetCorrections(year, corr_type='jets').build(cleanedJets, lazy_cache=events_cache)
#TODO implement or remove
#events_cache = events.caches[0]
cleanedJets = ApplyJetCorrections(year, corr_type='jets').build(cleanedJets)#, lazy_cache=events_cache)
# SYSTEMATICS
cleanedJets=ApplyJetSystematics(year,cleanedJets,syst_var)
met=ApplyJetCorrections(year, corr_type='met').build(met_raw, cleanedJets, lazy_cache=events_cache)
met=ApplyJetCorrections(year, corr_type='met').build(met_raw, cleanedJets)#, lazy_cache=events_cache)
cleanedJets["isGood"] = tc_os.is_tight_jet(getattr(cleanedJets, jetptname), cleanedJets.eta, cleanedJets.jetId, pt_cut=30., eta_cut=get_te_param("eta_j_cut"), id_cut=get_te_param("jet_id_cut"))
goodJets = cleanedJets[cleanedJets.isGood]

Expand Down Expand Up @@ -432,8 +434,12 @@ def process(self, events):

# Btag SF following 1a) in https://twiki.cern.ch/twiki/bin/viewauth/CMS/BTagSFMethods
isBtagJetsLooseNotMedium = (isBtagJetsLoose & isNotBtagJetsMedium)
bJetSF = [GetBTagSF(goodJets, year, 'LOOSE'),GetBTagSF(goodJets, year, 'MEDIUM')]
bJetEff = [GetBtagEff(goodJets, year, 'loose'),GetBtagEff(goodJets, year, 'medium')]
#TODO fix SFs pkl
#bJetSF = [GetBTagSF(goodJets, year, 'LOOSE'),GetBTagSF(goodJets, year, 'MEDIUM')]
bJetSF = [ak.ones_like(goodJets.pt),ak.ones_like(goodJets.pt)]
#TODO fix SFs pkl
#bJetEff = [GetBtagEff(goodJets, year, 'loose'),GetBtagEff(goodJets, year, 'medium')]
bJetEff = [ak.ones_like(goodJets.pt),ak.ones_like(goodJets.pt)]
bJetEff_data = [bJetEff[0]*bJetSF[0],bJetEff[1]*bJetSF[1]]
pMC = ak.prod(bJetEff[1] [isBtagJetsMedium], axis=-1) * ak.prod((bJetEff[0] [isBtagJetsLooseNotMedium] - bJetEff[1] [isBtagJetsLooseNotMedium]), axis=-1) * ak.prod((1-bJetEff[0] [isNotBtagJetsLoose]), axis=-1)
pMC = ak.where(pMC==0,1,pMC) # removing zeroes from denominator...
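
For reference, the single-working-point form of the method 1a weight from the TWiki linked above is w = P(Data)/P(MC), where P(MC) is the product of eff over tagged jets times the product of (1 - eff) over untagged jets, and P(Data) is the same with eff replaced by SF*eff; a minimal awkward sketch with hypothetical per-jet efficiencies, scale factors, and tag decisions:

import awkward as ak

eff = ak.Array([[0.7, 0.6, 0.1], [0.8, 0.05]])       # hypothetical b-tag efficiencies per jet
sf = ak.Array([[0.95, 0.97, 1.02], [0.93, 1.01]])    # hypothetical data/MC scale factors per jet
tagged = ak.Array([[True, False, False], [True, True]])

p_mc = ak.prod(eff[tagged], axis=-1) * ak.prod(1 - eff[~tagged], axis=-1)
p_data = ak.prod((sf * eff)[tagged], axis=-1) * ak.prod(1 - (sf * eff)[~tagged], axis=-1)
btag_weight = p_data / p_mc                          # per-event event weight, w = P(Data)/P(MC)
print(btag_weight.tolist())
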
@@ -452,7 +458,8 @@ def process(self, events):

# Trigger SFs
GetTriggerSF(year,events,l0,l1)
weights_obj_base_for_kinematic_syst.add(f"triggerSF_{year}", events.trigger_sf, copy.deepcopy(events.trigger_sfUp), copy.deepcopy(events.trigger_sfDown)) # In principle does not have to be in the lep cat loop
#TODO Enable after trigger SFs are fixed
#weights_obj_base_for_kinematic_syst.add(f"triggerSF_{year}", events.trigger_sf, copy.deepcopy(events.trigger_sfUp), copy.deepcopy(events.trigger_sfDown)) # In principle does not have to be in the lep cat loop


######### Event weights that do depend on the lep cat ###########
@@ -462,7 +469,9 @@ def process(self, events):
for ch_name in ["2l", "2l_4t", "3l", "4l", "2l_CR", "2l_CRflip", "3l_CR", "2los_CRtt", "2los_CRZ"]:

# For both data and MC
weights_dict[ch_name] = copy.deepcopy(weights_obj_base_for_kinematic_syst)
weights_dict[ch_name] = copy.copy(weights_obj_base_for_kinematic_syst)
#TODO Enable after fake rates are fixed
'''
if ch_name.startswith("2l"):
weights_dict[ch_name].add("FF", events.fakefactor_2l, copy.deepcopy(events.fakefactor_2l_up), copy.deepcopy(events.fakefactor_2l_down))
weights_dict[ch_name].add("FFpt", events.nom, copy.deepcopy(events.fakefactor_2l_pt1/events.fakefactor_2l), copy.deepcopy(events.fakefactor_2l_pt2/events.fakefactor_2l))
@@ -480,8 +489,11 @@ def process(self, events):
if isData:
if ch_name in ["2l","2l_4t","2l_CR","2l_CRflip"]:
weights_dict[ch_name].add("fliprate", events.flipfactor_2l)
'''

# For MC only
#TODO Enable after lep SFs are fixed
'''
if not isData:
if ch_name.startswith("2l"):
weights_dict[ch_name].add("lepSF_muon", events.sf_2l_muon, copy.deepcopy(events.sf_2l_hi_muon), copy.deepcopy(events.sf_2l_lo_muon))
@@ -494,6 +506,7 @@ def process(self, events):
weights_dict[ch_name].add("lepSF_elec", events.sf_4l_elec, copy.deepcopy(events.sf_4l_hi_elec), copy.deepcopy(events.sf_4l_lo_elec))
else:
raise Exception(f"Unknown channel name: {ch_name}")
'''


######### Masks we need for the selection ##########
@@ -635,7 +648,7 @@ def process(self, events):
ecut_mask = (ljptsum<self._ecut_threshold)

# Counts
counts = np.ones_like(events['event'])
counts = ak.ones_like(events['event'])

# Variables we will loop over when filling hists
varnames = {}
76 changes: 74 additions & 2 deletions analysis/topeft_run2/run_analysis.py
@@ -7,9 +7,17 @@
import gzip
import os
import dask
from distributed import Client

from coffea import processor
from coffea.nanoevents import NanoAODSchema
from coffea.nanoevents import NanoAODSchema, NanoEventsFactory
from coffea.dataset_tools import preprocess
from coffea.dataset_tools import (
apply_to_fileset,
max_chunks,
preprocess,
)
#from coffea.dataset_tools import filter_files

import topcoffea.modules.utils as utils
import topcoffea.modules.remote_environment as remote_environment
@@ -317,11 +325,75 @@ def LoadJsonToSampleName(jsonFile, prefix):
'print_stdout': False,
}

# Get fileset
fileset = {}
for name, fpaths in flist.items():
fileset[name] = {}
fileset[name]["files"] = {}
for fpath in fpaths:
fileset[name]["files"][fpath] = {"object_path": "Events"}
fileset[name]["metadata"] = {"dataset": name}
print(f'{fileset=}')
print("Number of datasets:",len(fileset))

# Run the processor and get the output
tstart = time.time()

if executor == "futures":
output = dask.compute(processor_instance)
'''
dataset_runnable, dataset_updated = preprocess(
fileset,
align_clusters=False,
#step_size=100_000,
files_per_batch=1,
skip_bad_files=True,
#save_form=False,
)
to_compute = apply_to_fileset(
processor_instance,
max_chunks(dataset_runnable, 300),
#schemaclass=BaseSchema,
)
(out,) = dask.compute(to_compute)
'''
with Client(n_workers=8, threads_per_worker=1) as client:

# Run preprocess
print("\nRunning preprocess...")
dataset_runnable, dataset_updated = preprocess(
fileset,
#step_size=50_000, # missing in current version?
align_clusters=False,
files_per_batch=1,
#save_form=True,
)
#dataset_runnable = filter_files(dataset_runnable)

t_beforeapplytofileset = time.time()
# Run apply_to_fileset
print("\nRunning apply_to_fileset...")
histos_to_compute, reports = apply_to_fileset(
processor_instance,
dataset_runnable,
uproot_options={"allow_read_errors_with_report": True},
#parallelize_with_dask=True,
)

print("DONE with apply to fileset")

# Check columns to be read
#print("\nRunning necessary_columns...")
#columns_read = dak.necessary_columns(histos_to_compute[list(histos_to_compute.keys())[0]])
#print(columns_read)

t_beforecompute = time.time()
# Compute
print("\nRunning compute...")
output_futures, report_futures = {}, {}
for key in histos_to_compute:
output_futures[key], report_futures[key] = client.compute((histos_to_compute[key], reports[key],))

coutputs, creports = client.gather((output_futures, report_futures,))
elif executor == "work_queue":
executor = processor.WorkQueueExecutor(**executor_args)
runner = processor(executor, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks, skipbadfiles=False, xrootdtimeout=180)