fab.datasources.special

  1from . import DataSource, HDFSource
  2import h5py as h5
  3import dask
  4
  5from ..settings import cfg
  6
  7import logging
  8logger = logging.getLogger(__name__)
  9
 10
 11class Timestamp(HDFSource):
 12    ''' Loads the (indicative) timestamps for each pulse train
 13    
 14        Provids the timestamp of each FLASH pulse train. Be aware that the values 
 15        might not be accurate enough to be used for syncronization with other data
 16        sources that are not based on the HDF5 files provided by FLASH.
 17    Args:
 18        date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype.
 19            default: True
 20        Other arguments: see `fab.datasources.HDFSource`
 21    '''
 22    def __init__(self, name='timestamp', 
 23                       hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
 24                       preload_values=True, date_dtype=True, **kwargs):
 25
 26        super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)
 27
 28        if date_dtype:
 29            self.dtype = 'datetime64[s]'
 30        else:
 31            self.dtype = 'float64'
 32
 33        self._preload_key_vals.append(self.dtype)
 34
 35    @staticmethod
 36    @dask.delayed
 37    def _load_key(hdf_key, fname):         #Override loading to get the 'time' field instead of 'value'
 38        return h5.File(fname)[hdf_key]['time'][:]
 39
 40    def _load_from_file(self, file_data):  #Intercept data shape and take only lenght
 41        newshape = (file_data.data_shape[0], )
 42        return super()._load_from_file(file_data._replace(data_shape=newshape))
 43
 44    def _load(self, *args, **kwargs):      #Intercept data and cast to chosen dtype
 45        return super()._load(*args, **kwargs).astype(self.dtype)
 46
 47class _FastGMD(HDFSource):
 48    ''' Helper class to load the raw GMD data from the HDF files without extra dimensions
 49    
 50    Do not use this class directly, use the `GMD` class instead. If you want to load the
 51    fast data without calibration, set the avea
 52
 53    This class clean up the raw HDF data from a GMD monitor and only loads pulse intensities
 54    in a 2 dimensional array with shape (train_id, shotnum).
 55
 56    Args:
 57        same as HDFSource
 58    '''
 59    def __init__(self, *args, **kwargs):
 60        super().__init__(*args, **kwargs)
 61        self.dim_names = ['shot_id']
 62
 63    @staticmethod
 64    @dask.delayed
 65    def _load_key(hdf_key, fname):         #Override loading to get only first element of second dimension
 66        return h5.File(fname)[hdf_key]['value'][:, 0, :]  
 67
 68    def _load_from_file(self, file_data):  #Intercept data shape and remove second dimension, we won't be loading it
 69        newshape = file_data.data_shape[0], file_data.data_shape[2]
 70        return super()._load_from_file(file_data._replace(data_shape=newshape))
 71
 72
 73class GMD(DataSource):
 74    """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)
 75    
 76        Loads pulse resolved GMD data and calibrates it using the average GMD data.
 77        If a calibration key is provided, the data is units of uJ.
 78
 79        Args:
 80            name (str): Name of the data source
 81            data_key (str): HDF key for the pulse-resolved GMD data, 
 82                eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
 83            calibration_key (str): HDF key pointing average GMD data for calibration
 84                eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall"
 85                Set this to "" to disable calibration. Skipping calibration will
 86                increase loading performance. If you do not need aboslute values
 87                for the GMD, you can safely skip calibration.
 88            block_size (int): Number of shots to average over for calibration
 89    """
 90
 91    def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 
 92                                             beamtime = None, idx_path = None, hdf_path = None, **kwargs):
 93        super().__init__(*args, **kwargs)
 94        self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
 95        self.block_size = block_size
 96        
 97        match calibration_key:
 98            case "":
 99                self.slow = None
100
101            case None:
102                logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.')
103                self.slow = None
104
105            case _:
106                self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))
107
108
109    def load(self, *, daq_run=None):
110        fast = self.fast.load(daq_run=daq_run)
111
112        #Calibrate fast data using slow data average
113        if self.slow:
114            slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id')
115
116            slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean()
117            fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id')
118            ratio = (slow_avg/fast_avg)
119            fast = fast * ratio
120
121        return fast.rename(self.name)
122
class BAM(DataSource):
    ''' Ad-hoc loader for the BAM data from bunch arrival monitors

    Cleans up the raw HDF data for BAM data and only loads valid data
    in a two-dimensional array with shape (train_id, shot_id).

    Args:
        name (str): Name of the data source
        timing_key (str): HDF key where to load the number of shots per pulse train
            e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup`
        data_key (str): HDF key for actual BAM data, e.g.
            `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup`
        fillna_method: see `fab.datasources.DataSource`
    '''
    def __init__(self, *args, timing_key, data_key,
                              beamtime = None, idx_path = None, hdf_path = None, **kwargs):
        super().__init__(*args, **kwargs)
        # Both helper sources point at the same beamtime/file locations
        location = dict(beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
        self.timing = HDFSource(f"_{self.name}_timing", timing_key,
                                preload_values=kwargs.get('preload_values', True), **location)
        self.data   = HDFSource(self.name, data_key, dim_names=['shot_id'], **location)

    def load(self, *, daq_run=None):
        ''' Loads the BAM data, trimmed to the valid number of shots per train.

            Returns:
                xr.DataArray: BAM values, dims (train_id, shot_id)
        '''
        timing_info = self.timing.load(daq_run=daq_run)
        bam_data    = self.data.load(daq_run=daq_run)

        # Column 3 of the timing array presumably holds the per-train shot
        # count (see timing_key docs) — keep only that many shot columns
        valid_shots = int(timing_info[:, 3].max())
        return bam_data[:, :valid_shots]
logger = <Logger fab.datasources.special (INFO)>
class Timestamp(fab.datasources.HDFSource.HDFSource):
12class Timestamp(HDFSource):
13    ''' Loads the (indicative) timestamps for each pulse train
14    
15        Provids the timestamp of each FLASH pulse train. Be aware that the values 
16        might not be accurate enough to be used for syncronization with other data
17        sources that are not based on the HDF5 files provided by FLASH.
18    Args:
19        date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype.
20            default: True
21        Other arguments: see `fab.datasources.HDFSource`
22    '''
23    def __init__(self, name='timestamp', 
24                       hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
25                       preload_values=True, date_dtype=True, **kwargs):
26
27        super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)
28
29        if date_dtype:
30            self.dtype = 'datetime64[s]'
31        else:
32            self.dtype = 'float64'
33
34        self._preload_key_vals.append(self.dtype)
35
36    @staticmethod
37    @dask.delayed
38    def _load_key(hdf_key, fname):         #Override loading to get the 'time' field instead of 'value'
39        return h5.File(fname)[hdf_key]['time'][:]
40
41    def _load_from_file(self, file_data):  #Intercept data shape and take only lenght
42        newshape = (file_data.data_shape[0], )
43        return super()._load_from_file(file_data._replace(data_shape=newshape))
44
45    def _load(self, *args, **kwargs):      #Intercept data and cast to chosen dtype
46        return super()._load(*args, **kwargs).astype(self.dtype)

Loads the (indicative) timestamps for each pulse train

Provides the timestamp of each FLASH pulse train. Be aware that the values
might not be accurate enough to be used for synchronization with other data
sources that are not based on the HDF5 files provided by FLASH.
Arguments:
  • date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype. default: True
  • Other arguments: see fab.datasources.HDFSource
Timestamp( name='timestamp', hdf_key='/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup', *args, preload_values=True, date_dtype=True, **kwargs)
23    def __init__(self, name='timestamp', 
24                       hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
25                       preload_values=True, date_dtype=True, **kwargs):
26
27        super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)
28
29        if date_dtype:
30            self.dtype = 'datetime64[s]'
31        else:
32            self.dtype = 'float64'
33
34        self._preload_key_vals.append(self.dtype)

Base constructors. All Sources must have a name.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
class GMD(fab.datasources.basesources.DataSource):
 74class GMD(DataSource):
 75    """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)
 76    
 77        Loads pulse resolved GMD data and calibrates it using the average GMD data.
 78        If a calibration key is provided, the data is units of uJ.
 79
 80        Args:
 81            name (str): Name of the data source
 82            data_key (str): HDF key for the pulse-resolved GMD data, 
 83                eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
 84            calibration_key (str): HDF key pointing average GMD data for calibration
 85                eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall"
 86                Set this to "" to disable calibration. Skipping calibration will
 87                increase loading performance. If you do not need aboslute values
 88                for the GMD, you can safely skip calibration.
 89            block_size (int): Number of shots to average over for calibration
 90    """
 91
 92    def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 
 93                                             beamtime = None, idx_path = None, hdf_path = None, **kwargs):
 94        super().__init__(*args, **kwargs)
 95        self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
 96        self.block_size = block_size
 97        
 98        match calibration_key:
 99            case "":
100                self.slow = None
101
102            case None:
103                logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.')
104                self.slow = None
105
106            case _:
107                self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))
108
109
110    def load(self, *, daq_run=None):
111        fast = self.fast.load(daq_run=daq_run)
112
113        #Calibrate fast data using slow data average
114        if self.slow:
115            slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id')
116
117            slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean()
118            fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id')
119            ratio = (slow_avg/fast_avg)
120            fast = fast * ratio
121
122        return fast.rename(self.name)

Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)

Loads pulse resolved GMD data and calibrates it using the average GMD data. If a calibration key is provided, the data is units of uJ.

Arguments:
  • name (str): Name of the data source
  • data_key (str): HDF key for the pulse-resolved GMD data, eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
  • calibration_key (str): HDF key pointing average GMD data for calibration eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall" Set this to "" to disable calibration. Skipping calibration will increase loading performance. If you do not need absolute values for the GMD, you can safely skip calibration.
  • block_size (int): Number of shots to average over for calibration
GMD( *args, data_key: str, calibration_key: str = None, block_size=2000, beamtime=None, idx_path=None, hdf_path=None, **kwargs)
 92    def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 
 93                                             beamtime = None, idx_path = None, hdf_path = None, **kwargs):
 94        super().__init__(*args, **kwargs)
 95        self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
 96        self.block_size = block_size
 97        
 98        match calibration_key:
 99            case "":
100                self.slow = None
101
102            case None:
103                logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.')
104                self.slow = None
105
106            case _:
107                self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))

Base constructors. All Sources must have a name.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
fast
block_size
def load(self, *, daq_run=None):
110    def load(self, *, daq_run=None):
111        fast = self.fast.load(daq_run=daq_run)
112
113        #Calibrate fast data using slow data average
114        if self.slow:
115            slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id')
116
117            slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean()
118            fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id')
119            ratio = (slow_avg/fast_avg)
120            fast = fast * ratio
121
122        return fast.rename(self.name)

Loads data from the datasource

Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'

class BAM(fab.datasources.basesources.DataSource):
124class BAM(DataSource):
125    ''' Ad-hoc loader for the BAM data from bunch arrival monitors
126    
127    Cleans up the raw HDF data for BAM data and only loads valid data
128    in a two-dimensional array with shape (train_id, shot_id).
129
130    Args:
131        name (str): Name of the data source
132        timing_key (str): HDF key where to load the number of shots per pulse train
133            e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup`
134        data_key (str): HDF key for actual BAM data, e.g.
135            `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup`
136        fillna_method: see `fab.datasources.DataSource`
137    '''
138    def __init__(self, *args, timing_key, data_key,                        
139                              beamtime = None, idx_path = None, hdf_path = None, **kwargs):
140        super().__init__(*args, **kwargs)
141        self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True),
142                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
143        self.data   = HDFSource(self.name, data_key, dim_names=['shot_id'],
144                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
145
146    def load(self, *, daq_run=None):
147        timing = self.timing.load(daq_run=daq_run)
148        data   = self.data.load(daq_run=daq_run)
149
150        shot_num = int(timing[:,3].max())
151        return data[:, :shot_num]

Ad-hoc loader for the BAM data from bunch arrival monitors

Cleans up the raw HDF data for BAM data and only loads valid data in a two-dimensional array with shape (train_id, shot_id).

Arguments:
  • name (str): Name of the data source
  • timing_key (str): HDF key where to load the number of shots per pulse train e.g. /zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup
  • data_key (str): HDF key for actual BAM data, e.g. /zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup
  • fillna_method: see fab.datasources.DataSource
BAM( *args, timing_key, data_key, beamtime=None, idx_path=None, hdf_path=None, **kwargs)
138    def __init__(self, *args, timing_key, data_key,                        
139                              beamtime = None, idx_path = None, hdf_path = None, **kwargs):
140        super().__init__(*args, **kwargs)
141        self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True),
142                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
143        self.data   = HDFSource(self.name, data_key, dim_names=['shot_id'],
144                                beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)

Base constructors. All Sources must have a name.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
timing
data
def load(self, *, daq_run=None):
146    def load(self, *, daq_run=None):
147        timing = self.timing.load(daq_run=daq_run)
148        data   = self.data.load(daq_run=daq_run)
149
150        shot_num = int(timing[:,3].max())
151        return data[:, :shot_num]

Loads data from the datasource

Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'