fab.datasources.special

  1from . import DataSource, HDFSource
  2import h5py as h5
  3import dask
  4
  5from ..settings import cfg
  6
  7import logging
  8logger = logging.getLogger(__name__)
  9
 10__all__ = ['Timestamp', 'GMD', 'BAM']
 11
 12class Timestamp(HDFSource):
 13    ''' Loads the (indicative) timestamps for each pulse train
 14    
 15        Provides the timestamp of each FLASH pulse train. Be aware that the values 
 16        might not be accurate enough to be used for synchronization with other data
 17        sources that are not based on the HDF5 files provided by FLASH.
 18    Args:
 19        date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype.
 20            default: True
 21        Other arguments: see `fab.datasources.HDFSource`
 22    '''
 23    def __init__(self, name='timestamp', 
 24                       hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
 25                       preload_values=True, date_dtype=True, **kwargs):
 26
 27        super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)
 28
 29        if date_dtype:
 30            self.dtype = 'datetime64[s]'
 31        else:
 32            self.dtype = 'float64'
 33
 34        self._preload_key_vals.append(self.dtype)
 35
 36    @staticmethod
 37    @dask.delayed
 38    def _load_key(hdf_key, fname):         #Override loading to get the 'time' field instead of 'value'
 39        return h5.File(fname)[hdf_key]['time'][:]
 40
 41    def _load_from_file(self, file_data):  #Intercept data shape and take only lenght
 42        newshape = (file_data.data_shape[0], )
 43        return super()._load_from_file(file_data._replace(data_shape=newshape))
 44
 45    def _load(self, *args, **kwargs):      #Intercept data and cast to chosen dtype
 46        return super()._load(*args, **kwargs).astype(self.dtype)
 47
 48class _FastGMD(HDFSource):
 49    ''' Helper class to load the raw GMD data from the HDF files without extra dimensions
 50    
 51    Do not use this class directly, use the `GMD` class instead. If you want to load the
 52    fast data without calibration, set the calibration_key to "".
 53
 54    This class clean up the raw HDF data from a GMD monitor and only loads pulse intensities
 55    in a 2 dimensional array with shape (train_id, shotnum).
 56
 57    Args:
 58        same as HDFSource
 59    '''
 60    def __init__(self, *args, **kwargs):
 61        super().__init__(*args, **kwargs)
 62        self.dim_names = ['shot_id']
 63
 64    @staticmethod
 65    @dask.delayed
 66    def _load_key(hdf_key, fname):         #Override loading to get only first element of second dimension
 67        return h5.File(fname)[hdf_key]['value'][:, 0, :]  
 68
 69    def _load_from_file(self, file_data):  #Intercept data shape and remove second dimension, we won't be loading it
 70        newshape = file_data.data_shape[0], file_data.data_shape[2]
 71        return super()._load_from_file(file_data._replace(data_shape=newshape))
 72
 73
 74class GMD(DataSource):
 75    """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)
 76    
 77        Loads pulse resolved GMD data and calibrates it using the average GMD data.
 78        If a calibration key is provided, the data is units of uJ.
 79
 80        Args:
 81            name (str): Name of the data source
 82            data_key (str): HDF key for the pulse-resolved GMD data, 
 83                eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
 84            calibration_key (str): HDF key pointing average GMD data for calibration
 85                eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall"
 86                Set this to "" to disable calibration. Skipping calibration will
 87                increase loading performance. If you do not need absolute values
 88                for the GMD, you can safely skip calibration.
 89            block_size (int): Number of shots to average over for calibration
 90    """
 91
 92    def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 
 93                                             beamtime = None, idx_path = None, hdf_path = None, **kwargs):
 94        super().__init__(*args, **kwargs)
 95        self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
 96        self.block_size = block_size
 97        
 98        match calibration_key:
 99            case "":
100                self.slow = None
101
102            case None:
103                logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.')
104                self.slow = None
105
106            case _:
107                self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))
108
109
110    def _repr_rows(self):
111        rows = super()._repr_rows()
112        rows += [
113            ('Data key',        self.fast.hdf_key),
114            ('Calibration key', self.slow.hdf_key if self.slow else 'disabled'),
115        ]
116        if self.slow:
117            rows += [('Calibration block size', self.block_size)]
118        return rows
119
120    def load(self, *, daq_run=None):
121        fast = self.fast.load(daq_run=daq_run)
122
123        #Calibrate fast data using slow data average
124        if self.slow:
125            slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id')
126
127            slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean()
128            fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id')
129            ratio = (slow_avg/fast_avg)
130            fast = fast * ratio
131
132        return fast.rename(self.name)
133
class BAM(DataSource):
    ''' Ad-hoc loader for the BAM data from bunch arrival monitors

    Wraps two HDF sources, one for the timing information and one for the
    BAM values, and returns the BAM values truncated along the second
    dimension, as a two-dimensional array with shape (train_id, shot_id).

    Args:
        name (str): Name of the data source
        timing_key (str): HDF key where to load the number of shots per pulse train
            e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup`
        data_key (str): HDF key for actual BAM data, e.g.
            `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup`
        beamtime, idx_path, hdf_path: forwarded unchanged to both HDF sources
        fillna_method: see `fab.datasources.DataSource`
    '''
    def __init__(self, *args, timing_key, data_key,
                              beamtime = None, idx_path = None, hdf_path = None, **kwargs):
        super().__init__(*args, **kwargs)

        # Both sources read from the same location
        location = dict(beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
        preload = kwargs.get('preload_values', True)

        self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=preload, **location)
        self.data = HDFSource(self.name, data_key, dim_names=['shot_id'], **location)

    def _repr_rows(self):
        ''' Extends the base representation rows with the two HDF keys '''
        key_rows = [
            ('Timing key', self.timing.hdf_key),
            ('Data key',   self.data.hdf_key),
        ]
        rows = super()._repr_rows()
        rows += key_rows
        return rows

    def load(self, *, daq_run=None):
        ''' Loads the BAM data, truncated to the largest shot count found in the timing data

        Args:
            daq_run: forwarded to the `load` method of both underlying sources
        '''
        timing_info = self.timing.load(daq_run=daq_run)
        bam_values = self.data.load(daq_run=daq_run)

        # NOTE(review): column 3 of the timing data is presumably the number
        # of shots per pulse train (see `timing_key` above) - confirm.
        max_shots = int(timing_info[:,3].max())
        return bam_values[:, :max_shots]
class Timestamp(fab.datasources.HDFSource.HDFSource):
13class Timestamp(HDFSource):
14    ''' Loads the (indicative) timestamps for each pulse train
15    
16        Provides the timestamp of each FLASH pulse train. Be aware that the values 
17        might not be accurate enough to be used for synchronization with other data
18        sources that are not based on the HDF5 files provided by FLASH.
19    Args:
20        date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype.
21            default: True
22        Other arguments: see `fab.datasources.HDFSource`
23    '''
24    def __init__(self, name='timestamp', 
25                       hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
26                       preload_values=True, date_dtype=True, **kwargs):
27
28        super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)
29
30        if date_dtype:
31            self.dtype = 'datetime64[s]'
32        else:
33            self.dtype = 'float64'
34
35        self._preload_key_vals.append(self.dtype)
36
37    @staticmethod
38    @dask.delayed
39    def _load_key(hdf_key, fname):         #Override loading to get the 'time' field instead of 'value'
40        return h5.File(fname)[hdf_key]['time'][:]
41
42    def _load_from_file(self, file_data):  #Intercept data shape and take only lenght
43        newshape = (file_data.data_shape[0], )
44        return super()._load_from_file(file_data._replace(data_shape=newshape))
45
46    def _load(self, *args, **kwargs):      #Intercept data and cast to chosen dtype
47        return super()._load(*args, **kwargs).astype(self.dtype)

Loads the (indicative) timestamps for each pulse train

Provides the timestamp of each FLASH pulse train. Be aware that the values 
might not be accurate enough to be used for synchronization with other data
sources that are not based on the HDF5 files provided by FLASH.
Arguments:
  • date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype. default: True
  • Other arguments: see fab.datasources.HDFSource
Timestamp( name='timestamp', hdf_key='/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup', *args, preload_values=True, date_dtype=True, **kwargs)
24    def __init__(self, name='timestamp', 
25                       hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
26                       preload_values=True, date_dtype=True, **kwargs):
27
28        super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)
29
30        if date_dtype:
31            self.dtype = 'datetime64[s]'
32        else:
33            self.dtype = 'float64'
34
35        self._preload_key_vals.append(self.dtype)

Base init for a data source.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This happens when the data source is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
class GMD(fab.datasources.basesources.DataSource):
 75class GMD(DataSource):
 76    """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)
 77    
 78        Loads pulse resolved GMD data and calibrates it using the average GMD data.
 79        If a calibration key is provided, the data is units of uJ.
 80
 81        Args:
 82            name (str): Name of the data source
 83            data_key (str): HDF key for the pulse-resolved GMD data, 
 84                eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
 85            calibration_key (str): HDF key pointing average GMD data for calibration
 86                eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall"
 87                Set this to "" to disable calibration. Skipping calibration will
 88                increase loading performance. If you do not need absolute values
 89                for the GMD, you can safely skip calibration.
 90            block_size (int): Number of shots to average over for calibration
 91    """
 92
 93    def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 
 94                                             beamtime = None, idx_path = None, hdf_path = None, **kwargs):
 95        super().__init__(*args, **kwargs)
 96        self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
 97        self.block_size = block_size
 98        
 99        match calibration_key:
100            case "":
101                self.slow = None
102
103            case None:
104                logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.')
105                self.slow = None
106
107            case _:
108                self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))
109
110
111    def _repr_rows(self):
112        rows = super()._repr_rows()
113        rows += [
114            ('Data key',        self.fast.hdf_key),
115            ('Calibration key', self.slow.hdf_key if self.slow else 'disabled'),
116        ]
117        if self.slow:
118            rows += [('Calibration block size', self.block_size)]
119        return rows
120
121    def load(self, *, daq_run=None):
122        fast = self.fast.load(daq_run=daq_run)
123
124        #Calibrate fast data using slow data average
125        if self.slow:
126            slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id')
127
128            slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean()
129            fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id')
130            ratio = (slow_avg/fast_avg)
131            fast = fast * ratio
132
133        return fast.rename(self.name)

Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)

Loads pulse resolved GMD data and calibrates it using the average GMD data. If a calibration key is provided, the data is in units of uJ.

Arguments:
  • name (str): Name of the data source
  • data_key (str): HDF key for the pulse-resolved GMD data, eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
  • calibration_key (str): HDF key pointing average GMD data for calibration eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall" Set this to "" to disable calibration. Skipping calibration will increase loading performance. If you do not need absolute values for the GMD, you can safely skip calibration.
  • block_size (int): Number of shots to average over for calibration
GMD( *args, data_key: str, calibration_key: str = None, block_size=2000, beamtime=None, idx_path=None, hdf_path=None, **kwargs)
 93    def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 
 94                                             beamtime = None, idx_path = None, hdf_path = None, **kwargs):
 95        super().__init__(*args, **kwargs)
 96        self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
 97        self.block_size = block_size
 98        
 99        match calibration_key:
100            case "":
101                self.slow = None
102
103            case None:
104                logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.')
105                self.slow = None
106
107            case _:
108                self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))

Base init for a data source.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This happens when the data source is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
fast
block_size
def load(self, *, daq_run=None):
121    def load(self, *, daq_run=None):
122        fast = self.fast.load(daq_run=daq_run)
123
124        #Calibrate fast data using slow data average
125        if self.slow:
126            slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id')
127
128            slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean()
129            fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id')
130            ratio = (slow_avg/fast_avg)
131            fast = fast * ratio
132
133        return fast.rename(self.name)

Loads the data from the datasource

Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'

class BAM(fab.datasources.basesources.DataSource):
135class BAM(DataSource):
136    ''' Ad-hoc loader for the BAM data from bunch arrival monitors
137    
138    Cleans up the raw HDF data for BAM data and only loads valid data
139    in a two-dimensional array with shape (train_id, shot_id).
140
141    Args:
142        name (str): Name of the data source
143        timing_key (str): HDF key where to load the number of shots per pulse train
144            e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup`
145        data_key (str): HDF key for actual BAM data, e.g.
146            `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup`
147        fillna_method: see `fab.datasources.DataSource`
148    '''
149    def __init__(self, *args, timing_key, data_key,                        
150                              beamtime = None, idx_path = None, hdf_path = None, **kwargs):
151        super().__init__(*args, **kwargs)
152        self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True),
153                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
154        self.data   = HDFSource(self.name, data_key, dim_names=['shot_id'],
155                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
156
157    def _repr_rows(self):
158        rows = super()._repr_rows()
159        rows += [
160            ('Timing key', self.timing.hdf_key),
161            ('Data key',   self.data.hdf_key),
162        ]
163        return rows
164
165    def load(self, *, daq_run=None):
166        timing = self.timing.load(daq_run=daq_run)
167        data   = self.data.load(daq_run=daq_run)
168
169        shot_num = int(timing[:,3].max())
170        return data[:, :shot_num]

Ad-hoc loader for the BAM data from bunch arrival monitors

Cleans up the raw HDF data for BAM data and only loads valid data in a two-dimensional array with shape (train_id, shot_id).

Arguments:
  • name (str): Name of the data source
  • timing_key (str): HDF key where to load the number of shots per pulse train e.g. /zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup
  • data_key (str): HDF key for actual BAM data, e.g. /zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup
  • fillna_method: see fab.datasources.DataSource
BAM( *args, timing_key, data_key, beamtime=None, idx_path=None, hdf_path=None, **kwargs)
149    def __init__(self, *args, timing_key, data_key,                        
150                              beamtime = None, idx_path = None, hdf_path = None, **kwargs):
151        super().__init__(*args, **kwargs)
152        self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True),
153                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
154        self.data   = HDFSource(self.name, data_key, dim_names=['shot_id'],
155                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)

Base init for a data source.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This happens when the data source is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
timing
data
def load(self, *, daq_run=None):
165    def load(self, *, daq_run=None):
166        timing = self.timing.load(daq_run=daq_run)
167        data   = self.data.load(daq_run=daq_run)
168
169        shot_num = int(timing[:,3].max())
170        return data[:, :shot_num]

Loads the data from the datasource

Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'