
airflow_multi_dagrun's Issues

Thanks

Thank you for making this! Was exactly what I needed.

ERROR - 'NoneType' object has no attribute 'create_dagrun'

I was trying to run some of the examples and stumbled upon this error.

I'm running the master version of your code with apache-airflow 1.10.6 and Python 3.7.5:

*** Reading local file: /home/vito.detullio/airflow/logs/trigger_with_multi_dagrun_sensor/gen_target_dag_run/2019-12-10T00:00:00+00:00/1.log
[2019-12-11 17:19:03,862] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: trigger_with_multi_dagrun_sensor.gen_target_dag_run 2019-12-10T00:00:00+00:00 [queued]>
[2019-12-11 17:19:03,876] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: trigger_with_multi_dagrun_sensor.gen_target_dag_run 2019-12-10T00:00:00+00:00 [queued]>
[2019-12-11 17:19:03,876] {taskinstance.py:841} INFO - 
--------------------------------------------------------------------------------
[2019-12-11 17:19:03,876] {taskinstance.py:842} INFO - Starting attempt 1 of 1
[2019-12-11 17:19:03,876] {taskinstance.py:843} INFO - 
--------------------------------------------------------------------------------
[2019-12-11 17:19:03,894] {taskinstance.py:862} INFO - Executing <Task(TriggerMultiDagRunOperator): gen_target_dag_run> on 2019-12-10T00:00:00+00:00
[2019-12-11 17:19:03,894] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'trigger_with_multi_dagrun_sensor', 'gen_target_dag_run', '2019-12-10T00:00:00+00:00', '--job_id', '419', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/trigger_with_multi_dagrun_sensor.py', '--cfg_path', '/tmp/tmp6vren6xo']
[2019-12-11 17:19:06,320] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run [2019-12-11 17:19:06,320] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-12-11 17:19:06,321] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run [2019-12-11 17:19:06,320] {dagbag.py:92} INFO - Filling up the DagBag from /home/vito.detullio/airflow/dags/trigger_with_multi_dagrun_sensor.py
[2019-12-11 17:19:06,346] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run [2019-12-11 17:19:06,346] {cli.py:545} INFO - Running <TaskInstance: trigger_with_multi_dagrun_sensor.gen_target_dag_run 2019-12-10T00:00:00+00:00 [running]> on host FGVIL021718L
[2019-12-11 17:19:06,361] {logging_mixin.py:112} INFO - [2019-12-11 17:19:06,361] {dagbag.py:92} INFO - Filling up the DagBag from /home/vito.detullio/airflow/dags
[2019-12-11 17:19:06,460] {taskinstance.py:1058} ERROR - 'NoneType' object has no attribute 'create_dagrun'
Traceback (most recent call last):
  File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/vito.detullio/airflow/plugins/multi_dagrun/operators/multi_dagrun.py", line 43, in execute
    dr = trigger_dag.create_dagrun(
AttributeError: 'NoneType' object has no attribute 'create_dagrun'
[2019-12-11 17:19:06,488] {taskinstance.py:1089} INFO - Marking task as FAILED.
[2019-12-11 17:19:06,552] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run Traceback (most recent call last):
[2019-12-11 17:19:06,552] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow-venv/bin/airflow", line 37, in <module>
[2019-12-11 17:19:06,552] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     args.func(args)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     return f(*args, **kwargs)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/bin/cli.py", line 551, in run
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     _run(args, dag, ti)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/bin/cli.py", line 469, in _run
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     pool=args.pool,
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     return func(*args, **kwargs)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/sentry.py", line 144, in wrapper
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     return func(task_instance, *args, session=session, **kwargs)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     result = task_copy.execute(context=context)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run   File "/home/vito.detullio/airflow/plugins/multi_dagrun/operators/multi_dagrun.py", line 43, in execute
[2019-12-11 17:19:06,554] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run     dr = trigger_dag.create_dagrun(
[2019-12-11 17:19:06,554] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run AttributeError: 'NoneType' object has no attribute 'create_dagrun'
[2019-12-11 17:19:08,872] {logging_mixin.py:112} INFO - [2019-12-11 17:19:08,871] {local_task_job.py:124} WARNING - Time since last heartbeat(0.02 s) < heartrate(5.0 s), sleeping for 4.979225 s
[2019-12-11 17:19:13,855] {logging_mixin.py:112} INFO - [2019-12-11 17:19:13,854] {local_task_job.py:103} INFO - Task exited with return code 1
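The traceback suggests that the DagBag lookup for trigger_dag_id returned None, e.g. because the target DAG file was never deployed or failed to parse. A defensive check along these lines (a sketch with a hypothetical target dag id, not the plugin's actual code) would fail with a clearer message:

from airflow.models import DagBag

dag_bag = DagBag()
trigger_dag = dag_bag.get_dag('example_target_dag')  # hypothetical target dag id
if trigger_dag is None:
    # DagBag.get_dag returns None for unknown dag ids instead of raising
    raise ValueError('DAG "example_target_dag" was not found in the DagBag; '
                     'is the target DAG deployed and parsing without errors?')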

Templating Option for Callable

This is a request to add op_args and op_kwargs to template_fields so that you can jinjafy the parameters that are passed into the python callable. This would make it behave more like the PythonOperator.

Another option is to inherit from PythonOperator (in addition to TriggerDagRunOperator), but that might come with other considerations.
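A minimal sketch of the first option (assuming the operator stores the callable's arguments as op_args/op_kwargs, the way PythonOperator does):

from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator


class TemplatedTriggerMultiDagRunOperator(TriggerMultiDagRunOperator):
    # let Jinja render the callable's arguments, as PythonOperator does
    template_fields = ('op_args', 'op_kwargs')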

passing a timedelta object as payload results in a webui crash

A simple example that triggers the web UI "crash":

Create a pair of DAGs:

import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': '...',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 0,
    'provide_context': True
}

with DAG('dag1',
         default_args=default_args,
         schedule_interval=datetime.timedelta(days=1)) as dag1:
    # the callable yields a payload (a timedelta) that is not JSON serializable
    op = TriggerMultiDagRunOperator(task_id='op',
                                    trigger_dag_id='dag2',
                                    python_callable=lambda **_: iter([datetime.timedelta(seconds=123.456)]))

with DAG('dag2', default_args=default_args, schedule_interval=None) as dag2:
    dummy_operator = DummyOperator(task_id='dummy_operator')

"deploy" them into airflow, and start dag1 .
After the first run of dag2, open its "Tree View".
Instead of the tree view, an error page with a stacktrace ending with TypeError: Object of type timedelta is not JSON serializable is shown.

In this regard the behavior of TriggerDagRunOperator is different, because the payload is passed through json.dumps when the conf attribute is created:

https://github.com/apache/airflow/blob/c8597cbf143b970ad3c7b0d62e3b44d1dfdc8afe/airflow/operators/dagrun_operator.py#L91-L95

(using airflow-multi-dagrun==1.2 and apache-airflow==1.10.7)
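Until the operator serializes the payload the same way, one workaround is to keep the payload JSON-serializable inside the callable itself (a sketch; the field name is made up):

import datetime


def gen_payloads(**_):
    for td in [datetime.timedelta(seconds=123.456)]:
        # timedelta is not JSON serializable; hand the web UI a plain float
        yield {'duration_seconds': td.total_seconds()}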

Best way to use it?

Hi there!

I'm curious about the best way to use your code.
Just copy and paste? Or maybe it could be wrapped in an installable package?

No pypi package

Hi.
I think your project is useful and I would like to integrate your operators into my project.

Do you have any plans to distribute the package via PyPI?

Airflow 2.0.0 support?

Airflow 2.0.0a1 has been released (see https://lists.apache.org/thread.html/rf34558953ba367561574c194500a34d7f3c21fe2798b173b86fc309c%40%3Cdev.airflow.apache.org%3E).

Airflow 2.0.0 no longer has a DagRunOrder:

[2020-10-15 16:07:17,999] {plugins_manager.py:156} ERROR - Failed to import plugin airflow_multi_dagrun
Traceback (most recent call last):
  File "/.../lib/python3.6/site-packages/airflow/plugins_manager.py", line 149, in load_entrypoint_plugins
    plugin_class = entry_point.load()
  File "/.../lib/python3.6/site-packages/pkg_resources/__init__.py", line 2447, in load
    return self.resolve()
  File "/.../lib/python3.6/site-packages/pkg_resources/__init__.py", line 2453, in resolve
    module = __import__(self.module_name, fromlist=['__name__'], level=0)
  File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/__init__.py", line 5, in <module>
    from . import operators
  File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/operators/__init__.py", line 1, in <module>
    from .multi_dagrun import TriggerMultiDagRunOperator  # noqa
  File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/operators/multi_dagrun.py", line 3, in <module>
    from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
ImportError: cannot import name 'DagRunOrder'

see apache/airflow#6317 and this UPDATING note.
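A compatibility shim along these lines could keep the import working on both major versions (a sketch, not the package's actual fix):

try:
    # Airflow 1.10.x
    from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
except ImportError:
    # Airflow 2.x removed DagRunOrder (apache/airflow#6317) and moved the operator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    DagRunOrder = None  # callers can no longer rely on it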

start_date: datetime.utcnow() causes DAG to run but trigger no tasks

Airflow Version: 1.10.9

When setting up the child DAG as per your example with default options:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='example_target_dag',
    schedule_interval=None,
    default_args={'start_date': datetime.utcnow(), 'owner': 'airflow'},
)


def run_this_func(dag_run, **kwargs):
    print("Chunk received: {}".format(dag_run.conf['index']))


chunk_handler = PythonOperator(
    task_id='chunk_handler',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

Setting {'start_date': datetime.utcnow()} causes the DAG to execute, but all tasks are skipped (including the initial task). Changing this to days_ago(0) resolves it, presumably because utcnow() is re-evaluated on every DAG parse, so the start date keeps moving into the future relative to the triggered runs.
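For reference, the working configuration from the report:

from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(0), 'owner': 'airflow'}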

Thanks,
Karl.

Also thanks / bigger changes

Hi @mastak ,

Also big thanks from me! I was looking for a solution for this for a day and stumbled across your solution somehow by chance - google is not your friend in finding this.

I created a fork in https://github.com/flinz/airflow_multi_dagrun that does the following:

  • included https://github.com/puckel/docker-airflow in the Docker Compose setup and removed most things from the Makefile (might be a bit too much)
  • added Travis CI
  • patched multi_dagrun.py to work with Airflow 1.10 and removed deprecation warnings (I opened a pull request to you with only this change)

Please let me know if I should open a PR for the whole thing - I am not sure if the changes I added go beyond what you want to maintain here.

Auto-apply ``apply_default`` decorator

As of Airflow 2.1.0rc2, an improvement has been added:

  • Auto-apply apply_default to subclasses of BaseOperator

When running the airflow db upgrade command, I now get several deprecation warnings.

The fix is simply to remove the import of the apply_defaults decorator and the @apply_defaults usage.
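The change looks roughly like this (a sketch of a generic operator subclass, not this package's exact code):

from airflow.models import BaseOperator

# before (Airflow < 2.1):
from airflow.utils.decorators import apply_defaults


class MyOperator(BaseOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


# after (Airflow >= 2.1 applies the decorator to BaseOperator subclasses automatically):
class MyOperator(BaseOperator):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)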

handle DagRunAlreadyExists

If TriggerMultiDagRunOperator fails and is set to be rescheduled, some dag runs may already have been created during the first attempt. We need to ignore DagRunAlreadyExists but still collect each dag_run.id so it can be stored under TriggerMultiDagRunOperator#CREATED_DAGRUN_KEY, as sketched below.
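A hedged sketch of the desired handling (trigger_dag, DagRunAlreadyExists and DagRun.find are real Airflow APIs; how the operator would wire this in is an assumption):

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.exceptions import DagRunAlreadyExists
from airflow.models import DagRun


def trigger_or_collect(dag_id, run_id, conf, execution_date):
    try:
        return trigger_dag(dag_id=dag_id, run_id=run_id, conf=conf,
                           execution_date=execution_date)
    except DagRunAlreadyExists:
        # created by a previous, failed attempt: reuse it so its id still
        # ends up under CREATED_DAGRUN_KEY
        return DagRun.find(dag_id=dag_id, run_id=run_id)[0]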

allow custom run_id

With apache/airflow#6317, TriggerDagRunOperator dropped the use of DagRunOrder, and you thankfully adapted to that change. However, I think we are following a bad decision by letting TriggerMultiDagRunOperator use unidentifiable run_ids.

I'd like to discuss the possibility of restoring the old behavior where TriggerMultiDagRunOperator#python_callable can return a DagRunOrder, so a user can provide a custom, dynamic run_id.
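For illustration, the Airflow 1.10-era behavior looked roughly like this (the callable name and run_id format are made up):

from airflow.operators.dagrun_operator import DagRunOrder  # Airflow 1.10.x


def gen_runs(**context):
    for chunk_id in range(3):
        yield DagRunOrder(
            run_id='chunk_{}_{}'.format(chunk_id, context['ds']),  # identifiable
            payload={'index': chunk_id},
        )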

In my case, I use TriggerMultiDagRunOperator to trigger thousands of dag runs, and I assign each of them an identifiable run_id that distinguishes it from all the others. Trading identifiable run_ids for gibberish ones is not wise at all.
WDYT?

Feature request: MultiDagRunSensor to fail if any target_dag fails?

Hi,

I am currently using MultiDagRunSensor and I notice that this sensor always succeeds regardless of the success or failure of the target_dag runs.

Is it possible to add a parameter that marks the sensor task as successful only if all the target_dags ran successfully?

Thank you for your time and help.

DAGs triggered or waited on by the provided operators and sensors do not show up in the DAG Dependencies View

Currently, DAGs triggered or waited on via the operators and sensors supplied by this package do not show up in the DAG Dependencies view (Browse -> DAG Dependencies).

Right now the view is coupled rather tightly to attributes of the built-in Airflow TriggerDagRunOperator and ExternalTaskSensor, but you can supply your own dependency detector class via the Airflow configuration (scheduler -> dependency_detector), and such a class could be supplied as part of this package.
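A hedged sketch of such a detector (assuming the Airflow 2.1+ API, where DependencyDetector and DagDependency live in airflow.serialization.serialized_objects; the TriggerMultiDagRunOperator handling is illustrative, not this package's actual code):

from airflow.serialization.serialized_objects import DagDependency, DependencyDetector


class MultiDagRunDependencyDetector(DependencyDetector):
    @staticmethod
    def detect_task_dependencies(task):
        if task.task_type == 'TriggerMultiDagRunOperator':
            return DagDependency(
                source=task.dag_id,
                target=task.trigger_dag_id,
                dependency_type='trigger',
                dependency_id=task.task_id,
            )
        return DependencyDetector.detect_task_dependencies(task)

It would then be enabled via airflow.cfg (module path hypothetical):

[scheduler]
dependency_detector = my_plugin.MultiDagRunDependencyDetector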

Happy to look at this further if this is desired.
