fab.datasources.basesources

  1from abc import ABC, abstractmethod
  2import xarray as xr
  3
  4from ..settings import cfg, get_config_object, update_context
  5
  6import logging
  7import copy
  8logger = logging.getLogger(__name__)
  9
 10
 11#TODO: more type hints
 12#TODO: move ADC slicer baseline and inversion to preprocessor
 13
 14class DataSource(ABC):
 15    ''' Represents a source of data. 
 16    
 17    Abstract class for describing a data source. It should implement
 18    the load() method for loading data. The data must be indexable 
 19    by train_id
 20    '''
 21    
 22    def __init__(self, name, *args, fillna_method = None, **kwargs):
 23        ''' Base init for a data source. 
 24        
 25        Args:
 26            name: a human readable name for the data source.
 27            fillna_method (optional): a string specifying what filling method should
 28                be used to fill missing values if the dataset needs to be reindexed.
 29                This will happen when the datasource is combined with other sources
 30                in an Instrument object that will require all source to have the same
 31                train_id index. It should be either 'ffill' for forward filling, or one
 32                of the valid method for xr.interpolate_na. Using methods other than 
 33                'ffill' on large arrays might lead to extremely high memory usage.
 34        '''
 35        self.name = name
 36        self.fillna_method = fillna_method 
 37
 38    def __init_subclass__(cls, *args, **kwargs):
 39        ''' Adds all subclasses to the context dictionary for reference in config files.
 40        '''
 41        super().__init_subclass__(*args, **kwargs)
 42        update_context(cls)
 43
 44    @staticmethod
 45    def from_config(name, source_config, **kwargs):
 46        """ Returns an datasource configured with the parameters in source_config. 
 47        
 48        This is not a classmethod, the class of the returned instance should be specified
 49        by setting the <source_type> attribute in the configuration dictionary.
 50        
 51        Extra keyword arguments are passed on verbatim to the source __init__ method
 52        
 53        Args:
 54            name: the name of the source to be created
 55            source_config: source configuration dictionary """
 56        
 57        source_config = copy.deepcopy(source_config) # Avoid modifying the original config
 58
 59        SourceClass = get_config_object(source_config.pop('__type__'))
 60        preprocess = source_config.pop('__preprocess__', {})   # dict of preprocessors
 61
 62        inst = SourceClass(name = name, **source_config, **kwargs)
 63
 64        # support for old-style preprocessors declaration
 65        if isinstance(preprocess, list):
 66            # if preprocess is a list, convert it to a dict with empty config
 67            preprocess = {preproc.pop('__name__'): {**preproc} for preproc in preprocess}
 68        
 69        #print(f"Source {name} preprocessors: {preprocess}")
 70        for preproc_name, preproc_config in preprocess.items():
 71            preprocessor = get_config_object(preproc_name)
 72            logger.debug(f"Source {name}, applying preprocessor: {preproc_name}")
 73            inst = preprocessor(inst, **preproc_config or {})
 74
 75        return inst
 76
 77    @staticmethod
 78    def from_name(name):
 79        """ Searchs for source named <name> in the loaded configuration and initializes is
 80        
 81        Args:
 82            name: the name of the source to be looked up in the config"""
 83        return DataSource.from_config(name, cfg.sources[name])
 84
 85    def _repr_rows(self):
 86        rows = []
 87        if self.fillna_method:
 88            rows.append(('Fill NA', self.fillna_method))
 89        return rows
 90
 91    def _repr_preprocessors(self):
 92        rows = []
 93        for p in getattr(self, '_preprocessors', []):
 94            parts = []
 95            if p.get('args'):
 96                parts += [str(a) for a in p['args']]
 97            if p.get('kwargs'):
 98                parts += [f'{k}={v}' for k, v in p['kwargs'].items()]
 99            rows.append((f'→ {p["name"]}', ','.join(parts)))
100        return rows
101
102    @abstractmethod
103    def load(self) -> xr.DataArray:
104        ''' Loads thata from the datasource
105
106        Returns: 
107            xr.DataArray: the loaded data, possibly represented by a lazy
108                dask.array. It must contain a dimension named 'train_id'
109        '''
110        pass
111
112    def __add__(self, other):
113        from ..instruments import Instrument
114
115        if not isinstance(other, DataSource):
116            return NotImplemented
117        return Instrument([self, other])
118
119    def _repr_html_(self):
120        rows_html = ''.join(
121            f'<tr><td style="padding:2px 8px; opacity:0.6; white-space:nowrap">{k}</td>'
122            f'<td style="padding:2px 8px; font-family:monospace">{v}</td></tr>'
123            for k, v in self._repr_rows() + self._repr_preprocessors()
124        )
125        return (
126            f'<table style="border-collapse:collapse; font-size:13px; font-family:sans-serif">'
127            f'<thead><tr><th colspan="2" style="background:rgba(100,130,150,0.35); padding:4px 10px; '
128            f'text-align:left; font-weight:normal">{type(self).__name__}</th></tr></thead>'
129            f'<tbody>{rows_html}</tbody></table>'
130        )
logger = <Logger fab.datasources.basesources (INFO)>
class DataSource(abc.ABC):
 15class DataSource(ABC):
 16    ''' Represents a source of data. 
 17    
 18    Abstract class for describing a data source. It should implement
 19    the load() method for loading data. The data must be indexable 
 20    by train_id
 21    '''
 22    
 23    def __init__(self, name, *args, fillna_method = None, **kwargs):
 24        ''' Base init for a data source. 
 25        
 26        Args:
 27            name: a human readable name for the data source.
 28            fillna_method (optional): a string specifying what filling method should
 29                be used to fill missing values if the dataset needs to be reindexed.
 30                This will happen when the datasource is combined with other sources
 31                in an Instrument object that will require all source to have the same
 32                train_id index. It should be either 'ffill' for forward filling, or one
 33                of the valid method for xr.interpolate_na. Using methods other than 
 34                'ffill' on large arrays might lead to extremely high memory usage.
 35        '''
 36        self.name = name
 37        self.fillna_method = fillna_method 
 38
 39    def __init_subclass__(cls, *args, **kwargs):
 40        ''' Adds all subclasses to the context dictionary for reference in config files.
 41        '''
 42        super().__init_subclass__(*args, **kwargs)
 43        update_context(cls)
 44
 45    @staticmethod
 46    def from_config(name, source_config, **kwargs):
 47        """ Returns an datasource configured with the parameters in source_config. 
 48        
 49        This is not a classmethod, the class of the returned instance should be specified
 50        by setting the <source_type> attribute in the configuration dictionary.
 51        
 52        Extra keyword arguments are passed on verbatim to the source __init__ method
 53        
 54        Args:
 55            name: the name of the source to be created
 56            source_config: source configuration dictionary """
 57        
 58        source_config = copy.deepcopy(source_config) # Avoid modifying the original config
 59
 60        SourceClass = get_config_object(source_config.pop('__type__'))
 61        preprocess = source_config.pop('__preprocess__', {})   # dict of preprocessors
 62
 63        inst = SourceClass(name = name, **source_config, **kwargs)
 64
 65        # support for old-style preprocessors declaration
 66        if isinstance(preprocess, list):
 67            # if preprocess is a list, convert it to a dict with empty config
 68            preprocess = {preproc.pop('__name__'): {**preproc} for preproc in preprocess}
 69        
 70        #print(f"Source {name} preprocessors: {preprocess}")
 71        for preproc_name, preproc_config in preprocess.items():
 72            preprocessor = get_config_object(preproc_name)
 73            logger.debug(f"Source {name}, applying preprocessor: {preproc_name}")
 74            inst = preprocessor(inst, **preproc_config or {})
 75
 76        return inst
 77
 78    @staticmethod
 79    def from_name(name):
 80        """ Searchs for source named <name> in the loaded configuration and initializes is
 81        
 82        Args:
 83            name: the name of the source to be looked up in the config"""
 84        return DataSource.from_config(name, cfg.sources[name])
 85
 86    def _repr_rows(self):
 87        rows = []
 88        if self.fillna_method:
 89            rows.append(('Fill NA', self.fillna_method))
 90        return rows
 91
 92    def _repr_preprocessors(self):
 93        rows = []
 94        for p in getattr(self, '_preprocessors', []):
 95            parts = []
 96            if p.get('args'):
 97                parts += [str(a) for a in p['args']]
 98            if p.get('kwargs'):
 99                parts += [f'{k}={v}' for k, v in p['kwargs'].items()]
100            rows.append((f'→ {p["name"]}', ','.join(parts)))
101        return rows
102
103    @abstractmethod
104    def load(self) -> xr.DataArray:
105        ''' Loads thata from the datasource
106
107        Returns: 
108            xr.DataArray: the loaded data, possibly represented by a lazy
109                dask.array. It must contain a dimension named 'train_id'
110        '''
111        pass
112
113    def __add__(self, other):
114        from ..instruments import Instrument
115
116        if not isinstance(other, DataSource):
117            return NotImplemented
118        return Instrument([self, other])
119
120    def _repr_html_(self):
121        rows_html = ''.join(
122            f'<tr><td style="padding:2px 8px; opacity:0.6; white-space:nowrap">{k}</td>'
123            f'<td style="padding:2px 8px; font-family:monospace">{v}</td></tr>'
124            for k, v in self._repr_rows() + self._repr_preprocessors()
125        )
126        return (
127            f'<table style="border-collapse:collapse; font-size:13px; font-family:sans-serif">'
128            f'<thead><tr><th colspan="2" style="background:rgba(100,130,150,0.35); padding:4px 10px; '
129            f'text-align:left; font-weight:normal">{type(self).__name__}</th></tr></thead>'
130            f'<tbody>{rows_html}</tbody></table>'
131        )

Represents a source of data.

Abstract class for describing a data source. Subclasses must implement the load() method for loading data. The data must be indexable by train_id.

DataSource(name, *args, fillna_method=None, **kwargs)
23    def __init__(self, name, *args, fillna_method = None, **kwargs):
24        ''' Base init for a data source. 
25        
26        Args:
27            name: a human readable name for the data source.
28            fillna_method (optional): a string specifying what filling method should
29                be used to fill missing values if the dataset needs to be reindexed.
30                This will happen when the datasource is combined with other sources
31                in an Instrument object that will require all source to have the same
32                train_id index. It should be either 'ffill' for forward filling, or one
33                of the valid method for xr.interpolate_na. Using methods other than 
34                'ffill' on large arrays might lead to extremely high memory usage.
35        '''
36        self.name = name
37        self.fillna_method = fillna_method 

Base init for a data source.

Arguments:
  • name: a human readable name for the data source.
  • fillna_method (optional): a string specifying which filling method should be used to fill missing values if the dataset needs to be reindexed. This happens when the datasource is combined with other sources in an Instrument object, which requires all sources to have the same train_id index. It should be either 'ffill' for forward filling, or one of the valid methods for xr.DataArray.interpolate_na. Using methods other than 'ffill' on large arrays might lead to extremely high memory usage.
name
fillna_method
@staticmethod
def from_config(name, source_config, **kwargs):
45    @staticmethod
46    def from_config(name, source_config, **kwargs):
47        """ Returns an datasource configured with the parameters in source_config. 
48        
49        This is not a classmethod, the class of the returned instance should be specified
50        by setting the <source_type> attribute in the configuration dictionary.
51        
52        Extra keyword arguments are passed on verbatim to the source __init__ method
53        
54        Args:
55            name: the name of the source to be created
56            source_config: source configuration dictionary """
57        
58        source_config = copy.deepcopy(source_config) # Avoid modifying the original config
59
60        SourceClass = get_config_object(source_config.pop('__type__'))
61        preprocess = source_config.pop('__preprocess__', {})   # dict of preprocessors
62
63        inst = SourceClass(name = name, **source_config, **kwargs)
64
65        # support for old-style preprocessors declaration
66        if isinstance(preprocess, list):
67            # if preprocess is a list, convert it to a dict with empty config
68            preprocess = {preproc.pop('__name__'): {**preproc} for preproc in preprocess}
69        
70        #print(f"Source {name} preprocessors: {preprocess}")
71        for preproc_name, preproc_config in preprocess.items():
72            preprocessor = get_config_object(preproc_name)
73            logger.debug(f"Source {name}, applying preprocessor: {preproc_name}")
74            inst = preprocessor(inst, **preproc_config or {})
75
76        return inst

Returns a datasource configured with the parameters in source_config.

This is not a classmethod: the class of the returned instance is specified by the `__type__` key of the configuration dictionary.

Extra keyword arguments are passed on verbatim to the source __init__ method

Arguments:
  • name: the name of the source to be created
  • source_config: source configuration dictionary
@staticmethod
def from_name(name):
78    @staticmethod
79    def from_name(name):
80        """ Searchs for source named <name> in the loaded configuration and initializes is
81        
82        Args:
83            name: the name of the source to be looked up in the config"""
84        return DataSource.from_config(name, cfg.sources[name])

Searches the loaded configuration for the source named `name` and initializes it.

Arguments:
  • name: the name of the source to be looked up in the config
@abstractmethod
def load(self) -> xarray.core.dataarray.DataArray:
103    @abstractmethod
104    def load(self) -> xr.DataArray:
105        ''' Loads thata from the datasource
106
107        Returns: 
108            xr.DataArray: the loaded data, possibly represented by a lazy
109                dask.array. It must contain a dimension named 'train_id'
110        '''
111        pass

Loads data from the datasource.

Returns: xr.DataArray: the loaded data, possibly represented by a lazy dask.array. It must contain a dimension named 'train_id'