main.py 6.36 KB
Newer Older
Klaus Zimmermann's avatar
Klaus Zimmermann committed
1
2
3
4
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
Klaus Zimmermann's avatar
Klaus Zimmermann committed
5
import logging
Klaus Zimmermann's avatar
Klaus Zimmermann committed
6
7
8
import os
import time

9
import sentry_sdk
Klaus Zimmermann's avatar
Klaus Zimmermann committed
10

Klaus Zimmermann's avatar
Klaus Zimmermann committed
11
from . import __version__
12
from .dask_setup import SCHEDULERS, setup_scheduler
Klaus Zimmermann's avatar
Klaus Zimmermann committed
13
from .datahandling import prepare_input_data, save
14
from .metadata import load_metadata
Klaus Zimmermann's avatar
Klaus Zimmermann committed
15
16


17
18
19
20
21
22
23
# Maps the level names selectable with --log-level to the corresponding
# numeric constants of the logging module, from least to most severe.
LOG_LEVELS = {
    name: getattr(logging, name.upper())
    for name in ('debug', 'info', 'warning', 'error', 'critical')
}
Klaus Zimmermann's avatar
Klaus Zimmermann committed
24
25
26
27


def parse_args():
    """Parse the command line arguments of the program.

    Returns
    -------
    argparse.Namespace
        The parsed arguments with the attributes ``log_level``,
        ``verbose``, ``deactivate_error_tracking``, ``dask_scheduler``,
        ``keep_open``, ``sliced_mode``, ``iterative_storage``,
        ``output_template``, ``indices`` and ``datafiles``.
    """
    parser = argparse.ArgumentParser(
        description=(f'A climate index thing, version {__version__}.'))
    parser.add_argument('-l', '--log-level', choices=LOG_LEVELS.keys(),
                        default='info',
                        help='the lowest priority level '
                        'of log messages to display')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='write more detailed log messages')
    parser.add_argument('-e', '--deactivate-error-tracking',
                        action='store_true',
                        help='deactivate sentry based error tracking')
    parser.add_argument('-d', '--dask-scheduler',
                        default='distributed-local-cluster',
                        help='the dask scheduler to use '
                        '(default: %(default)s)')
    parser.add_argument('-k', '--keep-open', action='store_true',
                        help='keep climix running until key press '
                        '(useful for debugging)')
    parser.add_argument('-s', '--sliced-mode', action='store_true',
                        help='activate calculation per period to avoid memory '
                        'problems')
    parser.add_argument('-i', '--iterative-storage', action='store_true',
                        help='store results iteratively per period')
    parser.add_argument('-o', '--output', dest='output_template',
                        help='output filename')
    # The option may be repeated; every occurrence is appended to ``indices``.
    parser.add_argument('-x', '--index', action='append',
                        required=True, metavar='INDEX', dest='indices',
                        help='the index to calculate '
                        '(use "-x list" to get a list '
                        'of all available indices)')
    # Zero data files are accepted so that "-x list" works without input.
    parser.add_argument('datafiles', nargs='*', metavar="DATAFILE",
                        help='the input data files')
    return parser.parse_args()


60
61
62
63
64
65
66
67
68
def setup_logging(log_level, verbose=False):
    """Configure logging through ``logging.basicConfig``.

    Parameters
    ----------
    log_level : str
        Key into ``LOG_LEVELS`` selecting the lowest level to display.
    verbose : bool, optional
        If true, every message is additionally prefixed with the relative
        creation time, the file name and the function name.

    Raises
    ------
    KeyError
        If ``log_level`` is not a key of ``LOG_LEVELS``.
    """
    message_format = '%(levelname)s:%(name)s:%(message)s'
    if verbose:
        # The detailed format is the plain one with a location prefix.
        message_format = (
            '%(relativeCreated)8dms:%(filename)s:%(funcName)s() '
            + message_format)
    logging.basicConfig(level=LOG_LEVELS[log_level], format=message_format)


69
def init_sentry():
    """Log that error reporting is switched on and initialise sentry_sdk."""
    logging.info('Activating sentry (automatic error reporting)')
    dsn = "https://d3ac73a62877407b848dfc3f318bed85@sentry.io/1458386"
    sentry_sdk.init(dsn)


Klaus Zimmermann's avatar
Klaus Zimmermann committed
75
def guess_output_template(datafiles):
    """Derive an output filename template from the input file names.

    Every file name is stripped of its directory and extension and split
    at underscores.  The first part (usually the variable) and the last
    part (usually the time range) are removed; the remainder is the
    "base".  If all files share one base, it is used for the template,
    with a ``day`` component replaced by the ``{frequency}`` placeholder
    (or the placeholder appended if there is no such component).  The
    overall time range is added only if it could be parsed for every file.

    Parameters
    ----------
    datafiles : iterable of str
        Paths of the input files; may be empty.

    Returns
    -------
    str
        A template containing the placeholders ``{var_name}`` and
        ``{frequency}``.  The generic ``'{var_name}_{frequency}.nc'`` is
        returned if there are no files or the bases differ.
    """
    default_template = '{var_name}_{frequency}.nc'

    def filename_stripper(path):
        # remove directory part...
        basename = os.path.basename(path)
        # ...and extension
        root, ext = os.path.splitext(basename)
        # split at _...
        parts = root.split('_')
        # and remove
        # first part (usually variable)
        # and last part (usually time)
        base = '_'.join(parts[1:-1])
        try:
            period = [int(t) for t in parts[-1].split('-')]
        except ValueError:
            # The last part is not a time specification.
            return (base, None, None)
        if len(period) == 1:
            # A single time stamp is both start and end.
            period *= 2
        return (base, period[0], period[1])

    files = [filename_stripper(p) for p in datafiles]
    if not files:
        # Nothing to derive a name from; zip(*files) could not be unpacked.
        return default_template
    bases, starts, ends = zip(*files)
    unique_bases = set(bases)
    if len(unique_bases) != 1:
        return default_template
    base = unique_bases.pop()
    if '_day_' in base:
        base = base.replace('_day_', '_{frequency}_')
    elif base.endswith('_day'):
        base = base.replace('_day', '_{frequency}')
    elif base:
        base += '_{frequency}'
    else:
        # Avoid a doubled underscore when nothing is left of the name.
        base = '{frequency}'
    if None in starts:
        # Start and end are always unset together.  Comparing None with
        # integers would fail, and a partial range would be misleading,
        # so the time range is left out entirely.
        return f'{{var_name}}_{base}.nc'
    return f'{{var_name}}_{base}_{min(starts)}-{max(ends)}.nc'


def build_output_filename(index, datafiles, output_template):
    """Fill in the output filename template for one index.

    Parameters
    ----------
    index
        Object providing ``period.label`` and ``metadata.output.drs``.
    datafiles
        Input file paths, only used to guess a template if none is given.
    output_template : str or None
        Template for ``str.format``; guessed from ``datafiles`` if None.

    Returns
    -------
    str
        The template formatted with ``frequency`` set to the period label
        and the entries of ``index.metadata.output.drs`` as further fields.
    """
    template = output_template
    if template is None:
        template = guess_output_template(datafiles)
    period_label = index.period.label
    fields = dict(frequency=period_label)
    return template.format(**fields, **index.metadata.output.drs)
Klaus Zimmermann's avatar
Klaus Zimmermann committed
119
120


Klaus Zimmermann's avatar
Klaus Zimmermann committed
121
def do_main(index_catalog, requested_indices, datafiles,
            output_template, sliced_mode, iterative_storage, scheduler):
    """Calculate and save every requested index.

    The indices are obtained from ``index_catalog.prepare_indices``.  For
    each of them the output filename is built, the input data is prepared
    from ``datafiles``, the index is called with that data, and the result
    is handed to ``save``.

    Parameters
    ----------
    index_catalog
        Object providing ``prepare_indices``.
    requested_indices
        The requested indices; also used to name the index in the log.
    datafiles
        The input data files.
    output_template
        Output filename template or None, see ``build_output_filename``.
    sliced_mode
        Passed on to the index call.
    iterative_storage
        Passed on to ``save``.
    scheduler
        Object whose ``client`` is passed to the index call and to ``save``.
    """
    logging.debug('Preparing indices')
    prepared_indices = index_catalog.prepare_indices(requested_indices)
    for position, index in enumerate(prepared_indices):
        requested_name = requested_indices[position]
        # NOTE: the log message would be more helpful if it named the file
        # the index definition comes from instead of showing an abstract
        # python object reference.
        logging.info('Starting calculations for index '
                     f'{requested_name} in {index}')

        logging.debug('Building output filename')
        output_filename = build_output_filename(
            index, datafiles, output_template)

        logging.debug('Preparing input data')
        input_data = prepare_input_data(datafiles)

        logging.debug('Calculating index')
        result = index(
            input_data, client=scheduler.client, sliced_mode=sliced_mode)

        # Relative output names are reported relative to the working dir.
        target = os.path.join(os.getcwd(), output_filename)
        logging.info(f'Saving result in {target}')
        save(result, output_filename, iterative_storage, scheduler.client)
Klaus Zimmermann's avatar
Klaus Zimmermann committed
144
145
146
147


def main():
    """Run the command line program.

    Parses the arguments, sets up logging and, unless deactivated, sentry
    error tracking, and loads the index metadata.  If ``list`` is among
    the requested indices, the available indices are printed and nothing
    is calculated.  Otherwise the calculation runs inside the scheduler
    context and its duration is logged, even if it fails.
    """
    args = parse_args()
    setup_logging(args.log_level, args.verbose)
    error_tracking_wanted = not args.deactivate_error_tracking
    if error_tracking_wanted:
        init_sentry()

    logging.debug('Loading metadata')
    index_catalog = load_metadata()
    if 'list' in args.indices:
        print('Available indices are:')
        print(list(index_catalog.get_list()))
        return

    with setup_scheduler(args) as scheduler:
        logging.debug('Scheduler ready; starting main program.')
        started_at = time.time()
        try:
            do_main(index_catalog,
                    args.indices,
                    args.datafiles,
                    args.output_template,
                    args.sliced_mode,
                    args.iterative_storage,
                    scheduler)
        finally:
            # Report the duration for failed calculations as well.
            elapsed = time.time() - started_at
            logging.info(f'Calculation took {elapsed:.4f} seconds.')
        if args.keep_open:
            # Wait for the user before the scheduler context is left.
            input('Press enter to close the cluster ')
Klaus Zimmermann's avatar
Klaus Zimmermann committed
171
172
173
174


# Run the command line program when this file is executed as a script.
if __name__ == '__main__':
    main()