mastak / airflow_multi_dagrun
Triggering a DAG run multiple times.
License: Apache License 2.0
Thank you for making this! Was exactly what I needed.
I was trying to run some examples and I stumbled into this error.
I'm running the master version of your code with apache-airflow 1.10.6 and Python 3.7.5.
*** Reading local file: /home/vito.detullio/airflow/logs/trigger_with_multi_dagrun_sensor/gen_target_dag_run/2019-12-10T00:00:00+00:00/1.log
[2019-12-11 17:19:03,862] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: trigger_with_multi_dagrun_sensor.gen_target_dag_run 2019-12-10T00:00:00+00:00 [queued]>
[2019-12-11 17:19:03,876] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: trigger_with_multi_dagrun_sensor.gen_target_dag_run 2019-12-10T00:00:00+00:00 [queued]>
[2019-12-11 17:19:03,876] {taskinstance.py:841} INFO -
--------------------------------------------------------------------------------
[2019-12-11 17:19:03,876] {taskinstance.py:842} INFO - Starting attempt 1 of 1
[2019-12-11 17:19:03,876] {taskinstance.py:843} INFO -
--------------------------------------------------------------------------------
[2019-12-11 17:19:03,894] {taskinstance.py:862} INFO - Executing <Task(TriggerMultiDagRunOperator): gen_target_dag_run> on 2019-12-10T00:00:00+00:00
[2019-12-11 17:19:03,894] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'trigger_with_multi_dagrun_sensor', 'gen_target_dag_run', '2019-12-10T00:00:00+00:00', '--job_id', '419', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/trigger_with_multi_dagrun_sensor.py', '--cfg_path', '/tmp/tmp6vren6xo']
[2019-12-11 17:19:06,320] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run [2019-12-11 17:19:06,320] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-12-11 17:19:06,321] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run [2019-12-11 17:19:06,320] {dagbag.py:92} INFO - Filling up the DagBag from /home/vito.detullio/airflow/dags/trigger_with_multi_dagrun_sensor.py
[2019-12-11 17:19:06,346] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run [2019-12-11 17:19:06,346] {cli.py:545} INFO - Running <TaskInstance: trigger_with_multi_dagrun_sensor.gen_target_dag_run 2019-12-10T00:00:00+00:00 [running]> on host FGVIL021718L
[2019-12-11 17:19:06,361] {logging_mixin.py:112} INFO - [2019-12-11 17:19:06,361] {dagbag.py:92} INFO - Filling up the DagBag from /home/vito.detullio/airflow/dags
[2019-12-11 17:19:06,460] {taskinstance.py:1058} ERROR - 'NoneType' object has no attribute 'create_dagrun'
Traceback (most recent call last):
File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/vito.detullio/airflow/plugins/multi_dagrun/operators/multi_dagrun.py", line 43, in execute
dr = trigger_dag.create_dagrun(
AttributeError: 'NoneType' object has no attribute 'create_dagrun'
[2019-12-11 17:19:06,488] {taskinstance.py:1089} INFO - Marking task as FAILED.
[2019-12-11 17:19:06,552] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run Traceback (most recent call last):
[2019-12-11 17:19:06,552] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow-venv/bin/airflow", line 37, in <module>
[2019-12-11 17:19:06,552] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run args.func(args)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run return f(*args, **kwargs)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/bin/cli.py", line 551, in run
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run _run(args, dag, ti)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/bin/cli.py", line 469, in _run
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run pool=args.pool,
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run return func(*args, **kwargs)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/sentry.py", line 144, in wrapper
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run return func(task_instance, *args, session=session, **kwargs)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow-venv/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run result = task_copy.execute(context=context)
[2019-12-11 17:19:06,553] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run File "/home/vito.detullio/airflow/plugins/multi_dagrun/operators/multi_dagrun.py", line 43, in execute
[2019-12-11 17:19:06,554] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run dr = trigger_dag.create_dagrun(
[2019-12-11 17:19:06,554] {base_task_runner.py:115} INFO - Job 419: Subtask gen_target_dag_run AttributeError: 'NoneType' object has no attribute 'create_dagrun'
[2019-12-11 17:19:08,872] {logging_mixin.py:112} INFO - [2019-12-11 17:19:08,871] {local_task_job.py:124} WARNING - Time since last heartbeat(0.02 s) < heartrate(5.0 s), sleeping for 4.979225 s
[2019-12-11 17:19:13,855] {logging_mixin.py:112} INFO - [2019-12-11 17:19:13,854] {local_task_job.py:103} INFO - Task exited with return code 1
This is a request to add op_args and op_kwargs to template_fields, so that you can jinjafy the parameters passed into the python callable. This would make it behave more similarly to the PythonOperator.
Another option is to inherit from PythonOperator (in addition to TriggerDagRunOperator), but that might come with other considerations.
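A minimal sketch of what the request amounts to. The classes below are stand-ins (no Airflow import); the real change would subclass the package's TriggerMultiDagRunOperator, the same way PythonOperator declares its own template_fields:

```python
# Stand-in for the real operator (no Airflow import); illustrates the
# class-attribute override that Airflow's templating engine keys off.
class TriggerMultiDagRunOperator:
    template_fields = ()  # nothing is jinja-rendered today


class TemplatedTriggerMultiDagRunOperator(TriggerMultiDagRunOperator):
    # Airflow renders every attribute listed here with Jinja before
    # execute() runs, which is how PythonOperator gets its templating.
    template_fields = ('op_args', 'op_kwargs')


print(TemplatedTriggerMultiDagRunOperator.template_fields)
# -> ('op_args', 'op_kwargs')
```

The override is additive, so it should be safe for existing users who don't use Jinja in their arguments.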
A simple example that triggers the web UI "crash": create a pair of DAGs.
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': '...',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 0,
    'provide_context': True,
}

with DAG('dag1',
         default_args=default_args,
         schedule_interval=datetime.timedelta(days=1)) as dag1:
    op = TriggerMultiDagRunOperator(
        task_id='op',
        trigger_dag_id='dag2',
        python_callable=lambda **_: iter([datetime.timedelta(seconds=123.456)]))

with DAG('dag2', default_args=default_args, schedule_interval=None) as dag2:
    dummy_operator = DummyOperator(task_id='dummy_operator')
"deploy" them into airflow, and start dag1
.
After the first run of dag2
, open its "Tree View".
Instead of the tree view, an error page with a stacktrace ending with TypeError: Object of type timedelta is not JSON serializable
is shown.
In this regard the behavior of "TriggerDagRunOperator" is different because the payload is dumps
'd during the creation of the conf attribute:
(using airflow-multi-dagrun==1.2 and apache-airflow==1.10.7)
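The failure is easy to reproduce with the standard library alone. A short sketch of both the error and the obvious caller-side workaround (making the payload JSON-safe before it reaches dag_run.conf):

```python
import json
from datetime import timedelta

# A raw timedelta in dag_run.conf is what the Tree View later chokes on:
try:
    json.dumps({"delay": timedelta(seconds=123.456)})
except TypeError as exc:
    print(exc)  # Object of type timedelta is not JSON serializable

# Workaround: hand the operator a JSON-serializable payload instead.
payload = {"delay": timedelta(seconds=123.456).total_seconds()}
print(json.dumps(payload))  # {"delay": 123.456}
```

Serializing (or round-tripping) the payload inside the operator, as TriggerDagRunOperator effectively does, would surface the error at trigger time rather than in the web UI.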
Hi there!
I'm curious about the best way to use your code.
Just copy and paste? Or could it be wrapped in an installable package?
Hi.
I think your project is useful, and I would like to integrate your operators into my project.
Do you have some plans to distribute the package via pypi?
Airflow 2.0.0a1 has been released (see https://lists.apache.org/thread.html/rf34558953ba367561574c194500a34d7f3c21fe2798b173b86fc309c%40%3Cdev.airflow.apache.org%3E).
Airflow 2.0.0 no longer has a DagRunOrder:
[2020-10-15 16:07:17,999] {plugins_manager.py:156} ERROR - Failed to import plugin airflow_multi_dagrun
Traceback (most recent call last):
File "/.../lib/python3.6/site-packages/airflow/plugins_manager.py", line 149, in load_entrypoint_plugins
plugin_class = entry_point.load()
File "/.../lib/python3.6/site-packages/pkg_resources/__init__.py", line 2447, in load
return self.resolve()
File "/.../lib/python3.6/site-packages/pkg_resources/__init__.py", line 2453, in resolve
module = __import__(self.module_name, fromlist=['__name__'], level=0)
File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/__init__.py", line 5, in <module>
from . import operators
File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/operators/__init__.py", line 1, in <module>
from .multi_dagrun import TriggerMultiDagRunOperator # noqa
File "/.../lib/python3.6/site-packages/airflow_multi_dagrun/operators/multi_dagrun.py", line 3, in <module>
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
ImportError: cannot import name 'DagRunOrder'
see apache/airflow#6317 and this UPDATING note.
Hi,
Is it possible to add a wait_for_completion=True feature, so that my controller DAG waits for the target DAG runs to complete and then proceeds with the next step?
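A minimal sketch of what such a flag could do after triggering, with a stand-in get_state lookup (the real implementation would poll DagRun states from the metadata database, as the upstream TriggerDagRunOperator's wait_for_completion does):

```python
import time

# Mirrors the terminal DagRun states; 'running'/'queued' keep us waiting.
TERMINAL_STATES = ('success', 'failed')


def wait_for_completion(run_ids, get_state, poke_interval=0.01):
    """Block until every triggered run reaches a terminal state."""
    pending = set(run_ids)
    while pending:
        pending = {r for r in pending if get_state(r) not in TERMINAL_STATES}
        if pending:
            time.sleep(poke_interval)
    return {r: get_state(r) for r in run_ids}


# Usage, with a dict standing in for the DagRun state lookup:
states = {'run_1': 'success', 'run_2': 'failed'}
print(wait_for_completion(['run_1', 'run_2'], states.get))
# -> {'run_1': 'success', 'run_2': 'failed'}
```

The controller could then decide whether to fail itself when any child run ends in 'failed'.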
Goal:
A bridge release can help people upgrade their DAGs, and test everything out, before upgrading Airflow itself.
Airflow Version: 1.10.9
When setting up the child DAG as per your example, with default options:
dag = DAG(
    dag_id='example_target_dag',
    schedule_interval=None,
    default_args={'start_date': datetime.utcnow(), 'owner': 'airflow'},
)

def run_this_func(dag_run, **kwargs):
    print("Chunk received: {}".format(dag_run.conf['index']))

chunk_handler = PythonOperator(
    task_id='chunk_handler',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)
A start_date of {'start_date': datetime.utcnow()} causes the DAG to execute, but all tasks are skipped (including the initial task).
Changing it to days_ago(0) resolves the issue.
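The underlying issue is that datetime.utcnow() moves forward on every DAG-file parse, so the scheduler never sees a stable start date. A stdlib-only sketch of what days_ago(0) returns instead, mirroring airflow.utils.dates.days_ago (a fixed midnight-UTC anchor):

```python
from datetime import datetime, timedelta, timezone


def days_ago(n):
    # Midnight UTC, n days back: a start_date that stays constant across
    # re-parses within the same day, unlike utcnow().
    midnight = datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=n)


start = days_ago(0)
print(start.hour, start.minute)  # 0 0
```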
Thanks,
Karl.
Hi @mastak ,
Also a big thanks from me! I was looking for a solution to this for a day and stumbled across yours somehow by chance; Google is not your friend in finding this.
I created a fork in https://github.com/flinz/airflow_multi_dagrun that does the following:
Please let me know if I should open a PR for the whole thing; I am not sure whether the changes I added go beyond what you want to maintain here.
Since Airflow 2.1.0rc2, an improvement has been added: apply_default is applied automatically to subclasses of BaseOperator.
During the airflow db upgrade command, I get several deprecation warnings.
The fix is simple: remove the import of the apply_defaults decorator and the @apply_defaults usage.
If TriggerMultiDagRunOperator fails and is set to be rescheduled, it may already have created some dag runs during the first attempt. We need to ignore DagRunAlreadyExists, but still collect the dag_run.id values to be stored under TriggerMultiDagRunOperator#CREATED_DAGRUN_KEY.
Alternatively, allow the user to pass a mapping of dag_ids to callables, so that multiple different DAGs can be triggered.
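A sketch of the proposed retry-safe behavior, with stand-ins for the Airflow pieces (the real exception is airflow.exceptions.DagRunAlreadyExists, and the real loop would live in the operator's execute):

```python
class DagRunAlreadyExists(Exception):
    """Stand-in for airflow.exceptions.DagRunAlreadyExists."""
    def __init__(self, run_id):
        super().__init__(run_id)
        self.run_id = run_id


def create_dagrun(run_id, existing):
    # Stand-in for trigger_dag.create_dagrun; "existing" simulates runs
    # left over from the first, failed attempt.
    if run_id in existing:
        raise DagRunAlreadyExists(run_id)
    existing.add(run_id)
    return run_id


def create_all(run_ids, existing):
    created = []  # would be pushed to XCom under CREATED_DAGRUN_KEY
    for run_id in run_ids:
        try:
            created.append(create_dagrun(run_id, existing))
        except DagRunAlreadyExists as exc:
            created.append(exc.run_id)  # ignore the clash, keep the id
    return created


existing = {'run_1'}  # created on the first attempt
print(create_all(['run_1', 'run_2'], existing))  # ['run_1', 'run_2']
```

This keeps a rescheduled attempt idempotent: every requested run ends up recorded exactly once, whether it was created now or earlier.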
With apache/airflow#6317, TriggerDagRunOperator dropped the use of DagRunOrder, and you thankfully adapted to their approach. However, I think we are following a bad decision by letting TriggerMultiDagRunOperator use an unidentifiable run_id.
I'd like to discuss the possibility of restoring the old behavior, where TriggerMultiDagRunOperator#python_callable could return a DagRunOrder, so a user can provide a custom, dynamic run_id.
In my case, I use TriggerMultiDagRunOperator to trigger thousands of dag runs, and I assign each one an identifiable run_id that distinguishes it from the others. Surrendering an identifiable run_id for a gibberish one is not wise at all.
WDYT?
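For illustration, a stand-in for the pre-2.0 DagRunOrder shape this proposal wants back: the callable yields (run_id, payload) pairs, so every triggered run carries an identifiable id.

```python
from collections import namedtuple

# Stand-in mirroring the old Airflow DagRunOrder(run_id, payload) pair.
DagRunOrder = namedtuple('DagRunOrder', ['run_id', 'payload'])


def gen_orders(**context):
    # The python_callable would yield these instead of bare payloads,
    # letting the operator use run_id verbatim when creating each run.
    for i in range(3):
        yield DagRunOrder(run_id=f'chunk_{i}', payload={'index': i})


orders = list(gen_orders())
print([o.run_id for o in orders])  # ['chunk_0', 'chunk_1', 'chunk_2']
```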
Hi,
I am currently using MultiDagRunSensor, and I notice that this sensor always succeeds regardless of the success or failure of the target_dag runs.
Is it possible to add a parameter so that the sensor task is marked successful only if all the target_dag runs finished successfully?
Thank you for your time and help.
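A sketch of the stricter poke this asks for, with plain state strings standing in for the DagRun states the real sensor would query from the metadata database:

```python
def poke(states):
    """Succeed only when every target run succeeded; error on any failure."""
    if any(s == 'failed' for s in states):
        # The real sensor would raise AirflowException to fail the task.
        raise RuntimeError('at least one target_dag run failed')
    # Returning False keeps the sensor poking while runs are in flight.
    return all(s == 'success' for s in states)


print(poke(['success', 'running']))  # False: keep waiting
print(poke(['success', 'success']))  # True: sensor succeeds
```

Guarding this behind an opt-in parameter would preserve the current always-succeed behavior for existing users.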
Currently, the operators and sensors supplied by this package do not show up in the DAG Dependencies view (Browse -> DAG Dependencies).
Right now that view is coupled rather tightly to the attributes of the built-in TriggerDagRunOperator and ExternalTaskSensor, but you can supply your own dependency detector class via the Airflow configuration (scheduler -> dependency detector), and such a class could be shipped as part of this package.
Happy to look at this further if it's desired.