fab.maxwell

Takes care of automatically setting up dask so that it uses the Maxwell cluster resources when that makes sense; otherwise a local distributed cluster is used.

The configuration file can be used to customize the cluster setup.

Below is a list of the available configuration parameters. You can override any or all of them through your custom config file.

```toml
[maxwell]
# Controls whether to scale the size of the cluster depending
# on need. If false, one node will be requested from slurm at
# module configuration time and will always remain 'taken'
adapt = false
# Maximum number of maxwell nodes to use
max_nodes = 1
# Max running time of the slurm job to be requested
walltime = "04:00:00"
# Maxwell partition to use when scheduling jobs. Make sure that
# you are using a partition for which you have permissions
partition = 'ps,psx'
# Minimum number of cores in the requested nodes
num_cores = 40
# Number of workers per node. Should be <= num_cores
num_proc = 20
# Minimum amount of memory on the requested node
memory = '512G'
# Extra arguments to be passed on to sbatch directly. Use to
# further configure slurm
job_extra_params = ['--output=slurm.out']
```
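
In typical use you do not touch these values at runtime: you call `dask_maxwell_setup()` and work with the cluster and client it returns. A minimal usage sketch (the `dask.array` computation is only an illustration; the import path assumes the `fab` package is installed):

```python
# Minimal usage sketch: dask_maxwell_setup() picks a SLURM-backed or local
# cluster automatically and returns it together with a connected client.
from fab.maxwell import dask_maxwell_setup
import dask.array as da

cluster, client = dask_maxwell_setup()

# Any dask work submitted through this client runs on the chosen cluster.
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(x.mean().compute())
```

The module source that implements this behavior follows.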
```python
import dask
import dask.distributed as dd
from .settings import cfg
import socket
import subprocess
import fnmatch

import logging
logger = logging.getLogger(__name__)
tele_log = logging.getLogger('fab.telemetry')


def _match_any(string, patterns):
    '''Return True if `string` matches any of the given fnmatch-style patterns.'''
    for pattern in patterns:
        if fnmatch.fnmatch(string, pattern):
            return True
    return False


def _get_user_partition():
    '''Tries to figure out whether the current user is part of ps or psx'''
    ps = int(subprocess.check_output('getent netgroup max-ps2-users | grep ",$USER," | wc -l', shell=True))
    psx = int(subprocess.check_output('getent netgroup max-psx2-users | grep ",$USER," | wc -l', shell=True))

    if ps:
        logger.debug('Autodetected partition ps')
        tele_log.info('Autodetected partition ps')
        return 'ps'
    if psx:
        logger.debug('Autodetected partition psx')
        tele_log.info('Autodetected partition psx')
        return 'psx'
    raise ValueError("Could not detect user partition. Please specify it manually in config")


def dask_maxwell_setup():
    '''Set up a dask cluster to use maxwell resources, according to the loaded configuration.'''
    hostname = socket.gethostname()
    # Check if we are running on a maxwell login node
    if _match_any(hostname, cfg.maxwell.login_hostnames):
        logger.info("Looks like we are running on a maxwell submission node, configuring dask slurm cluster...")

        from dask_jobqueue import SLURMCluster

        cluster = SLURMCluster(
            queue=cfg.maxwell.partition or _get_user_partition(),
            cores=cfg.maxwell.num_cores,
            processes=cfg.maxwell.num_proc or None,
            memory=cfg.maxwell.memory,
            # local_directory="/tmp/dask",
            log_directory=cfg.maxwell.log_directory or None,
            walltime=cfg.maxwell.walltime,
            # --memory-limit=0 disables dask's per-worker memory limit
            worker_extra_args=['--memory-limit=0'],
            # worker_extra_args=["--lifetime", "3m", "--lifetime-stagger", "1m"],
            job_extra_directives=cfg.maxwell.job_extra_params,
        )
        # Scale between the minimum (zero jobs if adaptive scaling is enabled,
        # otherwise one job that stays allocated) and the configured maximum.
        cluster.adapt(minimum_jobs=0 if cfg.maxwell.adapt else 1,
                      maximum_jobs=cfg.maxwell.max_nodes,
                      interval='10s', wait_count=12)

        client = dd.Client(cluster)
    else:
        import multiprocessing
        logger.info("Maxwell submission node not detected, configuring local dask distributed scheduler")

        if hostname.startswith('max-'):
            # We are running on a maxwell node, use the proper hostname so that the dashboard link works
            cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count(), host=hostname)
        else:
            cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count())
        client = dd.Client(cluster)

    # Apply global dask configuration options
    dask.config.set(**{'array.slicing.split_large_chunks': cfg.dask.split_large_chunks})

    return cluster, client
```
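
The returned objects are ordinary `dask.distributed` handles, so the usual introspection and teardown calls apply. A short sketch using only standard `dask.distributed` client methods (illustrative, not part of this module):

```python
# Inspect and shut down the cluster through the standard dask.distributed API.
from fab.maxwell import dask_maxwell_setup

cluster, client = dask_maxwell_setup()

print(client.dashboard_link)                    # URL of the dask dashboard
print(len(client.scheduler_info()['workers']))  # number of currently connected workers

client.close()
cluster.close()
```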