fab.settings.setup
'''Global configuration, logging and dask state for fab.

Importing this module has side effects: it loads the default ``config.toml``
located one directory above this file, patches ``ConfigurationSet`` printing,
and configures the ``fab`` and ``fab.telemetry`` loggers. User configuration
is applied later through ``update_config`` / ``fab_setup``.
'''
import os
import glob
import importlib
import platform
import warnings
from pprint import pformat

from config import config, ConfigurationSet

# Monkeypatch __str__ and __repr__ for better printing of config
ConfigurationSet.__str__ = lambda self: pformat(self.as_dict())
ConfigurationSet.__repr__ = lambda self: pformat(self.as_dict())

# Loads config from main package folder
default_config = os.path.join(os.path.dirname(__file__), '../config.toml')

# Set up global variables
cfg = config(default_config)
cfg_context = {}
dask_client = None
dask_cluster = None


# Set up logging
# NOTE(review): these imports deliberately stay below the globals, as in the
# original; log_handlers may depend on names defined above -- confirm before
# moving them to the top of the file.
from .log_handlers import get_console_handler, get_telemetry_handler
import logging

logger = logging.getLogger('fab')
logger.setLevel(cfg.logging_level)
logger.addHandler(get_console_handler())

# Empty logger for telemetry, since when this is executed the user config is
# not loaded yet. Proper setup is done only when update_config is called.
tele_log = logging.getLogger('fab.telemetry')
tele_log.propagate = False  # Don't propagate to root logger, telemetry is separated
tele_log.addHandler(logging.NullHandler())


def _get_context_from_list(objects):
    ''' Turns a list of functions or classes into a {name: object} dictionary

    Args:
        objects: None, a dict (returned as is, not copied) or a list of
            objects that all have a __name__ attribute.

    Returns:
        A dict mapping names to objects; empty when objects is None.

    Raises:
        TypeError: if objects is neither None, a dict nor a list.
    '''
    if objects is None:
        return {}
    if isinstance(objects, dict):
        return objects
    if isinstance(objects, list):
        return {obj.__name__: obj for obj in objects}
    raise TypeError(f"objects must be a list or a dict, not {type(objects)}")


def get_config_object(obj_name):
    ''' Resolves a name used in the configuration to a python object

    The user supplied context is consulted first; names missing from it are
    imported from the fab package.

    Args:
        obj_name: key of the context, or dotted path starting with 'fab'.

    Returns:
        The object stored in the context or the attribute found in fab.

    Raises:
        ValueError: if the name is not in the context and its first dotted
            component is not 'fab'.
    '''
    try:  # Try looking up the name in the configuration context
        return cfg_context[obj_name]
    except KeyError:
        pass

    tele_log.info(f"Config lookup for {obj_name}")  # Only log lookups internal to fab

    # If not, check for the name in one of fab submodules
    module_name, _, attr_name = obj_name.rpartition('.')
    if obj_name.split('.', 1)[0] != 'fab':
        raise ValueError(f"{obj_name} was not found in context and is not in fab namespace")

    return getattr(importlib.import_module(module_name), attr_name)


def update_context(context):
    ''' Updates the global context with the given dictionary or list of objects

    Args:
        context: None, a dict or a list of objects having a __name__.

    Raises:
        TypeError: if context is of any other type.
    '''
    # Empty telemetry record emitted before anything else, as in the original
    tele_log.info("")

    cfg_context.update(_get_context_from_list(context))


def update_config(cfg_path):
    ''' Updates the global config with the given toml file

    The logging level of the fab logger is refreshed from the new config and
    the telemetry logger is (re)configured accordingly.

    Args:
        cfg_path: path of the toml file; converted with str() before loading.
    '''
    # presumably values from the file take precedence over the current ones;
    # verify against the `config` library documentation
    cfg.update(config(str(cfg_path), dict(cfg)))
    logger.setLevel(cfg.logging_level)
    logger.info(f"Loading config from {cfg_path}")

    # Set up telemetry logger.
    # Handlers added by earlier calls are dropped first: addHandler only
    # ignores the very same handler object, so repeated calls could otherwise
    # stack handlers (assuming get_telemetry_handler builds a new one per
    # call -- TODO confirm). The import-time NullHandler is kept.
    for handler in list(tele_log.handlers):
        if not isinstance(handler, logging.NullHandler):
            tele_log.removeHandler(handler)

    if cfg.telemetry:
        tele_log.addHandler(get_telemetry_handler(cfg.telemetry_path))


def maxwell_start_if_not_already():
    ''' Starts a dask cluster if not already running

    The created cluster and client are stored in the module globals
    dask_cluster and dask_client; nothing happens when a client exists.
    '''
    global dask_client
    global dask_cluster

    if dask_client is not None:
        return

    from ..maxwell import dask_maxwell_setup

    # Suppress dask "cluster already running?" warning (all warnings raised
    # during the setup call are ignored)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        dask_cluster, dask_client = dask_maxwell_setup()

    if cfg.display_dask_cluster:
        try:
            display(dask_client)  # Raises NameError outside of ipython
        except NameError:
            print(f"Started dask: {dask_client}")


# Loads additional config from toml
def fab_setup(cfg_path=None, *args, beamtime=None,
              context: dict | list = None, maxwell_setup=True):
    ''' Sets the configuration specified in cfg_path to the existing config.
        All parameters are optional.

    Args:
        cfg_path: A path pointing to a toml file containing the config to be
            loaded.
        *args: Accepted but ignored.
        beamtime: Deprecated. Integer id of a flash beamtime, stored in
            cfg['beamtime']. If 'auto' (or Ellipsis) is given, the beamtime
            is autodetected through fab.beamtime.autodetect_beamtime.
        context (dict or list): a mapping of names to python objects to be used
            when instantiating sources and instruments. A list can also be given
            instead, provided that all objects in the list have a __name__
            attribute. This can be used to specify custom DataSource types or
            preprocessor callables that are not part of the fab library.
            See the `fab.preprocessing` doc for more.
        maxwell_setup: whether to start a dask distributed cluster. If running
            on the maxwell HPC cluster, it will use it. Default True
    '''
    if cfg_path:
        update_config(cfg_path)

    # platform.node() rather than os.uname().nodename: os.uname exists only
    # on Unix, and this debug line must not break the setup elsewhere
    logger.debug(f"Fab setup running on {platform.node()}")

    tele_log.warning("")  # Are users still using this?

    if beamtime:
        from ..beamtime import autodetect_beamtime

        logger.warning(f"Setting beamtime via fab_setup is deprectaed and will be removed in future versions. Set the beamtime parameter in the configuration file or use from fab.magic import beamtime to autodetect it")
        if beamtime == 'auto' or beamtime is ...:
            beamtime = autodetect_beamtime()
        cfg['beamtime'] = beamtime

    if context:
        update_context(context)

    if maxwell_setup:
        maxwell_start_if_not_already()

    logger.debug(f"Loaded config:\n {cfg}")
default_config =
'/home/leverfab/flashanalysis/fab/settings/../config.toml'
cfg =
{'auto_preload': True,
'autoreload_hdf': True,
'beamtime': '',
'btm.styler.collapse_cols': True,
'btm.styler.hide_run_thr': 0,
'btm.styler.mark_aborted': True,
'btm.styler.mark_in_progress': True,
'btm.styler.mark_unique': ['type'],
'btm.styler.max_col_width': '30ch',
'btm.styler.shot_count_hist': True,
'btm.styler.show_on_hover': True,
'dask.split_large_chunks': False,
'display_dask_cluster': True,
'export_daq_run': True,
'hdf_path': '',
'idx_path': '',
'instruments.null.null': nan,
'logging_level': 'INFO',
'maxwell.adapt': False,
'maxwell.job_extra_params': ['--output=fab-slurm.out', '-J fab-worker'],
'maxwell.log_directory': '',
'maxwell.login_hostnames': ['max-display*',
'max-wgse*',
'max-cssb-display*',
'max-fs-display*',
'max-hzgg*'],
'maxwell.max_nodes': 1,
'maxwell.memory': '512G',
'maxwell.num_cores': 40,
'maxwell.num_proc': 40,
'maxwell.partition': '',
'maxwell.walltime': '08:00:00',
'preload_path': '',
'sources.null.null': nan,
'telemetry': True,
'telemetry_path': '/gpfs/petra3/scratch/fab/'}
cfg_context =
{}
dask_client =
<Client: 'tcp://131.169.168.144:44302' processes=0 threads=0, memory=0 B>
dask_cluster =
SLURMCluster(e4ee2cf4, 'tcp://131.169.168.144:44302', workers=0, threads=0, memory=0 B)
logger =
<Logger fab (INFO)>
tele_log =
<Logger fab.telemetry (INFO)>
def
get_config_object(obj_name):
def get_config_object(obj_name):
    ''' Resolves a name used in the configuration to a python object

    The user supplied context (cfg_context) is consulted first; names missing
    from it are imported from the fab package.

    Args:
        obj_name: key of the context, or dotted path starting with 'fab'.

    Returns:
        The object stored in the context, or the attribute named by the last
        dotted component, read from the module named by the preceding ones.

    Raises:
        ValueError: if the name is not in the context and its first dotted
            component is not 'fab'.
    '''
    try:
        # Context entries always win over the fab namespace
        return cfg_context[obj_name]
    except KeyError:
        pass

    tele_log.info(f"Config lookup for {obj_name}")  # Only log lookups internal to fab

    # Everything before the last dot is the module, the remainder the attribute
    module_name, _, attr_name = obj_name.rpartition('.')
    top_level_package = obj_name.split('.', 1)[0]
    if top_level_package != 'fab':
        raise ValueError(f"{obj_name} was not found in context and is not in fab namespace")

    module = importlib.import_module(module_name)
    return getattr(module, attr_name)
def
update_context(context):
def update_context(context):
    ''' Merges the given dictionary or list of objects into the global context

    Args:
        context: whatever _get_context_from_list accepts; the resulting
            name -> object mapping is merged into cfg_context in place.
    '''
    # An empty telemetry record is emitted before the context is touched.
    # NOTE(review): presumably the telemetry handler records where the call
    # came from -- confirm in log_handlers.
    tele_log.info("")

    new_entries = _get_context_from_list(context)
    cfg_context.update(new_entries)
Updates the global context with the given dictionary or list of objects
def
update_config(cfg_path):
def update_config(cfg_path):
    ''' Updates the global config with the given toml file

    The logging level of the fab logger is refreshed from the new config and
    the telemetry logger is (re)configured accordingly.

    Args:
        cfg_path: path of the toml file; converted with str() before loading.
    '''
    # presumably values from the file take precedence over the current ones;
    # verify against the `config` library documentation
    cfg.update(config(str(cfg_path), dict(cfg)))
    logger.setLevel(cfg.logging_level)
    logger.info(f"Loading config from {cfg_path}")

    # Set up telemetry logger.
    # Handlers added by earlier calls are dropped first: addHandler only
    # ignores the very same handler object, so repeated calls could otherwise
    # stack handlers (assuming get_telemetry_handler builds a new one per
    # call -- TODO confirm). This also stops telemetry once a later config
    # disables it. NullHandler placeholders are left in place.
    for handler in list(tele_log.handlers):
        if not isinstance(handler, logging.NullHandler):
            tele_log.removeHandler(handler)

    if cfg.telemetry:
        tele_log.addHandler(get_telemetry_handler(cfg.telemetry_path))
Updates the global config with the given toml file
def
maxwell_start_if_not_already():
def maxwell_start_if_not_already():
    ''' Makes sure a dask cluster/client pair exists, creating it on first use

    The pair returned by dask_maxwell_setup is stored in the module globals
    dask_cluster and dask_client. When a client is already stored, the call
    does nothing.
    '''
    global dask_cluster, dask_client

    # A client from an earlier call is reused untouched
    if dask_client is not None:
        return

    from ..maxwell import dask_maxwell_setup

    # Every warning raised during setup is ignored; the target is the dask
    # "cluster already running?" warning
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        dask_cluster, dask_client = dask_maxwell_setup()

    if not cfg.display_dask_cluster:
        return

    try:
        display(dask_client)  # Raises NameError outside of ipython
    except NameError:
        print(f"Started dask: {dask_client}")
Starts a dask cluster if not already running
def
fab_setup( cfg_path=None, *args, beamtime=None, context: dict | list = None, maxwell_setup=True):
def fab_setup(cfg_path=None, *args, beamtime=None,
              context: dict | list = None, maxwell_setup=True):
    ''' Sets the configuration specified in cfg_path to the existing config.
        All parameters are optional.

    Args:
        cfg_path: A path pointing to a toml file containing the config to be
            loaded.
        *args: Accepted but ignored.
        beamtime: Deprecated. Integer id of a flash beamtime, stored in
            cfg['beamtime']. If 'auto' (or Ellipsis) is given, the beamtime
            is autodetected through fab.beamtime.autodetect_beamtime.
        context (dict or list): a mapping of names to python objects to be used
            when instantiating sources and instruments. A list can also be given
            instead, provided that all objects in the list have a __name__
            attribute. This can be used to specify custom DataSource types or
            preprocessor callables that are not part of the fab library.
            See the `fab.preprocessing` doc for more.
        maxwell_setup: whether to start a dask distributed cluster. If running
            on the maxwell HPC cluster, it will use it. Default True
    '''
    # Local import keeps this block self-contained
    import platform

    if cfg_path:
        update_config(cfg_path)

    # platform.node() rather than os.uname().nodename: os.uname exists only
    # on Unix, and this debug line must not break the setup elsewhere
    logger.debug(f"Fab setup running on {platform.node()}")

    tele_log.warning("")  # Are users still using this?

    if beamtime:
        from ..beamtime import autodetect_beamtime

        logger.warning(f"Setting beamtime via fab_setup is deprectaed and will be removed in future versions. Set the beamtime parameter in the configuration file or use from fab.magic import beamtime to autodetect it")
        if beamtime == 'auto' or beamtime is ...:
            beamtime = autodetect_beamtime()
        cfg['beamtime'] = beamtime

    if context:
        update_context(context)

    if maxwell_setup:
        maxwell_start_if_not_already()

    logger.debug(f"Loaded config:\n {cfg}")
Sets the configuration specified in cfg_path to the existing config. All parameters are optional.
Arguments:
- cfg_path: A path pointing to a toml file containing the config to be loaded.
- beamtime: Integer id of a flash beamtime. If specified, the hdf_path value is set automatically to read data from that beamtime's HDFs. If 'auto' is given, the beamtime path is autodetected from the current working directory (if possible)
- context (dict or list): a mapping of names to python objects to be used
when instantiating sources and instruments. A list can also be given
instead, provided that all objects in the list have a __name__
attribute. This can be used to specify custom DataSource types or
preprocessor callables that are not part of the fab library.
See the
fab.preprocessing
doc for more.
- maxwell_setup: whether to start a dask distributed cluster. If running on the maxwell HPC cluster, it will use it. Default True