At the request of Member States, data on the daily number of new reported COVID-19 cases and deaths by EU/EEA country will be available to download from 11 March 2021 on the European Centre for Disease Prevention and Control website (https://www.ecdc.europa.eu/en/covid-19/data).
I created a batch ETL data pipeline on Google Cloud Platform using Dataflow and Cloud Composer (Airflow). The workflow covers the daily data download, the upload to Google Cloud Storage, data transformation and cleaning in Dataflow, and loading into BigQuery. Once the data is loaded into BigQuery, it can be analyzed with SQL, and Tableau can connect to BigQuery for visualization and analysis. Airflow controls the schedule: it runs the pipeline automatically every day and updates the data in the data warehouse (BigQuery). The Tableau visualizations can then be refreshed from the updated data in BigQuery.
The project consists of the following steps:
- Enable the APIs used in this project.
- Create a Composer environment.
- Create a Cloud Storage bucket named t-osprey-337221-covid.
- Create a BigQuery dataset.
- Set the Airflow variables in the Airflow web UI.
- Copy the DAG Python file to the Cloud Storage DAGs folder.
- Explore the DAG runs.
- Check BigQuery.
- Connect to Tableau.
(1) Enable the Kubernetes Engine API
(2) Enable the Dataflow API
(3) Enable the Cloud Composer API
Click CREATE ENVIRONMENT and select Composer 1. Set the following for your environment:
- Name: highcpu
- Location: europe-central2
- Zone: europe-central2-a
- Machine type: n1-highcpu-4
Leave the other settings at their defaults.
After the environment has been created, go to Compute Engine to see the resources created for it.
Go to Google Cloud Storage, where you will see that a new bucket has been created.
For the t-osprey-337221-covid bucket, set the location to europe-north1 (Finland) and create two folders in it: covid-eu and staging.
The 'covid-eu' folder stores the data that Airflow downloads from the website.
The 'staging' folder is the temporary location used by the Dataflow pipeline.
Also upload two Python files to this bucket:
'covid_composer_dataflow_dag.py': the DAG file for Airflow, which defines the workflow
'dataflow_etl_bigquery.py': the pipeline file for Dataflow
Go back to Composer to check the status of your environment.
Once your environment has been created, click the name of the environment (highcpu) to see its details.
On the Environment details page you'll see information such as the Airflow web interface URL, the Kubernetes Engine cluster ID, and a link to the DAGs folder, which is stored in your bucket.
Open the Airflow web interface URL and set the Airflow variables: select Admin > Variables from the Airflow menu bar, then Create. The DAG reads three variables: bucket_path, project_id and gce_region.
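As an alternative to creating the variables one by one, they could be collected in a JSON file and loaded through the import option on the Admin > Variables page. The sketch below only builds that file. The variable names are the ones the DAG reads with models.Variable.get; the values are assumptions inferred from the project and bucket names used elsewhere in this README (the region value in particular is a guess), and the function names are hypothetical, so replace them with your own.

```python
import json

# Keys are the names the DAG reads with models.Variable.get(...).
# Values are assumptions based on names used in this README; replace with your own.
airflow_variables = {
    "bucket_path": "t-osprey-337221-covid",  # bucket name without the gs:// prefix
    "project_id": "t-osprey-337221",
    "gce_region": "europe-north1",  # assumed: same region as the Dataflow job
}


def variables_json(variables=airflow_variables):
    """Return the variables as a flat JSON object of key/value pairs."""
    return json.dumps(variables, indent=2, sort_keys=True)


def write_variables_file(path="airflow_variables.json"):
    """Write the JSON document to disk so it can be imported in the Airflow UI."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(variables_json())
    return path


if __name__ == "__main__":
    print(variables_json())
```

Keeping bucket_path free of the gs:// prefix matters because the DAG formats it into 'gs://{0}/staging/'.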
The environment configuration shows the DAGs folder path: 'gs://europe-central2-highcpu-816bf1da-bucket/dags'. The DAG Python file 'covid_composer_dataflow_dag.py' was already uploaded to the t-osprey-337221-covid bucket in an earlier step.
Run the following command in Cloud Shell to copy the DAG Python file to the DAGs folder:
gsutil cp gs://t-osprey-337221-covid/covid_composer_dataflow_dag.py gs://europe-central2-highcpu-816bf1da-bucket/dags
After this, the DAG Python file 'covid_composer_dataflow_dag.py' appears in the DAGs folder.
Airflow then starts to run the whole workflow.
The content of the DAG file:
"""Example Airflow DAG that installs packages, downloads data from the ECDC website
into GCS, runs a Dataflow job to clean the data and loads the final two tables
into BigQuery for later analysis.
This DAG relies on three Airflow variables
* bucket_path - Cloud Storage bucket name (without gs://) used for the Dataflow
  staging and temp locations.
* project_id - Google Cloud Project ID to use for the Cloud Dataflow job.
* gce_region - Google Compute Engine region where the Cloud Dataflow job should
  run.
"""
import datetime
import os
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_region = models.Variable.get("gce_region")
default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
    },
}
# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "covid_composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    # define the first task
    first_data_download = BashOperator(
        task_id='first_data_download',
        bash_command='curl https://opendata.ecdc.europa.eu/covid19/nationalcasedeath_eueea_daily_ei/csv/data.csv | gsutil cp - gs://t-osprey-337221-covid/covid-eu/country.csv',
        dag=dag,
    )
    second_data_download = BashOperator(
        task_id='second_data_download',
        bash_command='curl https://opendata.ecdc.europa.eu/covid19/subnationalcasedaily/csv/data.csv | gsutil cp - gs://t-osprey-337221-covid/covid-eu/region.csv',
        dag=dag,
    )
    start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
        task_id="start_python_pipeline_dataflow_runner",
        runner="DataflowRunner",
        py_file='gs://t-osprey-337221-covid/dataflow_etl_bigquery.py',
        pipeline_options={
            'tempLocation': 'gs://{0}/staging/'.format(bucket_path),
            'stagingLocation': 'gs://{0}/staging/'.format(bucket_path),
        },
        py_options=[],
        py_requirements=['apache-beam[gcp]==2.35.0','apache_beam[dataframe]'],
        py_interpreter='python3',
        py_system_site_packages=False,
        dataflow_config=DataflowConfiguration(
            job_name='{{task.task_id}}', project_id='{0}'.format(project_id), location="europe-north1"
        ),
    )
    first_data_download >> second_data_download >> start_python_pipeline_dataflow_runner
This DAG file contains three operators, so the workflow has three parts. The third operator, 'BeamRunPythonPipelineOperator', launches a Dataflow job; the pipeline file for that job was already uploaded to 'gs://t-osprey-337221-covid/dataflow_etl_bigquery.py'.
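The two download tasks differ only in the source URL and the destination object, so their bash_command strings can be generated from one mapping. The sketch below is an illustration, not the DAG's actual code: the helper name build_download_command is hypothetical, and the `set -o pipefail` and `curl --fail` additions are assumptions intended to make a failed download fail the task rather than pass whatever curl produced on to gsutil.

```python
import shlex

# Task id -> (source URL, destination object), both taken from the DAG file.
DOWNLOADS = {
    "first_data_download": (
        "https://opendata.ecdc.europa.eu/covid19/nationalcasedeath_eueea_daily_ei/csv/data.csv",
        "gs://t-osprey-337221-covid/covid-eu/country.csv",
    ),
    "second_data_download": (
        "https://opendata.ecdc.europa.eu/covid19/subnationalcasedaily/csv/data.csv",
        "gs://t-osprey-337221-covid/covid-eu/region.csv",
    ),
}


def build_download_command(url, destination):
    """Return a shell command that streams a URL into a Cloud Storage object."""
    # pipefail makes the pipeline's exit status reflect a curl failure,
    # not only the exit status of gsutil (the last command in the pipe).
    return "set -o pipefail; curl --fail --silent --show-error {0} | gsutil cp - {1}".format(
        shlex.quote(url), shlex.quote(destination)
    )


commands = {
    task_id: build_download_command(url, destination)
    for task_id, (url, destination) in DOWNLOADS.items()
}

if __name__ == "__main__":
    for task_id, command in commands.items():
        print(task_id, "->", command)
```

Each string could then be passed as bash_command to a BashOperator with the matching task_id.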
The content of dataflow_etl_bigquery.py:
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.dataframe import convert
#pandas
from apache_beam.dataframe.transforms import DataframeTransform
PROJECT='t-osprey-337221'
BUCKET='t-osprey-337221-covid'
"""
from apache_beam.io.gcp.internal.clients import bigquery
table_spec_region = bigquery.TableReference(
    projectId='t-osprey-337221',
    datasetId='covid_eu',
    tableId='covid_region')
table_spec_country = bigquery.TableReference(
    projectId='t-osprey-337221',
    datasetId='covid_eu',
    tableId='covid_country')
"""
# project-id:dataset_id.table_id
table_spec_region = 't-osprey-337221:covid_eu.covid_region'
table_spec_country = 't-osprey-337221:covid_eu.covid_country'
# column_name:BIGQUERY_TYPE, ...
table_schema_region = 'country:string, region_name:string, nuts_code:string, date:TIMESTAMP, rate_14_day_per_100k:float'
table_schema_country = 'dateRep:string, cases:integer, deaths:integer, countriesAndTerritories:string, geoId:string, countryterritoryCode:string, popData2020:integer'
def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--region=europe-north1',
        '--runner=DataflowRunner'
    ]
    with beam.Pipeline(argv=argv) as pipeline:
        # Create two deferred Beam DataFrames with the contents of our csv files.
        region_df = pipeline | 'Read region CSV' >> beam.dataframe.io.read_csv(r'gs://t-osprey-337221-covid/covid-eu/region.csv', usecols=[0,1,2,3,4])
        country_df = pipeline | 'Read country CSV' >> beam.dataframe.io.read_csv(r'gs://t-osprey-337221-covid/covid-eu/country.csv', usecols=[0,4,5,6,7,8,9])
        # Data cleaning
        region_df.fillna(value=0,inplace=True)
        region_df=region_df.astype({'country':'string', 'region_name':'string', 'nuts_code':'string', 'date':'datetime64', 'rate_14_day_per_100k':'float'})
        country_df=country_df.astype({'dateRep':'string', 'cases':'int', 'deaths':'int', 'countriesAndTerritories':'string', 'geoId':'string', 'countryterritoryCode':'string', 'popData2020':'int'})
        country_df['cases'].fillna(value=0, inplace=True)
        country_df['deaths'].fillna(value=0, inplace=True)
        (
            # Convert the Beam DataFrame to a PCollection.
            convert.to_pcollection(region_df)
            # We get named tuples, we can convert them to dictionaries like this.
            | 'region To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))
            # Write the elements to the BigQuery region table.
            | 'region save to bigquery' >> beam.io.WriteToBigQuery(
                table_spec_region,
                schema=table_schema_region,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )
        (
            # Convert the Beam DataFrame to a PCollection.
            convert.to_pcollection(country_df)
            # We get named tuples, we can convert them to dictionaries like this.
            | 'country To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))
            # Write the elements to the BigQuery country table.
            | 'country save to bigquery' >> beam.io.WriteToBigQuery(
                table_spec_country,
                schema=table_schema_country,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )


if __name__ == '__main__':
    run()
This Dataflow file is written in Python using Apache Beam.
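To make the intent of the cleaning step concrete, here is a plain-Python sketch of what happens to each country record: missing cases and deaths are filled with 0, the numeric columns are cast to integers, and each named tuple is turned into a dictionary before the BigQuery write. It uses only the standard library rather than Beam, the sample rows are invented for illustration, and CountryRow and clean_country_row are hypothetical names that do not appear in dataflow_etl_bigquery.py.

```python
import csv
import io
from collections import namedtuple

# Invented sample in the column order of table_schema_country; not real ECDC data.
SAMPLE_CSV = """dateRep,cases,deaths,countriesAndTerritories,geoId,countryterritoryCode,popData2020
01/01/2022,120,3,Finland,FI,FIN,5500000
02/01/2022,,,Finland,FI,FIN,5500000
"""

CountryRow = namedtuple(
    "CountryRow",
    ["dateRep", "cases", "deaths", "countriesAndTerritories",
     "geoId", "countryterritoryCode", "popData2020"],
)

INT_COLUMNS = ("cases", "deaths", "popData2020")


def clean_country_row(raw):
    """Fill missing integer fields with 0 first, then cast them to int."""
    cleaned = dict(raw)
    for column in INT_COLUMNS:
        # An empty CSV field arrives as '', which int() cannot parse.
        cleaned[column] = int(cleaned[column] or 0)
    return CountryRow(**cleaned)


def read_country_rows(text):
    """Parse CSV text and return one cleaned named tuple per data row."""
    reader = csv.DictReader(io.StringIO(text))
    return [clean_country_row(row) for row in reader]


# Same conversion as the Beam Map step: named tuple -> dictionary.
rows = [dict(row._asdict()) for row in read_country_rows(SAMPLE_CSV)]

if __name__ == "__main__":
    for row in rows:
        print(row)
```

The sketch fills before it casts because an integer cast cannot represent a missing value; the same ordering concern applies to any integer column that may contain gaps.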
Open the Airflow web interface.
Open covid_composer_dataflow_dag.
When this DAG has finished successfully, go to the Dataflow console, where you will see that a Dataflow job has been created.
Open this job.
In BigQuery, two new tables have been generated.
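Once both tables exist, they can be queried with standard SQL. The sketch below builds one such query in Python: total cases and deaths per country from covid_country. The table and column names come from dataflow_etl_bigquery.py, while the function names and the choice of aggregation are illustrative assumptions. The client argument is expected to be a google.cloud.bigquery.Client supplied by the caller; it is not imported here so that the query text can be inspected without any Google Cloud dependencies.

```python
# Fully qualified table name, from table_spec_country in dataflow_etl_bigquery.py.
COUNTRY_TABLE = "t-osprey-337221.covid_eu.covid_country"


def totals_by_country_sql(table=COUNTRY_TABLE):
    """Return a standard SQL query that sums cases and deaths per country."""
    # Backticks keep the hyphenated project ID valid as part of the table path.
    return (
        "SELECT countriesAndTerritories AS country,\n"
        "       SUM(cases) AS total_cases,\n"
        "       SUM(deaths) AS total_deaths\n"
        "FROM `{0}`\n"
        "GROUP BY country\n"
        "ORDER BY total_cases DESC"
    ).format(table)


def run_totals_by_country(client):
    """Run the query with a BigQuery client passed in by the caller."""
    return list(client.query(totals_by_country_sql()).result())


if __name__ == "__main__":
    print(totals_by_country_sql())
```

The same query text can be pasted into the BigQuery console or used as a custom SQL source in Tableau.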
From Tableau, I can choose different countries, check the cases on different dates, and view the distribution of cases by country and by region.