This project forked from qubole/kinesis-sql

Kinesis Connector for Structured Streaming

Home Page: http://www.qubole.com

License: Apache License 2.0

An implementation of the Kinesis source provider for Spark Structured Streaming. SPARK-18165 describes the need for such an implementation.

Developer Setup

Check out the kinesis-sql branch that matches your Spark version. Use the master branch for the latest Spark version.

Spark version 2.2.0
git clone git@github.com:qubole/kinesis-sql.git
cd kinesis-sql
git checkout 2.2.0
mvn install -DskipTests

This creates target/spark-sql-kinesis_2.11-2.2.0.jar, which contains the connector code and its dependency jars.

Spark version 2.3.2
git clone git@github.com:qubole/kinesis-sql.git
cd kinesis-sql
git checkout 2.3.2
mvn install -DskipTests

This creates target/spark-sql-kinesis_2.11-2.3.2.jar, which contains the connector code and its dependency jars.

How to use it

Setup Kinesis

Refer to the Amazon docs for more options.

Create Kinesis Stream
$ aws kinesis create-stream --stream-name test --shard-count 2
Add Records in the stream
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Kinesis'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Connector'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'for'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Apache'
$ aws kinesis put-record --stream-name test --partition-key 1 --data 'Spark'

Example Streaming Job

In the steps below, $SPARK_HOME refers to the Spark installation directory.

Open Spark-Shell
$SPARK_HOME/bin/spark-shell --jars target/spark-sql-kinesis_2.11-2.2.0.jar
Subscribe to Kinesis Source
// Subscribe to the "test" stream
scala> :paste
val kinesis = spark
    .readStream
    .format("kinesis")
    .option("streamName", "test")
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("awsAccessKeyId", "[ACCESS_KEY]")   // replace with your access key
    .option("awsSecretKey", "[SECRET_KEY]")     // replace with your secret key
    .option("startingPosition", "TRIM_HORIZON")
    .load
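
Credentials typed directly into a shell session are easy to leak. The following is a minimal sketch of one alternative, assuming the keys are exported as the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. The helper name sourceOptions and the variable names are illustrative assumptions, not part of the connector.

```scala
// Hypothetical helper (not part of the connector): builds the source options
// from an environment map so that no credential appears in the shell history.
def sourceOptions(env: Map[String, String], stream: String): Map[String, String] = {
  // Fail early with a clear message if a variable is missing.
  def required(name: String): String =
    env.getOrElse(name, sys.error(s"Missing environment variable: $name"))

  Map(
    "streamName"       -> stream,
    "endpointUrl"      -> "https://kinesis.us-east-1.amazonaws.com",
    "awsAccessKeyId"   -> required("AWS_ACCESS_KEY_ID"),     // assumed variable name
    "awsSecretKey"     -> required("AWS_SECRET_ACCESS_KEY"), // assumed variable name
    "startingPosition" -> "TRIM_HORIZON"
  )
}

// In spark-shell the whole map can be passed in a single call:
//   val kinesis = spark.readStream.format("kinesis")
//     .options(sourceOptions(sys.env, "test"))
//     .load
```

Passing the environment as a parameter rather than reading sys.env inside the helper keeps it easy to check in isolation.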
Check Schema
scala> kinesis.printSchema
root
|-- data: binary (nullable = true)
|-- streamName: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- sequenceNumber: string (nullable = true)
|-- approximateArrivalTimestamp: timestamp (nullable = true)
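
The data column is binary, so each job has to decide how to decode it. As a sketch, the schema can be mirrored by a case class for typed access. The name KinesisRecord and the text helper are assumptions made for illustration, and the UTF-8 decoding only holds if the producer wrote UTF-8 text.

```scala
import java.nio.charset.StandardCharsets
import java.sql.Timestamp

// Hypothetical case class mirroring the printed schema, field for field.
final case class KinesisRecord(
  data: Array[Byte],
  streamName: String,
  partitionKey: String,
  sequenceNumber: String,
  approximateArrivalTimestamp: Timestamp
) {
  // Decode the binary payload, assuming the producer wrote UTF-8 text.
  def text: String = new String(data, StandardCharsets.UTF_8)
}

// In spark-shell, where spark.implicits._ is already imported:
//   val typed = kinesis.as[KinesisRecord]
```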
Word Count
// Cast data to a string and group by the data column
scala> :paste
kinesis
    .selectExpr("CAST(data AS STRING)").as[String]
    .groupBy("data").count()
    .writeStream
    .format("console")
    .outputMode("complete")
    .start()
    .awaitTermination()
Output in Console
+------------+-----+
|        data|count|
+------------+-----+
|         for|    1|
|      Apache|    1|
|       Spark|    1|
|     Kinesis|    1|
|   Connector|    1|
+------------+-----+ 
Using the Kinesis Sink
// Cast data to a string, add a random partition key, and group by the data column
scala> :paste
kinesis
    .selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)").as[(String, String)]
    .groupBy("data").count()
    .writeStream
    .format("kinesis")
    .outputMode("update")
    .option("streamName", "spark-sink-example")
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("awsAccessKeyId", "[ACCESS_KEY]")   // replace with your access key
    .option("awsSecretKey", "[SECRET_KEY]")     // replace with your secret key
    .start()
    .awaitTermination()

Kinesis Source Configuration

| Option-Name | Default-Value | Description |
|---|---|---|
| streamName | - | Name of the stream in Kinesis to read from |
| endpointUrl | https://kinesis.us-east-1.amazonaws.com | end-point URL for Kinesis Stream |
| awsAccessKeyId | - | AWS Credentials for Kinesis describe, read record operations |
| awsSecretKey | - | AWS Credentials for Kinesis describe, read record operations |
| startingPosition | LATEST | Starting Position in Kinesis to fetch data from. Possible values are "latest", "trim_horizon", "earliest" (alias for trim_horizon) |
| kinesis.executor.maxFetchTimeInMs | 1000 | Maximum time spent in executor to fetch record from Kinesis per Shard |
| kinesis.executor.maxFetchRecordsPerShard | 100000 | Maximum Number of records to fetch per shard |
| kinesis.executor.maxRecordPerRead | 10000 | Maximum Number of records to fetch per getRecords API call |
| kinesis.client.describeShardInterval | 1s (1 second) | Minimum Interval between two DescribeStream API calls to consider resharding |
| kinesis.client.numRetries | 3 | Maximum Number of retries for Kinesis API requests |
| kinesis.client.retryIntervalMs | 1000 | Cool-off period before retrying Kinesis API |
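
The tuning options are plain strings, so a misspelled name is easy to miss. The sketch below copies the defaults listed above into a map and rejects overrides whose names are not in that list. The names sourceDefaults and tunedSourceOptions are hypothetical, and the connector applies its own defaults regardless; the map only makes overrides explicit and checkable.

```scala
// Defaults copied from the source configuration list above (illustrative only).
val sourceDefaults: Map[String, String] = Map(
  "kinesis.executor.maxFetchTimeInMs"        -> "1000",
  "kinesis.executor.maxFetchRecordsPerShard" -> "100000",
  "kinesis.executor.maxRecordPerRead"        -> "10000",
  "kinesis.client.describeShardInterval"     -> "1s",
  "kinesis.client.numRetries"                -> "3",
  "kinesis.client.retryIntervalMs"           -> "1000"
)

// Hypothetical helper: merges overrides into the defaults and fails on
// option names that do not appear in the list.
def tunedSourceOptions(overrides: Map[String, String]): Map[String, String] = {
  val unknown = overrides.keySet -- sourceDefaults.keySet
  require(unknown.isEmpty, s"Unknown option(s): ${unknown.mkString(", ")}")
  sourceDefaults ++ overrides
}

// In spark-shell, alongside the stream name and credential options:
//   spark.readStream.format("kinesis")
//     .options(tunedSourceOptions(Map("kinesis.client.numRetries" -> "5")))
```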

Kinesis Sink Configuration

| Option-Name | Default-Value | Description |
|---|---|---|
| streamName | - | Name of the stream in Kinesis to write to |
| endpointUrl | https://kinesis.us-east-1.amazonaws.com | The aws endpoint of the kinesis Stream |
| awsAccessKeyId | - | AWS Credentials for Kinesis describe, read record operations |
| awsSecretKey | - | AWS Credentials for Kinesis describe, read record |
| kinesis.executor.recordMaxBufferedTime | 1000 (millis) | Specify the maximum buffered time of a record |
| kinesis.executor.maxConnections | 1 | Specify the maximum connections to Kinesis |
| kinesis.executor.aggregationEnabled | true | Specify if records should be aggregated before sending them to Kinesis |

Roadmap

  • We need to migrate to DataSource V2 APIs for MicroBatchExecution.
  • Maintain per micro-batch shard commit state in DynamoDB.

Acknowledgement

This connector would not have been possible without the reference implementations of the Kafka connector for Structured Streaming, the Kinesis connector for legacy streaming, and the Kinesis Client Library. The structure of some parts of the code is influenced by the excellent work done by various Apache Spark contributors.
