Dask: Out-of-memory machine learning

Introduction

Why Dask? For those of us who regularly work with Python machine learning libraries, pandas is a firm fixture in our toolkit. Pandas DataFrames offer fast, efficient data manipulation and a host of data-wrangling functions. The processing is fast because pandas does its work entirely in memory. However, data science being what it is, the amount of data we want to process often exceeds the RAM available to us. We then have to either work with a manageable subset of the data, use another framework (such as H2O, which does not insist on in-memory processing) or move to a cluster (Spark, Hadoop etc.).

Dask offers a further possibility. It is an extra level of abstraction on top of pandas that uses (or can use) out-of-memory processing, often called out-of-core processing: working on data that is larger than RAM by handling it one piece at a time. Dask does this by using pandas DataFrames for all internal processing. A Dask DataFrame actually consists of multiple pandas DataFrames, with added functionality to keep the processing within available RAM. In the same way, a Dask array is simply a wrapper around one or more NumPy arrays (see the documentation).

This article will take a look at what this means in practice, by looking at two things:

  • integration with machine learning libraries (specifically, Keras) and
  • parallel row-by-row processing.

Dask overview

Directed Acyclic Graphs

Dask processes data by chaining tasks together into a graph. Each link of the chain is only resolved, or computed, on demand. This is a bit like the lazy evaluation we see when using e.g. Spark: Dask does not materialize results until we call specific methods (for example .compute() on a Dask DataFrame). This enables us to “map out” where we are going with our processing, to visualize it, and to allow Dask to optimize our instructions where possible. The downside is that running subsequent operations on DataFrames may mean we start everything from scratch, although we can cache and/or persist interim results to avoid this.
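A small sketch of this behaviour, using dask.delayed rather than a DataFrame so that each execution can be counted. The functions load and total and the calls list are hypothetical names chosen for illustration, and the sketch assumes Dask's default local scheduler (no distributed client running):

```python
import dask

calls = []  # records every real execution of load()

@dask.delayed
def load(n):
    calls.append(n)
    return list(range(n))

@dask.delayed
def total(values):
    return sum(values)

# Building the chain only records tasks; nothing runs yet
result = total(load(5))
calls_after_build = len(calls)

# .compute() walks the graph and executes it
value = result.compute()
calls_after_first = len(calls)

# A second .compute() starts from scratch and runs load() again
result.compute()
calls_after_second = len(calls)

# .persist() keeps the computed result, so later computes reuse it
persisted = result.persist()
persisted.compute()
persisted.compute()
calls_after_persist = len(calls)

print(value, calls_after_build, calls_after_first, calls_after_second, calls_after_persist)
```

The counters show the trade-off described above: recomputing is the default, and persisting is an explicit choice.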

Integration with scikit-learn

Dask offers several estimators that we can use directly with scikit-learn. You can find the list here. Alternatively, one can run scikit-learn with joblib specifying Dask as an alternative back end.
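The joblib route might look like the following. This is a minimal sketch, assuming scikit-learn, joblib and dask.distributed are installed; the random forest and the toy dataset are arbitrary choices for illustration:

```python
import joblib
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# In-process client: threads only, no separate worker processes
client = Client(processes=False)

X, y = make_classification(n_samples=500, n_features=10, random_state=0)
clf = RandomForestClassifier(n_estimators=20, n_jobs=-1, random_state=0)

# Inside this block joblib hands scikit-learn's parallel tasks to Dask
with joblib.parallel_backend("dask"):
    clf.fit(X, y)

print(f"trained {len(clf.estimators_)} trees, training accuracy {clf.score(X, y):.2f}")
client.close()
```

Note that this parallelizes the computation, but the training data here is still an ordinary in-memory NumPy array; for data larger than RAM the dedicated Dask estimators are the more natural fit.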

Integration with Keras and Tensorflow

Dask integration with Keras and TensorFlow is a little confusing. I had some Keras/Dask code that I wanted to update to TensorFlow 2.x, but there were some breaking changes along the way. The Keras API is now bundled with TensorFlow as tf.keras, which leaves us with two scikit-learn wrapper modules: keras.wrappers.scikit_learn (standalone Keras) and tf.keras.wrappers.scikit_learn (TensorFlow). This is maybe because Keras was originally designed for use with different back ends (TensorFlow, Theano, CNTK), two of which are no longer supported. So it certainly makes sense to move the packages into TensorFlow to reflect the tight coupling of the pure TF implementation. However, tf.keras.wrappers.scikit_learn is not actively maintained and will be removed in a future release.

The same applies to keras.wrappers.scikit_learn. This is being phased out, and the recommendation is to use the new SciKeras library instead. I’m not sure why one third-party package (keras.wrappers.scikit_learn) is being replaced by another one (SciKeras), but there you go.

In any case, Keras runs seamlessly with Dask once we decide which of the above permutations is applicable: I am going with Keras 2.4.3, scikit-learn 0.24.0 and TensorFlow 2.2.0 (and Python 3.8).

Moving to distributed processing

Dask can process its data over multiple nodes fairly easily. Dask data frames wrap multiple pandas data frames, so it is not a huge step to scale the processing out over several nodes.

Anyway, let’s get started with some examples.

Training a Keras Autoencoder with Dask

In this example I am going to create some dummy data and train an autoencoder on it.

An autoencoder is a type of model that learns to identify the essential relationships between features. It does this by compressing (or “encoding”) the data into a lower dimensionality and then expanding it again to the original dimensionality (“decoding”). In attempting to keep the resulting discrepancy as small as possible, the model forces itself to learn only the primary characteristics and to ignore noise. We can use such models for e.g. recommender systems or anomaly detection. It doesn’t really matter what type of model we choose: our purpose here is simply to show how Dask can be used with Keras for out-of-memory processing.

We first set up our Dask client:

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1, threads_per_worker=1, memory_limit='4GB')
client = Client(cluster)

We can use an existing cluster of servers where Dask has been installed or, as shown above, define a local cluster on a single machine. The memory_limit applies to each worker: as a worker approaches this limit Dask spills data to disk, and it pauses or restarts the worker rather than letting it use more RAM than we have defined.

Data preparation

Next, we use Dask’s version of make_classification to create classification test data:

from dask_ml.datasets import make_classification as make_classification_dask

X_dask, _ = make_classification_dask(n_samples=10000, n_features=10, n_informative=5, n_redundant=5, random_state=1, chunks=10)

We don’t need the labelled data that make_classification returns. The autoencoder matches each input with the result of the encode/decode transformation acting on this input.

You wouldn’t normally use an LSTM network for autoencoding when the data does not contain a sequential element (which can be spatial, chronological or textual), but the purpose here is to show how seamlessly a Dask object (in this case an array) can be used with Keras.

I’m going to feed my autoencoder with batches of data rather than individual rows. The model then attempts to recreate each batch with the smallest discrepancy. So I will split my dataset of 10000 rows into 200 batches of 50 rows each:

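print(X_dask.shape)  # (10000, 10)
n_seq_length = 50    # rows per batch (the LSTM "timesteps")

# number of complete batches that fit into the data
n_samples = X_dask.shape[0] // n_seq_length
n_features = X_dask.shape[1]

# drop any trailing rows that do not fill a batch, then reshape to 3D
X_dask = X_dask[0:n_samples * n_seq_length, :]
X_dask = X_dask.reshape(n_samples, n_seq_length, n_features)
print(f'using {n_seq_length} timesteps, {n_features} features giving {n_samples} samples to be batched...')
print(f'shape {X_dask.shape}')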

Each batch thus represents one of 200 consecutive, non-overlapping windows over my data.
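To make the windowing concrete, here is the same slicing and reshaping on a toy array small enough to inspect by eye. This is a sketch only: the 20 x 2 array, its chunking and the window length of 5 are assumptions chosen for readability, not the values used for X_dask:

```python
import numpy as np
import dask.array as da

# Toy stand-in for X_dask: 20 rows, 2 features, 5 rows per chunk
rows = np.arange(40).reshape(20, 2)
x = da.from_array(rows, chunks=(5, 2))

window = 5
n_windows = x.shape[0] // window
windows = x[: n_windows * window].reshape(n_windows, window, x.shape[1])

# Window i holds rows i*window .. (i+1)*window - 1 of the original data
second_window = windows[1].compute()
print(windows.shape)
print(second_window[0], second_window[-1])
```

The second window starts exactly where the first one ends, so no row appears in more than one window.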

#--------------------------------------------------
# keras
#--------------------------------------------------
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM

model_lstm = Sequential()

model_lstm.add(LSTM(100, return_sequences=True, input_shape=(n_seq_length, n_features)))
model_lstm.add(Dropout(0.1))
model_lstm.add(LSTM(40, return_sequences=True))
model_lstm.add(Dropout(0.5))
model_lstm.add(Dense(n_features, activation='linear'))

model_lstm.compile(loss='mae', optimizer='adam', metrics=['accuracy'])

model_lstm.summary()  # summary() prints the table itself and returns None

Training output

We then fit the model and confirm that the training loss decreases over the epochs:

history = model_lstm.fit(x=X_dask
                         , y=X_dask
                         , epochs=50
                         , verbose=1
                         , batch_size = 128
                         , shuffle=False
                        )
#--------------------------------------------------
# plotting
#--------------------------------------------------
%matplotlib inline
import matplotlib.pyplot as plt

plt.plot(history.history['loss'], 'b', label='Training loss')
plt.plot(history.history['accuracy'], 'r', label='Accuracy')
plt.legend(loc='upper right')
plt.xlabel('Epochs')
plt.ylabel('Loss, [mae]')
plt.show()

Training output

Again, the model details are not important. The purpose here is to show how seamlessly Dask integrates with Keras. We have passed a Dask array as our input and in the background the individual NumPy arrays that make up that Dask array are processed by Keras. We can see this by viewing the Dask Client dashboard that became available once we started our client:

Dask client dashboard
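If we want explicit control over how much data is materialized at once, one option is to walk over the blocks of the Dask array ourselves. The following is a hedged sketch, not what Keras does internally: iter_blocks is a hypothetical helper, and the random toy array merely stands in for the reshaped X_dask:

```python
import numpy as np
import dask.array as da

def iter_blocks(windows):
    """Yield (input, target) pairs, one block of the first axis at a time."""
    for i in range(windows.numblocks[0]):
        block = windows.blocks[i].compute()  # a plain NumPy array
        yield block, block  # an autoencoder's target is its own input

# Toy stand-in for the reshaped X_dask: 8 windows of 5 timesteps and 3 features
data = da.random.random((8, 5, 3), chunks=(2, 5, 3))
shapes = [x.shape for x, _ in iter_blocks(data)]
print(shapes)
```

Keras’ fit accepts generators that yield (inputs, targets) tuples, so a generator along these lines could be used where only one block should be held in memory at a time.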

Row-by-row processing with Dask

Our second example of Dask usage is that of row-by-row processing. Pandas offers efficient ways of manipulating DataFrames by operating on whole columns as vectors e.g. to transform a column’s datatype we can just do this:

df["date_col"] = pd.to_datetime(df["date_col_as_string"], format='%Y-%m-%d')

Transformations that may involve our own, more-complex logic can still be executed efficiently if we can vectorize the inputs (out of scope for this article, but see here for more details). Alternatively, with Dask we can partition our inputs and then distribute the function calls over all available cores so that they run in parallel (this assumes that the function operates on a single row at a time).

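import dask.dataframe as dd

# cores: the number of partitions to create, e.g. the number of available CPU cores
df_dask = dd.from_pandas(df, npartitions=cores)

# meta tells Dask the name and dtype of the column that my_func produces
df_dask['my_new_col'] = df_dask.apply(lambda x: my_func(x.col1, x.col2), axis=1, meta=('my_new_col', 'int')).persist()

df_combined = df_dask.compute()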

We should note a couple of things here:

  • Dask builds Directed Acyclic Graphs in the background and executes them lazily, i.e. only when we call e.g. .compute()
  • if Dask functions are called in a loop, the individual .apply() calls are overwritten unless they are committed to memory by calling .persist()
  • if function calls are *not* independent of one another, then separate them into different loops and computes, as there is no guarantee that lazily evaluated function calls are executed in the same logical order in which they were submitted

Summary

  • Dask is a library that offers good integration with both Pandas and machine learning libraries such as scikit-learn and Keras.
  • As such, we can use it for model training that is not constrained by available RAM.
  • Dask also allows a relatively seamless move from single-machine to cluster mode, without having to rewrite our code.