
Takes care of automatically setting up dask in a way to use the maxwell cluster resources, when that makes sense. Otherwise a local distributed cluster is used.

The configuration file can be used to customize the cluster setup

Here you find a list of the available configuration parameters. You can override one or all of them through your custom config file.

# Control whether to scale the size of the cluster depending
# on need. If false, one node will be requested from slurm at
# module configuration and will always remain 'taken'
adapt = false 
# Maximum number of maxwell nodes to use
max_nodes = 1
# Max running time of the slurm job to be requested
walltime = "04:00:00"
# Maxwell partition to use when scheduling jobs. Make sure that
# you are using a partition for which you have permissions
partition  = 'ps,psx'
# Minimum number of cores in the requested nodes
num_cores  = 40
# Number of workers per node. Should be <= num_cores
num_proc   = 20           
# Minimum amount of memory on the requested node
memory     = '512G'
# Extra arguments to be passed on to sbatch directly. Use them to
# further configure slurm
job_extra_params = ['--output=slurm.out']
  2Takes care of automatically setting up dask in a way to use
  3the maxwell cluster resources, when that makes sense. Otherwise
  4a local distributed cluster is used.
  6The configuration file can be used to customize the cluster setup
  8Here you find a list of the available configuration parameters.
  9You can override one or all of them through your custom config file
 14# Control whether to scale the size of the cluster depending
 15# on need. If false, one node will be requested from slurm at
 16# module configuration and will always remain 'taken'
 17adapt = false 
 18# Maximum number of maxwell nodes to use
 19max_nodes = 1
 20# Max running time of the slurm job to be requested
 21walltime = "04:00:00"
 22# Maxwell partition to use when scheduling jobs. Make sure that
 23# you are using a partition for which you have permissions
 24partition  = 'ps,psx'
 25# Minimum number of cores in the requested nodes
 26num_cores  = 40
 27# Number of workers per node. Should be <= num_cores
 28num_proc   = 20           
 29# Minimum amount of memory on the requested node
 30memory     = '512G'
 31# Extra arguments to be passed on to sbatch directly. Use them to
 32# further configure slurm
 33job_extra_params = ['--output=slurm.out']
 38import dask
 39import dask.distributed as dd
 40from .settings import cfg
 41import socket
 42import subprocess
 43import fnmatch
 45import logging
 46logger = logging.getLogger(__name__)
 47tele_log = logging.getLogger('fab.telemetry')
 49def _match_any(string, patterns):
 50    for pattern in patterns:
 51        if fnmatch.fnmatch(string, pattern):
 52            return True
 53    return False
 55def _get_user_partition():
 56    ''' Tries to figre out if the current user is part of ps or psx'''
 57    ps = int(subprocess.check_output('getent netgroup max-ps2-users | grep ",$USER," | wc -l', shell=True))
 58    psx = int(subprocess.check_output('getent netgroup max-psx2-users | grep ",$USER," | wc -l', shell=True))
 60    if ps:
 61        logger.debug('Autodetected partion ps')
 62        tele_log.info('Autodetected partion ps')
 63        return 'ps'
 64    if psx:
 65        logger.debug('Autodetected partion psx')
 66        tele_log.info('Autodetected partion psx')
 67        return 'psx'
 68    raise ValueError("Could not detect user partion. Please specify it manually in config")
 70def dask_maxwell_setup():
 71    ''' set up dask cluster to use maxwell resources, according to loaded configuration '''
 72    hostname = socket.gethostname()
 73    #Check if we are running on a maxwell login node
 74    if _match_any(hostname, cfg.maxwell.login_hostnames):
 75        logger.info("Looks like we are running on a maxwell submission node, configuring dask slurm cluster...")
 77        from dask_jobqueue import SLURMCluster
 79        cluster = SLURMCluster(
 80            queue = cfg.maxwell.partition or _get_user_partition(),
 81            cores = cfg.maxwell.num_cores,
 82            processes = cfg.maxwell.num_proc or None,
 83            memory = cfg.maxwell.memory,
 84            #local_directory = "/tmp/dask",
 85            log_directory = cfg.maxwell.log_directory or None,
 86            walltime = cfg.maxwell.walltime, 
 87            worker_extra_args=['--memory-limit=0'],
 88            #worker_extra_args=["--lifetime", "3m", "--lifetime-stagger", "1m"],
 89            job_extra_directives=cfg.maxwell.job_extra_params
 90        )
 91        cluster.adapt(minimum_jobs = 0 if cfg.maxwell.adapt else 1, 
 92                      maximum_jobs= 1,
 93                      interval='10s', wait_count=12)
 95        client = dd.Client(cluster)
 96    else:
 97        import multiprocessing
 98        logger.info("Maxwell submission node not detected, configuring local dask distributed scheduler")
100        if hostname.startswith('max-'): 
101            # We are running on a maxwell node, use the proper hostname so that the daskboard link works
102            cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count(), host=hostname)
103        else:
104            cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count())
105        client = dd.Client(cluster)
107    #Dask config
108    dask.config.set(**{'array.slicing.split_large_chunks': cfg.dask.split_large_chunks})
110    return cluster, client
logger = <Logger fab.maxwell (INFO)>
tele_log = <Logger fab.telemetry (INFO)>
def dask_maxwell_setup():
 71def dask_maxwell_setup():
 72    ''' set up dask cluster to use maxwell resources, according to loaded configuration '''
 73    hostname = socket.gethostname()
 74    #Check if we are running on a maxwell login node
 75    if _match_any(hostname, cfg.maxwell.login_hostnames):
 76        logger.info("Looks like we are running on a maxwell submission node, configuring dask slurm cluster...")
 78        from dask_jobqueue import SLURMCluster
 80        cluster = SLURMCluster(
 81            queue = cfg.maxwell.partition or _get_user_partition(),
 82            cores = cfg.maxwell.num_cores,
 83            processes = cfg.maxwell.num_proc or None,
 84            memory = cfg.maxwell.memory,
 85            #local_directory = "/tmp/dask",
 86            log_directory = cfg.maxwell.log_directory or None,
 87            walltime = cfg.maxwell.walltime, 
 88            worker_extra_args=['--memory-limit=0'],
 89            #worker_extra_args=["--lifetime", "3m", "--lifetime-stagger", "1m"],
 90            job_extra_directives=cfg.maxwell.job_extra_params
 91        )
 92        cluster.adapt(minimum_jobs = 0 if cfg.maxwell.adapt else 1, 
 93                      maximum_jobs= 1,
 94                      interval='10s', wait_count=12)
 96        client = dd.Client(cluster)
 97    else:
 98        import multiprocessing
 99        logger.info("Maxwell submission node not detected, configuring local dask distributed scheduler")
101        if hostname.startswith('max-'): 
102            # We are running on a maxwell node, use the proper hostname so that the daskboard link works
103            cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count(), host=hostname)
104        else:
105            cluster = dd.LocalCluster(memory_limit=None, n_workers=multiprocessing.cpu_count())
106        client = dd.Client(cluster)
108    #Dask config
109    dask.config.set(**{'array.slicing.split_large_chunks': cfg.dask.split_large_chunks})
111    return cluster, client

Set up a dask cluster to use maxwell resources, according to the loaded configuration.