
dask-databricks's Issues

High-level plan and scope

This issue outlines the high-level plan and scope of dask-databricks.

Overview

Running Dask clusters on Databricks is possible today but is not well supported. The goal of dask-databricks is to provide folks with useful tools to make launching and using Dask clusters on Databricks easier.

Specifically, this tool is intended to launch a Dask cluster alongside a Databricks multi-node Spark cluster. Spark's architecture is similar to Dask's in that it has a driver node that coordinates work and many worker nodes that carry it out. When using Databricks notebooks, the cells of the notebook are executed on the driver node. When using PySpark or other libraries, tasks are submitted to the driver, which distributes them to the workers.

Via init scripts it is possible to run arbitrary commands on each node as it starts up, and nodes are aware of their role and of the driver node's network information via environment variables. Therefore we can use an init script to start Dask scheduler and worker processes alongside the Spark driver and worker processes.

From the user's perspective in a notebook environment, they can then choose to use either PySpark or Dask while utilizing the same set of hardware.

Outline

There are two key technical aspects to using Dask on Databricks: 1) starting the cluster and 2) using the cluster. These functions will be provided by two separate parts of dask-databricks.

Cluster startup tool

To simplify launching the Dask cluster components, a CLI tool will be provided. Users should be able to create a minimal init script with something like this:

#!/bin/bash

pip install dask-databricks
dask databricks run

It would be great if that's all that is needed to launch the Dask cluster.

Cluster manager

Then from the notebook side it should be easy for the user to create a Dask Client and connect to the cluster.

It could be nice to implement a DatabricksCluster object that provides convenience methods to the user. This would be similar to HelmCluster in Dask Kubernetes, which, unlike most cluster managers, doesn't create the cluster but instead finds and connects to an existing one and then provides the Cluster API.

from dask_databricks import DatabricksCluster
from dask.distributed import Client

# If `dask databricks run` was successful this will connect to the scheduler,
# otherwise it will raise a useful error to help folks troubleshoot their cluster
cluster = DatabricksCluster()

# Connect the client
client = Client(cluster)

# Easily view logs from scheduler and workers in the notebook
cluster.get_logs()

example of read/write parquet using dbfs://

Fun project!

I remember nerd sniping @martindurant to work on https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/dbfs.py (https://github.com/fsspec/filesystem_spec/blob/master/fsspec/registry.py#L152) when I was using databricks a few years ago.

This may be a good test for parallel reads/writes of Parquet files to the Databricks file system. Curious whether it gets a speed-up compared to S3, for example.

Given this repo is slim, an example could be added to the README once tested.

Dynamically alter behaviour when on single node cluster

I noticed that on a Databricks single-node cluster, after running:

client = dask_databricks.get_client()

Dask calls will just hang - this makes sense since there is only a driver and no worker nodes, so there is nothing to actually do the work.

It would be really nice if:

  • After a while of waiting, the cluster threw an error because no workers are available (I'm guessing this would belong in dask/distributed proper though?)
  • dask-databricks had some kind of clever behaviour to use local multi-process workers rather than distributed ones when on a single-node cluster

I think the obvious way to implement the second would be to have some behaviour change based on some kind of spark.conf variable (although I haven't seen an obvious candidate).
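A minimal sketch of that second idea, assuming the caller can somehow determine the number of worker nodes (the detection mechanism is exactly the open question above, so it is taken as a parameter here):

```python
from dask.distributed import Client, LocalCluster

def get_client(num_worker_nodes, scheduler_address=None):
    """Return a Client backed by the side-loaded cluster, or fall back
    to a local multi-process cluster when there are no worker nodes."""
    if num_worker_nodes == 0:
        # Single-node cluster: nothing remote to do the work, so run
        # workers as local processes on the driver instead.
        return Client(LocalCluster(n_workers=2, threads_per_worker=1, processes=True))
    return Client(scheduler_address)
```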

@jacobtomlinson, any thoughts? Would this be a handy feature or is it a bit of an overcomplication? (Thinking it through, I can't work out whether it's actually worth the additional complexity. It would probably be handy if someone had a notebook that they wanted to run over multiple different cluster types, but is that really the use case for dask-databricks?)

Publish to conda-forge

Once published to PyPI in #15 we should also publish to conda-forge.

To do this we should generate a recipe with grayskull and submit it to the conda-forge/staged-recipes repo.

Installing additional packages on the dask workers

I'd like to install additional packages on the workers.

Typically, I'd use the following script to install packages on the dask workers:

from dask.distributed import Client, PipInstall

client = Client()
plugin = PipInstall(["xarray[io]", "dataretrieval", "earthaccess", "geopandas", "h5netcdf"], pip_options=["--upgrade"])
client.register_plugin(plugin)

However, when I use this code, the installed packages do not show up when I use client = dask_databricks.get_client(), and conversely, I can't install the packages using the client from dask_databricks.get_client().

Any advice would be appreciated.

Dask Dashboard URL not working for Azure Databricks

Description:

The dashboard_link method returns a hard-coded URL that only works for workspaces hosted by Databricks itself. This does not work for Databricks running on other cloud service providers, e.g. Azure.

Inside databrickscluster.py:

def dashboard_link(self):
    cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId")
    return f"https://dbc-dp-{org_id}.cloud.databricks.com/driver-proxy/o/{org_id}/{cluster_id}/8087/status"

For example, a typical workspace URL for Azure Databricks would be https://adb-1486644750497365.5.azuredatabricks.net/, which is different from https://dbc-dp-{org_id}.cloud.databricks.com

Suggested Fix:

Replace the hard-coded URL with the dynamic workspace URL returned by spark.conf.get("spark.databricks.workspaceUrl"):

def dashboard_link(self):
    cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId")
    workspace_url = spark.conf.get("spark.databricks.workspaceUrl")
    return f"https://{workspace_url}/driver-proxy/o/{org_id}/{cluster_id}/8087/status"

I've tested on Azure Databricks and this seemed to work.

Check process health

When starting the scheduler and workers we just call them and move on.

https://github.com/jacobtomlinson/dask-databricks/blob/a6ff0d4ff5ca91c29b174386474a69b9454c48d4/dask_databricks/cli.py#L38

https://github.com/jacobtomlinson/dask-databricks/blob/a6ff0d4ff5ca91c29b174386474a69b9454c48d4/dask_databricks/cli.py#L51

It would be a better user experience if we watched their health for a little while. Eventually we need to exit the init script and leave the Dask processes running, and we don't want to watch them forever, otherwise the init script would never exit. But in the overall timeline of Databricks cluster startup we can afford to spend a few seconds watching the Dask components to make sure they are healthy and don't exit prematurely.

Here are a few ideas for health checks we could implement:

  • Scheduler startup
    • For a few seconds after starting the scheduler check the process hasn't exited
    • Poll the network socket like the workers do and wait for the scheduler to start
    • Watch the stdout for the Scheduler at: ... log line
  • Worker startup
    • Poll the worker socket (requires starting all workers on the same port)
    • Watch the stdout for Start worker at: log line

We should add at least one check for the scheduler and workers.
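The socket-polling checks above could share a small helper. This is a sketch using only the standard library; the timeout and polling interval are illustrative:

```python
import socket
import time

def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll a TCP port until it accepts connections or the timeout expires.

    Returns True once a connection succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return True
        except OSError:
            # Not listening yet (or connection refused); retry shortly.
            time.sleep(interval)
    return False
```

The init script could call this against the scheduler port after launching the process, and exit non-zero if it returns False.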

We could add a configurable timeout; if the components don't start up within it, we would exit with a non-zero return code and some useful logs about what went wrong.

#!/bin/bash

# Install Dask Databricks
/databricks/python/bin/pip install dask-databricks

# Start Dask cluster
dask databricks run --timeout 30s

When an init script exits non-zero the whole cluster provisioning fails, so we want to be cautious about when we do this, but a scheduler or worker process failing to start up cleanly seems like a reasonable case for it.


Add CI

Add GitHub Actions CI to run pytest.

Add support for alternative worker commands and config options

Right now the dask worker command is hard-coded.

https://github.com/jacobtomlinson/dask-databricks/blob/a64698074bcbe4117a0a16e32737b4fdeba3d491/dask_databricks/cli.py#L51

We should support configuring this so we can set it to other things like dask cuda worker. We may also want to be able to pass extra flags to dask scheduler and dask worker and we should expose that via the dask databricks run CLI.

#!/bin/bash

# Install Dask Databricks
/databricks/python/bin/pip install dask-databricks

# Start Dask cluster
dask databricks run --worker-command "dask cuda worker" --worker-args "--nthreads 10"

Add docs

We need to write and build some documentation with Sphinx using the dask-sphinx-theme.

Add DatabricksCluster class

Add a cluster manager class to simplify connecting a Client and provide useful utilities.

Currently to connect a client we can do the following.

from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')

It would be nice if we could do this the "Dask way" with a cluster manager class like this.

from dask.distributed import Client
from dask_databricks import DatabricksCluster

cluster = DatabricksCluster()
client = Client(cluster)

Then we could have a cluster object that we can use to call useful utilities like cluster.get_logs().

Add a DatabricksRunner

There have been some discussions lately like https://dask.discourse.group/t/dask-on-databricks-clusters/2086/8 about being able to side load a Dask cluster onto a Databricks multi-node cluster. A Runner could be a good model for this.

A Databricks cluster can be given an init script that gets executed on every node of the cluster. We could create a runner that an init script launches as a background process.

It looks like Databricks sets environment variables like DB_IS_DRIVER and DB_DRIVER_IP so the scheduler could start on the driver node and all of the workers can find it because they know the driver IP address.
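Assuming those environment variables behave as described (the variable names come from the observation above; the scheduler port and the exact "TRUE" value are assumptions), the role-branching logic could look like:

```python
import os

def dask_command(env=None):
    """Pick the Dask process to launch on this node based on its role.

    Returns the command an init script would hand to subprocess.Popen."""
    env = os.environ if env is None else env
    if env.get("DB_IS_DRIVER", "").upper() == "TRUE":
        # Driver node: run the scheduler so workers have somewhere to connect.
        return ["dask", "scheduler"]
    # Worker node: connect to the scheduler via the driver's IP.
    return ["dask", "worker", f"tcp://{env['DB_DRIVER_IP']}:8786"]
```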

It probably doesn't make sense to support running client code as that would likely be driven from a notebook.

Unable to use all nodes/threads setup for Databricks Compute

Hi,

Thanks for creating this library.

I am trying to spin up a Dask cluster on Azure Databricks.

  1. I've followed the instructions and managed to set up a Dask cluster. However, I realised that the compute resources are not maximized: there is only 1 worker instead of the 2 I configured as the minimum number of workers. Any idea how to fix this? I tried client.cluster.scale(n=2) but got an error back.

  2. The second issue is that the URL for the Dask dashboard is not accessible and is different if you are using Azure Databricks. The typical URL of an Azure Databricks notebook is as follows:

https://adb-1486644750497365.5.azuredatabricks.net/?o=1486644750497365#notebook/1676261431997434/command/1676261431997440


Thanks and any advice is much appreciated.
