fab.instruments
## Instruments
Multiple data sources can be used and loaded together by combining them in a `fab.instruments.Instrument`.
This allows for synchronization of all sources, so that their indices (e.g. `train_id` and `shot_id`)
are aligned. Moreover, joint operations like filtering and sorting can be done simultaneously
on all data sources.
An instrument can be constructed from a collection of sources:
```python
from fab.magic import config

from fab.instruments import Instrument
from fab.datasources import HDFSource, GMD

instr_list = [HDFSource('eTof', "/FL2/Experiment/MTCA-EXP1/ADQ412 GHz ADC/CH00/TD"),
              GMD('gmd', "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall")]
data = Instrument(instr_list).load(daq_run=43861)
```
In this case, `data` will be an `xarray.Dataset` containing two variables, 'eTof' and 'gmd'.
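Because all variables in the returned dataset share the same `train_id` index, joint operations act on every source at once. A minimal sketch of what this enables, using made-up toy values rather than real fab data:

```python
import xarray as xr

# Toy stand-in for the Dataset returned by Instrument.load (values are invented)
data = xr.Dataset(
    {"eTof": ("train_id", [0.2, 0.8, 0.5]),
     "gmd": ("train_id", [10.0, 42.0, 25.0])},
    coords={"train_id": [3, 1, 2]},
)

# Filtering on one variable drops the same trains from every variable
bright = data.where(data.gmd > 20.0, drop=True)

# Sorting by the shared index reorders all variables together
ordered = data.sortby("train_id")
```

Since the sources were aligned at load time, there is no risk of the filter keeping different shots in different variables.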
## Configuration
In most cases there is no need to instantiate datasources and instruments directly. It is much easier to set up your instrument config through a configuration file (see the configuration section and the settings module documentation). If the loaded configuration defines an instrument, you can then directly import it like this:
```python
from fab.magic import config, ursa

data = ursa.load(daq_run=43861)
```
Have a look at the `fab.settings` module documentation for more info on how to configure custom
instruments in your config file.
## Using multiple instruments
In some experiments the datasources to be loaded might differ between different run types. In this case, it is possible to define multiple instruments in the configuration file, and combine them at runtime as needed using the `+` operator:
```python
from fab.magic import config, beamtime, first_instrument, second_instrument

joint_instrument = first_instrument + second_instrument  # this will merge the two instruments
data = joint_instrument.load(daq_run=...)
```
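Under the hood, `+` simply concatenates the instruments' source lists into a new `Instrument` (a single `DataSource` can be added the same way). A stripped-down sketch of that behaviour, using stand-in classes rather than the real fab ones:

```python
class DataSource:
    def __init__(self, name):
        self.name = name

class Instrument:
    def __init__(self, sources):
        self.sources = sources

    def __add__(self, other):
        # Merging keeps the order: self's sources first, then other's
        if isinstance(other, Instrument):
            return Instrument([*self.sources, *other.sources])
        elif isinstance(other, DataSource):
            return Instrument([*self.sources, other])
        return NotImplemented

first = Instrument([DataSource("eTof")])
second = Instrument([DataSource("gmd")])
joint = first + second
print([s.name for s in joint.sources])  # ['eTof', 'gmd']
```

Because the result is itself an `Instrument`, combined instruments can be added again or extended with further sources.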
Module source (docstring, which duplicates the documentation above, omitted):

```python
from ..settings import cfg
from .baseinst import Instrument

import logging
logger = logging.getLogger(__name__)

__all__ = ['Instrument']

def __getattr__(name):
    if name == '__path__':
        raise AttributeError

    logger.warning(f"Magic import from fab.instruments is deprecated and will be removed, use from fab.magic import {name}")
    try:
        return Instrument.from_name(name)
    except Exception as e:
        logger.exception(e)
        raise AttributeError
```
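The module-level `__getattr__` above relies on PEP 562 (Python 3.7+), which lets a module intercept lookups of attributes it does not define, so `from fab.instruments import some_instrument` keeps working while emitting a deprecation warning. A self-contained illustration of the mechanism (module and instrument names here are invented for the demo):

```python
import sys
import types
import warnings

# Build a throwaway module object to stand in for fab.instruments
demo = types.ModuleType("demo_instruments")

def _module_getattr(name):
    if name == "__path__":          # let import-machinery probes fail normally
        raise AttributeError(name)
    warnings.warn(f"magic import of {name} is deprecated")
    return f"<instrument {name}>"   # the real code builds an Instrument here

demo.__getattr__ = _module_getattr  # PEP 562 hook, looked up in the module dict
sys.modules["demo_instruments"] = demo

from demo_instruments import ursa   # triggers the hook
print(ursa)  # <instrument ursa>
```

The `__path__` special case keeps the import machinery from treating probe lookups as successful submodule imports.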
Source of the `Instrument` class:

```python
# (excerpt: module-level imports omitted)

class Instrument:
    ''' Represents an instrument

    This class is used to hold together and synchronize data coming from
    multiple datasources. It is initialized with a list of sources and
    provides methods for loading data from those sources, as well as
    postprocessing the data and performing "data fusion" to combine
    multiple data sources together.

    Parameters can be passed directly or loaded from a toml config file
    through the from_name function.

    Args:
        sources (optional): a list of sources.
    '''

    def __init__(self, sources: List[DataSource]):
        if isinstance(sources, list):
            self.sources = sources
        else:
            raise ValueError("Sources must be a list of datasources")

    @staticmethod
    def from_name(name):
        ''' Initializes an instrument looking for its configuration
        parameters in the loaded configuration'''

        logger.debug(f"Loading instrument {name} from config.")
        return Instrument.from_config(cfg.instruments[name])

    @staticmethod
    def from_config(instrument_config):
        preprocess = instrument_config.pop('__preprocess__', [])  # list of preprocessors
        sources = [DataSource.from_config(name, source_config)
                   for name, source_config in instrument_config.items()]
        inst = Instrument(sources)

        for preproc_config in preprocess:
            preprocessor = get_config_object(preproc_config.pop('__name__'))
            inst = preprocessor(inst, **preproc_config)
        return inst

    def __repr__(self):
        return f"{self.__class__.__name__}({self.sources})"

    def __getattr__(self, key):
        ''' Easy access to the datasources making up the instrument'''
        return self[key]

    def __getitem__(self, key):
        for source in self.sources:
            if source.name == key:
                return source
        raise KeyError(f"Source {key} not found")

    def _fillna(self, dataset):
        ''' Fills null values in a dataset according to the method
        specified for each source'''

        for name, data in dataset.items():
            source = self[name]
            if not source.fillna_method:
                continue

            logger.debug(f"Filling null values for source {name}, method: {source.fillna_method}")
            if source.fillna_method == "ffill":
                dataset[name] = data.ffill(dim='train_id')
            else:
                if data.chunks is not None:
                    if data.nbytes > 4E9:
                        logger.warning(f"Interpolating source {name} will cause significant memory usage "
                                       "since data will need to be rechunked. This could be a very bad idea. "
                                       "Please consider using 'ffill' instead")
                    else:
                        logger.warning(f"Interpolating source {name} will create one single chunk. "
                                       "Consider setting preload_values to true.")

                    dataset[name] = data.chunk(dict(train_id=-1)).interpolate_na(dim='train_id', method=source.fillna_method)
                else:
                    dataset[name] = data.interpolate_na(dim='train_id', method=source.fillna_method)

        return dataset

    def load(self, run_df=None, **kwargs) -> xr.Dataset:
        ''' Loads data from Instrument sources

        Iteratively calls .load() on all datasources making up the instrument, returning
        an xr.Dataset that contains the requested data, indexed by train_id.
        Sources are aligned and any missing data is filled according to each source's
        parameters (if no configuration is found, missing data is left as nan).

        See the documentation of DataSource for more info about the filling of missing
        values.

        Args:
            run_df (pd.DataFrame): a DataFrame containing the arguments to be passed to the
                load method of each source. The DataFrame must have the same columns as the
                sources' parameters. Use in conjunction with the dataframe returned by a
                `fab.beamtime.Beamtime` object to load data from a scicat run table.
            **kwargs: keyword arguments to be passed to the load method of each source.
                Each keyword argument must correspond to the parameter name of at least one
                of the sources to be loaded.

        Returns:
            xr.Dataset
        '''
        assert not ((run_df is not None) and kwargs), "Either run_df or kwargs must be passed, but not both."

        if run_df is not None:
            assert isinstance(run_df, pd.DataFrame), "run_df must be a pandas DataFrame"
            kwargs = run_df.reset_index().to_dict(orient='list')

        loaded = {}
        for source in self.sources:
            # keep only the kwargs that are valid for this source by inspecting its signature
            args = inspect.signature(source.load).parameters.keys()
            args_dict = {key: val for key, val in kwargs.items() if key in args}
            if len(args_dict) == 0 and len(kwargs) > 0:
                logger.warning(f"Source {source.name} only accepts the following arguments: {args}, but {kwargs} were passed. All available data will be loaded.")
                tele_log.warning("Kwargs to instrument load don't match")
            loaded[source.name] = source.load(**args_dict)

        # Merge, fillna and return
        try:
            with dask.config.set({'array.slicing.split_large_chunks': True}):  # set and reset to avoid large chunk formation during merge
                return self._fillna(xr.merge(loaded.values(), compat='no_conflicts' if cfg.export_daq_run else 'minimal'))
        except xr.MergeError as e:
            raise xr.MergeError("Error merging datasources. Maybe multiple DAQs were used? Set `export_daq_run = false` in config to bypass daq_run conflict") from e

    def __add__(self, other):
        if isinstance(other, Instrument):
            return Instrument([*self.sources, *other.sources])
        elif isinstance(other, DataSource):
            return Instrument([*self.sources, other])
        return NotImplemented

    __radd__ = __add__
```
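One detail of `load()` worth highlighting: keyword arguments are routed to each source by inspecting that source's `load` signature, so a single call can carry parameters meant for different sources. The trick in isolation, with toy loader functions standing in for the real datasource API:

```python
import inspect

def load_etof(daq_run):
    return f"eTof trace for run {daq_run}"

def load_gmd(daq_run, calibration=1.0):
    return f"gmd energy for run {daq_run} (x{calibration})"

# One kwargs dict; 'calibration' is only meaningful to load_gmd
kwargs = {"daq_run": 43861, "calibration": 2.0}

results = []
for loader in (load_etof, load_gmd):
    # Keep only the kwargs this loader actually accepts
    accepted = inspect.signature(loader).parameters.keys()
    results.append(loader(**{k: v for k, v in kwargs.items() if k in accepted}))

print(results)
# ['eTof trace for run 43861', 'gmd energy for run 43861 (x2.0)']
```

This is why, in the real method, a source that matches none of the passed keywords falls back to loading all available data (with a warning).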