fab.datasources.special
1from . import DataSource, HDFSource 2import h5py as h5 3import dask 4 5from ..settings import cfg 6 7import logging 8logger = logging.getLogger(__name__) 9 10 11class Timestamp(HDFSource): 12 ''' Loads the (indicative) timestamps for each pulse train 13 14 Provids the timestamp of each FLASH pulse train. Be aware that the values 15 might not be accurate enough to be used for syncronization with other data 16 sources that are not based on the HDF5 files provided by FLASH. 17 Args: 18 date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype. 19 default: True 20 Other arguments: see `fab.datasources.HDFSource` 21 ''' 22 def __init__(self, name='timestamp', 23 hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args, 24 preload_values=True, date_dtype=True, **kwargs): 25 26 super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs) 27 28 if date_dtype: 29 self.dtype = 'datetime64[s]' 30 else: 31 self.dtype = 'float64' 32 33 self._preload_key_vals.append(self.dtype) 34 35 @staticmethod 36 @dask.delayed 37 def _load_key(hdf_key, fname): #Override loading to get the 'time' field instead of 'value' 38 return h5.File(fname)[hdf_key]['time'][:] 39 40 def _load_from_file(self, file_data): #Intercept data shape and take only lenght 41 newshape = (file_data.data_shape[0], ) 42 return super()._load_from_file(file_data._replace(data_shape=newshape)) 43 44 def _load(self, *args, **kwargs): #Intercept data and cast to chosen dtype 45 return super()._load(*args, **kwargs).astype(self.dtype) 46 47class _FastGMD(HDFSource): 48 ''' Helper class to load the raw GMD data from the HDF files without extra dimensions 49 50 Do not use this class directly, use the `GMD` class instead. If you want to load the 51 fast data without calibration, set the avea 52 53 This class clean up the raw HDF data from a GMD monitor and only loads pulse intensities 54 in a 2 dimensional array with shape (train_id, shotnum). 
55 56 Args: 57 same as HDFSource 58 ''' 59 def __init__(self, *args, **kwargs): 60 super().__init__(*args, **kwargs) 61 self.dim_names = ['shot_id'] 62 63 @staticmethod 64 @dask.delayed 65 def _load_key(hdf_key, fname): #Override loading to get only first element of second dimension 66 return h5.File(fname)[hdf_key]['value'][:, 0, :] 67 68 def _load_from_file(self, file_data): #Intercept data shape and remove second dimension, we won't be loading it 69 newshape = file_data.data_shape[0], file_data.data_shape[2] 70 return super()._load_from_file(file_data._replace(data_shape=newshape)) 71 72 73class GMD(DataSource): 74 """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved) 75 76 Loads pulse resolved GMD data and calibrates it using the average GMD data. 77 If a calibration key is provided, the data is units of uJ. 78 79 Args: 80 name (str): Name of the data source 81 data_key (str): HDF key for the pulse-resolved GMD data, 82 eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall" 83 calibration_key (str): HDF key pointing average GMD data for calibration 84 eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall" 85 Set this to "" to disable calibration. Skipping calibration will 86 increase loading performance. If you do not need aboslute values 87 for the GMD, you can safely skip calibration. 88 block_size (int): Number of shots to average over for calibration 89 """ 90 91 def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 92 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 93 super().__init__(*args, **kwargs) 94 self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 95 self.block_size = block_size 96 97 match calibration_key: 98 case "": 99 self.slow = None 100 101 case None: 102 logger.warn('GMD data will not be calibrated. 
Please provide an average_key argument or set it to "" to disable this warning.') 103 self.slow = None 104 105 case _: 106 self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None)) 107 108 109 def load(self, *, daq_run=None): 110 fast = self.fast.load(daq_run=daq_run) 111 112 #Calibrate fast data using slow data average 113 if self.slow: 114 slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id') 115 116 slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean() 117 fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id') 118 ratio = (slow_avg/fast_avg) 119 fast = fast * ratio 120 121 return fast.rename(self.name) 122 123class BAM(DataSource): 124 ''' Ad-hoc loader for the BAM data from bunch arrival monitors 125 126 Cleans up the raw HDF data for BAM data and only loads valid data 127 in a two-dimensional array with shape (train_id, shot_id). 128 129 Args: 130 name (str): Name of the data source 131 timing_key (str): HDF key where to load the number of shots per pulse train 132 e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup` 133 data_key (str): HDF key for actual BAM data, e.g. 
134 `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup` 135 fillna_method: see `fab.datasources.DataSource` 136 ''' 137 def __init__(self, *args, timing_key, data_key, 138 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 139 super().__init__(*args, **kwargs) 140 self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True), 141 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 142 self.data = HDFSource(self.name, data_key, dim_names=['shot_id'], 143 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 144 145 def load(self, *, daq_run=None): 146 timing = self.timing.load(daq_run=daq_run) 147 data = self.data.load(daq_run=daq_run) 148 149 shot_num = int(timing[:,3].max()) 150 return data[:, :shot_num]
12class Timestamp(HDFSource): 13 ''' Loads the (indicative) timestamps for each pulse train 14 15 Provids the timestamp of each FLASH pulse train. Be aware that the values 16 might not be accurate enough to be used for syncronization with other data 17 sources that are not based on the HDF5 files provided by FLASH. 18 Args: 19 date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype. 20 default: True 21 Other arguments: see `fab.datasources.HDFSource` 22 ''' 23 def __init__(self, name='timestamp', 24 hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args, 25 preload_values=True, date_dtype=True, **kwargs): 26 27 super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs) 28 29 if date_dtype: 30 self.dtype = 'datetime64[s]' 31 else: 32 self.dtype = 'float64' 33 34 self._preload_key_vals.append(self.dtype) 35 36 @staticmethod 37 @dask.delayed 38 def _load_key(hdf_key, fname): #Override loading to get the 'time' field instead of 'value' 39 return h5.File(fname)[hdf_key]['time'][:] 40 41 def _load_from_file(self, file_data): #Intercept data shape and take only lenght 42 newshape = (file_data.data_shape[0], ) 43 return super()._load_from_file(file_data._replace(data_shape=newshape)) 44 45 def _load(self, *args, **kwargs): #Intercept data and cast to chosen dtype 46 return super()._load(*args, **kwargs).astype(self.dtype)
Loads the (indicative) timestamps for each pulse train
Provides the timestamp of each FLASH pulse train. Be aware that the values
might not be accurate enough to be used for synchronization with other data
sources that are not based on the HDF5 files provided by FLASH.
Arguments:
- date_dtype (bool): If True, the timestamps are converted to a datetime64[s] dtype. default: True
- Other arguments: see
fab.datasources.HDFSource
23 def __init__(self, name='timestamp', 24 hdf_key = "/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup", *args, 25 preload_values=True, date_dtype=True, **kwargs): 26 27 super().__init__(name, hdf_key, *args, preload_values=preload_values, **kwargs) 28 29 if date_dtype: 30 self.dtype = 'datetime64[s]' 31 else: 32 self.dtype = 'float64' 33 34 self._preload_key_vals.append(self.dtype)
Base constructors. All Sources must have a name.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
74class GMD(DataSource): 75 """ Ad-hoc loader for the GMD data for gmd monitors (pulse resolved) 76 77 Loads pulse resolved GMD data and calibrates it using the average GMD data. 78 If a calibration key is provided, the data is units of uJ. 79 80 Args: 81 name (str): Name of the data source 82 data_key (str): HDF key for the pulse-resolved GMD data, 83 eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall" 84 calibration_key (str): HDF key pointing average GMD data for calibration 85 eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall" 86 Set this to "" to disable calibration. Skipping calibration will 87 increase loading performance. If you do not need aboslute values 88 for the GMD, you can safely skip calibration. 89 block_size (int): Number of shots to average over for calibration 90 """ 91 92 def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 93 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 94 super().__init__(*args, **kwargs) 95 self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 96 self.block_size = block_size 97 98 match calibration_key: 99 case "": 100 self.slow = None 101 102 case None: 103 logger.warn('GMD data will not be calibrated. 
Please provide an average_key argument or set it to "" to disable this warning.') 104 self.slow = None 105 106 case _: 107 self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None)) 108 109 110 def load(self, *, daq_run=None): 111 fast = self.fast.load(daq_run=daq_run) 112 113 #Calibrate fast data using slow data average 114 if self.slow: 115 slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id') 116 117 slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean() 118 fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id') 119 ratio = (slow_avg/fast_avg) 120 fast = fast * ratio 121 122 return fast.rename(self.name)
Ad-hoc loader for the GMD data for gmd monitors (pulse resolved)
Loads pulse resolved GMD data and calibrates it using the average GMD data. If a calibration key is provided, the data is in units of uJ.
Arguments:
- name (str): Name of the data source
- data_key (str): HDF key for the pulse-resolved GMD data, eg: "/FL2/Photon Diagnostic/GMD/Pulse resolved energy/energy hall"
- calibration_key (str): HDF key pointing to the average GMD data used for calibration, eg: "/FL2/Photon Diagnostic/GMD/Average energy/energy hall". Set this to "" to disable calibration. Skipping calibration will increase loading performance. If you do not need absolute values for the GMD, you can safely skip calibration.
- block_size (int): Number of shots to average over for calibration
92 def __init__(self, *args, data_key: str, calibration_key: str = None, block_size = 2000, 93 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 94 super().__init__(*args, **kwargs) 95 self.fast = _FastGMD(f"_{self.name}_fast", data_key, beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 96 self.block_size = block_size 97 98 match calibration_key: 99 case "": 100 self.slow = None 101 102 case None: 103 logger.warn('GMD data will not be calibrated. Please provide an average_key argument or set it to "" to disable this warning.') 104 self.slow = None 105 106 case _: 107 self.slow = HDFSource(f"_{self.name}_slow", calibration_key, preload_values=kwargs.get('preload_values', None))
Base constructors. All Sources must have a name.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
110 def load(self, *, daq_run=None): 111 fast = self.fast.load(daq_run=daq_run) 112 113 #Calibrate fast data using slow data average 114 if self.slow: 115 slow = self.slow.load(daq_run=daq_run).reindex_like(fast).ffill(dim='train_id') 116 117 slow_avg = slow.rolling(train_id = self.block_size, min_periods=2).mean() 118 fast_avg = fast.rolling(train_id = self.block_size, min_periods=2).mean().mean(dim='shot_id') 119 ratio = (slow_avg/fast_avg) 120 fast = fast * ratio 121 122 return fast.rename(self.name)
Loads data from the datasource
Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'
Inherited Members
124class BAM(DataSource): 125 ''' Ad-hoc loader for the BAM data from bunch arrival monitors 126 127 Cleans up the raw HDF data for BAM data and only loads valid data 128 in a two-dimensional array with shape (train_id, shot_id). 129 130 Args: 131 name (str): Name of the data source 132 timing_key (str): HDF key where to load the number of shots per pulse train 133 e.g. `/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup` 134 data_key (str): HDF key for actual BAM data, e.g. 135 `/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup` 136 fillna_method: see `fab.datasources.DataSource` 137 ''' 138 def __init__(self, *args, timing_key, data_key, 139 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 140 super().__init__(*args, **kwargs) 141 self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True), 142 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 143 self.data = HDFSource(self.name, data_key, dim_names=['shot_id'], 144 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 145 146 def load(self, *, daq_run=None): 147 timing = self.timing.load(daq_run=daq_run) 148 data = self.data.load(daq_run=daq_run) 149 150 shot_num = int(timing[:,3].max()) 151 return data[:, :shot_num]
Ad-hoc loader for the BAM data from bunch arrival monitors
Cleans up the raw HDF data for BAM data and only loads valid data in a two-dimensional array with shape (train_id, shot_id).
Arguments:
- name (str): Name of the data source
- timing_key (str): HDF key where to load the number of shots per pulse train
e.g.
/zraw/FLASH.DIAG/TIMINGINFO/TIME1.BUNCH_FIRST_INDEX.2/dGroup
- data_key (str): HDF key for actual BAM data, e.g.
/zraw/FLASH.SDIAG/BAM.DAQ/FL2.SEED5.ARRIVAL_TIME.ABSOLUTE.SA2.COMP/dGroup
- fillna_method: see
fab.datasources.DataSource
138 def __init__(self, *args, timing_key, data_key, 139 beamtime = None, idx_path = None, hdf_path = None, **kwargs): 140 super().__init__(*args, **kwargs) 141 self.timing = HDFSource(f"_{self.name}_timing", timing_key, preload_values=kwargs.get('preload_values', True), 142 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path) 143 self.data = HDFSource(self.name, data_key, dim_names=['shot_id'], 144 beamtime=beamtime, hdf_path=hdf_path, idx_path=idx_path)
Base constructors. All Sources must have a name.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
146 def load(self, *, daq_run=None): 147 timing = self.timing.load(daq_run=daq_run) 148 data = self.data.load(daq_run=daq_run) 149 150 shot_num = int(timing[:,3].max()) 151 return data[:, :shot_num]
Loads data from the datasource
Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'