fab.datasources.HDFSource
import glob
from collections import namedtuple
from contextlib import contextmanager
import os
import time
import functools
from typing import List, Iterable, Tuple
import warnings
import hashlib

import h5py as h5
import numpy as np
import xarray as xr
import pandas as pd
from tables import NaturalNameWarning
from natsort import natsort_keygen

import dask
import dask.array as da
import dask.bag as db

from . import DataSource
from ..settings import cfg, cfg_context
from ..beamtime import beamtime_basepath

import logging
logger = logging.getLogger(__name__)


@contextmanager
def _lock_path(path):
    """ Context manager that locks file access.

    Only one process can hold the lock at any time. The call
    will block until the lock is acquired.
    """
    lock_path = f"{path}.lock"
    while True:
        try:
            lock_file = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except (IOError, OSError) as e:
            logger.debug(f"Waiting to acquire lock on {path}, {e}")
            time.sleep(1)

    try:
        yield None
    finally:
        os.close(lock_file)
        os.remove(lock_path)
        logger.debug(f"Released lock on {path}")

def _maybe_from_config(beamtime=None, hdf_path=None, idx_path=None, preload_path=None):
    """ Logic to determine the HDFSource search paths from config and arguments

    Directly passed arguments take precedence over config values.

    If a beamtime number is given, the paths for index files default to
    the beamtime scratch_cc folder. If a hdf_path is given, the index
    files default to the current directory.
    """

    # If neither is given, try getting them from the config
    if not (beamtime or hdf_path):
        idx_path = idx_path or cfg.idx_path
        preload_path = preload_path or cfg.preload_path

        if cfg.beamtime:
            beamtime = cfg.beamtime
        elif cfg.hdf_path:
            hdf_path = cfg.hdf_path
        else:
            raise ValueError("No beamtime or hdf_path specified")

    # Refuse if both are given, the request is ambiguous
    assert not (beamtime and hdf_path), "Only one of beamtime and hdf_path may be used"

    # Build paths
    if beamtime:
        basepath = beamtime_basepath(beamtime)
        hdf_path = basepath + "/raw/hdf/*/*/*.h5"
        idx_path = idx_path or basepath + "/scratch_cc/file_index.h5"
        preload_path = preload_path or basepath + "/scratch_cc/preloaded/"
    elif hdf_path:
        idx_path = idx_path or "./file_index.h5"
        preload_path = preload_path or "./preloaded/"

    return (hdf_path, idx_path, preload_path)

class HDFSource(DataSource):
    ''' A DataSource that loads data from the HDF files produced by the Flash DAQ

    Provides file-independent access to the data contained in a specific
    HDF group from any of the HDF files generated by the Flash DAQ.
    It provides functionality to load the data as an xr.DataArray that is
    indexed by the Flash train_id (macropulse id).
    The data is loaded as a lazy dask.array to exploit parallel loading
    from multiple files.

    Parameters can be defined via the toml config and recalled by using
    the from_name method.

    Args:
        name: a human-readable name for the data source. Used as the name
            for loaded xr.DataArrays
        hdf_key: a string pointing to an HDF group containing 'index' and
            'value' datasets
        dim_names: list of strings to be used as dimension names, except
            for the first dimension (as that is always train_id)
        idx_path (optional): path to a file used to cache the metadata
            used for loading. Leave empty to let fab determine the
            path based on the config.
        hdf_path (optional): glob pattern used to generate the list of
            HDF files where the data is loaded from. Leave empty to let
            fab determine the path based on the config.
        fillna_method (optional): a string specifying what filling method should
            be used to fill missing values if the dataset needs to be reindexed.
            This will happen when the datasource is combined with other sources
            in an Instrument object, which requires all sources to have the same
            train_id index. It should be either 'ffill' for forward filling, or one
            of the valid methods for xr.interpolate_na. Using methods other than
            'ffill' on large arrays might lead to extremely high memory usage.
        preload_values (optional): if True, disables the lazy loading behaviour;
            values are returned already computed. Data is cached in `preload_path`.
            If None, values will be preloaded if the dataset has only one value
            per pulse train. Default is None.
        preload_path (optional): path to the file used to store preloaded data.
            Leave empty to let fab determine the path based on the config.
    '''
    def __init__(self, name: str, hdf_key: str, *args,
                 dim_names=None,

                 beamtime=None,
                 idx_path=None,
                 hdf_path=None,
                 preload_path=None,

                 preload_values=None,

                 **kwargs):

        super().__init__(name, *args, **kwargs)

        logger.debug(f"Creating HDFSource {name}")

        assert hdf_key is not None, "hdf_key must be set"

        self.hdf_key = hdf_key
        self.dim_names = dim_names

        self._hdf_path, self._idx_path, self._preload_path = _maybe_from_config(beamtime,
                                                                                hdf_path,
                                                                                idx_path,
                                                                                preload_path)

        self._preload_values = preload_values
        self._file_idx = None  # Cache for the file index

        # Fields to use to generate the cache key
        self._preload_key_vals = [type(self).__name__, self.name, self.hdf_key]

    def __repr__(self):
        return f"{self.__class__.__name__}(name={self.name}, hdf_key={self.hdf_key})"

    @property
    def _cache_key(self):
        ''' Generate an identifier for caching the source data. It should be unique for
        each combination of hdf_key and loading config.
        '''
        values = ''.join(str(val) for val in self._preload_key_vals)
        key = hashlib.sha1(values.encode()).hexdigest()[:10]
        return key

    _FileData = namedtuple('FileData', ['file_path', 'train_ids', 'daq_run', 'data_shape', 'dtype'])

    @staticmethod
    def _get_index_from_file(hdf_path, hdf_key):  # extracts metadata from a single hdf file
        run_num = int(hdf_path.split('run')[1].split('_')[0])  # Get run number from file name
        try:
            with h5.File(hdf_path) as hdf_file:
                train_ids = hdf_file[hdf_key]['index'][:]
                shape = hdf_file[hdf_key]['value'].shape
                dtype = hdf_file[hdf_key]['value'].dtype
        except KeyError:
            return HDFSource._FileData(hdf_path, [], run_num, None, None)
        return HDFSource._FileData(hdf_path, train_ids, run_num, shape, dtype)

    def _make_file_index(self, file_list):
        ''' Iterates through HDF files and builds the index dataframe'''
        if len(file_list) < 1:
            raise ValueError(f"No hdf files matching path: {self._hdf_path}")
        logger.debug(f"{self.name}: Making index for {len(file_list)} files")

        file_bag = db.from_sequence(file_list)
        index_list = file_bag.map(type(self)._get_index_from_file, hdf_key=self.hdf_key).compute()

        return pd.DataFrame(index_list, columns=type(self)._FileData._fields)\
                 .set_index('file_path')\
                 .sort_index(key=natsort_keygen())

    def _write_file_index(self, file_index):
        ''' Write the file index dataframe to disk while suppressing useless warnings'''
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning)
            warnings.simplefilter("ignore", category=NaturalNameWarning)
            with _lock_path(self._idx_path):
                file_index.to_hdf(self._idx_path, mode='a', key=self.hdf_key)

    def get_file_index(self) -> pd.DataFrame:
        ''' Retrieves the metadata for file loading.

        In order to facilitate data loading, a table with information about
        the data contained in each HDF file is needed. This table contains a
        list of all HDF files with the corresponding train_ids and data shape.
        The data is cached in the HDF5 store set by the 'idx_path' of the
        class constructor, and is only read from HDF if not found in the cache.

        Args:
            reload_index (optional): controls whether or not to check for new HDF
                files when loading the cache. If True, the cache is updated with
                the data from the new files found.
                Defaults to config.autoreload_hdf_index
        Returns:
            pd.DataFrame: dataframe with the file metadata
        '''
        try:  # try using the cached index if found
            file_idx = self._file_idx if self._file_idx is not None else pd.read_hdf(self._idx_path, key=self.hdf_key)
        except (FileNotFoundError, KeyError) as e:  # cache does not exist, make the index and save it
            logger.info(f"File index cache not found for group {self.hdf_key}, creating it...")
            file_idx = self._make_file_index(glob.glob(self._hdf_path))
            if file_idx.data_shape.isnull().all():
                raise KeyError(f"Group {self.hdf_key} not found in any HDF file") from e
            self._write_file_index(file_idx)
        else:
            if cfg.autoreload_hdf:  # Check for new data that might not be cached
                file_list = glob.glob(self._hdf_path)
                new_files = [fname for fname in file_list if fname not in file_idx.index]
                if new_files:  # If there is new data, append it
                    logger.info(f"Found {len(new_files)} new hdf files for {self.hdf_key}, appending...")
                    file_idx = pd.concat([file_idx, self._make_file_index(new_files)]).sort_index(key=natsort_keygen())
                    self._write_file_index(file_idx)

        self._file_idx = file_idx
        return file_idx

    def _should_preload(self):
        if self._preload_values is not None:
            return self._preload_values

        if not cfg.auto_preload:
            return False

        # Preload if there is only one value per pulse train
        idx = self.get_file_index()
        return idx.dropna().data_shape.apply(len).max() < 2

    def _generate_dim_names(self, dim_count: int):
        ''' Generates dimension names for the output DataArray (except the first dimension)'''
        return [self.name + f'_dim_{n}' for n in range(1, dim_count)]

    def _generate_dim_coords(self, shape):
        return [range(dim_len) for dim_len in shape[1:]]

    _ArrayData = namedtuple('ArrayData', ['train_ids', 'daq_run', 'data'])

    @staticmethod
    @dask.delayed
    def _load_key(hdf_key: str, fname: str) -> np.ndarray:
        ''' Loads the data from disk. Called by dask in parallel '''
        return h5.File(fname)[hdf_key]['value'][:]

    def _load_from_file(self, file_data: Tuple, *args, **kwargs) -> xr.DataArray:
        ''' Returns a delayed array with the content of the file and the
        corresponding train_ids '''
        data = da.from_delayed(self._load_key(self.hdf_key, file_data.Index, *args, **kwargs),
                               shape=file_data.data_shape, dtype=file_data.dtype)

        return HDFSource._ArrayData(file_data.train_ids, np.ones_like(file_data.train_ids)*file_data.daq_run, data)

    def _concat(self, arrays):
        ''' Concatenates the delayed arrays from _load_from_file into a single
        xarray DataArray '''

        def pad_to_shape(array, new_shape):
            new_shape = (array.shape[0], *new_shape)
            if array.shape == new_shape:
                return array

            pad_width = [(0, max_size - dim_size)
                         for dim_size, max_size in zip(array.shape, new_shape)]
            return da.pad(array, pad_width, constant_values=np.nan)

        try:
            train_ids = np.concatenate([array.train_ids for array in arrays])
            daq_run = np.concatenate([array.daq_run for array in arrays])
        except ValueError as e:
            raise ValueError(f"Error loading {self.name}, Nothing to load") from e

        # Expand to match the largest shape if needed
        shapes = [array.data.shape for array in arrays]
        max_shape = tuple(map(max, zip(*shapes)))[1:]
        data = da.concatenate([pad_to_shape(array.data, max_shape) for array in arrays])

        dim_names = ['train_id'] + (self.dim_names or self._generate_dim_names(len(data.shape)))
        dim_coords = [train_ids] + self._generate_dim_coords(data.shape)

        return xr.DataArray(data, name=self.name, dims=dim_names, coords=dim_coords) \
                 .assign_coords(daq_run=('train_id', daq_run))

    def _load(self, file_idx):
        file_list = file_idx.dropna()  # Drop files with None shape
        if file_idx.size == 0:
            raise ValueError(f"No data found for {self.name}")

        logger.debug(f"{self.name}: found {len(file_idx)} matching files to be loaded")
        data_arrays = map(self._load_from_file, file_list.itertuples())
        return self._concat(list(data_arrays)).drop_duplicates(dim=...)

    def _get_preloaded(self):
        logger.debug(f"Preloading {self.name}")
        file_idx = self.get_file_index()
        os.makedirs(self._preload_path, exist_ok=True)  # Make sure the preload path exists

        try:
            data = xr.open_dataarray(self._preload_path + self._cache_key,
                                     engine='netcdf4')
        except (FileNotFoundError, OSError, ValueError):  # cache does not exist, create it and save it
            logger.debug(f"Preloaded data not found for {self.name}, creating it...")
            data = self._load(file_idx).compute()
            data.attrs['file_list'] = list(file_idx.index)
            data.to_netcdf(self._preload_path + self._cache_key,
                           mode='w', engine='netcdf4')
        else:
            # Check if we have all the data
            missing = np.setdiff1d(file_idx.index, data.file_list, assume_unique=True)
            if missing.size > 0:
                logger.debug(f"Updating preloaded data for {self.name}...")
                new_data = self._load(file_idx.loc[missing]).compute()
                data = xr.concat([data, new_data], dim='train_id').drop_duplicates(dim=...)
                data.attrs['file_list'] = list(file_idx.index)
                data.to_netcdf(self._preload_path + self._cache_key,
                               mode='w', engine='netcdf4')

        del data.attrs['file_list']
        return data

    def load(self, *, daq_run=None) -> xr.DataArray:
        ''' Loads data from disk into an xr.DataArray

        Used to construct an xr.DataArray containing the data. If preload_values is not set,
        the returned value will be a lazy dask array, and the data will only be read
        from disk when a computation is triggered.
        The first dimension of the returned array is always the train id of a Flash pulse
        train. The other dimensions will be named based on the dim_names class attribute
        or automatically generated from the source name.

        Args:
            daq_run (optional): Integer or iterable of integers to specify the DAQ run numbers
                that are to be loaded. Use None to load all runs. Defaults to None
        Returns:
            xr.DataArray: the data, indexed by train_id
        '''
        if type(daq_run) == int:
            daq_run = [daq_run]

        if self._should_preload():
            data = self._get_preloaded()
            if daq_run is not None:
                data = data.where(data.daq_run.isin(daq_run), drop=True)
                if data.size == 0:
                    raise ValueError(f"No data found for {self.name} in daq_run {daq_run}")

            return data

        # If not preloading, we go the long way...
        file_idx = self.get_file_index()
        if daq_run is not None:
            file_idx = file_idx.query('daq_run in @daq_run')

        return self._load(file_idx)
class HDFSource(DataSource):
A DataSource that loads data from the HDF files produced by the Flash DAQ
Provides file-independent access to the data contained in a specific HDF group from any of the HDF files generated by the Flash DAQ. It provides functionality to load the data as an xr.DataArray that is indexed by the Flash train_id (macropulse id). The data is loaded as a lazy dask.array to exploit parallel loading from multiple files.
Parameters can be defined via the toml config, and recalled by using the from_name method.
Arguments:
- name: a human-readable name for the data source. Used as the name for loaded xr.DataArrays
- hdf_key: a string pointing to an HDF group containing 'index' and 'value' datasets
- dim_names: list of strings to be used as dimension names, except for the first dimension (as that is always train_id)
- idx_path (optional): path to a file used to cache the metadata used for loading. Leave empty to let fab determine the path based on the config.
- hdf_path (optional): glob pattern used to generate the list of HDF files where the data is loaded from. Leave empty to let fab determine the path based on the config.
- fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
- preload_values (optional): if True, disables the lazy loading behaviour; values are returned already computed. Data is cached in `preload_path`. If None, values will be preloaded if the dataset has only one value per pulse train. Default is None.
- preload_path (optional): path to the file used to store preloaded data. Leave empty to let fab determine the path based on the config.
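A minimal usage sketch follows; the import path, hdf_key, and beamtime number are placeholders chosen only to illustrate the call signature, not values taken from this module.

```python
from fab.datasources import HDFSource   # assumed import path

# Hypothetical hdf_key and beamtime number, for illustration only.
gmd = HDFSource("gmd_energy",
                hdf_key="/FL1/Photon Diagnostic/GMD/Pulse resolved energy/energy hall",
                beamtime=11012345)

data = gmd.load()         # lazy, dask-backed xr.DataArray indexed by train_id
values = data.compute()   # triggers the actual read from the HDF files

# Sources defined in the toml config can instead be recalled by name;
# the exact config schema is not shown in this module.
# gmd = HDFSource.from_name("gmd_energy")
```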
def __init__(self, name: str, hdf_key: str, *args, dim_names=None, beamtime=None, idx_path=None, hdf_path=None, preload_path=None, preload_values=None, **kwargs)
Base constructor. All Sources must have a name.
Arguments:
- name: a human-readable name for the data source.
- fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
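As a rough illustration of what forward filling means during reindexing (plain xarray on toy data, not the Instrument internals):

```python
import numpy as np
import xarray as xr

# Toy data: a value recorded only for some train_ids.
sparse = xr.DataArray([1.0, 2.0, 3.0], dims="train_id",
                      coords={"train_id": [100, 103, 105]})

common_index = np.arange(100, 107)               # train_ids shared by all sources
aligned = sparse.reindex(train_id=common_index)  # missing train_ids become NaN
filled = aligned.ffill("train_id")               # what fillna_method='ffill' amounts to
```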
def get_file_index(self) -> pd.DataFrame
Retrieves the metadata for file loading.
In order to facilitate data loading, a table with information about the data contained in each HDF file is needed. This table contains a list of all HDF files with the corresponding train_ids and data shape. The data is cached in the HDF5 store set by the 'idx_path' argument of the class constructor, and is only read from HDF if not found in the cache.
Arguments:
- reload_index (optional): controls whether or not to check for new HDF files when loading the cache. If True, the cache is updated with the data from the new files found. Defaults to config.autoreload_hdf_index
Returns:
pd.DataFrame: dataframe with the file metadata
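For example, reusing the gmd source from the construction sketch above; the columns follow the _FileData namedtuple defined in the source:

```python
idx = gmd.get_file_index()   # gmd: the HDFSource instance from the earlier sketch

# One row per HDF file, indexed by file_path.
print(idx.columns.tolist())   # ['train_ids', 'daq_run', 'data_shape', 'dtype']
print(idx.daq_run.unique())   # DAQ run numbers parsed from the file names
```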
def load(self, *, daq_run=None) -> xr.DataArray
Loads data from disk into an xr.DataArray
Used to construct an xr.DataArray containing the data. If preload_values is not set, the returned value will be a lazy dask array, and the data will only be read from disk when a computation is triggered. The first dimension of the returned array is always the train id of a Flash pulse train. The other dimensions will be named based on the dim_names class attribute or automatically generated from the source name.
Arguments:
- daq_run (optional): Integer or iterable of integers to specify the DAQ run numbers that are to be loaded. Use None to load all runs. Defaults to None
Returns:
xr.DataArray: the data, indexed by train_id
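For example, with placeholder run numbers and the gmd source from the construction sketch above:

```python
arr = gmd.load(daq_run=43210)           # a single DAQ run
arr = gmd.load(daq_run=[43210, 43211])  # or several runs at once

print(arr.dims)          # ('train_id', ...); the first dimension is always train_id
values = arr.compute()   # read the data from disk now (unless it was preloaded)
```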