
dhs-2019-demo

Don't forget to add the required packages to your StreamSets container, and replace <ACCESS_KEY> and <SECRET_KEY> with your AWS credentials.

Spark writer

docker exec -it adhoc /bin/bash

# Orders
/spark/bin/spark-submit \
--packages org.apache.hadoop:hadoop-aws:2.7.7 \
--conf spark.driver.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
--conf spark.executor.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
--conf spark.hadoop.fs.s3a.endpoint=s3.ap-south-1.amazonaws.com \
--conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY> \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/var/demo/config/spark-packages/hudi-utilities-bundle-0.5.0-incubating.jar \
--storage-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field order_date \
--target-base-path s3a://atlan-dhs/lake/orders \
--target-table orders \
--props /var/demo/config/deltastreamer/orders-kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--continuous \
--filter-dupes
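The properties file passed via --props is not shown in this repo. A minimal sketch of what a DeltaStreamer Kafka source config typically contains — the key names are standard Hudi DeltaStreamer and Kafka consumer settings, but the topic name, broker address, schema file paths, and field choices below are assumptions:

```properties
# Hypothetical sketch of orders-kafka-source.properties.
# Key names are standard Hudi 0.5.0 settings; all values are assumptions.
hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=order_date
hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/deltastreamer/orders-source.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/deltastreamer/orders-target.avsc
hoodie.deltastreamer.source.kafka.topic=orders
# Passed through to the Kafka consumer
bootstrap.servers=kafka:9092
auto.offset.reset=earliest
```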

# Customers
/spark/bin/spark-submit \
--packages org.apache.hadoop:hadoop-aws:2.7.7 \
--conf spark.driver.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
--conf spark.executor.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
--conf spark.hadoop.fs.s3a.endpoint=s3.ap-south-1.amazonaws.com \
--conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY> \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/var/demo/config/spark-packages/hudi-utilities-bundle-0.5.0-incubating.jar \
--storage-type COPY_ON_WRITE \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field partition_date \
--target-base-path s3a://atlan-dhs/lake/customers \
--target-table customers \
--props /var/demo/config/deltastreamer/customers-kafka-source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

Spark queries

docker exec -it adhoc /bin/bash

/spark/bin/spark-shell \
--packages org.apache.hadoop:hadoop-aws:2.7.7 \
--conf spark.driver.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
--conf spark.executor.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
--conf spark.hadoop.fs.s3a.endpoint=s3.ap-south-1.amazonaws.com \
--conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY> \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--jars /var/demo/config/spark-packages/hudi-spark-bundle-0.5.0-incubating.jar

val ordersDF = spark.
    read.
    format("org.apache.hudi").
    load("s3a://atlan-dhs/lake/orders/*/*/*/*")


ordersDF.show()

ordersDF.createOrReplaceTempView("orders")
spark.sql("select count(*) from orders").show()

val customersDF = spark.
    read.
    format("org.apache.hudi").
    load("s3a://atlan-dhs/lake/customers/*/*/*/*")

customersDF.show()

customersDF.createOrReplaceTempView("customers")
spark.sql("select count(*) from customers").show()


// Analysis
spark.sql("select contact_title, count(*) as c from customers group by contact_title order by c desc").show()

// Number of orders placed, grouped by customer contact title
spark.sql("select contact_title, count(*) as count from customers c inner join orders o on c.customer_id=o.customer_id group by contact_title order by count desc").show()
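The same join can also be expressed with the DataFrame API inside the same spark-shell session; a minimal sketch using the two DataFrames defined above:

```scala
// Sketch: DataFrame-API equivalent of the SQL join above.
// Joins on customer_id, counts orders per contact title, sorts descending.
val ordersByTitle = customersDF.
  join(ordersDF, "customer_id").
  groupBy("contact_title").
  count().
  orderBy($"count".desc)

ordersByTitle.show()
```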

Postgres setup

docker exec -it postgres /bin/bash

psql -U postgres
create database dhs;
\q

psql -U postgres -d dhs
CREATE TABLE customers (
    customer_id varchar(255) primary key,
    company_name varchar(255),
    contact_name varchar(255),
    contact_title varchar(255),
    address varchar(255),
    city varchar(255),
    region varchar(255),
    postal_code varchar(255),
    country varchar(255),
    phone varchar(255),
    fax varchar(255),
    partition_date varchar(255)
);

COPY customers(customer_id,company_name,contact_name,contact_title,address,city,region,postal_code,country,phone,fax,partition_date) 
FROM '/var/demo/config/deltastreamer/customers.csv' DELIMITER ',' CSV HEADER;
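After the COPY, it is worth sanity-checking the load before pointing the pipeline at the table, for example:

```sql
-- Quick sanity check on the loaded data
SELECT count(*) FROM customers;
SELECT customer_id, company_name, contact_title FROM customers LIMIT 5;
```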
