RabbitMQ Spark Streaming Receiver

RabbitMQ-Receiver is a library that lets the user read data from RabbitMQ with Apache Spark Streaming.

Requirements

This library requires Spark 2.0+, Scala 2.11+, and RabbitMQ 3.5+.

Using the library

There are two ways to use the RabbitMQ-Receiver library.

The first is to add the following dependency to your pom.xml:

<dependency>
  <groupId>com.stratio.receiver</groupId>
  <artifactId>spark-rabbitmq</artifactId>
  <version>LATEST</version>
</dependency>

The other is to clone the full repository and build the project:

git clone https://github.com/Stratio/spark-rabbitmq.git
cd spark-rabbitmq
mvn clean install

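This library includes two implementations for consuming messages from RabbitMQ with Spark Streaming: a distributed approach and a receiver-based approach. Both are described in the sections that follow.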

Build

mvn clean package

Distributed Approach

This advanced consumer is implemented by extending the Spark InputDStream class. With this approach the user can consume messages from multiple RabbitMQ clusters or multiple RabbitMQ queues. It is also possible to parallelize consumption on one node by starting more than one consumer, one for each Spark RDD partition.

  • Single-parallelized: one executor with multiple parallel consumers reading from one queue
  • Multiple-parallelized: one or more executors with multiple consumers reading from multiple queues
  • Cluster: cluster consumer

The consumption options determine the number of partitions of the RDD generated by the RabbitMQDStream. Each Spark node consumes messages, and these messages make up the data held in the partitions. It is not necessary to save the data in the Spark Block Manager; subsequent transformations and actions use the data stored on each executor.

When streaming starts, each time window is divided into a consuming phase and a computing phase. By default, the time for consuming messages from RabbitMQ is 0.9 times the Spark window. To improve performance, this time can be limited with the configuration parameter "maxReceiveTime", given in milliseconds. The number of consumed messages can also be limited with the configuration parameter "maxMessagesPerPartition".
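
As a rough illustration of the timing rule above, the following sketch computes how long one batch would spend consuming. It rests on several assumptions that the text does not confirm: that the configuration is passed as a Map[String, String], that a missing or zero "maxReceiveTime" falls back to the 0.9 default, and that a positive value is used as given. ReceiveWindow and its methods are hypothetical names, not part of the library.

```scala
// Hypothetical helper, not part of spark-rabbitmq: it only illustrates the
// timing rule described above.
object ReceiveWindow {

  // Assumption: configuration is passed as a Map[String, String].
  val params: Map[String, String] = Map(
    "maxReceiveTime" -> "2000",          // consume for at most 2000 ms per batch
    "maxMessagesPerPartition" -> "1000"  // cap on messages per RDD partition
  )

  // Default share of the window spent consuming: 0.9, written as 9/10 so the
  // computation stays in integer arithmetic.
  def defaultReceiveMillis(windowMillis: Long): Long = windowMillis * 9 / 10

  // Assumption: a missing or zero "maxReceiveTime" means "use the default".
  def receiveMillis(conf: Map[String, String], windowMillis: Long): Long =
    conf.get("maxReceiveTime")
      .map(_.toLong)
      .filter(_ > 0)
      .getOrElse(defaultReceiveMillis(windowMillis))
}
```

Under these assumptions, a 10-second window with no "maxReceiveTime" leaves 9000 ms for consuming, while the params shown here reduce that to 2000 ms.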

This receiver provides optimized implementations of the RDD functions count and countApprox.

Each executor has one connection pool, and its connections are reused on each streaming batch window for better performance. The current Kafka direct approach implemented by Spark does not have a connection pool, so the RDDs create a new Kafka connection on each iteration.
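
The idea of reusing connections across batches can be sketched with a small keyed pool. This is a generic illustration and not the library's implementation: KeyedPool is a hypothetical class, and the create function stands in for whatever opens a RabbitMQ connection. In a real job, one such pool would presumably live once per executor JVM, for example inside a Scala object.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JavaFunction}

// Generic sketch of a keyed pool; this is not the library's implementation.
final class KeyedPool[K, C](create: K => C) {
  private val entries = new ConcurrentHashMap[K, C]()

  // Returns the cached entry for the key, creating it only on first use.
  // computeIfAbsent guarantees that create runs at most once per key.
  def get(key: K): C =
    entries.computeIfAbsent(key, new JavaFunction[K, C] {
      override def apply(k: K): C = create(k)
    })

  // Number of entries currently held by the pool.
  def size: Int = entries.size()
}
```

Calling get twice with the same key returns the same entry, so the cost of opening a connection is paid once rather than on every batch.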

This consumer has a limitation: the minimum storage level for this RabbitMQDStream is MEMORY_ONLY. The user cannot select NONE, because the RDD would then be re-computed on each Spark action.

Scala API

  • String
val receiverStream = RabbitMQUtils.createDistributedStream[String](sparkStreamingContext, params, distributedKeys)
  • Generic user Type
val receiverStream = RabbitMQUtils.createDistributedStream[R](sparkStreamingContext, params, distributedKeys, Array[Byte] => R)
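
The Array[Byte] => R argument is a function that turns the raw message body into the user's type. A minimal sketch of such a handler follows. Reading, Handlers and toReading are hypothetical names, and the "sensor,value" UTF-8 message format is an assumption made only for this example.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical user type; any type R works as long as a handler can build it.
final case class Reading(sensor: String, value: Double)

object Handlers {
  // Handler of type Array[Byte] => Reading.
  // Assumption: the message body is UTF-8 text of the form "sensor,value".
  val toReading: Array[Byte] => Reading = { bytes =>
    new String(bytes, StandardCharsets.UTF_8).split(",", 2) match {
      case Array(sensor, value) => Reading(sensor.trim, value.trim.toDouble)
      case _ => throw new IllegalArgumentException("expected \"sensor,value\"")
    }
  }
}
```

Such a function would be passed as the last argument, as in RabbitMQUtils.createDistributedStream[Reading](sparkStreamingContext, params, distributedKeys, Handlers.toReading).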

Java API

JavaReceiverInputDStream receiverStream = RabbitMQUtils.createJavaDistributedStream[R](javaSparkStreamingContext, params, JFunction[Array[Byte], R]);

Spark Parameters Options

Parameter                Description                                Optional
maxMessagesPerPartition  Maximum number of messages per partition   Yes
levelParallelism         Number of partitions per executor          Yes (default: 1)
maxReceiveTime           Maximum time to receive messages (ms)      Yes (default: 0, auto)
rememberDuration         Remember duration for Spark DStreams       Yes (default: 60s)

Receiver-based Approach

This is a basic consumer. When the streaming context starts, Spark runs one process in one executor to consume messages from RabbitMQ. The consumer uses a singleton consumer instance that receives messages asynchronously: on each Spark window it receives messages and saves the received blocks in Spark block memory. All the data is replicated to other nodes. The receiver extends an Akka Actor, so the receiver-based implementation depends on Akka. In future versions of Spark the Akka dependency will be removed.

Scala API

  • String
val receiverStream = RabbitMQUtils.createStream[String](sparkStreamingContext, params)
  • Generic user Type
val receiverStream = RabbitMQUtils.createStream[R](sparkStreamingContext, params, Array[Byte] => R)

Java API

JavaReceiverInputDStream receiverStream = RabbitMQUtils.createJavaStream[R](javaSparkStreamingContext, params, JFunction[Array[Byte], R]);

RabbitMQ Parameters Options

Parameter                  Description                     Optional
hosts                      RabbitMQ hosts                  Yes (default: localhost)
virtualHost                RabbitMQ virtual host           Yes
queueName                  Queue name                      Yes
exchangeName               Exchange name                   Yes
exchangeType               Exchange type                   Yes
routingKeys                Routing keys, comma-separated   Yes
userName                   RabbitMQ username               Yes
password                   RabbitMQ password               Yes
durable                    durable                         Yes (default: true)
exclusive                  exclusive                       Yes (default: false)
autoDelete                 autoDelete                      Yes (default: false)
ackType                    basic/auto                      Yes (default: basic)
fairDispatch               fairDispatch                    Yes (default: false)
prefetchCount              prefetchCount                   Yes (default: 1)
storageLevel               Apache Spark storage level      Yes (default: MEMORY_ONLY)
x-max-length               RabbitMQ queue property         Yes
x-message-ttl              RabbitMQ queue property         Yes
x-expires                  RabbitMQ queue property         Yes
x-max-length-bytes         RabbitMQ queue property         Yes
x-dead-letter-exchange     RabbitMQ queue property         Yes
x-dead-letter-routing-key  RabbitMQ queue property         Yes
x-max-priority             RabbitMQ queue property         Yes
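
To show how these options might be put together, the sketch below builds a params value from a few of the parameters listed above. It assumes that params is a Map[String, String] in which every value, including numbers and booleans, is written as a string. The queue, exchange and routing key names are made up, and RabbitParams with its two helper methods is a hypothetical pre-flight check, not part of the library.

```scala
// Assumption: params is a Map[String, String] keyed by the parameter names
// listed above, with every value written as a string.
object RabbitParams {
  val params: Map[String, String] = Map(
    "hosts" -> "localhost",
    "queueName" -> "events",              // hypothetical queue name
    "exchangeName" -> "events-exchange",  // hypothetical exchange name
    "exchangeType" -> "direct",
    "routingKeys" -> "orders,payments",   // comma-separated, hypothetical keys
    "durable" -> "true",
    "ackType" -> "auto",
    "prefetchCount" -> "10",
    "x-message-ttl" -> "60000"
  )

  // The two documented values for ackType.
  private val ackTypes = Set("basic", "auto")

  // Returns the configured ackType only when it is not a documented value.
  def invalidAckType(conf: Map[String, String]): Option[String] =
    conf.get("ackType").filterNot(ackTypes)

  // Splits the comma-separated routingKeys value into individual keys.
  def routingKeys(conf: Map[String, String]): Seq[String] =
    conf.get("routingKeys").toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim)
      .filter(_.nonEmpty)
}
```

A map like this would then be passed as the params argument of RabbitMQUtils.createStream or RabbitMQUtils.createDistributedStream.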

License

Licensed to STRATIO (C) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The STRATIO (C) licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

