fab.maxwell
Takes care of automatically setting up dask in a way to use the maxwell cluster resources, when that makes sense. Otherwise a local distributed cluster is used.
The configuration file can be used to customize the cluster setup
Here you find a list of the available configuration parameters. You can override one or all of them through your custom config file
[maxwell]
# Control whether to scale the size of the cluster depending
# on need. If false, one node will be requested from slurm at
# module config and will always remain 'taken'
adapt = false
# Maximum number of maxwell nodes to use
max_nodes = 1
# Max running time of the slurm job to be requested
walltime = "04:00:00"
# Maxwell partition to use when scheduling jobs. Make sure that
# you are using a partition for which you have permissions
partition = 'ps,psx'
# Minimum number of cores in the requested nodes
num_cores = 40
# Number of workers per node. Should be <= num_cores
num_proc = 20
# Minimum amount of memory on the requested node
memory = '512G'
# Extra arguments to be passed on to sbatch directly. Use to
# further configure slurm
job_extra_params = ['--output=slurm.out']
'''
Takes care of automatically setting up dask in a way to use
the maxwell cluster resources, when that makes sense. Otherwise
a local distributed cluster is used.

The configuration file can be used to customize the cluster setup

Here you find a list of the available configuration parameters.
You can override one or all of them through your custom config file


```toml
[maxwell]
# Control whether to scale the size of the cluster depending
# on need. If false, one node will be requested from slurm at
# module config and will always remain 'taken'
adapt = false
# Maximum number of maxwell nodes to use
max_nodes = 1
# Max running time of the slurm job to be requested
walltime = "04:00:00"
# Maxwell partition to use when scheduling jobs. Make sure that
# you are using a partition for which you have permissions
partition = 'ps,psx'
# Minimum number of cores in the requested nodes
num_cores = 40
# Number of workers per node. Should be <= num_cores
num_proc = 20
# Minimum amount of memory on the requested node
memory = '512G'
# Extra arguments to be passed on to sbatch directly. Use to
# further configure slurm
job_extra_params = ['--output=slurm.out']
```

'''

import fnmatch
import logging
import os
import socket
import subprocess

import dask
import dask.distributed as dd

from .settings import cfg

logger = logging.getLogger(__name__)
tele_log = logging.getLogger('fab.telemetry')


def _match_any(string, patterns):
    '''Return True if *string* matches any of the given fnmatch *patterns*.'''
    return any(fnmatch.fnmatch(string, pattern) for pattern in patterns)


def _get_user_partition():
    '''Try to figure out whether the current user is part of ps or psx.

    Queries the ``max-ps2-users`` / ``max-psx2-users`` netgroups via
    ``getent`` and returns ``'ps'`` or ``'psx'`` accordingly.

    Raises:
        ValueError: if the user is found in neither netgroup.
    '''
    # The original implementation built a shell pipeline ("getent ... | grep
    # ",$USER," | wc -l") with shell=True; do the membership test in Python
    # instead and avoid the shell entirely.
    user = os.environ.get('USER', '')
    for partition, netgroup in (('ps', 'max-ps2-users'),
                                ('psx', 'max-psx2-users')):
        # getent prints netgroup (host,user,domain) triples, so a member's
        # login name appears surrounded by commas.
        members = subprocess.check_output(['getent', 'netgroup', netgroup],
                                          text=True)
        if f',{user},' in members:
            logger.debug('Autodetected partion %s', partition)
            tele_log.info('Autodetected partion %s', partition)
            return partition
    raise ValueError("Could not detect user partion. Please specify it manually in config")


def dask_maxwell_setup():
    '''Set up a dask cluster to use maxwell resources, according to the
    loaded configuration.

    If the current host matches one of ``cfg.maxwell.login_hostnames`` a
    ``SLURMCluster`` is created; otherwise a local distributed cluster is
    used as a fallback.

    Returns:
        tuple: ``(cluster, client)`` — the dask cluster and a connected
        ``dask.distributed.Client``.
    '''
    hostname = socket.gethostname()
    # Check if we are running on a maxwell login node
    if _match_any(hostname, cfg.maxwell.login_hostnames):
        logger.info("Looks like we are running on a maxwell submission node, configuring dask slurm cluster...")

        # Imported lazily: dask_jobqueue is only needed on submission nodes.
        from dask_jobqueue import SLURMCluster

        cluster = SLURMCluster(
            queue=cfg.maxwell.partition or _get_user_partition(),
            cores=cfg.maxwell.num_cores,
            processes=cfg.maxwell.num_proc or None,
            memory=cfg.maxwell.memory,
            log_directory=cfg.maxwell.log_directory or None,
            walltime=cfg.maxwell.walltime,
            # Let the workers use all of the node's memory; slurm already
            # enforces the per-job limit.
            worker_extra_args=['--memory-limit=0'],
            job_extra_directives=cfg.maxwell.job_extra_params,
        )
        # With adapt=false keep one job permanently allocated, otherwise
        # scale down to zero when idle.
        # BUG FIX: maximum_jobs was hard-coded to 1, silently ignoring the
        # documented cfg.maxwell.max_nodes setting (default is still 1).
        cluster.adapt(minimum_jobs=0 if cfg.maxwell.adapt else 1,
                      maximum_jobs=cfg.maxwell.max_nodes,
                      interval='10s', wait_count=12)

        client = dd.Client(cluster)
    else:
        import multiprocessing
        logger.info("Maxwell submission node not detected, configuring local dask distributed scheduler")

        if hostname.startswith('max-'):
            # We are running on a maxwell compute node: bind to the real
            # hostname so that the dashboard link works from outside.
            cluster = dd.LocalCluster(memory_limit=None,
                                      n_workers=multiprocessing.cpu_count(),
                                      host=hostname)
        else:
            cluster = dd.LocalCluster(memory_limit=None,
                                      n_workers=multiprocessing.cpu_count())
        client = dd.Client(cluster)

    # Dask config
    dask.config.set(**{'array.slicing.split_large_chunks': cfg.dask.split_large_chunks})

    return cluster, client
logger =
<Logger fab.maxwell (INFO)>
tele_log =
<Logger fab.telemetry (INFO)>
def
dask_maxwell_setup():
71def dask_maxwell_setup(): 72 ''' set up dask cluster to use maxwell resources, according to loaded configuration ''' 73 hostname = socket.gethostname() 74 #Check if we are running on a maxwell login node 75 if _match_any(hostname, cfg.maxwell.login_hostnames): 76 logger.info("Looks like we are running on a maxwell submission node, configuring dask slurm cluster...") 77 78 from dask_jobqueue import SLURMCluster 79 80 cluster = SLURMCluster( 81 queue = cfg.maxwell.partition or _get_user_partition(), 82 cores = cfg.maxwell.num_cores, 83 processes = cfg.maxwell.num_proc or None, 84 memory = cfg.maxwell.memory, 85 #local_directory = "/tmp/dask", 86 log_directory = cfg.maxwell.log_directory or None, 87 walltime = cfg.maxwell.walltime, 88 worker_extra_args=['--memory-limit=0'], 89 #worker_extra_args=["--lifetime", "3m", "--lifetime-stagger", "1m"], 90 job_extra_directives=cfg.maxwell.job_extra_params 91 ) 92 cluster.adapt(minimum_jobs = 0 if cfg.maxwell.adapt else 1, 93 maximum_jobs= 1, 94 interval='10s', wait_count=12) 95 96 client = dd.Client(cluster) 97 else: 98 import multiprocessing 99 logger.info("Maxwell submission node not detected, configuring local dask distributed scheduler") 100 101 if hostname.startswith('max-'): 102 # We are running on a maxwell node, use the proper hostname so that the daskboard link works 103 cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count(), host=hostname) 104 else: 105 cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count()) 106 client = dd.Client(cluster) 107 108 #Dask config 109 dask.config.set(**{'array.slicing.split_large_chunks': cfg.dask.split_large_chunks}) 110 111 return cluster, client
set up dask cluster to use maxwell resources, according to loaded configuration