Source code for hax.raw_data

"""Functions for working with raw data.
"""
import atexit
import itertools
import os
import tempfile

import pandas as pd
from urllib.request import HTTPSHandler, build_opener
import http.client

from pax import core

import hax


def inspect_events_from_minitree(events, *args, **kwargs):
    """Show the pax event display for events, where events is a (slice of a) dataframe loaded from a minitree.

    Any additional arguments will be passed to inspect_events; see its docstring for details.
    """
    if isinstance(events, pd.Series):
        events = pd.DataFrame([events])
    for dataset_number, evts in events.groupby('run_number'):
        event_numbers = evts.event_number.values
        inspect_events(dataset_number, event_numbers, *args, **kwargs)
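
# Example usage (a sketch, not from the original source): select events from a
# minitree dataframe loaded with hax.minitrees.load, which carries the run_number
# and event_number columns this function groups on. The 'cs1' cut is a
# hypothetical selection for illustration:
#
#     df = hax.minitrees.load('my_dataset')
#     selection = df[df['cs1'] > 100]
#     inspect_events_from_minitree(selection.iloc[:5])
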
def inspect_events(run_id, event_numbers, focus='all', save_to_dir=None, config_override=None):
    """Show the pax event display for the events in run_id.

    focus can be 'all' (default), which shows the entire event, or 'largest', 'first', 'main_s1', or 'main_s2'.
    """
    # Config to let pax do the plotting
    config_dict = {'pax': {'output': ['Plotting.PlotEventSummary' if focus == 'all' else 'Plotting.PeakViewer'],
                           'encoder_plugin': None,
                           'pre_output': [],
                           'output_name': 'SCREEN' if save_to_dir is None else save_to_dir}}

    # Confusingly, you need to set block_view=True to make it NOT block the view
    # TODO: Fix this in pax
    if focus != 'all':
        config_dict['Plotting.PeakViewer'] = {'starting_peak': focus,
                                              'block_view': True}
    else:
        config_dict['Plotting.PlotEventSummary'] = {'block_view': True}

    if config_override is not None:
        config_dict = hax.utils.combine_pax_configs(config_dict, config_override)

    # After we configure pax to do the plotting, we just have to iterate over the events and do nothing.
    # We could do list(...) as well, but that would keep all the events and return them in a big list at the end.
    for _ in process_events(run_id, event_numbers, config_override=config_dict):
        pass
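
# Example usage (a sketch, not from the original source; the run and event
# numbers are made up): show the main S2 of events 3 and 7 of run 6731, saving
# the plots to ./plots instead of displaying them on screen:
#
#     inspect_events(6731, [3, 7], focus='main_s2', save_to_dir='./plots')
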
def inspect_peaks(run_id, event_numbers, peak_boundaries, save_to_dir=None, config_override=None):
    """Inspect the peaks starting at peak_boundaries (in samples... sorry) in event_numbers.

    event_numbers and peak_boundaries must be lists/arrays of integers of the same length.
    """
    config_dict = {'Plotting.PeakViewer': {'starting_peak_per_event': dict(zip(event_numbers, peak_boundaries))}}
    if config_override is not None:
        config_dict = hax.utils.combine_pax_configs(config_dict, config_override)
    inspect_events(run_id, event_numbers, focus='_something_else_',
                   save_to_dir=save_to_dir, config_override=config_dict)
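
# Example usage (a sketch; values are made up): inspect the peaks starting at
# samples 1000 and 2500 in events 3 and 7 of run 6731:
#
#     inspect_peaks(6731, [3, 7], [1000, 2500])
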
def inspect_peaks_array(run_id, peak_array, save_to_dir=None, config_override=None):
    """Inspect peaks from a record array returned by hax.DataExtractor"""
    inspect_peaks(run_id, peak_array['event_number'], peak_array['left'],
                  save_to_dir=save_to_dir, config_override=config_override)

def raw_events(run_id, event_numbers=None, config_override=None):
    """Yields raw event(s) numbered event_numbers from the dataset run_id.

    config_override is a dictionary with extra pax options.
    """
    if config_override is None:
        config_override = {}

    # Combine the user's config_override with the options necessary to get pax to spit out raw events
    config_override.setdefault('pax', {})
    pax_config_dict = {'plugin_group_names': ['input', 'preprocessing', 'output'],
                       'preprocessing': ['CheckPulses.SortPulses',
                                         'CheckPulses.ConcatenateAdjacentPulses'],
                       'output': 'Dummy.DummyOutput',
                       'encoder_plugin': None}
    for k, v in pax_config_dict.items():
        config_override['pax'].setdefault(k, v)

    for event in process_events(run_id, event_numbers, config_override):
        yield event
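
# Example usage (a sketch, not from the original source; the run name is made
# up, and the attribute access assumes the pax event model's event_number and
# pulses attributes). Since raw_events is a generator, data is only read as
# you iterate:
#
#     for event in raw_events('160315_1824_tpc', [0, 1, 2]):
#         print(event.event_number, len(event.pulses))
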
def process_events(run_id, event_numbers=None, config_override=None):
    """Yields processed event(s) numbered event_numbers from dataset run_id (name or number).

    config_override is a dictionary with extra pax options.
    """
    if config_override is None:
        config_override = {}
    if isinstance(event_numbers, int):
        # Support passing a single event number
        event_numbers = [event_numbers]

    config = hax.config

    # Get the dataset information
    run_name = hax.runs.get_run_name(run_id)
    run_number = hax.runs.get_run_number(run_id)
    dataset_info = hax.runs.datasets[hax.runs.datasets['name'] == run_name].iloc[0]

    # Set the events to process in config_override
    if event_numbers is not None:
        config_override.setdefault('pax', {})
        config_override['pax'].setdefault('events_to_process', event_numbers)

    if config['raw_data_access_mode'] == 'local':
        # HURRAY HURRAY we have the raw data locally (either really or through sshfs).
        # We can let pax deal with jumping from file to file, selecting events, etc.
        if not dataset_info.raw_data_found:
            raise ValueError("Raw data for dataset %d (%s) not found." % (run_number, run_name))
        dirname = os.path.join(dataset_info.raw_data_used_local_path,
                               dataset_info.raw_data_subfolder,
                               run_name)
        mypax = raw_data_processor(dirname, config_override)
        for event in mypax.get_events():
            yield mypax.process_event(event)

    elif config['raw_data_access_mode'] == 'grid':
        # OH NO we have to get the raw data from GRID (pam pam pam pompadam pompadam).
        # We only know how to access single files from grid, so we need to predict the file name for each event,
        # switch files manually, and all sorts of other fun stuff.
        global temporary_data_files

        # If event_numbers wasn't specified, just iterate over events until we crash / the user has had enough
        if event_numbers is None:
            event_numbers = itertools.count()

        if hax.config['experiment'] != 'XENON100':
            raise ValueError("Can't get raw data from GRID for %s!" % hax.config['experiment'])

        currently_open_file_name = None
        for event_number in event_numbers:
            # Which XED file does this event belong to?
            data_file_name = 'xe100_%06d_%04d_%06d.xed' % (int(run_number / 1e4),
                                                           run_number % 1e4,
                                                           int(event_number / 1e3))
            if data_file_name != currently_open_file_name:
                # Has the required file already been downloaded in this session? Then use its location.
                cache_key = (run_number, data_file_name)
                if cache_key in temporary_data_files:
                    path_to_file = temporary_data_files[cache_key]
                else:
                    # We have to download a new file
                    file_path_tail = os.path.join(dataset_info.raw_data_subfolder, run_name, data_file_name)
                    path_to_file = download_from_grid(file_path_tail)
                    temporary_data_files[cache_key] = path_to_file
                currently_open_file_name = os.path.basename(path_to_file)
                # Start a new pax to process the events from this file
                mypax = raw_data_processor(path_to_file, config_override)

            event = mypax.get_single_event(event_number)
            yield mypax.process_event(event)

    else:
        raise ValueError("Unknown raw data access mode %s, must be local or grid." % config['raw_data_access_mode'])
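
# For reference, a worked example of the XED file name convention used above
# (the formula implies 1000 events per file; the run and event numbers are
# made up): run 12345, event 2001 gives
#
#     'xe100_%06d_%04d_%06d.xed' % (int(12345 / 1e4), 12345 % 1e4, int(2001 / 1e3))
#     # -> 'xe100_000001_2345_000002.xed'
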
def raw_data_processor(input_file_or_directory, config_override=None):
    """Return a raw data processor which reads events from input_file_or_directory.

    config_override can be used to set additional pax options.
    """
    if config_override is None:
        config_override = {}

    # Add the input name to the config_override; pax applies these overrides section by section
    config_override.setdefault('pax', {})
    config_override['pax']['input_name'] = input_file_or_directory

    return core.Processor(config_names=hax.config['experiment'], config_dict=config_override)
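
# Example usage (a sketch, not from the original source; the path is made up):
# build a processor that reads a local XED file and handles only the first ten
# events. 'events_to_process' is the same pax option process_events sets above:
#
#     mypax = raw_data_processor('/data/xe100_000001_2345_000000.xed',
#                                {'pax': {'events_to_process': list(range(10))}})
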
##
# Grid stuff
##

# Holds paths to temporarily downloaded raw data files.
# Dictionary: (run_number, file_name): path. The files are deleted at exit.
temporary_data_files = {}

def cleanup_temporary_data_files():
    """Removes all temporarily downloaded raw data files.

    Run automatically for you when your program quits.
    """
    for tempfile_path in temporary_data_files.values():
        os.remove(tempfile_path)

atexit.register(cleanup_temporary_data_files)

def download_from_grid(file_path_tail):
    """Downloads file_path_tail from grid, returns filename of temporary file"""
    config = hax.config

    # Check if we have the grid key & certificate
    grid_key_path = os.path.expanduser(config['grid_key'])
    grid_cert_path = os.path.expanduser(config['grid_certificate'])
    if not os.path.exists(grid_key_path):
        raise ValueError("Cannot download from grid: grid key does not exist at %s" % grid_key_path)
    if not os.path.exists(grid_cert_path):
        raise ValueError("Cannot download from grid: grid certificate does not exist at %s" % grid_cert_path)

    # Make the grid URL
    grid_url = config['raw_data_grid_url']
    if not grid_url.endswith('/'):
        grid_url += '/'
    grid_url += file_path_tail

    # Download the file from GRID in chunks, writing to a temporary file
    opener = build_opener(HTTPSClientAuthHandler(grid_key_path, grid_cert_path))
    response = opener.open(grid_url)
    block_sz = 8192
    f = tempfile.NamedTemporaryFile(delete=False)
    while True:
        buffer = response.read(block_sz)
        if not buffer:
            break
        f.write(buffer)
    f.close()
    return f.name
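
# Example usage (a sketch; the path tail is made up, but mirrors the
# subfolder/run_name/file_name layout that process_events builds above):
#
#     local_path = download_from_grid('some_subfolder/run_14_05_27/xe100_000001_2345_000000.xed')
#
# Note the caller is responsible for registering local_path in
# temporary_data_files (as process_events does) so it is cleaned up at exit.
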
class HTTPSClientAuthHandler(HTTPSHandler):
    """Used for accessing GRID data and handling authentication"""

    def __init__(self, key, cert):
        HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert

    def https_open(self, req):
        return self.do_open(self.getConnection, req)

    def getConnection(self, host, timeout):
        # TODO: timeout is not used, but urllib passes it; we can't remove the argument without causing an error
        return http.client.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)