fab.datasources.basesources

  1from abc import ABC, abstractmethod
  2import xarray as xr
  3
  4from ..settings import cfg, get_config_object, update_context
  5
  6import logging
  7logger = logging.getLogger(__name__)
  8
  9
 10#TODO: more type hints
 11#TODO: move ADC slicer baseline and inversion to preprocessor
 12
 13class DataSource(ABC):
 14    ''' Represents a source of data. 
 15    
 16    Abstract class for describing a data source. It should implement
 17    the load() method for loading data. The data must be indexable 
 18    by train_id
 19    '''
 20    
 21    def __init__(self, name, *args, fillna_method = None, **kwargs):
 22        ''' Base constructors. All Sources must have a name.
 23        
 24        Args:
 25            name: a human readable name for the data source.
 26            fillna_method (optional): a string specifing what filling method should
 27                be used to fill missing values if the dataset needs to be reindexed.
 28                This will happen when the datasource is combined with other sources
 29                in an Instrument object that will require all source to have the same
 30                train_id index. It should be either 'ffill' for forward filling, or one
 31                of the valid method for xr.interpolate_na. Using methods other than 
 32                'ffill' on large arrays might lead to extremely high memory usage.
 33        '''
 34        self.name = name
 35        self.fillna_method = fillna_method 
 36
 37    def __init_subclass__(cls, *args, **kwargs):
 38        ''' Adds all subclasses to the context dictionary for reference in config files.
 39        '''
 40        super().__init_subclass__(*args, **kwargs)
 41        update_context(cls)
 42
 43    @staticmethod
 44    def from_config(name, source_config, **kwargs):
 45        """ Returns an datasource configured with the parameters in source_config. 
 46        
 47        This is not a classmethod, the class of the returned instance should be specified
 48        by setting the <source_type> attribute in the configuration dictionary.
 49        
 50        Extra keyword arguments are passed on verbatim to the source __init__ method
 51        
 52        Args:
 53            name: the name of the source to be created
 54            source_config: source configuration dictionary """
 55        
 56        source_config = source_config.copy()  # Avoid modifying the original config
 57
 58        SourceClass = get_config_object(source_config.pop('__type__'))
 59        preprocess = source_config.pop('__preprocess__', {})   # dict of preprocessors
 60
 61        inst = SourceClass(name = name, **source_config, **kwargs)
 62
 63        # support for old-style preprocessors declaration
 64        if isinstance(preprocess, list):
 65            # if preprocess is a list, convert it to a dict with empty config
 66            preprocess = {preproc.pop('__name__'): {**preproc} for preproc in preprocess}
 67        
 68        #print(f"Source {name} preprocessors: {preprocess}")
 69        for preproc_name, preproc_config in preprocess.items():
 70            preprocessor = get_config_object(preproc_name)
 71            logger.debug(f"Source {name}, applying preprocessor: {preproc_name}")
 72            inst = preprocessor(inst, **preproc_config or {})
 73
 74        return inst
 75
 76    @staticmethod
 77    def from_name(name):
 78        """ Searchs for source named <name> in the loaded configuration and initializes is
 79        
 80        Args:
 81            name: the name of the source to be looked up in the config"""
 82        return DataSource.from_config(name, cfg.sources[name])
 83
 84    @abstractmethod
 85    def load(self) -> xr.DataArray:
 86        ''' Loads thata from the datasource
 87
 88        Returns: 
 89            xr.DataArray: the loaded data, possibly represented by a lazy
 90                dask.array. It must contain a dimension named 'train_id'
 91        '''
 92        pass
 93
 94
 95    def __add__(self, other):
 96        from ..instruments import Instrument
 97
 98        if not isinstance(other, DataSource):
 99            return NotImplemented
100        return Instrument([self, other])
logger = <Logger fab.datasources.basesources (INFO)>
class DataSource(abc.ABC):
 14class DataSource(ABC):
 15    ''' Represents a source of data. 
 16    
 17    Abstract class for describing a data source. It should implement
 18    the load() method for loading data. The data must be indexable 
 19    by train_id
 20    '''
 21    
 22    def __init__(self, name, *args, fillna_method = None, **kwargs):
 23        ''' Base constructors. All Sources must have a name.
 24        
 25        Args:
 26            name: a human readable name for the data source.
 27            fillna_method (optional): a string specifing what filling method should
 28                be used to fill missing values if the dataset needs to be reindexed.
 29                This will happen when the datasource is combined with other sources
 30                in an Instrument object that will require all source to have the same
 31                train_id index. It should be either 'ffill' for forward filling, or one
 32                of the valid method for xr.interpolate_na. Using methods other than 
 33                'ffill' on large arrays might lead to extremely high memory usage.
 34        '''
 35        self.name = name
 36        self.fillna_method = fillna_method 
 37
 38    def __init_subclass__(cls, *args, **kwargs):
 39        ''' Adds all subclasses to the context dictionary for reference in config files.
 40        '''
 41        super().__init_subclass__(*args, **kwargs)
 42        update_context(cls)
 43
 44    @staticmethod
 45    def from_config(name, source_config, **kwargs):
 46        """ Returns an datasource configured with the parameters in source_config. 
 47        
 48        This is not a classmethod, the class of the returned instance should be specified
 49        by setting the <source_type> attribute in the configuration dictionary.
 50        
 51        Extra keyword arguments are passed on verbatim to the source __init__ method
 52        
 53        Args:
 54            name: the name of the source to be created
 55            source_config: source configuration dictionary """
 56        
 57        source_config = source_config.copy()  # Avoid modifying the original config
 58
 59        SourceClass = get_config_object(source_config.pop('__type__'))
 60        preprocess = source_config.pop('__preprocess__', {})   # dict of preprocessors
 61
 62        inst = SourceClass(name = name, **source_config, **kwargs)
 63
 64        # support for old-style preprocessors declaration
 65        if isinstance(preprocess, list):
 66            # if preprocess is a list, convert it to a dict with empty config
 67            preprocess = {preproc.pop('__name__'): {**preproc} for preproc in preprocess}
 68        
 69        #print(f"Source {name} preprocessors: {preprocess}")
 70        for preproc_name, preproc_config in preprocess.items():
 71            preprocessor = get_config_object(preproc_name)
 72            logger.debug(f"Source {name}, applying preprocessor: {preproc_name}")
 73            inst = preprocessor(inst, **preproc_config or {})
 74
 75        return inst
 76
 77    @staticmethod
 78    def from_name(name):
 79        """ Searchs for source named <name> in the loaded configuration and initializes is
 80        
 81        Args:
 82            name: the name of the source to be looked up in the config"""
 83        return DataSource.from_config(name, cfg.sources[name])
 84
 85    @abstractmethod
 86    def load(self) -> xr.DataArray:
 87        ''' Loads thata from the datasource
 88
 89        Returns: 
 90            xr.DataArray: the loaded data, possibly represented by a lazy
 91                dask.array. It must contain a dimension named 'train_id'
 92        '''
 93        pass
 94
 95
 96    def __add__(self, other):
 97        from ..instruments import Instrument
 98
 99        if not isinstance(other, DataSource):
100            return NotImplemented
101        return Instrument([self, other])

Represents a source of data.

Abstract class for describing a data source. It should implement the load() method for loading data. The data must be indexable by train_id

DataSource(name, *args, fillna_method=None, **kwargs)
22    def __init__(self, name, *args, fillna_method = None, **kwargs):
23        ''' Base constructors. All Sources must have a name.
24        
25        Args:
26            name: a human readable name for the data source.
27            fillna_method (optional): a string specifing what filling method should
28                be used to fill missing values if the dataset needs to be reindexed.
29                This will happen when the datasource is combined with other sources
30                in an Instrument object that will require all source to have the same
31                train_id index. It should be either 'ffill' for forward filling, or one
32                of the valid method for xr.interpolate_na. Using methods other than 
33                'ffill' on large arrays might lead to extremely high memory usage.
34        '''
35        self.name = name
36        self.fillna_method = fillna_method 

Base constructor. All sources must have a name.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
name
fillna_method
@staticmethod
def from_config(name, source_config, **kwargs):
44    @staticmethod
45    def from_config(name, source_config, **kwargs):
46        """ Returns an datasource configured with the parameters in source_config. 
47        
48        This is not a classmethod, the class of the returned instance should be specified
49        by setting the <source_type> attribute in the configuration dictionary.
50        
51        Extra keyword arguments are passed on verbatim to the source __init__ method
52        
53        Args:
54            name: the name of the source to be created
55            source_config: source configuration dictionary """
56        
57        source_config = source_config.copy()  # Avoid modifying the original config
58
59        SourceClass = get_config_object(source_config.pop('__type__'))
60        preprocess = source_config.pop('__preprocess__', {})   # dict of preprocessors
61
62        inst = SourceClass(name = name, **source_config, **kwargs)
63
64        # support for old-style preprocessors declaration
65        if isinstance(preprocess, list):
66            # if preprocess is a list, convert it to a dict with empty config
67            preprocess = {preproc.pop('__name__'): {**preproc} for preproc in preprocess}
68        
69        #print(f"Source {name} preprocessors: {preprocess}")
70        for preproc_name, preproc_config in preprocess.items():
71            preprocessor = get_config_object(preproc_name)
72            logger.debug(f"Source {name}, applying preprocessor: {preproc_name}")
73            inst = preprocessor(inst, **preproc_config or {})
74
75        return inst

Returns a data source configured with the parameters in source_config.

This is not a classmethod: the class of the returned instance is specified by the '__type__' entry of the configuration dictionary.

Extra keyword arguments are passed on verbatim to the source __init__ method

Arguments:
  • name: the name of the source to be created
  • source_config: source configuration dictionary
@staticmethod
def from_name(name):
77    @staticmethod
78    def from_name(name):
79        """ Searchs for source named <name> in the loaded configuration and initializes is
80        
81        Args:
82            name: the name of the source to be looked up in the config"""
83        return DataSource.from_config(name, cfg.sources[name])

Searches for the source named <name> in the loaded configuration and initializes it.

Arguments:
  • name: the name of the source to be looked up in the config
@abstractmethod
def load(self) -> xarray.core.dataarray.DataArray:
85    @abstractmethod
86    def load(self) -> xr.DataArray:
87        ''' Loads thata from the datasource
88
89        Returns: 
90            xr.DataArray: the loaded data, possibly represented by a lazy
91                dask.array. It must contain a dimension named 'train_id'
92        '''
93        pass

Loads data from the data source.

Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'