argoproj-labs / hera

Hera is an Argo Python SDK. Hera aims to make construction and submission of various Argo Project resources easy and accessible to everyone! Hera abstracts away low-level setup details while still maintaining a consistent vocabulary with Argo. ⭐️ Remember to star!

Home Page: https://hera.rtfd.io

License: Apache License 2.0

Python 99.65% Makefile 0.35%
python kubernetes machine-learning workflow-management workflow-automation cloud-native argo-workflows pypi argo-events library

hera's Introduction

Hera

Hera is a Python framework for constructing and submitting Argo Workflows. The main goal of Hera is to make the Argo ecosystem accessible by simplifying workflow construction and submission.

See the Quick Start guide to start using Hera to orchestrate your Argo Workflows!

The Argo was constructed by the shipwright Argus,
and its crew were specially protected by the goddess Hera.

Hera at a glance

Steps diamond

from hera.workflows import Steps, Workflow, script


@script()
def echo(message: str):
    print(message)


with Workflow(
    generate_name="single-script-",
    entrypoint="steps",
) as w:
    with Steps(name="steps") as s:
        echo(name="A", arguments={"message": "I'm a step"})
        with s.parallel():
            echo(name="B", arguments={"message": "We're steps"})
            echo(name="C", arguments={"message": "in parallel!"})
        echo(name="D", arguments={"message": "I'm another step!"})

w.create()

DAG diamond

from hera.workflows import DAG, Workflow, script


@script()
def echo(message: str):
    print(message)


with Workflow(
    generate_name="dag-diamond-",
    entrypoint="diamond",
) as w:
    with DAG(name="diamond"):
        A = echo(name="A", arguments={"message": "A"})
        B = echo(name="B", arguments={"message": "B"})
        C = echo(name="C", arguments={"message": "C"})
        D = echo(name="D", arguments={"message": "D"})
        A >> [B, C] >> D

w.create()

See the examples for a collection of Argo workflows constructed and submitted via Hera!

Requirements

Hera requires an Argo server to be deployed to a Kubernetes cluster. Currently, Hera assumes that the Argo server sits behind an authentication layer that can authenticate workflow submission requests by using the Bearer token on the request. To learn how to deploy Argo to your own Kubernetes cluster you can follow the Argo Workflows guide!

Another option for workflow submission without the authentication layer is using port forwarding to your Argo server deployment and submitting workflows to localhost:2746 (2746 is the default, but you are free to change it). Please refer to the documentation of Argo Workflows to see the command for port forward!

Note Kubernetes v1.24+ no longer creates tokens automatically for ServiceAccounts, and Argo relies on Bearer tokens. When setting up Argo Workflows on Kubernetes v1.24+, it is therefore necessary to use --auth=server and/or --auth=client for Hera to communicate with the Argo Server.

Authenticating in Hera

There are a few ways to authenticate in Hera; read more in the authentication walkthrough. For now, with the argo CLI tool installed, this example will get you up and running:

from hera.workflows import Workflow, Container
from hera.shared import global_config
from hera.auth import ArgoCLITokenGenerator

global_config.host = "http://localhost:2746"
global_config.token = ArgoCLITokenGenerator

with Workflow(generate_name="local-test-", entrypoint="c") as w:
    Container(name="c", image="docker/whalesay", command=["cowsay", "hello"])

w.create()

Installation

Note Hera went through a name change - from hera-workflows to hera. This is reflected in the published Python package. If you'd like to install versions prior to 5.0.0, you have to use hera-workflows. Hera currently publishes releases to both hera and hera-workflows for backwards compatibility purposes.

Source        Command
PyPi          pip install hera
PyPi          pip install hera-workflows
GitHub repo   python -m pip install git+https://github.com/argoproj-labs/hera --ignore-installed; pip install .

Optional dependencies

yaml

  • Install via hera[yaml]
  • PyYAML is required for the yaml output format, which is accessible via hera.workflows.Workflow.to_yaml(*args, **kwargs). This enables GitOps practices and easier debugging.

cli

  • Install via hera[cli]. The [cli] option installs the extra dependency Cappa required for the CLI
  • The CLI aims to enable GitOps practices, easier debugging, and a more seamless experience with Argo Workflows.
  • The CLI is an experimental feature and subject to change! At the moment it only supports generating YAML files from workflows via hera generate yaml. See hera generate yaml --help for more information.

Contributing

See the contributing guide!

hera's People

Contributors

bchalk101, bnuzhanyu, bobh66, dancardin, degiorgio, dejamiko, dependabot[bot], elliotgunton, flaviuvadan, guypozner, harelwa, iameskild, jacobhayes, jplikesbikes, kengoa, kitagry, menzenski, mobin102, mostapharoudsari, mshatkhin23, mynameisfiber, ni1993, nikhiljha, pbrunelle, rodrigobaron, samj1912, tachylatus, tmi, trollgeir, xiang9156

hera's Issues

problems with v1.0.0 package published to pypi

Hi,

Something is wrong with the wheels / pypi package published.

Here is the way to inspect / reproduce:

> docker run -it --rm continuumio/miniconda3
> conda create -n py37-env python=3.7 pip -y
[...]
> conda activate py37-env
(py37-env) > pip --version
pip 21.2.2 from /opt/conda/envs/py37-env/lib/python3.7/site-packages/pip (python 3.7)
(py37-env) > pip install hera-workflows==1.0.0
[...]
(py37-env) > python -c "import hera"
Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'hera'
(py37-env) > ls /opt/conda/envs/py37-env/lib/python3.7/site-packages/ | grep hera
hera_workflows-1.0.0.dist-info
# here the actual package is missing, e.g. with `pydantic` you'll get 
(py37-env)  > ls /opt/conda/envs/py37-env/lib/python3.7/site-packages/ | grep pydantic
pydantic
pydantic-1.9.0.dist-info

The above issue is also the cause of the conda-forge feedstock CI failing -

conda-forge/hera-workflows-feedstock#2

Many thanks,
Harel

New resource volumes not showing in templates

Ever since the change to resources.volumes, I'm not seeing volumes actually mount or show up in workflow templates. I tested with the following script to make sure it isn't my code. This is based on the example code for secret volumes (that's my specific use-case), but I tried with a standard Volume and still did not see it in the w.spec.templates nor in the submitted workflow.

from hera.resources import Resources
from hera.task import Task
from hera.volumes import SecretVolume, Volume
from hera.workflow import Workflow
from hera.workflow_service import WorkflowService

from .utils import get_sa_token

def use_secret():
    from pathlib import Path

    print("Files in /var/secrets:")
    for file in Path("/var/secrets").rglob("*"):
        print(file)


namespace = "etl"
token = get_sa_token(
    "default", namespace=namespace, 
)
ws = WorkflowService(
    host="127.0.0.1:2746",
    token=token,
    namespace=namespace,
)
w = Workflow("volume-provision", ws)
d = Task(
    "use_secret",
    use_secret,
    resources=Resources(
        volumes=[
            SecretVolume(
                secret_name="service-account-ro",
                mount_path="/var/secrets/google/",
            ),
            Volume(size="34Gi", mount_path="/test"),
        ]
    ),
)


w.add_task(d)
print(w.spec.templates)
ws.submit(w.workflow, namespace=namespace)

This gives the template:

[{'dag': {'tasks': [{'arguments': {'artifacts': [], 'parameters': []},
                    'name': 'use-secret',
                    'template': 'use-secret'}]},
 'name': 'volume-provision-16b43f28',
 'parallelism': 50,
 'steps': []}, {'daemon': False,
 'inputs': {'artifacts': [], 'parameters': []},
 'metadata': {'labels': {}},
 'name': 'use-secret',
 'outputs': {'artifacts': []},
 'script': {'command': ['python'],
            'image': 'python:3.7',
            'name': 'use-secret',
            'resources': {'limits': {'cpu': '1', 'memory': '4Gi'},
                          'requests': {'cpu': '1', 'memory': '4Gi'}},
            'source': 'import json\n'
                      '\n'
                      'from pathlib import Path\n'
                      '\n'
                      'print("Files in /var/secrets:")\n'
                      'for file in Path("/var/secrets").rglob("*"):\n'
                      '    print(file)\n'},
 'tolerations': []}]

Adjust `twine` publication step in GHA workflow

The release of 0.4.1 failed here: https://github.com/argoproj-labs/hera-workflows/actions/runs/1675071659

This happened because of the new argo-workflows dependency, which is installed straight from a repo, rather than PyPi. Need to investigate how to publish Hera with such a dependency. Potential solution:

  • adjust twine publication process by replacing pypa action with twine command in Makefile. Maybe twine exposes some flag to allow publishing packages dependent on GH repos

Expose `verify_ssl` and `scheme` on `Configuration`

Multiple users want to use Hera while controlling certificate checking through the scheme and SSL verification settings. Hera needs to expose the verify_ssl flag and let users specify their preferred scheme and the full domain.
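As a rough illustration of the request, the sketch below assembles a base URL from a scheme, a domain, and an optional port, and carries a verify_ssl flag alongside it. The Configuration class shown here and its field names (domain, port, host) are assumptions made for the example; they are not Hera's actual implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Configuration:
    """Hypothetical connection settings; not Hera's actual class."""

    domain: str
    scheme: str = "https"
    port: Optional[int] = None
    verify_ssl: bool = True

    def __post_init__(self) -> None:
        # Reject anything other than the two schemes an HTTP client can use.
        if self.scheme not in ("http", "https"):
            raise ValueError(f"unsupported scheme: {self.scheme!r}")

    @property
    def host(self) -> str:
        """Full base URL assembled from scheme, domain, and optional port."""
        netloc = self.domain if self.port is None else f"{self.domain}:{self.port}"
        return f"{self.scheme}://{netloc}"


# Port-forwarded server with a self-signed certificate: keep HTTPS, skip verification.
local = Configuration(domain="localhost", scheme="https", port=2746, verify_ssl=False)
print(local.host, local.verify_ssl)
```

Keeping the scheme separate from the domain lets a caller switch between http and https without string surgery on the host value.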

Add context management for workflow submission

Currently, users have to add tasks to a workflow independently. After a task is defined, it is necessary to call workflow.add_tasks(t1, t2, ...); otherwise the tasks are not added to the template of the DAG in the main workflow. To save some effort here, a context manager can be introduced to auto-insert tasks into a workflow and to submit the workflow during the exit phase of the context. This will provide two ways of adding tasks to a workflow.

Current, and supported way of adding tasks and submitting a workflow:

wf = Workflow('test')
t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo'])
t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
t1 >> t2
wf.add(t1, t2)
wf.submit()

Potential future (second) way of adding tasks and submitting a workflow:

with Workflow('test') as wf:
    (
        Task('t1', image='docker/whalesay', command=['cowsay', 'foo'])
        >> Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
    )

Alternatively:

with Workflow('test') as wf:
    t1 = Task('t1', image='docker/whalesay', command=['cowsay', 'foo']) 
    t2 = Task('t2', image='docker/whalesay', command=['cowsay', 'foo'])
    t1 >> t2

This will require the implementation of a context manager on Workflow and a container for Task, which tasks insert themselves into during __init__:

class Workflow:

    def __enter__(self, ...):
        ...
        return self

    def __exit__(self, ...):
        ...
        return self.submit()

class _Container(list):
    
    @classmethod
    def add(cls, t: Task):
        cls.append(t)

class Task:

    def __init__(...):
        ...
        _Container.add(self)
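A self-contained variant of this idea is sketched below, assuming the active workflow is tracked in a class attribute rather than a separate _Container. The Workflow and Task classes are toy stand-ins that only record state, and the auto_submit parameter is the hypothetical flag raised in the questions that follow, not an existing Hera option.

```python
from typing import List, Optional


class Workflow:
    """Toy stand-in for a workflow; it only records tasks and a submit flag."""

    # The workflow whose `with` block is currently active, if any.
    _active: Optional["Workflow"] = None

    def __init__(self, name: str, auto_submit: bool = True) -> None:
        self.name = name
        self.auto_submit = auto_submit
        self.tasks: List["Task"] = []
        self.submitted = False

    def add(self, *tasks: "Task") -> None:
        self.tasks.extend(tasks)

    def submit(self) -> None:
        # A real implementation would call the Argo server here.
        self.submitted = True

    def __enter__(self) -> "Workflow":
        Workflow._active = self
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        Workflow._active = None
        # Submit only when the block finished without an exception.
        if exc_type is None and self.auto_submit:
            self.submit()
        return False  # never swallow exceptions


class Task:
    def __init__(self, name: str) -> None:
        self.name = name
        self.next: List["Task"] = []
        # Register with the active workflow, if there is one.
        if Workflow._active is not None:
            Workflow._active.add(self)

    def __rshift__(self, other: "Task") -> "Task":
        # Record the dependency and return the right-hand task to allow chaining.
        self.next.append(other)
        return other


with Workflow("test") as wf:
    t1 = Task("t1")
    t2 = Task("t2")
    t1 >> t2

print([t.name for t in wf.tasks], wf.submitted)
```

This sketch does not handle nested with blocks; supporting them would need a stack of active workflows instead of a single attribute.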

Questions to consider:

  • Does this solve a real problem or would it cause confusion because of the implicit behavior of task additions?
  • Is this an interface that abides by Hera's principle of simplicity? Is the _Container usage a bit too abstract?
  • Is it safe to assume that exiting a context should indeed submit a workflow? If not, but the design is still desirable, should users be given the feature along with a parameter on Workflow such as auto_submit? Does that pollute the interface of Workflow unnecessarily with QOL features rather than focusing on exposing Argo-specific parameters?

Would love to discuss this @bchalk101 and @bobh66! Any thoughts? Would this be useful to your use case?

Add Test PyPi publication process prior to publication to the official PyPi repository

There have been two broken versions published already, which does not bode well for Hera's stability as a package and reduces trust in development safety. The issues had to do exclusively with:

  • twine publish failed because a PyPi package cannot have a dependency on a package stored in a repo (build)
  • publication is successful but the package cannot be imported (issue)

As a consequence, Hera needs more guardrails/protection prior to the publication of an official version. The two features that are currently needed are:

  • a publication test to Test PyPi to make sure the package can be published
  • a usage test after Test PyPi publication that installs the package from the Test PyPi index and runs usage (maybe run the tests again in that isolated environment? The time spent re-running tests is much, much shorter than the time needed to fix a broken package, not only from the Hera development perspective but also for dependency management on the user/client side)
  • only once the aforementioned conditions are met, publish an official Hera version

This does not completely eliminate the chances for broken versions. It is still possible to publish broken untested code. So, it might be ideal to also have some test coverage check to offer a perspective of the percentage of code paths covered by tests.
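The usage test described above could start from a check like the following, a minimal sketch using only the standard library. It flags the failure mode seen in the broken release, where the distribution metadata is installed but the module cannot be found. The function name smoke_check and the shape of its result are assumptions for illustration.

```python
import importlib.util
from importlib import metadata


def smoke_check(distribution: str, module: str) -> dict:
    """Report whether a distribution is installed and its top-level module importable."""
    try:
        version = metadata.version(distribution)
    except metadata.PackageNotFoundError:
        version = None
    # find_spec returns None when no top-level module of that name exists.
    importable = importlib.util.find_spec(module) is not None
    return {
        "installed_version": version,
        "importable": importable,
        # Metadata present but no code: the package was published without its sources.
        "broken": version is not None and not importable,
    }


# The result depends on what is installed in the current environment.
print(smoke_check("hera-workflows", "hera"))
```

Run in a fresh environment right after installing from the test index, a result with "broken" set to True would be a reason to stop the release.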

How can I use MinIO for other namespace?

I submitted a task in the 'argo' namespace and it works well, but I got an error in another namespace.

Error (exit code 1): You need to configure artifact storage. More information on how to do this can be found in the docs: https://argoproj.github.io/argo-workflows/configure-artifact-repository/

I compared the YAML files and found that spec.volumes differ.

# in other namespace
spec:
  volumes:
    - name: var-run-argo
      emptyDir: {}
    - name: argo-staging
      emptyDir: {}
    - name: default-token-sxcsg
      secret:
        secretName: default-token-sxcsg
        defaultMode: 420

# in argo namespace
    - name: var-run-argo
      emptyDir: {}
    - name: my-minio-cred
      secret:
        secretName: my-minio-cred
        items:
          - key: accesskey
            path: accesskey
          - key: secretkey
            path: secretkey
        defaultMode: 420

https://argoproj.github.io/argo-workflows/configure-artifact-repository/ describes a way to solve this, but how can I get it to work for all namespaces when I install Argo?

namespace attribute question

First - I want to make sure I say THANK YOU for this project. It is exactly what I needed to accomplish what I'm working on at the moment, and hopefully I can help wherever needed.

I'm curious why the Workflow and CronWorkflow objects take a name attribute as part of object creation but the namespace is passed in as a parameter to the create()/submit() methods? Is the expectation that the same workflow/cronworkflow object instance could be created in different namespaces by the same code?

Would it make sense to pass the namespace in to the object creation and store it in metadata along with the name, and then have the submit()/create() methods accept an optional namespace that defaults to None, and if no namespace is passed in then it defaults to the object's namespace attribute?

I'm just thinking that it might be "cleaner" to construct the object all at once and not have to worry about passing in a namespace when it's time to create/submit.
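The fallback being proposed might look like this sketch. The Workflow class here is a toy model that only returns a string; the constructor default of "default" and the return value are assumptions made to keep the example runnable.

```python
from typing import Optional


class Workflow:
    """Toy model of the proposed signature; no Argo calls are made."""

    def __init__(self, name: str, namespace: str = "default") -> None:
        self.name = name
        self.namespace = namespace

    def create(self, namespace: Optional[str] = None) -> str:
        # An explicit argument wins; otherwise use the constructor value.
        target = namespace if namespace is not None else self.namespace
        return f"{target}/{self.name}"


w = Workflow("etl-job", namespace="argo")
print(w.create())           # argo/etl-job
print(w.create("staging"))  # staging/etl-job
```

With this shape, the same object can still be created in several namespaces, while the common single-namespace case needs no extra argument.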

If it's easier I can push a PR with what I'm thinking and we can discuss the question in code.

Thanks

Make `func: Callable` an optional parameter on `Task`

If users want to submit a container and execute a command in it, they currently need to supply a lambda: _ (no-op) placeholder for func. This should not be required, as it also forces Argo to insert an empty script rather than just execute the container with the provided command.
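One way to picture the requested behavior is a builder that emits a script template only when a function is given and a plain container template otherwise. The sketch below uses dictionaries shaped like Argo's template fields; build_template is a hypothetical helper, and the script source is a placeholder rather than real function serialization.

```python
from typing import Callable, List, Optional


def build_template(
    name: str,
    image: str,
    func: Optional[Callable] = None,
    command: Optional[List[str]] = None,
) -> dict:
    """Return a `script` template when func is given, else a `container` template."""
    if func is not None:
        # A real SDK would serialize the function body into `source`;
        # the placeholder keeps this sketch independent of that step.
        script = {
            "image": image,
            "command": ["python"],
            "source": f"# body of {func.__name__}",
        }
        return {"name": name, "script": script}
    if not command:
        raise ValueError("either func or command is required")
    # No function: run the container with the provided command, no empty script.
    return {"name": name, "container": {"image": image, "command": command}}


print(build_template("say", "docker/whalesay", command=["cowsay", "hello"]))
```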

How to ignore certificate errors?

When I attempt to port-forward Argo running in GKE, I get a certificate error. I tried the suggestion from the first comment here, but I still get the certificate error.

C:\Users\ryanr\Code\hera-workflows\examples [main ≡ +0 ~1 -0 !]> python .\diamond.py
Traceback (most recent call last):
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 706, in urlopen      
    chunked=chunked,
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 382, in _make_request
    self._validate_conn(conn)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 1010, in _validate_conn
    conn.connect()
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connection.py", line 426, in connect
    tls_in_tls=tls_in_tls,
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 450, in ssl_wrap_socket
    sock, context, tls_in_tls, server_hostname=server_hostname
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\ssl_.py", line 493, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 423, in wrap_socket
    session=session
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 870, in _create
    self.do_handshake()
  File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.7_3.7.2544.0_x64__qbz5n2kfra8p0\lib\ssl.py", line 1139, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\diamond.py", line 32, in <module>
    w.submit()
  File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow.py", line 129, in submit
    self.service.submit(self.workflow, namespace)
  File "c:\users\ryanr\code\hera-workflows\src\hera\v1\workflow_service.py", line 48, in submit
    return self.service.create_workflow(namespace, V1alpha1WorkflowCreateRequest(workflow=workflow))
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 62, in create_workflow
    return self.create_workflow_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api\workflow_service_api.py", line 162, in create_workflow_with_http_info
    collection_formats=collection_formats)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 369, in call_api
    _preload_content, _request_timeout, _host)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 185, in __call_api
    _request_timeout=_request_timeout)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\api_client.py", line 413, in request
    body=body)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 271, in POST
    body=body)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\argo\workflows\client\rest.py", line 168, in request
    headers=headers)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 79, in request
    method, url, fields=fields, headers=headers, **urlopen_kw
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\request.py", line 170, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
    **response_kw
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
    **response_kw
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 796, in urlopen
    **response_kw
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "C:\Users\ryanr\.virtualenvs\hera-workflows-FA8T1hb6\lib\site-packages\urllib3\util\retry.py", line 574, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='localhost', port=2746): Max retries exceeded with url: /api/v1/workflows/default (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)')))

CronWorkflow name attribute question

The CronWorkflow object suspend and resume methods have a required name parameter. Can this parameter be optional and default to the name attribute of the CronWorkflow?

Add support for raw value task parameters

Some users want to extract pod metadata, such as IP, from a running task and expose it to others. This can be achieved through something like:

    arguments:
      parameters:
        - name: port
          value: "5051"  // raw values
        - name: whatever-task-field
          value: "{{steps.TASK.FIELD}}"  // step

https://argoproj.github.io/argo-workflows/variables/

Suggestion:

  • have an enum with all available variables
  • help users by only requesting a specification of the field and task name: Whatever(Task.name, Field.IP) (sample)
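The suggestion could be prototyped roughly as follows. The Field enum lists only a few of the per-step fields from the Argo variables page, and the names Field, step_variable, and parameter are hypothetical helpers, not part of Hera.

```python
from enum import Enum


class Field(str, Enum):
    """A few per-step fields listed in the Argo variables reference."""

    ID = "id"
    IP = "ip"
    STATUS = "status"
    EXIT_CODE = "exitCode"


def step_variable(task_name: str, field: Field) -> str:
    """Render an Argo template tag such as {{steps.my-task.ip}}."""
    return "{{steps." + task_name + "." + field.value + "}}"


def parameter(name: str, value: str) -> dict:
    """Build one entry for `arguments.parameters`."""
    return {"name": name, "value": value}


params = [
    parameter("port", "5051"),  # raw value
    parameter("server-ip", step_variable("server", Field.IP)),  # step variable
]
print(params)
```

Inside a DAG template, Argo uses the tasks prefix instead of steps, so a fuller version would take the prefix as a parameter.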

EnvSpec bug?

Not sure if this is intended, but setting environment variables via EnvSpec leaves unwanted quotation marks in the string when it is resolved in a Python script, as shown below.

Steps to reproduce: execute the task below and see that it prints False, while it should print True.

def test_env_spec():
    ressource = Resources(
        min_cpu=1,
        max_cpu=2,
        min_mem="2Gi",
        max_mem="4Gi",
    )
    envs = [
        EnvSpec(
            name="ENV_STRING",
            value='test',
        ),
    ]
    task = Task(
        'test',
        lambda: _,
        None,
        image='python:3.7',
        command=[
            "python",
            "-c",
            "import os; print(os.environ['ENV_STRING']);print(os.environ['ENV_STRING']=='test')",
        ],
        resources=ressource,
        env_specs=envs,
    )
    return task
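One plausible explanation, assuming the value is JSON-encoded before it is written into the workflow spec, is shown below with the standard library alone. The env_value helper is a hypothetical fix that leaves strings untouched; it is not taken from Hera's code.

```python
import json

raw = "test"

# JSON-encoding a string wraps it in double quotes.
encoded = json.dumps(raw)
print(encoded)         # "test"
print(encoded == raw)  # False

# Decoding restores the original value.
print(json.loads(encoded) == raw)  # True


def env_value(value) -> str:
    """Pass strings through untouched and JSON-encode everything else."""
    return value if isinstance(value, str) else json.dumps(value)


print(env_value("test"))    # test
print(env_value({"a": 1}))  # {"a": 1}
```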

How to retry a task `n` times?

I've been trying to figure out how to retry a task on error. Some of our workflows are susceptible to k8s failures and we'd like to retry them n times.

I see that Argo supports retryStrategy, which looks like what I am looking for.

From what I can tell, it doesn't seem like Hera supports this yet. You can specify a Retry, but that only supports duration-based retries. There's a get_retry_strategy() method, but it only mentions the backoff method.

That same Task definition mentions a retry_limit, which sounds like what I'm looking for, but it's not implemented anywhere else. It's also not clear from the wording if I need to create a new Task called retry_limit, or if retry_limit is a setting associated with a task.

Anyway, is this a feature Hera currently supports? If it needs to be added, could someone point me in the direction of what needs to be done?
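For reference, the Argo side of this is a retryStrategy block with a limit field. The sketch below builds such a fragment as a plain dictionary; retry_strategy is a hypothetical helper, and how the fragment would be wired into a Hera Task is left open.

```python
from typing import Optional


def retry_strategy(
    limit: int,
    policy: str = "Always",
    backoff_duration: Optional[str] = None,
) -> dict:
    """Build a `retryStrategy` fragment for an Argo template."""
    allowed = {"Always", "OnFailure", "OnError", "OnTransientError"}
    if policy not in allowed:
        raise ValueError(f"unknown retryPolicy: {policy!r}")
    if limit < 0:
        raise ValueError("limit must be non-negative")
    strategy = {"limit": limit, "retryPolicy": policy}
    if backoff_duration is not None:
        # Optional delay between attempts, e.g. "10s".
        strategy["backoff"] = {"duration": backoff_duration}
    return strategy


template = {
    "name": "flaky-task",
    "retryStrategy": retry_strategy(3, policy="OnError"),
}
print(template)
```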

Add SECURITY.md

The Argo maintainers recently agreed to require all Argoproj Labs project repositories to contain a SECURITY.md file which documents:

  • Contact information for reporting security vulnerabilities
  • Some minimal information about policies, practices, with possibly links to further documentation with more details

This will help direct vulnerability reporting to the right parties which can fix the issue.

You are free to use the following as examples/templates:

Also, please note that in the future we are exploring a requirement that argoproj-labs projects perform a CII self-assessment to better inform its users about which security best practices are being followed.

Running non-python tasks

For some steps in my workflows, I need to run a bash script or container image. Can I do this with Hera, or is it meant for pure Python workflows?

Example fails with minikube

I tried the following example with minikube as the backend. Everything is set up according to this guide:
https://argoproj.github.io/argo-workflows/quick-start/

from hera.task import Task
from hera.workflow import Workflow
from hera.workflow_service import WorkflowService


def say(message: str):
    """
    This can be anything as long as the Docker image satisfies the dependencies. You can import anything Python
    that is in your container e.g torch, tensorflow, scipy, biopython, etc - just provide an image to the task!
    """
    print(message)


token = "sometoken"

ws = WorkflowService(host='https://localhost:2746', verify_ssl=False, token=token, namespace="argo")
w = Workflow('my-workflow', ws)
t = Task('say', say, [{'message': 'Hello, world!'}])
w.add_task(t)
w.submit(namespace='argo')

I am getting the following error message:
argo.workflows.client.exceptions.ApiException: (401) Reason: Unauthorized HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Trailer': 'Grpc-Trailer-Content-Type', 'Date': 'Mon, 17 Jan 2022 12:01:05 GMT', 'Transfer-Encoding': 'chunked'}) HTTP response body: {"code":16,"message":"Unauthorized"}

I am able to execute the basic examples with couler.

Open access to setting `access_modes` on dynamically provisioned volumes

Hera defaults the volume access mode to ['ReadWriteOnce'] because GKE does not accept ReadWriteMany. However, not all clients using Hera run on GKE, so it's safer to expose an access_modes field that defaults to ['ReadWriteOnce'] but can be set to other access modes.
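A minimal sketch of such a field, with validation against the access modes Kubernetes defines, could look like this. The Volume dataclass below is a simplified stand-in with assumed field names, not Hera's class.

```python
from dataclasses import dataclass, field
from typing import List

# Access modes defined by Kubernetes for persistent volumes.
VALID_ACCESS_MODES = {"ReadWriteOnce", "ReadOnlyMany", "ReadWriteMany", "ReadWriteOncePod"}


@dataclass
class Volume:
    """Hypothetical volume spec; only the fields needed for the example."""

    size: str
    mount_path: str
    # Default matches the current behavior; callers may override it.
    access_modes: List[str] = field(default_factory=lambda: ["ReadWriteOnce"])

    def __post_init__(self) -> None:
        unknown = [m for m in self.access_modes if m not in VALID_ACCESS_MODES]
        if unknown:
            raise ValueError(f"unsupported access modes: {unknown}")


print(Volume(size="34Gi", mount_path="/test").access_modes)
print(Volume(size="34Gi", mount_path="/shared", access_modes=["ReadWriteMany"]).access_modes)
```

Validating at construction time would also catch misspellings such as 'ReadyWriteOnce' before the workflow reaches the cluster.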

Blog post

I think someone should write a blog post about this.

Add support for Argo artifacts

Argo Workflows supports input and output artifacts in a template, and a workflow step may specify an artifact argument. These can fully specify the artifact repository inline or use a key-only approach. Depending on the executor, a volume may need to be provided for output artifacts. Support for this feature needs to handle all these options.

Hera has a nice design, and I'm not sure how to add this support without beginning to make the API messier. Do you have thoughts on how to add this while retaining the Hera style?

References:

How to share context in a cronworkflow?

Suppose I have a CronWorkflow that executes a Hive SQL query each day and then does some further processing.

ws = CronWorkflowService(...)
w = CronWorkflow('cron-example', ws, namespace="argo")
t1 = Task(image='hive/image', command=f"hive -e 'select * from t where day={today}' > savedpath_{today}")
t2 = Task(...)  # processes the file savedpath_{today}

The SQL changes every day, so t1.command is not determined when the CronWorkflow is submitted, and t2 should use the same value for today.
I could use a Workflow and submit it each day, but is there a better way in Hera?

I have been thinking of doing it this way, but I don't know whether Argo would support it.

import datetime


def context_gen():
    # return key-value pairs; only the standard library is supported
    today = datetime.date.today().strftime('%Y%m%d')
    return {
        "sql_command": f"hive -e 'select * from t where day={today}' > savedpath_{today}"
    }

w = CronWorkflow('cron-example', ws, namespace="argo", context_generator=context_gen)  
# when a new workflow start, it will new a task to generate the context, and passing to all other tasks
t1 = Task(image='hive/image', command_gen = from_context('sql_command') )
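An alternative that may avoid a context generator altogether is to let Argo fill in the date. The Argo variables reference lists workflow.creationTimestamp with strftime-style suffixes, which are resolved when each scheduled run is created. The sketch below only builds the command string; the ts helper is hypothetical, and whether this fits the Hive use case is an assumption.

```python
def ts(char: str) -> str:
    """Render {{workflow.creationTimestamp.<char>}} for one strftime character."""
    return "{{workflow.creationTimestamp." + char + "}}"


# Resolved by Argo when each scheduled workflow is created, not at submission time.
day = ts("Y") + ts("m") + ts("d")
sql_command = f"hive -e 'select * from t where day={day}' > savedpath_{day}"
print(sql_command)
```

Because both tasks would reference the same workflow-level variable, they would see the same date within a single run.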

Duplicate tasks when specifying multiple func_params

Creating a task with multiple func_params and connecting that task to any other task (whether it has parallelism in it or not) doubles the run count of the original parallel task.

A simple example:

from hera.task import Task
from hera.workflow import Workflow
from hera.workflow_service import WorkflowService

from spincycle.runner.utils import get_sa_token


def use_secret(a):
    print(a)


# todo: replace the domain and token with your own
namespace = "etl-argo-v3"
token = get_sa_token(
    "default", namespace=namespace, config_file="/home/micha/.kube/config"
)

ws = WorkflowService(
    host="127.0.0.1:2746",
    token=token,
    namespace=namespace,
)
w = Workflow("volume-provision", ws)

t1 = Task(
    "use_secret1",
    use_secret,
    func_params=[{"a": "a"}, {"a": "a2"}],
)
w.add_task(t1)

t2 = Task(
    "use_secret2",
    use_secret,
    func_params=[{"a": "a"}],
)
t1 >> t2
w.add_task(t2)

ws.submit(w.workflow, namespace=namespace)

If you run and submit that, things will look fine within the argo UI:

image

However, if you look at the logs you see that there were in fact 4 pods run for use-secret1, two for each parameter choice:

Name:                volume-provision-3ab82d9e
Namespace:           etl
ServiceAccount:      default
Status:              Succeeded
Conditions:          
 PodRunning          False
 Completed           True
Created:             Wed Feb 16 18:19:19 +0100 (3 minutes ago)
Started:             Wed Feb 16 18:19:19 +0100 (3 minutes ago)
Finished:            Wed Feb 16 18:19:39 +0100 (3 minutes ago)
Duration:            20 seconds
Progress:            5/5
ResourcesDuration:   8s*(1 cpu),1m26s*(100Mi memory)

STEP                          TEMPLATE                   PODNAME                               DURATION  MESSAGE
 ✔ volume-provision-3ab82d9e  volume-provision-3ab82d9e                                                    
 ├─✔ use-secret1(0:a:"a")     use-secret1                volume-provision-3ab82d9e-940572814   4s          
 ├─✔ use-secret1(0:a:\"a\")   use-secret1                volume-provision-3ab82d9e-4054141412  5s          
 ├─✔ use-secret1(1:a:\"a2\")  use-secret1                volume-provision-3ab82d9e-4279694763  4s          
 ├─✔ use-secret1(1:a:"a2")    use-secret1                volume-provision-3ab82d9e-3966846017  4s          
 └─✔ use-secret2              use-secret2                volume-provision-3ab82d9e-1375103652  4s          

I've been trying to debug why this is happening, but I can't seem to get anywhere. Taking out the use-secret2 task, or removing the parallelism from use-secret1, leads to the expected number of tasks being run. Notably, the number of pods for the parallel task is always doubled, no matter how parallel its dependencies are. Also, the difference in quote escaping between the original and duplicate tasks is consistent across every occurrence I've seen. I presume it's a clue, but I'm unsure where to trace it back to.

For reference, the template that is created is:

- name: volume-provision-3ab82d9e
  inputs: {}
  outputs: {}
  metadata: {}
  dag:
    tasks:
      - name: use-secret1
        template: use-secret1
        arguments:
          parameters:
            - name: a
              value: '{{item.a}}'
        withItems:
          - a: '"a"'
          - a: '"a2"'
      - name: use-secret2
        template: use-secret2
        arguments:
          parameters:
            - name: a
              value: '"a"'
        dependencies:
          - use-secret1
  parallelism: 50
- name: use-secret1
  inputs:
    parameters:
      - name: a
        value: '{{item.a}}'
  outputs: {}
  metadata: {}
  daemon: false
  script:
    name: use-secret1
    image: 'python:3.7'
    command:
      - python
    resources:
      limits:
        cpu: '1'
        memory: 4Gi
      requests:
        cpu: '1'
        memory: 4Gi
    source: |
      import json
      a = json.loads('{{inputs.parameters.a}}')

      print(a)
- name: use-secret2
  inputs:
    parameters:
      - name: a
        value: '"a"'
  outputs: {}
  metadata: {}
  daemon: false
  script:
    name: use-secret2
    image: 'python:3.7'
    command:
      - python
    resources:
      limits:
        cpu: '1'
        memory: 4Gi
      requests:
        cpu: '1'
        memory: 4Gi
    source: |
      import json
      a = json.loads('{{inputs.parameters.a}}')

      print(a)
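
The quoted values in withItems ('"a"', '"a2"') and the json.loads call in the script source suggest that each parameter is JSON-encoded on submission and decoded again inside the pod. The following is a minimal sketch of that round trip only. It assumes json.dumps is the encoding step, which the template hints at but does not confirm, and it does not reproduce or explain the duplicated pods.

```python
import json

# Parameters as passed to the first task in the report above.
func_params = [{"a": "a"}, {"a": "a2"}]

# Assumption: each value is JSON-encoded before being written to withItems.
with_items = [{key: json.dumps(value) for key, value in params.items()} for params in func_params]
print(with_items)  # [{'a': '"a"'}, {'a': '"a2"'}]

# Inside the pod, the script decodes the substituted value again.
decoded = [json.loads(item["a"]) for item in with_items]
print(decoded)  # ['a', 'a2']
```

The encoded strings match the withItems entries in the template, so the escaped variants (\"a\") in the step names are likely produced somewhere after this encoding step.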

Share implementation of task insertion functions between `CronWorkflow` and `Workflow`

CronWorkflow and Workflow currently have identical implementations of:

  • add_task
  • add_tasks
  • add_head
  • add_tail

Duplicate implementations are fine in principle, since independent services can optimize their own, but for now these four are completely identical. It might therefore be better to centralize them as helper functions, have both workflow classes implement a protocol that requires these methods, and make each method a call to the corresponding helper.
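
A rough sketch of that centralization follows. Everything in it is hypothetical: `TaskContainer`, the `_add_*` helpers, and the `Sketch*` classes are illustrative names, and tasks are reduced to plain strings in a list, so the head/tail logic here is only list ordering rather than Hera's real dependency wiring.

```python
from typing import List, Protocol


class TaskContainer(Protocol):
    """Hypothetical protocol: any workflow-like object exposing a task list."""

    tasks: List[str]


def _add_task(container: TaskContainer, task: str) -> None:
    """Append one task to the container."""
    container.tasks.append(task)


def _add_tasks(container: TaskContainer, *tasks: str) -> None:
    """Append several tasks in order by reusing _add_task."""
    for task in tasks:
        _add_task(container, task)


def _add_head(container: TaskContainer, task: str) -> None:
    """Place a task before everything already in the container."""
    container.tasks.insert(0, task)


def _add_tail(container: TaskContainer, task: str) -> None:
    """Place a task after everything already in the container."""
    container.tasks.append(task)


class SketchWorkflow:
    """Stand-in for Workflow: each method only delegates to a shared helper."""

    def __init__(self) -> None:
        self.tasks: List[str] = []

    def add_task(self, task: str) -> None:
        _add_task(self, task)

    def add_tasks(self, *tasks: str) -> None:
        _add_tasks(self, *tasks)

    def add_head(self, task: str) -> None:
        _add_head(self, task)

    def add_tail(self, task: str) -> None:
        _add_tail(self, task)


class SketchCronWorkflow:
    """Stand-in for CronWorkflow: same delegation, plus its own schedule field."""

    def __init__(self, schedule: str) -> None:
        self.schedule = schedule
        self.tasks: List[str] = []

    def add_task(self, task: str) -> None:
        _add_task(self, task)

    def add_tasks(self, *tasks: str) -> None:
        _add_tasks(self, *tasks)

    def add_head(self, task: str) -> None:
        _add_head(self, task)

    def add_tail(self, task: str) -> None:
        _add_tail(self, task)


wf = SketchWorkflow()
wf.add_tasks("b", "c")
wf.add_head("a")
wf.add_tail("d")

cron = SketchCronWorkflow("0 * * * *")
cron.add_task("only")

print(wf.tasks)    # ['a', 'b', 'c', 'd']
print(cron.tasks)  # ['only']
```

The two classes stay independent, so either one can later replace a delegating method with its own optimized version without touching the other.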

Example for using container with input and output

Could anyone provide an example of using a container with input and output artifacts? I'm trying to see how to take the output of task1 and pass it as input to task2.
Open to any suggestions.
Thanks!

Migrate Hera to the newly generated Argo Workflows OpenAPI client

Currently, Hera builds on top of the now deprecated and archived argo-client-python. That client has since moved into the main Argo Workflows repository, so Hera's constructs and imports should migrate to the new client. This warrants a new minor version (0.0.0 -> 0.1.0): one dependency is removed and another introduced, but only the internals change, not the interface.

Coin_flip by task status.

Hello!
Could you please add "when" support for task and workflow statuses (Succeeded, Failed, Error), as with the exit handler task?
E.g., for tasks, this works for me:

  def when_status(self, other: 'Task', operator: Operator, value: str) -> 'Task':
      self.argo_task.when = f'{{{{tasks.{other.name}.status}}}} {operator.value} {value}'
      return other.next(self)

and

r = Task("r", random_code)
h = Task("h", heads)
t = Task("t", tails)

h.when_status(r, Operator.equals, "Succeeded")
t.when_status(r, Operator.equals, "Failed")
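
To make the quadruple braces in that f-string easier to read, here is a small standalone sketch of the string it renders. The `Operator` enum below is a hypothetical stand-in with a single member, and `status_condition` is an illustrative helper, not part of Hera.

```python
from enum import Enum


class Operator(Enum):
    """Hypothetical stand-in for the Operator enum used in the snippet above."""

    equals = "=="


def status_condition(task_name: str, operator: Operator, value: str) -> str:
    # In an f-string, "{{" and "}}" each render as one literal brace,
    # so "{{{{" and "}}}}" produce the "{{" and "}}" of an Argo template tag.
    return f"{{{{tasks.{task_name}.status}}}} {operator.value} {value}"


heads_when = status_condition("r", Operator.equals, "Succeeded")
tails_when = status_condition("r", Operator.equals, "Failed")
print(heads_when)  # {{tasks.r.status}} == Succeeded
print(tails_when)  # {{tasks.r.status}} == Failed
```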

Can't mix subclassed Task classes when using next

When subclassing Task, you can't set a task of the parent type as the next task.

Example:

class SubclassTask(Task):
    pass

t1 = SubclassTask('t', lambda: _)
t2 = Task('t', lambda: _)

t1.next(t2)

The above code raises an AssertionError.
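
One plausible cause, stated here purely as an assumption since the report does not show the failing assertion, is an exact-type comparison where a subclass-aware check was intended. The sketch below uses a stripped-down, hypothetical `Task` class (not Hera's) with two illustrative methods to show the difference between the two checks.

```python
class Task:
    """Hypothetical, stripped-down stand-in for a task class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.dependents = []

    def next_exact(self, other: "Task") -> "Task":
        # Exact-type check: fails whenever the two classes differ at all.
        assert type(self) is type(other), "task types differ"
        self.dependents.append(other.name)
        return other

    def next_relaxed(self, other: "Task") -> "Task":
        # Accepts any Task instance, whether parent class or subclass.
        assert isinstance(other, Task), "not a Task"
        self.dependents.append(other.name)
        return other


class SubclassTask(Task):
    pass


t1 = SubclassTask("t1")
t2 = Task("t2")

try:
    t1.next_exact(t2)
    exact_outcome = "ok"
except AssertionError:
    exact_outcome = "AssertionError"

t1.next_relaxed(t2)
print(exact_outcome)   # AssertionError
print(t1.dependents)   # ['t2']
```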

Unable to resolve methods outside script

Hi there, I'm new to Hera and Argo, so bear with me if this is a rookie question:

Here is my code snippet:

from app.common.image.image_utils import read_image
from app.common.models.thumbnail_metadata import ThumbnailMetadata
from app.common.s3_utils import resolve_s3_url, upload
from app.common.tasks.argo_task import ArgoTask
from app.services.argo_client import ArgoWorkflowService

class ThumbnailTask(ArgoTask):
    def __init__(self, metadata: ThumbnailMetadata):
        # .... some more code

    def create_thumbnails(self, image_url: str, thumbnail_sizes: list[dict]):
        blob = read_image(self.image_url)

        # .... some more code

    def create(self):
        ArgoWorkflowService.create_task('create_thumbnails', ThumbnailTask.create_thumbnails, [{
            'image_url': self.image_url,
            'thumbnail_sizes': self.thumbnail_sizes
        }])

and

from typing import Callable
from hera.retry import Retry
from hera.task import Task
from hera.workflow import Workflow
from hera.workflow_service import WorkflowService

from app.core.config import get_app_settings

class ArgoWorkflowService(object):
    workflow_service = None
    workflow = None

    @staticmethod
    def create_task(name: str, func: Callable, func_params: list[dict]):
        # .... some more code

        task = Task(name, func, func_params, image='my-repo.dkr.ecr.us-east-2.amazonaws.com/my-app:latest', retry=Retry(duration=3, max_duration=60))
        ArgoWorkflowService.workflow.add_task(task)
        ArgoWorkflowService.workflow.create()

The error I received in Argo is:

Traceback (most recent call last):
  File "/argo/staging/script", line 5, in <module>
    blob = read_image(self.image_url)
NameError: name 'read_image' is not defined

The function read_image comes from another package.

In addition, Argo was unable to import third-party Python libraries. I used the same application image, but it still does not work.

Any help would be appreciated!
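
The generated template shown earlier on this page contains only the function body in the script's source field, which would explain the NameError: imports made at module level never reach the pod. The sketch below imitates that situation without Hera or Argo. It runs two hypothetical script bodies in a fresh namespace, using `math.sqrt` as a stand-in for `read_image`; `run_as_standalone_script` is an illustrative helper, not a Hera API.

```python
import textwrap

# Body that relies on a name imported elsewhere (at module level in the original file).
BODY_WITHOUT_IMPORT = textwrap.dedent(
    """
    value = sqrt(16)
    print(value)
    """
)

# Same body, but the import sits inside it, so it travels with the script.
BODY_WITH_IMPORT = textwrap.dedent(
    """
    from math import sqrt
    value = sqrt(16)
    print(value)
    """
)


def run_as_standalone_script(source: str) -> str:
    """Execute source in a fresh namespace and report whether a name was missing."""
    try:
        exec(source, {"__name__": "__main__"})
    except NameError as err:
        return f"NameError: {err}"
    return "ok"


result_without = run_as_standalone_script(BODY_WITHOUT_IMPORT)
result_with = run_as_standalone_script(BODY_WITH_IMPORT)
print(result_without)  # NameError: name 'sqrt' is not defined
print(result_with)     # ok
```

If that reading is right, moving the imports into the body of the task function is the likely workaround. A method that refers to self would presumably hit the same kind of problem for the same reason, so a plain function taking image_url as a parameter may be the safer shape.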

Remove internal `v1` module

It is not necessary to version the internal modules of Hera for a few reasons:

  • it makes semantic versioning more complicated than it needs to be (should Hera release a major version update if the underlying SDK changes? what if a single object has to change, such as the Config to modify the domain and verify_ssl flag?). Major package updates will reflect breaking changes, and the CHANGELOG will show what is available in a specific release that can be installed via PyPI;
  • the introduction of v2 would mean maintaining two internal versions, or more in the future, and potentially a public 3.x version, which can be very confusing for users of all levels of experience. Hera's purpose is to make it easy to adopt Argo Workflows, and understanding what a version of Hera means is critical for adoption and for clear, simple installation;
  • it reduces the maintenance burden: once "v1" is conceptually eradicated, Hera can move slowly towards v2 rather than introduce a partially developed v2, which would not be a great experience for users (simplicity is key, so Hera should keep it simple).

Artifacts example

Would you mind sharing how one might use input and output artifacts with Hera? Is that possible?

Support latest Python version

Hi! Thank you for developing such a great tool!
Currently, Hera supports only Python 3.7. That version is now in security-fix-only status and will reach end of support soon.

reference: https://www.python.org/downloads/

So I think it would be better to support Python 3.8+.
I'm happy to create a PR to support newer versions of Python, if that is OK.

Thank you.
