# -*- coding: utf-8 -*-

import logging
import threading
import time

import iris
from iris.experimental.equalise_cubes import equalise_attributes
import netCDF4
import numpy as np


MISSVAL = 1.0e20


def ignore_cb(cube, field, filename):
    """Iris load callback that strips per-file provenance attributes.

    The ``creation_date`` and ``tracking_id`` attributes differ from file
    to file and would otherwise prevent cubes from being concatenated.
    """
    for attribute in ('creation_date', 'tracking_id'):
        cube.attributes.pop(attribute, None)


def prepare_input_data(datafiles):
    """Load and concatenate the input files into one cube per variable.

    Parameters
    ----------
    datafiles : list of str
        Paths to the input files (loadable by iris, typically netCDF).

    Returns
    -------
    iris.cube.CubeList
        One concatenated cube per distinct ``var_name``.

    Raises
    ------
    ValueError
        If, after concatenation, more than one cube remains for some
        ``var_name``, i.e. the files cannot be merged into a single
        timeseries per variable. Details are written to the log.
    """
    # Strip per-file metadata (creation_date, tracking_id) via ignore_cb so
    # otherwise identical cubes can be concatenated across files.
    datacubes = iris.load_raw(datafiles, callback=ignore_cb)
    iris.util.unify_time_units(datacubes)
    equalise_attributes(datacubes)
    cubes = datacubes.concatenate()
    var_names = [c.var_name for c in cubes]
    if len(var_names) > len(set(var_names)):  # noqa
        # Group cubes by var_name so every offending variable is reported.
        cubes_per_var_name = {}
        for c in cubes:
            cubes_per_var_name.setdefault(c.var_name, []).append(c)
        inconsistent_var_names = []
        # Use a distinct loop variable (var_cubes) so the result list
        # `cubes` is not shadowed by the loop.
        for var_name, var_cubes in cubes_per_var_name.items():
            if len(var_cubes) > 1:
                inconsistent_var_names.append(var_name)
                logging.error('Found more than one cube for var_name "{}".\n'
                              '{}'.format(
                                  var_name,
                                  '\n'.join(map(str, var_cubes))))
        raise ValueError('Found too many cubes for var_names {}. '
                         'See log for details.'.format(inconsistent_var_names))
    return cubes


def save(result, output_filename, sliced_mode=False, client=None):
    """Save the result cube to a netCDF file.

    Parameters
    ----------
    result : iris.cube.Cube
        Cube (usually with lazy data) to be written.
    output_filename : str
        Path of the output netCDF file.
    sliced_mode : bool, optional
        If True, realize and write the data one time slice at a time
        instead of all at once; useful when the realized result would
        not fit into memory.
    client : optional
        Accepted for interface compatibility with callers; not used in
        this function.
    """
    if sliced_mode:
        logging.info('Performing aggregation in sliced mode')
        # Keep a reference to the (lazy) data, then write a placeholder
        # file whose data array merely has the right shape and dtype
        # (contents are uninitialized). The real values are filled in
        # slice by slice below.
        data = result.core_data()
        logging.info('creating empty data')
        result.data = np.empty(data.shape, data.dtype)
        logging.info('saving empty cube')
        iris.save(result, output_filename, fill_value=MISSVAL,
                  local_keys=['proposed_standard_name'])
        logging.info('opening')
        # Restore the lazy data; values are computed per slice and
        # appended into the existing file via netCDF4.
        result.data = data
        with netCDF4.Dataset(output_filename, 'a') as ds:
            var = ds[result.var_name]
            time_dim = result.coord_dims('time')[0]
            no_slices = result.shape[time_dim]

            def store(i, data):
                # Write one realized time slice into the netCDF variable.
                var[i, ...] = data
            # Dummy thread so the first join() in the loop is a no-op.
            # Writes are double-buffered: slice i+1 is computed while
            # slice i is being written in the background thread.
            thread = threading.Thread()
            thread.start()
            start = time.time()
            for i, result_cube in enumerate(result.slices_over(time_dim)):
                logging.info(f'Starting with {result_cube.coord("time")}')
                result_cube.data  # force computation of this slice
                logging.info('Waiting for previous save to finish')
                thread.join()
                thread = threading.Thread(target=store,
                                          args=(i, result_cube.data))
                thread.start()
                end = time.time()
                partial = end - start
                total_estimate = partial/(i+1)*no_slices
                # NOTE: format specs fixed to `.4f` (precision); the
                # previous `4f` meant minimum field width 4.
                logging.info(f'Finished {i+1}/{no_slices} in {partial:.4f}s. '
                             f'Estimated total time is {total_estimate:.4f}s.')
            # Wait for the final slice's write to complete.
            thread.join()
    else:
        logging.info('Performing aggregation in normal mode')
        iris.save(result, output_filename, fill_value=MISSVAL,
                  local_keys=['proposed_standard_name'])