fab.datasources.HDFSource

import glob
from collections import namedtuple
from contextlib import contextmanager
import os
import time
import functools
from typing import List, Iterable, Tuple
import warnings
import hashlib

import h5py as h5
import numpy as np
import xarray as xr
import pandas as pd
from tables import NaturalNameWarning
from natsort import natsort_keygen

import dask
import dask.array as da
import dask.bag as db

from . import DataSource
from ..settings import cfg, cfg_context
from ..beamtime import beamtime_basepath

import logging
logger = logging.getLogger(__name__)


@contextmanager
def _lock_path(path):
    """ Context manager that locks file access.

    Only one process can hold the lock at any time. The call blocks
    until the lock is acquired.
    """
    lock_path = f"{path}.lock"
    while True:
        try:
            lock_file = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except (IOError, OSError) as e:
            logger.debug(f"Waiting to acquire lock on {path}, {e}")
            time.sleep(1)

    try:
        yield None
    finally:
        os.close(lock_file)
        os.remove(lock_path)
        logger.debug(f"Released lock on {path}")

def _maybe_from_config(beamtime = None, hdf_path = None, idx_path = None, preload_path = None):
    """ Determines the HDFSource search paths from config and arguments.

        Directly passed arguments take precedence over config values.

        If a beamtime number is given, the paths for index files default to
        the beamtime scratch_cc folder. If a hdf_path is given, the index
        files default to the current directory.
    """

    # If neither is given, try getting them from config
    if not (beamtime or hdf_path):
        idx_path = idx_path or cfg.idx_path
        preload_path = preload_path or cfg.preload_path

        if cfg.beamtime:
            beamtime = cfg.beamtime
        elif cfg.hdf_path:
            hdf_path = cfg.hdf_path
        else:
            raise ValueError("No beamtime or hdf_path specified")

    # Passing both beamtime and hdf_path is an impossible request
    assert not (beamtime and hdf_path), "Only one of beamtime and hdf_path may be used"

    # Build paths
    if beamtime:
        basepath = beamtime_basepath(beamtime)
        hdf_path = basepath + "/raw/hdf/*/*/*.h5"
        idx_path = idx_path or basepath + "/scratch_cc/file_index.h5"
        preload_path = preload_path or basepath + "/scratch_cc/preloaded/"
    elif hdf_path:
        idx_path = idx_path or "./file_index.h5"
        preload_path = preload_path or "./preloaded/"

    return (hdf_path, idx_path, preload_path)

class HDFSource(DataSource):
    ''' A DataSource that loads data from the HDF files produced by the Flash DAQ.

    Provides file-independent access to the data contained in a specific
    HDF group from any of the HDF files generated by the Flash DAQ.
    It provides functionality to load the data as an xr.DataArray that is
    indexed by the Flash train_id (macropulse id).
    The data is loaded as a lazy dask.array to exploit parallel loading
    from multiple files.

    Parameters can be defined via the toml config, and recalled by using
    the from_name method.

    Args:
        name: a human readable name for the data source. Used as the name of
            loaded xr.DataArrays
        hdf_key: a string pointing to an HDF group containing 'index' and 'value'
            datasets
        dim_names: list of strings to be used as dimension names, except
            for the first dimension (as that is always train_id)
        idx_path (optional): path to a file used to cache the metadata
            used for loading. Leave empty to let fab determine the
            path based on the config.
        hdf_path (optional): glob pattern used to generate the list of
            HDF files where the data is loaded from. Leave empty to let
            fab determine the path based on the config.
        fillna_method (optional): a string specifying what filling method should
            be used to fill missing values if the dataset needs to be reindexed.
            This will happen when the datasource is combined with other sources
            in an Instrument object, which requires all sources to have the same
            train_id index. It should be either 'ffill' for forward filling, or one
            of the valid methods for xr.interpolate_na. Using methods other than
            'ffill' on large arrays might lead to extremely high memory usage.
        preload_values (optional): if True, disables the lazy loading behaviour and
            values are returned already computed. Data is cached in `preload_path`.
            If None, values will be preloaded if the dataset has only one value
            per pulse train. Default is None.
        preload_path (optional): path to the file used to store preloaded data.
            Leave empty to let fab determine the path based on the config.
    '''
    def __init__(self, name: str, hdf_key: str, *args,
                       dim_names = None,

                       beamtime = None,
                       idx_path   = None,
                       hdf_path   = None,
                       preload_path = None,

                       preload_values = None,

                       **kwargs):

        super().__init__(name, *args, **kwargs)

        logger.debug(f"Creating HDFSource {name}")

        assert hdf_key is not None, "hdf_key must be set"

        self.hdf_key = hdf_key
        self.dim_names = dim_names

        self._hdf_path, self._idx_path, self._preload_path = _maybe_from_config(beamtime,
                                                                                hdf_path,
                                                                                idx_path,
                                                                                preload_path)

        self._preload_values = preload_values
        self._file_idx = None           # Cache for the file index

        # Fields to use to generate the cache key
        self._preload_key_vals = [type(self).__name__, self.name, self.hdf_key]

    def __repr__(self):
        return f"{self.__class__.__name__}(name={self.name}, hdf_key={self.hdf_key})"

    @property
    def _cache_key(self):
        ''' Generates an identifier for caching the source data. It should be unique for
            each combination of hdf_key and loading config.
        '''
        values = ''.join(str(val) for val in self._preload_key_vals)
        key = hashlib.sha1(values.encode()).hexdigest()[:10]
        return key

    _FileData = namedtuple('FileData', ['file_path', 'train_ids', 'daq_run', 'data_shape', 'dtype'])

    @staticmethod
    def _get_index_from_file(hdf_path, hdf_key): # extracts metadata from a single hdf file
        run_num   = int(hdf_path.split('run')[1].split('_')[0]) # Get run number from file name
        try:
            with h5.File(hdf_path) as hdf_file:
                train_ids = hdf_file[hdf_key]['index'][:]
                shape     = hdf_file[hdf_key]['value'].shape
                dtype     = hdf_file[hdf_key]['value'].dtype
        except KeyError:
            return HDFSource._FileData(hdf_path, [], run_num, None, None)
        return HDFSource._FileData(hdf_path, train_ids, run_num, shape, dtype)

    def _make_file_index(self, file_list):
        ''' Iterates through HDF files and builds the index dataframe '''
        if len(file_list) < 1:
            raise ValueError(f"No hdf files matching path: {self._hdf_path}")
        logger.debug(f"{self.name}: Making index for {len(file_list)} files")

        file_bag   = db.from_sequence(file_list)
        index_list = file_bag.map(type(self)._get_index_from_file, hdf_key=self.hdf_key).compute()

        return pd.DataFrame(index_list, columns=type(self)._FileData._fields)\
                           .set_index('file_path')\
                           .sort_index(key=natsort_keygen())

    def _write_file_index(self, file_index):
        ''' Writes the file index dataframe to disk while suppressing useless warnings '''
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=pd.errors.PerformanceWarning)
            warnings.simplefilter("ignore", category=NaturalNameWarning)
            with _lock_path(self._idx_path):
                file_index.to_hdf(self._idx_path, mode='a', key=self.hdf_key)

    def get_file_index(self) -> pd.DataFrame:
        ''' Retrieves the metadata for file loading.

        In order to facilitate data loading, a table with information about
        the data contained in each HDF file is needed. This table contains a
        list of all HDF files with the corresponding train_ids and data shape.
        The table is cached in the HDF5 store set by the 'idx_path' argument
        of the class constructor, and is only rebuilt from the HDF files if
        not found in the cache. If cfg.autoreload_hdf is set, the cache is
        updated with the metadata of any new HDF files found on disk.

        Returns:
            pd.DataFrame: dataframe with the file metadata
        '''
        try: # try using cached index if found
            file_idx = self._file_idx if self._file_idx is not None else pd.read_hdf(self._idx_path, key=self.hdf_key)
        except (FileNotFoundError, KeyError) as e: # cache does not exist, make index and save it
            logger.info(f"File index cache not found for group {self.hdf_key}, creating it...")
            file_idx = self._make_file_index(glob.glob(self._hdf_path))
            if file_idx.data_shape.isnull().all():
                raise KeyError(f"Group {self.hdf_key} not found in any HDF file") from e
            self._write_file_index(file_idx)
        else:
            if cfg.autoreload_hdf: # Check for new data that might not be cached
                file_list = glob.glob(self._hdf_path)
                new_files = [fname for fname in file_list if fname not in file_idx.index]
                if new_files: # If there is new data, append it
                    logger.info(f"Found {len(new_files)} new hdf files for {self.hdf_key}, appending...")
                    file_idx = pd.concat([file_idx, self._make_file_index(new_files)]).sort_index(key=natsort_keygen())
                    self._write_file_index(file_idx)

        self._file_idx = file_idx
        return file_idx

    def _should_preload(self):
        if self._preload_values is not None:
            return self._preload_values

        if not cfg.auto_preload:
            return False

        # Preload only if there is a single value per pulse train
        idx = self.get_file_index()
        return idx.dropna().data_shape.apply(len).max() < 2

    def _generate_dim_names(self, dim_count: int):
        ''' Generates dimension names for the output xarray DataArray (except the first dimension) '''
        return [self.name + f'_dim_{n}' for n in range(1, dim_count)]

    def _generate_dim_coords(self, shape):
        return [range(dim_len) for dim_len in shape[1:]]

    _ArrayData = namedtuple('ArrayData', ['train_ids', 'daq_run', 'data'])

    @staticmethod
    @dask.delayed
    def _load_key(hdf_key: str, fname: str) -> np.ndarray:
        ''' Loads the data from disk. Called by dask in parallel '''
        with h5.File(fname) as hdf_file:  # close the file once the values are read into memory
            return hdf_file[hdf_key]['value'][:]

    def _load_from_file(self, file_data: Tuple, *args, **kwargs) -> 'HDFSource._ArrayData':
        ''' Returns the train ids, daq run and a delayed array with the contents
            of a single file '''
        data = da.from_delayed(self._load_key(self.hdf_key, file_data.Index, *args, **kwargs),
                               shape=file_data.data_shape, dtype=file_data.dtype)

        return HDFSource._ArrayData(file_data.train_ids, np.ones_like(file_data.train_ids)*file_data.daq_run, data)

    def _concat(self, arrays):
        ''' Concatenates the delayed arrays from _load_from_file into a single
            xarray DataArray '''

        def pad_to_shape(array, new_shape):
            new_shape = (array.shape[0], *new_shape)
            if array.shape == new_shape:
                return array

            pad_width = [ (0, max_size - dim_size)
                          for dim_size, max_size in zip(array.shape, new_shape)]
            return da.pad(array, pad_width, constant_values=np.nan)

        try:
            train_ids = np.concatenate([array.train_ids for array in arrays])
            daq_run   = np.concatenate([array.daq_run for array in arrays])
        except ValueError as e:
            raise ValueError(f"Error loading {self.name}, nothing to load") from e

        # Expand to match the largest shape if needed
        shapes = [array.data.shape for array in arrays]
        max_shape = tuple(map(max, zip(*shapes)))[1:]
        data = da.concatenate([pad_to_shape(array.data, max_shape) for array in arrays])

        dim_names = ['train_id'] + (self.dim_names or self._generate_dim_names(len(data.shape)))
        dim_coords = [train_ids] + self._generate_dim_coords(data.shape)

        return xr.DataArray(data, name=self.name, dims=dim_names, coords=dim_coords) \
               .assign_coords(daq_run=('train_id', daq_run))

    def _load(self, file_idx):
        file_list = file_idx.dropna() # Drop files that have no data for this key (None shape)
        if file_list.size == 0:
            raise ValueError(f"No data found for {self.name}")

        logger.debug(f"{self.name}: found {len(file_list)} matching files to be loaded")
        data_arrays = map(self._load_from_file, file_list.itertuples())
        return self._concat(list(data_arrays)).drop_duplicates(dim=...)

    def _get_preloaded(self):
        logger.debug(f"Preloading {self.name}")
        file_idx = self.get_file_index()
        os.makedirs(self._preload_path, exist_ok=True) # Make sure the preload path exists

        try:
            data = xr.open_dataarray(self._preload_path + self._cache_key,
                                     engine='netcdf4')
        except (FileNotFoundError, OSError, ValueError): # cache does not exist, create it
            logger.debug(f"Preloaded data not found for {self.name}, creating it...")
            data = self._load(file_idx).compute()
            data.attrs['file_list'] = list(file_idx.index)
            data.to_netcdf(self._preload_path + self._cache_key,
                           mode='w', engine='netcdf4')
        else:
            # Check that we have all the data
            missing = np.setdiff1d(file_idx.index, data.file_list, assume_unique=True)
            if missing.size > 0:
                logger.debug(f"Updating preloaded data for {self.name}...")
                new_data = self._load(file_idx.loc[missing]).compute()
                data = xr.concat([data, new_data], dim='train_id').drop_duplicates(dim=...)
                data.attrs['file_list'] = list(file_idx.index)
                data.to_netcdf(self._preload_path + self._cache_key,
                               mode='w', engine='netcdf4')

        del data.attrs['file_list']
        return data

    def load(self, *, daq_run=None) -> xr.DataArray:
        ''' Loads data from disk into an xr.DataArray

            Used to construct an xr.DataArray containing the data. If preload_values is not set,
            the returned value will be a lazy dask array, and the data will only be read
            from disk when a computation is triggered.
            The first dimension of the returned array is always the train id of a Flash pulse
            train. The other dimensions are named based on the dim_names attribute
            or automatically generated from the source name.

            Args:
                daq_run (optional): Integer or iterable of integers specifying the DAQ run
                    numbers to be loaded. Use None to load all runs. Defaults to None
            Returns:
                xr.DataArray: the data, indexed by train_id
        '''
        if isinstance(daq_run, int):
            daq_run = [daq_run]

        if self._should_preload():
            data = self._get_preloaded()
            if daq_run is not None:
                data = data.where(data.daq_run.isin(daq_run), drop=True)
                if data.size == 0:
                    raise ValueError(f"No data found for {self.name} in daq_run {daq_run}")

            return data

        # If not preloading, we go the long way...
        file_idx = self.get_file_index()
        if daq_run is not None:
            file_idx = file_idx.query('daq_run in @daq_run')

        return self._load(file_idx)
class HDFSource(fab.datasources.basesources.DataSource):

A DataSource that loads data from the HDF files produced by the Flash DAQ

Provides file-independent access to the data contained in a specific HDF group from any of the HDF files generated by the Flash DAQ. It provides functionality to load the data as an xr.DataArray that is indexed by the Flash train_id (macropulse id). The data is loaded as a lazy dask.array to exploit parallel loading from multiple files.

Parameters can be defined via the toml config, and recalled by using the from_name method.

Arguments:
  • name: a human readable name for the data source. Used as the name of loaded xr.DataArrays
  • hdf_key: a string pointing to an HDF group containing 'index' and 'value' datasets
  • dim_names: list of strings to be used as dimension names, except for the first dimension (as that is always train_id)
  • idx_path (optional): path to a file used to cache the metadata used for loading. Leave empty to let fab determine the path based on the config.
  • hdf_path (optional): glob pattern used to generate the list of HDF files where the data is loaded from. Leave empty to let fab determine the path based on the config.
  • fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
  • preload_values (optional): if True, disables the lazy loading behaviour and values are returned already computed. Data is cached in preload_path. If None, values will be preloaded if the dataset has only one value per pulse train. Default is None.
  • preload_path (optional): path to the file used to store preloaded data. Leave empty to let fab determine the path based on the config.
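
A minimal usage sketch. The hdf_key, beamtime number and import path below are placeholders; the actual channel paths depend on the beamtime and DAQ configuration:

    from fab.datasources.HDFSource import HDFSource

    # Describe one DAQ channel; paths are resolved from the beamtime number
    gmd = HDFSource('gmd_energy',
                    hdf_key='/FL1/Photon Diagnostic/GMD/Pulse resolved energy/energy hall',
                    beamtime=11012345)

    data = gmd.load()        # lazy, dask-backed xr.DataArray indexed by train_id
    print(data.sizes)
    values = data.compute()  # only now is the data actually read from the HDF files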
HDFSource(name: str, hdf_key: str, *args, dim_names=None, beamtime=None, idx_path=None, hdf_path=None, preload_path=None, preload_values=None, **kwargs)

Base constructor. All Sources must have a name.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
hdf_key
dim_names
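
The fillna_method option only matters once sources are aligned on a common train_id index. As a rough illustration of what forward filling does during such a reindex (the array and train_id values are made up, and this sketch uses plain xarray rather than fab itself):

    import numpy as np
    import xarray as xr

    # A slow channel that was only recorded on some trains
    slow = xr.DataArray([1.0, 2.0], dims='train_id',
                        coords={'train_id': [1000, 1003]}, name='slow_signal')

    # Align it onto a denser train_id axis, forward-filling the gaps
    common_trains = np.arange(1000, 1006)
    print(slow.reindex(train_id=common_trains, method='ffill'))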
def get_file_index(self) -> pandas.core.frame.DataFrame:

Retrieves the metadata for file loading.

In order to facilitate data loading, a table with information about the data contained in each HDF file is needed. This table contains a list of all HDF files with the corresponding train_ids and data shape. The table is cached in the HDF5 store set by the 'idx_path' argument of the class constructor, and is only rebuilt from the HDF files if not found in the cache. If cfg.autoreload_hdf is set, the cache is updated with the metadata of any new HDF files found on disk.

Returns:

pd.DataFrame: dataframe with the file metadata
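
For example, the index can be inspected directly; the hdf_path and hdf_key below are placeholders, and with only hdf_path given the index is cached in ./file_index.h5:

    from fab.datasources.HDFSource import HDFSource

    source = HDFSource('example_signal',
                       hdf_key='/FL1/Experiment/some/channel',
                       hdf_path='/path/to/beamtime/raw/hdf/*/*/*.h5')

    idx = source.get_file_index()
    print(idx.columns.tolist())   # ['train_ids', 'daq_run', 'data_shape', 'dtype']
    print(idx.head())             # one row per HDF file, indexed by file_path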

def load(self, *, daq_run=None) -> xarray.core.dataarray.DataArray:

Loads data from disk into an xr.DataArray

Used to construct an xr.DataArray containing the data. If preload_values is not set, the returned value will be a lazy dask array, and the data will only be read from disk when a computation is triggered. The first dimension of the returned array is always the train id of a Flash pulse train. The other dimensions are named based on the dim_names attribute or automatically generated from the source name.

Arguments:
  • daq_run (optional): Integer or iterable of integers specifying the DAQ run numbers to be loaded. Use None to load all runs. Defaults to None
Returns:

xr.DataArray: the data, indexed by train_id
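
A short sketch of lazy loading with run selection; the source definition and run numbers are placeholders:

    from fab.datasources.HDFSource import HDFSource

    source = HDFSource('example_signal',
                       hdf_key='/FL1/Experiment/some/channel',
                       beamtime=11012345)

    arr = source.load(daq_run=[41, 42])   # lazy xr.DataArray with dims ('train_id', ...)
    run_average = arr.mean(dim='train_id')
    result = run_average.compute()        # the HDF files are only read at this point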