"""Source code for hax.trigger_data."""

import pickle
import zipfile
import zlib
import os
from collections import defaultdict

import pytz
import numpy as np
import bson
import pandas as pd

from pax.configuration import load_configuration
from pax import datastructure, units
import hax

# Custom dtypes for specific trigger data types; anything not listed here is
# read as plain integers (see get_trigger_data).
# NOTE: np.float / np.int aliases were removed in numpy 1.24; np.float64 is
# the concrete dtype the old np.float alias resolved to for array data.
data_types = {
    'trigger_signals': datastructure.TriggerSignal.get_dtype(),
    'trigger_signals_histogram': np.float64,
}
# Data types stored as square matrices (possibly flattened in old formats).
matrix_fields = ['trigger_signals_histogram', 'count_of_2pmt_coincidences']


def get_special_file_filename(basename, run_id, special_path_key=None):
    """Return the path to the file `basename` belonging to run `run_id`.

    A path template configured under `special_path_key` in hax.config is tried
    first (formatted with the run number); if it is absent or the formatted
    path does not exist, each directory in hax.config['raw_data_local_path']
    is searched for <run_name>/<basename>.

    :raises FileNotFoundError: if the file is found in none of the locations.
    """
    # Configured special path takes precedence, when it points to a real file.
    template = hax.config.get(special_path_key, None)
    if template:
        candidate = template.format(run_number=hax.runs.get_run_number(run_id))
        if os.path.exists(candidate):
            return candidate

    # Otherwise search the raw data folder(s).
    for raw_data_path in hax.config['raw_data_local_path']:
        candidate = os.path.join(raw_data_path, hax.runs.get_run_name(run_id), basename)
        if os.path.exists(candidate):
            return candidate

    raise FileNotFoundError("Can't find %s for run %s" % (basename, run_id))
def get_trigger_data(run_id, select_data_types='all', format_version=2):
    """Return dictionary with the trigger data from run_id

    select_data_types can be 'all', a trigger data type name, or a list of
    trigger data type names. If you want to find out which data types exists,
    use 'all' and look at the keys of the dictionary.

    :param run_id: run name or number, resolved by hax.runs helpers.
    :param select_data_types: 'all', one data type name, or a list of names.
    :param format_version: 2 (arrays stored as lists in BSON) or <=1 (arrays
        stored as flattened byte strings).
    :returns: dict of {data_type: array/DataFrame}, or a single array if
        exactly one data type was selected.
    """
    if isinstance(select_data_types, str) and select_data_types != 'all':
        select_data_types = [select_data_types]

    data = defaultdict(list)
    filename = get_special_file_filename('trigger_monitor_data.zip', run_id,
                                         'trigger_data_special_path')

    # Fix: the ZipFile was never closed; use a context manager.
    with zipfile.ZipFile(filename) as f:
        for doc_name in f.namelist():
            # Archive member names look like '<data_type>=<index>'.
            data_type, index = doc_name.split('=')
            if select_data_types != 'all' and data_type not in select_data_types:
                continue
            with f.open(doc_name) as doc_file:
                d = doc_file.read()
                d = zlib.decompress(d)
                d = bson.BSON.decode(d)
                if 'data' in d:
                    # np.int was removed in numpy 1.24; it was an alias for
                    # the builtin int, used here as the default dtype.
                    dtype = data_types.get(data_type, int)
                    if format_version >= 2:
                        # Numpy arrays stored as lists
                        d = np.array(d['data'], dtype=dtype)
                    else:
                        # Numpy arrays stored as strings. np.frombuffer
                        # replaces the removed np.fromstring for binary input.
                        d = np.frombuffer(d['data'], dtype=dtype)
                data[data_type].append(d)

    # Flatten / post-process the data
    for k in data.keys():
        if not len(data[k]):
            continue
        if isinstance(data[k][0], dict):
            # Dictionaries describing data
            data[k] = pd.DataFrame(data[k])
        elif k == 'trigger_signals':
            data[k] = np.concatenate(data[k])
        elif k in matrix_fields:
            if format_version <= 1:
                data[k] = np.vstack(data[k])
                # Arrays were flattened when they were converted to strings
                n = np.sqrt(data[k].shape[1]).astype('int')
                data[k] = data[k].reshape((-1, n, n))
            else:
                # Not flattened, just need to stack
                data[k] = np.stack(data[k])
        else:
            data[k] = np.vstack(data[k])

    if select_data_types != 'all' and len(select_data_types) == 1:
        return data[select_data_types[0]]
    return data
def get_aqm_pulses(run_id):
    """Return a dictionary of acquisition monitor pulse times in the run run_id.

    Keys are channel labels (e.g. muon_veto_trigger). Under the keys 'busy'
    and 'hev', you'll get the sorted combination of all busy/hev _on and _off
    signals. Times are in ns since the unix epoch.
    """
    filename = get_special_file_filename('acquisition_monitor_data.pickles', run_id,
                                         'acquisition_monitor_special_path')

    # Run start time in ns since the unix epoch. This isn't known with great
    # accuracy, but we use the exact same determination here as in the event
    # builder, so these times are directly comparable to event times.
    run_start = hax.runs.get_run_info(run_id, 'start').replace(tzinfo=pytz.utc).timestamp()
    t0 = int(run_start * units.s)

    defaults = load_configuration(hax.config['experiment'])['DEFAULT']
    dt = defaults['sample_duration']
    pmts = defaults['pmts']

    # Map (digitizer module, digitizer channel) -> acquisition monitor label,
    # built from the single-channel "detectors" in the pax configuration.
    label_of = {}
    for label, pmt_list in defaults['channels_in_detector'].items():
        if len(pmt_list) != 1:
            continue
        dig = pmts[pmt_list[0]]['digitizer']
        label_of[(dig['module'], dig['channel'])] = label
    # Temp hack for the muon veto synchronization channel, which hasn't been
    # added to pax (it would result in incompatible raw data files).
    label_of[(167, 6)] = 'mv_sync'

    pulse_times = {label: [] for label in label_of.values()}
    with open(filename, 'rb') as fh:
        while True:
            try:
                record = pickle.load(fh)
            except EOFError:
                break
            if not isinstance(record, dict):
                # There's some random string at the start of the file for
                # newer runs; skip it.
                continue
            label = label_of[record['module'], record['channel']]
            pulse_times[label].append(record['time'] * dt + t0)

    # Combine busy_on/off and hev_on/off signals for convenience
    for prefix in ('busy', 'hev'):
        pulse_times[prefix] = np.sort(pulse_times[prefix + '_on'] +
                                      pulse_times[prefix + '_off'])

    return {label: np.array(times) for label, times in pulse_times.items()}