Datashader is designed to make it simple to work with even very large datasets. To get good performance, it is essential that each step in the overall processing pipeline be set up appropriately. Below we share some of our suggestions based on our own benchmarking and optimization experience, which should help you obtain suitable performance in your own work.
>>> import dask.dataframe as dd
>>> dd.to_parquet(df, filename, compression="snappy")
If your data includes categorical values that take on a limited, fixed number of possible values (e.g. "Male", "Female"), Parquet's categorical columns use a more memory-efficient data representation and are optimized for common operations such as sorting and finding unique values. Before saving, just convert the column as follows:
>>> df[colname] = df[colname].astype('category')
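To see why this helps, the following sketch (with a made-up two-value column) compares the memory used by a plain string column against its categorical equivalent; the category dtype stores each label once plus a small integer code per row:

```python
import pandas as pd

# Hypothetical column with only two distinct string values
s = pd.Series(["Male", "Female"] * 500_000)

# Categorical representation: small integer codes plus one
# stored copy of each label
cat = s.astype("category")

print(s.memory_usage(deep=True))    # bytes for one string object per row
print(cat.memory_usage(deep=True))  # far smaller: int8 codes + 2 labels
```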
By default, numerical datasets typically use 64-bit floats, but many applications do not require 64-bit precision when aggregating over a very large number of datapoints to show a distribution. Using 32-bit floats cuts storage and memory requirements in half, and also typically speeds up computations substantially because only half as much data needs to be accessed in memory. If applicable to your particular situation, just convert the data type before generating the file:
>>> df[colname] = df[colname].astype(numpy.float32)
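The halving is easy to verify directly; this small sketch (with synthetic data) measures column memory before and after the downcast:

```python
import numpy as np
import pandas as pd

# One million float64 values (the pandas default for floats)
df = pd.DataFrame({"value": np.random.rand(1_000_000)})
before = df["value"].memory_usage(index=False)  # 8 bytes per value

# Downcast to float32 before saving
df["value"] = df["value"].astype(np.float32)
after = df["value"].memory_usage(index=False)   # 4 bytes per value

print(before, after)  # after is exactly half of before
```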
Datashader performance will vary significantly depending on the library and specific data object type used to represent the data in Python, because different libraries and data objects have very different abilities to use the available processing power and memory. Moreover, different libraries and objects are appropriate for different types of data, due to how they organize and store the data internally as well as the operations they provide for working with the data. The various objects available from the supported libraries all fall into one of the following three types of data structures:
- Columnar (tabular) data: Relational, table-like data consisting of arbitrarily many rows, each with data for a fixed number of columns. For example, if you track the location of a particular cell phone over time, each time sampled would be a row, and for each time there could be columns for the latitude and longitude for the location at that time.
- Multidimensional (nd-array) data: Data laid out in n dimensions, where n is typically >1. For example, you might have the precipitation measured on a latitude and longitude grid covering the whole world, for every time at which precipitation was measured. Such data could be stored columnarly, but it would be very inefficient; instead it is stored as a three dimensional array of precipitation values, indexed with time, latitude, and longitude.
- Ragged arrays: Relational/columnar data where the value of at least one column is a list of values that could vary in length for each row. For example, you may have a table with one row per US state and columns for population, land area, and the geographic shape of that state. Here the shape would be stored as a polygon consisting of an arbitrarily long list of latitude and longitude coordinates, which does not fit efficiently into a standard columnar data structure due to its ragged (variable length) nature.
As you can see, all three examples include latitude and longitude values, but they are very different data structures that need to be stored differently for them to be processed efficiently.
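The distinction is easiest to see in code. This sketch (with made-up values) builds a minimal instance of each of the three structures using plain pandas and NumPy; real Datashader workloads would use the specialized libraries described below for the multidimensional and ragged cases:

```python
import numpy as np
import pandas as pd

# Columnar: one row per time sample, a fixed set of columns
columnar = pd.DataFrame({
    "time": pd.date_range("2024-01-01", periods=3, freq="h"),
    "lat": [40.7, 40.8, 40.9],
    "lon": [-74.0, -74.1, -74.2],
})

# Multidimensional: precipitation indexed by (time, lat, lon)
# on a hypothetical 1-degree global grid
precip = np.random.rand(3, 180, 360)

# Ragged: one polygon per state; each boundary has a
# different number of (lat, lon) vertices
shapes = pd.DataFrame({
    "state": ["Wyoming", "Colorado"],
    "boundary": [
        [(41.0, -104.0), (45.0, -104.0), (45.0, -111.0)],
        [(37.0, -102.0), (41.0, -102.0), (41.0, -109.0), (37.0, -109.0)],
    ],
})
```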
Apart from the data structure involved, the data libraries and objects differ by how they handle computation:
- Single-core CPU: All processing is done serially on a single core of a single CPU on one machine. This is the most common and straightforward implementation, but the slowest, as there are generally other processing resources available that could be used.
- Multi-core CPU: All processing is done on a single CPU, but using multiple threads running on multiple separate cores. This approach is able to make use of all of the CPU resources available on a given machine, but cannot use multiple machines.
- Distributed CPU: Processing is distributed across multiple cores that may be on multiple CPUs in a cluster or a set of cloud-based machines. This approach can be much faster than single-CPU approaches when many CPUs are available.
- GPU: Processing is done not on the CPU but on a separate general-purpose graphics-processing unit (GP-GPU). The GPU typically has far more (though individually less powerful) cores available than a CPU does, and for highly parallelizable computations like those in Datashader a GPU can typically achieve much faster performance at a given price point than a CPU or distributed set of CPUs can. However, not all machines have a supported GPU, memory available on the GPU is often limited, and it takes special effort to support a GPU (e.g. to avoid expensive copying of data between CPU and GPU memory), and so not all CPU code has been rewritten appropriately.
- Distributed GPUs: When there are multiple GPUs available, processing can be distributed across all of the GPUs to further speed it up or to fit large problems into the larger total amount of GPU memory available across all of them.
Finally, libraries differ by whether they can handle datasets larger than memory:
- In-core processing: All data must fit into the directly addressable memory space available (e.g. RAM for a CPU); larger datasets have to be explicitly split up and processed separately.
- Out-of-core processing: The data library can process data in chunks that it reads in as needed, allowing it to work with data much larger than the available memory (at the cost of having to read in those chunks each time they are needed, instead of simply referring to the data in memory).
Given all of these options, the data objects currently supported by Datashader are:
- Pandas DataFrame: Basic columnar (non-ragged) data support, single-core, CPU-only, in-core; typically lower performance than the other options where those are supported.
- Dask DataFrame (using Pandas DataFrame internally): Multi-core, multi-CPU, in-core or out-of-core, distributed CPU computation.
- cuDF: Single NVIDIA-hardware GPU DataFrame.
- Dask DataFrame (using cuDF DataFrame internally): Distributed GPUs with a Dask API on multiple NVIDIA GPUs on the same or different machines.
- SpatialPandas DataFrame: Pandas DataFrame with support for ragged arrays and spatial indexing (efficient access of spatially distributed values), typically using one core of one CPU.
- Dask (using SpatialPandas DataFrame internally): Distributed CPU processing, in-core or out-of-core, using Dask's DataFrame API built on SpatialPandas.
- Xarray+NumPy: Multidimensional data support built on NumPy arrays, using a single core of a single CPU.
- Xarray+DaskArray: Dask-based multidimensional array processing built on Dask arrays, with support for distributed (multi-CPU) operation.
- Xarray+CuPy: Multidimensional array processing built on CuPy arrays, with storage and processing on a single NVIDIA GPU.
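The Xarray-based options differ only in the array type backing the same Xarray API. This sketch (with a tiny synthetic grid) shows how a NumPy-backed DataArray can be re-chunked into a Dask-backed one for parallel or out-of-core use:

```python
import numpy as np
import xarray as xr

# NumPy-backed DataArray: all data in memory, single-threaded operations
da = xr.DataArray(np.zeros((4, 4)), dims=["y", "x"])

# Dask-backed DataArray: each 2x2 block can be processed in parallel,
# and blocks can be loaded lazily for out-of-core work
da_chunked = da.chunk({"y": 2, "x": 2})
```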
Datashader's current release supports these libraries for nearly all of the Canvas glyph types (points, lines, etc.) where they would naturally apply. Supported combinations of glyph and data library are listed in this table, where the entries mean:
- Yes: Supported
- No: Not (yet) supported, but could be with sufficient development effort (feel free to contribute effort or funding!)
- -: Not supported because that combination is not normally appropriate or useful (e.g. columnar data libraries do not currently provide efficient multidimensional array support)
| Glyph | Pandas DF | Dask DF + Pandas DF | cuDF | Dask DF + cuDF | SpatialPandas DF | Dask + SpatialPandas DF | Xarray + NumPy | Xarray + DaskArray | Xarray + CuPy |
|-------|-----------|---------------------|------|----------------|------------------|-------------------------|----------------|--------------------|---------------|
In general, all it takes to use the indicated data library for a particular glyph type is to instantiate a DataFrame (Pandas, Dask, cuDF, SpatialPandas) or DataArray/Dataset (Xarray) and then pass it to the appropriate ds.Canvas method call, as illustrated in the various examples in the user guide and topics.
Using Dask efficiently
Even on a single machine, a Dask DataFrame typically gives higher performance than Pandas, because it makes good use of all available cores, and it also supports out-of-core operation for datasets larger than memory.
Dask works on chunks of the data, called partitions, processing some number of them at any one time.
With Dask on a single machine, a rule of thumb for the number of partitions to use is multiprocessing.cpu_count(), which allows Dask to use one thread per core for parallelizing computations.
When the entire dataset fits into memory at once, you can (and should) persist the data as a Dask DataFrame prior to passing it into Datashader, to ensure that data only needs to be loaded once:

>>> import multiprocessing as mp
>>> from dask import dataframe as dd
>>> import datashader
>>> dask_df = dd.from_pandas(df, npartitions=mp.cpu_count())
>>> dask_df = dask_df.persist()  # persist() returns a new collection
>>> cvs = datashader.Canvas(...)
>>> agg = cvs.points(dask_df, ...)