fab.datasources.basesources
from abc import ABC, abstractmethod
import xarray as xr

from ..settings import cfg, cfg_context, get_config_object

import logging
logger = logging.getLogger(__name__)


#TODO: more type hints
#TODO: move ADC slicer baseline and inversion to preprocessor

class DataSource(ABC):
    """Represents a source of data.

    Abstract base class describing a data source. Subclasses must implement
    the load() method for loading data. The data must be indexable by
    train_id.
    """

    def __init__(self, name, *args, fillna_method=None, **kwargs):
        """Base constructor. All sources must have a name.

        Extra positional and keyword arguments are accepted and ignored here.

        Args:
            name: a human readable name for the data source.

            fillna_method (optional): a string specifying which filling
                method should be used to fill missing values if the dataset
                needs to be reindexed. This will happen when the datasource
                is combined with other sources in an Instrument object, which
                requires all sources to have the same train_id index. It
                should be either 'ffill' for forward filling, or one of the
                valid methods for xr.interpolate_na. Using methods other than
                'ffill' on large arrays might lead to extremely high memory
                usage.
        """
        self.name = name
        self.fillna_method = fillna_method

    @staticmethod
    def from_config(name, source_config, **kwargs):
        """Return a datasource configured with the parameters in source_config.

        This is not a classmethod: the class of the returned instance is
        looked up with get_config_object() from the '__type__' entry of the
        configuration dictionary. The optional '__preprocess__' entry is a
        list of preprocessor configurations. Each one names a callable under
        '__name__'; it is called as preprocessor(inst, **remaining_entries)
        and its return value replaces the instance, in list order.

        Neither source_config nor the preprocessor configurations are
        modified: shallow copies are taken before the special keys are
        removed, so the same configuration can be used repeatedly.

        Extra keyword arguments are passed on verbatim to the source
        __init__ method.

        Args:
            name: the name of the source to be created.
            source_config: source configuration dictionary.

        Returns:
            The created source, after all preprocessors have been applied.

        Raises:
            KeyError: if '__type__' is missing from source_config, or
                '__name__' is missing from a preprocessor configuration.
        """
        # Work on copies: popping from the caller's dictionaries would strip
        # '__type__' / '__name__' from the loaded configuration and make a
        # second call with the same configuration fail with KeyError.
        source_config = dict(source_config)
        SourceClass = get_config_object(source_config.pop('__type__'))
        preprocess = source_config.pop('__preprocess__', [])  # list of preprocessors

        inst = SourceClass(name=name, **source_config, **kwargs)

        for preproc_config in preprocess:
            preproc_config = dict(preproc_config)
            preprocessor = get_config_object(preproc_config.pop('__name__'))
            logger.debug(f"Source {name}, applying preprocessor: {preproc_config}")
            inst = preprocessor(inst, **preproc_config)

        return inst

    @staticmethod
    def from_name(name):
        """Search for the source named <name> in the loaded configuration and initialize it.

        Args:
            name: the name of the source to be looked up in the config.

        Returns:
            The result of DataSource.from_config() for cfg.sources[name].
        """
        return DataSource.from_config(name, cfg.sources[name])

    @abstractmethod
    def load(self) -> xr.DataArray:
        """Load data from the datasource.

        Returns:
            xr.DataArray: the loaded data, possibly represented by a lazy
            dask.array. It must contain a dimension named 'train_id'.
        """
        pass

    def __add__(self, other):
        """Combine this source with another one into an Instrument.

        Args:
            other: the right-hand operand of '+'.

        Returns:
            Instrument([self, other]) when other is a DataSource, otherwise
            NotImplemented so that Python can try the reflected operation.
        """
        # Imported locally rather than at module level, presumably to avoid
        # a circular import with the instruments module (TODO confirm).
        from ..instruments import Instrument

        if not isinstance(other, DataSource):
            return NotImplemented
        return Instrument([self, other])
logger =
<Logger fab.datasources.basesources (INFO)>
class
DataSource(abc.ABC):
class DataSource(ABC):
    """Represents a source of data.

    Abstract base class describing a data source. Subclasses must implement
    the load() method for loading data. The data must be indexable by
    train_id.
    """

    def __init__(self, name, *args, fillna_method=None, **kwargs):
        """Base constructor. All sources must have a name.

        Extra positional and keyword arguments are accepted and ignored here.

        Args:
            name: a human readable name for the data source.

            fillna_method (optional): a string specifying which filling
                method should be used to fill missing values if the dataset
                needs to be reindexed. This will happen when the datasource
                is combined with other sources in an Instrument object, which
                requires all sources to have the same train_id index. It
                should be either 'ffill' for forward filling, or one of the
                valid methods for xr.interpolate_na. Using methods other than
                'ffill' on large arrays might lead to extremely high memory
                usage.
        """
        self.name = name
        self.fillna_method = fillna_method

    @staticmethod
    def from_config(name, source_config, **kwargs):
        """Return a datasource configured with the parameters in source_config.

        This is not a classmethod: the class of the returned instance is
        looked up with get_config_object() from the '__type__' entry of the
        configuration dictionary. The optional '__preprocess__' entry is a
        list of preprocessor configurations. Each one names a callable under
        '__name__'; it is called as preprocessor(inst, **remaining_entries)
        and its return value replaces the instance, in list order.

        Neither source_config nor the preprocessor configurations are
        modified: shallow copies are taken before the special keys are
        removed, so the same configuration can be used repeatedly.

        Extra keyword arguments are passed on verbatim to the source
        __init__ method.

        Args:
            name: the name of the source to be created.
            source_config: source configuration dictionary.

        Returns:
            The created source, after all preprocessors have been applied.

        Raises:
            KeyError: if '__type__' is missing from source_config, or
                '__name__' is missing from a preprocessor configuration.
        """
        # Work on copies: popping from the caller's dictionaries would strip
        # '__type__' / '__name__' from the loaded configuration and make a
        # second call with the same configuration fail with KeyError.
        source_config = dict(source_config)
        SourceClass = get_config_object(source_config.pop('__type__'))
        preprocess = source_config.pop('__preprocess__', [])  # list of preprocessors

        inst = SourceClass(name=name, **source_config, **kwargs)

        for preproc_config in preprocess:
            preproc_config = dict(preproc_config)
            preprocessor = get_config_object(preproc_config.pop('__name__'))
            logger.debug(f"Source {name}, applying preprocessor: {preproc_config}")
            inst = preprocessor(inst, **preproc_config)

        return inst

    @staticmethod
    def from_name(name):
        """Search for the source named <name> in the loaded configuration and initialize it.

        Args:
            name: the name of the source to be looked up in the config.

        Returns:
            The result of DataSource.from_config() for cfg.sources[name].
        """
        return DataSource.from_config(name, cfg.sources[name])

    @abstractmethod
    def load(self) -> xr.DataArray:
        """Load data from the datasource.

        Returns:
            xr.DataArray: the loaded data, possibly represented by a lazy
            dask.array. It must contain a dimension named 'train_id'.
        """
        pass

    def __add__(self, other):
        """Combine this source with another one into an Instrument.

        Args:
            other: the right-hand operand of '+'.

        Returns:
            Instrument([self, other]) when other is a DataSource, otherwise
            NotImplemented so that Python can try the reflected operation.
        """
        # Imported locally rather than at module level, presumably to avoid
        # a circular import with the instruments module (TODO confirm).
        from ..instruments import Instrument

        if not isinstance(other, DataSource):
            return NotImplemented
        return Instrument([self, other])
Represents a source of data.
Abstract class for describing a data source. It should implement the load() method for loading data. The data must be indexable by train_id
DataSource(name, *args, fillna_method=None, **kwargs)
def __init__(self, name, *args, fillna_method=None, **kwargs):
    """Base constructor. All sources must have a name.

    Extra positional and keyword arguments are accepted and ignored here.

    Args:
        name: a human readable name for the data source.

        fillna_method (optional): a string specifying which filling method
            should be used to fill missing values if the dataset needs to be
            reindexed. This will happen when the datasource is combined with
            other sources in an Instrument object, which requires all
            sources to have the same train_id index. It should be either
            'ffill' for forward filling, or one of the valid methods for
            xr.interpolate_na. Using methods other than 'ffill' on large
            arrays might lead to extremely high memory usage.
    """
    # Store both settings as plain public attributes.
    self.name, self.fillna_method = name, fillna_method
Base constructor. All sources must have a name.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
@staticmethod
def
from_config(name, source_config, **kwargs):
@staticmethod
def from_config(name, source_config, **kwargs):
    """Return a datasource configured with the parameters in source_config.

    This is not a classmethod: the class of the returned instance is looked
    up with get_config_object() from the '__type__' entry of the
    configuration dictionary. The optional '__preprocess__' entry is a list
    of preprocessor configurations. Each one names a callable under
    '__name__'; it is called as preprocessor(inst, **remaining_entries) and
    its return value replaces the instance, in list order.

    Neither source_config nor the preprocessor configurations are modified:
    shallow copies are taken before the special keys are removed, so the
    same configuration can be used repeatedly.

    Extra keyword arguments are passed on verbatim to the source __init__
    method.

    Args:
        name: the name of the source to be created.
        source_config: source configuration dictionary.

    Returns:
        The created source, after all preprocessors have been applied.

    Raises:
        KeyError: if '__type__' is missing from source_config, or
            '__name__' is missing from a preprocessor configuration.
    """
    # Work on copies: popping from the caller's dictionaries would strip
    # '__type__' / '__name__' from the loaded configuration and make a
    # second call with the same configuration fail with KeyError.
    source_config = dict(source_config)
    SourceClass = get_config_object(source_config.pop('__type__'))
    preprocess = source_config.pop('__preprocess__', [])  # list of preprocessors

    inst = SourceClass(name=name, **source_config, **kwargs)

    for preproc_config in preprocess:
        preproc_config = dict(preproc_config)
        preprocessor = get_config_object(preproc_config.pop('__name__'))
        logger.debug(f"Source {name}, applying preprocessor: {preproc_config}")
        inst = preprocessor(inst, **preproc_config)

    return inst
Returns a datasource configured with the parameters in source_config.
This is not a classmethod: the class of the returned instance should be specified
by setting the `__type__` key in the configuration dictionary.
Extra keyword arguments are passed on verbatim to the source __init__ method
Arguments:
- name: the name of the source to be created
- source_config: source configuration dictionary
@staticmethod
def
from_name(name):
@staticmethod
def from_name(name):
    """Search for the source named <name> in the loaded configuration and initialize it.

    Args:
        name: the name of the source to be looked up in the config.

    Returns:
        The result of DataSource.from_config() for the configuration entry
        stored under cfg.sources[name].
    """
    source_config = cfg.sources[name]
    return DataSource.from_config(name, source_config)
Searches for the source named `name` in the loaded configuration and initializes it.
Arguments:
- name: the name of the source to be looked up in the config
@abstractmethod
def
load(self) -> xarray.core.dataarray.DataArray:
@abstractmethod
def load(self) -> xr.DataArray:
    """Load data from the datasource.

    Abstract: the base implementation has no body.

    Returns:
        xr.DataArray: the loaded data, possibly represented by a lazy
        dask.array. It must contain a dimension named 'train_id'.
    """
Loads data from the datasource.
Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'