Commit a71a0ebb authored by Klaus Zimmermann
Browse files

Handle negative parameter values (closes #117)

Adds handling of negative parameters in templated indices,
i.e., a template like `tngt{TT}` can be used as `tngtm2` to
use a threshold of -2.

It also comes with a massive change in the handling of metadata, which is now based on Python dataclasses.
After the fact, I discovered the attrs project, and I might consider redoing this in attrs
to gain access to validators etc. For now, it is good enough, but it should be revisited
in the future to include validation one way or another, as well as a possible refactoring
of some of the standalone functions into classmethods.
parent 54aa9305
......@@ -17,8 +17,8 @@ class PointLocalAggregator(Aggregator):
def update_metadata(self, cube, coords, **kwargs):
super().update_metadata(cube, coords, **kwargs)
cube.var_name = self.output_metadata['var_name']
cube.long_name = self.output_metadata['long_name']
cube.var_name = self.output_metadata.var_name
cube.long_name = self.output_metadata.long_name
cube.standard_name = self.index_function.standard_name
cube.units = self.index_function.units
......@@ -32,14 +32,12 @@ class PointLocalAggregator(Aggregator):
pass
else:
cube = post_processor(cube, data_result, coords, **kwargs)
standard_name = self.output_metadata['standard_name']
standard_name = self.output_metadata.standard_name
unit_standard_name = standard_name
proposed_standard_name = self.output_metadata.get(
'proposed_standard_name', None)
proposed_standard_name = self.output_metadata.proposed_standard_name
if unit_standard_name is None:
unit_standard_name = proposed_standard_name
units = self.output_metadata['units']
change_units(cube, units, unit_standard_name)
change_units(cube, self.output_metadata.units, unit_standard_name)
cube.standard_name = standard_name
if proposed_standard_name is not None:
cube.attributes['proposed_standard_name'] = proposed_standard_name
......
......@@ -2,9 +2,7 @@
# -*- coding: utf-8 -*-
import argparse
import copy
import os
import re
import threading
import time
......@@ -12,16 +10,11 @@ import iris
from iris.experimental.equalise_cubes import equalise_attributes
import netCDF4
import numpy as np
import pkg_resources
import sentry_sdk
import six
import yaml
import climix
from .dask_setup import SCHEDULERS, setup_scheduler
from .index import Index
from .index_functions import SUPPORTED_OPERATORS, SUPPORTED_REDUCERS
from .period import PeriodSpecification
from .metadata import load_metadata
import logging
logging.basicConfig(level=logging.INFO)
......@@ -65,118 +58,6 @@ def ignore_cb(cube, field, filename):
cube.attributes.pop('tracking_id', None)
def load_metadata():
    """Read the packaged metadata file and return its parsed content.

    The file is ``etc/metadata.yml`` relative to the directory of this
    module; it is parsed with ``yaml.safe_load``.
    """
    package_dir = os.path.dirname(__file__)
    yaml_path = os.path.join(package_dir, 'etc', 'metadata.yml')
    with open(yaml_path) as stream:
        return yaml.safe_load(stream)
def build_parameters(parameters_metadata):
    """Build the keyword arguments for an index function factory.

    Parameters
    ----------
    parameters_metadata : mapping
        Maps each parameter name to its metadata mapping. Every metadata
        mapping must contain a ``kind`` entry of ``'quantity'``,
        ``'operator'`` or ``'reducer'``.

    Returns
    -------
    dict
        Maps each parameter name to an ``iris.cube.Cube`` (quantities) or
        to the operator/reducer name (operators and reducers).

    Raises
    ------
    ValueError
        If the kind is unknown, or the operator/reducer is not listed in
        ``SUPPORTED_OPERATORS``/``SUPPORTED_REDUCERS``.
    KeyError
        If a metadata mapping lacks a required entry.
    """
    parameters = {}
    for name, md in parameters_metadata.items():
        # Pop ``kind`` from a shallow copy only. Popping it from the
        # caller's mapping made a second build of the same definition
        # fail with KeyError.
        cube_kwargs = dict(md)
        kind = cube_kwargs.pop('kind')
        if kind == 'quantity':
            # All remaining entries are handed to the cube constructor.
            parameter = iris.cube.Cube(**cube_kwargs)
        elif kind == 'operator':
            op = md['operator']
            if op not in SUPPORTED_OPERATORS:
                raise ValueError(f'Unknown operator <{op}>')
            parameter = op
        elif kind == 'reducer':
            red = md['reducer']
            if red not in SUPPORTED_REDUCERS:
                raise ValueError(f'Unknown reducer <{red}>')
            parameter = red
        else:
            raise ValueError(f'Unknown parameter kind <{kind}>')
        parameters[name] = parameter
    return parameters
def build_index_function(spec):
    """Instantiate the index function described by ``spec``.

    The implementation is looked up by ``spec['name']`` among the entry
    points of the ``climix.index_functions`` group. Exactly one entry point
    must match; it is loaded and called with the parameters built from
    ``spec['parameters']``.

    Raises
    ------
    ValueError
        If no entry point or more than one entry point matches the name.
    """
    name = spec['name']
    entry_points = list(
        pkg_resources.iter_entry_points('climix.index_functions', name=name))
    if not entry_points:
        raise ValueError(
            f'No implementation found for index_function <{name}>')
    if len(entry_points) > 1:
        distributions = [entry_point.dist for entry_point in entry_points]
        raise ValueError(
            f'Found several implementations for index_function <{name}>. '
            f'Please make sure only one is installed at any time. '
            f'The implementations come from the distributions {distributions}')
    entry_point = entry_points[0]
    logging.info(f'Found implementation for index_function <{name}> '
                 f'from distribution <{entry_point.dist}>')
    factory = entry_point.load()
    keyword_arguments = build_parameters(spec['parameters'])
    return factory(**keyword_arguments)
def build_template_index(index_definitions):
    """Index all templated index names by their literal text parts.

    A name counts as a template when it contains at least one ``{...}``
    placeholder. The result maps the tuple of literal parts around the
    placeholders to ``(template name, list of placeholder names)``.
    Names without a placeholder are left out.
    """
    placeholder = re.compile(r'{([^}]+)}')
    templates = {}
    for index_name in index_definitions:
        # With one capture group, split() alternates literal text (even
        # positions) and placeholder names (odd positions).
        pieces = placeholder.split(index_name)
        if len(pieces) > 1:
            templates[tuple(pieces[0::2])] = (index_name, pieces[1::2])
    return templates
def replace_parameters_in_dict(dictionary, parameter_dict):
    """Substitute template parameters throughout ``dictionary``, in place.

    Three kinds of values are handled:

    * a one-item dict ``{name: None}`` whose ``name`` is a key of
      ``parameter_dict`` is replaced by the parameter value itself;
    * any other dict is processed recursively;
    * a string is passed through ``str.format`` with the parameters.

    All other values are left untouched. Nothing is returned.
    """
    for key, value in dictionary.items():
        if isinstance(value, dict):
            # Check the length before looking at the item: the original
            # indexed the first item unconditionally and raised IndexError
            # on an empty nested dict.
            if len(value) == 1:
                (name, placeholder), = value.items()
                if placeholder is None and name in parameter_dict:
                    dictionary[key] = parameter_dict[name]
                    continue
            replace_parameters_in_dict(value, parameter_dict)
        elif isinstance(value, str):
            # Plain ``str`` is equivalent to six.string_types on Python 3,
            # which this file already requires (it uses f-strings).
            dictionary[key] = value.format(**parameter_dict)
def get_index_definition(index_definitions, index):
    """Return the definition for ``index``, instantiating a template if needed.

    A name found directly in ``index_definitions`` is returned as is.
    Otherwise the name is split on runs of digits; the literal parts select
    a template and the digit runs become its integer parameter values. The
    template definition is deep-copied and the parameters are substituted
    into the copy.

    Raises
    ------
    KeyError
        If the name is unknown and contains no digits, or if no template
        matches its literal parts.
    """
    try:
        return index_definitions[index]
    except KeyError:
        # Even positions hold literal text, odd positions the digit runs.
        pieces = re.compile(r'(\d+)').split(index)
        if len(pieces) == 1:
            raise
        templates = build_template_index(index_definitions)
        template, parameter_names = templates[tuple(pieces[0::2])]
        parameter_dict = {}
        for name, digits in zip(parameter_names, pieces[1::2]):
            parameter_dict[name] = int(digits)
        instantiated = copy.deepcopy(index_definitions[template])
        replace_parameters_in_dict(instantiated, parameter_dict)
        return instantiated
def prepare_indices(index_definitions, requested_indices):
    """Build an ``Index`` object for every requested index name.

    For each name the definition is resolved, the default period of the
    definition is selected, and the index function is built. The resulting
    ``Index`` objects are returned in request order.
    """
    def build_one(requested):
        definition = get_index_definition(index_definitions, requested)
        period_metadata = definition['period']
        # The default period names one of the allowed periods.
        selected = period_metadata['default']
        period_spec = PeriodSpecification(
            selected, period_metadata['allowed'][selected])
        index_function = build_index_function(definition['index_function'])
        return Index(index_function, definition['output'], period_spec)

    return [build_one(requested) for requested in requested_indices]
def prepare_input_data(datafiles):
datacube = iris.load_raw(datafiles, callback=ignore_cb)
iris.util.unify_time_units(datacube)
......@@ -221,7 +102,8 @@ def build_output_filename(index, datafiles, output_template):
if output_template is None:
output_template = guess_output_template(datafiles)
frequency = index.period.label
return output_template.format(frequency=frequency, **index.output_metadata)
return output_template.format(frequency=frequency,
**index.output_metadata.drs)
def save(result, output_filename, sliced_mode=False):
......@@ -268,9 +150,9 @@ def save(result, output_filename, sliced_mode=False):
def do_main(requested_indices, datafiles, output_template, sliced_mode):
logging.info('Loading metadata')
metadata = load_metadata()
index_catalog = load_metadata()
logging.info('Preparing indices')
indices = prepare_indices(metadata['indices'], requested_indices)
indices = index_catalog.prepare_indices(requested_indices)
for index in indices:
logging.info(f'Starting calculations for index {index}')
logging.info('Building output filename')
......
# -*- coding: utf-8 -*-
from dataclasses import dataclass
import copy
from enum import Enum
from itertools import combinations
import logging
import os
import re
import string
from typing import Any, List, Mapping, Optional, Union
import iris
import pkg_resources
import yaml
from .index import Index
from .index_functions import SUPPORTED_OPERATORS, SUPPORTED_REDUCERS
from .period import PeriodSpecification
@dataclass
class CellMethod:
    """One cell method entry of a variable, stored as a name/method pair."""
    # First element of the pair. build_index fills it from the key of a
    # one-item ``cell_methods`` mapping (presumably a coordinate name such
    # as ``time`` -- TODO confirm against metadata.yml).
    name: str
    # Second element of the pair, taken from the value of that mapping
    # (presumably the applied statistic -- TODO confirm).
    method: str
def format_var_name(var_name, parameters):
    """Fill the placeholders of a variable name template.

    Every ``{field}`` in ``var_name`` is replaced by the matching value from
    ``parameters``. Negative values are written as ``m`` followed by the
    absolute value, so ``tngt{TT}`` with ``TT=-2`` becomes ``tngtm2``.

    Raises
    ------
    KeyError
        If a field of the template has no entry in ``parameters``.
    """
    def format_value(value):
        return f'm{-value}' if value < 0 else f'{value}'

    # parse() yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for trailing literal text.
    field_names = [field_name
                   for _, field_name, _, _ in string.Formatter().parse(var_name)
                   if field_name is not None]
    rendered = {field_name: format_value(parameters[field_name])
                for field_name in field_names}
    return var_name.format(**rendered)
@dataclass
class OutputVariable:
    """Metadata of the variable produced by an index.

    ``standard_name`` and ``proposed_standard_name`` may be ``None``:
    build_index reads the latter with ``.get(..., None)`` and the consumer
    of this metadata checks the former for ``None``.
    """
    var_name: str
    standard_name: Optional[str]
    proposed_standard_name: Optional[str]
    long_name: str
    units: str
    cell_methods: List[CellMethod]

    @property
    def drs(self):
        """Return the values offered to the output filename template."""
        return {'var_name': self.var_name}

    def instantiate(self, parameters):
        """Return a copy with template placeholders filled from ``parameters``.

        ``var_name`` goes through ``format_var_name``; the two standard
        names and ``long_name`` are filled with ``str.format``. ``units``
        and ``cell_methods`` are carried over unchanged.
        """
        def fill(template):
            # A missing (None) name stays None. The original called
            # ``.format`` on it and raised AttributeError for templates
            # without a proposed_standard_name.
            if template is None:
                return None
            return template.format(**parameters)

        return OutputVariable(
            format_var_name(self.var_name, parameters),
            fill(self.standard_name),
            fill(self.proposed_standard_name),
            self.long_name.format(**parameters),
            self.units,
            self.cell_methods)
@dataclass
class InputVariable:
    """Metadata of the variable an index is computed from."""
    var_name: str
    standard_name: str
    cell_methods: List[CellMethod]

    def instantiate(self, parameters):
        """Return a copy with template placeholders filled from ``parameters``.

        ``var_name`` goes through ``format_var_name`` and ``standard_name``
        through ``str.format``; ``cell_methods`` is carried over unchanged.
        """
        concrete_var_name = format_var_name(self.var_name, parameters)
        concrete_standard_name = self.standard_name.format(**parameters)
        return InputVariable(var_name=concrete_var_name,
                             standard_name=concrete_standard_name,
                             cell_methods=self.cell_methods)
class ParameterKind(Enum):
    """Kinds of index function parameters.

    The values are the same strings that build_parameter compares the
    ``kind`` entry of the parameter metadata against.
    """
    # Parameter described by ParameterQuantity.
    QUANTITY = 'quantity'
    # Parameter described by ParameterOperator.
    OPERATOR = 'operator'
    # Parameter described by ParameterReducer.
    REDUCER = 'reducer'
@dataclass
class Parameter:
    """Common base class of the index function parameter descriptions."""
    # Tells which kind of parameter a concrete instance describes.
    kind: ParameterKind
@dataclass
class ParameterQuantity(Parameter):
    """Description of a quantity parameter of an index function.

    In a template, ``data`` may be a one-item dict whose key names the
    template parameter that supplies the actual value.
    """
    long_name: str
    standard_name: str
    data: Any
    units: str

    @property
    def parameter(self):
        """Return the quantity as an ``iris.cube.Cube``."""
        return iris.cube.Cube(self.data,
                              self.standard_name,
                              self.long_name,
                              units=self.units)

    def instantiate(self, parameters):
        """Return a copy with template placeholders filled from ``parameters``.

        ``long_name`` and ``standard_name`` are filled with ``str.format``.
        A one-item dict in ``data`` is replaced by the parameter value
        stored under the dict's key; any other ``data`` is kept.

        Raises
        ------
        KeyError
            If the placeholder named in ``data`` is not in ``parameters``.
        """
        data = self.data
        if isinstance(data, dict) and len(data) == 1:
            # Read the placeholder name without popitem(). Popping emptied
            # the template's own dict, so every later instantiation of the
            # same template kept ``{}`` as its data.
            key = next(iter(data))
            data = parameters[key]
        return ParameterQuantity(
            ParameterKind.QUANTITY,
            self.long_name.format(**parameters),
            self.standard_name.format(**parameters),
            data,
            self.units)
@dataclass
class ParameterOperator(Parameter):
    """Description of an operator parameter of an index function."""
    operator: str

    @property
    def parameter(self):
        """Return the operator name as passed to the index function."""
        return self.operator

    def instantiate(self, parameters):
        """Return a fresh copy; operators contain no template placeholders."""
        return ParameterOperator(kind=ParameterKind.OPERATOR,
                                 operator=self.operator)
@dataclass
class ParameterReducer(Parameter):
    """Description of a reducer parameter of an index function."""
    reducer: str

    @property
    def parameter(self):
        """Return the reducer name as passed to the index function."""
        return self.reducer

    def instantiate(self, parameters):
        """Return a fresh copy; reducers contain no template placeholders."""
        return ParameterReducer(kind=ParameterKind.REDUCER,
                                reducer=self.reducer)
@dataclass
class IndexFunction:
    """Name of an index function together with its parameter descriptions."""
    name: str
    parameters: Mapping[str, Parameter]

    def instantiate(self, parameters):
        """Return a copy in which every parameter has been instantiated.

        ``parameters`` holds the template parameter values; it is handed to
        the ``instantiate`` method of each parameter description.
        """
        concrete = {}
        for param_name, description in self.parameters.items():
            concrete[param_name] = description.instantiate(parameters)
        return IndexFunction(self.name, concrete)
@dataclass
class IndexDefinition:
    """Complete definition of one index as built from the metadata file."""
    reference: str
    period: Mapping[str, Union[str, Mapping[str, Optional[str]]]]
    output: OutputVariable
    input: InputVariable
    index_function: IndexFunction

    def instantiate(self, parameters):
        """Return a copy with template placeholders filled from ``parameters``.

        ``reference`` and ``period`` are carried over unchanged; ``output``,
        ``input`` and ``index_function`` are instantiated in that order.
        """
        return IndexDefinition(
            reference=self.reference,
            period=self.period,
            output=self.output.instantiate(parameters),
            input=self.input.instantiate(parameters),
            index_function=self.index_function.instantiate(parameters))
def build_parameter(metadata):
    """Build the parameter description for one parameter metadata mapping.

    The ``kind`` entry selects the description class: ``'quantity'``,
    ``'operator'`` or ``'reducer'``.

    Raises
    ------
    RuntimeError
        If ``kind`` has any other value.
    KeyError
        If ``kind`` or an entry required by the selected class is missing.
    """
    kind = metadata['kind']
    if kind == 'quantity':
        return ParameterQuantity(ParameterKind.QUANTITY,
                                 metadata['long_name'],
                                 metadata['standard_name'],
                                 metadata['data'],
                                 metadata['units'])
    if kind == 'operator':
        return ParameterOperator(ParameterKind.OPERATOR,
                                 metadata['operator'])
    if kind == 'reducer':
        return ParameterReducer(ParameterKind.REDUCER,
                                metadata['reducer'])
    raise RuntimeError(f'Unknown parameter kind {metadata}')
def build_index(metadata):
    """Build an ``IndexDefinition`` from the metadata mapping of one index.

    Parameters
    ----------
    metadata : mapping
        Must provide ``reference``, ``period``, ``output``, ``input`` and
        ``index_function``. ``output['proposed_standard_name']`` is
        optional and defaults to ``None``. Each entry of a ``cell_methods``
        list is a mapping whose (last) item becomes one ``CellMethod``.

    Returns
    -------
    IndexDefinition

    Raises
    ------
    KeyError
        If a required entry is missing or a cell method mapping is empty.
    """
    def build_cell_methods(entries):
        # popitem() on a shallow copy picks the same item as before but
        # leaves ``metadata`` intact. Popping from the original emptied the
        # cell method mappings, so a second call on the same metadata failed.
        return [CellMethod(*dict(cm).popitem()) for cm in entries]

    output = metadata['output']
    idx = IndexDefinition(
        metadata['reference'],
        metadata['period'],
        OutputVariable(
            output['var_name'],
            output['standard_name'],
            output.get('proposed_standard_name', None),
            output['long_name'],
            output['units'],
            build_cell_methods(output['cell_methods'])),
        InputVariable(
            metadata['input']['var_name'],
            metadata['input']['standard_name'],
            build_cell_methods(metadata['input']['cell_methods'])),
        IndexFunction(
            metadata['index_function']['name'],
            {name: build_parameter(param)
             for name, param in
             metadata['index_function']['parameters'].items()}))
    return idx
def get_signature_candidates(signature_parts):
    """Enumerate the signatures an index name could stand for.

    ``signature_parts`` is the list of literal text parts obtained by
    splitting an index name on its runs of digits, so part ``i`` directly
    precedes value ``i`` and the last part follows the last value. A part
    ending in ``m`` may either be literal text or carry the minus marker of
    the value after it; every combination of the two readings is returned.

    Returns
    -------
    iterator of (combination, signature)
        ``combination`` is the tuple of part positions whose trailing ``m``
        was read as a minus marker, ``signature`` the tuple of parts with
        those markers removed. The unmodified signature comes first.
    """
    # The last part is not followed by a value, so its trailing ``m`` can
    # never be a minus marker. Offering it as one let a name like ``tx5m``
    # match template ``tx{N}`` and then fail with IndexError in the caller.
    m_indices = [i
                 for i, part in enumerate(signature_parts[:-1])
                 if part.endswith('m')]
    combs = [comb
             for size in range(len(m_indices) + 1)
             for comb in combinations(m_indices, size)]
    candidates = []
    for combination in combs:
        parts = list(signature_parts)
        for idx in combination:
            parts[idx] = parts[idx][:-1]
        candidates.append(tuple(parts))
    return zip(combs, candidates)
class IndexCatalog:
    """Collection of index definitions with support for templated names.

    ``indices`` maps index names to their definitions. Names containing
    ``{...}`` placeholders are templates; they are additionally indexed by
    their literal text parts in ``template_index``.
    """

    def __init__(self, indices):
        self.indices = indices
        self._build_template_index()

    def get_list(self):
        """Return the names of all definitions, templates included."""
        return self.indices.keys()

    def _build_template_index(self):
        """Fill ``template_index`` from the templated names in ``indices``.

        Maps the tuple of literal parts of each template name to
        ``(template name, list of placeholder names)``.
        """
        placeholder = re.compile(r'{([^}]+)}')
        templates = {}
        for name in self.indices:
            # Even positions hold literal text, odd positions the
            # placeholder names.
            pieces = placeholder.split(name)
            if len(pieces) > 1:
                templates[tuple(pieces[0::2])] = (name, pieces[1::2])
        self.template_index = templates

    def get_index_definition(self, index):
        """Return the definition for ``index``, instantiating a template if needed.

        A name found directly in ``indices`` is returned as is. Otherwise
        the name is split on runs of digits and matched against the
        templates, reading an ``m`` in front of a value as a minus sign
        where that makes a template match.

        Raises
        ------
        KeyError
            If the name is unknown and matches no template.
        RuntimeError
            If more than one template signature matches.
        """
        try:
            return self.indices[index]
        except KeyError:
            pieces = re.compile(r'(\d+)').split(index)
            if len(pieces) == 1:
                raise
            matches = [
                (combination, signature)
                for combination, signature
                in get_signature_candidates(pieces[0::2])
                if signature in self.template_index
            ]
            if not matches:
                raise
            if len(matches) > 1:
                raise RuntimeError("More than one matching signature found")
            minus_positions, signature = matches[0]
            template_name, parameter_names = self.template_index[signature]
            values = pieces[1::2]
            for position in minus_positions:
                values[position] = '-' + values[position]
            parameter_dict = {}
            for name, value in zip(parameter_names, values):
                parameter_dict[name] = int(value)
            return self.indices[template_name].instantiate(parameter_dict)

    def prepare_indices(self, requested_indices):
        """Build an ``Index`` object for every requested index name.

        For each name the definition is resolved, its default period is
        selected, and the index function is built. The ``Index`` objects
        are returned in request order.
        """
        def select_period(metadata):
            chosen = metadata['default']
            return PeriodSpecification(chosen, metadata['allowed'][chosen])

        prepared = []
        for requested in requested_indices:
            definition = self.get_index_definition(requested)
            period_spec = select_period(definition.period)
            index_function = build_index_function(definition.index_function)
            prepared.append(
                Index(index_function, definition.output, period_spec))
        return prepared
def build_index_function(spec):
    """Instantiate the index function described by ``spec``.

    The implementation is looked up by ``spec.name`` among the entry points
    of the ``climix.index_functions`` group. Exactly one entry point must
    match; it is loaded and called with the ``parameter`` value of every
    entry in ``spec.parameters`` as keyword arguments.

    Raises
    ------
    ValueError
        If no entry point or more than one entry point matches the name.
    """
    name = spec.name
    entry_points = list(
        pkg_resources.iter_entry_points('climix.index_functions', name=name))
    if not entry_points:
        raise ValueError(
            f'No implementation found for index_function <{name}>')
    if len(entry_points) > 1:
        distributions = [entry_point.dist for entry_point in entry_points]
        raise ValueError(
            f'Found several implementations for index_function <{name}>. '
            f'Please make sure only one is installed at any time. '
            f'The implementations come from the distributions {distributions}')
    entry_point = entry_points[0]
    logging.info(f'Found implementation for index_function <{name}> '
                 f'from distribution <{entry_point.dist}>')
    factory = entry_point.load()
    keyword_arguments = {}
    for param_name, description in spec.parameters.items():
        keyword_arguments[param_name] = description.parameter
    return factory(**keyword_arguments)
def load_metadata():
    """Load the packaged metadata file into an ``IndexCatalog``.

    The file is ``etc/metadata.yml`` relative to the directory of this
    module. Every entry of its ``indices`` section is converted with
    ``build_index``.
    """
    yaml_path = os.path.join(os.path.dirname(__file__),
                             'etc', 'metadata.yml')
    with open(yaml_path) as stream:
        raw_metadata = yaml.safe_load(stream)
    catalog_entries = {}
    for name, idx_meta in raw_metadata['indices'].items():
        catalog_entries[name] = build_index(idx_meta)
    return IndexCatalog(catalog_entries)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment