# metadata.py
# -*- coding: utf-8 -*-

from dataclasses import dataclass
import copy
from enum import Enum
6
import glob
7
8
9
10
11
from itertools import combinations
import logging
import os
import re
import string
12
import sys
13
14
15
from typing import Any, List, Mapping, Optional, Union

import iris
16
import xdg.BaseDirectory
17
18
19
20
21
22
import yaml

from .index import Index
from .period import PeriodSpecification


23
24
25
26
27
28
29
if sys.version_info[:2] >= (3, 10):
    # pylint: disable=no-name-in-module
    from importlib.metadata import entry_points
else:
    from importlib_metadata import entry_points


30
31
32
33
34
35
36
37
38
@dataclass
class CellMethod:
    """One cell-method entry of a variable: a name paired with a method.

    Within this module instances are created from the ``(key, value)`` pair of
    a metadata mapping, so ``name`` receives the key and ``method`` the value.
    """

    # Key of the metadata mapping entry (presumably the coordinate the method
    # applies to, as in CF ``cell_methods`` -- confirm against the YAML files).
    name: str
    # Value of the metadata mapping entry (presumably the operation, e.g. a
    # statistic such as "mean" -- confirm against the YAML files).
    method: str


def format_var_name(var_name, parameters):
    """Fill the replacement fields of ``var_name`` with identifier-safe values.

    Every named replacement field found in ``var_name`` is looked up in
    ``parameters``. Negative values are rendered with an ``m`` prefix instead
    of a minus sign (``-5`` becomes ``m5``); every other value is rendered with
    its default string form. Entries of ``parameters`` that are not referenced
    by ``var_name`` are ignored.

    Parameters
    ----------
    var_name : str
        Template in ``str.format`` syntax, e.g. ``"tx{threshold}"``.
    parameters : Mapping
        Values for the replacement fields. Values must support comparison
        with ``0`` and unary minus.

    Returns
    -------
    str
        The template with all fields substituted.

    Raises
    ------
    KeyError
        If a field referenced by ``var_name`` is missing from ``parameters``.
    """

    def format_value(value):
        # A minus sign is replaced by the letter "m" so that the result stays
        # free of "-" characters.
        if value < 0:
            return f"m{-value}"
        return f"{value}"

    # ``Formatter.parse`` yields (literal, field_name, spec, conversion);
    # field_name is None for trailing literal text without a field.
    parsed_var_name = list(string.Formatter().parse(var_name))
    items = {
        field_name: format_value(parameters[field_name])
        for _, field_name, _, _ in parsed_var_name
        if field_name is not None
    }
    return var_name.format(**items)


@dataclass
class OutputVariable:
    """Metadata describing the variable produced by an index.

    ``var_name`` and ``long_name`` may be templates in ``str.format`` syntax;
    :meth:`instantiate` fills them in.
    """

    var_name: str
    standard_name: str
    # May be None: ``build_index`` reads it with ``.get(..., None)``.
    proposed_standard_name: Optional[str]
    long_name: str
    units: str
    cell_methods: List[CellMethod]

    @property
    def drs(self):
        """Return a dict holding this variable's ``var_name``."""
        return {"var_name": self.var_name}

    def instantiate(self, parameters):
        """Return a copy with the template fields filled from ``parameters``.

        ``var_name`` goes through ``format_var_name`` (negative values are
        written with an ``m`` prefix), ``long_name`` through plain
        ``str.format``. All other attributes are carried over unchanged; the
        ``cell_methods`` list is shared with the template, not copied.
        """
        return OutputVariable(
            format_var_name(self.var_name, parameters),
            self.standard_name,
            self.proposed_standard_name,
            self.long_name.format(**parameters),
            self.units,
            self.cell_methods,
        )
74
75
76
77
78
79
80


@dataclass
class InputVariable:
    """Metadata describing a variable that an index reads as input.

    ``var_name`` may be a template in ``str.format`` syntax; :meth:`instantiate`
    fills it in.
    """

    var_name: str
    standard_name: str
    cell_methods: List[CellMethod]
    aliases: List[str]

    def instantiate(self, parameters):
        """Return a copy whose ``var_name`` is filled from ``parameters``.

        Only ``var_name`` is formatted (via ``format_var_name``); the other
        attributes are carried over unchanged and the lists are shared with
        the template, not copied.
        """
        return InputVariable(
            format_var_name(self.var_name, parameters),
            self.standard_name,
            self.cell_methods,
            self.aliases,
        )
90
91


92
def build_variable(name, variable, path):
    """Build an :class:`InputVariable` from its parsed metadata mapping.

    Parameters
    ----------
    name : str
        Name of the variable; used as the ``var_name`` of the result.
    variable : Mapping
        Metadata with the keys ``"cell_methods"`` (an iterable of mappings),
        ``"standard_name"`` and ``"aliases"``.
    path : str
        File the metadata came from. Currently unused; kept for the callers.

    Returns
    -------
    InputVariable

    Raises
    ------
    KeyError
        If a required key is missing or a cell-method mapping is empty.
    """
    # Work on a copy of every cell-method mapping: ``popitem`` on the original
    # would empty the parsed metadata, which breaks as soon as the same
    # mapping object is referenced more than once (e.g. through a YAML alias).
    cell_methods = [
        CellMethod(*dict(cm).popitem()) for cm in variable["cell_methods"]
    ]
    return InputVariable(
        name, variable["standard_name"], cell_methods, variable["aliases"]
    )
97
98


99
class ParameterKind(Enum):
    """Kinds of index-function parameters.

    The values are the strings used as keys of ``PARAMETER_KINDS`` and read
    from the ``"kind"`` entry of a parameter's metadata.
    """

    FLAG = "flag"
    OPERATOR = "operator"
    QUANTITY = "quantity"
    REDUCER = "reducer"
104
105


106
107
108
# Registry mapping the "kind" string of a parameter's metadata to the class
# that represents it. Each parameter class adds itself right after its
# definition; build_parameter looks the class up here.
PARAMETER_KINDS = {}


109
110
111
112
113
114
115
116
117
118
119
@dataclass
class ParameterFlag:
    """Boolean parameter of an index function."""

    name: str
    flag: bool
    kind: ParameterKind = ParameterKind.FLAG

    @property
    def parameter(self):
        """Return the value handed to the index function: the flag itself."""
        return self.flag

    def instantiate(self, parameters):
        """Return a copy of this parameter.

        A flag has no template fields, so ``parameters`` is not consulted.
        """
        # The first field is ``name``; passing ParameterKind.FLAG here (as the
        # previous code did) stored the enum member as the parameter's name.
        return ParameterFlag(self.name, self.flag)


PARAMETER_KINDS["flag"] = ParameterFlag
124
125
126
127
128
129
130
131
132
133
134
135
136


@dataclass
class ParameterOperator:
    """Operator parameter of an index function, given as a string."""

    name: str
    operator: str
    kind: ParameterKind = ParameterKind.OPERATOR

    @property
    def parameter(self):
        """Return the value handed to the index function: the operator string."""
        return self.operator

    def instantiate(self, parameters):
        """Return a copy of this parameter.

        An operator has no template fields, so ``parameters`` is not consulted.
        """
        # The first field is ``name``; passing ParameterKind.OPERATOR here (as
        # the previous code did) stored the enum member as the parameter's name.
        return ParameterOperator(self.name, self.operator)


PARAMETER_KINDS["operator"] = ParameterOperator
141
142


143
@dataclass
class ParameterQuantity:
    """Physical-quantity parameter of an index function.

    ``var_name`` and ``long_name`` may be templates in ``str.format`` syntax,
    and ``data`` may be a single-entry dict naming the template parameter that
    supplies the value; :meth:`instantiate` resolves all three.
    """

    var_name: str
    standard_name: str
    data: Any
    units: str
    long_name: Optional[str] = None
    proposed_standard_name: Optional[str] = None
    kind: ParameterKind = ParameterKind.QUANTITY

    @property
    def parameter(self):
        """Return the quantity as an ``iris.coords.AuxCoord``.

        ``data`` becomes the coordinate's points; the names and units are
        taken from the corresponding attributes.
        """
        return iris.coords.AuxCoord(
            self.data,
            self.standard_name,
            self.long_name,
            self.var_name,
            units=self.units,
        )

    def instantiate(self, parameters):
        """Return a copy with the template fields filled from ``parameters``.

        Raises
        ------
        KeyError
            If a referenced template parameter is missing from ``parameters``.
        """
        data = self.data
        if isinstance(data, dict) and len(data) == 1:
            # A single-entry dict is a placeholder whose key names the
            # parameter to use (presumably the result of writing ``{name}`` in
            # the YAML metadata -- confirm). Read the key without removing it:
            # ``popitem`` emptied the template's dict, so every instantiation
            # after the first one silently kept the empty dict as data.
            (key,) = data
            data = parameters[key]
        long_name = self.long_name
        if long_name is not None:
            long_name = long_name.format(**parameters)
        return ParameterQuantity(
            format_var_name(self.var_name, parameters),
            self.standard_name,
            data,
            self.units,
            long_name,
            # Carried over unchanged; previously dropped on instantiation.
            self.proposed_standard_name,
        )


PARAMETER_KINDS["quantity"] = ParameterQuantity
182
183


184
@dataclass
class ParameterReducer:
    """Reducer parameter of an index function, given as a string."""

    name: str
    reducer: str
    kind: ParameterKind = ParameterKind.REDUCER

    @property
    def parameter(self):
        """Return the value handed to the index function: the reducer string."""
        return self.reducer

    def instantiate(self, parameters):
        """Return a copy of this parameter.

        A reducer has no template fields, so ``parameters`` is not consulted.
        """
        # The first field is ``name``; passing ParameterKind.REDUCER here (as
        # the previous code did) stored the enum member as the parameter's name.
        return ParameterReducer(self.name, self.reducer)


PARAMETER_KINDS["reducer"] = ParameterReducer
199
200


201
202
203
# Any of the parameter classes registered in PARAMETER_KINDS. ParameterFlag
# was missing from the union although it is registered like the others.
Parameter = Union[
    ParameterFlag, ParameterOperator, ParameterQuantity, ParameterReducer
]


204
205
206
207
208
209
210
211
@dataclass
class IndexFunction:
    """Name of an index function together with its parameters.

    ``name`` is the key used by ``build_index_function`` to look up the
    implementation; ``parameters`` maps parameter names to parameter objects.
    """

    name: str
    parameters: Mapping[str, Parameter]

    def instantiate(self, parameters):
        """Return a copy in which every parameter has been instantiated.

        ``parameters`` holds the template values and is forwarded to the
        ``instantiate`` method of each parameter object.
        """
        return IndexFunction(
            self.name,
            {
                name: param.instantiate(parameters)
                for name, param in self.parameters.items()
            },
        )
217
218
219
220
221
222
223


@dataclass
class IndexDefinition:
    """Complete definition of one index as read from the metadata.

    A definition whose name contains ``{...}`` fields acts as a template;
    :meth:`instantiate` turns it into a concrete definition.
    """

    reference: str
    # Read by IndexCatalog.prepare_indices through the keys "default" and
    # "allowed".
    period: Mapping[str, Union[str, Mapping[str, Optional[str]]]]
    output: OutputVariable
    input: Mapping[str, InputVariable]
    index_function: IndexFunction
    # Where the definition came from; load_metadata passes the file path.
    source: str

    def instantiate(self, parameters):
        """Return a copy with all template fields filled from ``parameters``.

        The output variable, every input variable and the index function are
        instantiated; ``reference``, ``period`` and ``source`` are carried over
        unchanged.
        """
        return IndexDefinition(
            self.reference,
            self.period,
            self.output.instantiate(parameters),
            {key: iv.instantiate(parameters) for key, iv in self.input.items()},
            self.index_function.instantiate(parameters),
            self.source,
        )


240
def build_parameter(name, metadata):
    """Build the parameter object described by ``metadata``.

    The class is selected from ``PARAMETER_KINDS`` by ``metadata["kind"]`` and
    called with ``name`` as first positional argument and all entries of
    ``metadata`` as keyword arguments.

    Raises
    ------
    KeyError
        If ``metadata`` has no ``"kind"`` entry or the kind is not registered.
    TypeError
        If the metadata entries do not match the fields of the selected class.
    """
    parameter_class = PARAMETER_KINDS[metadata["kind"]]
    # NOTE(review): ``metadata`` still contains the "kind" entry, so the new
    # object's ``kind`` attribute holds the raw string, not a ParameterKind
    # member -- confirm whether consumers expect the enum.
    # NOTE(review): ``name`` binds to the first field of the selected class;
    # for ParameterQuantity that is ``var_name``, so metadata that also has a
    # "var_name" entry raises TypeError -- confirm this is intended.
    return parameter_class(name, **metadata)
242
243


244
def build_index(metadata, variables, source=None):
    """Build an :class:`IndexDefinition` from its parsed metadata mapping.

    Parameters
    ----------
    metadata : Mapping
        Metadata of one index with the keys ``"output"``, ``"input"``,
        ``"index_function"``, ``"reference"`` and ``"period"``.
    variables : Mapping
        Known input variables by name; the names listed under ``"input"`` are
        resolved against it.
    source : str, optional
        Stored as the ``source`` of the definition; ``load_metadata`` passes
        the path of the metadata file.

    Returns
    -------
    IndexDefinition

    Raises
    ------
    KeyError
        If a required metadata key or a referenced input variable is missing.
    TypeError
        If a parameter's metadata does not match its parameter class.
    """
    output_metadata = metadata["output"]
    cell_methods = output_metadata["cell_methods"]
    if cell_methods is None:
        cms = []
    else:
        # Copy each mapping before ``popitem`` so the parsed metadata is left
        # intact (it may be shared, e.g. through a YAML alias).
        cms = [CellMethod(*dict(cm).popitem()) for cm in cell_methods]
    output = OutputVariable(
        output_metadata["var_name"],
        output_metadata["standard_name"],
        output_metadata.get("proposed_standard_name", None),
        output_metadata["long_name"],
        output_metadata["units"],
        cms,
    )

    # A bare string is shorthand for a single input stored under "data".
    input_metadata = metadata["input"]
    if isinstance(input_metadata, str):
        input_metadata = {"data": input_metadata}
    inputs = {key: variables[name] for key, name in input_metadata.items()}

    params = metadata["index_function"]["parameters"]
    if params is None:
        parameters = {}
    else:
        parameters = {
            name: build_parameter(name, param_metadata)
            for name, param_metadata in params.items()
        }
    index_function = IndexFunction(metadata["index_function"]["name"], parameters)

    return IndexDefinition(
        metadata["reference"],
        metadata["period"],
        output,
        inputs,
        index_function,
        source,
    )


def get_signature_candidates(signature_parts):
    """Enumerate the signatures obtained by stripping trailing ``m`` letters.

    A part that ends in ``"m"`` may be a literal part of the name or may carry
    the marker for a negative value. Every subset of those parts is therefore
    tried: for each subset the final ``"m"`` is removed from the selected
    parts.

    Parameters
    ----------
    signature_parts : Sequence[str]
        The text fragments of an index name.

    Returns
    -------
    iterator of (tuple of int, tuple of str)
        Pairs of the indices whose ``"m"`` was stripped and the resulting
        signature. The unmodified signature (empty index tuple) comes first,
        followed by the subsets in order of increasing size.
    """
    m_indices = [i for i, part in enumerate(signature_parts) if part.endswith("m")]
    # Nested comprehension instead of ``sum(lists, [])``, which re-copies the
    # accumulated list for every subset size.
    combs = [
        combination
        for size in range(len(m_indices) + 1)
        for combination in combinations(m_indices, size)
    ]
    candidates = []
    for combination in combs:
        # ``list`` gives a mutable copy for any sequence type.
        parts = list(signature_parts)
        for idx in combination:
            parts[idx] = parts[idx][:-1]
        candidates.append(tuple(parts))
    return zip(combs, candidates)


class IndexCatalog:
    """Collection of index definitions with support for templated names.

    ``indices`` maps index names to definitions. A name that contains
    ``{...}`` fields is a template: it can be requested with numbers in place
    of the fields and is instantiated on demand.
    """

    def __init__(self, indices):
        self.indices = indices
        self._build_template_index()

    def get_list(self):
        """Return the names of all definitions, templates included."""
        return self.indices.keys()

    def _build_template_index(self):
        """Index the templated names by their signature.

        The signature of a template is the tuple of literal text fragments
        around its ``{...}`` fields. ``self.template_index`` maps it to the
        template name and the list of field names.
        """
        expr = re.compile(r"{([^}]+)}")
        template_index = {}
        for index in self.indices:
            # With one capture group, ``split`` alternates literal text (even
            # positions) and field names (odd positions).
            split = expr.split(index)
            if len(split) == 1:
                # No field: an ordinary, non-templated name.
                continue
            signature = tuple(split[::2])
            parameter_names = split[1::2]
            template_index[signature] = (index, parameter_names)
        self.template_index = template_index

    def get_index_definition(self, index):
        """Return the definition for ``index``, instantiating a template if needed.

        A name that is not defined literally is split at its runs of digits;
        the text fragments are matched against the template signatures and
        the numbers become the integer template parameters. A fragment ending
        in ``"m"`` directly before a number may mark that number as negative.

        Raises
        ------
        KeyError
            If ``index`` is neither defined literally nor matches a template.
        RuntimeError
            If more than one template matches.
        """
        try:
            return self.indices[index]
        except KeyError:
            index_expr = re.compile(r"(\d+)")
            split = index_expr.split(index)
            if len(split) == 1:
                # No number in the name, so it cannot be a template instance.
                raise
            signature_parts = split[::2]
            parameter_values = split[1::2]
            matching_signatures = [
                (combination, signature)
                for combination, signature in get_signature_candidates(
                    signature_parts
                )
                if signature in self.template_index
                # An "m" can only negate a number that follows it. The last
                # fragment has none, and accepting such a candidate used to
                # fail with IndexError below.
                and all(i < len(parameter_values) for i in combination)
            ]
            if len(matching_signatures) == 0:
                raise
            elif len(matching_signatures) > 1:
                raise RuntimeError("More than one matching signature found")
            combination, signature = matching_signatures[0]
            template_name, parameter_names = self.template_index[signature]
            for i in combination:
                parameter_values[i] = "-" + parameter_values[i]
            parameter_dict = {
                name: int(value)
                for (name, value) in zip(parameter_names, parameter_values)
            }
            template = self.indices[template_name]
            return template.instantiate(parameter_dict)

    def prepare_indices(self, requested_indices):
        """Build an ``Index`` object for every requested index name.

        For each name the definition is looked up, its default period is
        selected and its index function is built.

        Raises
        ------
        KeyError
            If a requested index is unknown.
        TypeError
            If an index function cannot be built from its definition; the
            failure is logged before being re-raised.
        """

        def select_period(metadata):
            # The period named by "default" is used together with its entry
            # under "allowed".
            selected_period = metadata["default"]
            period_metadata = metadata["allowed"][selected_period]
            return PeriodSpecification(selected_period, period_metadata)

        indices = []
        for index_name in requested_indices:
            definition = self.get_index_definition(index_name)
            period_spec = select_period(definition.period)
            try:
                index_function = build_index_function(definition.index_function)
            except TypeError:
                logging.error(
                    f"Could not build index function for index "
                    f"{index_name} from definition {definition}"
                )
                raise
            logging.info(
                f"Trying to build index <{index_name}> "
                f"with period <{period_spec.type}"
                f"({p if (p := period_spec.parameters) is not None else ''})> "
                f"from definition in <{definition.source}>"
            )
            index = Index(index_function, definition, period_spec)
            indices.append(index)
        return indices


def build_index_function(spec):
    name = spec.name
371
    candidates = list(entry_points(group="climix.index_functions", name=name))
372
    if len(candidates) == 0:
373
        raise ValueError(f"No implementation found for index_function <{name}>")
374
375
376
    elif len(candidates) > 1:
        distributions = [candidate.dist for candidate in candidates]
        raise ValueError(
377
378
379
380
            f"Found several implementations for index_function <{name}>. "
            f"Please make sure only one is installed at any time. "
            f"The implementations come from the distributions {distributions}"
        )
381
    candidate = candidates[0]
382
383
384
385
    logging.info(
        f"Found implementation for index_function <{name}> "
        f"from distribution <{candidate.dist}>"
    )
386
    index_function_factory = candidates[0].load()
387
    parameters = {name: param.parameter for name, param in spec.parameters.items()}
388
389
390
391
    index_function = index_function_factory(**parameters)
    return index_function


392
393
def find_metadata_files_in_dir(directory):
    """Return the ``*.yml`` files directly inside ``directory``.

    The paths are returned in sorted order so that the result does not depend
    on the order in which the file system lists them. An empty list is
    returned when ``directory`` does not exist or is not a directory.
    """
    if not os.path.isdir(directory):
        return []
    return sorted(glob.glob(os.path.join(directory, "*.yml")))


def find_metadata_files():
    """Return the metadata files from all known configuration directories.

    The directories are searched in this order: the ``etc`` directory next to
    this module, ``/etc/climix``, then the XDG configuration directories for
    ``climix`` in reversed order. Every directory is logged before its files
    are collected.
    """
    directories = [
        os.path.join(os.path.dirname(__file__), "etc"),
        "/etc/climix",
        # NOTE(review): reversed presumably so that the preferred XDG
        # directory comes last and its files are read last -- confirm.
    ] + list(xdg.BaseDirectory.load_config_paths("climix"))[::-1]
    files = []
    for directory in directories:
        logging.info(f"Looking for metadata in directory {directory}")
        files.extend(find_metadata_files_in_dir(directory))
    return files
409

410
411

def load_metadata():
    """Read all metadata files and return the resulting :class:`IndexCatalog`.

    All files are parsed first. Then every variable is built, and only
    afterwards every index, so an index may refer to a variable defined in
    any file. When a name is defined more than once, the definition read last
    wins.

    Raises
    ------
    KeyError, TypeError
        If the metadata of an index is incomplete or malformed; the index and
        file are logged before the exception is re-raised.
    """
    variable_metadata = []
    index_metadata = []
    for path in find_metadata_files():
        logging.info(f"Reading index definitions from file {path}")
        with open(path, encoding="utf-8") as md_file:
            # An empty file parses to None; treat it as having no content.
            metadata = yaml.safe_load(md_file) or {}
        # ``or {}`` also covers a section that is present but empty (null).
        index_metadata.append((metadata.get("indices") or {}, path))
        variable_metadata.append((metadata.get("variables") or {}, path))

    variables = {}
    for var_metadata, path in variable_metadata:
        for name, var in var_metadata.items():
            variables[name] = build_variable(name, var, path)

    indices = {}
    for idx_metadata, path in index_metadata:
        for name, idx_meta in idx_metadata.items():
            try:
                indices[name] = build_index(idx_meta, variables, path)
            except (KeyError, TypeError):
                logging.error(f"Metadata error for index {name} from {path}.")
                raise
    return IndexCatalog(indices)