Data Engineering Project: Crypto Sentiment Pipeline (WIP)

Python 3.8

Crypto Sentiment Pipeline is a data pipeline that consumes the latest market data from CoinMarketCap and post data from Twitter and Reddit, consolidating everything into a centralized data store for sentiment analysis. The pipeline infrastructure is built with popular, open-source projects.

Tools & Technologies

Table of Contents

Architecture diagram

Pipeline Architecture

Project Structure

Airflow
|____config
| |____airflow.cfg                                      # Config file
|
|____dags
| |____binance_dag.py                                   # Dag folder
| |____coinmarketcap_dag.py
| |____reddit_dag.py
| |____twitter_dag.py
|
|____docker                                             # Docker build instructions
| |____Dockerfile
|
|____plugins
| |____custom_hooks                                     # Custom Hooks to connect with APIs
| | |____binance_hook.py
| | |____coinmarketcap_hook.py
| | |____reddit_hook.py
| | |____twitter_hook.py
| |  
| |____custom_operators                                 # Custom operators to build dags
| | |____binance_toAzureDataLake.py
| | |____coinmarketcap_toAzureDataLake.py
| | |____data_quality.py
| | |____reddit_toAzureDataLake.py
| | |____twitter_toAzureDataLake.py
| |
| |____custom_scripts                                   # VADER sentiment script for sentiment analysis
| | |____vaderSentiment.py
| |
| |____custom_transfers                                 # Custom transfers to build dags
| | |____azureDataLake_toSnowflake.py
| | |____load_toSnowflakeTables.py
| |
| |____helpers                                          # SQL scripts to build data warehouse
| | |____sql_queries.py
|
|____docker-compose.yml 
|
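
The custom operators follow an extract-and-load pattern: pull data through the matching custom hook, then write the result to Azure Blob Storage. Below is a minimal sketch of what one of the _toAzureDataLake operators might look like; the class name, parameters, and placeholder payload are assumptions for illustration, not the project's actual code.

```python
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook


class CoinMarketCapToAzureDataLakeOperator(BaseOperator):
    """Hypothetical operator: fetch the latest listings and land them as a blob."""

    def __init__(self, azure_conn_id="azure_conn_id", container="raw",
                 blob_name="coinmarketcap.json", **kwargs):
        super().__init__(**kwargs)
        self.azure_conn_id = azure_conn_id
        self.container = container
        self.blob_name = blob_name

    def execute(self, context):
        # In the real operator the payload would come from the CoinMarketCap hook;
        # here it is just a placeholder string.
        payload = '{"data": []}'
        hook = WasbHook(wasb_conn_id=self.azure_conn_id)
        hook.load_string(string_data=payload,
                         container_name=self.container,
                         blob_name=self.blob_name)
```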

Setup

Airflow Connections

Set up your Airflow connections:

| Service | Conn ID | Conn Type | Other fields |
| --- | --- | --- | --- |
| Snowflake | snowflake_conn_id | Snowflake | Fill out your Snowflake database credentials |
| Azure Blob Storage | azure_conn_id | Azure Blob Storage | Fill out your Azure container credentials |
| Twitter API V2 (Tweepy) | twitter_conn_id | HTTP | Extras: {'BEARER_TOKEN' : YOUR_BEARER_TOKEN} |
| Reddit API | reddit_conn_id | HTTP | Extras: {'CLIENT_ID' : YOUR_CLIENT_ID, 'CLIENT_SECRET' : YOUR_CLIENT_SECRET} |
| Coinmarketcap API | coinmarketcap_conn_id | HTTP | Extras: {'API_KEY' : YOUR_API_KEY} |
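
Inside the custom hooks, credentials stored in a connection's Extras field can be read back through Airflow's BaseHook. A minimal sketch, assuming the twitter_conn_id connection from the table above (variable names are illustrative; the project's hooks may structure this differently):

```python
from airflow.hooks.base import BaseHook

# Read the Twitter bearer token stored in the connection's Extra JSON.
conn = BaseHook.get_connection("twitter_conn_id")
bearer_token = conn.extra_dejson["BEARER_TOKEN"]
```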

How it works

Data Flow - Airflow DAGs

Airflow DAGs are separated by data source. Each DAG is responsible for calling its API, extracting the data, and loading it into the target destinations. The DAGs run periodically every X minutes, producing micro-batches.
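
A minimal sketch of how one of these micro-batch DAGs could be wired up. The 30-minute interval and the task callable are assumptions for illustration, not the project's actual schedule or code:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_and_load_to_data_lake(**_):
    # Placeholder for the API-extract + Azure Blob Storage load step.
    pass


with DAG(
    dag_id="coinmarketcap_dag",
    start_date=datetime(2022, 1, 1),
    schedule_interval="*/30 * * * *",  # assumed micro-batch cadence
    catchup=False,
) as dag:
    to_data_lake = PythonOperator(
        task_id="coinmarketcap_toAzureDataLake",
        python_callable=extract_and_load_to_data_lake,
    )
```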

CoinMarketCap DAG
  • Id: coinmarketcap_dag
  • Source Type: JSON API
  • Data Source: https://coinmarketcap.com/api/
    • Returns a paginated list of all active cryptocurrencies with latest market data.

coinmarketcap_toAzureDataLake: Fetches data from the CoinMarketCap API and loads it into Azure Blob Storage as a JSON file.
azure_coinmarketcap_snowflake: Copies the data from Azure Blob Storage into the CoinMarketCap raw staging table.
json_transform: JSON data in the raw staging table is flattened and inserted into two separate processing staging tables (coinmarketcap_tags and coinmarketcap_marketdata); see the flattening sketch below.
data_quality: A simple data quality check for nulls and empty tables.
final_load: Cleaned and transformed data in the processed staging tables is inserted into the CoinMarketCap final tables.
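
A sketch of the kind of flattening the json_transform step performs, written as a query string in the style of helpers/sql_queries.py. The staging table name, variant column, target columns, and field paths are assumptions based on the CoinMarketCap listings payload, not the project's actual SQL:

```python
# Hypothetical Snowflake query: explode the "data" array of the raw JSON payload
# into one row per cryptocurrency using LATERAL FLATTEN.
JSON_TRANSFORM_MARKETDATA = """
INSERT INTO coinmarketcap_marketdata (symbol, name, price_usd, last_updated)
SELECT
    d.value:symbol::string,
    d.value:name::string,
    d.value:quote:USD:price::float,
    d.value:last_updated::timestamp_ntz
FROM coinmarketcap_raw_staging,
     LATERAL FLATTEN(input => raw_json:data) d;
"""
```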

Reddit DAG
  • Id: reddit_dag
  • Source Type: JSON API, accessed via the PRAW and PMAW API wrappers
  • Data Source: reddit.com/dev/api
    • Returns submission objects for newly created submissions.

reddit_toAzureDataLake: Fetches data from the Reddit API using the PMAW wrapper. The sentiment analyzer assigns a score to each post title (see the VADER sketch below), and the results are loaded into Azure Blob Storage as a CSV file.
azure_reddit_snowflake: Copies the data from Azure Blob Storage into the Reddit raw staging table.
data_quality: A simple data quality check for nulls and empty tables.
loadReddit_toSnowflakeFinalTables: Cleaned and transformed data in the raw staging table is inserted into the Reddit final tables.
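
A minimal sketch of how the VADER analyzer scores a post title, using the vaderSentiment package directly. The project bundles its own copy in custom_scripts/vaderSentiment.py, so the exact import path there may differ:

```python
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyzer = SentimentIntensityAnalyzer()

title = "Bitcoin rallies after ETF approval rumours"
scores = analyzer.polarity_scores(title)

# scores is a dict with 'neg', 'neu', 'pos' and a 'compound' value in [-1, 1];
# the compound score is what would typically be stored alongside the post.
print(scores["compound"])
```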

Twitter DAG

twitter_toAzureDataLake: Fetches recent tweets from the Twitter API. The tweet data is split into two pandas DataFrames, one for the tweets themselves and one for the hashtags associated with each tweet (see the sketch below). The sentiment analyzer assigns a score to each tweet, and the results are loaded into Azure Blob Storage as a CSV file.
load_toSnowflakeStaging: Copies the data from Azure Blob Storage into the Twitter raw staging tables.
data_quality: A simple data quality check for nulls and empty tables.
final_load: Cleaned and transformed data in the raw staging tables is inserted into the Twitter final tables.
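
A sketch of the tweet/hashtag split, assuming the tweets have already been parsed into plain dicts. The field names and the regex-based hashtag extraction are illustrative assumptions; the project's operator may read hashtags from the Tweepy response entities instead:

```python
import pandas as pd

# Hypothetical parsed tweets; field names are assumptions for illustration.
tweets = [
    {"tweet_id": 1, "text": "Bitcoin to the moon #BTC #crypto", "created_at": "2022-01-01T00:00:00Z"},
    {"tweet_id": 2, "text": "Taking profits on #ETH today", "created_at": "2022-01-01T00:05:00Z"},
]

tweets_df = pd.DataFrame(tweets)

# One row per (tweet_id, hashtag) pair, ready to load into its own table.
hashtags_df = (
    tweets_df.assign(hashtag=tweets_df["text"].str.findall(r"#(\w+)"))
    .explode("hashtag")
    .dropna(subset=["hashtag"])
    .loc[:, ["tweet_id", "hashtag"]]
)
```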

Data Schema

Staging Schema

Final Schema

Dashboards

Twitter Dashboard

References

Hutto, C.J. & Gilbert, E.E. (2014). VADER: A Parsimonious Rule-based Model for Sentiment Analysis of Social Media Text. Eighth International Conference on Weblogs and Social Media (ICWSM-14). Ann Arbor, MI, June 2014.

Work in Progress

  • Test robustness of pipelines with higher data volume
  • Create views and materialized views then connect them to Power BI
  • Normalize Twitter hashtags to reflect CoinMarketCap symbols
  • Add Kafka and Spark structured streaming
  • Create a simple web application to navigate and search the collected data
