#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Command-line entry point: parse arguments, set up logging and the dask
scheduler, then compute the requested climate indices over the input files."""

import argparse
import logging
import os
import time

import sentry_sdk

from . import __version__
from .dask_setup import SCHEDULERS, setup_scheduler
from .datahandling import prepare_input_data, save
from .metadata import load_metadata


# User-facing log level names mapped to the stdlib logging constants.
LOG_LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL,
}


def parse_args():
    """Parse and return the command-line arguments for the climix CLI."""
    parser = argparse.ArgumentParser(
        description=(f'A climate index thing, version {__version__}.'))
    parser.add_argument('-l', '--log-level',
                        choices=LOG_LEVELS.keys(), default='info',
                        help='the lowest priority level '
                        'of log messages to display')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='write more detailed log messages')
    parser.add_argument('-e', '--deactivate-error-tracking',
                        action='store_true',
                        help='deactivate sentry based error tracking')
    parser.add_argument('-d', '--dask-scheduler',
                        default='distributed-local-cluster')
    parser.add_argument('-k', '--keep-open', action='store_true',
                        help='keep climix running until key press '
                        '(useful for debugging)')
    parser.add_argument('-s', '--sliced-mode', action='store_true',
                        help='activate calculation per period to avoid memory '
                        'problems')
    parser.add_argument('-i', '--iterative-storage', action='store_true',
                        help='store results iteratively per period')
    parser.add_argument('-o', '--output', dest='output_template',
                        help='output filename')
    parser.add_argument('-x', '--index', action='append',
                        required=True, metavar='INDEX', dest='indices',
                        help='the index to calculate '
                        '(use "-x list" to get a list '
                        'of all available indices)')
    parser.add_argument('datafiles', nargs='*', metavar="DATAFILE",
                        help='the input data files')
    return parser.parse_args()


def setup_logging(log_level, verbose=False):
    """Configure the root logger.

    :param log_level: key into :data:`LOG_LEVELS` selecting the threshold.
    :param verbose: if true, include timing/location info in each message.
    """
    if verbose:
        # NOTE: renamed from `format` to avoid shadowing the builtin.
        log_format = ('%(relativeCreated)8dms:%(filename)s:%(funcName)s() '
                      '%(levelname)s:%(name)s:%(message)s')
    else:
        log_format = '%(levelname)s:%(name)s:%(message)s'
    logging.basicConfig(level=LOG_LEVELS[log_level], format=log_format)


def init_sentry():
    """Initialize sentry-based automatic error reporting."""
    logging.info('Activating sentry (automatic error reporting)')
    sentry_sdk.init(
        "https://d3ac73a62877407b848dfc3f318bed85@sentry.io/1458386")


def guess_output_template(datafiles):
    """Derive an output filename template from CMOR-style input filenames.

    Filenames are expected to look like ``<var>_<base...>_<start>-<end>.nc``.
    If all inputs share the same ``<base>`` part and all time ranges parse,
    the template embeds the base and overall start/end years; otherwise a
    generic ``{var_name}_{frequency}.nc`` template is returned.

    :param datafiles: list of input file paths (may be empty).
    :return: a format string with ``{var_name}`` and ``{frequency}`` fields.
    """
    output_template = '{var_name}_{frequency}.nc'

    def filename_stripper(path):
        # Remove directory part and extension, then split at underscores:
        # the first part is usually the variable name, the last the time span.
        root, _ext = os.path.splitext(os.path.basename(path))
        parts = root.split('_')
        base = '_'.join(parts[1:-1])
        try:
            # Renamed from `time` to avoid shadowing the imported module.
            times = [int(t) for t in parts[-1].split('-')]
            if len(times) == 1:
                # Single year, e.g. "2000": use it as both start and end.
                times *= 2
        except ValueError:
            # Trailing segment is not a time span; mark as unknown.
            times = [None, None]
        return (base, times[0], times[1])

    # Guard: zip(*[]) would raise ValueError on unpacking for empty input.
    if not datafiles:
        return output_template

    files = [filename_stripper(p) for p in datafiles]
    bases, starts, ends = zip(*files)
    unique_bases = set(bases)
    # Only build the detailed template when all bases agree and every time
    # range parsed; otherwise min()/max() over None would raise TypeError.
    if (len(unique_bases) == 1
            and None not in starts and None not in ends):
        base = unique_bases.pop()
        start = min(starts)
        end = max(ends)
        if '_day_' in base:
            base = base.replace('_day_', '_{frequency}_')
        elif base.endswith('_day'):
            base = base.replace('_day', '_{frequency}')
        else:
            base += '_{frequency}'
        output_template = f'{{var_name}}_{base}_{start}-{end}.nc'
    return output_template


def build_output_filename(index, datafiles, output_template):
    """Format the output filename for *index*.

    Falls back to :func:`guess_output_template` when no explicit template
    was given on the command line.
    """
    if output_template is None:
        output_template = guess_output_template(datafiles)
    frequency = index.period.label
    return output_template.format(frequency=frequency,
                                  **index.metadata.output.drs)


def do_main(index_catalog, requested_indices, datafiles,
            output_template, sliced_mode, iterative_storage, scheduler):
    """Prepare, compute, and save every requested index.

    :param index_catalog: catalog used to resolve index definitions.
    :param requested_indices: index names as given on the command line.
    :param datafiles: input data file paths.
    :param output_template: output filename template or ``None`` to guess.
    :param sliced_mode: compute per period to limit memory use.
    :param iterative_storage: store results iteratively per period.
    :param scheduler: active dask scheduler wrapper exposing ``.client``.
    """
    logging.debug('Preparing indices')
    indices = index_catalog.prepare_indices(requested_indices)
    for index_no, index in enumerate(indices):
        # Below one would preferably want to know from which file the index
        # definition comes rather than an abstract python object reference.
        logging.info('Starting calculations for index '
                     f'{requested_indices[index_no]} in {index}')
        logging.debug('Building output filename')
        output_filename = build_output_filename(index, datafiles,
                                                output_template)
        logging.debug('Preparing input data')
        # NOTE(review): input data is re-prepared per index; presumably the
        # computation consumes it — confirm before hoisting out of the loop.
        input_data = prepare_input_data(datafiles)
        logging.debug('Calculating index')
        result = index(input_data, client=scheduler.client,
                       sliced_mode=sliced_mode)
        logging.info(
            f'Saving result in {os.path.join(os.getcwd(), output_filename)}')
        save(result, output_filename, iterative_storage, scheduler.client)


def main():
    """CLI entry point: wire up logging, sentry, the scheduler, and run."""
    args = parse_args()
    setup_logging(args.log_level, args.verbose)
    if not args.deactivate_error_tracking:
        init_sentry()
    logging.debug('Loading metadata')
    index_catalog = load_metadata()
    if 'list' in args.indices:
        print('Available indices are:')
        print(list(index_catalog.get_list()))
        return
    with setup_scheduler(args) as scheduler:
        logging.debug('Scheduler ready; starting main program.')
        start = time.time()
        try:
            do_main(index_catalog, args.indices, args.datafiles,
                    args.output_template, args.sliced_mode,
                    args.iterative_storage, scheduler)
        finally:
            end = time.time()
            logging.info(f'Calculation took {end-start:.4f} seconds.')
        if args.keep_open:
            input('Press enter to close the cluster ')


if __name__ == "__main__":
    main()