66import logging
77import os
88
9+ import xarray as xr
910from rex .utilities .loggers import init_logger
1011
1112from sup3r .preprocessing .cachers import Cacher
13+ from sup3r .preprocessing .names import Dimension
1214from sup3r .utilities .utilities import xr_open_mfdataset
1315
1416from .base import BaseCollector
@@ -32,6 +34,12 @@ def collect(
3234 ):
3335 """Collect data files from a dir to one output file.
3436
37+ TODO: This assumes that if there is any spatial chunking it is split
38+ by latitude. This should be generalized to allow for any spatial
39+ chunking and any dimension. This will either require a new file
40+ naming scheme with a spatial index for both latitude and
41+ longitude or checking each chunk to see how they are split.
42+
3543 Filename requirements:
3644 - Should end with ".nc"
3745
@@ -76,10 +84,19 @@ def collect(
7684 logger .info (f'overwrite=True, removing { out_file } .' )
7785 os .remove (out_file )
7886
87+ spatial_chunks = collector .group_spatial_chunks ()
88+
7989 tmp_file = out_file + '.tmp'
8090 if not os .path .exists (tmp_file ):
81- res_kwargs = res_kwargs or {}
82- out = xr_open_mfdataset (collector .flist , ** res_kwargs )
91+ res_kwargs = res_kwargs or {
92+ 'combine' : 'nested' ,
93+ 'concat_dim' : Dimension .TIME ,
94+ }
95+ for s_idx in spatial_chunks :
96+ spatial_chunks [s_idx ] = xr_open_mfdataset (
97+ spatial_chunks [s_idx ], ** res_kwargs
98+ )
99+ out = xr .concat (spatial_chunks .values (), dim = Dimension .SOUTH_NORTH )
83100 Cacher .write_netcdf (tmp_file , data = out , features = features )
84101
85102 os .replace (tmp_file , out_file )
@@ -88,12 +105,12 @@ def collect(
88105 logger .info ('Finished file collection.' )
89106
def group_spatial_chunks(self):
    """Group files with the same spatial chunk index together, so each
    entry covers the same spatial footprint but different times.

    Returns
    -------
    dict
        Mapping of spatial chunk index to a sorted list of the files in
        ``self.flist`` that share that spatial index. Sorting keeps the
        per-chunk file order deterministic regardless of ``flist`` order.
    """
    chunks = {}
    for file in self.flist:
        # get_chunk_indices returns (temporal_idx, spatial_idx); only the
        # spatial index matters for grouping here.
        _, s_idx = self.get_chunk_indices(file)
        # setdefault + append is O(1) per file, unlike rebuilding the
        # list with [*chunks.get(s_idx, []), file] which is O(n) each time.
        chunks.setdefault(s_idx, []).append(file)
    return {s_idx: sorted(files) for s_idx, files in chunks.items()}
0 commit comments