Commit e5f1a3e7 authored by Klaus Zimmermann

Improved saving and sliced mode (closes #112, closes #113)

* Move sliced_mode to output layer in `save` in main
* Store individual slices as they become available (see the sketch below)
parent 2346cc51
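The core of the change is the slice-by-slice writing pattern in the new `save` function shown in the diff below: the output variable is created on disk up front, and a background thread writes slice `i` to the NetCDF file while slice `i + 1` is being realised. The following is a minimal, self-contained sketch of that pattern only; `example.nc`, the variable name `index`, and `compute_slice` are illustrative placeholders and not part of this commit, which operates on an iris cube instead.

```python
# Sketch only: stream one time slice at a time into a pre-created NetCDF
# variable, overlapping the write of slice i with the computation of i + 1.
import threading
import time

import netCDF4
import numpy as np

N_SLICES, NY, NX = 10, 180, 360


def compute_slice(i):
    """Stand-in for realising one (expensive) time slice of the result."""
    return np.full((NY, NX), i, dtype='f4')


# Create the empty output variable first, analogous to saving the empty cube.
with netCDF4.Dataset('example.nc', 'w') as ds:
    ds.createDimension('time', N_SLICES)
    ds.createDimension('y', NY)
    ds.createDimension('x', NX)
    ds.createVariable('index', 'f4', ('time', 'y', 'x'))

# Re-open in append mode and fill the variable slice by slice.
with netCDF4.Dataset('example.nc', 'a') as ds:
    var = ds['index']

    def store(i, data):
        var[i, ...] = data          # runs in the background thread

    thread = threading.Thread()     # dummy thread so the first join() is a no-op
    thread.start()
    start = time.time()
    for i in range(N_SLICES):
        data = compute_slice(i)     # compute the next slice in the main thread
        thread.join()               # wait until the previous slice is on disk
        thread = threading.Thread(target=store, args=(i, data))
        thread.start()
        elapsed = time.time() - start
        print(f'{i + 1}/{N_SLICES} done, '
              f'estimated total {elapsed / (i + 1) * N_SLICES:.1f}s')
    thread.join()                   # make sure the final write has completed
```

Overlapping computation and I/O this way keeps memory use bounded to roughly one slice at a time while hiding most of the write latency behind the computation of the next slice.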
@@ -17,7 +17,7 @@ class Index:
             output_metadata)
 
-    def __call__(self, cube, sliced_mode=False):
+    def __call__(self, cube):
         logging.info('Adding coord categorisation.')
         coord_name = self.period.add_coord_categorisation(cube)
         logging.info('Extracting period cube')
@@ -27,14 +27,5 @@
         logging.info('Setting up aggregation')
         aggregated = sub_cube.aggregated_by(coord_name, self.aggregator,
                                             period=self.period)
-        if sliced_mode:
-            results = []
-            for r in aggregated.slices_over(coord_name):
-                r.data
-                results.append(r)
-            result = iris.cube.CubeList(results).merge_cube()
-        else:
-            result = aggregated
-            result.data
-        result.attributes['frequency'] = self.period.label
-        return result
+        aggregated.attributes['frequency'] = self.period.label
+        return aggregated
@@ -3,10 +3,12 @@
 import argparse
 import os
+import threading
+import time
 
 import iris
 from iris.experimental.equalise_cubes import equalise_attributes
 import netCDF4
 import numpy as np
 import pkg_resources
 import yaml
@@ -162,6 +164,47 @@ def build_output_filename(index, datafiles, output_template):
     return output_template.format(frequency=frequency, **index.output_metadata)
 
 
+def save(result, output_filename, sliced_mode=False):
+    if sliced_mode:
+        logging.info('Performing aggregation in sliced mode')
+        data = result.core_data()
+        logging.info('creating empty data')
+        result.data = np.empty(data.shape, data.dtype)
+        result.data
+        logging.info('saving empty cube')
+        iris.save(result, output_filename, fill_value=MISSVAL,
+                  local_keys=['proposed_standard_name'])
+        logging.info('opening')
+        result.data = data
+        with netCDF4.Dataset(output_filename, 'a') as ds:
+            var = ds[result.var_name]
+            time_dim = result.coord_dims('time')[0]
+            no_slices = result.shape[time_dim]
+
+            def store(i, data):
+                var[i, ...] = data
+            thread = threading.Thread()
+            thread.start()
+            start = time.time()
+            for i, result_cube in enumerate(result.slices_over(time_dim)):
+                logging.info('Starting with {}'.format(result_cube.coord('time')))
+                result_cube.data
+                logging.info('Waiting for previous save to finish')
+                thread.join()
+                thread = threading.Thread(target=store,
+                                          args=(i, result_cube.data))
+                thread.start()
+                end = time.time()
+                partial = end - start
+                total_estimate = partial/(i+1)*no_slices
+                logging.info(f'Finished {i+1}/{no_slices} in {partial:4f}s. '
+                             f'Estimated total time is {total_estimate:4f}s.')
+            thread.join()
+    else:
+        logging.info('Performing aggregation in normal mode')
+        iris.save(result, output_filename, fill_value=MISSVAL,
+                  local_keys=['proposed_standard_name'])
+
+
 def do_main(requested_indices, datafiles, output_template, sliced_mode):
     logging.info('Loading metadata')
     metadata = load_metadata()
@@ -174,9 +217,9 @@ def do_main(requested_indices, datafiles, output_template, sliced_mode):
         logging.info('Preparing input data')
         input_data = prepare_input_data(datafiles)
         logging.info('Calculating index')
-        result = index(input_data, sliced_mode=sliced_mode)
-        iris.save(result, output_filename, fill_value=MISSVAL,
-                  local_keys=['proposed_standard_name'])
+        result = index(input_data)
+        logging.info('Saving result')
+        save(result, output_filename, sliced_mode=sliced_mode)
 
 
 def main():