Source code for hax.data_extractor

"""Extract peak or hit info from processed root file
"""
import warnings

import numpy as np
import hax


[docs]def root_to_numpy(base_object, field_name, attributes): """Convert objects stored in base_object.field_name to numpy array Will query attributes for each of the objects in base_object.field_name No, root_numpy does not do this for you, that's for trees... """ objects_to_convert = getattr(base_object, field_name) if not len(objects_to_convert): return None return np.array([tuple([getattr(p, pa) for pa in attributes]) for p in objects_to_convert])
[docs]def build_cut_string(cut_list, obj): ''' Build a string of cuts that can be applied using eval() function. ''' # If no cut is specified, always pass cut if len(cut_list) == 0: return 'True' # Check if user entered range_50p_area, since this won't work for cut in cut_list: if cut[:14] == 'range_50p_area': raise ValueError('You cannot use range_50p_area in your cut, use range_area_decile[5] instead!') cut_string = '(' for cut in cut_list[:-1]: cut_string += obj + '.' + cut + ') & (' cut_string += obj + '.' + cut_list[-1] + ')' return cut_string
[docs]def make_branch_selection(level, event_fields, peak_fields, added_branches): """Make the list of branches that have to be selected. """ branch_selection_events = event_fields branch_selection_peaks = ['peaks.' + field for field in peak_fields] # For hits, just select the whole hit branch. # Unfortunately, specifically setting one variable does not seem to work. branch_selection_hits = ['peaks.hits*'] if level == 'hit': branch_selection = branch_selection_events + branch_selection_peaks + branch_selection_hits + added_branches if level == 'peak': branch_selection = branch_selection_events + branch_selection_peaks + added_branches # Hack to enable range_50p_area branch_selection = [field.replace('peaks.range_50p_area', 'peaks.range_area_decile*') for field in branch_selection] return branch_selection
[docs]def make_named_array(array, field_names): """Make a named array from a numpy array. """ import pandas as pd df = pd.DataFrame(array, columns=field_names) array = df.to_records() return array
[docs]class DataExtractor(): """This class is meant for extracting properties that are *not* on the event level, such as peak or hit properties. For more information, check the docs of DataExtractor.get_data(). """ def __init__(self): # Initialize empty data list warnings.warn( "DataExtractor is deprecated, please switch to multi-row minitrees instead.", DeprecationWarning) self.data = []
[docs] def loop_body(self, event): """Function that extracts data from each event and adds array with that data to the data list. """ # Check if event passes event cut if eval(self.event_cut_string): event_entry = np.array([getattr(event, field) for field in self.event_fields]) for peak in event.peaks: # Check if peak passes the cut if eval(self.peak_cut_string): # Get peak information, which is stored in _temp_data _temp_data = [] for field in self.peak_fields: if field == 'range_50p_area': _x = list(peak.range_area_decile)[5] elif field in ('x', 'y'): # In case of x and y need to get position from reconstructed_positions for rp in peak.reconstructed_positions: if rp.algorithm == 'PosRecTopPatternFit': _x = getattr(rp, field) break else: _x = float('nan') elif field[-1] == ']': # This means that the parameter is a list element. # We need a slightly different approach parsed_field = field.split(sep='[') field_list_name = parsed_field[0] field_number = int(parsed_field[1][:-1]) _list = getattr(peak, field_list_name) _x = list(_list)[field_number] else: # Default case for 'normal' peak variable _x = getattr(peak, field) _temp_data.append(_x) peak_entry = np.array(_temp_data) if self.level == 'hit': # Extract hit info. Numpy array with n_hit_channel entries x number of hit properties hit_entry = root_to_numpy( peak, 'hits', self.hit_fields) if hit_entry is None: # If there is no hit data: crash loudly raise ValueError("Unable to read hit info. " "Is it in the root file? (Try: peak_cuts = ['type == 's1''])") # Append all properties. First event properties, then # peak, then hits entry = np.c_[np.zeros((len(hit_entry), len(event_entry) + len(peak_entry))), hit_entry] entry[:, 0:len(event_entry)] = event_entry entry[:, len(event_entry):( len(peak_entry) + len(event_entry))] = peak_entry elif self.level == 'peak': # Extra brackets needed for concatenation in the end, else we'll get flat array entry = np.c_[[event_entry], [peak_entry]] else: # We should actually never reach this since the checking has been done before raise ValueError( "Enter either 'peak' of 'hit' for level!") self.data.append(entry) # Check if user-defined event number limit is reached if event.event_number >= self.stop_after: print("User-defined limit of %d events reached, stopping..." % self.stop_after) raise hax.paxroot.StopEventLoop return None
[docs] def get_data(self, dataset, level='peak', event_fields=['event_number'], peak_fields=['area', 'hit_time_std'], hit_fields=[], event_cuts=[], peak_cuts=[], stop_after=np.inf, added_branches=[]): """Extract peak or hit data from a dataset. Peak or hit can be toggled by specifying level = 'peak' or level = 'hit'. Example useage: d = DataExtractor.get_data(dataset=run_name,level='peak',event_fields = ['event_number'], peak_fields=['area'],event_cuts=['event_number > 5', 'event_number < 10'], peak_cuts=['area > 100', 'type = "s1"'],stop_after=10000,added_branches= ['peak.type']) """ # Sanity checking if (level != 'peak') and (level != 'hit'): raise SyntaxError("Enter either 'peak' of 'hit' for level!") if (hit_fields != []) and (level == 'peak'): print("Warning: You set hit properties, but your input will be ignored since you specified peak level!") branch_selection = make_branch_selection(level, event_fields, peak_fields, added_branches) self.event_cut_string = build_cut_string(event_cuts, 'event') self.peak_cut_string = build_cut_string(peak_cuts, 'peak') self.event_fields = event_fields self.peak_fields = peak_fields self.hit_fields = hit_fields self.stop_after = stop_after self.level = level hax.paxroot.loop_over_dataset(dataset, self.loop_body, branch_selection=branch_selection) # Now reshape data # list of arrays -> one array -> named array self.data = np.concatenate(self.data) # Build list of strings with field names. if level == 'hit': # For the hit level, we have to be careful. For example, 'area' can be peak or hit area. # How to solve this? Well, just add hit_ or peak_ before the property field_names = (event_fields + ['peak_' + field for field in peak_fields] + ['hit_' + field for field in hit_fields]) if level == 'peak': # For peak level no such problem exists (yet) so just keep normal names field_names = event_fields + peak_fields self.data = make_named_array(self.data, field_names) return self.data