
이전 버전에서 Coffea framework을 이용해서 NanoAOD를 analysis 하는 포스팅을 올렸다. Z boson mass를 reconstruct 하는 예제였다. 아래 링크를 참고하기 바란다.
https://jwcorp.tistory.com/2?category=1011393
HEP02 Reconstruct Z boson using Coffea #1 Basic
Coffea framework 와 CMS 의 NanoAOD type DY process 샘플을 가지고 Electron pair 을 selection 하여 Z boson mass peak 을 reconstruction 하는 튜토리얼이다. [Jupyter notebook 링크] 와 함께 이 글을 보기..
jwcorp.tistory.com
이번에는 Coffea 의 Processor라는 강력한 기능을 이용해서, Job을 병렬화시키는 방법을 소개하겠다. Analysis 내용은 이전 포스팅과 똑같으니 중복 설명은 하지 않겠다.
먼저 필요한 라이브러리들을 import 한다.
import awkward as ak
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea import processor, hist
import numpy as np
import matplotlib.pyplot as plt
from coffea import lumi_tools
import glob
from coffea.util import load, save
1. Processor class
Processor 는 class 형태로 이전 포스팅에 작성했던 코드를 변형시켜줘야 한다. Processor의 목적은 여러 개의 인풋 파일을 프로세싱하여 히스토그램에 차곡차곡 쌓는 것이다. 생성자에서 히스토그램을 정의한다. bin size xmin xmax 가 여기서 결정이 되므로 신중하게 결정해야 한다. 여기서 Coffea의 장점과 단점이 보인다. output 이 direct로 histogram으로 나온다는 점, analysis의 성격에 따라 단점이 될 수도 있고 장점일 수도 있겠다.
# ---> Class JW Processor
class JW_Processor(processor.ProcessorABC):
# -- Initializer
def __init__(self, year, sample_name):
self._year = year
# Trigger set
self._doubleelectron_triggers = {
"2018": [
"Ele23_Ele12_CaloIdL_TrackIdL_IsoVL", # Recomended
]
}
# hist set
self._accumulator = processor.dict_accumulator(
{
"sumw": processor.defaultdict_accumulator(float),
# -- Kinematics -- #
"mass": hist.Hist(
"Events",
hist.Cat("dataset", "Dataset"),
hist.Bin("mass", "$m_{e+e-}$ [GeV]", 100, 0, 200),
),
}
)
# -- Accumulator: accumulate histograms
@property
def accumulator(self):
return self._accumulator
# -- Main function : Process events
def process(self, events):
# Initialize accumulator
out = self.accumulator.identity()
dataset = sample_name
# Data or MC
isData = "genWeight" not in events.fields
# Golden Json file
if (self._year == "2018") and isData:
injson = "/x5/cms/jwkim/gitdir/JWCorp/JW_analysis/Coffea_WZG/Corrections/Cert_314472-325175_13TeV_Legacy2018_Collisions18_JSON.txt.RunABD"
# << Sort by PT helper function >>
def sort_by_pt(ele,pho,jet):
ele = ele[ak.argsort(ele.pt,ascending=False,axis=1)]
pho = pho[ak.argsort(pho.pt,ascending=False,axis=1)]
jet = jet[ak.argsort(jet.pt,ascending=False,axis=1)]
return ele,pho,jet
events.Electron, events.Photon, events.Jet = sort_by_pt(
events.Electron, events.Photon, events.Jet
)
# Golden Json file
if (year == "2018") and isData:
injson = "data/json/Cert_314472-325175_13TeV_Legacy2018_Collisions18_JSON.txt.RunABD"
if isData:
lumi_mask_builder = lumi_tools.LumiMask(injson)
lumimask = ak.Array(
lumi_mask_builder.__call__(events.run, events.luminosityBlock)
)
events = events[lumimask]
# double lepton trigger
is_double_ele_trigger=True
if not is_double_ele_trigger:
double_ele_triggers_arr=np.ones(len(events), dtype=np.bool)
else:
double_ele_triggers_arr = np.zeros(len(events), dtype=np.bool)
for path in self._doubleelectron_triggers[year]:
if path not in events.HLT.fields: continue
double_ele_triggers_arr = double_ele_triggers_arr | events.HLT[path]
# Sort particle order by PT # RunD --> has problem
events.Electron,events.Photon,events.Jet = sort_by_pt(events.Electron,events.Photon,events.Jet)
# Apply trigger
Initial_events = events
events = events[double_ele_triggers_arr]
print("Events passing triggers and skiimig: ",len(events) )
# Set Particle
Electron = events.Electron
##----------- Cut flow2: Electron Selection
EleSelmask = (
(Electron.pt >= 10)
& (np.abs(Electron.eta + Electron.deltaEtaSC) < 1.479)
& (Electron.cutBased > 2)
& (abs(Electron.dxy) < 0.05)
& (abs(Electron.dz) < 0.1)
) | (
(Electron.pt >= 10)
& (np.abs(Electron.eta + Electron.deltaEtaSC) > 1.479)
& (np.abs(Electron.eta + Electron.deltaEtaSC) <= 2.5)
& (Electron.cutBased > 2)
& (abs(Electron.dxy) < 0.1)
& (abs(Electron.dz) < 0.2)
)
Electron = Electron[EleSelmask]
# apply cut 2
Diele_mask = ak.num(Electron) == 2
Electron = Electron[Diele_mask]
events = events[Diele_mask]
diele = Electron[:,0] + Electron[:,1]
# OSSF
ossf_mask = (diele.charge == 0)
# Z mass window
Mee_cut_mask = diele.mass > 10
# Electron PT cuts
Elept_mask = (Electron[:,0].pt >= 25) & (Electron[:,1].pt >= 10)
# Mask
Event_sel_mask = Elept_mask & ossf_mask & Mee_cut_mask
diele = diele[Event_sel_mask]
Electron = Electron[Event_sel_mask]
events = events[Event_sel_mask]
# Fill hist
Mee_arr = diele.mass
out["sumw"][dataset] += len(Initial_events)
out["mass"].fill(
dataset=dataset,mass=Mee_arr
)
return out
# -- Finally! return accumulator
def postprocess(self, accumulator):
return accumulator
2. Executer
Processor 를 본격적으로 run 하는 executor 코드이다. file 이름들을 리스트 형태로 준비해주고, 샘플 (DrellYan signletop.. ) 등이 여러 개일 수 있으니 {샘플 이름 : 파일리스트}의 딕셔너리 형태로 정의한다. workers에 사용할 수 있는 cpu 개수를 넣어준다. 이러면 무료 자동으로! 병렬화시켜서 코드를 실행시켜준다. 연구실에는 멀티스레딩 포함하여 노드당 고성능 cpu가 48개 있어서, condor job 필요 없이, crab job 필요 없이 로컬에서 2018년도 수천만 개의 UL 샘플을 전부 처리할 수 있는 powerful 한 성능을 보여준다.
if __name__ == "__main__":
file_list = glob.glob("../data/DY/*.root")
year='2018'
sample_name = 'DY'
samples = {sample_name:file_list}
# Class tio object
JW_Processor_instance = JW_Processor(year, sample_name)
## -->Multi-node Executor
result = processor.run_uproot_job(
samples, # dataset
"Events", # Tree name
JW_Processor_instance, # Class
executor=processor.futures_executor,
executor_args={"schema": NanoAODSchema, "workers": 4}, # number of CPU
)
outname = sample_name + ".futures"
save(result, outname)
3. Draw histogram
히스토그램 아웃풋들을 모아서 합치고, 하나의 플롯으로 이쁘게 만들어 주는 코드는 따로 작성하였다. 역시나 먼저 필요한 라이브러리들을 import 해준다.
import os
import numpy as np
from coffea.util import load, save
import matplotlib.pyplot as plt
import coffea.hist as hist
import time
import sys
불러올 히스토그램을 재정의 한다.
hsum_Mee = hist.Hist(
"Events",
hist.Cat("dataset","Dataset"),
hist.Bin("mass","Z mass",100,0,200)
)
histdict = {'mass':hsum_Mee}
여러개의 히스토그램을 하나로 합쳐주는 reduce 함수를 만들었다.
def reduce(folder,sample_list,histname):
hists={}
for filename in os.listdir(folder):
hin = load(folder + '/' + filename)
hists[filename] = hin.copy()
if filename.split('.')[0] not in sample_list:
continue
hist_ = histdict[histname]
hist_.add(hists[filename][histname])
return hist_
file_path = 'output'
sample_list = ['DY']
histname = 'mass'
h1 = reduce(file_path,sample_list,histname)
Scale 함수로 xsec으로 normalize 해준다
## --Noramlize
lumi_factor = 59.971
scales={
'DY' : lumi_factor * 1000 * 2137.0 / 1933600,
}
h1.scale(scales,axis='dataset')
#h1 = h1.rebin(histname,hist.Bin("mass","Z mass",100,50,150))
플롯 디자인을 해준다
import mplhep as hep
XMIN=0
XMAX=200
YMIN=0
YMAX=3000000
plt.style.use(hep.style.CMS)
plt.rcParams.update({
'font.size': 14,
'axes.titlesize': 18,
'axes.labelsize': 18,
'xtick.labelsize': 12,
'ytick.labelsize': 12
})
fig, ax = plt.subplots(
nrows=1,
ncols=1,
figsize=(10,10),
sharex=True
)
from cycler import cycler
prop_cycle = plt.rcParams['axes.prop_cycle']
colors = prop_cycle.by_key()['color']
ax.set_prop_cycle(cycler(color=colors))
fill_opts = {
'edgecolor': (0,0,0,0.3),
'alpha': 0.8
}
error_opts = {
'label': 'Stat. Unc.',
'hatch': '///',
'facecolor': 'none',
'edgecolor': (0,0,0,.5),
'linewidth': 0
}
hist.plot1d(
h1['DY'],
ax=ax,
clear=False,
#stack=True,
fill_opts=fill_opts,
error_opts = error_opts,
)
ax._get_lines.prop_cycler = ax._get_patches_for_fill.prop_cycler
ax.autoscale(axis="x", tight=True)
ax.set_ylim(YMIN, YMAX)
ax.set_xlim(XMIN, XMAX)
lum = plt.text(
1.0,
1.0,
r"%.2f fb$^{-1}$ (13 TeV)" % (lumi_factor),
fontsize=16,
horizontalalignment="right",
verticalalignment="bottom",
transform=ax.transAxes,
)

완성이다. Coffea 장점은 말했듯이 매우 빠른 속도이고, 단점은 Ntuple 생성이 불가능해서 기존 ROOT 기반의 툴을 쓰는 사람들과 협업을 하기 곤란하다. 아직 발전 중인 프레임워크이고, 지금도 충분히 Template fit method를 위한 template generation이나 간단한 analysis 혹은 Signal region cut optimization 등에는 쓰일 수 있을 것 같다. 나는 메인으로 coffea를 활용하다 협업 중에 기술적인 문제점이 있어서 결국 포기하고 ROOT 기반의 NanoAOD tool로 갈아탔지만.. 언젠간 Coffea 가 충분히 발전이 됐을 때 메인 툴로서 입지를 가질 수 있다고 생각한다.
모든 코드는 아래에서 확인가능하다.
https://github.com/JW-corp/J.W_Analysis/tree/main/coffea_tuto/N02_Processor
'대학원 생활' 카테고리의 다른 글
HEP04 columnar database in HEP (0) | 2022.03.31 |
---|---|
CMS01: 2016 pre-VFP vs post-VFP (0) | 2021.12.10 |
HEP02 Reconstruct Z boson using Coffea #1 Basic (1) | 2021.07.19 |
HEP01 Data cleaning using Coffea framework in CMS (1) | 2021.07.17 |