fab.datasources.special
1from . import DataSource, HDFSource 2import h5py as h5 3import dask 4 5from ..settings import cfg 6 7import logging 8logger = logging.getLogger(__name__) 9 10__all__ = ['Timestamp', 'GMD', 'BAM'] 11 12class Timestamp(HDFSource): 13 ''' Loads the (indicative) timestamps for each pulse train 14 15 Provides the timestamp of each FLASH pulse train. Be aware that the values 16 might not be accurate enough to be used for synchronization with other data 17 sources that are not based on the HDF5 files provided by FLASH. 18 Args: 19 date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype. 20 default: True 21 Other arguments: see `fab.datasources.HDFSource` 22 ''' 23 def __init__(self, name='timestamp', 24 hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args, 25 preload_values=True, date_dtype=True, **kwargs): 26 27 super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs) 28 29 if date_dtype: 30 self.dtype = 'datetime64[s]' 31 else: 32 self.dtype = 'float64' 33 34 self._preload_key_vals.append(self.dtype) 35 36 @staticmethod 37 @dask.delayed 38 def _load_key(hdf_key, fname): #Override loading to get the 'time' field instead of 'value' 39 return h5.File(fname)[hdf_key]['time'][:] 40 41 def _load_from_file(self, file_data): #Intercept data shape and take only lenght 42 newshape = (file_data.data_shape[0], ) 43 return super()._load_from_file(file_data._replace(data_shape=newshape)) 44 45 def _load(self, *args, **kwargs): #Intercept data and cast to chosen dtype 46 return super()._load(*args, **kwargs).astype(self.dtype) 47 48class _FastGMD(HDFSource): 49 ''' Helper class to load the raw GMD data from the HDF files without extra dimensions 50 51 Do not use this class directly, use the `GMD` class instead. If you want to load the 52 fast data without calibration, set the calibration_key to "". 
53 54 This class clean up the raw HDF data from a GMD monitor and only loads pulse intensities 55 in a 2 dimensional array with shape (train_id, shotnum). 56 57 Args: 58 same as HDFSource 59 ''' 60 def __init__(self, *args, **kwargs): 61 super().__init__(*args, **kwargs) 62 self.dim_names = ['shot_id'] 63 64 @staticmethod 65 @dask.delayed 66 def _load_key(hdf_key, fname): #Override loading to get only first element of second dimension 67 return h5.File(fname)[hdf_key]['value'][:, 0, :] 68 69 def _load_from_file(self, file_data): #Intercept data shape and remove second dimension, we won't be loading it 70 newshape = file_data.data_shape[0], file_data.data_shape[2] 71 return super()._load_from_file(file_data._replace(data_shape=newshape)) 72 73 74class GMD(DataSource): 75 """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved) 76 77 Loads pulse resolved GMD data and calibrates it using the average GMD data. 78 If a calibration key is provided, the data is units of uJ. 79 80 Args: 81 name (str): Name of the data source 82 data_key (str): HDF key for the pulse-resolved GMD data, 83 eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall" 84 calibration_key (str): HDF key pointing average GMD data for calibration 85 eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall" 86 Set this to "" to disable calibration. Skipping calibration will 87 increase loading performance. If you do not need absolute values 88 for the GMD, you can safely skip calibration. 
89 block_size (int): Number of shots to average over for calibration 90 """ 91 92 def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 93 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 94 super().__init__(*args, **kwargs) 95 self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 96 self.block_size = block_size 97 98 match calibration_key: 99 case "": 100 self.slow = None 101 102 case None: 103 logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.') 104 self.slow = None 105 106 case _: 107 self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None)) 108 109 110 def _repr_rows(self): 111 rows = super()._repr_rows() 112 rows += [ 113 ('Data key', self.fast.hdf_key), 114 ('Calibration key', self.slow.hdf_key if self.slow else 'disabled'), 115 ] 116 if self.slow: 117 rows += [('Calibration block size', self.block_size)] 118 return rows 119 120 def load(self, *, daq_run=None): 121 fast = self.fast.load(daq_run=daq_run) 122 123 #Calibrate fast data using slow data average 124 if self.slow: 125 slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id') 126 127 slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean() 128 fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id') 129 ratio = (slow_avg/fast_avg) 130 fast = fast * ratio 131 132 return fast.rename(self.name) 133 134class BAM(DataSource): 135 ''' Ad-hoc loader for the BAM data from bunch arrival monitors 136 137 Cleans up the raw HDF data for BAM data and only loads valid data 138 in a two-dimensional array with shape (train_id, shot_id). 139 140 Args: 141 name (str): Name of the data source 142 timing_key (str): HDF key where to load the number of shots per pulse train 143 e.g. 
`/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup` 144 data_key (str): HDF key for actual BAM data, e.g. 145 `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup` 146 fillna_method: see `fab.datasources.DataSource` 147 ''' 148 def __init__(self, *args, timing_key, data_key, 149 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 150 super().__init__(*args, **kwargs) 151 self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True), 152 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 153 self.data = HDFSource(self.name, data_key, dim_names=['shot_id'], 154 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 155 156 def _repr_rows(self): 157 rows = super()._repr_rows() 158 rows += [ 159 ('Timing key', self.timing.hdf_key), 160 ('Data key', self.data.hdf_key), 161 ] 162 return rows 163 164 def load(self, *, daq_run=None): 165 timing = self.timing.load(daq_run=daq_run) 166 data = self.data.load(daq_run=daq_run) 167 168 shot_num = int(timing[:,3].max()) 169 return data[:, :shot_num]
class Timestamp(HDFSource):
    ''' Loads the (indicative) timestamps for each pulse train

    Provides the timestamp of each FLASH pulse train. Be aware that the values
    might not be accurate enough to be used for synchronization with other data
    sources that are not based on the HDF5 files provided by FLASH.

    Args:
        date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype.
                           default: True
        Other arguments: see `fab.datasources.HDFSource`
    '''
    def __init__(self, name='timestamp',
                 hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
                 preload_values=True, date_dtype=True, **kwargs):

        super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)

        self.dtype = 'datetime64[s]' if date_dtype else 'float64'

        # NOTE(review): presumably registers the dtype with the preload
        # bookkeeping of HDFSource -- confirm against the base class.
        self._preload_key_vals.append(self.dtype)

    @staticmethod
    @dask.delayed
    def _load_key(hdf_key, fname):
        ''' Reads the 'time' field (instead of 'value') of `hdf_key` from `fname` '''
        # The slice copies the data into memory, so the file can be closed
        # as soon as the read is done instead of leaking the handle.
        with h5.File(fname, 'r') as h5_file:
            return h5_file[hdf_key]['time'][:]

    def _load_from_file(self, file_data):
        ''' Intercepts the data shape and keeps only its length (first entry) '''
        newshape = (file_data.data_shape[0], )
        return super()._load_from_file(file_data._replace(data_shape=newshape))

    def _load(self, *args, **kwargs):
        ''' Intercepts the loaded data and casts it to the chosen dtype '''
        return super()._load(*args, **kwargs).astype(self.dtype)
Loads the (indicative) timestamps for each pulse train
Provides the timestamp of each FLASH pulse train. Be aware that the values
might not be accurate enough to be used for synchronization with other data
sources that are not based on the HDF5 files provided by FLASH.
Arguments:
- date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype. default: True
- Other arguments: see `fab.datasources.HDFSource`
def __init__(self, name='timestamp',
             hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args,
             preload_values=True, date_dtype=True, **kwargs):
    ''' Sets up the HDF source and selects the dtype of the returned timestamps

    Args:
        date_dtype (bool): If True the dtype is 'datetime64[s]', otherwise 'float64'
        Other arguments: forwarded to the parent class initializer
    '''
    super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs)

    # Choose the target dtype in a single expression
    self.dtype = 'datetime64[s]' if date_dtype else 'float64'

    # NOTE(review): presumably registers the dtype with the preload
    # bookkeeping of the parent class -- confirm there.
    self._preload_key_vals.append(self.dtype)
Base init for a data source.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This happens when the data source is combined with other sources in an Instrument object, which requires all sources to share the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
75class GMD(DataSource): 76 """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved) 77 78 Loads pulse resolved GMD data and calibrates it using the average GMD data. 79 If a calibration key is provided, the data is units of uJ. 80 81 Args: 82 name (str): Name of the data source 83 data_key (str): HDF key for the pulse-resolved GMD data, 84 eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall" 85 calibration_key (str): HDF key pointing average GMD data for calibration 86 eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall" 87 Set this to "" to disable calibration. Skipping calibration will 88 increase loading performance. If you do not need absolute values 89 for the GMD, you can safely skip calibration. 90 block_size (int): Number of shots to average over for calibration 91 """ 92 93 def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 94 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 95 super().__init__(*args, **kwargs) 96 self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 97 self.block_size = block_size 98 99 match calibration_key: 100 case "": 101 self.slow = None 102 103 case None: 104 logger.warn('GMD data will not be calibrated. 
Please provide an average_key argument or set it to "" to disable this warning.') 105 self.slow = None 106 107 case _: 108 self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None)) 109 110 111 def _repr_rows(self): 112 rows = super()._repr_rows() 113 rows += [ 114 ('Data key', self.fast.hdf_key), 115 ('Calibration key', self.slow.hdf_key if self.slow else 'disabled'), 116 ] 117 if self.slow: 118 rows += [('Calibration block size', self.block_size)] 119 return rows 120 121 def load(self, *, daq_run=None): 122 fast = self.fast.load(daq_run=daq_run) 123 124 #Calibrate fast data using slow data average 125 if self.slow: 126 slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id') 127 128 slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean() 129 fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id') 130 ratio = (slow_avg/fast_avg) 131 fast = fast * ratio 132 133 return fast.rename(self.name)
Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)
Loads pulse-resolved GMD data and calibrates it using the average GMD data. If a calibration key is provided, the data is in units of uJ.
Arguments:
- name (str): Name of the data source
- data_key (str): HDF key for the pulse-resolved GMD data, eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
- calibration_key (str): HDF key pointing to the average GMD data used for calibration, e.g. "/FL2/Photon Diagnostic/GMD/Average energy/energy hall". Set this to "" to disable calibration. Skipping calibration will increase loading performance. If you do not need absolute values for the GMD, you can safely skip calibration.
- block_size (int): Number of pulse trains in the rolling window used to average the data for calibration
93 def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 94 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 95 super().__init__(*args, **kwargs) 96 self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 97 self.block_size = block_size 98 99 match calibration_key: 100 case "": 101 self.slow = None 102 103 case None: 104 logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.') 105 self.slow = None 106 107 case _: 108 self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))
Base init for a data source.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This happens when the data source is combined with other sources in an Instrument object, which requires all sources to share the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
def load(self, *, daq_run=None):
    ''' Loads the pulse resolved GMD data, calibrated when a calibration source is set

    Args:
        daq_run: forwarded unchanged to the `load` method of the underlying sources

    Returns:
        The pulse resolved data renamed to the name of this data source. With
        a calibration source, the data is scaled by the ratio of the rolling
        means of the slow and the fast data.
    '''
    pulses = self.fast.load(daq_run=daq_run)

    if self.slow:
        # Bring the slow data onto the index of the fast data, forward
        # filling the gaps along train_id
        average = self.slow.load(daq_run=daq_run)
        average = average.reindex_like(pulses).ffill(dim='train_id')

        # Rolling means over block_size trains; the fast data is additionally
        # averaged over the shots of each train
        window = {'train_id': self.block_size}
        slow_avg = average.rolling(**window, min_periods=2).mean()
        fast_avg = pulses.rolling(**window, min_periods=2).mean().mean(dim='shot_id')

        pulses = pulses * (slow_avg / fast_avg)

    return pulses.rename(self.name)
Loads the data from the data source.
Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'
Inherited Members
class BAM(DataSource):
    ''' Ad-hoc loader for the BAM data from bunch arrival monitors

    Cleans up the raw HDF data for BAM data and only loads valid data
    in a two-dimensional array with shape (train_id, shot_id).

    Args:
        name (str): Name of the data source
        timing_key (str): HDF key where to load the number of shots per pulse train
            e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup`
        data_key (str): HDF key for actual BAM data, e.g.
            `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup`
        fillna_method: see `fab.datasources.DataSource`
    '''
    def __init__(self, *args, timing_key, data_key,
                 beamtime = None, idx_path = None, hdf_path = None, **kwargs):
        super().__init__(*args, **kwargs)

        # Both sources read from the same location
        location = {'beamtime': beamtime, 'hdf_path': hdf_path, 'idx_path': idx_path}

        self.timing = HDFSource(f"_{self.name}_timing", timing_key,
                                preload_values=kwargs.get('preload_values', True),
                                **location)
        self.data = HDFSource(self.name, data_key, dim_names=['shot_id'], **location)

    def _repr_rows(self):
        ''' Appends the timing and data HDF keys to the rows of the base class '''
        rows = super()._repr_rows()
        rows += [('Timing key', self.timing.hdf_key), ('Data key', self.data.hdf_key)]
        return rows

    def load(self, *, daq_run=None):
        ''' Loads the BAM data, cut to the largest shot count found in the timing data '''
        shots_info = self.timing.load(daq_run=daq_run)
        arrival = self.data.load(daq_run=daq_run)

        # Column 3 of the timing data is used as the shot count of each train
        max_shots = int(shots_info[:, 3].max())
        return arrival[:, :max_shots]
Ad-hoc loader for the BAM data from bunch arrival monitors
Cleans up the raw HDF data for BAM data and only loads valid data in a two-dimensional array with shape (train_id, shot_id).
Arguments:
- name (str): Name of the data source
- timing_key (str): HDF key where to load the number of shots per pulse train, e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup`
- data_key (str): HDF key for actual BAM data, e.g. `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup`
- fillna_method: see `fab.datasources.DataSource`
def __init__(self, *args, timing_key, data_key,
             beamtime = None, idx_path = None, hdf_path = None, **kwargs):
    ''' Creates the two HDF sources (timing and BAM data) used by this loader

    Args:
        timing_key (str): HDF key of the timing data
        data_key (str): HDF key of the BAM data
        beamtime, idx_path, hdf_path: forwarded to both HDF sources
        Other arguments: forwarded to the parent class initializer
    '''
    super().__init__(*args, **kwargs)

    # Both sources read from the same location
    location = {'beamtime': beamtime, 'hdf_path': hdf_path, 'idx_path': idx_path}

    # The timing data is preloaded unless the caller asked otherwise
    self.timing = HDFSource(f"_{self.name}_timing", timing_key,
                            preload_values=kwargs.get('preload_values', True),
                            **location)
    self.data = HDFSource(self.name, data_key, dim_names=['shot_id'], **location)
Base init for a data source.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This happens when the data source is combined with other sources in an Instrument object, which requires all sources to share the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
def load(self, *, daq_run=None):
    ''' Loads the BAM data, cut to the largest shot count found in the timing data

    Args:
        daq_run: forwarded unchanged to the `load` method of both underlying sources

    Returns:
        The BAM data restricted, along its second axis, to the first N entries,
        where N is the maximum of column 3 of the timing data.
    '''
    shots_info = self.timing.load(daq_run=daq_run)
    arrival = self.data.load(daq_run=daq_run)

    # Column 3 of the timing data is used as the shot count of each train
    max_shots = int(shots_info[:, 3].max())
    return arrival[:, :max_shots]
Loads the data from the data source.
Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'