After getting the cluster set up in the previous post, it was time to finally play with Dask on the cluster. Thankfully, there are dask-kubernetes and dask-docker projects that provide the framework to do this. Since I’m still new to Dask, I decided to start off by using Dask from a local notebook (in retrospect maybe not the best choice).

Getting Dask on ARM in Docker

The dask-docker project gives us a good starting point for building a container for Dask, but the project’s containers are only built for amd64. I started off by trying to rebuild the containers without any modifications, but it turned out there were a few issues that I needed to address. The first is that the regular conda docker image is also only built for amd64. Secondly, some of the packages that the Dask container uses are not yet cross-built either. While these problems will likely go away over the coming year, for the time being I solved them by making a multi-platform conda-forge docker container, asking folks to rebuild packages, and, when the packages did not get rebuilt, installing from source.
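The multi-platform build itself can be done with Docker's buildx plugin, which cross-builds and pushes a single manifest covering both architectures. A minimal sketch (the builder name, image tag, and build context here are illustrative, not the exact ones I used):

```shell
# Create a builder that supports multi-platform builds and make it active
docker buildx create --name multiarch --use

# Build for both amd64 and arm64 from one Dockerfile and push the
# combined manifest to a registry (tag is a placeholder)
docker buildx build \
  --platform linux/amd64,linux/arm64 \
  -t example/miniforge:v0.1 \
  --push .
```

Pulling the resulting tag on either an amd64 or an arm64 node then resolves to the right image automatically.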

To do this I created a new Dockerfile, replacing the miniconda base with miniforge:

FROM debian:buster-slim

ENV PATH /opt/conda/bin:$PATH

RUN apt-get update --fix-missing && \
    apt-get install -y wget bzip2 ca-certificates libglib2.0-0 libxext6 libsm6 libxrender1 git mercurial subversion && \
    apt-get clean

CMD [ "/bin/bash" ]

Most of the logic lives in this setup script:


#!/bin/bash
set -ex
export arch=$(uname -m)
if [ "$arch" == "aarch64" ]; then
  wget --quiet "${arch}.sh" -O ~/
  chmod a+x ~/
  ~/ -b -p /opt/conda
fi
/opt/conda/bin/conda clean -tipsy
ln -s /opt/conda/etc/profile.d/ /etc/profile.d/
echo ". /opt/conda/etc/profile.d/" >> ~/.bashrc
echo "conda activate base" >> ~/.bashrc
source ~/.bashrc
find /opt/conda/ -follow -type f -name '*.a' -delete
find /opt/conda/ -follow -type f -name '*.pyc' -delete
/opt/conda/bin/conda clean -afy
/opt/conda/bin/conda install --yes nomkl cytoolz cmake tini
/opt/conda/bin/conda init bash
/opt/conda/bin/conda install --yes mamba

I chose to install mamba, a fast C++ reimplementation of conda, and use this to install the rest of the packages. I did this since debugging the package conflicts with the regular conda program was resulting in confusing error messages, and mamba can have clearer error messages. I created a new version of the "base" Dockerfile, from dask-docker, which installed the packages with mamba and pip when not available from conda.

FROM holdenk/miniforge:v0.3

SHELL ["/bin/bash", "-c"]

ENV PATH /opt/conda/bin:$PATH

RUN apt-get update --force-yes  -y --fix-missing && \
    apt-get install --force-yes  -y wget bzip2 ca-certificates libglib2.0-0 libxext6 libsm6 libxrender1 git mercurial subversion && \
    apt-get install --force-yes -y build-essential cmake libcurl4 libcurl4-openssl-dev libblosc-dev libblosc1 python3-blosc python3-dev && \
    apt-get upgrade --force-yes -y && \
    apt-get clean

RUN mamba install --yes python==3.8.6 \
    && mamba install --yes \
    cytoolz \
    dask==2.30.0 \
    dask-core==2.30.0 \
    lz4 \
    numpy==1.19.2 \
    pandas \
    tini \
    scikit-build \
    python-blosc=1.9.2 \
    pyzmq \
    && mamba install --yes s3fs gcsfs dropboxdrivefs requests dropbox paramiko adlfs pygit2 pyarrow \
    && mamba install --yes bokeh \
    && (mamba install --yes aiohttp==3.7.1 || pip install aiohttp==3.7.1 ) \
    && (mamba install --yes jupyter-server-proxy || pip install jupyter-server-proxy) \
    && (mamba install --yes llvmlite numba ) \
    && (mamba install --yes fastparquet || pip install fastparquet) \
    && find /opt/conda/ -type f,l -name '*.a' -delete \
    && find /opt/conda/ -type f,l -name '*.pyc' -delete \
    && find /opt/conda/ -type f,l -name '*.js.map' -delete \
    && find /opt/conda/lib/python*/site-packages/bokeh/server/static -type f,l -name '*.js' -not -name '*.min.js' -delete \
    && rm -rf /opt/conda/pkgs

# pyzmq is installed explicitly so we don't have to build it from src since jupyter-server-proxy needs it, but jupyter-server-proxy won't install from conda directly

COPY /usr/bin/

RUN mkdir /opt/app

ENTRYPOINT ["tini", "-g", "--", "/usr/bin/"]

One interesting thing I noticed while exploring this is the script from dask-docker that is used as the entry point for the container. This script checks a few environment variables that, when present, trigger installing additional packages (Python or system) at container startup. While baking all of the packages into the container is normally best (since installations can be flaky and slow), this does allow for faster experimentation. At first glance, it seems like adding a new package this way still requires a Dask cluster restart, but I’m going to do more exploring here.
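The entry-point script looks at EXTRA_APT_PACKAGES, EXTRA_CONDA_PACKAGES, and EXTRA_PIP_PACKAGES. You can set these on the worker pod spec; a sketch (the package names here are just examples, not ones the post depends on):

```yaml
# Env vars picked up by at worker startup
env:
  - name: EXTRA_CONDA_PACKAGES
    value: "numba xarray"
  - name: EXTRA_PIP_PACKAGES
    value: "s3fs"
```

The trade-off is slower, flakier worker startup, so this is best kept for experimentation rather than anything long-lived.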

Getting Dask on Kube

With the containers built, the next step was getting them running on Kubernetes. I first tried the Helm installation, but I wasn’t sure how to configure it to use my new custom containers, and the documentation contained warnings that Dask with Helm did not play well with dynamic scaling. Since I’m really interested in exploring how the different systems support dynamic scaling, I decided to install the dask-kubernetes project instead. With dask-kubernetes, I can create a cluster by running:

cluster = KubeCluster.from_yaml('worker-spec.yaml')

As I was setting this up, I realized it was creating resources in the default namespace, which made keeping track of everything difficult. So I created a namespace, service account, and role binding so that I could better keep track of (and clean up) everything:

kubectl create namespace dask
kubectl create serviceaccount dask --namespace dask
kubectl apply -f setup.yaml
kubectl create rolebinding dask-sa-binding --namespace dask --role=daskKubernetes --serviceaccount=dask:dask
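The setup.yaml above defines the daskKubernetes role. The exact permissions dask-kubernetes needs are listed in its documentation; a minimal sketch of what such a Role might look like (the rules here are an approximation, not copied from my actual file):

```yaml
# setup.yaml -- approximate Role granting dask-kubernetes the ability to
# manage worker pods in the dask namespace
kind: Role
apiVersion:
metadata:
  name: daskKubernetes
  namespace: dask
rules:
  - apiGroups: [""]
    resources: ["pods", "services"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get", "list"]
```

Scoping this as a namespaced Role (rather than a ClusterRole) is what makes the cleanup story nice: deleting the dask namespace takes everything with it.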

To use this, I added another parameter to the cluster creation call and updated the YAML:

cluster = KubeCluster.from_yaml('worker-spec.yaml', namespace='dask')

The from_yaml method is important, as it lets me specify custom containers and resource requests (which will be useful when working with GPUs). I modified the standard worker-spec to use the namespace and service account I created.

# worker-spec.yml

kind: Pod
metadata:
  namespace: dask
  labels:
    foo: bar5
spec:
  restartPolicy: Never
  # Added by holden
  serviceAccountName: dask
  automountServiceAccountToken: true
  # End added by Holden
  containers:
  # Configure for dual arch
  - image: holdenk/dask-base:v0.9.1
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    # env:
    #  - name: EXTRA_PIP_PACKAGES
    #    value: git+
    resources:
      limits:
        cpu: "2"
        memory: 8G
      requests:
        cpu: "2"
        memory: 8G

While this would have worked if I were inside the Kubernetes cluster, I wanted to start with an experimental notebook outside the cluster. This required some changes and, in retrospect, is not where I should have started.

Dask in Kube with Notebook access

There are two primary considerations when setting up Dask for notebook access on Kube. The first is where you want your notebook to run, inside the Kubernetes cluster or outside (e.g. on your machine). The second consideration is if you want the Dask scheduler to run alongside your notebook, or in a separate container inside of Kube.

The first configuration I tried was running the notebook on my local machine. At first I could not get it working, because the scheduler was also running on my local machine and could not talk to the worker pods it spun up. That’s why, unless you’re using host networking, I recommend running the scheduler inside the cluster. Doing this involves adding a "deploy_mode" keyword argument to your KubeCluster invocation and asking Dask to create a service through which your notebook can talk to the scheduler.

import dask

# Specify a remote deployment using a load balancer, necessary so the
# notebook outside the cluster can reach the scheduler
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer"})

cluster = KubeCluster.from_yaml('worker-spec.yaml', namespace='dask', deploy_mode='remote')

Running your notebook on a local machine may make getting started faster, but it comes with some downsides. It’s important to keep your client’s Python environment in sync with the worker/base containers. To set up my local conda env, I ended up having to run:

sudo /opt/conda/bin/conda install -c conda-forge --all --yes mamba
mamba install --yes python==3.8.6 cytoolz dask==2.30.0 dask-core==2.30.0 lz4 numpy==1.19.2 pandas tini \
      scikit-build python-blosc=1.9.2 pyzmq s3fs gcsfs dropboxdrivefs requests dropbox paramiko adlfs \
      pygit2 pyarrow bokeh aiohttp==3.7.1 llvmlite numba fastparquet
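Rather than eyeballing package lists, distributed can check for drift for you: Client.get_versions(check=True) compares the client, scheduler, and worker environments and raises on a mismatch. A sketch using a throwaway in-process cluster (with a real deployment you’d call it on the Client connected to your KubeCluster):

```python
from dask.distributed import Client

# Throwaway in-process cluster, just to illustrate the call
client = Client(processes=False)

# Compares Python and key package versions across client, scheduler,
# and workers; raises if they disagree
versions = client.get_versions(check=True)

client.close()
```

Running this right after connecting to a cluster is a cheap way to catch the "works locally, fails on the workers" class of bugs early.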

Another big issue you’ll likely run into is that transient network errors between your notebook and the cluster can result in non-recoverable errors. This has happened to me even with networking all inside my house, so I can imagine that it would be even more common with a VPN or a cloud provider network involved.
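One knob that can soften (though not eliminate) this is distributed’s comm retry setting, which defaults to no retries. A sketch of turning it on from the client side:

```python
import dask

# Ask distributed to retry flaky communications a few times instead of
# failing outright (the default retry count is 0)
dask.config.set({"distributed.comm.retry.count": 3})
```

This only helps with transient blips; a dropped scheduler connection can still leave the client in a state it can’t recover from.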

The final challenge that I ran into was with I/O. Some code will run in the workers and some will run on the client, and if your workers and client have a different network view or if there are resources that are only available inside the cluster (for me MinIO), the error messages can be confusing [1].

Note: you don’t have to use Dask with Kubernetes, or even a cluster. If you don’t have a cluster, or have a problem where a cluster might not be the best solution, Dask also supports other execution environments like multithreading and GPU acceleration. I’m personally excited to see how the GPU acceleration can be used together with Kubernetes.

The different APIs

Dask exposes a few different APIs for distributed programming at different levels of abstraction. Dask’s "core" building block is the delayed API, on top of which collections and DataFrame support is built. The delayed API is notably a lower level API than Spark’s low level public APIs — and I’m super interested to see what kind of things it enables us to do.
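To make the delayed API concrete, here is a small sketch: each decorated call builds a node in a task graph lazily, and nothing runs until compute() is called:

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add(a, b):
    return a + b

# This builds a graph of three tasks; no work happens yet
total = add(inc(1), inc(2))

# compute() executes the graph; the two inc calls can run in parallel
result = total.compute(scheduler="threads")
```

Because the graph is just ordinary Python objects, you can compose arbitrary functions this way, which is what makes it such a flexible building block compared to higher-level collection APIs.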

Dask has three different types of distributed collection APIs: Bag, DataFrame, and Array. These distributed collections map relatively nicely to common Python concepts, and the DataFrame API is especially familiar.
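A quick taste of the Bag API, which behaves like a distributed list (DataFrame and Array follow pandas and NumPy similarly closely):

```python
import dask.bag as db

# Partition a sequence across (here) two partitions
b = db.from_sequence(range(10), npartitions=2)

# map/sum build a lazy graph; compute() runs it
result = b.map(lambda x: x * 2).sum().compute(scheduler="synchronous")
```

The synchronous scheduler here is just for a deterministic local run; on the KubeCluster the same code fans the partitions out across worker pods.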

Almost [2] separate from the delayed and collections APIs, Dask also has an (experimental) Actor API. I’m curious to see how this API continues to be developed and used. I’d really like to see if I can use it as a parameter server.
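A minimal sketch of what the actor API looks like: submitting a class with actor=True pins one stateful instance to a worker, and method calls return futures. This toy counter is the sort of mutable-state building block a parameter server could grow out of:

```python
from dask.distributed import Client

class Counter:
    """Stateful object that lives on a single worker."""
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

client = Client(processes=False)  # in-process cluster for illustration

# actor=True keeps one Counter instance on a worker instead of
# shipping copies around
counter = client.submit(Counter, actor=True).result()

# Method calls return ActorFutures; .result() blocks for the value
result = counter.increment().result()

client.close()
```

Unlike regular tasks, repeated calls hit the same instance, so state accumulates across calls — exactly what you want for a parameter server, and exactly what the pure task model avoids.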

To verify my cluster was properly set up I did a quick run through the tutorials for the different APIs.

Next Steps

Now that I’ve got Dask on Kube running on my cluster I want to do some cleanup and then explore more about how Dask handles dataframes, partitioning/distributing data/tasks, auto scaling, and GPU acceleration. If you’ve got any suggestions for things you’d like me to try out, do please get in touch :)

1. I worked around this by setting up port-forwarding so that the network environment was the same between my local machine and the cluster. You could also expose the internal-only resources through a service and have internal & external access through the service, but I just wanted a quick stop-gap. This challenge convinced me I should re-run with my notebook inside the cluster.
2. You can use the actor API within the other APIs, but it is not part of the same building blocks.