Replies: 4 comments
-
Try adding |
Beta Was this translation helpful? Give feedback.
-
Actually it looks like this example is relevant: https://xarray.pydata.org/en/stable/examples/apply_ufunc_vectorize_1d.html |
Beta Was this translation helpful? Give feedback.
-
Thanks for the really quick response. I tried chunking and that is when the performance gets worse (from 5msec per interp to 70msec). I just added the exact chunking you suggested and tested again as well. It is slower but it also only seems to be using 10% of one CPU. I will have to wait until tomorrow to have a try of the ufunc method. It sounds like it might be a workaround so thanks for that suggestion. I still think there is a bug in this case (either in my code or in xarray). |
Beta Was this translation helpful? Give feedback.
-
Hi dcherian, I had a look at the apply_ufunc() example you linked and have re-implemented my code. The example helped me understand apply_ufunc() usage better but is very different from my use case and I still am unable to parallelize using dask. The key difference is apply_ufunc() as described in the docs and the example, applys a function to a vector of data of a single type (in the example case it is air temperature across the 3 dimensions lat,long,time). Where as I need to apply an operation using heterogeneous data (depth_bins, lower_limit, upper_limit) over a single dimension (time) to produce a new array of depths over time (which is why I tried groupby/map initially). Anyhow, I have an implementation using apply_ufunc() that works using xarray and numpy arrays with apply_ufunc(), but when I try to parallelize it using dask my ufunc is called with empty arrays by xarray and it fails. I.e. You can see when running the code below it logs the following when entering the ufunc: I was expecting this to be called once for each chunk with 1000 items for each array. Have I done something wrong in this work-around for the groupby/map code? Thanks, import sys
import math
import logging
import dask
import xarray
import numpy
logger = logging.getLogger('main')
if __name__ == '__main__':
logging.basicConfig(
stream=sys.stdout,
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
logger.info('Starting dask client')
client = dask.distributed.Client()
SIZE = 3000
SONAR_BINS = 2000
upper_limit = numpy.random.randint(0, 10, (SIZE))
lower_limit = numpy.random.randint(20, 30, (SIZE))
sonar_data = numpy.random.randint(0, 255, (SIZE, SONAR_BINS))
time = range(0, SIZE)
channel = xarray.Dataset({
'upper_limit': (['time'], upper_limit, {'units': 'depth meters'}),
'lower_limit': (['time'], lower_limit, {'units': 'depth meters'}),
'data': (['time', 'depth_bin'], sonar_data, {'units': 'amplitude'}),
},
coords={
'time': (['time'], time),
'depth_bin': (['depth_bin'], range(0,SONAR_BINS)),
})
logger.info('get overall min/max radar range we want to normalize to called the adjusted range')
adjusted_min, adjusted_max = channel.upper_limit.min().values.item(), channel.lower_limit.max().values.item()
adjusted_min = math.floor(adjusted_min)
adjusted_max = math.ceil(adjusted_max)
logger.info('adjusted_min: %s, adjusted_max: %s', adjusted_min, adjusted_max)
bin_count = len(channel.depth_bin)
logger.info('bin_count: %s', bin_count)
adjusted_depth_per_bin = (adjusted_max - adjusted_min) / bin_count
logger.info('adjusted_depth_per_bin: %s', adjusted_depth_per_bin)
adjusted_bin_depths = [adjusted_min + (j * adjusted_depth_per_bin) for j in range(0, bin_count)]
logger.info('adjusted_bin_depths[0]: %s ... [-1]: %s', adjusted_bin_depths[0], adjusted_bin_depths[-1])
def InterpSingle(unadjusted_depth_amplitudes, unadjusted_min, unadjusted_max, time):
if (time % 1000) == 0:
total = len(channel.time)
perc = 100.0 * time / total
logger.info('%s : %s of %s', perc, time, total)
unadjusted_depth_per_bin = (unadjusted_max - unadjusted_min) / bin_count
min_index = (adjusted_min - unadjusted_min) / unadjusted_depth_per_bin
max_index = ((adjusted_min + ((bin_count - 1) * adjusted_depth_per_bin)) - unadjusted_min) / unadjusted_depth_per_bin
index_mapping = numpy.linspace(min_index, max_index, bin_count)
adjusted_depth_amplitudes = numpy.interp(index_mapping, range(0, len(unadjusted_depth_amplitudes)), unadjusted_depth_amplitudes, left=0, right=0)
return adjusted_depth_amplitudes
def Interp(*args, **kwargs):
logger.info('args: %s, kwargs: %s', args, kwargs)
data = args[0]
upper_limit = args[1]
lower_limit = args[2]
time = args[3]
#logger.info('data: %s len(data[0]): %s data.shape: %s', data, len(data[0]), data.shape)
adjusted = []
for i in range(0, len(upper_limit)):
d = data[i]
u = upper_limit[i]
l = lower_limit[i]
t = time[i]
result = InterpSingle(d, u, l, t)
adjusted.append(result)
#logger.info('adjusted: %s', adjusted)
return adjusted
channel = channel.chunk({'time':1000}) # Comment this line out to disable dask
#logger.info('Channel: %s', channel)
#logger.info('shape of data: %s len(data[0]): %s', channel.data.shape, len(channel.data[0]))
m2_lazy = xarray.apply_ufunc(
Interp,
channel.data,
channel.upper_limit,
channel.lower_limit,
channel.time,
input_core_dims=[['depth_bin'], [], [], []],
output_core_dims=[['depth']],
dask='parallelized', # Comment this line out to disable dask
output_dtypes=[numpy.dtype(numpy.int32)], # Comment this line out to disable dask
output_sizes={'depth':len(adjusted_bin_depths)}, # Comment this line out to disable dask
)
m2 = m2_lazy.compute() # Comment this line out to disable dask
m2 = m2.assign_coords({'depth':adjusted_bin_depths})
logger.info('Closing dask client')
client.close()
logger.info('Exit process') |
Beta Was this translation helpful? Give feedback.
-
MCVE Code Sample
Expected Output
I am fairly new to xarray but feel this example could have been executed a bit better than xarray currenty does. Each map call of the above custom function should be possible to be parallelized from what I can tell. I imagined that in the backend, xarray would have chunked it and run in parallel on dask. However I find it is VERY slow even for single threaded case but also that it doesn't seem to parallelize.
It takes roughly 5msec per map call in my hardware when I don't include the chunk and 70msec with the chunk call you can find in the code.
Problem Description
The single threaded performance is super slow, but also it fails to parallelize the computations across the cores on my machine.
If you are after more background to what I am trying to do, I also asked a SO question about how to re-organize the code to improve performance. I felt the current behavior though is a performance bug (assuming I didn't do something completely wrong in the code).
https://stackoverflow.com/questions/60103317/can-the-performance-of-using-xarray-groupby-map-be-improved
Output of
xr.show_versions()
INSTALLED VERSIONS
commit: None
python: 3.7.6 | packaged by conda-forge | (default, Jan 7 2020, 21:48:41) [MSC v.1916 64 bit (AMD64)]
python-bits: 64
OS: Windows
OS-release: 10
machine: AMD64
processor: Intel64 Family 6 Model 142 Stepping 10, GenuineIntel
byteorder: little
LC_ALL: None
LANG: None
LOCALE: None.None
libhdf5: 1.10.4
libnetcdf: 4.6.1
xarray: 0.14.1
pandas: 0.25.3
numpy: 1.17.3
scipy: 1.3.1
netCDF4: 1.4.2
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.0.4.2
nc_time_axis: None
PseudoNetCDF: None
rasterio: 1.1.2
cfgrib: None
iris: None
bottleneck: None
dask: 2.9.1
distributed: 2.9.1
matplotlib: 3.1.1
cartopy: 0.17.0
seaborn: None
numbagg: None
setuptools: 44.0.0.post20200102
pip: 19.3.1
conda: None
pytest: None
IPython: 7.11.1
sphinx: None
Beta Was this translation helpful? Give feedback.
All reactions