
Amazon Redshift Streaming Workshop

Most organisations today agree that data is one of their most important assets and that the ability to act on timely data sets data-driven organisations apart from their peers. However, getting access to real-time data used to require significant investment in acquiring new software or hiring specialised engineering teams. The new Amazon Redshift streaming ingestion feature aims to democratise streaming analytics with its low cost and minimal technical skill requirements, as it is primarily defined using SQL.

In this workshop, we will show how easy it is to build a streaming analytics application using this new feature. We will create a near-real-time logistics dashboard using Amazon Managed Grafana to provide augmented intelligence and situational awareness for the logistics operations team. It connects to a Redshift cluster which uses Redshift streaming ingestion to analyse data from a Kinesis data stream.

image-20220601123345968

Infrastructure Provisioning using CDK and CloudShell

The AWS Cloud Development Kit (AWS CDK) is an open-source project that allows you to define your cloud infrastructure using familiar programming languages. In this workshop, we use Python to define the cloud infrastructure, as it is one of the programming languages most commonly used by analytics professionals.
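To make the pattern concrete, here is a minimal, hypothetical CDK sketch in Python of the kind of stack this workshop deploys. It is not the repository's actual app.py or stack code: the construct IDs, stream name, node type and credentials below are illustrative assumptions only.

# Hypothetical CDK v2 sketch (not the workshop's actual stack code).
from aws_cdk import App, Stack
from aws_cdk import aws_kinesis as kinesis
from aws_cdk import aws_redshift as redshift  # L1 (CloudFormation-level) constructs
from constructs import Construct

class StreamingStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Kinesis data stream that the data generator writes events to
        kinesis.Stream(self, "CustomerStream",
                       stream_name="customer_stream",
                       shard_count=1)

        # Single-node Redshift cluster; node_type and cluster_type are the
        # knobs you would change to control cost (see the note on app.py below).
        redshift.CfnCluster(self, "StreamingCluster",
                            cluster_type="single-node",
                            node_type="ra3.xlplus",
                            db_name="streaming_db",
                            master_username="admin",
                            master_user_password="ChangeMe1234Placeholder")

app = App()
StreamingStack(app, "RedshiftStreamingStack")
app.synth()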

Note: This workshop will work in any AWS region where AWS CloudShell is available. However, the workshop's instructions use the us-east-1 region. (It can also be deployed in regions without CloudShell, but this requires additional steps to provision an EC2 Linux deployment instance.)

Note: To run this code, you will need elevated privileges in the AWS account you are using.

Log in to the AWS Console.

https://us-east-1.console.aws.amazon.com/console/home

Open AWS CloudShell

https://us-east-1.console.aws.amazon.com/cloudshell/home?region=us-east-1

Upgrade CDK to the latest version

sudo npm install -g aws-cdk@latest

Clone this git repository

git clone https://github.com/aws-samples/amazon-redshift-streaming-workshop

Go to the working directory:

cd amazon-redshift-streaming-workshop

Create a virtualenv:

python3 -m venv .venv

After the init process completes and the virtualenv is created, use the following command to activate it.

source .venv/bin/activate

Once the virtualenv is activated, you can install the required dependencies.

pip install -r requirements.txt

Bootstrap CDK. This will set up the resources required by CDK to deploy into this AWS account. (This step is only required if you have not used CDK before in this account and region.)

cdk bootstrap

image-20220601093700045

Deploy all stacks and disable prompting. The entire deployment takes 10-15 minutes.

cdk deploy --all --require-approval never

Note: There are costs associated with provisioning resources in AWS. You can change the size of the Redshift cluster by updating the contents of app.py in the project working directory.

image-20220601154014056

Post-deployment Redshift configuration

At the moment, setting a default IAM role for the Redshift cluster cannot be configured automatically using CloudFormation. The following steps show how to define a default IAM role.

Go to the Redshift console

https://us-east-1.console.aws.amazon.com/redshiftv2/home?region=us-east-1#dashboard

Select the cluster that we provisioned, redshiftstreamingcluster-xxxxxxxxx

image-20220530163446112

Go to the Properties tab:

image-20220530163556753

Scroll down to Cluster permissions. Select the IAM role that we provisioned and make it the default IAM role through the drop-down menu.

image-20220530163644113

We also need to add a tag that is used by the IAM integration between Amazon Redshift and Amazon Managed Grafana.

Scroll down to Tags and click on Add tags.

image-20220601121023062

Specify GrafanaDataSource as a Key and click Save changes.

image-20220601121120524
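If you prefer to script these two configuration steps rather than use the console, they can also be applied with the AWS SDK. Below is a minimal boto3 sketch, assuming the IAM role is already attached to the cluster; the cluster identifier, account ID and role ARN are placeholders you would replace with your own values.

# Hypothetical boto3 sketch of the two console steps above (all identifiers are placeholders).
import boto3

redshift = boto3.client("redshift", region_name="us-east-1")

cluster_id = "redshiftstreamingcluster-xxxxxxxxx"                  # your cluster identifier
role_arn = "arn:aws:iam::123456789012:role/RedshiftStreamingRole"  # the provisioned IAM role

# Make the already-attached IAM role the cluster's default IAM role
redshift.modify_cluster_iam_roles(
    ClusterIdentifier=cluster_id,
    DefaultIamRoleArn=role_arn,
)

# Tag the cluster so Amazon Managed Grafana can use it as a data source.
# The console step above only sets the key; the value here is an assumption.
redshift.create_tags(
    ResourceName=f"arn:aws:redshift:us-east-1:123456789012:cluster:{cluster_id}",
    Tags=[{"Key": "GrafanaDataSource", "Value": "true"}],
)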

Note: If you get disconnected from CloudShell, follow these steps. New CloudShell sessions always start in the home directory, and the Python virtual environment will be deactivated.

Go to working directory

cd amazon-redshift-streaming-workshop

Activate python virtual environment

source .venv/bin/activate

Continue from where you left off.

(Optional step)

You can also check the status of the deployment in AWS CloudFormation.

https://us-east-1.console.aws.amazon.com/cloudformation/home

image-20220530153120427

Connecting to the Redshift Cluster

Note: This section is not compatible with accounts created using AWS Event Engine. (due to Query Editor v2 restrictions)

Log in to the Redshift Query Editor v2 and connect to the Redshift cluster using the drop-down arrow next to the cluster name.

https://us-east-1.console.aws.amazon.com/sqlworkbench/home

image-20220601100354395

Specify cluster credentials. Select Temporary credentials as the authentication mechanism.

Database: streaming_db

User name: admin

Click Create connection

image-20220601100630463

Create an external schema to establish a connection between the Redshift cluster and the Kinesis data stream.

CREATE EXTERNAL SCHEMA kinesis_schema
FROM KINESIS
IAM_ROLE default;

image-20220601101019956

Create a materialized view to parse the data in the Kinesis data stream, customer_stream. In this case, the whole payload is ingested as is and stored using the SUPER data type in Redshift.

CREATE MATERIALIZED VIEW customer_stream AS
SELECT ApproximateArrivalTimestamp,
JSON_PARSE(from_varbyte(Data, 'utf-8')) as customer_data
FROM kinesis_schema.customer_stream
WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));

Note: highlight the block of SQL code that you need to run in the Query Editor.

image-20220601101137670

Refresh the materialized view. This is where the actual data ingestion happens: data is loaded from the Kinesis data stream directly into Amazon Redshift without having to stage it first in Amazon S3.

REFRESH MATERIALIZED VIEW customer_stream;

We can now query the data in customer_stream using a standard SELECT statement.

SELECT * FROM customer_stream;

image-20220601101309371

If we want to know the distribution of our customers across different states, we can easily unpack the contents of the JSON payload using the PartiQL syntax.

SELECT count(1), customer_data.STATE::VARCHAR
FROM customer_stream
GROUP BY customer_data.STATE;

image-20220601101418301

Now let us ingest data from the order_stream by creating a materialized view that unpacks the data within the order stream.

CREATE MATERIALIZED VIEW order_stream AS
SELECT ApproximateArrivalTimestamp,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'consignmentid', true)::BIGINT as consignmentid,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'timestamp', true)::VARCHAR(50) as order_timestamp,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_address', true)::VARCHAR(100) as delivery_address,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_state', true)::VARCHAR(50) as delivery_state,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_address', true)::VARCHAR(100) as origin_address,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_state', true)::VARCHAR(50) as origin_state,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delay_probability', true)::VARCHAR(10) as delay_probability,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'days_to_deliver', true)::INT as days_to_deliver,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_distance', true)::FLOAT as delivery_distance,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'userid', true)::INT as userid,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'revenue', true)::FLOAT as revenue,
JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'cost', true)::FLOAT as cost
FROM kinesis_schema.order_stream
WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));

Let us refresh the materialized view.

REFRESH MATERIALIZED VIEW order_stream;

And query the data within the view

SELECT * FROM order_stream;

We can query the most recent transactions that have been ingested into Redshift using this select statement

SELECT current_timestamp, current_timestamp-ApproximateArrivalTimestamp as time_diff, * FROM order_stream
order by ApproximateArrivalTimestamp desc limit 10;

image-20220601101704387

We can also join the data between the two streams and do more in-depth analysis of our customer and order data. For example, we may want to know the busiest consignment route at the state level.

SELECT os.delivery_state, cs.customer_data.state::VARCHAR as origin_state, count(1)
FROM customer_stream cs
INNER JOIN order_stream os ON cs.customer_data.userid::INT = os.userid
GROUP BY os.delivery_state, cs.customer_data.state::VARCHAR
ORDER BY count(1) desc

image-20220601101807687

Create the user redshift_data_api_user for the Grafana integration. Note: We need to use this specific user name, redshift_data_api_user, as it is used by the IAM integration between Redshift and Amazon Managed Grafana.

CREATE USER redshift_data_api_user PASSWORD '<specify your own password>';

We can now grant SELECT access to this user.

GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC TO redshift_data_api_user;

(Optional Step) No Action Required

Refreshing the Materialized views using Step Functions

As part of the CDK deployment, we also provisioned a Step Functions state machine that regularly refreshes the materialized views on a 10-20 second interval. You can inspect this state machine in the Step Functions console.

https://console.aws.amazon.com/states/home?region=us-east-1

image-20220530180606214
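Under the hood, a workflow like this simply issues REFRESH MATERIALIZED VIEW statements against the cluster on a schedule. As a rough illustration of that call (not the workshop's actual state machine definition), the refresh could be triggered through the Redshift Data API as follows; the cluster identifier is a placeholder.

# Hypothetical sketch of the refresh call such a workflow might issue
# via the Redshift Data API (identifiers are placeholders).
import boto3

client = boto3.client("redshift-data", region_name="us-east-1")

for view in ("customer_stream", "order_stream"):
    client.execute_statement(
        ClusterIdentifier="redshiftstreamingcluster-xxxxxxxxx",  # your cluster identifier
        Database="streaming_db",
        DbUser="admin",
        Sql=f"REFRESH MATERIALIZED VIEW {view};",
    )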

You can also check the Redshift Queries console to validate the time interval between refreshes.

https://us-east-1.console.aws.amazon.com/redshiftv2/home?region=us-east-1#queries

image-20220601102552452

Creating a Grafana dashboard on Redshift streaming data

Note: This section is not compatible with accounts created using AWS Event Engine (due to SSO restrictions)

Here is a relevant blog that talks about how to get started with Amazon Managed Grafana.

Go to the Amazon Managed Grafana console:

https://us-east-1.console.aws.amazon.com/grafana/home?region=us-east-1

Click on Create workspace.

image-20220530180941106

Specify a workspace name: redshift_streaming_workspace

image-20220601103241863

Select AWS Single Sign-On as the authentication method and click on Create user.

image-20220601103404934

Specify user details and click Create user

image-20220601103538765

The user will receive an email asking them to accept the invitation to AWS SSO.

image-20220601103906448

Accepting the invitation will prompt the user to specify a password.

image-20220601104031456

Click Next

image-20220601104211596

On the Service managed permission settings page, select Amazon Redshift as a data source and Amazon SNS as a notification channel.

image-20220601104510191

Review workspace creation settings and click on Create workspace.

Once the workspace is created, we need to give the SSO user access to the Grafana workspace. Click on Assign new user or group.

image-20220601105429528

Select the user we created and click Assign users and groups.

image-20220601105541755

Elevate the privileges of the user from viewer to admin and go back to the workspace screen.

image-20220601105708129

Click on the Grafana workspace URL link.

image-20220601105850286

Click on Sign in with AWS SSO

Enter username

Enter password

image-20220601110210714

You should now be logged in to the Amazon Managed Grafana dashboard.

Click on the AWS side tab and select Data sources.

image-20220601111620412

Select the Redshift service. Select US East (N. Virginia) region. Select the cluster we provisioned as part of this workshop and click on Add 1 data source.

image-20220601111747182

Click Go to settings

image-20220601111913721

Rename the data source to Redshift Streaming

Set Database User to redshift_data_api_user. Click on Save & test.

image-20220601121543957

Now let us import the pre-built dashboard. Click on the + side menu and click Import.

image-20220601121846321

Copy and paste the contents of the dashboard.json file into the Import via panel json textbox. Click Load.

image-20220601123118745

Click Import.

image-20220601123243439

Now we have the Logistics Dashboard on Amazon Managed Grafana. This dashboard refreshes every 5 seconds and runs a query against the materialized views that we previously created in Amazon Redshift.

image-20220601123345968

Clean up

This section deletes all resources created as part of this workshop.

Go back to AWS CloudShell

https://us-east-1.console.aws.amazon.com/cloudshell/home?region=us-east-1

Go to working directory

cd amazon-redshift-streaming-workshop

Activate python virtual environment

source .venv/bin/activate

Destroy resources

cdk destroy --all

image-20220601162314757

(Event Engine option) Create Redshift Streaming objects

Log in to the Redshift Query Editor v1

https://us-east-1.console.aws.amazon.com/redshiftv2/home?region=us-east-1#query-editor:

Click on Connect to database

image-20220530154203415

Specify Temporary credentials to log in to Redshift. Select the cluster we provisioned and specify the following:

Database name: streaming_db

Database user: admin

image-20220530154823734

Create an external schema to establish a connection between the Redshift cluster and the Kinesis data stream.

CREATE EXTERNAL SCHEMA kinesis_schema
FROM KINESIS
IAM_ROLE default;

image-20220530163824705

Create a materialized view to parse the data in the Kinesis data stream, customer_stream. In this case, the whole payload is ingested as is and stored using the SUPER data type in Redshift.

CREATE MATERIALIZED VIEW customer_stream AS
SELECT ApproximateArrivalTimestamp,
JSON_PARSE(from_varbyte(Data, 'utf-8')) as customer_data
FROM kinesis_schema.customer_stream
WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));

Note: highlight the block of SQL code that you need to run in the Query Editor.

image-20220530164733316

Refresh the materialized view. This is where the actual data ingestion happens: data is loaded from the Kinesis data stream directly into Amazon Redshift without having to stage it first in Amazon S3.

REFRESH MATERIALIZED VIEW customer_stream;

We can now query the data in customer_stream using a standard SELECT statement.

SELECT * FROM customer_stream;

image-20220530171443572

If we want to know the distribution of our customers across different states, we can easily unpack the contents of the JSON payload using the PartiQL syntax.

SELECT count(1), customer_data.STATE::VARCHAR
FROM customer_stream
GROUP BY customer_data.STATE;

image-20220530172355982

Now let us ingest data from the order_stream by creating a materialized view that unpacks the data within the order stream.

CREATE MATERIALIZED VIEW order_stream AS
SELECT ApproximateArrivalTimestamp,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'consignmentid', true) AS BIGINT) as consignmentid,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'timestamp', true) AS VARCHAR(50)) as order_timestamp,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_address', true) AS VARCHAR(100)) as delivery_address,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_state', true) AS VARCHAR(50)) as delivery_state,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_address', true) AS VARCHAR(100)) as origin_address,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_state', true) AS VARCHAR(50)) as origin_state,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delay_probability', true) AS VARCHAR(10)) as delay_probability,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'days_to_deliver', true) AS INT) as days_to_deliver,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_distance', true) AS FLOAT) as delivery_distance,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'userid', true) AS INT) as userid,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'revenue', true) AS FLOAT) as revenue,
CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'cost', true) AS FLOAT) as cost
FROM kinesis_schema.order_stream
WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));

Let us refresh the materialized view.

REFRESH MATERIALIZED VIEW order_stream;

And query the data within the view

SELECT * FROM order_stream;

We can query the most recent transactions that have been ingested into Redshift using this query

SELECT current_timestamp, current_timestamp-ApproximateArrivalTimestamp as time_diff, * FROM order_stream
order by ApproximateArrivalTimestamp desc limit 10;

image-20220530175230331

We can also join the data between the two streams and do more in-depth analysis of our customer and order data. For example, we may want to know the busiest consignment route at the state level.

SELECT os.delivery_state, cs.customer_data.state::VARCHAR as origin_state, count(1)
FROM customer_stream cs
INNER JOIN order_stream os ON cs.customer_data.userid::INT = os.userid
GROUP BY os.delivery_state, cs.customer_data.state::VARCHAR
ORDER BY count(1) desc

image-20220530180223033

FYI - No Action Required: Refreshing the Materialized views using Step Functions

As part of the CDK deployment, we also provisioned a Step Functions state machine that regularly refreshes the materialized views on a 5 second interval. You can inspect this state machine in the Step Functions console.

https://console.aws.amazon.com/states/home?region=us-east-1

image-20220530180606214
