basph / data-pipelines-with-apache-airflow
Code for Data Pipelines with Apache Airflow
Home Page: https://www.manning.com/books/data-pipelines-with-apache-airflow
License: Other
The citibike_api_1 container fails to run, showing the error:
  File "app.py", line 6, in <module>
    from flask import Flask, jsonify, Response
  File "/usr/local/lib/python3.7/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
The same thing happens for the nyc_transportation API.
Also, after starting the containers and opening the webserver, the DAGs do not show up, even though they are visible and can be opened inside the file structure.
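The failing `from jinja2 import escape` comes from Flask 1.x importing a name that Jinja2 3.1 removed. One workaround that has resolved this class of failure elsewhere (an assumption about how the API images are built, not a fix from the book) is to pin the template libraries in the API's requirements file to the last compatible versions:

```
flask<2.0
jinja2<3.1
markupsafe==2.0.1
```

Rebuilding the API images (e.g. docker-compose build) after adding these pins should let app.py import Flask again.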
In Chapter 2, when I try to copy the DAG Python file to the path ~/airflow/dags/, I get a response saying the destination path does not exist.
Can you please share how to arrive at the correct file path? I installed Airflow using pip.
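With a pip-based install, Airflow resolves its home directory from $AIRFLOW_HOME (defaulting to ~/airflow), but the dags/ subdirectory is not created automatically, so the copy target does not exist yet. Creating it manually first is a common fix (a sketch, assuming the default AIRFLOW_HOME):

```shell
# Create the DAGs folder Airflow will scan; a plain pip install does not
# create it for you.
mkdir -p ~/airflow/dags
ls -d ~/airflow/dags
```

Once the directory exists, copying the chapter 2 DAG file into it should succeed, and the scheduler picks it up on its next scan.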
The code license is missing. Has it been defined yet, and can the code presented here be re-used, and if so, under which conditions?
The 11_xcoms_return.py example should be modified to use "return_value" as the XCom pull key, as indicated in Figure 5.17.
From:
deploy_model = PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model,
    templates_dict={
        "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
    },
)
To:
deploy_model = PythonOperator(
    task_id="deploy_model",
    python_callable=_deploy_model,
    templates_dict={
        "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='return_value')}}"
    },
)
Hi team, the request is to add best practice(s) around mocking metastore Variables for use in unit tests. I've put together the following code, but I'm curious whether you have other/better approaches for mocking out Variables in pytest or other unit test frameworks. Thanks.
from airflow.models.variable import Variable


def test_dummy(monkeypatch):
    def mock_get(*args, **kwargs):
        mocked_dict = {"dummy_key1": "dummy_val1", "dummy_key2": "dummy_val2"}
        return mocked_dict.get(args[0])

    monkeypatch.setattr(Variable, "get", mock_get)
    dummy_var = Variable.get("dummy_key1")
In data-pipelines-with-apache-airflow/chapter04/dags/listing_4_8.py, lines 13 to 14 (commit 40b52ba):
airflow==2.2.2 shows
DeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'logical_date' or 'data_interval_start' instead.
and
DeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
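The warnings describe a straightforward rename; as a sketch (the mapping below only restates the deprecation messages), a mechanical replacement over DAG files could look like:

```python
# Template-variable renames introduced around Airflow 2.2. The longer name
# must be replaced first, since "next_execution_date" contains
# "execution_date" as a substring.
RENAMES = {
    "next_execution_date": "data_interval_end",
    "execution_date": "data_interval_start",  # or 'logical_date', per the warning
}

template = "{{ execution_date }} to {{ next_execution_date }}"
for old, new in RENAMES.items():
    template = template.replace(old, new)
print(template)  # {{ data_interval_start }} to {{ data_interval_end }}
```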
Cannot use sql="postgres_query-{{ ts_nodash }}.sql" in chapter 4;
error: jinja2.exceptions.TemplateNotFound: postgres_query-{{ ts_nodash }}.sql
When you use "curl -o /data/events.json http://events_api:5000/events" and say "fetch and store the events from the API":
what is that API? I assume this DAG won't run? Not sure if I missed something earlier; I can't seem to find it.
Hi Team,
The concepts of chapter 9 helped me a lot, but I found that the code under the path airflowbook.operators.movielens_operator is missing. Could you help upload it?
Thank you for your great work.
On a MacBook Pro M1 Max, after running docker-compose up -d, the following error occurred on ports 8083 and 8082 (the identical traceback on both):
Traceback (most recent call last):
  File "app.py", line 4, in <module>
    from flask import Flask, render_template
  File "/usr/local/lib/python3.7/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/usr/local/lib/python3.7/site-packages/jinja2/__init__.py)
Also, 'minio_init-1' and 'initdb-adduser-1' remain in state 'Exited (0)'.
Traceback (most recent call last):
  File "/app.py", line 8, in <module>
    from flask import Flask, jsonify, request
  File "/usr/local/lib/python3.8/site-packages/flask/__init__.py", line 14, in <module>
    from jinja2 import escape
ImportError: cannot import name 'escape' from 'jinja2' (/usr/local/lib/python3.8/site-packages/jinja2/__init__.py)
I tried to run figure_6_20.py. The ExternalTaskSensor in dag2 remains in the running state even though the process_market task in dag1 succeeded. Could anyone give me a pointer on this issue? Thanks.
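One common cause (a guess, since the figure 6.20 DAGs share a schedule by design): ExternalTaskSensor looks for a task instance in the external DAG with exactly the same logical date, so if the two DAGs run on different schedules the sensor never finds a match and waits forever. The sensor's execution_delta parameter shifts the date it looks for; the underlying arithmetic is a plain timedelta subtraction:

```python
from datetime import datetime, timedelta

# Illustrative schedules: dag1 fires on the hour, dag2 ten minutes later.
dag2_logical_date = datetime(2023, 1, 1, 12, 10)
execution_delta = timedelta(minutes=10)

# With execution_delta set on the sensor, it checks dag1 at this logical date:
expected_dag1_date = dag2_logical_date - execution_delta
print(expected_dag1_date)  # 2023-01-01 12:00:00
```

If the schedules really are identical, checking that both DAGs are unpaused and share the same start_date is the other usual suspect.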
There is a mistake in the script chapter02/scripts/listing_2_10.sh: the '\' line-continuation character is missing after --name airflow on line 15.
Execution of the script ends with the error: "docker run" requires at least 1 argument.
Line 15 should read: --name airflow \
If I run pytest from data-pipelines-with-apache-airflow/chapters/chapter8, it encounters several ModuleNotFoundError errors:
pytest
===================================================================================== test session starts =====================================================================================
platform darwin -- Python 3.8.2, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8
plugins: pytest_docker_tools-0.2.2, helpers-0.1.0, helpers-namespace-2019.1.8, mock-3.3.1
collected 1 item / 5 errors
=========================================================================================== ERRORS ============================================================================================
__________________________________________________________ ERROR collecting tests/airflowbook/operators/test_json_to_csv_operator.py __________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/airflowbook/operators/test_json_to_csv_operator.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
return _bootstrap._gcd_import(name[level:], package, level)
tests/airflowbook/operators/test_json_to_csv_operator.py:5: in <module>
from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator
E ModuleNotFoundError: No module named 'airflowbook.operators.json_to_csv_operator'
___________________________________________________________ ERROR collecting tests/airflowbook/operators/test_movielens_operator.py ___________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/airflowbook/operators/test_movielens_operator.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
return _bootstrap._gcd_import(name[level:], package, level)
tests/airflowbook/operators/test_movielens_operator.py:11: in <module>
from airflowbook.operators.movielens_operator import (
E ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
__________________________________________________________ ERROR collecting tests/airflowbook/operators/test_movielens_operator2.py ___________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/airflowbook/operators/test_movielens_operator2.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
return _bootstrap._gcd_import(name[level:], package, level)
tests/airflowbook/operators/test_movielens_operator2.py:7: in <module>
from airflowbook.operators.movielens_operator import (
E ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
________________________________________________________________ ERROR collecting tests/dags/chapter7/custom/test_operators.py ________________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/dags/chapter7/custom/test_operators.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
return _bootstrap._gcd_import(name[level:], package, level)
tests/dags/chapter7/custom/test_operators.py:4: in <module>
from airflowbook.operators.movielens_operator import (
E ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
___________________________________________________________ ERROR collecting tests/dags/chapter7/custom/test_operators_incorrect.py ___________________________________________________________
ImportError while importing test module '/Users/ejstembler/Projects/data-pipelines-with-apache-airflow/chapters/chapter8/tests/dags/chapter7/custom/test_operators_incorrect.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/__init__.py:127: in import_module
return _bootstrap._gcd_import(name[level:], package, level)
tests/dags/chapter7/custom/test_operators_incorrect.py:5: in <module>
from airflowbook.operators.movielens_operator import MovielensPopularityOperator
E ModuleNotFoundError: No module named 'airflowbook.operators.movielens_operator'
=================================================================================== short test summary info ===================================================================================
ERROR tests/airflowbook/operators/test_json_to_csv_operator.py
ERROR tests/airflowbook/operators/test_movielens_operator.py
ERROR tests/airflowbook/operators/test_movielens_operator2.py
ERROR tests/dags/chapter7/custom/test_operators.py
ERROR tests/dags/chapter7/custom/test_operators_incorrect.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 5 errors during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
====================================================================================== 5 errors in 0.18s ======================================================================================
Is there a way to get pytest to see these?
UPDATE: I guess you need to include the corresponding src directory.
In any case, I have another project I'm modelling after this one, which has src and test directories and an operator in a subdirectory. It's getting similar ModuleNotFoundError errors. It would be great to see a full working example.
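One approach that has worked for src/-style layouts (an assumption about the chapter 8 structure, not the book's documented setup) is a conftest.py at the test root that prepends the source directory to sys.path, so pytest can import the airflowbook package without installing it:

```python
# conftest.py at the project root: make the src/ tree importable during tests.
import os
import sys

SRC_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "src"))
if SRC_DIR not in sys.path:
    sys.path.insert(0, SRC_DIR)
```

Installing the package in editable mode (pip install -e .) achieves the same thing when a setup.py or pyproject.toml is present.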
Running the tutorial on a DigitalOcean Droplet, I had to make the following edits to the code to get it to run:
Update the import statements to:
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
Update the curl statement in the bash command passed to the BashOperator:
bash_command = "curl -Lk -o /tmp/launches.json 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'"
Hi,
I am trying to use the test_dag_integrity.py code on my own project.
Here is my directory structure:
root/
In dag1.py, several Airflow operators are imported, like BigQueryOperator, SlackOperator, etc.
In test_dag_integrity.py, I modified
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**.py")
->
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "dags/**.py")
to match my directory structure.
Then I ran pytest test_dag_integrity.py; it failed with the following summary:
FAILED test_daily_pipeline_integration.py::test_dag_integrity[PATH\\TO\\MY\\DIRECTORY\\..\\dags\\ip_address_mapping.py] - ModuleNotFoundError: No module named 'airflow.providers.google'
Is there anything I missed?
Thanks !
The ELT process presented in chapter 4 leads to unexpected results if multiple DAG runs are executed in a short window or concurrently, which happens during a backfill or when the max active DAG runs parameter is > 1. All the DAG runs write to the same files /tmp/wikipageviews.gz and /tmp/wikipageviews.sql, rather than to files specific to the execution date. The same process will also produce duplicated results if the pipeline is run multiple times or restarted.
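One way to avoid the collision (a sketch; the real fix would template the path inside the DAG) is to derive run-specific file names from the logical date, so concurrent runs write to distinct files. Here logical_date stands in for the value Airflow injects into the task context:

```python
from datetime import datetime

# Stand-in for the logical date Airflow passes to each run.
logical_date = datetime(2023, 1, 1, 2)

# Each DAG run gets its own download and SQL file instead of sharing one path.
pageviews_path = f"/tmp/wikipageviews_{logical_date:%Y%m%d%H}.gz"
sql_path = f"/tmp/wikipageviews_{logical_date:%Y%m%d%H}.sql"
print(pageviews_path)  # /tmp/wikipageviews_2023010102.gz
```

Making the load step idempotent (e.g. an upsert keyed on the interval) addresses the duplicated-results half of the report.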
While trying to start the project, I found an issue with the path to the NYC data that is stored in S3.
More explanation here: https://coiled.io/blog/nyc-taxi-parquet-csv-index-error/
Solution proposal: change
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-
to
https://s3.amazonaws.com/nyc-tlc/csv_backup/yellow_tripdata_2019-01
git clone https://github.com/BasPH/data-pipelines-with-apache-airflow.git
Cloning into 'data-pipelines-with-apache-airflow'...
remote: Enumerating objects: 1492, done.
remote: Counting objects: 100% (2/2), done.
remote: Compressing objects: 100% (2/2), done.
remote: Total 1492 (delta 1), reused 0 (delta 0), pack-reused 1490
Receiving objects: 100% (1492/1492), 267.00 KiB | 223.00 KiB/s, done.
Resolving deltas: 100% (751/751), done.
warning: the following paths have collided (e.g. case-sensitive paths
on a case-insensitive filesystem) and only one from the same
colliding group is in the working tree:
'chapter07/README.md'
'chapter07/readme.md'
The DAG id is listing_6_08 in chapter06/dags/listing_6_8.py, so chapter06/scripts/trigger_dag.sh should use the same DAG id.
From:
#!/usr/bin/env bash
# Trigger DAG with Airflow CLI
airflow dags trigger listing_6_8 --conf '{"supermarket": 1}'
# Trigger DAG with Airflow REST API
curl -X POST "http://localhost:8080/api/v1/dags/listing_6_8/dagRuns" -H "Content-Type: application/json" -d '{"conf": {"supermarket": 1}}' --user "admin:admin"
To:
#!/usr/bin/env bash
# Trigger DAG with Airflow CLI
airflow dags trigger listing_6_08 --conf '{"supermarket": 1}'
# Trigger DAG with Airflow REST API
curl -X POST "http://localhost:8080/api/v1/dags/listing_6_08/dagRuns" -H "Content-Type: application/json" -d '{"conf": {"supermarket": 1}}' --user "admin:admin"
I was reading Chapter 7 of the book and tried to reproduce the examples there, so I came here to check the code.
If I go to https://github.com/BasPH/data-pipelines-with-apache-airflow/tree/master/chapter07, the code there is for Chapter 9, and I can't find instructions to reproduce the content of Chapter 7.
The chapter 7 directory has two readme files: README.md and readme.md. This caused git to give the following warning about colliding paths:
warning: the following paths have collided (e.g. case-sensitive paths
on a case-insensitive filesystem) and only one from the same
colliding group is in the working tree:
'chapter07/README.md'
'chapter07/readme.md'
These two files seem to have the same content; can we delete one of them?