Source code for hax.treemakers.peak_treemakers

"""Tree makers for studying peaks on their own

Treemakers used for analyses such as the single-electron shape in time
and stability.
"""
import hax
from hax.minitrees import MultipleRowExtractor
import numpy as np
from pax import units


[docs]class PeakExtractor(MultipleRowExtractor): """Base class for reading peak data in minitrees. For more information, check out example 10 in hax/examples. """ # Default branch selection is EVERYTHING in peaks, overwrite for speed increase # Don't forget to include branches used in cuts extra_branches = ['peaks.*'] peak_fields = ['area'] event_cut_list = [] peak_cut_list = [] event_cut_string = 'True' peak_cut_string = 'True' stop_after = np.inf # Hacks for want of string support :'( peaktypes = dict(lone_hit=0, s1=1, s2=2, unknown=3) detectors = dict(tpc=0, veto=1, sum_wv=2, busy_on=3, busy_off=4) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.event_cut_string = self.build_cut_string( self.event_cut_list, 'event') self.peak_cut_string = self.build_cut_string( self.peak_cut_list, 'peak')
[docs] def build_cut_string(self, cut_list, obj): ''' Build a string of cuts that can be applied using eval() function. ''' # If no cut is specified, always pass cut if len(cut_list) == 0: return 'True' # Check if user entered range_50p_area, since this won't work cut_list = [ cut.replace( 'range_50p_area', 'range_area_decile[5]') for cut in cut_list] cut_string = '(' for cut in cut_list[:-1]: cut_string += obj + '.' + cut + ') & (' cut_string += obj + '.' + cut_list[-1] + ')' return cut_string
[docs] def extract_data(self, event): if event.event_number == self.stop_after: raise hax.paxroot.StopEventLoop() peak_data = [] # Check if event passes cut if eval(self.build_cut_string(self.event_cut_list, 'event')): # Loop over peaks and check if peak passes cut for peak in event.peaks: if eval(self.peak_cut_string): # Loop over properties and add them to _current_peak one by one _current_peak = {} for field in self.peak_fields: # Deal with special cases if field == 'range_50p_area': _x = list(peak.range_area_decile)[5] elif field == 'rise_time': _x = -peak.area_decile_from_midpoint[1] elif field in ('x', 'y'): # In case of x and y need to get position from reconstructed_positions for rp in peak.reconstructed_positions: if rp.algorithm == 'PosRecTopPatternFit': _x = getattr(rp, field) break else: _x = float('nan') # Change field name! field = field + '_peak' elif field == 'type': _x = self.peaktypes.get(peak.type, -1) elif field == 'detector': _x = self.detectors.get(peak.detector, -1) else: _x = getattr(peak, field) _current_peak[field] = _x # All properties added, now finish this peak # The event number is necessary to join to event properties _current_peak['event_number'] = event.event_number peak_data.append(_current_peak) return peak_data else: # If event does not pass cut return empty list return []
[docs]class IsolatedPeaks(MultipleRowExtractor): # pylint: disable=unused-variable """Returns one row per peak isolated in time Specifically returns properties of each individual peak. """ __version__ = '0.1.2' extra_branches = ['peaks.left', 'peaks.right', 'peaks.n_hits', 'peaks.n_contributing_channels', 'peaks.reconstructed_positions*', 'peaks.area_decile_from_midpoint*'] nhits_bounds = (0, float('inf')) width_bounds = (0, float('inf'))
[docs] def extract_data(self, event): results = [] for peak, time_to_nearest_peak in self.yield_peak(event, self.nhits_bounds, self.width_bounds): result = dict({x: getattr(peak, x) for x in ['area', 'area_fraction_top', 'n_hits']}) result['time_to_nearest_peak'] = time_to_nearest_peak result['range_50p_area'] = peak.range_area_decile[5] result['n_contributing_channels'] = peak.n_contributing_channels result['rise_time'] = - peak.area_decile_from_midpoint[1] result['range_90p_area'] = peak.range_area_decile[9] for rp in peak.reconstructed_positions: if rp.algorithm == 'PosRecTopPatternFit': result['x'] = rp.x result['y'] = rp.y result['xy_gof'] = rp.goodness_of_fit break results.append(result) return results
[docs] @staticmethod def yield_peak(event, nhits_bounds, width_bounds): """Extracts a row per peak The peak type can be single electron and have some selection. This is a generator, and yields (peak, time_to_nearest). """ # Get all non-lone-hit peaks in the TPC peaks = [p for p in event.peaks if p.detector == 'tpc' and not p.type == 'lone_hit'] peaks = sorted(peaks, key=lambda p: p.left) if not len(peaks): return [] # For each of these, find the smallest gap lefts, rights = np.array([(p.left, p.right) for p in peaks]).T * 10 * units.ns gap_on_left = np.concatenate(([0], lefts[1:] - rights[:-1])) gap_on_right = np.concatenate((lefts[1:] - rights[:-1], [0])) smallest_gap = np.clip(gap_on_left, 0, gap_on_right) for i, peak in enumerate(peaks): time_to_nearest_peak = smallest_gap[i] if time_to_nearest_peak < 10 * units.us: continue if not (nhits_bounds[0] <= peak.n_hits < nhits_bounds[1]): continue width = peak.range_area_decile[5] if not (width_bounds[0] <= width < width_bounds[1]): continue yield peak, time_to_nearest_peak
[docs]class SingleElectrons(IsolatedPeaks): nhits_bounds = (15, 26.01) # 26 is in width_bounds = (50, 450)