Handling dask arrays#
We have previously seen how to apply functions to NumPy arrays contained in Xarray objects.
apply_ufunc
also lets you easily perform many of the steps involved in applying
functions that expect and return Dask arrays.
Learning goals:
Learn that apply_ufunc can automate aspects of applying computation functions on dask arrays.
It is possible to automatically parallelize certain operations by providing dask="parallelized".
In some cases, extra information needs to be provided, such as the sizes of any new dimensions added, or the data types of output variables.
Learn that all the concepts from the numpy lessons carry over, such as automatic vectorization and specifying input and output core dimensions.
Tip
We’ll reduce the length of error messages using %xmode minimal
See the ipython documentation for details.
Setup#
%xmode minimal
import dask
import numpy as np
import xarray as xr
# limit the amount of information printed to screen
xr.set_options(display_expand_data=False)
np.set_printoptions(threshold=10, edgeitems=2)
Exception reporting mode: Minimal
First, let’s set up a LocalCluster
using dask.distributed.
You can use any kind of dask cluster. This step is completely independent of xarray. While not strictly necessary, the dashboard provides a nice learning tool.
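A minimal sketch of creating the client (calling Client() with no arguments starts a LocalCluster for you; the resulting client object is closed at the end of the notebook):
from dask.distributed import Client

# Start a LocalCluster and connect a Client to it.
# Client() with no arguments creates a LocalCluster automatically.
client = Client()
client  # displays a summary with the dashboard link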
Client: Client-f0503d15-8a43-11ef-8d6f-000d3a9e3c97
Connection method: Cluster object | Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status
LocalCluster: 4 workers | 4 threads total | 15.61 GiB total memory
👆
Click the Dashboard link above, or click the "Search" button in the dashboard.
Let’s test that the dashboard is working.
import dask.array
dask.array.ones((1000, 4), chunks=(2, 1)).compute() # should see activity in dashboard
array([[1., 1., 1., 1.],
[1., 1., 1., 1.],
...,
[1., 1., 1., 1.],
[1., 1., 1., 1.]])
Let’s open a dataset. We specify chunks so that dask arrays are created for the DataArrays.
ds = xr.tutorial.open_dataset("air_temperature", chunks={"time": 100})
ds
<xarray.Dataset> Size: 31MB Dimensions: (lat: 25, time: 2920, lon: 53) Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * lon (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Data variables: air (time, lat, lon) float64 31MB dask.array<chunksize=(100, 25, 53), meta=np.ndarray> Attributes: Conventions: COARDS title: 4x daily NMC reanalysis (1948) description: Data is from NMC initialized reanalysis\n(4x/day). These a... platform: Model references: http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
A simple example#
All the concepts from applying numpy functions carry over.
However the handling of dask arrays needs to be explicitly activated.
There are three options for the dask
kwarg.
dask : {"forbidden", "allowed", "parallelized"}, default: "forbidden"
How to handle applying to objects containing lazy data in the form of
dask arrays:
- 'forbidden' (default): raise an error if a dask array is encountered.
- 'allowed': pass dask arrays directly on to ``func``. Prefer this option if
``func`` natively supports dask arrays.
- 'parallelized': automatically parallelize ``func`` if any of the
inputs are a dask array by using :py:func:`dask.array.apply_gufunc`. Multiple output
arguments are supported. Only use this option if ``func`` does not natively
support dask arrays (e.g. converts them to numpy arrays).
We will work through the following two:
dask="allowed": Dask arrays are passed to the user function. This is a good choice if your function can handle dask arrays and won’t compute the result unless explicitly requested.
dask="parallelized": This applies the user function over blocks of the dask array using dask.array.apply_gufunc. This is useful when your function cannot handle dask arrays natively (e.g. the scipy API).
# Expect an error here
def squared_error(x, y):
return (x - y) ** 2
xr.apply_ufunc(squared_error, ds.air, 1)
ValueError: apply_ufunc encountered a chunked array on an argument, but handling for chunked arrays has not been enabled. Either set the ``dask`` argument or load your data into memory first with ``.load()`` or ``.compute()``
A good thing to check is whether the applied function (here squared_error
) can handle pure dask arrays.
To do this, call squared_error(ds.air.data, 1) and make sure of the following:
That you don’t see any activity on the dask dashboard
That the returned result is a dask array.
squared_error(ds.air.data, 1)
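You can also check the result type programmatically rather than reading the repr; a small sketch:
result = squared_error(ds.air.data, 1)
# A lazy result should still be a dask array, not a computed numpy array
print(type(result))
isinstance(result, dask.array.Array)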
Since squared_error
can handle dask arrays without computing them, we specify
dask="allowed"
.
sqer = xr.apply_ufunc(
squared_error,
ds.air,
1,
dask="allowed",
)
sqer # dask-backed DataArray! with nice metadata!
<xarray.DataArray 'air' (time: 2920, lat: 25, lon: 53)> Size: 31MB dask.array<chunksize=(100, 25, 53), meta=np.ndarray> Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * lon (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00
Understanding what’s happening#
Let’s again use the wrapper trick to understand what squared_error
receives.
We see that it receives a dask array (analogous to the numpy array in the previous example).
def wrapper(x, y):
print(f"received x of type {type(x)}, shape {x.shape}")
print(f"received y of type {type(y)}")
return squared_error(x, y)
xr.apply_ufunc(wrapper, ds.air, 1, dask="allowed")
received x of type <class 'dask.array.core.Array'>, shape (2920, 25, 53)
received y of type <class 'int'>
<xarray.DataArray 'air' (time: 2920, lat: 25, lon: 53)> Size: 31MB dask.array<chunksize=(100, 25, 53), meta=np.ndarray> Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * lon (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00
Core dimensions#
squared_error
operated on a per-element basis. How about a reduction like np.mean
?
Such functions involve the concept of “core dimensions”. This concept is independent of the underlying array type, and is a property of the applied function. See the core dimensions with NumPy tutorial for more.
Exercise
Use dask.array.mean
as an example of a function that can handle dask
arrays and uses an axis
kwarg.
Solution
def time_mean(da):
return xr.apply_ufunc(
dask.array.mean,
da,
input_core_dims=[["time"]],
dask="allowed",
kwargs={"axis": -1}, # core dimensions are moved to the end
)
time_mean(ds.air)
Again, this is identical to the built-in mean:
def time_mean(da):
return xr.apply_ufunc(
dask.array.mean,
da,
input_core_dims=[["time"]],
dask="allowed",
kwargs={"axis": -1}, # core dimensions are moved to the end
)
ds.air.mean("time").identical(time_mean(ds.air))
True
Automatically parallelizing dask-unaware functions#
Basics#
Not all functions can handle dask arrays appropriately by default.
A very useful apply_ufunc
feature is the ability to apply arbitrary functions
in parallel to each block. This ability can be activated using
dask="parallelized"
.
We will use scipy.integrate.trapezoid
as an example of a function that cannot
handle dask arrays and requires a core dimension. If we call trapezoid
with a dask
array, we get a numpy array back; that is, the values have been eagerly computed.
This is undesirable behaviour.
import scipy as sp
import scipy.integrate
sp.integrate.trapezoid(
ds.air.data, axis=ds.air.get_axis_num("lon")
) # does NOT return a dask array, you should see activity on the dashboard
array([[12588.54 , 12582.26 , ..., 15430.04 , 15493.165],
[12571.84 , 12567.28 , ..., 15413.14 , 15477.345],
...,
[12726.68 , 12634.48 , ..., 15454.13 , 15511.48 ],
[12767.33 , 12630.78 , ..., 15495.53 , 15538.18 ]])
Let’s activate automatic parallelization by using apply_ufunc
with dask="parallelized"
integrated = xr.apply_ufunc(
sp.integrate.trapezoid,
ds,
input_core_dims=[["lon"]],
kwargs={"axis": -1},
dask="parallelized",
)
integrated
<xarray.Dataset> Size: 607kB Dimensions: (time: 2920, lat: 25) Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Data variables: air (time, lat) float64 584kB dask.array<chunksize=(100, 25), meta=np.ndarray>
And make sure the returned data is a dask array
integrated.air.data
Now you have control over executing this parallel computation.
# Dask -> Numpy array of integrated values
parallelized_results = integrated.compute()
parallelized_results
<xarray.Dataset> Size: 607kB Dimensions: (time: 2920, lat: 25) Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Data variables: air (time, lat) float64 584kB 1.259e+04 1.258e+04 ... 1.554e+04
Understanding dask="parallelized"
#
It is very important to understand what dask="parallelized"
does. Fully understanding it requires understanding some core concepts.
See also
For dask="parallelized"
apply_ufunc
will call dask.array.apply_gufunc
. See the dask documentation on generalized ufuncs and apply_gufunc
for more.
Embarrassingly parallel or blockwise operations#
dask="parallelized"
works well for “blockwise” or “embarrassingly parallel” operations (Wikipedia).
These are operations where one block or chunk of the output array corresponds to one block or chunk of the input array. Specifically, the chunking along the core dimension is what matters. Importantly, no communication between blocks is necessary to create the output, which makes parallelization quite simple or “embarrassing”.
Let’s look at the dask repr for ds
and note that the chunk sizes are (100, 25, 53) for an array with shape (2920, 25, 53). This means that each block or chunk of the array contains all lat
, lon
points and a subset of time
points.
ds.air.data
The core dimension for trapezoid
is lon
, and there is only one chunk along lon
. This means that integrating along lon
is a “blockwise” or “embarrassingly parallel” operation and dask="parallelized"
works quite well.
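You can confirm that there is a single chunk along lon by inspecting the chunk structure directly; a quick sketch:
# One tuple of chunk sizes per dimension (time, lat, lon);
# a single entry for lat and lon means a single chunk along those dimensions.
print(ds.air.data.chunks)
print(ds.air.data.numblocks)  # number of blocks along each dimension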
Caution
Question
Do you understand why integrate(ds)
when ds
has a single chunk along lon
is an “embarrassingly parallel” operation?
Exercise
Apply the integrate function to ds
after rechunking to have a different chunksize along lon
using ds.chunk(lon=4)
(for example). What happens?
Solution
apply_ufunc
complains that it cannot automatically parallelize because the dataset ds
is now chunked along the core dimension lon
. You should see the following error:
ValueError: dimension lon on 0th function argument to apply_ufunc with dask='parallelized'
consists of multiple chunks, but is also a core dimension. To fix, either rechunk
into a single array chunk along this dimension, i.e.,
``.chunk(dict(lon=-1))``, or pass ``allow_rechunk=True`` in ``dask_gufunc_kwargs``
but beware that this may significantly increase memory usage.
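For reference, a minimal sketch of a call that reproduces this error (rechunking along the core dimension lon before applying the function):
# Expect the ValueError above: lon is now split across multiple chunks,
# but it is also a core dimension of the applied function.
xr.apply_ufunc(
    sp.integrate.trapezoid,
    ds.chunk(lon=4),
    input_core_dims=[["lon"]],
    kwargs={"axis": -1},
    dask="parallelized",
)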
Understanding execution#
We are layering many concepts together here, so it is important to understand how the function is executed and what input it will receive. Again, we will use our wrapper trick.
def integrate_wrapper(array, **kwargs):
print(f"received array of type {type(array)}, shape {array.shape}")
result = sp.integrate.trapezoid(array, **kwargs)
print(f"received array of type {type(result)}, shape {result.shape}")
return result
integrated = xr.apply_ufunc(
integrate_wrapper,
ds,
input_core_dims=[["lon"]],
kwargs={"axis": -1},
dask="parallelized",
)
integrated
received array of type <class 'numpy.ndarray'>, shape (1, 1, 1)
returned result of type <class 'numpy.ndarray'>, shape (1, 1)
<xarray.Dataset> Size: 607kB Dimensions: (time: 2920, lat: 25) Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Data variables: air (time, lat) float64 584kB dask.array<chunksize=(100, 25), meta=np.ndarray>
Note that we received an Xarray object back (integrated
) but our wrapper function was called with a numpy array of shape (1,1,1)
.
Important
The full 3D array has not yet been passed to integrate_wrapper
. Yet dask needs to know the shape and dtype of the result. This is key.
The integrate_wrapper
function is treated like a black box, and its effect on the inputs has to either be described through additional keyword arguments, or inferred by passing dummy inputs.
To do so, dask.array.apply_gufunc
calls the user function with dummy inputs (here a numpy array of shape (1,1,1)
), and inspects the returned value to understand that one dimension was removed (it returned a numpy array of shape (1,1)).
Caution
Some functions can have trouble handling such dummy inputs. Alternatively, you can pass meta = np.ones((1,1))
in dask_gufunc_kwargs
to prevent dask from calling the function with dummy inputs.
xr.apply_ufunc(
integrate_wrapper,
ds,
input_core_dims=[["lon"]],
kwargs={"axis": -1},
dask="parallelized",
dask_gufunc_kwargs={"meta": np.ones((1,1))},
)
Since no errors were raised, we proceed as-is.
Let’s compute the array to get real values.
integrated.compute()
<xarray.Dataset> Size: 607kB Dimensions: (time: 2920, lat: 25) Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Data variables: air (time, lat) float64 584kB 1.259e+04 1.258e+04 ... 1.554e+04
We see that integrate_wrapper
is called many times! In fact, it is called as many times as there are blocks in the array, which is 30 here (ds.air.data.numblocks
).
Our function is independently executed on each block of the array, and then the results are concatenated to form the final result.
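To see where that block count comes from, you can count the blocks explicitly; a quick sketch:
import math

# numblocks gives the number of blocks along each dimension, e.g. (30, 1, 1) here
print(ds.air.data.numblocks)
print(math.prod(ds.air.data.numblocks))  # total number of blocks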
Conceptually, there is a two-way flow of information between various packages when executing integrated.compute()
:
xarray.apply_ufunc
↔ dask.array.apply_gufunc
↔ integrate_wrapper
↔ scipy.integrate.trapezoid
↔ ds.air.data
When executed:
Xarray loops over all data variables.
Xarray unwraps the underlying dask array (e.g. ds.air) and passes that to dask’s apply_gufunc.
apply_gufunc calls integrate_wrapper on each block of the array.
For each block, integrate_wrapper calls scipy.integrate.trapezoid and returns one block of the output array.
dask stitches all the output blocks to form the output array.
xarray.apply_ufunc wraps the output array with Xarray metadata to give the final result.
Phew!
More complex situations#
Here we quickly demonstrate that all the concepts from the numpy material earlier carry over.
Xarray needs a lot of extra metadata, so depending
on the function, extra arguments such as output_dtypes
and output_sizes
may
be necessary for supporting dask arrays. We demonstrate this below.
Adding new dimensions#
We use np.expand_dims
to add a new dimension of size 1 to the input array.
def add_new_dim(array):
return np.expand_dims(array, axis=0)
When automatically parallelizing with dask, we need to provide some more information about the outputs:
When adding new dimensions, we need to provide their sizes in dask_gufunc_kwargs using the key output_sizes.
We usually need to provide the datatype or dtype of the returned array; the dtype of the input is often a good guess.
def add_new_dim(array):
return np.expand_dims(array, axis=-1)
xr.apply_ufunc(
add_new_dim, # first the function
ds.air.chunk({"time": 2, "lon": 2}),
output_core_dims=[["newdim"]],
dask="parallelized",
)
ValueError: dimension 'newdim' in 'output_core_dims' needs corresponding (dim, size) in 'output_sizes'
Provide the size of the newly added dimension newdim
in output_sizes
as part of the dask_gufunc_kwargs
keyword argument:
dask_gufunc_kwargs (dict, optional) – Optional keyword arguments passed to dask.array.apply_gufunc()
if dask=’parallelized’. Possible keywords are output_sizes, allow_rechunk and meta.
The syntax is
dask_gufunc_kwargs={
"output_sizes": {"newdim": 1}
}
xr.apply_ufunc(
add_new_dim, # first the function
ds.air.chunk({"time": 2, "lon": 2}),
output_core_dims=[["newdim"]],
dask="parallelized",
dask_gufunc_kwargs={"output_sizes": {"newdim": 1}},
)
<xarray.DataArray 'air' (time: 2920, lat: 25, lon: 53, newdim: 1)> Size: 31MB dask.array<chunksize=(2, 25, 2, 1), meta=np.ndarray> Coordinates: * lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0 * lon (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Dimensions without coordinates: newdim
Dimensions that change size#
We will now repeat the interpolation example from earlier with "lat"
as the output core dimension. See the numpy notebook on complex output for more.
newlat = np.linspace(15, 75, 100)
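# Expect an error here: the inputs are dask-backed, but no dask handling is specified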
xr.apply_ufunc(
np.interp,
newlat,
ds.air.lat,
ds.air.chunk({"time": 2, "lon": 2}),
input_core_dims=[["lat"], ["lat"], ["lat"]],
output_core_dims=[["lat"]],
exclude_dims={"lat"},
)
We will first add dask="parallelized"
and provide output_sizes
in dask_gufunc_kwargs
newlat = np.linspace(15, 75, 100)
xr.apply_ufunc(
np.interp, # first the function
newlat,
ds.air.lat,
ds.air.chunk({"time": 2, "lon": 2}),
input_core_dims=[["lat"], ["lat"], ["lat"]],
output_core_dims=[["lat"]],
exclude_dims={"lat"},
# The following are dask-specific
dask="parallelized",
dask_gufunc_kwargs=dict(output_sizes={"lat": len(newlat)}),
)
ValueError: `dtype` inference failed in `apply_gufunc`.
Please specify the dtype explicitly using the `output_dtypes` kwarg.
Original error is below:
------------------------
ValueError('object too deep for desired array')
Traceback:
---------
File "/home/runner/micromamba/envs/xarray-tutorial/lib/python3.12/site-packages/dask/array/core.py", line 456, in apply_infer_dtype
o = func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/micromamba/envs/xarray-tutorial/lib/python3.12/site-packages/numpy/lib/function_base.py", line 1599, in interp
return interp_func(x, xp, fp, left, right)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This error means that we need to provide output_dtypes
output_dtypes (list of dtype, optional) – Optional list of output dtypes.
Only used if dask='parallelized' or vectorize=True.
newlat = np.linspace(15, 75, 100)
xr.apply_ufunc(
np.interp, # first the function
newlat,
ds.air.lat,
ds.air.chunk({"time": 100, "lon": -1}),
input_core_dims=[["lat"], ["lat"], ["lat"]],
output_core_dims=[["lat"]],
exclude_dims={"lat"},
# The following are dask-specific
dask="parallelized",
dask_gufunc_kwargs=dict(output_sizes={"lat": len(newlat)}),
output_dtypes=[ds.air.dtype],
)
<xarray.DataArray (time: 2920, lon: 53, lat: 100)> Size: 124MB dask.array<chunksize=(100, 53, 100), meta=np.ndarray> Coordinates: * lon (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Dimensions without coordinates: lat
Tip
Dask can sometimes figure out the output sizes and dtypes. The usual workflow is to read the error messages and iteratively pass more information to apply_ufunc
.
Automatic Vectorizing#
Automatic vectorizing with vectorize=True
also carries over!
interped = xr.apply_ufunc(
np.interp, # first the function
newlat,
ds.air.lat,
ds.chunk({"time": 100, "lon": -1}),
input_core_dims=[["lat"], ["lat"], ["lat"]],
output_core_dims=[["lat"]],
exclude_dims={"lat"}, # dimensions allowed to change size. Must be set!
dask="parallelized",
dask_gufunc_kwargs=dict(output_sizes={"lat": len(newlat)}),
output_dtypes=[ds.air.dtype],
vectorize=True,
)
interped
<xarray.Dataset> Size: 124MB Dimensions: (time: 2920, lon: 53, lat: 100) Coordinates: * lon (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0 * time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00 Dimensions without coordinates: lat Data variables: air (time, lon, lat) float64 124MB dask.array<chunksize=(100, 53, 100), meta=np.ndarray>
Again, it is important to understand the conceptual flow of information between the various packages when executing interped.compute(), which looks like
xarray.apply_ufunc
↔ dask.array.apply_gufunc
↔ numpy.vectorize
↔ numpy.interp
When executed:
Xarray loops over all data variables.
Xarray unwraps the underlying dask array (e.g. ds.air) and passes that to dask’s apply_gufunc.
apply_gufunc calls the vectorized function on each block of the array.
For each block, numpy.vectorize handles looping over the loop dimensions and passes 1D vectors along the core dimension to numpy.interp.
The 1D results for each block are concatenated by numpy.vectorize to create one output block.
dask stitches all the output blocks to form the output array.
xarray.apply_ufunc wraps the output array with Xarray metadata to give the final result.
Phew!
Clean up the cluster#
client.close();