Commit 00809602 authored by Klaus Zimmermann
Browse files

Account for hyperthreading in LocalCluster scheduler

parent d7254e02
# -*- coding: utf-8 -*-
from collections import OrderedDict
import glob
import os
import sys
import dask
from dask.distributed import Client, LocalCluster, wait, system
from dask.distributed import progress as distributed_progress
# from dask_jobqueue import SLURMCluster
import psutil
def progress(fs):
......@@ -17,9 +20,59 @@ def progress(fs):
return fs
def cpu_count_physical():
# Adapted from psutil
"""Return the number of physical cores in the system."""
IDS = ["physical_package_id",
# Method #1
core_ids = set()
for path in glob.glob(
core_id = []
for id in IDS:
id_path = os.path.join(path, id)
if os.path.exists(id_path):
with open(id_path) as f:
result = len(core_ids)
if result != 0:
return result
return None
def hyperthreading_info():
    """Report whether the machine exposes more logical than physical CPUs.

    Returns a tuple ``(hyperthreading, no_logical_cpus, no_physical_cpus)``.
    ``hyperthreading`` is ``True`` when the logical CPU count exceeds the
    physical one, ``False`` when it does not, and ``None`` when either
    count is ``None`` (i.e. could not be determined). The two counts are
    returned exactly as obtained, so either of them may be ``None`` as well.
    """
    no_logical_cpus = psutil.cpu_count(logical=True)
    no_physical_cpus = cpu_count_physical()
    if no_logical_cpus is None or no_physical_cpus is None:
        # Without both counts no statement about hyperthreading is possible;
        # comparing None against an int would raise TypeError.
        hyperthreading = None
    else:
        hyperthreading = no_logical_cpus > no_physical_cpus
    return (hyperthreading, no_logical_cpus, no_physical_cpus)
class DistributedLocalClusterScheduler:
def __init__(self):
self.cluster = LocalCluster()
def __init__(self, threads_per_worker=2, **kwargs):
no_physical_cpus) = hyperthreading_info()
if hyperthreading:
factor = no_logical_cpus // no_physical_cpus
no_available_physical_cpus = dask.system.CPU_COUNT // factor
n_workers = no_available_physical_cpus // threads_per_worker
memory_limit = (system.MEMORY_LIMIT*.9) / n_workers
# let dask figure it out
n_workers = None
memory_limit = None
self.cluster = LocalCluster(n_workers=n_workers,
self.client = Client(self.cluster)
def __enter__(self):
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment