Commit e2dabd16 by Klaus Zimmermann

### Fix percentiles (fixes #205)

 ... ... @@ -21,51 +21,115 @@ def calc_doy_indices(day_of_year): yield doy, inds def calc_index(prob, n): def hyndman_fan_params(prob, n): r""" Compute coefficient and index for quantile calculation. Following Hyndman and Fan (1996), the quantile for probability :math:p, :math:Q(p), given the order statistics of a sample with size :math:n, :math:X_{(1)} < \dots < X_{(n)} can be calculated as a linear combination of two neighbouring order statistics according to .. math:: Q(p) = (1-\gamma)X_{(j)} + \gamma X_{(j+1)}, where :math:j = \lfloor pn + m \rfloor and :math:\gamma = pn + m - j, i.e. :math:j and :math:\gamma are the integer and the fractional part of :math:pn + m, respectively, with :math:m \in \mathbb{R} some number such that .. math:: \frac{j - m}{n} \le p \le \frac{j - m + 1}{n} holds. Choosing :math:m now allows one to define different ways to calculate the quantiles. In particular, Hyndman and Fan go on to define a class of such approaches with .. math:: m = \alpha + p(1 - \alpha - \beta) characterized by the choice of the constants :math:\alpha and :math:\beta. For consistency with previous methods, here we follow their Definition 8 with :math:\alpha = \frac{1}{3}, :math:\beta = \frac{1}{3}. Defining :math:\delta(\alpha, \beta) = pn + m = p(n + 1 - \alpha - \beta) + \alpha, we find :math:\delta(\frac{1}{3}, \frac{1}{3}) = p(n + \frac{1}{3}) + \frac{1}{3}, which allows us to calculate :math:j and :math:\gamma. """ delta = prob * (n + 1./3.) + 1./3. j = int(delta) gamma = delta - j return (j, gamma) def hyndman_fan_to_topk(n, j, window_size): r""" Calculate topk argument for Hyndman and Fan order statistics index. We need order statistics :math:j and :math:j + 1 to calculate the quantile in question. 
In a parallel setting, it is more efficient to avoid a full sorting of the input data and instead to only extract the :math:j largest or :math:j + 1 smallest data points, particularly if :math:j is small or close to :math:n as is the case for the often sought extreme quantiles. If :math:j > \frac{n}{2} we want to calculate the :math:n - j largest points, else we want to calculate the :math:j + 1 smallest points, indicated by the negative k (cf :func:da.topk). """ if j > n/2: k = n - j k_ext = k + window_size else: k = -j - 1 return (k, gamma) k = -(j + 1) k_ext = k - window_size return (k, k_ext) class BootstrapQuantiles: def __init__(self, data, prob, first_year, window_size, client): self.first_year = first_year n = data.shape[-1] self.k, self.gamma = calc_index(prob, n) self.order_indices = client.persist(da.argtopk(data, self.k - window_size, axis=-1)) progress(self.order_indices) self.j, self.gamma = hyndman_fan_params(prob, n) self.k, k_ext = hyndman_fan_to_topk(n, self.j, window_size) order_indices = client.persist(da.argtopk(data, k_ext, axis=-1)) progress(order_indices) self.order_statistics = client.persist( dask_take_along_axis(data, self.order_indices, axis=-1)) dask_take_along_axis(data, order_indices, axis=-1).rechunk()) progress(self.order_statistics) self.years = client.persist( first_year + da.floor(self.order_indices / window_size)) first_year + da.floor(order_indices / window_size).rechunk()) progress(self.years) def quantiles(self, ignore_year=None, duplicate_year=None): k = abs(self.k) abs_k = abs(self.k) if ignore_year is None and duplicate_year is None: qi = self.order_statistics[..., k-2:k] if abs_k < 1: quantiles = self.order_statistics[..., abs_k] else: if self.k >= 0: ind_j = abs_k ind_j_plus_one = abs_k - 1 else: ind_j = abs_k - 1 ind_j_plus_one = abs_k quantiles = ((1.-self.gamma)*self.order_statistics[..., ind_j] + self.gamma*self.order_statistics[..., ind_j_plus_one]) else: offset = (da.sum(self.years[..., :k] == ignore_year, axis=-1) 
- da.sum(self.years[..., :k] == duplicate_year, axis=-1)) offset = da.sum(self.years[..., :abs_k] == ignore_year, axis=-1) offset -= da.sum(self.years[..., :abs_k] == duplicate_year, axis=-1) offset += abs_k if self.k >= 0: ind_j = offset ind_j_plus_one = da.where(offset >= 1, offset - 1, offset) else: ind_j = da.where(offset >= 1, offset - 1, offset) ind_j_plus_one = offset qi = dask_take_along_axis( self.order_statistics, da.stack([k-2+offset, k-1+offset], axis=-1), da.stack([ind_j, ind_j_plus_one], axis=-1), axis=-1) quantiles = (1.-self.gamma)*qi[..., 0] + self.gamma*qi[..., 1] quantiles = (1.-self.gamma)*qi[..., 0] + self.gamma*qi[..., 1] return quantiles class TimesHelper: def __init__(self, time): self.times = time.points self.times = time.core_points() self.units = str(time.units) def __getattr__(self, name): ... ... @@ -130,6 +194,7 @@ class CountPercentileOccurrences(IndexFunction): cube = cubes[0] time = cube.coord('time') time_units = time.units logging.info("Determining time indices") times = TimesHelper(time) idx_0 = cftime.date2index(self.base.start, times, calendar=time_units.calendar, ... ... 
@@ -149,24 +214,39 @@ class CountPercentileOccurrences(IndexFunction): self.last_year = self.base.end.year - 1 max_doy = 365 self.k = calc_index(self.percentile, max_doy) self.years = {y: np.arange(i*window_size, (i+1)*window_size) for i, y in enumerate(range(self.first_year, self.last_year + 1))} logging.info("Building indices") np_indices = build_indices(time[idx_0:idx_n+1], max_doy, len(self.years), window_size) logging.info("Arranging data") all_data = da.moveaxis( cube.core_data()[idx_0-window_width:idx_n+window_width+1], 0, -1) all_data = all_data.rechunk(('auto',)*(all_data.ndim-1) + (-1,)) all_data = client.persist(all_data) progress(all_data) data = [] for idx_d in range(max_doy): data.append(all_data[..., np_indices[idx_d].ravel()]) d = client.persist(all_data[..., np_indices[idx_d].ravel()]) progress(d) data.append(d) logging.info(f"Finished doy {idx_d}") data = da.stack(data, axis=0) data = data.rechunk({0: -1, 1: 'auto', 2: 'auto', 3: -1}) data = data.rechunk({0: 'auto'}) logging.info(f"data chunks: {data.chunks}") data = client.persist(data) progress(data) logging.info(f"data chunks: {data.chunks}") chunks = (-1,) + (10,)*(data.ndim-2) + (-1,) data = data.rechunk(chunks) data = client.persist(data) progress(data) logging.info("Initializing quantiles") self.quantiler = BootstrapQuantiles(data, self.percentile, self.first_year, window_size, client) client.cancel(data) logging.info("Starting quantile calculation") res = client.persist(self.quantiler.quantiles().rechunk()) ... ... @@ -193,6 +273,7 @@ class CountPercentileOccurrences(IndexFunction): count = da.count_nonzero(cond, axis=0) counts.append(count) counts = da.stack(counts, axis=-1) counts = counts.rechunk(('auto',)*(counts.ndim-1) + (-1,)) avg_counts = counts.mean(axis=-1) percents = avg_counts/(data.shape[0]/100.) else: ... ...