Commit 6ddf6358 authored by Klaus Zimmermann

Move complete lazy and non-lazy calculations into index functions (closes #58)

parent 459c52f0
@@ -13,26 +13,9 @@ class PointLocalAggregator(Aggregator):
self.index_function = index_function
self.output_metadata = output_metadata
super().__init__(cell_method=cell_method,
call_func=self.non_lazy_func,
lazy_func=self.lazy_func, **kwargs)
def lazy_func(self, data, axis, **kwargs):
if isinstance(axis, list) and len(axis) == 1:
axis = axis[0]
if axis < 0:
# just cope with negative axis numbers
axis += data.ndim
return da.apply_along_axis(self.index_function,
axis=axis, arr=data, **kwargs)
def non_lazy_func(self, data, axis, **kwargs):
if isinstance(axis, list) and len(axis) == 1:
axis = axis[0]
if axis < 0:
# just cope with negative axis numbers
axis += data.ndim
return np.apply_along_axis(self.index_function,
axis=axis, arr=data, **kwargs)
call_func=self.index_function.call_func,
lazy_func=self.index_function.lazy_func,
**kwargs)
def update_metadata(self, cube, coords, **kwargs):
super().update_metadata(cube, coords, **kwargs)
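A minimal sketch of the delegation this commit introduces, using hypothetical ToyIndexFunction/ToyAggregator stand-ins rather than the project's real classes: the aggregator no longer wraps the index function in its own apply_along_axis helpers, it simply forwards the index function's call_func and lazy_func.

import dask.array as da
import numpy as np

class ToyIndexFunction:
    # Hypothetical index function that exposes both code paths itself.
    def call_func(self, data, axis, **kwargs):
        return np.sum(data, axis=axis)      # eager (numpy) path
    def lazy_func(self, data, axis, **kwargs):
        return da.sum(data, axis=axis)      # lazy (dask) path

class ToyAggregator:
    # Hypothetical aggregator: no per-aggregator wrappers, just forwarding.
    def __init__(self, index_function):
        self.call_func = index_function.call_func
        self.lazy_func = index_function.lazy_func

agg = ToyAggregator(ToyIndexFunction())
data = np.arange(12.0).reshape(3, 4)
print(agg.call_func(data, axis=0))                                      # eager result
print(agg.lazy_func(da.from_array(data, chunks=2), axis=0).compute())   # same values, computed lazily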
# -*- coding: utf-8 -*-
from cf_units import Unit
import dask.array as da
import numpy as np
from .util import change_units
SUPPORTED_OPERATORS = [
'<',
'>',
'<=',
'>=',
]
SUPPORTED_REDUCERS = [
'min',
'max',
'sum',
'mean',
]
NUMPY_OPERATORS = {
'<': np.less,
'>': np.greater,
'<=': np.less_equal,
'>=': np.greater_equal,
}
NUMPY_REDUCERS = {
'min': np.min,
'max': np.max,
'sum': np.sum,
'mean': np.mean,
}
DASK_OPERATORS = {
'<': da.less,
'>': da.greater,
'<=': da.less_equal,
'>=': da.greater_equal,
}
DASK_REDUCERS = {
'min': da.min,
'max': da.max,
'sum': da.sum,
'mean': da.mean,
}
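For illustration only, and assuming the tables above are in scope: a condition string from the metadata resolves to both an eager and a lazy callable.

op = '>='                       # condition string as it appears in the metadata
eager_op = NUMPY_OPERATORS[op]  # np.greater_equal
lazy_op = DASK_OPERATORS[op]    # da.greater_equal
x = np.array([1.0, 2.0, 3.0])
print(eager_op(x, 2.0))                                     # [False  True  True]
print(lazy_op(da.from_array(x, chunks=2), 2.0).compute())   # same result, evaluated lazily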
def normalize_axis(axis, ndim):
if isinstance(axis, list) and len(axis) == 1:
axis = axis[0]
if axis < 0:
# just cope with negative axis numbers
axis += ndim
return axis
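A quick illustration of normalize_axis (assuming the function above is in scope), covering the one-element lists and negative indices that aggregation can hand over:

print(normalize_axis([0], 3))    # -> 0   (single-element list is unwrapped)
print(normalize_axis(-1, 3))     # -> 2   (negative axis counted from the end)
print(normalize_axis([-2], 4))   # -> 2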
class CountOccurences:
def __init__(self, threshold, condition):
self.threshold = threshold
self.condition = NUMPY_OPERATORS[condition]
self.lazy_condition = DASK_OPERATORS[condition]
self.condition = condition
self.standard_name = None
self.units = Unit('days')
@@ -16,10 +70,17 @@ class CountOccurences:
def prepare(self, input_cube):
change_units(self.threshold, input_cube.units, input_cube.standard_name)
def __call__(self, raw_data):
data = self.condition(raw_data, self.threshold.data)
res = np.count_nonzero(data)
return float(res)
def call_func(self, data, axis, **kwargs):
axis = normalize_axis(axis, data.ndim)
cond = self.condition(data, self.threshold.data)
res = np.count_nonzero(cond, axis=axis)
return res.astype('float32')
def lazy_func(self, data, axis, **kwargs):
axis = normalize_axis(axis, data.ndim)
cond = self.lazy_condition(data, self.threshold.data)
res = da.count_nonzero(cond, axis=axis)
return res.astype('float32')
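A rough, self-contained sketch of the counting logic above, with the threshold hard-coded instead of taken from an iris cube; the helper names are made up and only illustrate that the eager and lazy paths agree.

import dask.array as da
import numpy as np

def count_above(data, threshold, axis):
    # Eager variant: count values exceeding the threshold along one axis.
    return np.count_nonzero(np.greater(data, threshold), axis=axis).astype('float32')

def lazy_count_above(data, threshold, axis):
    # Lazy variant: the same count built from dask operations.
    return da.count_nonzero(da.greater(data, threshold), axis=axis).astype('float32')

data = np.random.default_rng(0).random((365, 4, 5))     # (time, y, x)
eager = count_above(data, 0.5, axis=0)
lazy = lazy_count_above(da.from_array(data, chunks=(100, 4, 5)), 0.5, axis=0)
assert np.allclose(eager, lazy.compute())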
class SpellLength:
@@ -33,6 +94,16 @@ class SpellLength:
def prepare(self, input_cube):
change_units(self.threshold, input_cube.units, input_cube.standard_name)
def call_func(self, data, axis, **kwargs):
axis = normalize_axis(axis, data.ndim)
return np.apply_along_axis(self,
axis=axis, arr=data, **kwargs)
def lazy_func(self, data, axis, **kwargs):
axis = normalize_axis(axis, data.ndim)
return da.apply_along_axis(self,
axis=axis, arr=data, **kwargs)
def __call__(self, raw_data):
data = self.condition(raw_data, self.threshold.data)
where = np.flatnonzero
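The call_func/lazy_func pair added above keeps the per-timeseries spell logic in __call__ and applies it column-wise via numpy's or dask's apply_along_axis. A generic sketch of that pattern, using a trivial 1-D stand-in (longest_run_above is invented here, not the actual spell computation):

import dask.array as da
import numpy as np

def longest_run_above(series, threshold=0.5):
    # Toy 1-D reduction standing in for the real spell computation.
    best = run = 0
    for hit in series > threshold:
        run = run + 1 if hit else 0
        best = max(best, run)
    return float(best)

data = np.random.default_rng(1).random((365, 4, 5))
eager = np.apply_along_axis(longest_run_above, 0, data)
lazy = da.apply_along_axis(longest_run_above, 0,
                           da.from_array(data, chunks=(365, 2, 5)))
assert np.allclose(eager, lazy.compute())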
@@ -64,15 +135,24 @@ class SpellLength:
class ThresholdedStatistics:
def __init__(self, threshold, condition, reducer):
self.threshold = threshold
self.condition = condition
self.reducer = reducer
self.condition = NUMPY_OPERATORS[condition]
self.reducer = NUMPY_REDUCERS[reducer]
self.lazy_condition = DASK_OPERATORS[condition]
self.lazy_reducer = DASK_REDUCERS[reducer]
def prepare(self, input_cube):
change_units(self.threshold, input_cube.units, input_cube.standard_name)
self.standard_name = input_cube.standard_name
self.units = input_cube.units
def __call__(self, raw_data):
comb = self.condition(raw_data, self.threshold.data)
res = self.reducer(raw_data[comb])
return float(res)
def call_func(self, data, axis, **kwargs):
axis = normalize_axis(axis, data.ndim)
comb = self.condition(data, self.threshold.data)
res = self.reducer(np.ma.masked_where(~comb, data), axis=axis)
return res.astype('float32')
def lazy_func(self, data, axis, **kwargs):
axis = normalize_axis(axis, data.ndim)
comb = self.lazy_condition(data, self.threshold.data)
res = self.lazy_reducer(da.ma.masked_where(~comb, data), axis=axis)
return res.astype('float32')
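A rough, self-contained sketch of the thresholded statistic above (here: the mean of values satisfying the condition, everything else masked out), showing that the eager and lazy constructions agree:

import dask.array as da
import numpy as np

data = np.random.default_rng(2).random((365, 4, 5))
threshold = 0.5

# Eager: mask out values that fail the condition, then reduce over time.
comb = np.greater(data, threshold)
eager = np.mean(np.ma.masked_where(~comb, data), axis=0).astype('float32')

# Lazy: the same construction with dask's masked-array helpers.
lazy_data = da.from_array(data, chunks=(100, 4, 5))
lazy_comb = da.greater(lazy_data, threshold)
lazy = da.mean(da.ma.masked_where(~lazy_comb, lazy_data), axis=0).astype('float32')

assert np.ma.allclose(eager, lazy.compute())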
@@ -14,6 +14,7 @@ import yaml
import climix
from .index import Index
from .index_functions import SUPPORTED_OPERATORS, SUPPORTED_REDUCERS
from .period import PeriodSpecification
@@ -59,26 +60,21 @@ def load_metadata():
def build_parameters(parameters_metadata):
OPERATORS = {
'<': np.less,
'>': np.greater,
'<=': np.less_equal,
'>=': np.greater_equal,
}
REDUCERS = {
'min': np.min,
'max': np.max,
'sum': np.sum,
}
parameters = {}
for name, md in parameters_metadata.items():
kind = md.pop('kind')
if kind == 'quantity':
parameter = iris.cube.Cube(**md)
elif kind == 'operator':
parameter = OPERATORS[md['operator']]
op = md['operator']
if op not in SUPPORTED_OPERATORS:
raise ValueError('Unknown operator <{}>'.format(op))
parameter = op
elif kind == 'reducer':
parameter = REDUCERS[md['reducer']]
red = md['reducer']
if red not in SUPPORTED_REDUCERS:
raise ValueError('Unknown reducer <{}>'.format(red))
parameter = red
else:
raise ValueError('Unknown parameter kind <{}>'.format(kind))
parameters[name] = parameter
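For illustration, a hypothetical parameters_metadata fragment (the keys 'condition' and 'statistic' are invented) showing how the loop above now treats operators and reducers, assuming build_parameters above is in scope: the strings are only validated against the SUPPORTED_* lists and passed through, so the index functions can pick the numpy or dask implementation themselves.

parameters_metadata = {                       # hypothetical metadata fragment
    'condition': {'kind': 'operator', 'operator': '>'},
    'statistic': {'kind': 'reducer', 'reducer': 'mean'},
}
parameters = build_parameters(parameters_metadata)
# parameters == {'condition': '>', 'statistic': 'mean'}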