fab.datasources.basesources
from abc import ABC, abstractmethod
import xarray as xr

from ..settings import cfg, get_config_object, update_context

import logging
import copy
logger = logging.getLogger(__name__)


#TODO: more type hints
#TODO: move ADC slicer baseline and inversion to preprocessor

class DataSource(ABC):
    ''' Represents a source of data.

    Abstract base class describing a data source. Subclasses must implement
    the load() method for loading data. The data must be indexable
    by train_id.

    Every subclass is passed to update_context() when it is defined, so
    that it can be referenced by name in config files.
    '''

    def __init__(self, name, *args, fillna_method=None, **kwargs):
        ''' Base init for a data source.

        Extra positional and keyword arguments are accepted but not used by
        this base initializer.

        Args:
            name: a human readable name for the data source.
            fillna_method (optional): a string specifying what filling method should
                be used to fill missing values if the dataset needs to be reindexed.
                This will happen when the datasource is combined with other sources
                in an Instrument object that will require all sources to have the same
                train_id index. It should be either 'ffill' for forward filling, or one
                of the valid methods for xr.interpolate_na. Using methods other than
                'ffill' on large arrays might lead to extremely high memory usage.
        '''
        self.name = name
        self.fillna_method = fillna_method

    def __init_subclass__(cls, *args, **kwargs):
        ''' Adds all subclasses to the context dictionary for reference in config files.
        '''
        super().__init_subclass__(*args, **kwargs)
        update_context(cls)

    @staticmethod
    def from_config(name, source_config, **kwargs):
        """ Returns a datasource configured with the parameters in source_config.

        This is not a classmethod: the class of the returned instance is
        looked up with get_config_object() from the '__type__' entry of the
        configuration dictionary.

        The optional '__preprocess__' entry declares the preprocessors that
        wrap the source, applied in order. It is either a dict mapping
        preprocessor names to their keyword arguments (or to None for no
        arguments) or, in the old style, a list of dicts each holding a
        '__name__' entry next to the keyword arguments.

        All remaining entries, plus any extra keyword arguments, are passed on
        verbatim to the source __init__ method.

        Args:
            name: the name of the source to be created
            source_config: source configuration dictionary

        Returns:
            The source instance, or the object returned by the last
            preprocessor when preprocessors are declared.

        Raises:
            KeyError: if source_config has no '__type__' entry.
        """
        source_config = copy.deepcopy(source_config)  # Avoid modifying the original config

        try:
            type_name = source_config.pop('__type__')
        except KeyError:
            raise KeyError(f"Source '{name}': configuration has no '__type__' entry") from None
        SourceClass = get_config_object(type_name)

        # A missing or None '__preprocess__' entry both mean "no preprocessors"
        preprocess = source_config.pop('__preprocess__', None) or {}

        inst = SourceClass(name=name, **source_config, **kwargs)

        if isinstance(preprocess, list):
            # Old-style declaration. Keep it as an ordered sequence of
            # (name, config) pairs: going through a dict would silently drop
            # a preprocessor that is listed more than once.
            steps = [(preproc.pop('__name__'), preproc) for preproc in preprocess]
        else:
            steps = list(preprocess.items())

        for preproc_name, preproc_config in steps:
            preprocessor = get_config_object(preproc_name)
            logger.debug("Source %s, applying preprocessor: %s", name, preproc_name)
            # preproc_config may be None when no arguments are given
            inst = preprocessor(inst, **(preproc_config or {}))

        return inst

    @staticmethod
    def from_name(name):
        """ Searches for the source named <name> in the loaded configuration and initializes it.

        Args:
            name: the name of the source to be looked up in the config

        Returns:
            The result of DataSource.from_config for the cfg.sources[name] entry.
        """
        return DataSource.from_config(name, cfg.sources[name])

    def _repr_rows(self):
        ''' Returns the (label, value) rows shown in the HTML representation.

        The base implementation only reports the fill method, when one is set.
        '''
        rows = []
        if self.fillna_method:
            rows.append(('Fill NA', self.fillna_method))
        return rows

    def _repr_preprocessors(self):
        ''' Returns one (label, arguments) row per entry of self._preprocessors.

        The attribute is optional (it is not set by this class); each entry is
        read as a mapping with a 'name' key and optional 'args' / 'kwargs' keys.
        '''
        rows = []
        for p in getattr(self, '_preprocessors', []):
            parts = []
            if p.get('args'):
                parts += [str(a) for a in p['args']]
            if p.get('kwargs'):
                parts += [f'{k}={v}' for k, v in p['kwargs'].items()]
            rows.append((f'→ {p["name"]}', ','.join(parts)))
        return rows

    @abstractmethod
    def load(self) -> xr.DataArray:
        ''' Loads data from the datasource.

        Returns:
            xr.DataArray: the loaded data, possibly represented by a lazy
            dask.array. It must contain a dimension named 'train_id'
        '''
        pass

    def __add__(self, other):
        ''' Combines this source with another one into an Instrument.

        Returns NotImplemented when other is not a DataSource.
        '''
        if not isinstance(other, DataSource):
            return NotImplemented

        # Imported here rather than at module level (presumably to avoid a
        # circular import with the instruments module).
        from ..instruments import Instrument
        return Instrument([self, other])

    def _repr_html_(self):
        ''' Returns an HTML table with the class name and the rows from
        _repr_rows() and _repr_preprocessors().

        Labels and values are HTML-escaped, so that values such as
        "<function f at 0x...>" are displayed instead of being parsed as tags.
        '''
        from html import escape

        rows_html = ''.join(
            f'<tr><td style="padding:2px 8px; opacity:0.6; white-space:nowrap">{escape(str(k))}</td>'
            f'<td style="padding:2px 8px; font-family:monospace">{escape(str(v))}</td></tr>'
            for k, v in self._repr_rows() + self._repr_preprocessors()
        )
        return (
            f'<table style="border-collapse:collapse; font-size:13px; font-family:sans-serif">'
            f'<thead><tr><th colspan="2" style="background:rgba(100,130,150,0.35); padding:4px 10px; '
            f'text-align:left; font-weight:normal">{type(self).__name__}</th></tr></thead>'
            f'<tbody>{rows_html}</tbody></table>'
        )
logger =
<Logger fab.datasources.basesources (INFO)>
class
DataSource(abc.ABC):
class DataSource(ABC):
    ''' Represents a source of data.

    Abstract base class describing a data source. Subclasses must implement
    the load() method for loading data. The data must be indexable
    by train_id.

    Every subclass is passed to update_context() when it is defined, so
    that it can be referenced by name in config files.
    '''

    def __init__(self, name, *args, fillna_method=None, **kwargs):
        ''' Base init for a data source.

        Extra positional and keyword arguments are accepted but not used by
        this base initializer.

        Args:
            name: a human readable name for the data source.
            fillna_method (optional): a string specifying what filling method should
                be used to fill missing values if the dataset needs to be reindexed.
                This will happen when the datasource is combined with other sources
                in an Instrument object that will require all sources to have the same
                train_id index. It should be either 'ffill' for forward filling, or one
                of the valid methods for xr.interpolate_na. Using methods other than
                'ffill' on large arrays might lead to extremely high memory usage.
        '''
        self.name = name
        self.fillna_method = fillna_method

    def __init_subclass__(cls, *args, **kwargs):
        ''' Adds all subclasses to the context dictionary for reference in config files.
        '''
        super().__init_subclass__(*args, **kwargs)
        update_context(cls)

    @staticmethod
    def from_config(name, source_config, **kwargs):
        """ Returns a datasource configured with the parameters in source_config.

        This is not a classmethod: the class of the returned instance is
        looked up with get_config_object() from the '__type__' entry of the
        configuration dictionary.

        The optional '__preprocess__' entry declares the preprocessors that
        wrap the source, applied in order. It is either a dict mapping
        preprocessor names to their keyword arguments (or to None for no
        arguments) or, in the old style, a list of dicts each holding a
        '__name__' entry next to the keyword arguments.

        All remaining entries, plus any extra keyword arguments, are passed on
        verbatim to the source __init__ method.

        Args:
            name: the name of the source to be created
            source_config: source configuration dictionary

        Returns:
            The source instance, or the object returned by the last
            preprocessor when preprocessors are declared.

        Raises:
            KeyError: if source_config has no '__type__' entry.
        """
        source_config = copy.deepcopy(source_config)  # Avoid modifying the original config

        try:
            type_name = source_config.pop('__type__')
        except KeyError:
            raise KeyError(f"Source '{name}': configuration has no '__type__' entry") from None
        SourceClass = get_config_object(type_name)

        # A missing or None '__preprocess__' entry both mean "no preprocessors"
        preprocess = source_config.pop('__preprocess__', None) or {}

        inst = SourceClass(name=name, **source_config, **kwargs)

        if isinstance(preprocess, list):
            # Old-style declaration. Keep it as an ordered sequence of
            # (name, config) pairs: going through a dict would silently drop
            # a preprocessor that is listed more than once.
            steps = [(preproc.pop('__name__'), preproc) for preproc in preprocess]
        else:
            steps = list(preprocess.items())

        for preproc_name, preproc_config in steps:
            preprocessor = get_config_object(preproc_name)
            logger.debug("Source %s, applying preprocessor: %s", name, preproc_name)
            # preproc_config may be None when no arguments are given
            inst = preprocessor(inst, **(preproc_config or {}))

        return inst

    @staticmethod
    def from_name(name):
        """ Searches for the source named <name> in the loaded configuration and initializes it.

        Args:
            name: the name of the source to be looked up in the config

        Returns:
            The result of DataSource.from_config for the cfg.sources[name] entry.
        """
        return DataSource.from_config(name, cfg.sources[name])

    def _repr_rows(self):
        ''' Returns the (label, value) rows shown in the HTML representation.

        The base implementation only reports the fill method, when one is set.
        '''
        rows = []
        if self.fillna_method:
            rows.append(('Fill NA', self.fillna_method))
        return rows

    def _repr_preprocessors(self):
        ''' Returns one (label, arguments) row per entry of self._preprocessors.

        The attribute is optional (it is not set by this class); each entry is
        read as a mapping with a 'name' key and optional 'args' / 'kwargs' keys.
        '''
        rows = []
        for p in getattr(self, '_preprocessors', []):
            parts = []
            if p.get('args'):
                parts += [str(a) for a in p['args']]
            if p.get('kwargs'):
                parts += [f'{k}={v}' for k, v in p['kwargs'].items()]
            rows.append((f'→ {p["name"]}', ','.join(parts)))
        return rows

    @abstractmethod
    def load(self) -> xr.DataArray:
        ''' Loads data from the datasource.

        Returns:
            xr.DataArray: the loaded data, possibly represented by a lazy
            dask.array. It must contain a dimension named 'train_id'
        '''
        pass

    def __add__(self, other):
        ''' Combines this source with another one into an Instrument.

        Returns NotImplemented when other is not a DataSource.
        '''
        if not isinstance(other, DataSource):
            return NotImplemented

        # Imported here rather than at module level (presumably to avoid a
        # circular import with the instruments module).
        from ..instruments import Instrument
        return Instrument([self, other])

    def _repr_html_(self):
        ''' Returns an HTML table with the class name and the rows from
        _repr_rows() and _repr_preprocessors().

        Labels and values are HTML-escaped, so that values such as
        "<function f at 0x...>" are displayed instead of being parsed as tags.
        '''
        from html import escape

        rows_html = ''.join(
            f'<tr><td style="padding:2px 8px; opacity:0.6; white-space:nowrap">{escape(str(k))}</td>'
            f'<td style="padding:2px 8px; font-family:monospace">{escape(str(v))}</td></tr>'
            for k, v in self._repr_rows() + self._repr_preprocessors()
        )
        return (
            f'<table style="border-collapse:collapse; font-size:13px; font-family:sans-serif">'
            f'<thead><tr><th colspan="2" style="background:rgba(100,130,150,0.35); padding:4px 10px; '
            f'text-align:left; font-weight:normal">{type(self).__name__}</th></tr></thead>'
            f'<tbody>{rows_html}</tbody></table>'
        )
Represents a source of data.
Abstract class for describing a data source. Subclasses should implement the load() method for loading data. The data must be indexable by train_id.
DataSource(name, *args, fillna_method=None, **kwargs)
def __init__(self, name, *args, fillna_method = None, **kwargs):
    ''' Stores the attributes shared by every data source.

    Any extra positional or keyword argument is accepted and left unused.

    Args:
        name: a human readable name for the data source.
        fillna_method (optional): name of the method used to fill missing
            values when the dataset has to be reindexed, which happens when
            the source is combined with other sources in an Instrument
            object that needs a common train_id index. Either 'ffill'
            (forward fill) or one of the methods accepted by
            xr.interpolate_na. Methods other than 'ffill' might lead to
            extremely high memory usage on large arrays.
    '''
    self.fillna_method = fillna_method
    self.name = name
Base init for a data source.
Arguments:
- name: a human readable name for the data source.
- fillna_method (optional): a string specifying what filling method should be used to fill missing values if the dataset needs to be reindexed. This will happen when the datasource is combined with other sources in an Instrument object that will require all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
@staticmethod
def
from_config(name, source_config, **kwargs):
@staticmethod
def from_config(name, source_config, **kwargs):
    """ Returns a datasource configured with the parameters in source_config.

    This is not a classmethod: the class of the returned instance is
    looked up with get_config_object() from the '__type__' entry of the
    configuration dictionary.

    The optional '__preprocess__' entry declares the preprocessors that
    wrap the source, applied in order. It is either a dict mapping
    preprocessor names to their keyword arguments (or to None for no
    arguments) or, in the old style, a list of dicts each holding a
    '__name__' entry next to the keyword arguments.

    All remaining entries, plus any extra keyword arguments, are passed on
    verbatim to the source __init__ method.

    Args:
        name: the name of the source to be created
        source_config: source configuration dictionary

    Returns:
        The source instance, or the object returned by the last
        preprocessor when preprocessors are declared.

    Raises:
        KeyError: if source_config has no '__type__' entry.
    """
    source_config = copy.deepcopy(source_config)  # Avoid modifying the original config

    try:
        type_name = source_config.pop('__type__')
    except KeyError:
        raise KeyError(f"Source '{name}': configuration has no '__type__' entry") from None
    SourceClass = get_config_object(type_name)

    # A missing or None '__preprocess__' entry both mean "no preprocessors"
    preprocess = source_config.pop('__preprocess__', None) or {}

    inst = SourceClass(name=name, **source_config, **kwargs)

    if isinstance(preprocess, list):
        # Old-style declaration. Keep it as an ordered sequence of
        # (name, config) pairs: going through a dict would silently drop
        # a preprocessor that is listed more than once.
        steps = [(preproc.pop('__name__'), preproc) for preproc in preprocess]
    else:
        steps = list(preprocess.items())

    for preproc_name, preproc_config in steps:
        preprocessor = get_config_object(preproc_name)
        logger.debug("Source %s, applying preprocessor: %s", name, preproc_name)
        # preproc_config may be None when no arguments are given
        inst = preprocessor(inst, **(preproc_config or {}))

    return inst
Returns a datasource configured with the parameters in source_config.
This is not a classmethod: the class of the returned instance should be specified
by setting the `__type__` entry in the configuration dictionary.
Extra keyword arguments are passed on verbatim to the source __init__ method
Arguments:
- name: the name of the source to be created
- source_config: source configuration dictionary
@staticmethod
def
from_name(name):
@staticmethod
def from_name(name):
    """ Builds the source declared under <name> in the loaded configuration.

    Args:
        name: the name of the source to be looked up in the config

    Returns:
        The result of DataSource.from_config for the cfg.sources[name] entry.
    """
    source_config = cfg.sources[name]
    return DataSource.from_config(name, source_config)
Searches the loaded configuration for the source called `name` and initializes it.
Arguments:
- name: the name of the source to be looked up in the config
@abstractmethod
def
load(self) -> xarray.core.dataarray.DataArray:
@abstractmethod
def load(self) -> xr.DataArray:
    ''' Loads the data from the datasource.

    Abstract: the base implementation has no body of its own.

    Returns:
        xr.DataArray: the loaded data, possibly represented by a lazy
        dask.array. It must contain a dimension named 'train_id'
    '''
Loads data from the datasource.
Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'