Commit 0e92f6d1 authored by Klaus Zimmermann
Browse files

Improve dask setup (closes #110, closes #26)

* Add context manager approach
* Add commandline option to choose scheduler

# Conflicts:
#	climix/main.py
parent 9d9e4017
# -*- coding: utf-8 -*-
from collections import OrderedDict
import dask
from dask.distributed import Client, LocalCluster
# from dask_jobqueue import SLURMCluster
class DistributedLocalClusterScheduler:
    """Context manager that runs dask on a local distributed cluster.

    Constructing an instance creates a ``LocalCluster`` and a ``Client``
    connected to it. Entering the ``with`` block enters both objects and
    leaving it exits them again, client first and cluster second.

    Parameters
    ----------
    local_directory : str, optional
        Value passed on as ``local_directory`` to ``LocalCluster``.
        Defaults to ``'/dev/shm/dask-worker-space'``.

    Attributes
    ----------
    cluster
        The ``LocalCluster`` created for this scheduler.
    client
        The ``Client`` connected to ``cluster``.
    """

    def __init__(self, local_directory='/dev/shm/dask-worker-space'):
        self.cluster = LocalCluster(local_directory=local_directory)
        try:
            self.client = Client(self.cluster)
        except BaseException:
            # Without a client nobody would ever shut the cluster down
            # again, so close it here before propagating the error.
            self.cluster.close()
            raise

    def __enter__(self):
        """Enter the cluster and the client; return this scheduler."""
        self.cluster.__enter__()
        self.client.__enter__()
        return self

    def __exit__(self, type, value, traceback):
        """Exit the client, then the cluster.

        The cluster is exited even if exiting the client raises, so a
        failing client shutdown cannot leave the cluster running. Exceptions
        from the ``with`` body are never suppressed.
        """
        try:
            self.client.__exit__(type, value, traceback)
        finally:
            self.cluster.__exit__(type, value, traceback)
class LocalThreadsScheduler:
    """Context manager that selects dask's ``'threads'`` scheduler.

    Entering the ``with`` block sets ``scheduler='threads'`` in the dask
    configuration; leaving it restores whatever was configured before, so
    the setting does not leak past the block.
    """

    # Handle returned by ``dask.config.set`` while the block is active;
    # ``None`` whenever the scheduler has not been entered.
    _config = None

    def __enter__(self):
        """Switch dask to the threaded scheduler; return this scheduler."""
        # ``dask.config.set`` applies the value immediately and remembers
        # the previous one; keeping the handle lets __exit__ undo it.
        self._config = dask.config.set(scheduler='threads')
        return self

    def __exit__(self, type, value, traceback):
        """Restore the previous dask configuration.

        Exceptions from the ``with`` body are never suppressed.
        """
        if self._config is not None:
            self._config.__exit__(type, value, traceback)
            self._config = None
# Maps each scheduler name to the context-manager class implementing it.
# Insertion order is kept, with 'distributed-local-cluster' first.
SCHEDULERS = OrderedDict()
SCHEDULERS['distributed-local-cluster'] = DistributedLocalClusterScheduler
SCHEDULERS['threaded'] = LocalThreadsScheduler
def setup_scheduler(args):
    """Instantiate the scheduler named by ``args.dask_scheduler``.

    ``args`` must provide a ``dask_scheduler`` attribute holding one of the
    keys of ``SCHEDULERS``; an unknown name raises ``KeyError``. The class
    is called without arguments and the new instance is returned.
    """
    return SCHEDULERS[args.dask_scheduler]()
......@@ -5,14 +5,13 @@ import argparse
import os
import time
from dask.distributed import Client, LocalCluster
# from dask_jobqueue import SLURMCluster
import iris
import numpy as np
import pkg_resources
import yaml
import climix
from .dask_setup import SCHEDULERS, setup_scheduler
from .index import Index
from .index_functions import SUPPORTED_OPERATORS, SUPPORTED_REDUCERS
from .period import PeriodSpecification
......@@ -27,6 +26,11 @@ MISSVAL = 1.0e20
def parse_args():
parser = argparse.ArgumentParser(
description=(f'A climate index thing, version {climix.__version__}.'))
parser.add_argument('-d', '--dask-scheduler', choices=SCHEDULERS.keys(),
default='distributed-local-cluster')
parser.add_argument('-k', '--keep-open', action='store_true',
help='keep climix running until key press '
'(useful for debugging)')
parser.add_argument('-s', '--sliced-mode', action='store_true',
help='activate calculation per period to avoid memory '
'problems')
......@@ -40,14 +44,6 @@ def parse_args():
return parser.parse_args()
def setup_cluster():
    """Create a ``LocalCluster`` with default settings and return it.

    An alternative based on ``SLURMCluster`` (from ``dask_jobqueue``), sized
    with ``adapt(minimum=10, maximum=100)`` or ``scale(10)``, was sketched
    here previously but is not enabled.
    """
    return LocalCluster()
def ignore_cb(cube, field, filename):
cube.attributes.pop('creation_date', None)
cube.attributes.pop('tracking_id', None)
......@@ -183,19 +179,17 @@ def do_main(requested_indices, datafiles, output_template, sliced_mode):
def main():
args = parse_args()
cluster = setup_cluster()
client = Client(cluster)
print('Cluster ready; starting main program.')
try:
with setup_scheduler(args) as scheduler:
logging.info('Scheduler ready; starting main program.')
start = time.time()
do_main(args.indices, args.datafiles,
args.output_template, args.sliced_mode)
end = time.time()
input(f'Calculation took {end-start:.4f} seconds.\n'
'Press enter to close the cluster ')
finally:
client.close()
cluster.close()
try:
do_main(args.indices, args.datafiles,
args.output_template, args.sliced_mode)
finally:
end = time.time()
logging.info(f'Calculation took {end-start:.4f} seconds.')
if args.keep_open:
input('Press enter to close the cluster ')
if __name__ == "__main__":
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment