Commit dcd5951e authored by Klaus Zimmermann's avatar Klaus Zimmermann
Browse files

Improve iterative storage (closes #239)

parent d8c349e5
...@@ -53,7 +53,7 @@ def prepare_input_data(datafiles): ...@@ -53,7 +53,7 @@ def prepare_input_data(datafiles):
def save(result, output_filename, iterative_storage=False, client=None): def save(result, output_filename, iterative_storage=False, client=None):
data = result.core_data() data = result.core_data().rechunk()
if iterative_storage: if iterative_storage:
logger.info('Storing iteratively') logger.info('Storing iteratively')
logger.info('Creating empty data') logger.info('Creating empty data')
...@@ -62,7 +62,6 @@ def save(result, output_filename, iterative_storage=False, client=None): ...@@ -62,7 +62,6 @@ def save(result, output_filename, iterative_storage=False, client=None):
iris.save(result, output_filename, fill_value=MISSVAL, iris.save(result, output_filename, fill_value=MISSVAL,
local_keys=['proposed_standard_name']) local_keys=['proposed_standard_name'])
logger.info('Reopening output file and beginning storage') logger.info('Reopening output file and beginning storage')
result.data = data
with netCDF4.Dataset(output_filename, 'a') as ds: with netCDF4.Dataset(output_filename, 'a') as ds:
var = ds[result.var_name] var = ds[result.var_name]
time_dim = result.coord_dims('time')[0] time_dim = result.coord_dims('time')[0]
...@@ -70,19 +69,20 @@ def save(result, output_filename, iterative_storage=False, client=None): ...@@ -70,19 +69,20 @@ def save(result, output_filename, iterative_storage=False, client=None):
end = time.time() end = time.time()
cumulative = 0. cumulative = 0.
for i, result_data in enumerate(result.core_data()): start_index = 0
result_id = f'{i+1}/{no_slices}' for result_data in data.blocks:
r = result_data.persist() result_id = f'{start_index+1}/{no_slices}'
progress(r)
print()
logging.info(f'Storing result {result_id}')
var[i, ...] = r
logger.info(f'Storing partial result {result_id}') logger.info(f'Storing partial result {result_id}')
end_index = start_index+result_data.shape[0]
logger.debug(f'{start_index}:{end_index}')
logger.debug(f'{result_data.shape}')
var[start_index:end_index, ...] = result_data
start_index = end_index
start = end start = end
end = time.time() end = time.time()
last = end - start last = end - start
cumulative += last cumulative += last
eta = cumulative/(i+1)*no_slices eta = cumulative/(start_index+1)*no_slices
logger.info(f'Finished {result_id} in (last cum eta): ' logger.info(f'Finished {result_id} in (last cum eta): '
f'{last:4.0f} {cumulative:4.0f} {eta:4.0f}') f'{last:4.0f} {cumulative:4.0f} {eta:4.0f}')
else: else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment