Giter Site home page Giter Site logo

cim-demos's Introduction

README

1 Overview

Bring up clusters with Cloudera Director. Modify the Director template for your needs here. The template includes configuration to use Hue Notebooks.

2 CIM-demos

Demos Created by Cloud and Infrastructure SE Specialization

Datasets loaded up include

Dataset is loaded into the Hive/Impala retailer database

Hue notebook included

Tableau workbook included

2.3 Environment and database Setup for demos

SET UP User for cluster (usually centos or gregj)

!sudouserclouduserdir
#gregjgcpuser/gregj

2.3.1 Scripts to run

Scripts reside in the ingest directory

2.3.1.1 Set up basics of environment

sudo yum -y install epel-release
sudo yum -y install python-pip wget curl telnet finger mlocate jq htop net-tools git
sudo pip install cm-api

echo "Setting up HDFS home data dir for user ${USER}"
sudo -u hdfs hdfs dfs -mkdir -p /user/${USER}/data
sudo -u hdfs hdfs dfs -chown -R ${USER} /user/${USER}

cmhost=`grep server_host= /etc/cloudera-scm-agent/config.ini |cut -d '=' -f2-`
if [ -z "$cmhost" ] ; then
    echo "Error: Cannot determine Cloudera Manager host address - does this host run Cloudera Manager Agent?"
    exit 1
fi
echo Cloudera Manager is running on $cmhost

# get cluster name
clustername=$(curl --silent -X GET -u admin:admin http://$cmhost:7180/api/v14/clusters | jq -r '.items[].name')
if [ 1 != `echo $clustername | wc -l` ] ; then
    echo "Error: I can only deal with 1 cluster managed by the CM - found: `echo $clustername`"
    exit 1
fi

# get version of cluster
cdhversion=$(curl --silent -u admin:admin http://${cmhost}:7180/api/v14/clusters/${clustername}|jq -r '.fullVersion')
cdhmajor=$(echo ${cdhversion} | cut -d'.' -f1)
cdhminor=$(echo ${cdhversion} | cut -d'.' -f2)

echo "Setting up defaults for impala-shell and Beeline"
impalad=$(curl --silent -u "admin:admin" "http://$cmhost:7180/api/v14/hosts?view=FULL" | jq -r '[.items[] | select(.roleRefs[].roleName | contains("-IMPALAD")) | .ipAddress] | first ')
echo "[impala]
impalad=${impalad}:21000
    " > ~/.impalarc
hiveserver2=$(curl --silent -u 'admin:admin' http://${cmhost}:7180/api/v14/hosts?view=full | jq -r '[.items[] | select(.roleRefs[].roleName | contains("HIVESERVER2")) .hostname] | first')
# start up a beeline command and then save config to ~/.beeline/beeline.properties
beeline -u "jdbc:hive2://${hiveserver2}:10000/default" -n ${USER} <<EOF
!save
!quit
EOF

echo "To run beeline without parameters, use 'beeline -r'"

echo "Fixing up .bashrc"
sudo yum -y install cowsay fortune-mod
tee -a ~/.bashrc <<EOF
export PS1='\u@gateway: \w #$ '
if [[ \$- =~ "i" ]] ; then
    # echo "Streamsets URL: http://`hostname -f`:18630/"
    # echo "Jupyter notebook URL: http://`hostname -f`:8880"
    # echo "RStudio URL: http://`hostname -f`:8787"
    ~/bin/cowme
fi
EOF
mkdir -p ~/bin
tee ~/bin/cowme << EOF
#!/bin/bash
if type fortune cowsay >/dev/null
then
    IFS=',' read -r -a cowopts <<< "b,g,p,s,t,w,y"
    if [ \$((RANDOM % 4)) == 0 ] ; then
        cowcmd="cowsay"
    else
        cowcmd="cowthink"
    fi
    fortune -s | \${cowcmd} -\${cowopts[\$((RANDOM % \${#cowopts[@]}))]}
fi
EOF
chmod 755 ~/bin/cowme

2.3.1.2 Script to load them all

## This script does the following:
##  1. Set up external raw S3 and optimized local Hive tables for Hive Benchmark dataset
##  2. Do the same for the Utility data set
##  3. Set up Data Journey demo (retail dataset)
echo Setting up Bay Area Bikeshare
impala-shell --var=USERDIR=${USERDIR} -f impala/99-setup-bay-area-bicycle-share.sql

echo Setting up the Data Journey data sets
echo "CREATE DATABASE IF NOT EXISTS retailer LOCATION 's3a://gregoryg/database/retailer/';" | tee impala/99-setup-data-journey.sql
for i in `hdfs dfs -ls -R s3a://gregoryg/database/retailer/|grep parq|tr -s ' '|cut -d' ' -f8`
do
    dirpath=`echo $i | sed 's,[^/]\+parq$,,'`
    tablename=`basename $dirpath`
    echo "CREATE EXTERNAL TABLE IF NOT EXISTS retailer.$tablename LIKE PARQUET '"$i"' STORED AS PARQUET LOCATION '"$dirpath"';" | tee -a impala/99-setup-data-journey.sql
done
impala-shell -f impala/99-setup-data-journey.sql

echo Setting up NYC Taxi/Uber raw data
impala-shell --var=USERDIR=${USERDIR} -f impala/99-setup-nyc-taxi.sql

echo Setting up Hive Benchmark data sets
impala-shell --var=USERDIR=${USERDIR} -f impala/99-setup-hive-benchmark.sql

echo Setting up the Utility data sets
impala-shell --var=USERDIR=${USERDIR} -f impala/99-setup-utility.sql

# echo Setting up Observations and Permutations data sets
# impala-shell --var=USERDIR=${USERDIR} -f 99-setup-impala-problem.sql

2.4 All Demos

2.4.0.1 Retail Data Journey

The retailer database is created in 02-load-all-demos.sh

Hue notebook: Data Journey - Retail

2.4.0.2 BABS - Bay Area Bikeshare

Hue notebook: Bay Area BikeShare notebook

Raw data: s3a://gregoryg/babs

This demo follows this blog post: Bay Area bike share analysis with the Hadoop Notebook and Spark & SQL

To set up this demo:

  • Create external Hive tables and optimized tables
  • Load Hue notebook

You will not need to clean any data for this demo - From the raw dataset, I found the rebalancing/status data to be problematic. It uses lots of unnecessary quoting, and has three different timestamp formats. The data was cleaned with simple shell before being uploaded to S3:

cat 201508_status_data.csv | tr -d '"' > cleaned_201508_status_data.csv 
cat 201408_status_data.csv |tr -d '"' > cleaned_201408_status_data.csv 
cat 201402_status_data.csv |tr -d '"' | sed 's,/,-,g' > cleaned_201402_status_data.csv 
-- can be run as Impala
-- Bay Area Bicycle Share (BABS) dataset
  CREATE DATABASE IF NOT EXISTS bikeshare;
  USE bikeshare;

  CREATE EXTERNAL TABLE bikeshare.station_raw (
     station_id BIGINT,
     name STRING,
     lat DOUBLE,
     long DOUBLE,
     dockcount BIGINT,
     landmark STRING,
     installation STRING )
     ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
     STORED AS TEXTFILE LOCATION 's3a://gregoryg/babs/station'
     TBLPROPERTIES ('numRows'='-1', 'rawDataSize'='-1', 'skip.header.line.count'='1' );

  CREATE EXTERNAL TABLE IF NOT EXISTS bikeshare.rebalancing_raw (
     station_id BIGINT,
     bikes_available BIGINT,
     docks_available BIGINT,
     time_recorded STRING ) 
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION 's3a://gregoryg/babs/rebalancing'
  TBLPROPERTIES ('numRows'='-1', 'rawDataSize'='-1', 'skip.header.line.count'='1');

  CREATE EXTERNAL TABLE bikeshare.trip_raw (
     trip_id BIGINT,
     duration BIGINT,
     date_start STRING,
     start_station STRING,
     start_terminal BIGINT,
     date_end STRING,
     end_station STRING,
     end_terminal BIGINT,
     bike_num BIGINT,
     subscription_type STRING,
     zip STRING )
     ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
     STORED AS TEXTFILE
     LOCATION 's3a://gregoryg/babs/trip'
     TBLPROPERTIES ('numRows'='-1', 'rawDataSize'='-1', 'skip.header.line.count'='1');

  CREATE EXTERNAL TABLE bikeshare.weather_raw (
     `date` STRING,
     max_temperature_f BIGINT,
     mean_temperature_f BIGINT,
     min_temperaturef BIGINT,
     max_dew_point_f BIGINT,
     meandew_point_f BIGINT,
     min_dewpoint_f BIGINT,
     max_humidity BIGINT,
     mean_humidity BIGINT,
     min_humidity BIGINT,
     max_sea_level_pressure_in DOUBLE,
     mean_sea_level_pressure_in DOUBLE,
     min_sea_level_pressure_in DOUBLE,
     max_visibility_miles BIGINT,
     mean_visibility_miles BIGINT,
     min_visibility_miles BIGINT,
     max_wind_speed_mph BIGINT,
     mean_wind_speed_mph BIGINT,
     max_gust_speed_mph BIGINT,
     precipitation_in BIGINT,
     cloud_cover BIGINT,
     events STRING,
     wind_dir_degrees BIGINT,
     zip STRING )
     ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
     STORED AS TEXTFILE LOCATION 's3a://gregoryg/babs/weather'
     TBLPROPERTIES ('numRows'='-1', 'rawDataSize'='-1', 'skip.header.line.count'='1');

  -- Create optimized tables
  create table IF NOT EXISTS bikeshare.rebalancing
  STORED AS parquet
  AS select
      CAST(station_id AS int) AS station_id,
      CAST(bikes_available AS int) AS bikes_available,
      CAST(docks_available AS int) AS docks_available,
      CAST(time_recorded AS timestamp) AS time_recorded,
      time_recorded AS original_timestring
  FROM bikeshare.rebalancing_raw;

  create table IF NOT EXISTS bikeshare.trip
  STORED AS parquet
  AS SELECT
     CAST(trip_id AS INT) AS trip_id,
     CAST(duration AS INT) AS duration,
     CAST( concat(split_part(split_part(date_start, '/', 3), ' ' , 1), '-',
       lpad(split_part(date_start, '/', 1), 2, '0'), '-',
       lpad(split_part(date_start, '/', 2), 2, '0'), ' ',
       lpad(split_part(date_start, ' ', 2), 5, '0'), ':00') AS timestamp) AS date_start,
     start_station,
     CAST(start_terminal AS INT) AS start_terminal,
     CAST( concat(split_part(split_part(date_end, '/', 3), ' ' , 1), '-',
       lpad(split_part(date_end, '/', 1), 2, '0'), '-',
       lpad(split_part(date_end, '/', 2), 2, '0'), ' ',
       lpad(split_part(date_end, ' ', 2), 5, '0'), ':00') AS timestamp) AS date_end,
     end_station,
     CAST(end_terminal AS INT) AS end_terminal,
     CAST(bike_num AS INT) AS bike_num,
     subscription_type,
     zip
    FROM bikeshare.trip_raw;

    CREATE TABLE IF NOT EXISTS bikeshare.station
    STORED AS parquet
    AS SELECT
       CAST(station_id AS INT) AS station_id,
       name,
       lat,
       long,
       CAST(dockcount AS INT) AS dockcount,
       landmark,
       installation
    FROM bikeshare.station_raw;

    CREATE TABLE IF NOT EXISTS bikeshare.station
    STORED AS parquet
    AS SELECT
       CAST(station_id AS INT) AS station_id,
       name,
       lat,
       long,
       CAST(dockcount AS INT) AS dockcount,
       landmark,
       installation
    FROM bikeshare.station_raw;

    CREATE TABLE IF NOT EXISTS bikeshare.weather
    STORED AS parquet
    AS SELECT
    CAST( concat( split_part(`date`, '/', 3), '-',
       lpad(split_part(`date`, '/', 1), 2, '0'), '-',
       lpad(split_part(`date`, '/', 2), 2, '0')) AS timestamp) AS date_recorded,
     CAST(max_temperature_f AS INT) AS max_temperature,
     CAST(mean_temperature_f AS INT) AS mean_temperature_f,
     CAST(min_temperaturef AS INT) AS min_temperaturef,
     CAST(max_dew_point_f AS INT) AS max_dew_point_f,
     CAST(meandew_point_f AS INT) AS meandew_point_f,
     CAST(min_dewpoint_f AS INT) AS min_dewpoint_f,
     CAST(max_humidity AS INT) AS max_humidity,
     CAST(mean_humidity AS INT) AS mean_humidity,
     CAST(min_humidity AS INT) AS min_humidity,
     max_sea_level_pressure_in,
     mean_sea_level_pressure_in,
     min_sea_level_pressure_in,
     CAST(max_visibility_miles AS INT) AS max_visibility_miles,
     CAST(mean_visibility_miles AS INT) AS mean_visibility_miles,
     CAST(min_visibility_miles AS INT) AS min_visibility_miles,
     CAST(max_wind_speed_mph AS INT) AS max_wind_speed_mph,
     CAST(mean_wind_speed_mph AS INT) AS mean_wind_speed_mph,
     CAST(max_gust_speed_mph AS INT) AS max_gust_speed_mph,
     CAST(precipitation_in AS INT) AS precipitation_in,
     CAST(cloud_cover AS INT) AS cloud_cover,
     events,
     CAST(wind_dir_degrees AS INT) AS wind_dir_degrees,
     zip
     FROM bikeshare.weather_raw;

After all tables have been created, upload the BABS notebook into Hue

  • Bay Area BikeShare notebook

The pyspark code below can be run in CDSW (incomplete as of <2017-12-06 Wed>)

# ### Create a Spark Session
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *


spark = SparkSession.builder.appName("Bay Area Bikeshare").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

schema = StructType(
    [
    StructField("trip_id", IntegerType(), True),
    StructField("duration", IntegerType(), True),
    StructField("date_start", TimestampType(), True),
    StructField("start_station", StringType(), True),
    StructField("start_terminal", IntegerType(), True),
    StructField("date_end", TimestampType(), True),
    StructField("end_station", StringType(), True),
    StructField("end_terminal", IntegerType(), True),
    StructField("bike_num", IntegerType(), True),
    StructField("subscription_type", StringType(), True),
    StructField("zip", StringType(), True)])

trips = sc.textFile('/user/gregj/data/babs/trip/201402_trip_data.csv')

trips = trips.map(lambda line: line.split(","))

station_70 = trips.filter(lambda x: x[4] == '70')

# Emit tuple of ((date, hour), 1)
trips_by_day_hour = station_70.map(lambda x: ((x[2].split()[0], x[2].split()[1].split(':')[0]), 1))

trips_by_day_hour = trips_by_day_hour.reduceByKey(lambda a, b: a+b)

# Emit tuple of (hour, count)
trips_by_hour = trips_by_day_hour.map(lambda x: (int(x[0][1]), x[1]))

avg_trips_by_hour = trips_by_hour.combineByKey( (lambda x: (x, 1)), 
                                                (lambda x, y: (x[0] + y, x[1] + 1)), 
                                                    (lambda x, y: (x[0] + y[0], x[1] + y[1])) 
                                                    )
avg_trips_by_hour = avg_trips_by_hour.mapValues(lambda v : v[0] / v[1]) 

avg_trips_sorted = sorted(avg_trips_by_hour.collect())
%table avg_trips_sorted

2.4.0.3 NYC Taxi and Uber

CREATE DATABASE IF NOT EXISTS nyctaxi;
USE nyctaxi;

CREATE EXTERNAL TABLE IF NOT EXISTS green_tripdata_pre2015_staging (
  vendor_id STRING,
  lpep_pickup_datetime TIMESTAMP,
  lpep_dropoff_datetime TIMESTAMP,
  store_and_fwd_flag STRING,
  rate_code_id INT,
  pickup_longitude DOUBLE,
  pickup_latitude DOUBLE,
  dropoff_longitude DOUBLE,
  dropoff_latitude DOUBLE,
  passenger_count INT,
  trip_distance DOUBLE,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  ehail_fee STRING,
  total_amount DOUBLE,
  payment_type INT,
  trip_type INT,
  junk1 STRING,
  junk2 STRING
)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION 's3a://gregoryg/datasets/nyc-taxi/green_pre2015';

  CREATE EXTERNAL TABLE IF NOT EXISTS green_tripdata_staging (
  vendor_id STRING,
  lpep_pickup_datetime STRING,
  lpep_dropoff_datetime STRING,
  store_and_fwd_flag STRING,
  rate_code_id STRING,
  pickup_longitude DOUBLE,
  pickup_latitude DOUBLE,
  dropoff_longitude DOUBLE,
  dropoff_latitude DOUBLE,
  passenger_count STRING,
  trip_distance STRING,
  fare_amount STRING,
  extra STRING,
  mta_tax STRING,
  tip_amount STRING,
  tolls_amount STRING,
  ehail_fee STRING,
  improvement_surcharge DOUBLE,
  total_amount STRING,
  payment_type STRING,
  trip_type STRING,
  junk1 STRING,
  junk2 STRING
)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION 's3a://gregoryg/datasets/nyc-taxi/green_2015';

-- N.B. junk columns are there because green_tripdata file headers are
-- inconsistent with the actual data, e.g. header says 20 or 21 columns per row,
-- but data actually has 22 or 23 columns per row, which COPY doesn't like.
-- junk1 and junk2 should always be null


CREATE EXTERNAL TABLE IF NOT EXISTS yellow_tripdata_2015_staging (
  vendor_id STRING,
  tpep_pickup_datetime TIMESTAMP,
  tpep_dropoff_datetime TIMESTAMP,
  passenger_count INT,
  trip_distance DOUBLE,
  pickup_longitude DOUBLE,
  pickup_latitude DOUBLE,
  rate_code_id INT,
  store_and_fwd_flag STRING,
  dropoff_longitude DOUBLE,
  dropoff_latitude DOUBLE,
  payment_type STRING,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  improvement_surcharge DOUBLE,
  total_amount DOUBLE
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION 's3a://gregoryg/datasets/nyc-taxi/yellow_2015/';

CREATE EXTERNAL TABLE IF NOT EXISTS yellow_tripdata_pre2015_staging (
  vendor_id STRING,
  tpep_pickup_datetime TIMESTAMP,
  tpep_dropoff_datetime TIMESTAMP,
  passenger_count INT,
  trip_distance DOUBLE,
  pickup_longitude DOUBLE,
  pickup_latitude DOUBLE,
  rate_code_id INT,
  store_and_fwd_flag STRING,
  dropoff_longitude DOUBLE,
  dropoff_latitude DOUBLE,
  payment_type STRING,
  fare_amount DOUBLE,
  extra DOUBLE,
  mta_tax DOUBLE,
  tip_amount DOUBLE,
  tolls_amount DOUBLE,
  total_amount DOUBLE
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION 's3a://gregoryg/datasets/nyc-taxi/yellow_pre2015/';

CREATE EXTERNAL TABLE IF NOT EXISTS uber_trips_staging (
  pickup_datetime timestamp,
  pickup_latitude DOUBLE,
  pickup_longitude DOUBLE,
  base_code STRING
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  LOCATION 's3a://gregoryg/utility/cdata/ds2/raw/MeterData';

-- CREATE TABLE uber_trips_2015 (
--   dispatching_base_num STRING,
--   pickup_datetime timestamp,
--   affiliated_base_num STRING,
--   location_id INT,
--   nyct2010_ntacode STRING
-- );

-- CREATE TABLE taxi_zone_lookups (
--   location_id integer primary key,
--   borough STRING,
--   zone STRING,
--   service_zone STRING,
--   nyct2010_ntacode STRING
-- );

-- CREATE TABLE fhv_trips (
--   id serial primary key,
--   dispatching_base_num STRING,
--   pickup_datetime timestamp without time zone,
--   location_id INT
-- );

-- CREATE TABLE fhv_bases (
--   base_number STRING primary key,
--   base_name STRING,
--   dba STRING,
--   dba_category STRING
-- );

-- CREATE INDEX index_fhv_bases_on_dba_category ON fhv_bases (dba_category);

-- CREATE TABLE cab_types (
--   id serial primary key,
--   type STRING
-- );

-- INSERT INTO cab_types (type) SELECT 'yellow';
-- INSERT INTO cab_types (type) SELECT 'green';
-- INSERT INTO cab_types (type) SELECT 'uber';

----- GJG Clean up data, put in optimized format
-- set hive.execution.engine=spark;
create table trips STORED AS PARQUET as
SELECT 'yellow' AS taxi_type,
vendor_id, 
tpep_pickup_datetime AS pickup_datetime,
tpep_dropoff_datetime AS dropoff_datetime,
passenger_count,
trip_distance,
pickup_latitude,
pickup_longitude,
rate_code_id,
store_and_fwd_flag,
dropoff_latitude,
dropoff_longitude,
payment_type,
CAST(fare_amount AS DECIMAL(8,2)) AS fare_amount,
CAST(extra AS DECIMAL(8,2)) AS extra,
CAST(mta_tax AS DECIMAL(8,2)) AS mta_tax,
CAST(tip_amount AS DECIMAL(8,2)) AS tip_amount,
CAST(tolls_amount AS DECIMAL(8,2)) AS tolls_amount,
CAST(improvement_surcharge AS DECIMAL(8,2)) AS improvement_surcharge,
CAST(total_amount AS DECIMAL(8,2)) AS total_amount,
regexp_extract(INPUT__FILE__NAME, '.+/(.+)$', 1) AS source_file
from yellow_tripdata_2015_staging;

-- set hive.execution.engine=spark;
insert INTO TABLE trips
SELECT 'yellow' AS taxi_type,
vendor_id, 
tpep_pickup_datetime AS pickup_datetime,
tpep_dropoff_datetime AS dropoff_datetime,
passenger_count,
trip_distance,
pickup_latitude,
pickup_longitude,
rate_code_id,
store_and_fwd_flag,
dropoff_latitude,
dropoff_longitude,
payment_type,
CAST(fare_amount AS DECIMAL(8,2)) AS fare_amount,
CAST(extra AS DECIMAL(8,2)) AS extra,
CAST(mta_tax AS DECIMAL(8,2)) AS mta_tax,
CAST(tip_amount AS DECIMAL(8,2)) AS tip_amount,
CAST(tolls_amount AS DECIMAL(8,2)) AS tolls_amount,
NULL AS improvement_surcharge,
CAST(total_amount AS DECIMAL(8,2)) AS total_amount,
regexp_extract(INPUT__FILE__NAME, '.+/(.+)$', 1) AS source_file
from yellow_tripdata_pre2015_staging;

insert INTO TABLE trips
SELECT 'green' AS taxi_type,
vendor_id, 
CAST(lpep_pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(lpep_dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
passenger_count,
trip_distance,
pickup_latitude,
pickup_longitude,
rate_code_id,
store_and_fwd_flag,
dropoff_latitude,
dropoff_longitude,
payment_type,
CAST(fare_amount AS DECIMAL(8,2)) AS fare_amount,
CAST(extra AS DECIMAL(8,2)) AS extra,
CAST(mta_tax AS DECIMAL(8,2)) AS mta_tax,
CAST(tip_amount AS DECIMAL(8,2)) AS tip_amount,
CAST(tolls_amount AS DECIMAL(8,2)) AS tolls_amount,
CAST(improvement_surcharge AS DECIMAL(8,2)) AS improvement_surcharge,
CAST(total_amount AS DECIMAL(8,2)) AS total_amount,
regexp_extract(INPUT__FILE__NAME, '.+/(.+)$', 1) AS source_file
from green_tripdata_staging;

insert INTO TABLE trips
SELECT 'green' AS taxi_type,
vendor_id, 
CAST(lpep_pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(lpep_dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
passenger_count,
trip_distance,
pickup_latitude,
pickup_longitude,
rate_code_id,
store_and_fwd_flag,
dropoff_latitude,
dropoff_longitude,
payment_type,
CAST(fare_amount AS DECIMAL(8,2)) AS fare_amount,
CAST(extra AS DECIMAL(8,2)) AS extra,
CAST(mta_tax AS DECIMAL(8,2)) AS mta_tax,
CAST(tip_amount AS DECIMAL(8,2)) AS tip_amount,
CAST(tolls_amount AS DECIMAL(8,2)) AS tolls_amount,
NULL AS improvement_surcharge,
CAST(total_amount AS DECIMAL(8,2)) AS total_amount,
regexp_extract(INPUT__FILE__NAME, '.+/(.+)$', 1) AS source_file
from green_tripdata_pre2015_staging;

CREATE EXTERNAL TABLE IF NOT EXISTS central_park_weather_observations_raw (
  station_id STRING,
  station_name STRING,
  weather_date STRING,
  precipitation double,
  snow_depth double,
  snowfall double,
  max_temperature double,
  min_temperature double,
  average_wind_speed double
  )
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION 's3a://gregoryg/datasets/nyc-taxi/central_park_weather';

CREATE TABLE IF NOT EXISTS central_park_weather_observations STORED AS PARQUET AS
   SELECT station_id, station_name, CAST(CONCAT(substring(weather_date, 1, 4), '-', substring(weather_date, 5, 2), '-', substring(weather_date, 7, 2)) AS TIMESTAMP) AS weather_date, snow_depth, snowfall, max_temperature, min_temperature, average_wind_speed FROM central_park_weather_observations_raw WHERE station_id <> 'STATION';




----  GJG 

-- CREATE TABLE trips (
--   id serial primary key,
--   cab_type_id INT,
--   vendor_id STRING,
--   pickup_datetime timestamp without time zone,
--   dropoff_datetime timestamp without time zone,
--   store_and_fwd_flag char(1),
--   rate_code_id INT,
--   pickup_longitude DOUBLE,
--   pickup_latitude DOUBLE,
--   dropoff_longitude DOUBLE,
--   dropoff_latitude DOUBLE,
--   passenger_count INT,
--   trip_distance numeric,
--   fare_amount numeric,
--   extra numeric,
--   mta_tax numeric,
--   tip_amount numeric,
--   tolls_amount numeric,
--   ehail_fee numeric,
--   improvement_surcharge numeric,
--   total_amount numeric,
--   payment_type STRING,
--   trip_type INT,
--   pickup_nyct2010_gid INT,
--   dropoff_nyct2010_gid INT
-- );

-- SELECT AddGeometryColumn('trips', 'pickup', 4326, 'POINT', 2);
-- SELECT AddGeometryColumn('trips', 'dropoff', 4326, 'POINT', 2);

-- CREATE TABLE central_park_weather_observations (
--   station_id STRING,
--   station_name STRING,
--   date date,
--   precipitation numeric,
--   snow_depth numeric,
--   snowfall numeric,
--   max_temperature numeric,
--   min_temperature numeric,
--   average_wind_speed numeric
-- );

-- CREATE UNIQUE INDEX index_weather_observations ON central_park_weather_observations (date);

2.4.0.4 Airlines dataset

create database if not exists airlines;
use airlines;

create external table if not exists airlines
LIKE PARQUET 's3a://gregoryg/datasets/airlines/airlines/4345e5eef217aa1b-c8f16177f35fd988_1432111844_data.0.parq'
STORED AS PARQUET
LOCATION 's3a://gregoryg/datasets/airlines/airlines/';

create external table if not exists airports_raw
(iata string,
airport string,
city string,
state string,
country string,
latitude float,
longitude float)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3a://gregoryg/datasets/airlines/airports';

create table if not exists airports
stored as parquet
AS
select regexp_replace(iata, '"','') AS iata,
regexp_replace(airport, '"','') AS airport,
regexp_replace(city, '"','') AS city,
regexp_replace(state, '"','') AS state,
regexp_replace(country, '"','') AS country,
latitude,
longitude
from airports_raw
where iata <> '"iata"'

create table if not exists default.airlines
STORED AS PARQUET
AS select * FROM airlines.airlines;

create table if not exists default.airports
STORED AS PARQUET
AS select * FROM airlines.airports;

2.4.0.5 Utility data

create database IF NOT EXISTS utility;
use utility;

CREATE EXTERNAL TABLE IF NOT EXISTS meterdata_raw (
       LocationName string,
       MeterName    string,
       ChannelName  string,
       IntervalReadDateTime string,
       IntervalReadRawValue float
       )
       COMMENT 'Meter Data using raw .csv files'
       ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       STORED AS TEXTFILE
       LOCATION 's3a://gregoryg/utility/cdata/ds2/raw/MeterData';


CREATE EXTERNAL TABLE IF NOT EXISTS rateplan_raw (
       LocationName string,
       AccountName string,
       RateplanName string,
       RatePlanDate string
       )
       COMMENT 'Rate plan using raw .csv files'
       ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
       STORED AS TEXTFILE
       LOCATION 's3a://gregoryg/utility/cdata/ds2/raw/RateplanData';

CREATE EXTERNAL TABLE IF NOT EXISTS register_raw (
       LocationName string,
       MeterName string,
       RegisterTypeName string,
       RegisterReadDate string,
       RegisterReadValue float
       )
       COMMENT 'Register data using raw .csv files'
       ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       STORED AS TEXTFILE
       LOCATION 's3a://gregoryg/utility/cdata/ds2/raw/RegisterData';

CREATE EXTERNAL TABLE IF NOT EXISTS multiplier_raw (
       LocationName string,
       MeterName string,
       RegisterTypeName string,
       ApplicationTypeCode string,
       RegisterReadDate string,
       MultiplierFactor string
       )
       COMMENT 'Multiplier data using raw .csv files'
       ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       STORED AS TEXTFILE
       LOCATION 's3a://gregoryg/utility/cdata/ds2/raw/MultiplierData';

CREATE EXTERNAL TABLE IF NOT EXISTS weather_raw (
       AmbientYear int,
       AmbientMonth int,
       AmbientDay int,
       AmbientHour int,
       AmbientDate string,
       WetBulbTemp double,
       DryBulbTemp double,
       BarometricPressure double,
       Precipitation double,
       RelativeHumidity double
       )
       COMMENT 'Weather data using raw .csv files'
       ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       STORED AS TEXTFILE
       LOCATION 's3a://gregoryg/utility/cdata/ds2/raw/WeatherData';


CREATE TABLE IF NOT EXISTS meterdata STORED AS PARQUET
       AS SELECT cast(substring(LocationName, 12) AS int) AS LocationName,
                 cast(substring(MeterName, 8) AS int) AS MeterName,
                 ChannelName,
                 cast(IntervalReadDateTime AS timestamp) AS IntervalReadDateTime,
                 IntervalReadRawValue
          FROM meterdata_raw
          WHERE LocationName <> 'LocationName';

CREATE TABLE IF NOT EXISTS multiplier STORED AS PARQUET
       AS SELECT cast(substring(LocationName, 12) AS int) AS LocationName,
                 cast(substring(MeterName, 11) AS int) AS MeterName,
                 RegisterTypeName,
                 ApplicationTypeCode,
                 cast(RegisterReadDate AS timestamp) AS RegisterReadDate,
                 cast(MultiplierFactor AS decimal(9,4)) AS MultiplierFactor
          FROM multiplier_raw
          WHERE LocationName <> 'LocationName';

CREATE TABLE IF NOT EXISTS rateplan STORED AS PARQUET
       AS SELECT cast(substring(LocationName, 12) AS int) AS LocationName,
                 cast(substring(AccountName, 11) AS int) AS AccountName,
                 RatePlanName,
                 cast(RatePlanDate AS timestamp) AS RatePlanDate
          FROM rateplan_raw
          WHERE LocationName <> 'LocationName';

CREATE TABLE IF NOT EXISTS register STORED AS PARQUET
       AS SELECT cast(substring(LocationName, 12) AS int) AS LocationName,
                 cast(substring(MeterName, 11) AS int) AS MeterName,
                 RegisterTypeName,
                 cast(RegisterReadDate AS timestamp) AS RegisterReadDate,
                 cast(RegisterReadValue AS decimal(9,2)) AS RegisterReadValue
          FROM register_raw
          WHERE LocationName <> 'LocationName';

CREATE TABLE IF NOT EXISTS weather STORED AS PARQUET
       AS SELECT * FROM weather_raw
       WHERE AmbientDate <> 'AmbientDate';

2.4.0.6 Connected Car Demo: Impala, Kudu, Streamsets and so much more

## TODO: Add Anaconda parcel setup
###  https://repo.continuum.io/pkgs/misc/parcels/
### curl -u "admin:admin" "http://${cmhost}:7180/api/v14/cm/config?view=full" |jq '{ "items": [.items[] | if .name=="REMOTE_PARCEL_REPO_URLS" then .value=.value + ",https://repo.continuum.io/pkgs/misc/parcels/" else . end]}'  > /tmp/gort2.json
### curl --silent -X PUT -H "Content-Type:application/json" -u "admin:admin" -d /tmp/gort.json

# if [ -z ${SSH_AUTH_SOCK} ] ; then
#     echo "SSH agent forwarding is not active in this shell session."
#     echo "Agent forwarding is required to install the Streamsets parcel on the CM host."
#     exit 1
# fi

# echo "Pulling cdh-projects, including the lovely notebooks/ subdirectory"
# cd
# git clone [email protected]:gregoryg/cdh-projects.git

# echo "Now setting up Streamsets"
# cd
# wget -q 'https://archives.streamsets.com/datacollector/2.2.1.0/rpm/streamsets-datacollector-2.2.1.0-all-rpms.tgz'
# tar -xzf streamsets-datacollector-2.2.1.0-all-rpms.tgz
# sudo yum -y localinstall streamsets*.rpm
# rm streamsets-datacollector-2.2.1.0-all-rpms.tgz streamsets*.rpm
# sudo service sdc start
# echo "Streamsets installed - browse to http://`hostname -f`:18630/"

echo "Compiling Scala programs needed for the demo"
wget -q http://mirrors.koehn.com/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar -zxf apache-maven-3.3.9-bin.tar.gz
export JAVA_HOME=$(readlink -e /usr/java/default)
export PATH=$PATH:`pwd`/apache-maven-3.3.9/bin:$JAVA_HOME/bin
echo " ... copying Connected Car demo directory"
mkdir -p demos
hdfs dfs -copyToLocal -p s3a://gregoryg/demos/ConnectedCarDemo demos/ConnectedCarDemo
cd demos/ConnectedCarDemo/demo/entity360
mvn clean package
cd bin/
# cmhost, data collector host
# zkhost=$(curl -u "admin:admin" "http://$cmhost:7180/api/v14/hosts?view=FULL" | jq -r '[.items[] | select(.roleRefs[].roleName | contains("ZOOKEEPER")) | .ipAddress] | first')
echo "Set up Jupyter notebook because notebooks are cool"
echo "And now update .bashrc"
tee -a ~/.bashrc <<EOF
# set environment variables for pyspark
export PYSPARK_DRIVER_PYTHON=/opt/cloudera/parcels/Anaconda/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.open_browser=False --NotebookApp.ip='*' --NotebookApp.port=8880"
export PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python
export PATH=/opt/cloudera/parcels/Anaconda/bin:$PATH:~/bin

EOF
echo "TODO: add env var to all nodes, including this gateway"
## export PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda/bin/python

echo "Downloading Jupyter notebook directory and starting Jupyter"
hdfs dfs -copyToLocal s3a://gregoryg/notebooks .
(cd notebooks ; nohup pyspark2 &)


echo 'All done!'

2.4.0.7 Hive benchmark dataset with User visits and website rankings

-- LOCATION $sudouser
create database hivebench;

use hivebench;

create external table rankings_raw
       (
        pageURL VARCHAR(300),
        pageRank INT,
        avgDuration INT
        )
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       LOCATION 's3a://gregoryg/bigtest/rankings/';
        -- LOCATION '${var:USERDIR}/data/bigtest/rankings/';

create external table uservisits_raw
       (
       sourceIP VARCHAR(116),
       destURL VARCHAR(100),
       visitDate timestamp,
       adRevenue FLOAT,
       userAgent VARCHAR(256),
       countryCode CHAR(3),
       languageCode CHAR(6),
       searchWord VARCHAR(32),
       duration INT
       )
       ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
       LOCATION 's3a://gregoryg/bigtest/uservisits/';
       -- LOCATION '${var:USERDIR}/data/bigtest/uservisits/';

create table uservisits stored as parquet as select * FROM uservisits_raw;

create table rankings stored as parquet as select * FROM rankings_raw;

cim-demos's People

Contributors

gregoryg avatar

Watchers

James Cloos avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.