Merge pull request #990 from CoffeaTeam/lumi_list_reduction
feat!: dask-based LumiList and LumiData
lgray authored Jan 14, 2024
2 parents 4b9147f + d989a2f commit 0bf35d3
Showing 2 changed files with 232 additions and 45 deletions.
231 changes: 188 additions & 43 deletions src/coffea/lumi_tools/lumi_tools.py
@@ -1,12 +1,29 @@
import json

-import awkward as ak
-import dask_awkward as dak
+from functools import partial
+
+import awkward
+import dask.delayed
+import dask_awkward
+import numba
+import numpy
+from dask.base import tokenize
+from dask.highlevelgraph import HighLevelGraph
+from dask_awkward.layers import AwkwardTreeReductionLayer
+from dask_awkward.lib.core import new_array_object
+from numba import types
+from numba.typed import Dict

-from ..util import numba
-from ..util import numpy as np
+
+def wrap_get_lumi(runlumis, lumi_index):
+    runlumis_or_lz = awkward.typetracer.length_zero_if_typetracer(runlumis).to_numpy()
+    wrap_tot_lumi = numpy.zeros((1,))
+    LumiData._get_lumi_kernel(
+        runlumis_or_lz[:, 0], runlumis_or_lz[:, 1], lumi_index, wrap_tot_lumi
+    )
+    out = awkward.Array(wrap_tot_lumi)
+    if awkward.backend(runlumis) == "typetracer":
+        out = awkward.Array(out.layout.to_typetracer(forget_length=True))
+    return out
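# [Editor's note, not part of the commit] This wrapper pattern recurs throughout
# the file: dask_awkward traces graphs with "typetracer" (metadata-only) arrays,
# so the numba kernel is run on a length-zero concrete stand-in, and the result
# is converted back to a typetracer so dask can still infer the output type
# without touching real data.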


class LumiData:
@@ -28,11 +45,12 @@ class LumiData:
    If you are using a LumiData file containing avg. inst. luminosity, make sure to
    set is_inst_lumi=True in the constructor of this class.
    """

    # 2^18 orbits / 40 MHz machine clock / 3564 bunch positions
    seconds_per_lumi_LHC = 2**18 / (40079000 / 3564)
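    # [Editor's note, not part of the commit] Numerically: 40079000 Hz / 3564
    # bunch positions ~= 11245.5 Hz is the LHC revolution frequency, so one
    # luminosity section spans 2**18 orbits / 11245.5 Hz ~= 23.31 seconds.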

    def __init__(self, lumi_csv, is_inst_lumi=False):
        self._is_inst_lumi = is_inst_lumi
-        self._lumidata = np.loadtxt(
+        self._lumidata = numpy.loadtxt(
            lumi_csv,
            delimiter=",",
            usecols=(0, 1, 6, 7),
@@ -43,6 +61,8 @@ def __init__(self, lumi_csv, is_inst_lumi=False):
                ],  # not sure what lumi:0 means, appears to be always zero (DAQ off before beam dump?)
            },
        )
+        self.index = None
+        self.index_delayed = None

    def get_lumi(self, runlumis):
        """Calculate integrated lumi
@@ -53,19 +73,40 @@ def get_lumi(self, runlumis):
            A 2d numpy array of ``[[run,lumi], [run,lumi], ...]`` or `LumiList` object
            of the lumiSections to integrate over.
        """
-        self.index = Dict.empty(
-            key_type=types.Tuple([types.uint32, types.uint32]), value_type=types.float64
-        )
-        runs = self._lumidata[:, 0].astype("u4")
-        lumis = self._lumidata[:, 1].astype("u4")
-        # fill self.index
-        LumiData._build_lumi_table_kernel(runs, lumis, self._lumidata, self.index)
+        if self.index is None:
+            self.index = Dict.empty(
+                key_type=types.Tuple([types.uint32, types.uint32]),
+                value_type=types.float64,
+            )
+            runs = self._lumidata[:, 0].astype("u4")
+            lumis = self._lumidata[:, 1].astype("u4")
+            # fill self.index
+            LumiData._build_lumi_table_kernel(runs, lumis, self._lumidata, self.index)
+            # delayed object cache
+            self.index_delayed = dask.delayed(self.index)

        if isinstance(runlumis, LumiList):
            runlumis = runlumis.array
-        tot_lumi = np.zeros((1,), dtype=np.dtype("float64"))
-        LumiData._get_lumi_kernel(runlumis[:, 0], runlumis[:, 1], self.index, tot_lumi)
-        return tot_lumi[0] * (self.seconds_per_lumi_LHC if self._is_inst_lumi else 1.0)
+        tot_lumi = numpy.zeros((1,), dtype=numpy.dtype("float64"))
+        if isinstance(runlumis, dask_awkward.Array):
+            lumi_meta = wrap_get_lumi(runlumis._meta, self.index)
+            lumi_per_partition = dask_awkward.map_partitions(
+                wrap_get_lumi,
+                runlumis,
+                self.index_delayed,
+                label="get_lumi",
+                meta=lumi_meta,
+            )
+            tot_lumi = awkward.sum(lumi_per_partition, keepdims=True)
+        else:
+            LumiData._get_lumi_kernel(
+                runlumis[:, 0], runlumis[:, 1], self.index, tot_lumi
+            )
+        return (
+            tot_lumi[0] * self.seconds_per_lumi_LHC
+            if self._is_inst_lumi
+            else tot_lumi[0]
+        )
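    # [Editor's note, illustrative sketch, not part of the commit] Both call
    # styles funnel through the same kernel; names below are hypothetical:
    #
    #   lumidata = LumiData("lumi.csv")
    #   lumi = lumidata.get_lumi(numpy.array([[275067, 1]], dtype="u4"))  # eager float
    #   lumi = lumidata.get_lumi(dak_runlumis)   # dask_awkward input -> delayed result
    #   (total,) = dask.compute(lumi)            # materialize the per-partition sum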

    @staticmethod
    @numba.njit(parallel=False, fastmath=False)
@@ -80,8 +121,8 @@ def _build_lumi_table_kernel(runs, lumis, lumidata, index):
    def _get_lumi_kernel(runs, lumis, index, tot_lumi):
        ks_done = set()
        for iev in range(len(runs)):
-            run = np.uint32(runs[iev])
-            lumi = np.uint32(lumis[iev])
+            run = numpy.uint32(runs[iev])
+            lumi = numpy.uint32(lumis[iev])
            k = (run, lumi)
            if k not in ks_done:
                ks_done.add(k)
@@ -106,9 +147,9 @@ def __init__(self, jsonfile):
        self._masks = {}

        for run, lumilist in goldenjson.items():
-            mask = np.array(lumilist, dtype=np.uint32).flatten()
+            mask = numpy.array(lumilist, dtype=numpy.uint32).flatten()
            mask[::2] -= 1
-            self._masks[np.uint32(run)] = mask
+            self._masks[numpy.uint32(run)] = mask

    def __call__(self, runs, lumis):
        """Check if run and lumi are valid
@@ -134,20 +175,26 @@ def apply(runs, lumis):
                _masks[k] = v

            runs_orig = runs
-            if isinstance(runs, ak.highlevel.Array):
-                runs = ak.to_numpy(ak.typetracer.length_zero_if_typetracer(runs))
-            if isinstance(lumis, ak.highlevel.Array):
-                lumis = ak.to_numpy(ak.typetracer.length_zero_if_typetracer(lumis))
-            mask_out = np.zeros(dtype="bool", shape=runs.shape)
+            if isinstance(runs, awkward.highlevel.Array):
+                runs = awkward.to_numpy(
+                    awkward.typetracer.length_zero_if_typetracer(runs)
+                )
+            if isinstance(lumis, awkward.highlevel.Array):
+                lumis = awkward.to_numpy(
+                    awkward.typetracer.length_zero_if_typetracer(lumis)
+                )
+            mask_out = numpy.zeros(dtype="bool", shape=runs.shape)
            LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out)
-            if isinstance(runs_orig, ak.Array):
-                mask_out = ak.Array(mask_out)
-            if ak.backend(runs_orig) == "typetracer":
-                mask_out = ak.Array(mask_out.layout.to_typetracer(forget_length=True))
+            if isinstance(runs_orig, awkward.Array):
+                mask_out = awkward.Array(mask_out)
+            if awkward.backend(runs_orig) == "typetracer":
+                mask_out = awkward.Array(
+                    mask_out.layout.to_typetracer(forget_length=True)
+                )
            return mask_out

-        if isinstance(runs, dak.Array):
-            return dak.map_partitions(apply, runs, lumis)
+        if isinstance(runs, dask_awkward.Array):
+            return dask_awkward.map_partitions(apply, runs, lumis)
        else:
            return apply(runs, lumis)
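    # [Editor's note, illustrative, not part of the commit] With dask inputs the
    # mask itself stays lazy; the file name below is hypothetical:
    #
    #   mask = LumiMask("golden.json")
    #   good = mask(events.run, events.luminosityBlock)  # delayed boolean array
    #   selected = events[good]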

@@ -156,42 +203,140 @@ def apply(runs, lumis):
    @numba.njit(parallel=False, fastmath=True)
    def _apply_run_lumi_mask_kernel(masks, runs, lumis, mask_out):
        for iev in numba.prange(len(runs)):
-            run = np.uint32(runs[iev])
-            lumi = np.uint32(lumis[iev])
+            run = numpy.uint32(runs[iev])
+            lumi = numpy.uint32(lumis[iev])

            if run in masks:
                lumimask = masks[run]
-                ind = np.searchsorted(lumimask, lumi)
-                if np.mod(ind, 2) == 1:
+                ind = numpy.searchsorted(lumimask, lumi)
+                if numpy.mod(ind, 2) == 1:
                    mask_out[iev] = 1


+def _wrap_unique(array):
+    out = numpy.unique(awkward.typetracer.length_one_if_typetracer(array))
+
+    if awkward.backend(array) == "typetracer":
+        out = awkward.Array(
+            out.layout.to_typetracer(forget_length=True),
+            behavior=out.behavior,
+            attrs=out.attrs,
+        )
+    return out


+def _lumilist_dak_unique(runs_and_lumis, split_every=8):
+    concat_fn = partial(awkward.concatenate, axis=0)
+
+    tree_node_fn = _wrap_unique
+    finalize_fn = _wrap_unique
+
+    label = "lumilist-unique"
+
+    token = tokenize(
+        runs_and_lumis,
+        numpy.unique,
+        label,
+        numpy.uint64,
+        split_every,
+    )
+
+    name_tree_node = f"{label}-tree-node-{token}"
+    name_finalize = f"{label}-finalize-{token}"
+
+    chunked = dask_awkward.map_partitions(
+        _wrap_unique, runs_and_lumis, label="lumilist-unique-chunked"
+    )
+
+    trl = AwkwardTreeReductionLayer(
+        name=name_finalize,
+        name_input=chunked.name,
+        npartitions_input=chunked.npartitions,
+        concat_func=concat_fn,
+        tree_node_func=tree_node_fn,
+        finalize_func=finalize_fn,
+        split_every=split_every,
+        tree_node_name=name_tree_node,
+    )
+
+    graph = HighLevelGraph.from_collections(name_finalize, trl, dependencies=(chunked,))
+
+    meta = _wrap_unique(runs_and_lumis._meta)
+
+    return new_array_object(graph, name_finalize, meta=meta, npartitions=1)
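# [Editor's note, not part of the commit] Shape of the reduction above: each of
# the N input partitions is first deduplicated independently
# ("lumilist-unique-chunked"), then partitions are concatenated and
# re-deduplicated in groups of split_every=8, repeating until one partition
# remains; _wrap_unique serves as both the tree-node and the finalize step, so
# uniqueness holds at every level of the tree.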


+def _packed_unique(runs, lumis):
+    merged = (awkward.values_astype(runs, numpy.uint64) << 32) | awkward.values_astype(
+        lumis, numpy.uint64
+    )
+    uniques = _lumilist_dak_unique(merged)
+    return (uniques >> 32), (uniques & 0xFFFFFFFF)
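# [Editor's note, worked example, not part of the commit] The pack/unpack above
# is lossless because run and lumi each fit in 32 bits, so unique packed values
# correspond exactly to unique (run, lumi) pairs, e.g.:
#   run, lumi = 316186, 89
#   packed = (316186 << 32) | 89
#   packed >> 32         -> 316186
#   packed & 0xFFFFFFFF  -> 89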


class LumiList:
    """Mergeable list of unique (run, lumiSection) values

    This list can be merged with another via ``+=``.

    Parameters
    ----------
-    runs : numpy.ndarray
+    runs : numpy.ndarray, dask_awkward.Array
        Vectorized list of run numbers
-    lumis : numpy.ndarray
+    lumis : numpy.ndarray, dask_awkward.Array
        Vectorized list of lumiSection values
+    delayed_default : bool
    """

-    def __init__(self, runs=None, lumis=None):
-        self.array = np.zeros(shape=(0, 2))
-        if runs is not None:
-            self.array = np.unique(np.c_[runs, lumis], axis=0)
+    def __init__(self, runs=None, lumis=None, delayed_default=True):
+        if (runs is None) != (lumis is None):
+            raise ValueError(
+                "Both runs and lumis must be provided when given to the constructor of LumiList."
+            )
+
+        if delayed_default and runs is None:
+            raise ValueError(
+                "You must supply runs and lumis when using LumiList in delayed mode."
+            )
+
+        self.array = None
+        if not delayed_default:
+            self.array = numpy.zeros(shape=(0, 2))
+
+        if isinstance(runs, dask_awkward.Array) and isinstance(
+            lumis, dask_awkward.Array
+        ):
+            unq_run, unq_lumi = _packed_unique(runs, lumis)
+            self.array = awkward.concatenate(
+                [unq_run[:, None], unq_lumi[:, None]], axis=1
+            )
+        else:
+            if runs is not None:
+                self.array = numpy.unique(numpy.c_[runs, lumis], axis=0)

    def __iadd__(self, other):
        # TODO: re-apply unique? Or wait until end
        if isinstance(other, LumiList):
-            self.array = np.r_[self.array, other.array]
+            if isinstance(self.array, dask_awkward.Array):
+                carray = awkward.concatenate([self.array, other.array], axis=0)
+                unq_run, unq_lumi = _packed_unique(carray[:, 0], carray[:, 1])
+                self.array = awkward.concatenate(
+                    [unq_run[:, None], unq_lumi[:, None]], axis=1
+                )
+            else:
+                self.array = numpy.r_[self.array, other.array]
        else:
            raise ValueError("Expected LumiList object, got %r" % other)
        return self

    def __add__(self, other):
        temp = LumiList(runs=other.array[:, 0], lumis=other.array[:, 1])
        temp += self
        return temp

    def clear(self):
        """Clear current lumi list"""
-        self.array = np.zeros(shape=(0, 2))
+        if isinstance(self.array, dask_awkward.Array):
+            raise RuntimeError("Delayed-mode LumiList cannot be cleared!")
+        self.array = numpy.zeros(shape=(0, 2))
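Taken together: delayed mode is now the default, so a LumiList built from dask_awkward run/lumi columns stays lazy until dask.compute, while delayed_default=False recovers the old eager, empty-constructible behavior (as the updated test_lumilist below shows). A minimal end-to-end sketch (editor's illustration, not part of the commit; the input files are hypothetical):

    import dask
    from coffea.lumi_tools import LumiData, LumiList
    from coffea.nanoevents import NanoEventsFactory

    events = NanoEventsFactory.from_root(
        {"nano.root": "Events"}  # hypothetical NanoAOD input
    ).events()

    # runs/lumis are dask_awkward columns, so the list stays delayed
    llist = LumiList(runs=events.run, lumis=events.luminosityBlock)

    lumidata = LumiData("lumi.csv")  # hypothetical brilcalc-style CSV
    total = lumidata.get_lumi(llist)  # delayed scalar
    (total,) = dask.compute(total)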
46 changes: 44 additions & 2 deletions tests/test_lumi_tools.py
@@ -1,10 +1,12 @@
import awkward as ak
import cloudpickle
import dask
import dask_awkward as dak
+import numpy as np
from dask.distributed import Client

from coffea.lumi_tools import LumiData, LumiList, LumiMask
-from coffea.util import numpy as np
from coffea.nanoevents import NanoEventsFactory
def test_lumidata():
@@ -114,7 +116,7 @@ def test_lumilist():

    llist1 = LumiList(runs=runslumis1[:, 0], lumis=runslumis1[:, 1])
    llist2 = LumiList(runs=runslumis2[:, 0], lumis=runslumis2[:, 1])
-    llist3 = LumiList()
+    llist3 = LumiList(delayed_default=False)

    llist3 += llist1
    llist3 += llist2
@@ -127,3 +129,43 @@

    llist1.clear()
    assert llist1.array.size == 0


+def test_lumilist_dask():
+    lumidata = LumiData("tests/samples/lumi_small.csv")
+
+    runslumis1 = np.zeros((10, 2), dtype=np.uint32)
+    runslumis1[:, 0] = lumidata._lumidata[0:10, 0]
+    runslumis1[:, 1] = lumidata._lumidata[0:10, 1]
+
+    runslumis2 = np.zeros((10, 2), dtype=np.uint32)
+    runslumis2[:, 0] = lumidata._lumidata[10:20, 0]
+    runslumis2[:, 1] = lumidata._lumidata[10:20, 1]
+
+    drunslumis1 = dak.from_awkward(ak.Array(runslumis1), 3)
+    drunslumis2 = dak.from_awkward(ak.Array(runslumis2), 3)
+
+    llist1 = LumiList(runs=drunslumis1[:, 0], lumis=drunslumis1[:, 1])
+    llist2 = LumiList(runs=drunslumis2[:, 0], lumis=drunslumis2[:, 1])
+    llist3 = llist1 + llist2
+
+    lumi1 = lumidata.get_lumi(llist1)
+    lumi2 = lumidata.get_lumi(llist2)
+    lumi3 = lumidata.get_lumi(llist3)
+
+    lumi1, lumi2, lumi3 = dask.compute(lumi1, lumi2, lumi3)
+
+    assert abs(lumi3 - (lumi1 + lumi2)) < 1e-4
+
+
+def test_lumilist_client_fromfile():
+    with Client() as _:
+        events = NanoEventsFactory.from_root(
+            {"tests/samples/nano_dy.root": "Events"},
+        ).events()
+
+        lumilist = LumiList(runs=events.run, lumis=events.luminosityBlock)
+
+        (result,) = dask.compute(lumilist.array)
+
+        assert result.to_list() == [[1, 13889]]
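# [Editor's note, not part of the commit] The last test runs the same delayed
# LumiList machinery through a dask.distributed Client, so the task graph must
# also serialize across the scheduler boundary, beyond what the single-machine
# test above exercises.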
