"""Make small flat root trees with one entry per event from the pax root files.
"""
from datetime import datetime
from distutils.version import LooseVersion
from glob import glob
import inspect
import logging
import os
import numpy as np
import pandas as pd
import hax
from hax import runs, cuts
from .paxroot import loop_over_dataset, function_results_datasets
from .utils import find_file_in_folders, get_user_id
from .minitree_formats import get_format
from pax.datastructure import INT_NAN
log = logging.getLogger('hax.minitrees')
# update_treemakers() will update this to contain all treemakers included
# with hax
TREEMAKERS = {}
class TreeMaker(object):
    """Treemaker base class.

    If you're seeing this as the documentation of an actual TreeMaker, somebody forgot to add documentation
    for their treemaker.

    A treemaker loops the extract_data function over events. This function returns a dictionary.
    Since dictionaries take a lot of memory, we periodically convert them into pandas dataframes
    (interval with which this occurs is controlled by the cache_size attribute).
    At the end of data extraction, the various dataframes are concatenated.

    You must instantiate a new treemaker for every extraction.
    """
    # Number of cached event dictionaries after which they are flushed into a DataFrame
    cache_size = 5000
    branch_selection = None     # List of branches to load during iteration over events
    # If the above is empty, load basic branches (set in hax.config) plus these.
    extra_branches = tuple()
    # Set to True if your treemaker returns array values. This will trigger a
    # different root file saving code.
    uses_arrays = False
    # Set this to true if the treemaker results do not change with the pax
    # version (e.g. for trigger information)
    pax_version_independent = False
    # Set this to True to prevent this minitree from being saved to file. Use e.g. if it computes its values quickly
    # computed from other minitrees' info.
    never_store = False
    # Want to store something more in the metadata for this tree? Put here
    extra_metadata = {}
    # Flag if loading MC
    mc_data = False

    def __init__(self):
        # Support for string arguments
        if isinstance(self.branch_selection, str):
            self.branch_selection = [self.branch_selection]
        if isinstance(self.extra_branches, str):
            self.extra_branches = [self.extra_branches]
        if not self.branch_selection:
            self.branch_selection = hax.config['basic_branches'] + list(self.extra_branches)
        # event_number is always required to make the minitree joins succeed later on
        if 'event_number' not in self.branch_selection:
            self.branch_selection += ['event_number']
        self.cache = []     # Per-event dicts, flushed to a DataFrame by check_cache()
        self.data = []      # DataFrames built from flushed caches; concatenated in get_data()

    def process_event(self, event):
        """Extract data from a single pax event and append it to the cache."""
        result = self.extract_data(event)
        if not isinstance(result, dict):
            raise ValueError("TreeMakers must always extract dictionary")
        # Add the run and event number to the result. This is required to make joins succeed later on.
        result['event_number'] = event.event_number
        result['run_number'] = self.run_number
        self.cache.append(result)
        self.check_cache()

    def get_data(self, dataset, event_list=None):
        """Return data extracted from running over dataset"""
        self.mc_data = runs.is_mc(dataset)[0]
        self.run_name = runs.get_run_name(dataset)
        self.run_number = runs.get_run_number(dataset)
        self.run_start = runs.get_run_start(dataset)
        loop_over_dataset(dataset, self.process_event,
                          event_lists=event_list,
                          branch_selection=self.branch_selection,
                          desc='Making %s minitree' % self.__class__.__name__)
        # Flush whatever is left in the cache, even if below cache_size
        self.check_cache(force_empty=True)
        if not len(self.data):
            log.warning("Not a single row was extracted from dataset %s!" % dataset)
            return pd.DataFrame([], columns=['event_number', 'run_number'])
        else:
            hax.log.debug("Extraction completed, now concatenating data")
            return pd.concat(self.data, ignore_index=True)

    def check_cache(self, force_empty=False):
        """Flush the event-dict cache into a DataFrame once it reaches cache_size
        (or immediately when force_empty is True)."""
        if not len(self.cache) or (len(self.cache) < self.cache_size and not force_empty):
            return
        self.data.append(pd.DataFrame(self.cache))
        self.cache = []
def update_treemakers():
    """Update the list of treemakers hax knows. Called on hax init, you should never have to call this yourself!

    Scans hax/treemakers/*.py, imports each module and registers every TreeMaker
    subclass found into the module-level TREEMAKERS dict (name -> class).
    """
    global TREEMAKERS
    TREEMAKERS = {}
    # os.path.join with separate components (the original concatenated the path
    # by hand, making the join a no-op)
    for module_filename in glob(os.path.join(hax.hax_dir, 'treemakers', '*.py')):
        module_name = os.path.splitext(os.path.basename(module_filename))[0]
        if module_name.startswith('_'):
            # Skip private modules such as __init__.py
            continue

        # Import the module, after which we can do hax.treemakers.blah
        __import__('hax.treemakers.%s' % module_name, globals=globals())

        # Now get all the treemakers defined in the module
        for tm_name, tm in inspect.getmembers(
                getattr(hax.treemakers, module_name),
                lambda x: isinstance(x, type) and issubclass(x, TreeMaker)):
            if tm_name == 'TreeMaker':
                # This one is the base class; we get it because we did from ...
                # import TreeMaker at the top of the file
                continue
            if tm_name in TREEMAKERS:
                raise ValueError("Two treemakers named %s!" % tm_name)
            TREEMAKERS[tm_name] = tm
def _minitree_filename(run_name, treemaker_name, extension):
return "%s_%s.%s" % (run_name, treemaker_name, extension)
def check(run_id, treemaker, force_reload=False):
    """Return if the minitree exists and where it is found / where to make it.

    :param treemaker: treemaker name or class
    :param run_id: run name or number
    :param force_reload: ignore available minitrees, just tell me where to write the new one.
    :returns: (treemaker, available, path).

    - treemaker_class: class of the treemaker you named.
    - already_made is True if there is an up-to-date minitree we can load, False otherwise (always if force_reload)
    - path is the path to the minitree to load if it is available, otherwise path where we should create the minitree.
    """
    run_name = runs.get_run_name(run_id)
    treemaker_name, treemaker = get_treemaker_name_and_class(treemaker)
    preferred_format = hax.config['preferred_minitree_format']

    # If we need to remake the minitree, where would we place it?
    minitree_filename = _minitree_filename(
        run_name, treemaker_name, preferred_format)
    # New minitrees always go into the first configured minitree folder
    creation_dir = hax.config['minitree_paths'][0]
    if not os.path.exists(creation_dir):
        os.makedirs(creation_dir)
    path_to_new = os.path.join(creation_dir, minitree_filename)

    # Value to return if the minitree is not available
    sorry_not_available = treemaker, False, path_to_new
    if force_reload:
        return sorry_not_available

    # Find the file in the configured minitree folders
    try:
        minitree_path = find_file_in_folders(
            minitree_filename, hax.config['minitree_paths'])
    except FileNotFoundError:
        # Maybe it exists, but was made in a non-preferred file format
        log.debug("Minitree %s not found" % minitree_filename)
        for mt_format in hax.config['other_minitree_formats']:
            if mt_format == preferred_format:
                # Already tried this format
                continue
            else:
                try:
                    minitree_filename = _minitree_filename(run_name, treemaker_name, mt_format)
                    minitree_path = find_file_in_folders(minitree_filename, hax.config['minitree_paths'])
                    log.debug("Minitree found in non-preferred format: %s" % minitree_filename)
                    break
                except FileNotFoundError:
                    # NOTE(review): this message can fire while other formats remain
                    # to be tried; harmless, debug-level only.
                    log.debug("Not found in non-preferred formats either. Minitree will be created.")
                    pass
        else:
            # for/else: loop completed without break, so the minitree was
            # not found in any format
            return sorry_not_available

    log.debug("Found minitree at %s" % minitree_path)

    # Load the metadata ONLY, to see if we can load this file
    minitree_metadata = get_format(minitree_path).load_metadata()

    # Check if the minitree has an outdated treemaker version
    if LooseVersion(minitree_metadata['version']) < treemaker.__version__:
        log.debug(
            "Minitreefile %s is outdated (version %s, treemaker is version %s), will be recreated" %
            (minitree_path, minitree_metadata['version'], treemaker.__version__))
        return sorry_not_available

    # Check for incompatible hax version (e.g. event_number and run_number
    # columns not yet included in each minitree)
    if (LooseVersion(minitree_metadata.get('hax_version', '0.0')) < hax.config['minimum_minitree_hax_version']):
        log.debug("Minitreefile %s is from an incompatible hax version and must be recreated" % minitree_path)
        return sorry_not_available

    # Check if pax_version agrees with the version policy.
    version_policy = hax.config['pax_version_policy']
    if treemaker.pax_version_independent:
        # Results do not depend on pax version: any existing minitree is fine
        return treemaker, True, minitree_path
    elif version_policy == 'latest':
        # What the latest pax version is differs per dataset. We'll open the root file to find out
        # (you may think we can use the runs db info instead, but that won't work on e.g. MC root files)
        try:
            pax_metadata = hax.paxroot.get_metadata(run_name)
        except FileNotFoundError:
            log.warning(
                "Minitree %s was found, but the main data root file was not. "
                "Your version policy is 'latest', but I can't check whether you really have the latest... "
                "I'll load the cached minitree and assume you know what you are doing." % minitree_path)
        else:
            # Recreate if the minitree predates the pax version of the root file
            if ('pax_version' not in minitree_metadata or
                    LooseVersion(minitree_metadata['pax_version']) <
                    LooseVersion(pax_metadata['file_builder_version'])):
                log.debug(
                    "Minitreefile %s is from an outdated pax version (pax %s, %s available), "
                    "will be recreated." %
                    (minitree_path,
                     minitree_metadata.get(
                         'pax_version',
                         'not known'),
                     pax_metadata['file_builder_version']))
                return sorry_not_available
    elif version_policy == 'loose':
        # Anything goes
        pass
    else:
        # Explicit version policy (e.g. a fixed pax version string)
        if not hax.runs.version_is_consistent_with_policy(
                minitree_metadata.get('pax_version', 'unknown')):
            log.debug(
                "Minitree found from pax version %s, but you required pax version %s. "
                "Will attempt to create it from the main root file." %
                (minitree_metadata['pax_version'], version_policy))
            return sorry_not_available

    return treemaker, True, minitree_path
def load_single_minitree(run_id,
                         treemaker,
                         force_reload=False,
                         return_metadata=False,
                         save_file=None,
                         event_list=None):
    """Return pandas DataFrame resulting from running treemaker on run_id (name or number)

    :param run_id: name or number of the run to load
    :param treemaker: TreeMaker class or class name (but not TreeMaker instance!) to run
    :param force_reload: always remake the minitree, never load it from disk.
    :param return_metadata: instead return (metadata_dict, dataframe)
    :param save_file: save the results to a minitree file on disk.
    :param event_list: List of event numbers to visit. Forces save_file=False, force_reload=True.
    :returns: pandas.DataFrame
    """
    if save_file is None:
        save_file = hax.config['minitree_caching']
    if event_list is not None:
        # Partial extractions can be neither cached to nor loaded from disk
        save_file = False
        force_reload = True

    treemaker, already_made, minitree_path = check(run_id, treemaker,
                                                   force_reload=force_reload)
    if already_made:
        # An up-to-date minitree exists; just load and return it
        return get_format(minitree_path).load_data()

    if not hax.config['make_minitrees'] and not treemaker.never_store:
        # The user didn't want me to make a new minitree :-(
        raise NoMinitreeAvailable(
            "Minitree %s:%s not created since make_minitrees is False." %
            (run_id, treemaker.__name__))

    # We have to make the minitree file
    # This will raise FileNotFoundError if the root file is not found
    df = treemaker().get_data(run_id, event_list=event_list)
    log.debug(
        "Retrieved %s minitree data for dataset %s" %
        (treemaker.__name__, run_id))

    try:
        pax_root_version = hax.paxroot.get_metadata(run_id)['file_builder_version']
    except ValueError:
        # The ROOT file is missing
        # Unless this is a never_store minitree, the creation will fail anyway
        pax_root_version = 'unknown'

    metadata = dict(version=treemaker.__version__,
                    extra=treemaker.extra_metadata,
                    pax_version=pax_root_version,
                    hax_version=hax.__version__,
                    created_by=get_user_id(),
                    event_list=event_list,
                    documentation=treemaker.__doc__,
                    timestamp=str(datetime.now()))

    if save_file and not treemaker.never_store:
        get_format(minitree_path, treemaker).save_data(metadata, df)

    if return_metadata:
        return metadata, df
    return df
def load_single_dataset(run_id, treemakers, preselection=None, force_reload=False, event_list=None,
                        bypass_blinding=False):
    """Run multiple treemakers on a single run

    :returns: (pandas DataFrame, list of dicts describing cut histories)
    :param run_id: name or number of the run to load
    :param treemakers: list of treemaker classes / names to load
    :param preselection: String or list of strings passed to pandas.eval. Should return bool array, to be used
    for pre-selecting events to load for each dataset.
    If string does not contain spaces, should be lax lichen name.
    If string contains a colon and no spaces, should be lichen_file:lichen_name
    :param force_reload: always remake the minitrees, never load any from disk.
    :param event_list: List of event numbers to visit. Disables load from / save to file.
    :bypass_blinding: Flag to disable blinding cut. WARNING: analysts should not use this, only for production! See #211
    """
    # Normalize arguments: a single treemaker / selection becomes a one-element list
    if isinstance(treemakers, (type, str)):
        treemakers = [treemakers]
    if isinstance(preselection, str):
        preselection = [preselection]
    if preselection is None:
        preselection = []

    frames = []
    for tm in treemakers:
        try:
            frames.append(load_single_minitree(
                run_id, tm, force_reload=force_reload, event_list=event_list))
        except NoMinitreeAvailable as e:
            # Minitree missing and creation disabled: return an empty result
            log.debug(str(e))
            return pd.DataFrame([], columns=['event_number', 'run_number']), []

    if not len(frames):
        raise RuntimeError("No data was extracted? What's going on??")

    # Merge mini-trees of all types by inner join
    # (propagating "cuts" applied by skipping rows in MultipleRowExtractor)
    result = frames[0]
    for other in frames[1:]:
        result = _merge_minitrees(result, other)

    # Apply the unblinding selection if required.
    # Normally this is already done by minitrees.load, but perhaps someone calls
    # load_single_dataset_directly.
    # (`and` short-circuits, so is_blind is only queried when actually needed)
    needs_blinding_cut = (not bypass_blinding and
                          hax.unblinding.unblinding_selection not in preselection and
                          ('Corrections' in treemakers or
                           hax.treemakers.corrections.Corrections in treemakers) and
                          hax.unblinding.is_blind(run_id))
    if needs_blinding_cut:
        preselection = [hax.unblinding.unblinding_selection] + preselection

    # Apply pre-selection cuts before moving on to the next dataset
    for ps in preselection:
        if ' ' not in ps and ps[0].isupper():
            # Lichen name, possibly prefixed as lichen_file:lichen_name
            if ':' in ps:
                lichen_file, lichen_name = ps.split(':')
                result = cuts.apply_lichen(result, lichen_name, lichen_file=lichen_file, quiet=True)
            else:
                # Single lichen name
                result = cuts.apply_lichen(result, ps, quiet=True)
        else:
            # Plain pandas.eval selection string
            result = cuts.eval_selection(result, ps, quiet=True)

    return result, cuts._get_history(result)
def _merge_minitrees(mt1, mt2):
"""Returns merger of minitree dataframes mt1 and mt2, which have the same """
# To avoid creation of duplicate columns (which will get _x and _y suffixes),
# look which column names already exist and do not include them in the
# merge
cols_to_use = ['run_number', 'event_number'] + \
mt2.columns.difference(mt1.columns).tolist()
return pd.merge(
mt2[cols_to_use], mt1, on=[
'run_number', 'event_number'], how='inner')
def load(datasets=None,
         treemakers='all',
         preselection=None,
         force_reload=False,
         delayed=False,
         num_workers=1,
         compute_options=None,
         cache_file=None,
         remake_cache=False,
         event_list=None):
    """Return pandas DataFrame with minitrees of several datasets and treemakers.

    :param datasets: names or numbers of datasets (without .root) to load
    :param treemakers: treemaker class (or string with name of class) or list of these to load. If value
    is set to 'all' then the standard science run minitrees are loaded.
    :param preselection: string or list of strings parseable by pd.eval. Should return bool array, to be used
    for pre-selecting events to load for each dataset.
    :param force_reload: if True, will force mini-trees to be re-made whether they are outdated or not.
    :param delayed: Instead of computing a pandas DataFrame, return a dask DataFrame (default False)
    :param num_workers: Number of dask workers to use in computation (if delayed=False)
    :param compute_options: Dictionary of extra options passed to dask.compute
    :param cache_file: Save/load the result to an hdf5 file with filename specified by cache_file.
    Useful if you load in a large volume of data with many preselections.
    :param remake_cache: If True, and cache file given, reload (don't remake) minitrees and overwrite the cache file.
    :param event_list: List of events to process (warning: only makes sense for single dataset)
    """
    # Import dask only here, it causes problems on some systems (batch queues etc)
    # Also dask is heavily under development... FIXME
    import dask
    import dask.multiprocessing
    import dask.dataframe

    if cache_file and not remake_cache and os.path.exists(cache_file):
        # We don't have to do anything and can just load from the cache file
        return load_cache_file(cache_file)
    if datasets is None:
        raise ValueError("If you're not loading from a cache file, "
                         "specify at least some datasets to load")

    if isinstance(preselection, str):
        preselection = [preselection]
    if preselection is None:
        preselection = []

    # Handle possible treemaker options
    if treemakers == 'all':
        # Fetched from inputs_from_bbf/reduce_data.py, 28/Feb/2018
        treemakers = ['Corrections', 'Basics', 'Fundamentals', 'Proximity',
                      'Extended', 'TotalProperties', 'TailCut', 'PositionReconstruction',
                      'LargestPeakProperties', 'FlashIdentification']
    elif isinstance(treemakers, (type, str)):
        treemakers = [treemakers]

    # np.integer covers all numpy integer scalar types; np.int was merely a
    # deprecated alias of the builtin int and has been removed from numpy.
    if isinstance(datasets, (str, int, np.integer)):
        datasets = [datasets]

    if compute_options is None:
        compute_options = {}
    compute_options.setdefault('get', dask.multiprocessing.get)

    # If the blinding cut is required for any of the datasets, apply it to all of them.
    # This avoids crashing or paradoxical cut histories.
    if (hax.unblinding.unblinding_selection not in preselection and (
            'Corrections' in treemakers or hax.treemakers.corrections.Corrections in treemakers)):
        is_blind = [hax.unblinding.is_blind(run_id) for run_id in datasets]
        if any(is_blind):
            if not all(is_blind):
                log.warning(
                    "You're mixing blind and unblind datasets. "
                    "The blinding cut will be applied to all data you're loading.")
            preselection = [hax.unblinding.unblinding_selection] + preselection

    # Compute an example dataframe, so we can tell dask which columns to expect
    if isinstance(datasets, pd.Series):
        first_dset = datasets.iloc[0]
    else:
        first_dset = datasets[0]
    df_example, _ = load_single_dataset(
        first_dset,
        treemakers, preselection, force_reload=force_reload, event_list=event_list)

    partial_results = []
    partial_histories = []
    for dataset in datasets:
        # Each delayed call yields a (dataframe, cut_history) tuple; split it
        # into two delayed values. The dataframe is coerced to the example's
        # columns/dtypes so dask's meta stays consistent across partitions.
        mashup = dask.delayed(load_single_dataset)(
            dataset, treemakers, preselection, force_reload=force_reload, event_list=event_list)
        partial_results.append(dask.delayed(lambda x: force_df_types(x[0], df_example))(mashup))
        partial_histories.append(dask.delayed(lambda x: x[1])(mashup))

    result = dask.dataframe.from_delayed(partial_results, meta=df_example)

    if not delayed:
        # Dask doesn't seem to want to descend into the lists beyond the first.
        # So we mash things into one list before calling compute, then split it
        # again
        mashedup_result = dask.compute(*([result] + partial_histories),
                                       num_workers=num_workers, **compute_options)
        result = mashedup_result[0]

        if 'index' in result.columns:
            # Clean up index, remove 'index' column
            # Probably we're doing something weird with pandas, this doesn't
            # seem like the well-trodden path...
            log.debug("Removing weird index column")
            result.drop('index', axis=1, inplace=True)
        result = result.reset_index()
        result.drop('index', axis=1, inplace=True)

        # Combine the histories of partial results.
        # For unavailable minitrees, the histories will be empty: filter these
        # empty histories out
        partial_histories = mashedup_result[1:]
        partial_histories = [x for x in partial_histories if len(x)]
        if len(partial_histories):
            cuts.record_combined_histories(result, partial_histories)
    else:
        # Magic for tracking of cut histories while using dask.dataframe here...
        pass

    if cache_file:
        save_cache_file(result, cache_file)
    return result
def function_over_events(function, dataframe, branch_selection=None, **kwargs):
    """Generator which yields `function(event, **kwargs)` of each processed data event in dataframe

    :param function: callable applied to each pax event
    :param dataframe: minitree dataframe with run_number and event_number columns
    :param branch_selection: branches to activate while looping over the root files
    """
    # DataFrame.groupby is the supported API; the top-level pd.groupby alias
    # used previously was removed from pandas (0.25).
    for run_number, events in dataframe.groupby('run_number'):
        yield from function_results_datasets(run_number,
                                             function,
                                             events.event_number.values,
                                             branch_selection=branch_selection,
                                             kwargs=kwargs)
def extend(data, treemakers):
    """Extends the dataframe data by loading treemakers for the remaining events

    See https://github.com/XENON1T/hax/pull/52 for more information.

    :param data: dataframe, assumed to be event-per-row
    :param treemakers: list of treemakers to load
    """
    new_minitrees = []
    # DataFrame.groupby is the supported API; the top-level pd.groupby alias
    # used previously was removed from pandas (0.25).
    for run_number, events in data.groupby('run_number'):
        new_minitrees.append(load_single_dataset(run_number, treemakers, event_list=events.event_number.values)[0])
    result = _merge_minitrees(data, pd.concat(new_minitrees))
    # Loading with an event_list produces an empty cut history: keep the original
    result.cut_history = data.cut_history
    return result
def load_cache_file(cache_file):
    """Load minitree dataframe + cut history from a cache file

    :param cache_file: path of the HDF5 cache file written by save_cache_file
    :returns: DataFrame with a `cut_history` attribute attached
    """
    # Context manager guarantees the HDF5 file is closed even if reading fails
    # (the previous explicit close() leaked the handle on exceptions)
    with pd.HDFStore(cache_file) as store:
        result = store['data']
        result.cut_history = store.get_storer('data').attrs.cut_history
    return result
def save_cache_file(data, cache_file, **kwargs):
    """Save minitree dataframe + cut history to a cache file

    Any kwargs will be passed to pandas HDFStore. Defaults are:
        complib='blosc'
        complevel=9

    :param data: DataFrame to cache
    :param cache_file: path of the HDF5 file to (over)write
    """
    kwargs.setdefault('complib', 'blosc')
    kwargs.setdefault('complevel', 9)
    dirname = os.path.dirname(cache_file)
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)
    # Context manager guarantees the HDF5 file is closed even if writing fails
    # (the previous explicit close() leaked the handle on exceptions)
    with pd.HDFStore(cache_file, **kwargs) as store:
        store.put('data', data)
        # Store the cuts history for the data
        store.get_storer('data').attrs.cut_history = cuts._get_history(data)
def get_treemaker_name_and_class(tm):
    """Return (name, class) of treemaker name or class tm"""
    if isinstance(tm, str):
        # Name: look it up in the registry filled by update_treemakers()
        if tm not in TREEMAKERS:
            raise ValueError("No TreeMaker named %s known to hax!" % tm)
        name, cls = tm, TREEMAKERS[tm]
    elif isinstance(tm, type) and issubclass(tm, TreeMaker):
        # Already a TreeMaker subclass
        name, cls = tm.__name__, tm
    else:
        raise ValueError("%s is not a TreeMaker child class or name, but a %s" % (tm, type(tm)))
    # Every treemaker must declare a version so `check` can detect stale minitrees
    if not hasattr(cls, '__version__'):
        raise AttributeError("Please add a __version__ attribute to treemaker %s." % name)
    return name, cls
class NoMinitreeAvailable(Exception):
    """Raised when a minitree does not exist and may not be (re)created
    (e.g. because make_minitrees is False)."""
def force_df_types(df_content, df_types):
    """Return dataframe with same columns and dtypes as df_types,
    with content from df_content.

    - Extra columns are dropped.
    - Missing columns are set to NaN (for floats) or INT_NAN (for integers).
      Columns that are neither int or float are set to zero (e.g. '' for strings).
    - Columns with different types are converted using numpy's astype.
      When converting floats to ints, all nonfinite values are replaced with INT_NAN
    """
    if len(df_content) == 0:
        # Easy case: return empty dataframe of same type as df_types
        return df_types.iloc[:0]

    new_df = dict()
    n = len(df_content)
    for col, dtype in df_types.dtypes.items():
        if col in df_content:
            x = df_content[col].values.copy()
            if np.issubdtype(dtype, np.integer):
                # NaN / inf cannot be cast to int with astype:
                # replace nonfinite values with INT_NAN first
                x[~np.isfinite(x)] = INT_NAN
            new_df[col] = x.astype(dtype)
        elif np.issubdtype(dtype, np.integer):
            # Missing integer column: fill with INT_NAN in the *target* dtype.
            # (The previous np.ones(n) * INT_NAN produced a float64 column,
            # so the returned dtype did not match df_types as documented.)
            new_df[col] = np.full(n, INT_NAN, dtype=dtype)
        elif np.issubdtype(dtype, np.floating):
            # np.floating replaces np.float, a removed alias of the builtin float
            new_df[col] = np.full(n, float('nan'), dtype=dtype)
        else:
            # Fallback: fill with zeros (e.g. '' for strings)
            new_df[col] = np.zeros(n, dtype=dtype)
    return pd.DataFrame(new_df)