larribas / dagger

Define sophisticated data pipelines with Python and run them on different distributed systems (such as Argo Workflows).

License: Apache License 2.0

dagger's Introduction

Dagger

Define sophisticated data pipelines and run them on different distributed systems (such as Argo Workflows).


Features

  • Define tasks and DAGs, and compose them together seamlessly.
  • Create dynamic for loops and map-reduce operations.
  • Run your DAGs locally or using a distributed workflow orchestrator (such as Argo Workflows).
  • Take advantage of advanced runtime features (e.g. retry strategies and Kubernetes scheduling directives).
  • ... All with a simple Pythonic DSL that feels just like coding regular Python functions.

Other nice features of Dagger are: Zero dependencies, 100% test coverage, great documentation and plenty of examples to get you started.

Installation

Dagger is published to the Python Package Index (PyPI) under the name py-dagger. To install it, you can simply run:

pip install py-dagger

Looking for Tutorials and Examples?

Check our Documentation Portal!

Architecture Overview

Dagger is built around 3 components:

  • A set of core data structures that represent the intended behavior of a DAG.
  • A domain-specific language (DSL) that uses metaprogramming to capture how a DAG should behave, and represents it using the core data structures.
  • Multiple runtimes that inspect the core data structures to run the corresponding DAG, or prepare the DAG to run in a specific pipeline executor.

How to contribute

Do you have some feedback about the library? Have you implemented a Serializer or a Runtime that may be useful for the community? Do you think a tutorial or example could be improved?

Every contribution to Dagger is greatly appreciated.

Please read our Contribution Guidelines for more details.

Local development

We use Poetry to manage the dependencies of this library. In the codebase, you will find a Makefile with some useful commands to run and test your contributions. Namely:

  • make install - Install the project's dependencies
  • make test - Run tests and report test coverage. It will fail if coverage is too low.
  • make ci - Run all the quality checks we run for each commit/PR. This includes type hint checking, linting, formatting and documentation.
  • make build - Build the project.
  • make docker-build - Package the project in a Docker image
  • make docs-build - Build the documentation portal.
  • make docs-serve - Serve the documentation portal.
  • make k3d-set-up - Create a k3d cluster and image registry for the project.
  • make k3d-docker-push - Build and push the project's Docker image to the local k3d registry.
  • make k3d-install-argo - Install Argo on k3d, for local testing of Argo Workflows.
  • make k3d-tear-down - Destroy the k3d cluster and registry.

dagger's People

Contributors

larribas, pablobd, raztud

dagger's Issues

Mismatched output names when using a sub-key of a nested DAG in the DSL

What happened:

When building a DAG such as this one:

@dsl.task()
def generate_number() -> int:
    return 1

@dsl.DAG()
def inner_dag():
    return {
        "a": generate_number(),
        "b": generate_number(),
    }

@dsl.DAG()
def outer_dag():
    return inner_dag()["a"]

It returned the following error:

ValueError: Output 'return_value' depends on the output 'key_a' of another node named 'inner-dag'. However, node 'inner-dag' does not declare any output with such a name. These are the outputs defined by the node: ['a', 'b']

What you expected to happen:

We expected the outer DAG to reference an output named exactly the same as the outputs exposed by the inner DAG.

How to reproduce it (as minimally and precisely as possible):

Run dsl.build(outer_dag) on the example above.

Environment:

  • py-dagger version: 0.4.1
  • Python version: 3.9

Support default values for parameters

What are you trying to do:

When defining DAGs through the DSL, it may be confusing not to be able to set default parameters for tasks and DAGs.

Take this example:

from dagger import dsl

@dsl.task()
def f(a, b=2):
  print(a, b)

@dsl.DAG()
def d(x, y=1):
  f(x)
  f(y, 3)

from dagger.runtime.local import invoke
invoke(dsl.build(d), params={"x": 2})

At the moment, this would result in multiple errors:

  • when building d, we would get an error saying our first call to f is missing a parameter b.
  • when invoking it, we would get an error saying we are missing parameter y.

At the same time, the defaults for b and y would be completely ignored by the library, which doesn't deliver on the promise of providing an intuitive and Pythonic experience.

This is a request to accept default values on all parameters (as part of the Input protocol) and use them whenever a param is not specified.

Suggested implementation (optional):

The suggested implementation is to:

  • Accept a default value in every input, as part of the Input protocol.
  • When validating parameters, take the defaults of the task/DAG's inputs into account.
  • Capture default values on the DSL.
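
As a rough illustration of the last two points, the sketch below captures defaults from a plain Python function with the standard library's inspect module and applies them when validating parameters. The helper names capture_defaults and resolve_params are hypothetical and are not part of Dagger's API; how the library would actually store defaults in its Input protocol is left open here.

```python
import inspect
from typing import Any, Callable, Dict


def capture_defaults(func: Callable[..., Any]) -> Dict[str, Any]:
    """Return {parameter name: default value} for every parameter with a default.

    Hypothetical helper: shows how a DSL could read defaults from a signature.
    """
    return {
        name: param.default
        for name, param in inspect.signature(func).parameters.items()
        if param.default is not inspect.Parameter.empty
    }


def resolve_params(func: Callable[..., Any], supplied: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in missing parameters from defaults; fail only if no default exists."""
    defaults = capture_defaults(func)
    resolved: Dict[str, Any] = {}
    missing = []
    for name in inspect.signature(func).parameters:
        if name in supplied:
            resolved[name] = supplied[name]
        elif name in defaults:
            resolved[name] = defaults[name]
        else:
            missing.append(name)
    if missing:
        raise ValueError(f"Missing parameters without a default: {missing}")
    return resolved


def d(x, y=1):
    """Stand-in for the DAG function from the example above (undecorated)."""


print(resolve_params(d, {"x": 2}))  # {'x': 2, 'y': 1}
```

With this approach, a parameter is reported as missing only when it is neither supplied nor given a default in the function signature.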

Use buffered I/O for serializers, so that we can serialize large datasets keeping a low memory footprint

What are you trying to do:

We are adopting Dagger on a project that works with Dask DataFrames to manipulate large datasets.

At the moment, we cannot pass one of these datasets from one task to another because, even though the datasets live on disk, Dagger can only serialize them if the full bytes object returned by the serializer is allocated in memory.

This is a request for serializers to work with streaming I/O types instead of bytes, so that we can implement custom serializers that make the aforementioned use case feasible.

This would be aligned with Dagger's design principles of being performant and having a low memory footprint.

Background for your use case:

We are adopting Dagger at Glovo, and one of our projects needs to deal with large datasets that would not fit into memory at a reasonable cost.

Suggested implementation (optional):

My suggestion is to change the serializer API to the following:

serialize(Any) -> io.BufferedReader
deserialize(io.BufferedReader) -> Any
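
To make the idea concrete, here is a minimal sketch of a stream-based serializer, assuming a JSON Lines encoding and a temporary file as the backing store. The class name JSONLinesStreamSerializer is hypothetical and is not an existing Dagger serializer. The sketch is annotated with typing.BinaryIO rather than io.BufferedReader, because tempfile.TemporaryFile returns a read/write binary stream.

```python
import json
import tempfile
from typing import Any, BinaryIO, Iterable, Iterator


class JSONLinesStreamSerializer:
    """Hypothetical serializer that never holds the whole dataset in memory."""

    def serialize(self, rows: Iterable[Any]) -> BinaryIO:
        # Spool rows to disk one at a time instead of building one large bytes object.
        stream = tempfile.TemporaryFile()  # opened in binary "w+b" mode by default
        for row in rows:
            stream.write(json.dumps(row).encode("utf-8") + b"\n")
        stream.seek(0)  # rewind so the consumer reads from the start
        return stream

    def deserialize(self, stream: BinaryIO) -> Iterator[Any]:
        # Yield one decoded row per line; only the current line is in memory.
        for line in stream:
            yield json.loads(line)


serializer = JSONLinesStreamSerializer()
stream = serializer.serialize({"id": i} for i in range(3))
for row in serializer.deserialize(stream):
    print(row)
```

Both directions work on iterators, so peak memory depends on the size of a single row rather than on the size of the dataset.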

Filesystem permission errors on Windows

What happened:

When running the current test suite on Windows via GitHub Actions, we got multiple failures related to filesystem permissions.

https://github.com/larribas/dagger/pull/16/checks?check_run_id=3512172878

What you expected to happen:

We expected tests to run successfully on any operating system.

How to reproduce it (as minimally and precisely as possible):

On .github/workflows/tests.yaml, add the following OS to the matrix: windows-latest.

Environment:

  • Failing commit: f4fa308
  • Dagger version: 0.1.0
  • Python version: 3.9.0
  • Operating system and version: windows-latest on GitHub Actions

[Argo runtime] Storing the output artifacts doesn't work on older versions of Argo v3

What happened:

While trying to run Argo workflows generated by the Argo runtime in our cluster, we encountered the following error in the "wait" container:

Error (exit code 1): failed to chroot to main filesystem: operation not permitted

After some troubleshooting, we found the root cause to be the final slash in the volumeMount.0.mountPath.

We believe this error may be related to this bug in Argo: argoproj/argo-workflows#5520

To increase compatibility with older versions, we would like to remove the final slash.
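
A minimal sketch of that normalization, assuming the mount path is available as a plain string before the manifest is rendered. The function name normalize_mount_path is hypothetical and does not come from Dagger's codebase.

```python
def normalize_mount_path(path: str) -> str:
    """Drop trailing slashes from a mount path, keeping the root path intact."""
    stripped = path.rstrip("/")
    # An empty result means the input was "/" (or only slashes): keep the root.
    return stripped or "/"


print(normalize_mount_path("/tmp/outputs/"))  # /tmp/outputs
```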

What you expected to happen:

We expected the outputs to be saved successfully by the wait container.

How to reproduce it (as minimally and precisely as possible):

Running this manifest with the version of Kubernetes and Argo specified below:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: troubleshoot-outputs-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
        - name: generate-output
          template: generate-output

  - name: generate-output
    outputs:
      artifacts:
      - name: return_value
        path: /tmp/outputs/myfile
    volumes:
    - emptyDir: {}
      name: outputs
    script:
      image: python:alpine3.6
      command: [python]
      volumeMounts:
      - mountPath: /tmp/outputs/
        name: outputs
      source: |
        import os
        with open(os.path.join("/tmp/outputs", "myfile"), "w") as f:
          f.write("...")

Environment:

  • Dagger version: 0.2.0
  • Python version: 3.8
  • Argo version: 3.0.2
  • Argo Exec container image: quay.io/argoproj/argoexec:v3.0.2

Cannot rename file across multiple filesystems

What happened:

When Argo's wait container tried to upload the outputs of a specific task as artifacts, it got the following error:

OSError: [Errno 18] Invalid cross-device link

This error comes from the fact that the local runtime places the output in a temporary directory (root filesystem), but the CLI runtime tries to move those output files onto /tmp/output, which is mounted in Kubernetes as an emptyDir volume.

os.rename does not work across filesystems.

This ticket suggests we use shutil.move, which renames the file when source and destination are on the same filesystem and falls back to copying it (and removing the original) when they are not.
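
The sketch below shows the general shape of such a move using only the standard library. The function name move_output and the directory names are made up for illustration; this is not the code of Dagger's CLI runtime.

```python
import os
import shutil
import tempfile


def move_output(src: str, dst_dir: str) -> str:
    """Move a file into dst_dir, even when dst_dir is on another filesystem."""
    os.makedirs(dst_dir, exist_ok=True)
    dst = os.path.join(dst_dir, os.path.basename(src))
    # shutil.move tries os.rename first and copies + deletes if that fails.
    return shutil.move(src, dst)


# Usage: both directories are temporary here, so this demo stays on one
# filesystem; the same call also works when dst_dir is a mounted volume.
with tempfile.TemporaryDirectory() as workdir, tempfile.TemporaryDirectory() as outputs:
    source = os.path.join(workdir, "return_value")
    with open(source, "w") as f:
        f.write("42")
    moved_to = move_output(source, outputs)
    print(os.path.exists(moved_to), os.path.exists(source))  # True False
```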

What you expected to happen:

We expected the output files to be moved seamlessly from one directory to the other.

How to reproduce it (as minimally and precisely as possible):

Run the "passing_parameters" example on Argo Workflows. In the k3d cluster set up by the project's Makefile, we'd do something like:

argo submit -n argo --watch tests/examples/argo/passing_parameters.yaml

Environment:

  • py-dagger version: 0.3.0
  • Python version: 3.9

CyclicDependencyError when task is used at different levels in the graph

What happened:

With the local runtime, using the same task at different hierarchical levels of the graph raised an error:

raise CyclicDependencyError(
dagger.dag.topological_sort.CyclicDependencyError: There is a cyclic dependency between the following nodes: {'sum', 'identity'}

What you expected to happen:

I expected dagger to be able to build a consistent graph with different node names even if the task is used at different levels in the graph.

How to reproduce it (as minimally and precisely as possible):

from dagger import dsl


@dsl.task()
def f(x):
    return x


@dsl.task()
def g(x):
    return x


@dsl.DAG()
def pipeline(x):
    x = f(x)
    x = g(x)
    x = f(x)
    return x


if __name__ == "__main__":
    from dagger.runtime.local import invoke
    invoke(dsl.build(pipeline), params={"x": 1})
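
One way to picture the expected behavior is to give every invocation of a task its own node name, so that two calls to f never collapse into a single node. The sketch below is only an illustration of that idea: the function unique_node_names and the numeric-suffix scheme are assumptions, not the naming strategy Dagger actually uses.

```python
from collections import Counter
from typing import Iterable, List


def unique_node_names(task_names: Iterable[str]) -> List[str]:
    """Assign a distinct node name to each task invocation, in call order."""
    seen: Counter = Counter()
    names = []
    for name in task_names:
        seen[name] += 1
        # First use keeps the plain name; repeats get a numeric suffix.
        names.append(name if seen[name] == 1 else f"{name}-{seen[name] - 1}")
    return names


# The pipeline above calls f, then g, then f again.
print(unique_node_names(["f", "g", "f"]))  # ['f', 'g', 'f-1']
```

With distinct names, the dependency chain f -> g -> f-1 is linear, so a topological sort has no cycle to report. A real implementation would also need to guard against a user task that is already named like a generated suffix.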

Environment:

  • Dagger version: 0.1.1
  • Python version: 3.8.6
  • Operating system and version:

IDEs complain about the type returned by the `@dsl.` decorators

What happened:

We noticed that PyCharm was complaining about the types returned by the dsl.DAG and dsl.task decorators, when decorated DAGs were injected into the dsl.build() function.

Credit to @razvantudorica-gl for finding this issue.

What you expected to happen:

We expected no complaints from the IDE's type checker.

How to reproduce it (as minimally and precisely as possible):

Environment:

  • Dagger version: 0.1.1
  • Python version: 3.8

Remove having to explicitly write the identity task

What are you trying to do:

When parallelizing on objects in DAGs, it can be confusing having to write the identity function on the input.

Let's say you want to run this DAG:

@dsl.DAG()
def my_dag(my_list: list) -> list:
    output = [some_task(x) for x in my_list]
    return concat_task(output)

At the moment it results in an error:

ValueError: Iterating over the value of a parameter is not a valid parallelization pattern in Dagger. You need to convert the parameter into the output of a node. Read this section in the documentation to find out more: https://larribas.me/dagger/user-guide/dags/map-reduce

This is a request to accept this simplified syntax.

Suggested implementation (optional):

Alternative implementations (optional):

More flexible merges for runtime options

What are you trying to do:

I am trying to create a workflow that has access to a persistent volume on Argo Workflows.

Following the official example, I need to set up a volume claim template at the WorkflowSpec level, and add a volume mount at the container level.

However, Dagger will not allow me to add an extra volume mount to the container, since it is being used to mount an empty dir for the outputs.

I believe the current implementation is not flexible enough to cover many overrides users may want to do in the future.

Background for your use case:

We are implementing Dagger at Glovo, and one of our projects is built around the idea of several distributed machines having access to the same volume.

Suggested implementation (optional):

I believe instead of a shallow merge where any key overrides are forbidden, we should do a deep merge, more in line with what Kubernetes provides when you patch a resource.

Thus, if you have:

x = {
  "a": {
    "a": 1
  },
  "b": [1],
}

y = {
  "a": {
    "c": 2
  },
  "b": [2],
}

And you try to merge(x,y), the system would:

  • Set the value of a.c to 2
  • Append 2 to b
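
A minimal sketch of such a deep merge follows, assuming nested dictionaries are merged recursively and lists are concatenated. The function name deep_merge is hypothetical. The request does not say what should happen when both sides set the same scalar key; letting the second argument win is an assumption made here for illustration.

```python
from copy import deepcopy
from typing import Any, Dict, Mapping


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a new dict with `override` merged into `base`, without mutating either."""
    merged = deepcopy(dict(base))
    for key, value in override.items():
        current = merged.get(key)
        if isinstance(current, dict) and isinstance(value, Mapping):
            merged[key] = deep_merge(current, value)  # merge nested mappings
        elif isinstance(current, list) and isinstance(value, list):
            merged[key] = current + deepcopy(value)  # append list items
        else:
            merged[key] = deepcopy(value)  # assumption: override wins on scalars
    return merged


x = {"a": {"a": 1}, "b": [1]}
y = {"a": {"c": 2}, "b": [2]}
print(deep_merge(x, y))  # {'a': {'a': 1, 'c': 2}, 'b': [1, 2]}
```

For the Argo use case described above, this kind of merge would let a user-supplied volume mount be appended to the list that already holds the mount for the outputs directory.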
