fab.datasources.basesources
from abc import ABC, abstractmethod
import xarray as xr

from ..settings import cfg, get_config_object, update_context

import logging
logger = logging.getLogger(__name__)


#TODO: more type hints
#TODO: move ADC slicer baseline and inversion to preprocessor

class DataSource(ABC):
    """Represents a source of data.

    Abstract class for describing a data source. Subclasses must implement
    the load() method for loading data. The data must be indexable by
    train_id.
    """

    def __init__(self, name, *args, fillna_method=None, **kwargs):
        """Base constructor. All sources must have a name.

        Extra positional and keyword arguments are accepted and ignored by
        this base constructor.

        Args:
            name: a human readable name for the data source.
            fillna_method (optional): a string specifying what filling method
                should be used to fill missing values if the dataset needs to
                be reindexed. This will happen when the datasource is combined
                with other sources in an Instrument object that will require
                all sources to have the same train_id index. It should be
                either 'ffill' for forward filling, or one of the valid
                methods for xr.interpolate_na. Using methods other than
                'ffill' on large arrays might lead to extremely high memory
                usage.
        """
        self.name = name
        self.fillna_method = fillna_method

    def __init_subclass__(cls, *args, **kwargs):
        """Registers every subclass via update_context() for reference in config files."""
        super().__init_subclass__(*args, **kwargs)
        update_context(cls)

    @staticmethod
    def from_config(name, source_config, **kwargs):
        """Returns a datasource configured with the parameters in source_config.

        This is not a classmethod: the class of the returned instance is
        selected by the '__type__' entry of the configuration dictionary,
        which is resolved through get_config_object().

        Extra keyword arguments are passed on verbatim to the source
        __init__ method.

        Args:
            name: the name of the source to be created
            source_config: source configuration dictionary. Neither it nor
                the preprocessor entries nested in it are modified. The
                optional '__preprocess__' entry is either a mapping of
                preprocessor name to keyword arguments (or None), or, in the
                old style, a list of dicts each holding a '__name__' key
                plus the preprocessor keyword arguments.

        Returns:
            The source instance, passed through every declared preprocessor
            in declaration order.

        Raises:
            KeyError: if '__type__' is missing, or an old-style preprocessor
                entry has no '__name__' key.
        """
        source_config = source_config.copy()  # Avoid modifying the original config

        SourceClass = get_config_object(source_config.pop('__type__'))
        # An entry that is present but empty (None) means "no preprocessors".
        preprocess = source_config.pop('__preprocess__', None) or {}

        inst = SourceClass(name=name, **source_config, **kwargs)

        if isinstance(preprocess, list):
            # Old-style declaration: a list of dicts carrying '__name__'.
            # Each entry is copied before popping: the copy of source_config
            # above is shallow, so popping from the entry itself would strip
            # '__name__' from the caller's configuration and break the next
            # call. Keeping (name, config) pairs rather than a dict also
            # preserves a preprocessor that is listed more than once.
            steps = []
            for entry in preprocess:
                entry = dict(entry)
                steps.append((entry.pop('__name__'), entry))
        else:
            steps = list(preprocess.items())

        for preproc_name, preproc_config in steps:
            preprocessor = get_config_object(preproc_name)
            logger.debug("Source %s, applying preprocessor: %s", name, preproc_name)
            # A preprocessor declared without parameters may map to None.
            inst = preprocessor(inst, **(preproc_config or {}))

        return inst

    @staticmethod
    def from_name(name):
        """Searches for the source named <name> in the loaded configuration and initializes it.

        Args:
            name: the name of the source to be looked up in the config
        """
        return DataSource.from_config(name, cfg.sources[name])

    @abstractmethod
    def load(self) -> xr.DataArray:
        """Loads data from the datasource.

        Returns:
            xr.DataArray: the loaded data, possibly represented by a lazy
                dask.array. It must contain a dimension named 'train_id'
        """
        pass

    def __add__(self, other):
        """Combines this source with another DataSource into an Instrument.

        Returns NotImplemented when other is not a DataSource.
        """
        # Imported here rather than at module level, as in the original code.
        from ..instruments import Instrument

        if not isinstance(other, DataSource):
            return NotImplemented
        return Instrument([self, other])
logger =
<Logger fab.datasources.basesources (INFO)>
class
DataSource(abc.ABC):
class DataSource(ABC):
    """Represents a source of data.

    Abstract class for describing a data source. Subclasses must implement
    the load() method for loading data. The data must be indexable by
    train_id.
    """

    def __init__(self, name, *args, fillna_method=None, **kwargs):
        """Base constructor. All sources must have a name.

        Extra positional and keyword arguments are accepted and ignored by
        this base constructor.

        Args:
            name: a human readable name for the data source.
            fillna_method (optional): a string specifying what filling method
                should be used to fill missing values if the dataset needs to
                be reindexed. This will happen when the datasource is combined
                with other sources in an Instrument object that will require
                all sources to have the same train_id index. It should be
                either 'ffill' for forward filling, or one of the valid
                methods for xr.interpolate_na. Using methods other than
                'ffill' on large arrays might lead to extremely high memory
                usage.
        """
        self.name = name
        self.fillna_method = fillna_method

    def __init_subclass__(cls, *args, **kwargs):
        """Registers every subclass via update_context() for reference in config files."""
        super().__init_subclass__(*args, **kwargs)
        update_context(cls)

    @staticmethod
    def from_config(name, source_config, **kwargs):
        """Returns a datasource configured with the parameters in source_config.

        This is not a classmethod: the class of the returned instance is
        selected by the '__type__' entry of the configuration dictionary,
        which is resolved through get_config_object().

        Extra keyword arguments are passed on verbatim to the source
        __init__ method.

        Args:
            name: the name of the source to be created
            source_config: source configuration dictionary. Neither it nor
                the preprocessor entries nested in it are modified. The
                optional '__preprocess__' entry is either a mapping of
                preprocessor name to keyword arguments (or None), or, in the
                old style, a list of dicts each holding a '__name__' key
                plus the preprocessor keyword arguments.

        Returns:
            The source instance, passed through every declared preprocessor
            in declaration order.

        Raises:
            KeyError: if '__type__' is missing, or an old-style preprocessor
                entry has no '__name__' key.
        """
        source_config = source_config.copy()  # Avoid modifying the original config

        SourceClass = get_config_object(source_config.pop('__type__'))
        # An entry that is present but empty (None) means "no preprocessors".
        preprocess = source_config.pop('__preprocess__', None) or {}

        inst = SourceClass(name=name, **source_config, **kwargs)

        if isinstance(preprocess, list):
            # Old-style declaration: a list of dicts carrying '__name__'.
            # Each entry is copied before popping: the copy of source_config
            # above is shallow, so popping from the entry itself would strip
            # '__name__' from the caller's configuration and break the next
            # call. Keeping (name, config) pairs rather than a dict also
            # preserves a preprocessor that is listed more than once.
            steps = []
            for entry in preprocess:
                entry = dict(entry)
                steps.append((entry.pop('__name__'), entry))
        else:
            steps = list(preprocess.items())

        for preproc_name, preproc_config in steps:
            preprocessor = get_config_object(preproc_name)
            logger.debug("Source %s, applying preprocessor: %s", name, preproc_name)
            # A preprocessor declared without parameters may map to None.
            inst = preprocessor(inst, **(preproc_config or {}))

        return inst

    @staticmethod
    def from_name(name):
        """Searches for the source named <name> in the loaded configuration and initializes it.

        Args:
            name: the name of the source to be looked up in the config
        """
        return DataSource.from_config(name, cfg.sources[name])

    @abstractmethod
    def load(self) -> xr.DataArray:
        """Loads data from the datasource.

        Returns:
            xr.DataArray: the loaded data, possibly represented by a lazy
                dask.array. It must contain a dimension named 'train_id'
        """
        pass

    def __add__(self, other):
        """Combines this source with another DataSource into an Instrument.

        Returns NotImplemented when other is not a DataSource.
        """
        # Imported here rather than at module level, as in the original code.
        from ..instruments import Instrument

        if not isinstance(other, DataSource):
            return NotImplemented
        return Instrument([self, other])
Represents a source of data.
Abstract class for describing a data source. It should implement the load() method for loading data. The data must be indexable by train_id
DataSource(name, *args, fillna_method=None, **kwargs)
def __init__(self, name, *args, fillna_method = None, **kwargs):
    """Base constructor: records the source name and its fill strategy.

    Any additional positional or keyword arguments are accepted and ignored.

    Args:
        name: a human readable name for the data source.
        fillna_method (optional): name of the method used to fill missing
            values when the dataset has to be reindexed, which happens when
            the source is combined with others in an Instrument that needs a
            common train_id index. Either 'ffill' (forward fill) or one of
            the methods accepted by xr.interpolate_na. Methods other than
            'ffill' may use extremely large amounts of memory on big arrays.
    """
    self.fillna_method = fillna_method
    self.name = name
Base constructor. All sources must have a name.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
@staticmethod
def
from_config(name, source_config, **kwargs):
@staticmethod
def from_config(name, source_config, **kwargs):
    """Returns a datasource configured with the parameters in source_config.

    This is not a classmethod: the class of the returned instance is
    selected by the '__type__' entry of the configuration dictionary,
    which is resolved through get_config_object().

    Extra keyword arguments are passed on verbatim to the source
    __init__ method.

    Args:
        name: the name of the source to be created
        source_config: source configuration dictionary. Neither it nor
            the preprocessor entries nested in it are modified. The
            optional '__preprocess__' entry is either a mapping of
            preprocessor name to keyword arguments (or None), or, in the
            old style, a list of dicts each holding a '__name__' key
            plus the preprocessor keyword arguments.

    Returns:
        The source instance, passed through every declared preprocessor
        in declaration order.

    Raises:
        KeyError: if '__type__' is missing, or an old-style preprocessor
            entry has no '__name__' key.
    """
    source_config = source_config.copy()  # Avoid modifying the original config

    SourceClass = get_config_object(source_config.pop('__type__'))
    # An entry that is present but empty (None) means "no preprocessors".
    preprocess = source_config.pop('__preprocess__', None) or {}

    inst = SourceClass(name=name, **source_config, **kwargs)

    if isinstance(preprocess, list):
        # Old-style declaration: a list of dicts carrying '__name__'.
        # Each entry is copied before popping: the copy of source_config
        # above is shallow, so popping from the entry itself would strip
        # '__name__' from the caller's configuration and break the next
        # call. Keeping (name, config) pairs rather than a dict also
        # preserves a preprocessor that is listed more than once.
        steps = []
        for entry in preprocess:
            entry = dict(entry)
            steps.append((entry.pop('__name__'), entry))
    else:
        steps = list(preprocess.items())

    for preproc_name, preproc_config in steps:
        preprocessor = get_config_object(preproc_name)
        logger.debug("Source %s, applying preprocessor: %s", name, preproc_name)
        # A preprocessor declared without parameters may map to None.
        inst = preprocessor(inst, **(preproc_config or {}))

    return inst
Returns a datasource configured with the parameters in source_config.
This is not a classmethod, the class of the returned instance should be specified
by setting the `__type__` attribute in the configuration dictionary.
Extra keyword arguments are passed on verbatim to the source __init__ method
Arguments:
- name: the name of the source to be created
- source_config: source configuration dictionary
@staticmethod
def
from_name(name):
@staticmethod
def from_name(name):
    """Builds the source registered under <name> in the loaded configuration.

    Args:
        name: key of the source inside cfg.sources; also used as the name
            of the created source.
    """
    source_config = cfg.sources[name]
    return DataSource.from_config(name, source_config)
Searches for the source named `name` in the loaded configuration and initializes it.
Arguments:
- name: the name of the source to be looked up in the config
@abstractmethod
def
load(self) -> xarray.core.dataarray.DataArray:
@abstractmethod
def load(self) -> xr.DataArray:
    """Loads the data from the datasource. Must be implemented by subclasses.

    Returns:
        xr.DataArray: the loaded data, possibly backed by a lazy
            dask.array. It must contain a dimension named 'train_id'
    """
Loads data from the datasource
Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'