dask-contrib / dask-databricks
Cluster tools for running Dask on Databricks
Home Page: https://pypi.org/project/dask-databricks/
License: BSD 3-Clause "New" or "Revised" License
This issue outlines the high-level plan and scope of dask-databricks.
Running Dask clusters on Databricks is possible today but is not well supported. The goal of dask-databricks is to provide folks with useful tools to make launching and using Dask clusters on Databricks easier.
Specifically this tool is intended to launch a Dask cluster alongside a Databricks multi-node Spark cluster. Spark's architecture is similar to Dask's in that it has a driver node that coordinates work and many worker nodes that carry it out. When using Databricks notebooks the cells of the notebook are executed on the driver node. When using pyspark or other libraries the tasks are submitted to the driver, which distributes them onto the workers.
Via init scripts it is possible to run arbitrary commands on each node as it starts up, and nodes are aware of their role and of the driver node's network information via environment variables. Therefore we can use an init script to start Dask scheduler and worker processes alongside the Spark driver and worker processes.
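For illustration, the role detection this relies on could be sketched like so. `DB_IS_DRIVER` and `DB_DRIVER_IP` are the Databricks-provided environment variables mentioned above; the function name and the exact command construction are hypothetical, not the actual implementation.

```python
import os


def dask_command(env=os.environ):
    """Pick which Dask process to launch based on the Databricks node role.

    Databricks sets DB_IS_DRIVER on every node, and DB_DRIVER_IP holds the
    driver's address, so workers know where the scheduler will be listening.
    """
    if env.get("DB_IS_DRIVER", "false").lower() == "true":
        return ["dask", "scheduler"]
    return ["dask", "worker", f"tcp://{env['DB_DRIVER_IP']}:8786"]
```

An init script on the driver would then launch the scheduler, while the same script on each worker node would launch a worker pointed at the driver's IP.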
From the user's perspective in a notebook environment, they can then choose to use either pyspark or dask and utilize the same set of hardware.
There are two key technical aspects to using Dask on Databricks: 1) starting the cluster and 2) using the cluster. These functions will be provided by two separate parts of dask-databricks.
To simplify launching the Dask cluster components, a CLI tool will be provided. Users should be able to create a minimal init script with something like this:
```bash
#!/bin/bash
pip install dask-databricks
dask databricks run
```
It would be great if that's all that is needed to launch the Dask cluster.
Then from the notebook side it should be easy for the user to create a Dask Client and connect to the cluster.
It could be nice to implement a DatabricksCluster object that provides convenience methods to the user. This would be similar to the HelmCluster in Dask Kubernetes, which, unlike most cluster managers, doesn't create the cluster but instead finds and connects to the existing cluster and then provides the Cluster API.
```python
from dask_databricks import DatabricksCluster
from dask.distributed import Client

# If `dask databricks run` was successful this will connect to the scheduler,
# otherwise it will raise a useful error to help folks troubleshoot their cluster
cluster = DatabricksCluster()

# Connect the client
client = Client(cluster)

# Easily view logs from scheduler and workers in the notebook
cluster.get_logs()
```
Fun project!
I remember nerd sniping @martindurant to work on https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/dbfs.py (https://github.com/fsspec/filesystem_spec/blob/master/fsspec/registry.py#L152) when I was using databricks a few years ago.
It may be a good test for parallel reads/writes of Parquet files to the Databricks file system. Curious whether it gets a speedup compared to S3, for example.
Given this repo is slim, it could be added to the README once tested.
I noticed that on a Databricks single-node cluster, after running:

```python
client = dask_databricks.get_client()
```

Dask calls will just hang. This makes sense since there is only a driver and no worker nodes, so there is nothing to actually do the work.
It would be really nice if this case were handled more gracefully (maybe that belongs in dask/distributed proper, though?). I think the obvious way to implement this would be to have some behaviour change based on some kind of spark.conf variable (although I haven't seen an obvious candidate).
@jacobtomlinson, any thoughts? Would this be a handy feature or is it a bit of an overcomplication? (Thinking it through, I can't figure out if it's actually worth the additional complexity. It'd probably be handy if someone had a notebook that they wanted to run over multiple different cluster types, but is the use case for dask-databricks really there?)
We should format the code with black and do some linting (I like ruff these days). I most recently set this up in kr8s so that might be a good place to copy from.
https://github.com/kr8s-org/kr8s/blob/e914160dcb6654a21c73fee6a63d73a1550b4c34/.pre-commit-config.yaml
https://github.com/kr8s-org/kr8s/blob/e914160dcb6654a21c73fee6a63d73a1550b4c34/pyproject.toml#L76-L115
Once published to PyPI in #15 we should also publish to conda-forge.
To do this we should generate a recipe with grayskull and submit it to the conda-forge/staged-recipes repo.
It's currently unclear how to expose the Dask Dashboard from the Databricks driver node.
There seems to be support for accessing TensorBoard so maybe this mechanism can be reused for Dask?
I'd like to install additional packages on the workers.
Typically, I'd use the following script to install packages on the dask workers:
```python
from dask.distributed import Client, PipInstall

client = Client()
plugin = PipInstall(
    ["xarray[io]", "dataretrieval", "earthaccess", "geopandas", "h5netcdf"],
    pip_options=["--upgrade"],
)
client.register_plugin(plugin)
```
However, when I use this code, the installed packages do not show up when I use client = dask_databricks.get_client(), and conversely, I can't install the packages using the client from dask_databricks.get_client().
Any advice would be appreciated.
Description:
The dashboard_link method returns a hard-coded URL that only works for workspaces hosted on Databricks' own cloud.databricks.com domain. This does not work for Databricks running on other cloud service providers, e.g. Azure.
Inside databrickscluster.py:
```python
def dashboard_link(self):
    cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId")
    return f"https://dbc-dp-{org_id}.cloud.databricks.com/driver-proxy/o/{org_id}/{cluster_id}/8087/status"
```
For example, a typical workspace URL for Azure Databricks would be https://adb-1486644750497365.5.azuredatabricks.net/, which is different from https://dbc-dp-{org_id}.cloud.databricks.com.
Suggested Fix:
Replace the hardcoded URL with the dynamic workspace URL returned by spark.conf.get("spark.databricks.workspaceUrl"):
```python
def dashboard_link(self):
    cluster_id = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    org_id = spark.conf.get("spark.databricks.clusterUsageTags.orgId")
    workspace_url = spark.conf.get("spark.databricks.workspaceUrl")
    return f"https://{workspace_url}/driver-proxy/o/{org_id}/{cluster_id}/8087/status"
```
I've tested on Azure Databricks and this seemed to work.
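The suggested fix boils down to a small pure function, which makes it easy to unit test in isolation. The helper name below is hypothetical; the inputs mirror the spark.conf values used above.

```python
def build_dashboard_link(workspace_url, org_id, cluster_id, port=8087):
    """Build the driver-proxy URL for the Dask dashboard from the
    workspace URL instead of the hard-coded AWS-specific domain."""
    return f"https://{workspace_url}/driver-proxy/o/{org_id}/{cluster_id}/{port}/status"
```

Because the workspace URL is read from Spark config at runtime, the same code would produce correct links on AWS, Azure, and GCP workspaces.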
When starting the scheduler and workers we just call them and move on.
It would be a better user experience if we watched their health for a little while. Eventually, we need to exit the init script and leave the Dask processes running, and we don't want to watch them forever otherwise the init script will never exit. But in the overall timeline of the Databricks cluster starting we could afford to spend a few seconds watching the Dask components to make sure they are healthy and don't exit prematurely.
Here are a few ideas for health checks we could implement:
- Watch for the `Scheduler at: ...` log line
- Watch for the `Start worker at: ...` log line

We should add at least one check for the scheduler and workers.
We could add some configurable timeout and if the components don't start up in that time we would exit with a non-zero return code and some useful logs about what is going on.
```bash
#!/bin/bash
# Install Dask Databricks
/databricks/python/bin/pip install dask-databricks

# Start Dask cluster
dask databricks run --timeout 30s
```
When init scripts exit with a non-zero code the whole cluster provisioning fails, so we want to be cautious about when we do this, but the scheduler or worker processes failing to start up cleanly seems like a good reason to fail fast.
Add GitHub Actions CI to run pytest.
Set up builds to push to PyPI on tag. kr8s has a nice workflow for this we can copy.
Right now the dask worker command is hard coded.
We should support configuring this so we can set it to other things like dask cuda worker. We may also want to be able to pass extra flags to dask scheduler and dask worker, and we should expose that via the dask databricks run CLI.
```bash
#!/bin/bash
# Install Dask Databricks
/databricks/python/bin/pip install dask-databricks

# Start Dask cluster
dask databricks run --worker-command "dask cuda worker" --worker-args "--nthreads 10"
```
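Under the hood, the CLI could assemble the worker invocation by splitting those option strings with shlex. This is a sketch assuming the hypothetical `--worker-command` and `--worker-args` flags proposed above.

```python
import shlex


def build_worker_argv(worker_command="dask worker", worker_args=""):
    """Turn the --worker-command and --worker-args strings into an argv list,
    respecting shell-style quoting in either string."""
    return shlex.split(worker_command) + shlex.split(worker_args)
```

Using shlex rather than str.split means quoted arguments (e.g. `--memory-limit "4 GiB"`) survive the round trip intact.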
We need to write and build some documentation with Sphinx using the dask-sphinx-theme.
Right now we have a hard-coded version.
It would be good to switch over to hatch-vcs to pick this up automatically at build time from the git tags.
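For reference, the standard hatch-vcs setup is a couple of pyproject.toml entries along these lines (adapted from the hatch-vcs docs; it would need merging with the project's existing build config):

```toml
[build-system]
requires = ["hatchling", "hatch-vcs"]
build-backend = "hatchling.build"

[project]
name = "dask-databricks"
# Version is derived from git tags at build time instead of a hard-coded string
dynamic = ["version"]

[tool.hatch.version]
source = "vcs"
```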
Add a cluster manager class to simplify connecting a Client and provide useful utilities.
Currently, to connect a client we can do the following.
```python
from dask.distributed import Client
import os

client = Client(f'{os.environ["SPARK_LOCAL_IP"]}:8786')
```
It would be nice if we could do this the "Dask way" with a cluster manager class like this.
```python
from dask.distributed import Client
from dask_databricks import DatabricksCluster

cluster = DatabricksCluster()
client = Client(cluster)
```
Then we could have a cluster object that we can use to call useful utilities like cluster.get_logs().
There have been some discussions lately like https://dask.discourse.group/t/dask-on-databricks-clusters/2086/8 about being able to side load a Dask cluster onto a Databricks multi-node cluster. A Runner could be a good model for this.
A Databricks cluster can be given an init script that gets executed on every node of the cluster. We could create a runner that can be launched as an init script as a background process.
It looks like Databricks sets environment variables like DB_IS_DRIVER and DB_DRIVER_IP, so the scheduler could start on the driver node and all of the workers can find it because they know the driver IP address.
It probably doesn't make sense to support running client code as that would likely be driven from a notebook.
Hi,
Thanks for creating this library.
I am trying to spin up a Dask cluster on Azure Databricks.
I've followed the instructions and managed to set up a Dask cluster. However, I realised that the compute resources are not maximized. There is only 1 worker instead of 2 (based on the minimum number of workers I configured). Any idea how to fix this? I have tried to use client.cluster.scale(n=2) but got an error returned.
The second issue is that the URL for the Dask dashboard is not accessible and is different if you are using Azure Databricks. The typical URL of an Azure Databricks notebook is as follows:
Thanks, and any advice is much appreciated.