
Setting up, configuring and deploying `dask.distributed`

posted in Dask

While the dask.distributed documentation lists several setup options, this post covers setup and configuration for a single application or monorepo. In this context, multi-tenancy is not required, and the business logic, tasks and configuration all live in the same repository.

This approach simplifies deployment and makes it possible to build a single Docker image that is used for both the Worker and the Scheduler.

An example project would be:

  • versioned with a VCS such as Git
  • tested and built with a continuous integration service (CircleCI, Jenkins, GitHub Actions)
  • running on Kubernetes (with GitOps, e.g. Flux) or AWS Fargate


Configuration

Keeping the YAML configuration in the repository means that configuration changes are versioned and, by extension, that changing an option results in a new release (via CI and Kubernetes + Flux).

However, the configuration options listed in the documentation do not directly cover this use case, so consumers need to write a wrapper function that loads the YAML file and overrides the defaults. In addition, it's sometimes preferable to be able to override certain fields with environment variables (e.g. different options for production and staging clusters).

It's also important to note that the configuration loading must happen before the Worker or the Scheduler is started (it's not sufficient to use the preloading mechanism provided by Dask). An example is provided below.

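def load_configuration():
    """Merge dask_configuration.yaml and DASK_* environment variables into Dask's global config."""
    import os

    import yaml
    from dask.config import collect_env, config, update

    # Load config overrides from the file shipped next to this module
    config_path = os.path.join(os.path.dirname(__file__), 'dask_configuration.yaml')
    with open(config_path) as f:
        overrides = yaml.safe_load(f) or {}
    update(config, overrides)

    # Override from environment variables (e.g. DASK_DISTRIBUTED__COMM__RETRY__COUNT=5);
    # collect_env() only returns a dict, so it has to be merged explicitly
    update(config, collect_env())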
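The precedence that `load_configuration()` establishes (built-in defaults, then the YAML file, then `DASK_*` environment variables) can be illustrated with a simplified, stdlib-only sketch. This is not Dask's implementation: `deep_merge`, `env_overrides` and `canonical_key` are hypothetical helpers, the values are illustrative, and the underscore/hyphen matching only approximates how Dask resolves a key such as `lost-worker-timeout` when the variable is spelled `DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT`.

```python
# Hypothetical, simplified illustration of config precedence (not dask.config itself)
import ast
import copy
from typing import Any, Dict, Mapping


def canonical_key(key: str, existing: Mapping[str, Any]) -> str:
    """Match `lost_worker_timeout` to an existing `lost-worker-timeout` key (and vice versa)."""
    if key in existing:
        return key
    alternative = key.replace("_", "-") if "_" in key else key.replace("-", "_")
    return alternative if alternative in existing else key


def deep_merge(base: Mapping[str, Any], overrides: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a copy of `base` with `overrides` merged in recursively (overrides win)."""
    merged = copy.deepcopy(dict(base))
    for key, value in overrides.items():
        key = canonical_key(key, merged)
        if isinstance(value, Mapping) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def env_overrides(env: Mapping[str, str], prefix: str = "DASK_") -> Dict[str, Any]:
    """Turn DASK_A__B__C=value into {"a": {"b": {"c": value}}}."""
    result: Dict[str, Any] = {}
    for name, raw in env.items():
        if not name.startswith(prefix):
            continue
        *parents, leaf = name[len(prefix):].lower().split("__")
        node = result
        for part in parents:
            node = node.setdefault(part, {})
        try:
            node[leaf] = ast.literal_eval(raw)  # "5" -> 5, "True" -> True
        except (ValueError, SyntaxError):
            node[leaf] = raw  # "60s" stays a string
    return result


# Illustrative values, not Dask's actual defaults
defaults = {"distributed": {"comm": {"retry": {"count": 0}},
                            "deploy": {"lost-worker-timeout": "15s"}}}
from_yaml = {"distributed": {"comm": {"retry": {"count": 3}}}}
env = {"DASK_DISTRIBUTED__DEPLOY__LOST_WORKER_TIMEOUT": "60s", "HOME": "/root"}

# Precedence: defaults < dask_configuration.yaml < DASK_* environment variables
effective = deep_merge(deep_merge(defaults, from_yaml), env_overrides(env))
print(effective["distributed"]["comm"]["retry"]["count"])         # 3
print(effective["distributed"]["deploy"]["lost-worker-timeout"])  # 60s
```

The environment is applied last on purpose: the same image can then run on staging and production with only a few variables changed in the deployment manifest.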
Running the commands

In order to use the configuration this way, don't forget to wrap the dask-scheduler and dask-worker commands so that load_configuration() runs before anything else.

Wrapper for dask-scheduler
"""
Wrapper around `dask-scheduler` executable, 
in case we need to change defaults and such
"""
from my_project.dask_config import load_configuration

load_configuration()


def run_dask_scheduler():
    from distributed.cli.dask_scheduler import go

    go()


if __name__ == '__main__':
    run_dask_scheduler()
Wrapper for dask-worker
"""
Wrapper around `dask-worker` executable, 
in case we need to change defaults and such
"""
from my_project.dask_config import load_configuration

load_configuration()


def run_dask_worker():
    from distributed.cli.dask_worker import go

    go()


if __name__ == '__main__':
    run_dask_worker()

Tips & tricks

While Dask provides sensible defaults out of the box, it's worth extending and tuning the configuration based on the application's performance profile. A few categories are discussed below, along with an example configuration that serves as a reasonable starting point.

  • Communication: intermittent network issues are common; when they occur, try increasing the timeouts and the number of retries under the distributed.comm key.
  • Add or remove HTTP routes: extending the Web API with custom HTTP routes is covered in its own section.
  • Lock lease timeouts: locks and especially semaphores are useful for controlling resource usage and concurrency. However, the documentation warns that the implementation is still experimental, so the lease timeouts (under distributed.scheduler.locks) need to be closely monitored and adjusted to prevent overbooking.
  • Worker memory thresholds: the right values depend on the workload, and the defaults are a bit conservative. Experiment with them and monitor the number of worker restarts and memory warnings in the logs to see what works best for your performance profile.
  • Logs: decreasing the number of retained log entries (distributed.admin.log-length) lowers the workers' memory usage if needed.
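To reason about the retry settings, it helps to see how long a failing operation may wait in total. The sketch below assumes capped exponential backoff (the delay starts at `delay.min`, doubles on every retry and never exceeds `delay.max`), which is our understanding of how distributed's retry helper uses these values; it ignores the random jitter added on top, and `backoff_schedule` is a hypothetical helper.

```python
from typing import List


def backoff_schedule(count: int, delay_min: float, delay_max: float) -> List[float]:
    """Seconds slept before each retry, ignoring jitter.

    Assumes capped exponential backoff: the delay doubles from `delay_min`
    on every attempt and never exceeds `delay_max`.
    """
    return [min(delay_min * 2 ** attempt, delay_max) for attempt in range(count)]


# Values from the example configuration: count 3, min 2s, max 30s
schedule = backoff_schedule(count=3, delay_min=2.0, delay_max=30.0)
print(schedule)       # [2.0, 4.0, 8.0]
print(sum(schedule))  # 14.0 seconds of waiting before jitter
```

Raising `count` to 6 with the same bounds would give `[2.0, 4.0, 8.0, 16.0, 30.0, 30.0]`, i.e. the cap quickly dominates, so `delay.max` is the knob that bounds how long a task can stall on a flaky connection.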
Example dask_configuration.yaml
distributed:
  client:
    heartbeat: "10s"
    scheduler-info-interval: "5s"
  comm:
    timeouts:
      connect: "30s"
      tcp: "45s"
    retry:
      count: 3
      delay:
        min: "2s"
        max: "30s"
    socket-backlog: 4096
  deploy:
    lost-worker-timeout: "35s"
  scheduler:
    events-cleanup-delay: "1800s"
    locks:
      lease-validation-interval: "3s"
      lease-timeout: "1260s"
    http:
      routes: [
        'distributed.http.scheduler.prometheus',
        'distributed.http.scheduler.info',
        'distributed.http.scheduler.json',
        'distributed.http.health',
        'distributed.http.statics',
        'my_project.dask_routes.scheduler_routes',
      ]
    preload: [
      'my_project.dask_preload',
    ]
  worker:
    connections:
      outgoing: 50
      incoming: 25
    memory:
      target: 0.75
      spill: 0.85
      pause: 0.90
      terminate: 0.95
    http:
      routes: [
        'distributed.http.worker.prometheus',
        'distributed.http.health',
        'distributed.http.statics',
        'my_project.dask_routes.worker_routes',
      ]
    preload: [
      'my_project.dask_preload',
    ]
  admin:
    log-length: 3000
    log-format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
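The memory fractions are relative to the worker's memory limit, so it's worth translating them into absolute numbers before settling on values. The following sketch does that for the `--memory-limit 1300MB` used in the worker command further down; `parse_memory_limit` and `memory_thresholds` are hypothetical helpers that only mimic the decimal (`MB`) and binary (`MiB`) units accepted by `dask.utils.parse_bytes`.

```python
import re
from typing import Dict

# Decimal and binary suffixes, following the dask.utils.parse_bytes convention
# ("MB" = 10**6 bytes, "MiB" = 2**20 bytes)
UNITS = {"": 1, "b": 1, "kb": 10**3, "mb": 10**6, "gb": 10**9,
         "kib": 2**10, "mib": 2**20, "gib": 2**30}


def parse_memory_limit(text: str) -> int:
    """Hypothetical helper: '1300MB' -> 1_300_000_000."""
    match = re.fullmatch(r"\s*([\d.]+)\s*([A-Za-z]*)\s*", text)
    if match is None or match.group(2).lower() not in UNITS:
        raise ValueError(f"cannot parse memory limit: {text!r}")
    number, unit = match.groups()
    return int(float(number) * UNITS[unit.lower()])


def memory_thresholds(limit: str, fractions: Dict[str, float]) -> Dict[str, int]:
    """Absolute byte thresholds for each fraction of the worker's memory limit."""
    limit_bytes = parse_memory_limit(limit)
    return {name: round(limit_bytes * fraction) for name, fraction in fractions.items()}


# Fractions from distributed.worker.memory above, with --memory-limit 1300MB
fractions = {"target": 0.75, "spill": 0.85, "pause": 0.90, "terminate": 0.95}
thresholds = memory_thresholds("1300MB", fractions)
print({name: value // 10**6 for name, value in thresholds.items()})
# {'target': 975, 'spill': 1105, 'pause': 1170, 'terminate': 1235}
```

With a limit this small, the gap between pausing and being terminated by the nanny is only about 65 MB, which is one more reason to watch the restart count when tuning these values.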

Error monitoring

In a distributed setting, Dask executes tasks on remote workers. When an exception occurs during a computation, the worker serializes it and sends it back to the Client that requested the result. To monitor these errors reliably, it's preferable to move this burden from the clients to the individual workers.

Worker plugins provide a convenient interface to observe the lifecycle of each task executed by the worker. After subclassing WorkerPlugin, its transition method can be used to detect when a task enters the error state and to extract the exception.

Sample implementation
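import logging
from types import TracebackType
from typing import cast

from dask.utils import funcname
from distributed import Worker
from distributed.diagnostics.plugin import WorkerPlugin


class WorkerMonitor(WorkerPlugin):
    """
    Reports task errors from the worker that executed the task.

    Relies on Worker.exceptions, Worker.tracebacks and Worker.tasks[key] being a
    (function, args, kwargs) tuple (the Worker internals this plugin was written
    against); newer distributed releases keep a TaskState per key instead, so
    adjust the lookups to your version.
    """

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger('dask.worker.worker_monitor')

    def setup(self, worker: Worker):
        self.worker = worker  # noqa

    def transition(self, key: str, start: str, finish: str, *args, **kwargs):
        # Only react to tasks that have just failed and are still known to the worker
        if finish == 'error' and key in self.worker.exceptions and key in self.worker.tasks:
            serialized_exc = self.worker.exceptions[key]

            if hasattr(serialized_exc, 'data') and isinstance(serialized_exc.data, Exception):
                # Re-attach the original traceback when the worker captured one
                tb = self.worker.tracebacks.get(key)
                exc = (
                    serialized_exc.data.with_traceback(tb.data)
                    if (tb and hasattr(tb, 'data') and isinstance(tb.data, TracebackType))
                    else serialized_exc.data
                )
                exc = cast(Exception, exc)

                function, fn_args, fn_kwargs = self.worker.tasks[key]
                function_name = str(funcname(function))

                extra_data = {
                    "task_id": key,
                    "task_name": function_name,
                    "args": fn_args,
                    "kwargs": fn_kwargs,
                    'dask_worker_name': self.worker.name,
                }

                # TODO: your exception reporting and tracing, e.g.
                # my_exception_reporter.report(exc, extra_data)
                # Until then, log the error with its traceback; extra_data is passed as a
                # message argument because "args" is a reserved LogRecord attribute
                self.logger.error("Task %s (%s) failed: %r", key, function_name, extra_data, exc_info=exc)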

The sample implementation also extracts the task's name and a couple of useful tags that can be sent to a monitoring service such as Datadog or Prometheus. The exception can then be reported to the error monitoring service of your choice (Sentry, Bugsnag, etc.).
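The TODO in the plugin leaves the reporting client open. As a placeholder until a real client is wired in, here is a hypothetical `my_exception_reporter` (an instance of an equally hypothetical `LoggingExceptionReporter`) with a `report(exc, extra_data)` method: it logs the error and shortens the task arguments, which can be arbitrarily large, before they would be sent anywhere.

```python
import logging
import reprlib
import traceback
from typing import Any, Dict


class LoggingExceptionReporter:
    """Hypothetical stand-in for a Sentry/Bugsnag client: logs the error and
    returns the payload that would otherwise be sent to the service."""

    def __init__(self, logger_name: str = "dask.worker.errors") -> None:
        self.logger = logging.getLogger(logger_name)
        # Task arguments can be huge (dataframes, arrays); keep their repr short
        self._repr = reprlib.Repr()
        self._repr.maxstring = 200
        self._repr.maxother = 200

    def report(self, exc: BaseException, extra_data: Dict[str, Any]) -> Dict[str, Any]:
        payload: Dict[str, Any] = dict(extra_data)
        for field in ("args", "kwargs"):
            if field in payload:
                payload[field] = self._repr.repr(payload[field])
        payload["error_type"] = type(exc).__name__
        payload["message"] = str(exc)
        payload["traceback"] = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        # `extra` keys must not clash with LogRecord attributes such as "args"
        self.logger.error(
            "Task %s failed: %s", payload.get("task_name"), payload["message"],
            extra={"dask_error": payload},
        )
        return payload


my_exception_reporter = LoggingExceptionReporter()

payload = my_exception_reporter.report(
    ValueError("boom"),
    {"task_id": "inc-123", "task_name": "inc", "args": (1,), "kwargs": {}, "dask_worker_name": "worker-0"},
)
print(payload["error_type"], payload["args"])  # ValueError (1,)
```

Returning the payload keeps the reporter easy to unit-test; swapping the `logger.error` call for a real client later does not change the plugin.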

Preloading

To ensure that the plugins are registered every time a new worker is spun up or restarted, the preloading mechanism can be leveraged. Under the hood, Dask uses the click library to parse command line arguments and options, making it easy for consumers to extend its functionality.

Using the error monitoring example from above, we can create a new Python module that defines a single function called dask_setup, which takes either a Scheduler or a Worker instance as its only argument.

Example implementation of the module
from typing import Union

import click
from distributed import Client, Scheduler, Worker
from distributed.cli.utils import install_signal_handlers

# Hypothetical module path: wherever the WorkerMonitor plugin from the previous section lives
from my_project.worker_monitor import WorkerMonitor


@click.command()
def dask_setup(service: Union[Scheduler, Worker]):
    """
    References:
    * Lifecycle and CLI integration: https://docs.dask.org/en/latest/setup/custom-startup.html
    * Developing plugins: https://distributed.dask.org/en/latest/plugins.html
    """

    # Scheduler specific
    if isinstance(service, Scheduler):
        # Graceful Scheduler shutdown
        # (remove once https://github.com/dask/distributed/pull/3332 gets merged & released)
        async def on_signal(_signum):
            await service.close()

        # use the reference implementation from `distributed`
        install_signal_handlers(service.loop, cleanup=on_signal)

    # Worker specific
    elif isinstance(service, Worker):
        # Plugin for monitoring & metrics, registered through a short-lived client
        worker_monitor_plugin = WorkerMonitor()
        with Client(
            address=service.scheduler.address,
            timeout=30,
            name='worker-plugin-setup-client',
        ) as client:
            client.register_worker_plugin(worker_monitor_plugin, name='worker-monitor')
Don't forget to update dask_configuration.yaml

Add the module path to the distributed.scheduler.preload and distributed.worker.preload arrays to load it automatically.

Alternatively, the --preload command line argument can be used, as shown in the examples below.

Launching the scheduler via CLI
$ python -m my_project.cli.dask_scheduler \
--port 18786 --host 0.0.0.0 \
--dashboard \
--dashboard-address 0.0.0.0:18787 \
--dashboard-prefix my_project-dask \
--preload my_project.dask_preload

# note the last argument (--preload)
Launching the worker via CLI
$ python -m my_project.cli.dask_worker \
my_project-dask-scheduler-app:18786 \
--nprocs 1 --nthreads 10 --memory-limit 1300MB \
--worker-port 16000:18700 \
--nanny-port 14000:15999 \
--dashboard-address 18789 \
--death-timeout 120 \
--preload my_project.dask_preload

Extending the Web API

Custom functionality can also be added to the workers' and the scheduler's HTTP API by extending the RequestHandler class. Out of the box, Dask provides several endpoints, for example to check the component's health, usage statistics, dashboards and more. The full list can be found under the distributed.scheduler.http.routes and distributed.worker.http.routes configuration keys.
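The built-in routes can also be used from outside Dask. For instance, where a plain Kubernetes httpGet probe is not an option, a container health check could call the health route with a small stdlib-only script like the one below. It assumes the route answers `GET .../health` with a plain-text `ok` and that, on the scheduler, it is served under the `--dashboard-prefix` used in the CLI example; `is_healthy` is a hypothetical helper.

```python
import urllib.request


def is_healthy(base_url: str, timeout: float = 5.0) -> bool:
    """Return True if GET {base_url}/health answers 200 with the body "ok"."""
    url = base_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            body = response.read().decode("utf-8").strip()
            return response.status == 200 and body == "ok"
    except OSError:
        # URLError, HTTPError (e.g. 404) and socket timeouts are all OSError subclasses
        return False


# Example probe (exit code 0 when healthy), assuming the scheduler flags from the CLI example:
#   sys.exit(0 if is_healthy("http://localhost:18787/my_project-dask") else 1)
```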

In case you are running Dask within Kubernetes, the following endpoints might be useful.

Proxying workers' dashboards

This endpoint proxies the Workers' HTTP servers through the Scheduler's web server. It can be useful to remotely shut down a specific worker or to access a worker's metrics, logs and dashboards for quick debugging.

scheduler_routes.py
Don't forget to update dask_configuration.yaml

Add the module path to the distributed.scheduler.http.routes array to load the routes automatically.

from functools import partial
from typing import Any, Dict, List, Optional

from distributed import Scheduler
from distributed.http.proxy import GlobalProxyHandler
from tornado.web import RequestHandler


class WorkerOnlyProxyHandler(GlobalProxyHandler):
    """
    Endpoint that allows proxying Workers from the Scheduler.
    Useful to remotely shut down a specific worker or access worker's metrics, logs and dashboards.
    """

    def initialize(self, dask_server: Optional[Scheduler] = None, extra: Optional[Dict[Any, Any]] = None):
        super().initialize(dask_server=dask_server, extra=extra)

        # Hosts of the workers currently known to the scheduler
        # (Tornado calls initialize() for every request, so the list stays current)
        worker_hosts: List[str] = (
            [worker_state.host for worker_state in dask_server.workers.values()]
            if dask_server
            else []
        )

        # override the whitelist function to proxy only to workers
        self.host_whitelist = partial(self.whitelist_workers, worker_hosts=worker_hosts)

    @staticmethod
    def whitelist_workers(_handler: RequestHandler, host: str, *, worker_hosts: List[str]):
        # Exact match: a substring check would also accept e.g. "10.0.0.1" for worker "10.0.0.12"
        return host in worker_hosts


# Export the HTTP routes
#
# https://docs.dask.org/en/latest/configuration-reference.html#distributed.scheduler.http.routes
# https://distributed.dask.org/en/latest/http_services.html
routes = [(r"proxy/(\d+)/(.*?)/(.*)", WorkerOnlyProxyHandler, {})]
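With the handler registered on the scheduler, a worker's own HTTP pages are reached through URLs of the form `<scheduler>/proxy/<worker port>/<worker host>/<path>`, matching the route pattern above. The sketch below builds and parses such URLs; the base URL (including the `my_project-dask` prefix), the worker address and the `status` page are assumptions, and `worker_proxy_url` / `parse_proxy_path` are hypothetical helpers.

```python
import re
from typing import Tuple
from urllib.parse import urlsplit

# Same pattern as the `routes` entry above: proxy/<port>/<host>/<path>
PROXY_ROUTE = re.compile(r"proxy/(\d+)/(.*?)/(.*)")


def worker_proxy_url(scheduler_base: str, worker_host: str, worker_port: int, path: str = "status") -> str:
    """Build the scheduler URL that proxies to a worker's HTTP server."""
    return f"{scheduler_base.rstrip('/')}/proxy/{worker_port}/{worker_host}/{path.lstrip('/')}"


def parse_proxy_path(url_path: str) -> Tuple[int, str, str]:
    """Split a proxy path into (worker port, worker host, proxied path)."""
    match = PROXY_ROUTE.search(url_path)
    if match is None:
        raise ValueError(f"not a proxy path: {url_path!r}")
    port, host, rest = match.groups()
    return int(port), host, rest


url = worker_proxy_url("http://localhost:18787/my_project-dask", "10.0.0.12", 18789)
print(url)  # http://localhost:18787/my_project-dask/proxy/18789/10.0.0.12/status
print(parse_proxy_path(urlsplit(url).path))  # (18789, '10.0.0.12', 'status')
```

The host captured here is what the whitelist function receives, which is why it should be compared against the scheduler's worker list exactly.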

Gracefully shutting down workers

Endpoint to trigger a graceful shutdown of a worker via Kubernetes's Lifecycle Hooks. It's almost mandatory, as there's an outstanding issue with stopping Dask services gracefully by normal means (see dask/distributed#3332).

worker_routes.py
Don't forget to update dask_configuration.yaml

Add the module path to the distributed.worker.http.routes array to load the routes automatically.

import logging

from distributed import Worker
from distributed.http.utils import RequestHandler

logger = logging.getLogger("distributed.request_handler")


class TerminationHandler(RequestHandler):
    """
    Custom HTTP handler to trigger a graceful shutdown via Kubernetes's Lifecycle Hooks
    reference: https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/
    """

    def get(self):
        self.server: Worker  # add typing information (Worker extends distributed.core.Server)

        logger.info(f"Lifecycle hook triggered. Initiating graceful shutdown of {self.server.name}.")

        self.server.io_loop.add_callback(self.server.close_gracefully)

        self.write({'message': 'Shutting down...', 'extra': {'worker_name': self.server.name}})
        self.set_header("Content-Type", "application/json")


# Export the HTTP routes
#
# https://docs.dask.org/en/latest/configuration-reference.html#distributed.worker.http.routes
# https://distributed.dask.org/en/latest/http_services.html
routes = [('graceful-shutdown', TerminationHandler, {})]
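Kubernetes can call this route directly with an httpGet lifecycle handler. If you would rather run a script (for example from an exec preStop hook, or by hand while debugging), a minimal stdlib-only client could look like this. The worker address is an assumption about your deployment (port 18789 matches the worker's --dashboard-address in the CLI example), and `request_graceful_shutdown` is a hypothetical helper.

```python
import json
import urllib.request
from typing import Any, Dict


def request_graceful_shutdown(worker_http: str, timeout: float = 10.0) -> Dict[str, Any]:
    """Call the worker's graceful-shutdown route and return its JSON reply."""
    url = worker_http.rstrip("/") + "/graceful-shutdown"
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Example, run inside the worker's pod:
#   reply = request_graceful_shutdown("http://localhost:18789")
#   print(reply["message"])  # the handler above answers with 'Shutting down...'
```

Because the handler only schedules close_gracefully on the event loop and returns immediately, the hook should be given enough time (terminationGracePeriodSeconds) for the worker to finish closing before the pod is killed.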

Labels and resources

When using the rolling update release strategy, some workers will keep running an "older" release for a period of time until they get incrementally updated. In such cases, it might not be desirable to submit tasks to workers running the older version of the application.

Worker resources provide a convenient abstraction for labeling different releases, or different workers based on their allocated resources (GPU support, large amounts of RAM). Labels can be assigned manually, or by baking the label (the release version) into the Docker image as an environment variable, using build arguments populated by the CI pipeline from Git tags.
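A minimal sketch of the labeling idea, assuming the release version is available as a string (for example from a hypothetical `APP_VERSION` environment variable baked into the image). The helpers `release_label`, `worker_resources_arg` and `task_resources` are hypothetical; they produce the value for the worker's `--resources` option and the matching `resources=` argument for `client.submit`.

```python
import re
from typing import Dict


def release_label(version: str) -> str:
    """Turn a tag such as 'v1.4.2' into a resource name such as 'release-v1.4.2'."""
    # --resources takes NAME=AMOUNT pairs separated by whitespace, so replace
    # anything that could break that format (spaces, '=', ',', '+', ...)
    cleaned = re.sub(r"[^A-Za-z0-9._-]", "-", version.strip())
    return f"release-{cleaned}"


def worker_resources_arg(version: str, slots: int) -> str:
    """Value for the worker's --resources option."""
    return f"{release_label(version)}={slots}"


def task_resources(version: str) -> Dict[str, int]:
    """Value for client.submit(..., resources=...): one slot of this release."""
    return {release_label(version): 1}


version = "v1.4.2"  # in practice e.g. os.environ["APP_VERSION"] (hypothetical variable)
print(worker_resources_arg(version, slots=10))  # release-v1.4.2=10
print(task_resources(version))                  # {'release-v1.4.2': 1}
```

A running task holds the amount it requested until it finishes, so the worker should advertise at least as many slots as it has threads (10 with `--nthreads 10` in the CLI example); otherwise the release label would also cap the worker's concurrency.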


Issues to watch