fab.datasources.basesources

 1from abc import ABC, abstractmethod
 2import xarray as xr
 3
 4from ..settings import cfg, cfg_context, get_config_object
 5
 6import logging
 7logger = logging.getLogger(__name__)
 8
 9
10#TODO: more type hints
11#TODO: move ADC slicer baseline and inversion to preprocessor
12
class DataSource(ABC):
    ''' Represents a source of data.

    Abstract base class describing a data source. Subclasses must implement
    the load() method, which returns the data as an xr.DataArray containing
    a dimension named 'train_id'.
    '''

    def __init__(self, name, *args, fillna_method = None, **kwargs):
        ''' Base constructor. All sources must have a name.

        Extra positional and keyword arguments are accepted and ignored by
        this base constructor.

        Args:
            name: a human readable name for the data source.

            fillna_method (optional): a string specifying what filling method should
                be used to fill missing values if the dataset needs to be reindexed.
                This will happen when the datasource is combined with other sources
                in an Instrument object that will require all sources to have the same
                train_id index. It should be either 'ffill' for forward filling, or one
                of the valid methods for xr.interpolate_na. Using methods other than
                'ffill' on large arrays might lead to extremely high memory usage.
        '''
        self.name = name
        self.fillna_method = fillna_method

    @staticmethod
    def from_config(name, source_config, **kwargs):
        """ Returns a data source configured with the parameters in source_config.

        This is not a classmethod: the class of the returned instance is specified
        by the '__type__' key of the configuration dictionary. An optional
        '__preprocess__' key holds a list of preprocessor configurations, each
        naming its preprocessor with a '__name__' key; they are applied in order,
        each one receiving the result of the previous one.

        The configuration dictionaries passed in are not modified, so the same
        configuration can be used to build a source more than once.

        Extra keyword arguments are passed on verbatim to the source __init__ method

        Args:
            name: the name of the source to be created
            source_config: source configuration dictionary

        Returns:
            the source instance, or whatever the last preprocessor returned

        Raises:
            KeyError: if '__type__' is missing from source_config, or '__name__'
                is missing from one of the preprocessor configurations """

        # Work on a shallow copy: popping the special keys from the caller's
        # mapping would make a second call with the same config fail.
        options = dict(source_config)
        SourceClass = get_config_object(options.pop('__type__'))
        preprocess = options.pop('__preprocess__', [])   # list of preprocessors

        inst = SourceClass(name = name, **options, **kwargs)

        for preproc_config in preprocess:
            # Same reasoning as above: leave the caller's preprocessor config intact.
            preproc_options = dict(preproc_config)
            preprocessor = get_config_object(preproc_options.pop('__name__'))
            logger.debug("Source %s, applying preprocessor: %s", name, preproc_options)
            inst = preprocessor(inst, **preproc_options)

        return inst

    @staticmethod
    def from_name(name):
        """ Searches for the source named <name> in the loaded configuration and initializes it

        Args:
            name: the name of the source to be looked up in the config

        Returns:
            the object built by from_config from cfg.sources[name] """
        return DataSource.from_config(name, cfg.sources[name])

    @abstractmethod
    def load(self) -> xr.DataArray:
        ''' Loads data from the datasource

        Returns:
            xr.DataArray: the loaded data, possibly represented by a lazy
                dask.array. It must contain a dimension named 'train_id'
        '''
        pass


    def __add__(self, other):
        ''' Combines this source with another one into an Instrument.

        Returns NotImplemented when other is not a DataSource, so that Python
        can try the reflected operation on other.
        '''
        # Imported here rather than at module level, presumably to avoid a
        # circular import with the instruments module -- TODO confirm.
        from ..instruments import Instrument

        if not isinstance(other, DataSource):
            return NotImplemented
        return Instrument([self, other])
logger = <Logger fab.datasources.basesources (INFO)>
class DataSource(abc.ABC):
14class DataSource(ABC):
15    ''' Represents a source of data. 
16    
17    Abstract class for describing a data source. It should implement
18    the load() method for loading data. The data must be indexable 
19    by train_id
20    '''
21    
22    def __init__(self, name, *args, fillna_method = None, **kwargs):
23        ''' Base constructors. All Sources must have a name.
24        
25        Args:
26            name: a human readable name for the data source.
27        
28            fillna_method (optional): a string specifing what filling method should
29                be used to fill missing values if the dataset needs to be reindexed.
30                This will happen when the datasource is combined with other sources
31                in an Instrument object that will require all source to have the same
32                train_id index. It should be either 'ffill' for forward filling, or one
33                of the valid method for xr.interpolate_na. Using methods other than 
34                'ffill' on large arrays might lead to extremely high memory usage.
35        '''
36        self.name = name
37        self.fillna_method = fillna_method 
38
39    @staticmethod
40    def from_config(name, source_config, **kwargs):
41        """ Returns an datasource configured with the parameters in source_config. 
42        
43        This is not a classmethod, the class of the returned instance should be specified
44        by setting the <source_type> attribute in the configuration dictionary.
45        
46        Extra keyword arguments are passed on verbatim to the source __init__ method
47        
48        Args:
49            name: the name of the source to be created
50            source_config: source configuration dictionary """
51        
52        SourceClass = get_config_object(source_config.pop('__type__'))
53        preprocess = source_config.pop('__preprocess__', [])   # list of preprocessors
54
55        inst = SourceClass(name = name, **source_config, **kwargs)
56        
57        for preproc_config in preprocess:
58            preprocessor = get_config_object(preproc_config.pop('__name__'))
59            logger.debug(f"Source {name}, applying preprocessor: {preproc_config}")
60            inst = preprocessor(inst, **preproc_config)
61
62        return inst
63
64    @staticmethod
65    def from_name(name):
66        """ Searchs for source named <name> in the loaded configuration and initializes is
67        
68        Args:
69            name: the name of the source to be looked up in the config"""
70        return DataSource.from_config(name, cfg.sources[name])
71
72    @abstractmethod
73    def load(self) -> xr.DataArray:
74        ''' Loads thata from the datasource
75
76        Returns: 
77            xr.DataArray: the loaded data, possibly represented by a lazy
78                dask.array. It must contain a dimension named 'train_id'
79        '''
80        pass
81
82
83    def __add__(self, other):
84        from ..instruments import Instrument
85
86        if not isinstance(other, DataSource):
87            return NotImplemented
88        return Instrument([self, other])

Represents a source of data.

Abstract class describing a data source. Subclasses must implement the load() method for loading data. The data must be indexable by train_id.

DataSource(name, *args, fillna_method=None, **kwargs)
22    def __init__(self, name, *args, fillna_method = None, **kwargs):
23        ''' Base constructors. All Sources must have a name.
24        
25        Args:
26            name: a human readable name for the data source.
27        
28            fillna_method (optional): a string specifing what filling method should
29                be used to fill missing values if the dataset needs to be reindexed.
30                This will happen when the datasource is combined with other sources
31                in an Instrument object that will require all source to have the same
32                train_id index. It should be either 'ffill' for forward filling, or one
33                of the valid method for xr.interpolate_na. Using methods other than 
34                'ffill' on large arrays might lead to extremely high memory usage.
35        '''
36        self.name = name
37        self.fillna_method = fillna_method 

Base constructor. All sources must have a name.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
name
fillna_method
@staticmethod
def from_config(name, source_config, **kwargs):
39    @staticmethod
40    def from_config(name, source_config, **kwargs):
41        """ Returns an datasource configured with the parameters in source_config. 
42        
43        This is not a classmethod, the class of the returned instance should be specified
44        by setting the <source_type> attribute in the configuration dictionary.
45        
46        Extra keyword arguments are passed on verbatim to the source __init__ method
47        
48        Args:
49            name: the name of the source to be created
50            source_config: source configuration dictionary """
51        
52        SourceClass = get_config_object(source_config.pop('__type__'))
53        preprocess = source_config.pop('__preprocess__', [])   # list of preprocessors
54
55        inst = SourceClass(name = name, **source_config, **kwargs)
56        
57        for preproc_config in preprocess:
58            preprocessor = get_config_object(preproc_config.pop('__name__'))
59            logger.debug(f"Source {name}, applying preprocessor: {preproc_config}")
60            inst = preprocessor(inst, **preproc_config)
61
62        return inst

Returns a data source configured with the parameters in source_config.

This is not a classmethod: the class of the returned instance should be specified by setting the __type__ key in the configuration dictionary.

Extra keyword arguments are passed on verbatim to the source __init__ method

Arguments:
  • name: the name of the source to be created
  • source_config: source configuration dictionary
@staticmethod
def from_name(name):
64    @staticmethod
65    def from_name(name):
66        """ Searchs for source named <name> in the loaded configuration and initializes is
67        
68        Args:
69            name: the name of the source to be looked up in the config"""
70        return DataSource.from_config(name, cfg.sources[name])

Searches for the source named <name> in the loaded configuration and initializes it.

Arguments:
  • name: the name of the source to be looked up in the config
@abstractmethod
def load(self) -> xarray.core.dataarray.DataArray:
72    @abstractmethod
73    def load(self) -> xr.DataArray:
74        ''' Loads thata from the datasource
75
76        Returns: 
77            xr.DataArray: the loaded data, possibly represented by a lazy
78                dask.array. It must contain a dimension named 'train_id'
79        '''
80        pass

Loads data from the datasource.

Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'