Commit 0d047211 authored by Klaus Zimmermann

Add hpc schedulers (closes #200)

parent 00809602
@@ -85,8 +85,21 @@ class DistributedLocalClusterScheduler:
         self.cluster.__exit__(type, value, traceback)
 
 
+class ExternalScheduler:
+    def __init__(self, scheduler_file, **kwargs):
+        self.scheduler_file = scheduler_file
+        self.client = Client(scheduler_file=scheduler_file)
+
+    def __enter__(self):
+        self.client.__enter__()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.client.__exit__(type, value, traceback)
+
+
 class LocalThreadsScheduler:
-    def __init__(self):
+    def __init__(self, **kwargs):
         self.client = None
 
     def __enter__(self):
@@ -97,8 +110,28 @@ class LocalThreadsScheduler:
         pass
 
 
+class MPIScheduler:
+    def __init__(self, **kwargs):
+        from dask_mpi import initialize
+        n_workers = 4  # tasks-per-node from scheduler
+        n_threads = 4  # cpus-per-task from scheduler
+        memory_limit = (system.MEMORY_LIMIT*.9) / n_workers
+        initialize('ib0',
+                   nthreads=n_threads,
+                   local_directory='/scratch/local',
+                   memory_limit=memory_limit,)
+        self.client = Client()
+
+    def __enter__(self):
+        self.client.__enter__()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.client.__exit__(type, value, traceback)
+
+
 class SingleThreadedScheduler:
-    def __init__(self):
+    def __init__(self, **kwargs):
         self.client = None
 
     def __enter__(self):
@@ -111,11 +144,17 @@ class SingleThreadedScheduler:
 
 SCHEDULERS = OrderedDict([
     ('distributed-local-cluster', DistributedLocalClusterScheduler),
+    ('external', ExternalScheduler),
     ('threaded', LocalThreadsScheduler),
+    ('mpi', MPIScheduler),
     ('single-threaded', SingleThreadedScheduler),
 ])
 
 
 def setup_scheduler(args):
-    scheduler = SCHEDULERS[args.dask_scheduler]
-    return scheduler()
+    scheduler_spec = args.dask_scheduler.split(':')
+    scheduler_name = scheduler_spec[0]
+    scheduler_kwargs = {k: v for k, v in (e.split('=')
+                                          for e in scheduler_spec[1:])}
+    scheduler = SCHEDULERS[scheduler_name]
+    return scheduler(**scheduler_kwargs)
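With this change, the `--dask-scheduler` argument accepts keyword arguments appended after a colon, for example `external:scheduler_file=/path/to/scheduler.json`. A minimal sketch of how `setup_scheduler` interprets such a spec (the file path is hypothetical):

# Hypothetical spec string, mirroring the parsing in setup_scheduler above.
spec = 'external:scheduler_file=/tmp/scheduler.json'
parts = spec.split(':')                           # ['external', 'scheduler_file=/tmp/scheduler.json']
name = parts[0]                                   # 'external'
kwargs = dict(e.split('=') for e in parts[1:])    # {'scheduler_file': '/tmp/scheduler.json'}
# SCHEDULERS[name](**kwargs) then constructs ExternalScheduler(scheduler_file='/tmp/scheduler.json')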
@@ -35,7 +35,7 @@ def parse_args():
     parser.add_argument('-e', '--deactivate-error-tracking',
                         action='store_true',
                         help='deactivate sentry based error tracking')
-    parser.add_argument('-d', '--dask-scheduler', choices=SCHEDULERS.keys(),
+    parser.add_argument('-d', '--dask-scheduler',
                         default='distributed-local-cluster')
     parser.add_argument('-k', '--keep-open', action='store_true',
                         help='keep climix running until key press '
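The `n_workers` and `n_threads` values hard-coded in `MPIScheduler.__init__` above stand in for values that the batch scheduler provides. A minimal sketch of reading them from SLURM's environment (variable names as listed in the batch script below; the fallbacks of 4 are only an assumption mirroring the hard-coded defaults):

import os

# SLURM exports these for the job step (see the SLURM_* reference list in the
# batch script below); fall back to the current hard-coded values if unset.
n_workers = int(os.environ.get('SLURM_NTASKS_PER_NODE', 4))
n_threads = int(os.environ.get('SLURM_CPUS_PER_TASK', 4))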
#!/bin/bash
#
#SBATCH -J climix-test
#SBATCH -t 06:00:00
#SBATCH -n 64
#SBATCH --exclusive
#SBATCH --cpus-per-task=8
#
# SLURM_CPUS_ON_NODE
# SLURM_CPUS_PER_TASK
# SLURM_JOB_ID
# SLURM_JOB_CPUS_PER_NODE
# SLURM_JOB_NAME
# SLURM_JOB_NODELIST
# SLURM_JOB_NUM_NODES
# SLURM_LOCALID
# SLURM_MEM_PER_NODE
# SLURM_NODEID
# SLURM_NTASKS
# SLURM_NTASKS_PER_CORE
# SLURM_NTASKS_PER_NODE
# SLURM_NTASKS_PER_SOCKET
# SLURM_HET_SIZE
# SLURM_PROCID
# SLURM_TASKS_PER_NODE
# SLURM_TASK_PID
# SLURMD_NODENAME
NO_SCHEDULERS=1
NO_PROGRAM=1
NO_WORKERS=$(($SLURM_NTASKS - $NO_SCHEDULERS - $NO_PROGRAM))
MEM_PER_WORKER=$(echo "$SLURM_CPUS_PER_TASK * $SLURM_MEM_PER_CPU * .9" |bc -l)
echo "Number of workers: $NO_WORKERS, memory: $MEM_PER_WORKER"
__conda_setup="$(CONDA_REPORT_ERRORS=false '/nobackup/rossby20/rossby/software/conda/bi/miniconda3/bin/conda' shell.bash hook 2> /dev/null)"
if [ $? -eq 0 ]; then
    \eval "$__conda_setup"
else
    if [ -f "/nobackup/rossby20/rossby/software/conda/bi/miniconda3/etc/profile.d/conda.sh" ]; then
        . "/nobackup/rossby20/rossby/software/conda/bi/miniconda3/etc/profile.d/conda.sh"
        CONDA_CHANGEPS1=false conda activate base
    else
        \export PATH="/nobackup/rossby20/rossby/software/conda/bi/miniconda3/bin:$PATH"
    fi
fi
unset __conda_setup
conda activate climix-devel-4
COORDINATE_DIR=/nobackup/rossby24/users/sm_klazi/test_climix/mpi_external
cd $COORDINATE_DIR
SCHEDULER_FILE=$COORDINATE_DIR/scheduler-$SLURM_JOB_ID.json
# Start scheduler
srun -N 1 --ntasks $NO_SCHEDULERS \
    dask-scheduler \
        --interface ib0 \
        --scheduler-file $SCHEDULER_FILE &
# Start workers
srun --ntasks $NO_WORKERS \
    dask-worker \
        --interface ib0 \
        --scheduler-file $SCHEDULER_FILE \
        --memory-limit "${MEM_PER_WORKER}MB" \
        --nthreads $(($SLURM_CPUS_PER_TASK / 2 - 1)) &
# Start program
srun -N 1 --ntasks $NO_PROGRAM \
    climix -i -e -d external:scheduler_file=$SCHEDULER_FILE -x tn10p \
        /home/rossby/imports/obs/EOBS/EOBS19/orig/tn_ens_mean_0.1deg_reg_v19.0e.nc
# wait
# Script ends here
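The script above starts a standalone dask scheduler and its workers with srun, then runs climix against them via the new `external` scheduler. It would typically be submitted with sbatch; the file name below is only a placeholder:

sbatch run_climix_external.sbatch   # hypothetical file name for the script above
squeue -u $USER                     # confirm the job has started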
#!/bin/bash
#
#SBATCH -J climix-test
#SBATCH -t 06:00:00
#SBATCH -N 16
#SBATCH --exclusive
#SBATCH --ntasks-per-node=4
#
__conda_setup="$(CONDA_REPORT_ERRORS=false '/nobackup/rossby20/rossby/software/conda/bi/miniconda3/bin/conda' shell.bash hook 2> /dev/null)"
if [ $? -eq 0 ]; then
    \eval "$__conda_setup"
else
    if [ -f "/nobackup/rossby20/rossby/software/conda/bi/miniconda3/etc/profile.d/conda.sh" ]; then
        . "/nobackup/rossby20/rossby/software/conda/bi/miniconda3/etc/profile.d/conda.sh"
        CONDA_CHANGEPS1=false conda activate base
    else
        \export PATH="/nobackup/rossby20/rossby/software/conda/bi/miniconda3/bin:$PATH"
    fi
fi
unset __conda_setup
conda activate climix-devel-4
cd /nobackup/rossby24/users/sm_klazi/test_climix/mpi
mpirun climix -i -e -d mpi -x tn10p \
    /home/rossby/imports/obs/EOBS/EOBS19/orig/tn_ens_mean_0.1deg_reg_v19.0e.nc
# Script ends here
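In this MPI variant, dask-mpi assigns roles by MPI rank when `initialize()` runs inside climix: rank 0 becomes the dask scheduler, rank 1 runs the climix client code, and the remaining ranks become workers. With 16 nodes and 4 tasks per node as requested above, that gives 64 ranks and hence 62 workers. A hypothetical submission:

sbatch run_climix_mpi.sbatch   # hypothetical file name for the script above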