# fab.instruments

## Instruments

Multiple data sources can be used and loaded together by combining them in a `fab.instruments.Instrument`. This synchronizes all sources, so that their indexes (e.g. `train_id` and `shot_id`) are aligned. Moreover, joint operations like filtering and sorting can be applied simultaneously to all data sources.
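To make "aligned" concrete: when two sources cover slightly different `train_id` ranges, joining them produces one common index, with missing entries left as NaN. The toy sketch below uses plain dicts to show the idea; in fab the actual merge and alignment are done by xarray.

```python
# Two toy sources covering overlapping but unequal train_id ranges
etof = {1: 0.1, 2: 0.2, 3: 0.3}        # train_id -> value
gmd  = {2: 9.0, 3: 8.5, 4: 7.9}

# Outer join of the indexes: every train_id seen by either source
train_ids = sorted(set(etof) | set(gmd))

# Align both sources on the common index, filling gaps with NaN
aligned = {tid: (etof.get(tid, float('nan')), gmd.get(tid, float('nan')))
           for tid in train_ids}
# train_id 1 has no gmd value, train_id 4 has no eTof value
```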

An instrument can be constructed from a collection of sources:

```python
from fab.magic import config

from fab.instruments import Instrument
from fab.datasources import HDFSource, GMD

instr_list = [HDFSource('eTof', "/FL2/Experiment/MTCA-EXP1/ADQ412 GHz ADC/CH00/TD"),
              GMD('gmd', "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall")]
data = Instrument(instr_list).load(daq_run=43861)
```

In this case, `data` will be an `xarray.Dataset` containing two variables, `'eTof'` and `'gmd'`.

## Configuration

In most cases there is no need to instantiate data sources and instruments directly. It is much easier to set up your instrument through a configuration file (see the configuration section and the `fab.settings` module documentation). If the loaded configuration defines an instrument, you can import it directly:

```python
from fab.magic import config, ursa

data = ursa.load(daq_run=43861)
```

Have a look at the `fab.settings` module documentation for more information on how to configure custom instruments in your config file.
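For orientation, a configuration defining an instrument might look roughly like the following. This is a hypothetical sketch: the table layout mirrors what `Instrument.from_config` consumes (one entry per source, plus an optional `__preprocess__` list), but the per-source key names here are assumptions — check the `fab.settings` documentation for the real schema.

```toml
# Hypothetical sketch — per-source key names are illustrative, not authoritative.
[instruments.ursa.eTof]
type = "HDFSource"
hdf_key = "/FL2/Experiment/MTCA-EXP1/ADQ412 GHz ADC/CH00/TD"

[instruments.ursa.gmd]
type = "GMD"
hdf_key = "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
```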

## Using multiple instruments

In some experiments, the data sources to be loaded might differ between run types. In this case, you can define multiple instruments in the configuration file and combine them at runtime as needed using the `+` operator:

```python
from fab.magic import config, beamtime, first_instrument, second_instrument

joint_instrument = first_instrument + second_instrument  # merge the two instruments
data = joint_instrument.load(daq_run=...)
```
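The `+` operator simply concatenates the operands' source lists (mirroring `Instrument.__add__`; a bare `DataSource` can be added as well). As a self-contained toy illustration of this composition pattern — these stub classes are not the real fab API:

```python
# Toy stand-ins for fab's DataSource/Instrument, only to illustrate how
# `+` composes instruments by concatenating their source lists.
class ToySource:
    def __init__(self, name):
        self.name = name

class ToyInstrument:
    def __init__(self, sources):
        self.sources = list(sources)

    def __add__(self, other):
        if isinstance(other, ToyInstrument):
            return ToyInstrument([*self.sources, *other.sources])
        if isinstance(other, ToySource):
            return ToyInstrument([*self.sources, other])
        return NotImplemented

    __radd__ = __add__

first = ToyInstrument([ToySource('eTof')])
second = ToyInstrument([ToySource('gmd')])
joint = first + second                     # merged instrument
extended = joint + ToySource('extra')      # a single source can be appended too
print([s.name for s in extended.sources])  # ['eTof', 'gmd', 'extra']
```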
```python
from ..settings import cfg
from .baseinst import Instrument

import logging
logger = logging.getLogger(__name__)

__all__ = ['Instrument']

def __getattr__(name):
    # Module-level __getattr__ (PEP 562): lets `from fab.instruments import <name>`
    # resolve instruments defined in the loaded configuration.
    if name == '__path__':
        raise AttributeError

    logger.warning("Magic import from fab.instruments is deprecated and will be removed; "
                   f"use `from fab.magic import {name}`")
    try:
        return Instrument.from_name(name)
    except Exception as e:
        logger.exception(e)
        raise AttributeError
```
```python
# Source excerpt (imports abridged): this fragment relies on the standard
# library's `inspect`, plus `dask`, `pandas as pd`, `xarray as xr`,
# `typing.List`, and fab-internal names (cfg, DataSource, get_config_object,
# logger, tele_log) defined elsewhere in the package.
class Instrument:
    ''' Represents an instrument.

    This class is used to hold together and synchronize data coming from
    multiple data sources. It is initialized with a list of sources and
    provides methods for loading data from those sources, as well as
    postprocessing the data and performing "data fusion" to combine
    multiple data sources together.

    Parameters can be passed directly or loaded from a toml config file
    through the `from_name` function.

    Args:
        sources: a list of DataSource objects.
    '''

    def __init__(self, sources: List[DataSource]):
        if isinstance(sources, list):
            self.sources = sources
        else:
            raise ValueError("Sources must be a list of datasources")

    @staticmethod
    def from_name(name):
        ''' Initializes an instrument, looking for its configuration
        parameters in the loaded configuration. '''

        logger.debug(f"Loading instrument {name} from config.")
        return Instrument.from_config(cfg.instruments[name])

    @staticmethod
    def from_config(instrument_config):
        preprocess = instrument_config.pop('__preprocess__', [])  # list of preprocessors
        sources = [DataSource.from_config(name, source_config)
                   for name, source_config in instrument_config.items()]
        inst = Instrument(sources)

        for preproc_config in preprocess:
            preprocessor = get_config_object(preproc_config.pop('__name__'))
            inst = preprocessor(inst, **preproc_config)
        return inst

    def __repr__(self):
        return f"{self.__class__.__name__}({self.sources})"

    def __getattr__(self, key):
        ''' Easy access to the datasources making up the instrument. '''
        return self[key]

    def __getitem__(self, key):
        for source in self.sources:
            if source.name == key:
                return source
        raise KeyError(f"Source {key} not found")

    def _fillna(self, dataset):
        ''' Fills null values in a dataset according to the method
        specified by each source. '''

        for name, data in dataset.items():
            source = self[name]
            if not source.fillna_method:
                continue

            logger.debug(f"Filling null values for source {name}, method: {source.fillna_method}")
            if source.fillna_method == "ffill":
                dataset[name] = data.ffill(dim='train_id')
            else:
                if data.chunks is not None:
                    if data.nbytes > 4E9:
                        logger.warning(f"Interpolating source {name} will cause significant memory usage "
                                       "since data will need to be rechunked. This could be a very bad idea. "
                                       "Please consider using 'ffill' instead.")
                    else:
                        logger.warning(f"Interpolating source {name} will create one single chunk. "
                                       "Consider setting preload_values to true.")

                    dataset[name] = data.chunk(dict(train_id=-1)).interpolate_na(dim='train_id', method=source.fillna_method)
                else:
                    dataset[name] = data.interpolate_na(dim='train_id', method=source.fillna_method)

        return dataset

    def load(self, run_df=None, **kwargs) -> xr.Dataset:
        ''' Loads data from Instrument sources.

        Iteratively calls .load() on all datasources making up the instrument, returning
        an xr.Dataset that contains the requested data, indexed by train_id.
        Sources are aligned and any missing data is filled according to each source's
        parameters (if no configuration is found, missing data is left as nan).

        See the documentation of DataSource for more info about the filling of missing
        values.

        Args:
            run_df (pd.DataFrame): a DataFrame containing the arguments to be passed to the
                load method of each source. The DataFrame must have the same columns as the
                sources' parameters. Use in conjunction with the dataframe returned by a
                `fab.beamtime.Beamtime` object to load data from a scicat run table.
            **kwargs: keyword arguments to be passed to the load method of each source.
                Each keyword argument must correspond to the parameter name of at least one
                of the sources to be loaded.
        Returns:
            xr.Dataset
        '''
        assert not ((run_df is not None) and kwargs), "Pass either run_df or kwargs, not both."

        if run_df is not None:
            assert isinstance(run_df, pd.DataFrame), "run_df must be a pandas DataFrame"
            kwargs = run_df.reset_index().to_dict(orient='list')

        loaded = {}
        for source in self.sources:
            # Keep only the kwargs that this source's load() accepts, by inspecting its signature
            args = inspect.signature(source.load).parameters.keys()
            args_dict = {key: val for key, val in kwargs.items() if key in args}
            if len(args_dict) == 0 and len(kwargs) > 0:
                logger.warning(f"Source {source.name} only accepts the following arguments: {args}, "
                               f"but {kwargs} were passed. All available data will be loaded.")
                tele_log.warning("Kwargs to instrument load don't match")
            loaded[source.name] = source.load(**args_dict)

        # Merge, fillna and return
        try:
            # Set and reset to avoid large chunk formation during merge
            with dask.config.set({'array.slicing.split_large_chunks': True}):
                return self._fillna(xr.merge(loaded.values(), compat='no_conflicts' if cfg.export_daq_run else 'minimal'))

        except xr.MergeError as e:
            raise xr.MergeError("Error merging datasources. Maybe multiple DAQs were used? "
                                "Set `export_daq_run = false` in config to bypass the daq_run conflict") from e

    def __add__(self, other):
        if isinstance(other, Instrument):
            return Instrument([*self.sources, *other.sources])
        elif isinstance(other, DataSource):
            return Instrument([*self.sources, other])
        return NotImplemented

    __radd__ = __add__
```
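The keyword-argument dispatch in `load` relies on `inspect.signature` to route each keyword only to the sources whose `load` method accepts it. A minimal standalone sketch of that pattern (the loader functions here are made up for illustration):

```python
import inspect

# Hypothetical per-source loaders with different accepted parameters
def load_etof(daq_run=None):
    return f"eTof data for run {daq_run}"

def load_gmd(bunch_id=None):
    return f"gmd data for bunch {bunch_id}"

kwargs = {'daq_run': 43861}
loaded = {}
for name, loader in {'eTof': load_etof, 'gmd': load_gmd}.items():
    # Inspect the loader's signature and forward only the kwargs it accepts
    accepted = inspect.signature(loader).parameters.keys()
    filtered = {k: v for k, v in kwargs.items() if k in accepted}
    if not filtered and kwargs:
        print(f"Source {name} accepts none of {list(kwargs)}; loading all available data")
    loaded[name] = loader(**filtered)

print(loaded['eTof'])  # eTof data for run 43861
```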
