본문 바로가기
대학원 생활

HEP03 Reconstruct Z boson using Coffea #2 Processor

by Miracle_Morning 2021. 12. 10.

 

 

이전 버전에서 Coffea framework을 이용해서 NanoAOD를 analysis 하는 포스팅을 올렸다. Z boson mass를 reconstruct 하는 예제였다. 아래 링크를 참고하기 바란다. 

https://jwcorp.tistory.com/2?category=1011393 

 

HEP02 Reconstruct Z boson using Coffea #1 Basic

Coffea framework 와 CMS 의 NanoAOD type DY process 샘플을 가지고 Electron pair 을 selection 하여 Z boson mass peak 을 reconstruction 하는 튜토리얼이다. [Jupyter notebook 링크] 와 함께 이 글을 보기..

jwcorp.tistory.com

 

이번에는 Coffea 의 Processor라는 강력한 기능을 이용해서, Job을 병렬화시키는 방법을 소개하겠다. Analysis 내용은 이전 포스팅과 똑같으니 중복 설명은 하지 않겠다.

 

먼저 필요한 라이브러리들을 import 한다. 

 

import awkward as ak
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea import processor, hist
import numpy as np
import matplotlib.pyplot as plt
from coffea import lumi_tools
import glob
from coffea.util import load, save

 

1. Processor class

Processor 는 class 형태로 이전 포스팅에 작성했던 코드를 변형시켜줘야 한다. Processor의 목적은 여러 개의 인풋 파일을 프로세싱하여 히스토그램에 차곡차곡 쌓는 것이다. 생성자에서 히스토그램을 정의한다. bin size xmin xmax 가 여기서 결정이 되므로 신중하게 결정해야 한다. 여기서 Coffea의 장점과 단점이 보인다. output 이 direct로 histogram으로 나온다는 점, analysis의 성격에 따라 단점이 될 수도 있고 장점일 수도 있겠다. 

 

# ---> Class JW Processor
class JW_Processor(processor.ProcessorABC):

    # -- Initializer
    def __init__(self, year, sample_name):
        
        
        self._year = year
        # Trigger set
        self._doubleelectron_triggers = {
            "2018": [
                "Ele23_Ele12_CaloIdL_TrackIdL_IsoVL",  # Recomended
            ]
        }

        # hist set
        self._accumulator = processor.dict_accumulator(
            {
                "sumw": processor.defaultdict_accumulator(float),


                # -- Kinematics -- #
                "mass": hist.Hist(
                    "Events",
                    hist.Cat("dataset", "Dataset"),
                    hist.Bin("mass", "$m_{e+e-}$ [GeV]", 100, 0, 200),
                ),
            }
        )

    # -- Accumulator: accumulate histograms
    @property
    def accumulator(self):
        return self._accumulator

    # -- Main function : Process events
    def process(self, events):

        # Initialize accumulator
        out = self.accumulator.identity()
        dataset = sample_name

        # Data or MC
        isData = "genWeight" not in events.fields
        
        # Golden Json file
        if (self._year == "2018") and isData:
            injson = "/x5/cms/jwkim/gitdir/JWCorp/JW_analysis/Coffea_WZG/Corrections/Cert_314472-325175_13TeV_Legacy2018_Collisions18_JSON.txt.RunABD"

        # << Sort by PT  helper function >>
        def sort_by_pt(ele,pho,jet):
            ele = ele[ak.argsort(ele.pt,ascending=False,axis=1)]
            pho = pho[ak.argsort(pho.pt,ascending=False,axis=1)]
            jet = jet[ak.argsort(jet.pt,ascending=False,axis=1)]

            return ele,pho,jet
        events.Electron, events.Photon, events.Jet = sort_by_pt(
            events.Electron, events.Photon, events.Jet
        )

        # Golden Json file
        if (year == "2018") and isData:
            injson = "data/json/Cert_314472-325175_13TeV_Legacy2018_Collisions18_JSON.txt.RunABD"

        if isData:
            lumi_mask_builder = lumi_tools.LumiMask(injson)
            lumimask = ak.Array(
                lumi_mask_builder.__call__(events.run, events.luminosityBlock)
            )
            events = events[lumimask]


        # double lepton trigger
        is_double_ele_trigger=True
        if not is_double_ele_trigger:
            double_ele_triggers_arr=np.ones(len(events), dtype=np.bool)
        else:
            double_ele_triggers_arr = np.zeros(len(events), dtype=np.bool)
            for path in self._doubleelectron_triggers[year]:
                if path not in events.HLT.fields: continue
                double_ele_triggers_arr = double_ele_triggers_arr | events.HLT[path]


        # Sort particle order by PT  # RunD --> has problem
        events.Electron,events.Photon,events.Jet = sort_by_pt(events.Electron,events.Photon,events.Jet)

        # Apply trigger
        Initial_events = events
        events = events[double_ele_triggers_arr]
        print("Events passing triggers and skiimig: ",len(events) )
        # Set Particle
        Electron = events.Electron
        ##----------- Cut flow2: Electron Selection

        EleSelmask = (
            (Electron.pt >= 10)
            & (np.abs(Electron.eta + Electron.deltaEtaSC) < 1.479)
            & (Electron.cutBased > 2)
            & (abs(Electron.dxy) < 0.05)
            & (abs(Electron.dz) < 0.1)
        ) | (
            (Electron.pt >= 10)
            & (np.abs(Electron.eta + Electron.deltaEtaSC) > 1.479)
            & (np.abs(Electron.eta + Electron.deltaEtaSC) <= 2.5)
            & (Electron.cutBased > 2)
            & (abs(Electron.dxy) < 0.1)
            & (abs(Electron.dz) < 0.2)
        )

        Electron = Electron[EleSelmask]

        # apply cut 2
        Diele_mask = ak.num(Electron) == 2
        Electron = Electron[Diele_mask]
        events = events[Diele_mask]

        diele = Electron[:,0] + Electron[:,1]

        # OSSF
        ossf_mask = (diele.charge == 0)

        # Z mass window
        Mee_cut_mask = diele.mass > 10

        # Electron PT cuts
        Elept_mask = (Electron[:,0].pt >= 25) & (Electron[:,1].pt >= 10)

        # Mask
        Event_sel_mask = Elept_mask & ossf_mask & Mee_cut_mask

        diele = diele[Event_sel_mask]
        Electron = Electron[Event_sel_mask]
        events = events[Event_sel_mask]

        # Fill hist
        Mee_arr = diele.mass
        out["sumw"][dataset] += len(Initial_events)
        out["mass"].fill(
            dataset=dataset,mass=Mee_arr
        )
        return out

    # -- Finally! return accumulator
    def postprocess(self, accumulator):
        return accumulator

 

2. Executer

 

Processor 를 본격적으로 run 하는 executor 코드이다. file 이름들을 리스트 형태로 준비해주고, 샘플 (DrellYan signletop.. ) 등이 여러 개일 수 있으니 {샘플 이름 : 파일리스트}의 딕셔너리 형태로 정의한다.  workers에 사용할 수 있는 cpu 개수를 넣어준다. 이러면 무료 자동으로! 병렬화시켜서 코드를 실행시켜준다. 연구실에는 멀티스레딩 포함하여 노드당 고성능 cpu가 48개 있어서, condor job 필요 없이, crab job 필요 없이 로컬에서 2018년도 수천만 개의 UL 샘플을 전부 처리할 수 있는 powerful 한 성능을 보여준다. 

 

if __name__ == "__main__":
    
    file_list = glob.glob("../data/DY/*.root")
    year='2018'
    sample_name = 'DY'
    samples = {sample_name:file_list}
    # Class tio object
    JW_Processor_instance = JW_Processor(year, sample_name)
    
    
    ## -->Multi-node Executor
    result = processor.run_uproot_job(
        samples,  # dataset
        "Events",  # Tree name
        JW_Processor_instance,  # Class
        executor=processor.futures_executor,
        executor_args={"schema": NanoAODSchema, "workers": 4}, # number of CPU
    )    
    
    outname = sample_name + ".futures"
    save(result, outname)

 

3. Draw histogram

히스토그램 아웃풋들을 모아서 합치고, 하나의 플롯으로 이쁘게 만들어 주는 코드는 따로 작성하였다. 역시나 먼저 필요한 라이브러리들을 import 해준다.

 

import os
import numpy as np
from coffea.util import load, save
import matplotlib.pyplot as plt
import coffea.hist as hist
import time
import sys

 

불러올 히스토그램을 재정의 한다. 

 

hsum_Mee = hist.Hist(
	"Events",
	hist.Cat("dataset","Dataset"),
	hist.Bin("mass","Z mass",100,0,200)	
)

histdict = {'mass':hsum_Mee}

 

여러개의 히스토그램을 하나로 합쳐주는 reduce 함수를 만들었다.

 

def reduce(folder,sample_list,histname):
	hists={}
	

	for filename in os.listdir(folder):
        
		hin = load(folder + '/' + filename)
		hists[filename] = hin.copy()
		
		if filename.split('.')[0] not in sample_list:
			continue

		hist_ = histdict[histname]
		hist_.add(hists[filename][histname])
	return hist_
    
file_path = 'output'
sample_list = ['DY']
histname = 'mass'
h1  = reduce(file_path,sample_list,histname)

 

Scale 함수로 xsec으로 normalize 해준다

 

## --Noramlize
lumi_factor = 59.971
scales={
   'DY'      : lumi_factor * 1000  * 2137.0 / 1933600,
}
h1.scale(scales,axis='dataset')

#h1 = h1.rebin(histname,hist.Bin("mass","Z mass",100,50,150))

 

플롯 디자인을 해준다

 

import mplhep as hep

XMIN=0
XMAX=200
YMIN=0
YMAX=3000000


plt.style.use(hep.style.CMS)

plt.rcParams.update({
	'font.size': 14,
	'axes.titlesize': 18,
	'axes.labelsize': 18,
	'xtick.labelsize': 12,
	'ytick.labelsize': 12
})

fig, ax = plt.subplots(
	nrows=1,
	ncols=1,
	figsize=(10,10),
	sharex=True
)
    
from cycler import cycler
prop_cycle = plt.rcParams['axes.prop_cycle']
colors = prop_cycle.by_key()['color']

ax.set_prop_cycle(cycler(color=colors))

fill_opts = {
    'edgecolor': (0,0,0,0.3),
    'alpha': 0.8
}
error_opts = {
    'label': 'Stat. Unc.',
    'hatch': '///',
    'facecolor': 'none',
    'edgecolor': (0,0,0,.5),
    'linewidth': 0
}

hist.plot1d(
    h1['DY'],
    ax=ax,
    clear=False,
    #stack=True,
    fill_opts=fill_opts,
    error_opts = error_opts,
)

ax._get_lines.prop_cycler = ax._get_patches_for_fill.prop_cycler
ax.autoscale(axis="x", tight=True)
ax.set_ylim(YMIN, YMAX)
ax.set_xlim(XMIN, XMAX)

lum = plt.text(
    1.0,
    1.0,
    r"%.2f fb$^{-1}$ (13 TeV)" % (lumi_factor),
    fontsize=16,
    horizontalalignment="right",
    verticalalignment="bottom",
    transform=ax.transAxes,
)

완성이다. Coffea 장점은 말했듯이 매우 빠른 속도이고, 단점은 Ntuple 생성이 불가능해서 기존 ROOT 기반의 툴을 쓰는 사람들과 협업을 하기 곤란하다. 아직 발전 중인 프레임워크이고, 지금도 충분히 Template fit method를 위한 template generation이나 간단한 analysis 혹은 Signal region cut optimization 등에는 쓰일 수 있을 것 같다. 나는 메인으로 coffea를 활용하다 협업 중에 기술적인 문제점이 있어서 결국 포기하고 ROOT 기반의 NanoAOD tool로 갈아탔지만.. 언젠간 Coffea 가 충분히 발전이 됐을 때 메인 툴로서 입지를 가질 수 있다고 생각한다. 

 

모든 코드는 아래에서 확인가능하다.

https://github.com/JW-corp/J.W_Analysis/tree/main/coffea_tuto/N02_Processor