Commit 0ca00ff5 authored by Klaus Zimmermann

Improve spell length implementation (closes #129)

parent 728d9386
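
Move SpellLength out of index_functions into a new spell_functions module. The eager path still scans whole time series, but the lazy path no longer applies the index function column by column via da.apply_along_axis: it now uses da.reduction with numba-jitted kernels from the new spell_kernels module. Every chunk is summarised per column by four numbers (chunk length, head spell, internal spell, tail spell), and a combine kernel merges these summaries so that spells crossing chunk boundaries are counted correctly.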
@@ -9,9 +9,12 @@ from .index_functions import ( # noqa: F401
    InterdayDiurnalTemperatureRange,
    LastOccurrence,
    Percentile,
    SpellLength,
    Statistics,
    ThresholdedPercentile,
    ThresholdedStatistics,
    TemperatureSum,
)
from .spell_functions import ( # noqa: F401
    SpellLength,
)
@@ -229,52 +229,6 @@ class Percentile(IndexFunction):
        return res.astype('float32')


class SpellLength(ThresholdMixin, ReducerMixin, IndexFunction):
    def __init__(self, threshold, condition, reducer):
        super().__init__(threshold, condition, reducer, units=Unit('days'))

    def call_func(self, data, axis, **kwargs):
        axis = normalize_axis(axis, data.ndim)
        mask = np.ma.getmaskarray(data).any(axis=axis)
        res = np.apply_along_axis(self, axis=axis, arr=data)
        res = np.ma.masked_array(np.ma.getdata(res), mask)
        return res.astype('float32')

    def lazy_func(self, data, axis, **kwargs):
        axis = normalize_axis(axis, data.ndim)
        mask = da.ma.getmaskarray(data).any(axis=axis)
        res = da.apply_along_axis(self, axis=axis, arr=data)
        res = da.ma.masked_array(da.ma.getdata(res), mask)
        return res.astype('float32')

    def __call__(self, raw_data):
        data = self.condition(raw_data, self.threshold.points)
        where = np.flatnonzero
        x = data[1:] != data[:-1]
        starts = where(x) + 1
        n = len(x)
        no_runs = len(starts)
        if no_runs == 0:
            res = n if data[0] else 0
        else:
            candidates = []
            if data[0]:
                candidates.append(starts[0])
            if data[starts[-1]]:
                candidates.append(n - starts[-1])
            if no_runs > 1:
                lengths = np.diff(starts)
                values = data[starts[:-1]]
                if np.any(values):
                    lmax = self.reducer(lengths[values])
                    candidates.append(lmax)
            if len(candidates) == 0:
                res = 0
            else:
                res = self.reducer(candidates)
        return float(res)


class Statistics(ReducerMixin, IndexFunction):
    def __init__(self, reducer):
        super().__init__(reducer)
......
from cf_units import Unit
import dask.array as da
import numpy as np

from .spell_kernels import make_spell_length_kernels
from .support import (normalize_axis,
                      IndexFunction,
                      ThresholdMixin, ReducerMixin)


class SpellLength(ThresholdMixin, ReducerMixin, IndexFunction):
    def __init__(self, threshold, condition, reducer):
        super().__init__(threshold, condition, reducer, units=Unit('days'))
        kernels = make_spell_length_kernels(self.scalar_reducer)
        self.chunk_kernel, self.combine_kernel = kernels

    def call_func(self, data, axis, **kwargs):
        axis = normalize_axis(axis, data.ndim)
        mask = np.ma.getmaskarray(data).any(axis=axis)
        res = np.apply_along_axis(self, axis=axis, arr=data)
        res = np.ma.masked_array(np.ma.getdata(res), mask)
        return res.astype('float32')

    def lazy_func(self, data, axis, **kwargs):
        axis = normalize_axis(axis, data.ndim)
        mask = da.ma.getmaskarray(data).any(axis=axis)
        # Move the spell axis last so the kernels can scan contiguous
        # columns, then let dask drive the reduction through the
        # chunk/combine/aggregate methods below.
        data = da.moveaxis(data, axis, -1)
        res = da.reduction(data, self.chunk, self.aggregate,
                           axis=-1, dtype=int, combine=self.combine,
                           concatenate=False)
        res = da.ma.masked_array(da.ma.getdata(res), mask)
        return res.astype('float32')

    def chunk(self, raw_data, axis, keepdims, computing_meta=False):
        if computing_meta:
            # np.array has no `ndim` keyword; `ndmin=1` is what the
            # original `ndim=1` presumably intended for the meta array.
            return np.array((0,), ndmin=1, dtype=int)
        data = self.condition(raw_data, self.threshold.points)
        chunk_res = self.chunk_kernel(data)
        return chunk_res

    def combine(self, x_chunk, axis, keepdims):
        if not isinstance(x_chunk, list):
            return x_chunk
        return self.combine_kernel(np.array(x_chunk))

    def aggregate(self, x_chunk, axis, keepdims):
        res = self.combine(x_chunk, axis, keepdims)
        # The overall spell length is the reducer applied to the head,
        # internal, and tail candidates; the chunk length in slot 0 is
        # dropped here.
        res = self.reducer(res[..., 1:], axis=-1)
        return res
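
For orientation: da.reduction drives the three methods above by applying chunk to every block of the array, folding partial results together with combine until one remains per output element, and finishing with aggregate. A minimal sketch, not from this commit, with a plain sum standing in for the four-number spell summaries (the callback names mirror the methods above):

import dask.array as da
import numpy as np


def chunk(block, axis, keepdims, **kwargs):
    # per-block partial result
    return block.sum(axis=axis, keepdims=keepdims)


def combine(parts, axis, keepdims, **kwargs):
    # merge partial results from neighbouring blocks
    return np.sum(parts, axis=axis, keepdims=keepdims)


def aggregate(parts, axis, keepdims, **kwargs):
    # final pass over the last partial results
    return np.sum(parts, axis=axis, keepdims=keepdims)


x = da.arange(16, chunks=4)
total = da.reduction(x, chunk, aggregate, combine=combine, dtype=np.int64)
print(total.compute())  # 120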
from numba import jit
import numpy as np


def make_spell_length_kernels(reducer):
    # The gufunc support in numba is lacking right now.
    # Once numba supports NEP-20 style signatures for
    # gufunc, we can use @guvectorize to vectorize this
    # function, allowing for more general applicability
    # in terms of dimensions and hopefully better performance,
    # perhaps taking advantage of GPUs. The template is
    # @guvectorize([float32, int64], '(n)->(4)')
    # def chunk_column(column, res):
    @jit(nopython=True)
    def chunk_column(column):
        # summarise the column as (length, head, internal, tail)
        res = np.empty((4,), dtype=np.int64)
        n = column.shape[0]
        where = np.flatnonzero
        x = column[1:] != column[:-1]
        starts = where(x) + 1
        no_runs = len(starts)
        # the length of the chunk is n
        res[0] = n
        # assume no internal spell
        res[2] = 0
        # if no change occurs, then...
        if no_runs == 0:
            # ...either the spell covers the entire chunk, or...
            if column[0]:
                res[1] = n
                res[3] = n
            # ...there is no spell in this chunk
            else:
                res[1] = 0
                res[3] = 0
        else:
            # there is a spell from the beginning to the first change
            # if the first value is part of a spell
            res[1] = starts[0] if column[0] else 0
            # there is a spell from the last change to the end if
            # the last value is part of a spell
            res[3] = n - starts[-1] if column[starts[-1]] else 0
            # the loop only runs if there are at least two changes
            # (for no_runs == 2, range(1) == [0])
            for k in range(no_runs - 1):
                # if the value at the corresponding change is part
                # of a spell then this spell stretches to the next
                # change and is an internal spell
                if column[starts[k]]:
                    length = starts[k+1] - starts[k]
                    res[2] = reducer(length, res[2])
        return res
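
    # Worked example (illustrative, not part of the commit): with
    # reducer=max and a thresholded column [1, 1, 0, 1, 1, 1, 0, 1],
    # the changes sit at starts == [2, 3, 6, 7], so chunk_column
    # returns [8, 2, 3, 1]: length 8, a head spell of 2, a longest
    # internal spell of 3, and a tail spell of 1.
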
    @jit(nopython=True)
    def chunk(thresholded_data):
        # apply chunk_column to every column of the chunk
        res = np.empty(thresholded_data.shape[:-1] + (4,), dtype=np.int64)
        for ind in np.ndindex(*thresholded_data.shape[:-1]):
            res[ind] = chunk_column(thresholded_data[ind])
        return res

    @jit(nopython=True)
    def combine(x_chunk):
        # start with the first chunk and merge all others subsequently
        res = x_chunk[0].copy()
        # mark where this chunk is completely covered by a spell
        this_full = res[:, :, 0] == res[:, :, 1]
        for k in range(1, x_chunk.shape[0]):
            next_chunk = x_chunk[k]
            for ind in np.ndindex(res.shape[:-1]):
                ind_length = ind + (0,)
                ind_head = ind + (1,)
                ind_internal = ind + (2,)
                ind_tail = ind + (3,)
                # the next chunk is completely covered by a spell
                next_full = next_chunk[ind_length] == next_chunk[ind_head]
                # the length of the combined chunks is the sum of the
                # lengths of the individual chunks
                res[ind_length] += next_chunk[ind_length]
                # if both are completely covered, the merged chunk
                # is completely covered too
                if this_full[ind] and next_full:
                    res[ind_head] += next_chunk[ind_head]
                    res[ind_tail] += next_chunk[ind_tail]
                # if the old chunk is completely covered, but the new one
                # isn't, then...
                elif this_full[ind]:
                    # ...the head is the old head + the new head,
                    res[ind_head] += next_chunk[ind_head]
                    # the internal spell is the new internal spell,
                    res[ind_internal] = next_chunk[ind_internal]
                    # the tail is the new tail,
                    res[ind_tail] = next_chunk[ind_tail]
                    # and the resulting chunk is no longer fully covered
                    this_full[ind] = False
                # if the old chunk is not fully covered, but the new one is
                elif next_full:
                    # the tail is the old tail + the new head
                    res[ind_tail] += next_chunk[ind_head]
                # if neither are fully covered
                else:
                    # the head stays the same,
                    # the internal spell is the winner between
                    # the old internal spell, the new internal spell,
                    # and the internal spell resulting from merging
                    # old tail and new head,
                    res[ind_internal] = reducer(
                        res[ind_internal],
                        res[ind_tail] + next_chunk[ind_head],
                        next_chunk[ind_internal])
                    # and the tail is the new tail
                    res[ind_tail] = next_chunk[ind_tail]
        return res
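
    # Worked example (illustrative, not part of the commit): with
    # reducer=max, merging the summary (4, 1, 0, 2) of [1, 0, 1, 1]
    # with the summary (4, 3, 0, 0) of [1, 1, 1, 0] yields
    # (8, 1, 5, 0): the lengths add up, the head stays 1, the old
    # tail (2) and the new head (3) fuse into an internal spell of 5,
    # and the new tail (0) takes over.
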
    return (chunk, combine)
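
A quick standalone check of the chunk kernel, not part of the commit; the import path is assumed, with the module name as introduced here:

import numpy as np

from spell_kernels import make_spell_length_kernels

chunk, combine = make_spell_length_kernels(max)
# one grid point, eight time steps, already thresholded
data = np.array([[[1, 1, 0, 1, 1, 1, 0, 1]]], dtype=np.bool_)
print(chunk(data))  # [[[8 2 3 1]]]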
# -*- coding: utf-8 -*-

import operator

from cf_units import Unit
import dask.array as da
import numpy as np

@@ -20,6 +22,18 @@ SUPPORTED_REDUCERS = [
    'mean',
]

# Plain-Python operators and reducers, complementing the numpy and
# dask variants below. The scalar reducers are meant for the
# numba-jitted spell kernels, which call them with individual scalar
# arguments rather than arrays.
SCALAR_OPERATORS = {
    '<': operator.lt,
    '>': operator.gt,
    '<=': operator.le,
    '>=': operator.ge,
}

SCALAR_REDUCERS = {
    'min': min,
    'max': max,
    'sum': sum,
}

NUMPY_OPERATORS = {
    '<': np.less,

@@ -94,3 +108,4 @@ class ReducerMixin:
        super().__init__(*args, **kwargs)
        self.reducer = NUMPY_REDUCERS[reducer]
        self.lazy_reducer = DASK_REDUCERS[reducer]
        self.scalar_reducer = SCALAR_REDUCERS[reducer]