fab.settings.setup

  1import os
  2import glob
  3import importlib
  4import warnings
  5from pprint import pformat
  6
  7from config import config, ConfigurationSet
  8ConfigurationSet.__str__  = lambda self: pformat(self.as_dict()) #Monkeypatch __str__ for better printing of config
  9ConfigurationSet.__repr__ = lambda self: pformat(self.as_dict()) #Monkeypatch __repr__ for better printing of config
 10
 11#Loads config from main package folder
 12default_config = os.path.join(os.path.dirname(__file__), '../config.toml')
 13
 14#Set up global variables
 15cfg = config(default_config)
 16cfg_context = {}
 17dask_client = None
 18dask_cluster = None
 19
 20
 21#Set up logging
 22from .log_handlers import get_console_handler, get_telemetry_handler
 23import logging
 24
 25logger = logging.getLogger('fab')
 26logger.setLevel(cfg.logging_level)
 27logger.addHandler(get_console_handler())
 28
 29# Empty logger for telemetry, since when this is exectued the user config is not loaded yet
 30# Proper setup is done only when update_config is called
 31tele_log = logging.getLogger('fab.telemetry') 
 32tele_log.propagate = False # Don't propagate to root logge, telemetry is separated
 33tele_log.addHandler(logging.NullHandler()) 
 34
 35
 36#turns a list of functions or classes into a dictionary of {name: function}
 37def _get_context_from_list(objects):
 38    if objects is None:
 39        return {}
 40    elif isinstance(objects, dict):
 41        return objects
 42    elif isinstance(objects, list):
 43        return {f.__name__: f for f in objects}
 44    else:
 45        raise TypeError(f"objects must be a list or a dict, not {type(objects)}")
 46
 47def get_config_object(obj_name):
 48    try:  # Try looking up the name in the configuration context
 49        return cfg_context[obj_name]
 50    except KeyError:
 51        pass
 52
 53    tele_log.info(f"Config lookup for {obj_name}") #Only log lookups internal to fab
 54
 55    #If not, check for the name in one of fab submodules
 56    path = obj_name.split('.')
 57    if path[0] != 'fab':
 58        raise ValueError(f"{obj_name} was not found in context and is not in fab namespace")
 59
 60    return getattr( importlib.import_module('.'.join(path[:-1])),
 61                    path[-1])
 62
 63def update_context(context):
 64    ''' Updates the global context with the given dictionary or list of objects
 65    '''
 66    tele_log.info("")
 67
 68    global cfg_context
 69    cfg_context.update(_get_context_from_list(context))
 70
 71def update_config(cfg_path):
 72    ''' Updates the global config with the given toml file
 73    '''
 74    global cfg
 75    cfg.update(config(str(cfg_path), dict(cfg)))
 76    logger.setLevel(cfg.logging_level)
 77    logger.info(f"Loading config from {cfg_path}")
 78
 79    #Set up telemetry logger
 80    if cfg.telemetry:
 81        tele_log.addHandler(get_telemetry_handler(cfg.telemetry_path))
 82
 83def maxwell_start_if_not_already():
 84    ''' Starts a dask cluster if not already running
 85    '''
 86    global dask_client
 87    global dask_cluster
 88
 89    if dask_client is None:
 90        from ..maxwell import dask_maxwell_setup
 91        with warnings.catch_warnings(): #Suppress dask "cluster already running?" warning
 92            warnings.simplefilter("ignore")
 93            dask_cluster, dask_client = dask_maxwell_setup()
 94
 95        if cfg.display_dask_cluster:
 96            try:
 97                display(dask_client) # Raises NameError outside of ipython
 98            except NameError:
 99                print(f"Started dask: {dask_client}")
100
101#Loads additional config from toml
102def fab_setup(cfg_path=None, *args, beamtime=None, 
103              context: dict | list = None, maxwell_setup = True):
104    ''' Sets the configuration specified in cfg_path to the existing config
105    All paramters are optional. 
106    
107    Args:
108        cfg_path: A path pointing to a toml file containg the config to be 
109            loaed.
110        beamtime: Integer id of a flash beamtime. If specifed, the hdf_path
111            values is set automatically to read data from that beamtime HDFs. 
112            If 'auto' is given, the beamtime path is autodetected from the
113            current working directory (if possible)
114        context (dict or list): a mapping of names to python objects to be used 
115            when instantiating sources and instruments. A list can also be given 
116            instead, provided that all objects in the list have a __name__ 
117            attribute. This can be used to specify custom DataSource types or 
118            preprocessor callables that are not part of the fab library. 
119            See the `fab.preprocessing` doc for more.
120        maxwell_setup: wheteher to start a dask distributed cluster. If running
121            on the maxwell HPC cluster, it will use it. Default True
122    '''
123
124    global cfg
125    global cfg_context
126    global dask_client
127    global dask_cluster
128
129    if cfg_path:
130        update_config(cfg_path)
131
132    logger.debug(f"Fab setup running on {os.uname().nodename}")
133
134    tele_log.warning("") # Are users still using this?
135
136    if beamtime:
137        from ..beamtime import autodetect_beamtime
138
139        logger.warning(f"Setting beamtime via fab_setup is deprectaed and will be removed in future versions. Set the beamtime parameter in the configuration file or use from fab.magic import beamtime to autodetect it")
140        if beamtime == 'auto' or beamtime is ...:
141            beamtime = autodetect_beamtime()
142        cfg['beamtime'] = beamtime
143
144    if context:
145        update_context(context)
146            
147    if maxwell_setup:
148        maxwell_start_if_not_already()
149
150    logger.debug(f"Loaded config:\n {cfg}")
default_config = '/home/leverfab/flashanalysis/fab/settings/../config.toml'
cfg = {'auto_preload': True, 'autoreload_hdf': True, 'beamtime': '', 'btm.styler.collapse_cols': True, 'btm.styler.hide_run_thr': 0, 'btm.styler.mark_aborted': True, 'btm.styler.mark_in_progress': True, 'btm.styler.mark_unique': ['type'], 'btm.styler.max_col_width': '30ch', 'btm.styler.shot_count_hist': True, 'btm.styler.show_on_hover': True, 'dask.split_large_chunks': False, 'display_dask_cluster': True, 'export_daq_run': True, 'hdf_path': '', 'idx_path': '', 'instruments.null.null': nan, 'logging_level': 'INFO', 'maxwell.adapt': False, 'maxwell.job_extra_params': ['--output=fab-slurm.out', '-J fab-worker'], 'maxwell.log_directory': '', 'maxwell.login_hostnames': ['max-display*', 'max-wgse*', 'max-cssb-display*', 'max-fs-display*', 'max-hzgg*'], 'maxwell.max_nodes': 1, 'maxwell.memory': '512G', 'maxwell.num_cores': 40, 'maxwell.num_proc': 40, 'maxwell.partition': '', 'maxwell.walltime': '08:00:00', 'preload_path': '', 'sources.null.null': nan, 'telemetry': True, 'telemetry_path': '/gpfs/petra3/scratch/fab/'}
cfg_context = {}
dask_client = <Client: 'tcp://131.169.168.144:44302' processes=0 threads=0, memory=0 B>
dask_cluster = SLURMCluster(e4ee2cf4, 'tcp://131.169.168.144:44302', workers=0, threads=0, memory=0 B)
logger = <Logger fab (INFO)>
tele_log = <Logger fab.telemetry (INFO)>
def get_config_object(obj_name):
48def get_config_object(obj_name):
49    try:  # Try looking up the name in the configuration context
50        return cfg_context[obj_name]
51    except KeyError:
52        pass
53
54    tele_log.info(f"Config lookup for {obj_name}") #Only log lookups internal to fab
55
56    #If not, check for the name in one of fab submodules
57    path = obj_name.split('.')
58    if path[0] != 'fab':
59        raise ValueError(f"{obj_name} was not found in context and is not in fab namespace")
60
61    return getattr( importlib.import_module('.'.join(path[:-1])),
62                    path[-1])
def update_context(context):
64def update_context(context):
65    ''' Updates the global context with the given dictionary or list of objects
66    '''
67    tele_log.info("")
68
69    global cfg_context
70    cfg_context.update(_get_context_from_list(context))

Updates the global context with the given dictionary or list of objects

def update_config(cfg_path):
72def update_config(cfg_path):
73    ''' Updates the global config with the given toml file
74    '''
75    global cfg
76    cfg.update(config(str(cfg_path), dict(cfg)))
77    logger.setLevel(cfg.logging_level)
78    logger.info(f"Loading config from {cfg_path}")
79
80    #Set up telemetry logger
81    if cfg.telemetry:
82        tele_log.addHandler(get_telemetry_handler(cfg.telemetry_path))

Updates the global config with the given toml file

def maxwell_start_if_not_already():
 84def maxwell_start_if_not_already():
 85    ''' Starts a dask cluster if not already running
 86    '''
 87    global dask_client
 88    global dask_cluster
 89
 90    if dask_client is None:
 91        from ..maxwell import dask_maxwell_setup
 92        with warnings.catch_warnings(): #Suppress dask "cluster already running?" warning
 93            warnings.simplefilter("ignore")
 94            dask_cluster, dask_client = dask_maxwell_setup()
 95
 96        if cfg.display_dask_cluster:
 97            try:
 98                display(dask_client) # Raises NameError outside of ipython
 99            except NameError:
100                print(f"Started dask: {dask_client}")

Starts a dask cluster if not already running

def fab_setup( cfg_path=None, *args, beamtime=None, context: dict | list = None, maxwell_setup=True):
103def fab_setup(cfg_path=None, *args, beamtime=None, 
104              context: dict | list = None, maxwell_setup = True):
105    ''' Sets the configuration specified in cfg_path to the existing config
106    All paramters are optional. 
107    
108    Args:
109        cfg_path: A path pointing to a toml file containg the config to be 
110            loaed.
111        beamtime: Integer id of a flash beamtime. If specifed, the hdf_path
112            values is set automatically to read data from that beamtime HDFs. 
113            If 'auto' is given, the beamtime path is autodetected from the
114            current working directory (if possible)
115        context (dict or list): a mapping of names to python objects to be used 
116            when instantiating sources and instruments. A list can also be given 
117            instead, provided that all objects in the list have a __name__ 
118            attribute. This can be used to specify custom DataSource types or 
119            preprocessor callables that are not part of the fab library. 
120            See the `fab.preprocessing` doc for more.
121        maxwell_setup: wheteher to start a dask distributed cluster. If running
122            on the maxwell HPC cluster, it will use it. Default True
123    '''
124
125    global cfg
126    global cfg_context
127    global dask_client
128    global dask_cluster
129
130    if cfg_path:
131        update_config(cfg_path)
132
133    logger.debug(f"Fab setup running on {os.uname().nodename}")
134
135    tele_log.warning("") # Are users still using this?
136
137    if beamtime:
138        from ..beamtime import autodetect_beamtime
139
140        logger.warning(f"Setting beamtime via fab_setup is deprectaed and will be removed in future versions. Set the beamtime parameter in the configuration file or use from fab.magic import beamtime to autodetect it")
141        if beamtime == 'auto' or beamtime is ...:
142            beamtime = autodetect_beamtime()
143        cfg['beamtime'] = beamtime
144
145    if context:
146        update_context(context)
147            
148    if maxwell_setup:
149        maxwell_start_if_not_already()
150
151    logger.debug(f"Loaded config:\n {cfg}")

Sets the configuration specified in cfg_path to the existing config All paramters are optional.

Arguments:
  • cfg_path: A path pointing to a toml file containg the config to be loaed.
  • beamtime: Integer id of a flash beamtime. If specifed, the hdf_path values is set automatically to read data from that beamtime HDFs. If 'auto' is given, the beamtime path is autodetected from the current working directory (if possible)
  • context (dict or list): a mapping of names to python objects to be used when instantiating sources and instruments. A list can also be given instead, provided that all objects in the list have a __name__ attribute. This can be used to specify custom DataSource types or preprocessor callables that are not part of the fab library. See the fab.preprocessing doc for more.
  • maxwell_setup: wheteher to start a dask distributed cluster. If running on the maxwell HPC cluster, it will use it. Default True