
brooksian / flink2kafka


A Flink application that demonstrates reading and writing to/from Apache Kafka with Apache Flink

Python 8.17% Java 91.83%
flink-stream-processing flink-examples flink-kafka intellij-idea

flink2kafka's Introduction

There And Back Again

A Story of Apache Kafka & Apache Flink.

ProjectRA

Project Details

This project was heavily inspired by two existing efforts: Data In Motion's FLaNK Stack and Data Artisans' blog on stateful streaming applications. The goal of this project is to provide insight into connecting an Apache Flink application to Apache Kafka.

Church of the FLaNK Stack

Flank

Data Artisan's

NYC Taxi Ride Data Set

Project Scope

This project includes the Apache Flink application code and the NiFi flow required to get the data into and out of Apache Kafka. It doesn't include installation steps for NiFi, Kafka, or Flink, but links to the installation documentation are provided below.

Project Prerequisites

  1. Apache NiFi local server
  2. Apache Kafka with two empty topics called "rawInput" and "enriched"
  3. IntelliJ IDE with the Scala plug-in installed
  4. A cloned copy of this Git repository

Project Set Up and Run

Apache NiFi to Apache Kafka Setup

With Apache NiFi, the records from the source CSV file will be converted into individual JSON records. These records will be written to an Apache Kafka topic called "rawInput".
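The NiFi flow performs this conversion itself (via the SplitRecord processor and its CSVReader service). Purely to illustrate the transformation, here is a plain-Java sketch, not what NiFi runs, that turns one CSV line into one flat JSON object. It assumes the CSV columns appear in the same order as the field names passed in, that values contain no embedded commas or quotes, and that numeric columns should be written as unquoted JSON numbers so they line up with the BIGINT/FLOAT fields in the Flink schema. The class name CsvToJson and the sample values are hypothetical.

```java
import java.math.BigDecimal;
import java.util.Set;

/**
 * Hypothetical stand-in for the NiFi CSV-to-JSON step: converts one CSV line into
 * one flat JSON object. Assumes columns arrive in the same order as fieldNames and
 * that values contain no embedded commas or quotes.
 */
public class CsvToJson {

    public static String toJson(String[] fieldNames, Set<String> numericFields, String csvLine) {
        // limit -1 keeps trailing empty columns, so "a,b," still yields three values
        String[] values = csvLine.split(",", -1);
        if (values.length != fieldNames.length) {
            throw new IllegalArgumentException(
                "expected " + fieldNames.length + " columns but got " + values.length);
        }
        StringBuilder json = new StringBuilder("{");
        for (int i = 0; i < fieldNames.length; i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append('"').append(escape(fieldNames[i])).append("\":");
            String value = values[i].trim();
            if (numericFields.contains(fieldNames[i])) {
                // Empty numeric cells become JSON null; other values go through BigDecimal,
                // which throws NumberFormatException for non-numeric text
                json.append(value.isEmpty() ? "null" : new BigDecimal(value).toString());
            } else {
                json.append('"').append(escape(value)).append('"');
            }
        }
        return json.append('}').toString();
    }

    // Escape backslashes and double quotes so string values stay valid JSON
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        String[] fields = {"medallion", "licenseId", "trip_time_in_secs", "total"};
        Set<String> numeric = Set.of("trip_time_in_secs", "total");
        // prints {"medallion":"ABC123","licenseId":"L42","trip_time_in_secs":600,"total":12.5}
        System.out.println(toJson(fields, numeric, "ABC123,L42,600,12.5"));
    }
}
```

Writing numbers unquoted matters because the Flink JSON format maps JSON numbers onto the numeric table columns; a quoted "600" would be a string.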

  • In the NiFi UI, import the NiFi Flow template (XML file in this Git repo). For help, please review the following documentation: Cloudera Documentation Link.

  • Upload NiFi Flow template using the UI icons.

UploadTemp

  • To add the NiFi Flow template to the canvas, click the "Add Template" icon in the NiFi UI.

AddTemp2UI

  • Select the NiFi Flow template to add.

SelectTempToAdd

  • Once the NiFi template is loaded, the left side of the NiFi flow will look like this.

nifiFlow

  • Right click on the GetFileCSV processor, open Properties tab, and set the path to the source CSV file in the Input Directory option. Please note, the CSV file is located in the data directory of this Git repo.

readCSVFile

  • Right click on the SplitRecord processor, open Properties tab, and click on the CSVReader.

CP

  • Before the NiFi flow will work, all of the controller services shown below must be enabled.

ESC0

ESC1

ESC2

  • Right click on the PublishKafkaRecord processor, open Properties tab, and verify the location of your Kafka broker and topic name.

prodKConf

  • Verify that the JSON records are being written to the rawInput Kafka topic. This can be done with the right side of the NiFi flow. Once this has been verified, please turn off the Kafka consumer processor.

conKRaw

  • Validate the JSON records in the FlowFile.

FFJson

Flink Application Development In IntelliJ

For development purposes, a running Flink cluster isn't required. This application was built inside IntelliJ because, when the application is run from the IDE, Flink starts a local environment for the job and shuts it down afterwards. Using IntelliJ isn't required, but it does make your life easier.

Flink Application - Connect to Kafka Topic

Once JSON records are being written to the Kafka topic, Flink can connect to the topic and create a Flink table on top of it, which can then be queried with SQL. This GitHub repository contains a Flink application that demonstrates this capability.

Java Libraries Required

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

Define Flink Streaming Environment

  • In Flink, the following Java code defines the Flink stream execution environment and the stream table environment.
 //Class Member Static Variables
    static StreamExecutionEnvironment fsEnv;
    static StreamTableEnvironment fsTableEnv;
    static EnvironmentSettings fsSettings;

 // configure the Blink planner in streaming mode
    fsSettings = EnvironmentSettings.newInstance()
       .useBlinkPlanner()
       .inStreamingMode()
       .withBuiltInCatalogName("default_catalog")
       .withBuiltInDatabaseName("default_database")
       .build();

    fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    
  // configure event-time and watermarks
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    fsEnv.getConfig().enableForceAvro();
    fsEnv.getConfig().setAutoWatermarkInterval(1000L);

  //Create Streaming Table Environment
     fsTableEnv  = StreamTableEnvironment.create(fsEnv, fsSettings);
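The environment above switches to event time and sets the periodic watermark interval to 1000 ms. Note that the table schema in this project declares pickUpTime and dropOffTime as STRING and assigns no timestamps, so these settings only take effect once timestamps and watermarks are assigned. To show what the interval controls, here is a plain-Java simulation (not Flink API) of a periodic generator with a fixed out-of-orderness bound, similar in spirit to Flink's bounded-out-of-orderness extractors: it tracks the largest event timestamp seen and, each time it is polled, emits that maximum minus the bound. The class name and the 5-second bound are hypothetical.

```java
/**
 * Plain-Java simulation (not Flink API) of a periodic watermark generator with a
 * fixed out-of-orderness bound.
 */
public class WatermarkSimulation {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen;

    public WatermarkSimulation(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start so that the first watermark is Long.MIN_VALUE instead of overflowing
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Called for every record: track the largest event timestamp seen so far. */
    public void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Polled once per watermark interval (1000 ms with setAutoWatermarkInterval(1000L)). */
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** A record at or below the current watermark is treated as late by event-time logic. */
    public boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        WatermarkSimulation wm = new WatermarkSimulation(5_000L);
        // First interval: events arrive slightly out of order
        for (long t : new long[] {10_000L, 12_000L, 11_000L}) {
            wm.onEvent(t);
        }
        System.out.println(wm.currentWatermark()); // 7000
        // Second interval
        for (long t : new long[] {20_000L, 18_000L}) {
            wm.onEvent(t);
        }
        System.out.println(wm.currentWatermark()); // 15000
        System.out.println(wm.isLate(14_000L));    // true
    }
}
```

A shorter interval makes event time advance more smoothly at the cost of more frequent watermark emission; a larger bound tolerates more disorder but delays results.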

Establish Flink Table Connection

  • In Flink, the following Java code establishes a Flink table connection with a Kafka topic. Please note that the format has been set to JSON and the table schema is declared explicitly.
// register the TaxiRides table backed by the Kafka topic "rawInput"
       fsTableEnv.connect(
               new Kafka()
                       .version("universal")
                       .topic("rawInput")
                       .startFromLatest()
                       .property("zookeeper.connect", "localhost:2181")
                       .property("bootstrap.servers", "localhost:9092")
                       .property("group.id", "test")
       )
       // declare a format for this system
       .withFormat(
          new Json()
       )
       // declare the schema of the table
       .withSchema(
          new Schema()
               .field("medallion", DataTypes.STRING())
               .field("licenseId", DataTypes.STRING())
               .field("pickUpTime", DataTypes.STRING())
               .field("dropOffTime", DataTypes.STRING())
               .field("trip_time_in_secs", DataTypes.BIGINT())
               .field("trip_distance", DataTypes.FLOAT())
               .field("pickUpLon", DataTypes.FLOAT())
               .field("pickUpLat", DataTypes.FLOAT())
               .field("dropOffLon", DataTypes.FLOAT())
               .field("dropOffLat", DataTypes.FLOAT())
               .field("payment_type", DataTypes.STRING())
               .field("fare_amount", DataTypes.FLOAT())
               .field("surcharge", DataTypes.FLOAT())
               .field("mta_tax", DataTypes.FLOAT())
               .field("tip_amount", DataTypes.FLOAT())
               .field("tolls_amount", DataTypes.FLOAT())
               .field("total", DataTypes.FLOAT())
       )
       .inAppendMode()
       // create a table with given name
       .createTemporaryTable("TaxiRides");
  • The Flink application will display the following if everything is working as expected.

KafkaRead

Query Flink Table Built On Kafka Topic

  • In Flink, the following Java code queries the newly established Flink table and prints the results to the console.
// select every column from the TaxiRides table
    Table result = fsTableEnv.sqlQuery(
            "SELECT " +
                    " * " +
                    "FROM TaxiRides"
    );

    // convert result table into a stream and print it
    fsTableEnv.toAppendStream(result, Row.class).print();

    // the streaming job only starts once execute() is called
    fsEnv.execute("flink2kafka");

Establish a Connection to Destination Kafka Topic

  • In Flink, the following Java code creates a connection to the Kafka topic "enriched". Please note that the format has been set to JSON and the table schema is declared explicitly.
 // register the KafkaSink table backed by the Kafka topic "enriched"
        fsTableEnv.connect(
            new Kafka()
            .version("universal")
            .topic("enriched")
            .startFromLatest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test")
        )
        // declare a format for this system
        .withFormat(
            new Json()
        )
        // declare the schema of the table
        .withSchema(
            new Schema()
                .field("medallion", DataTypes.STRING())
                .field("TimeStamp", DataTypes.TIMESTAMP(3) )
        )
        .inAppendMode()
        // create a table with given name
        .createTemporaryTable("KafkaSink");
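Later Flink releases steer users toward SQL DDL rather than the descriptor API used here. As a sketch, assuming Flink 1.10 (suggested by the useBlinkPlanner() setting and the org.apache.flink.table.api.java import), an equivalent table can be declared with a CREATE TABLE statement using the 1.10 connector property keys and submitted with fsTableEnv.sqlUpdate(ddl); newer releases use executeSql and different option keys. The hypothetical helper below only builds the statement as a string, so it can be checked without a Kafka broker. It omits zookeeper.connect, which the universal connector does not need, and back-quotes column names because TimeStamp would otherwise be read as the TIMESTAMP keyword.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helper that renders a Flink 1.10-style CREATE TABLE statement
 * for a JSON-encoded Kafka topic, mirroring the descriptor settings above.
 */
public class KafkaTableDdl {

    public static String createTable(String tableName, String topic, Map<String, String> columns) {
        // Back-quote column names so identifiers such as TimeStamp don't clash with SQL keywords
        StringJoiner cols = new StringJoiner(",\n", "", "\n");
        columns.forEach((name, type) -> cols.add("  `" + name + "` " + type));
        return "CREATE TABLE " + tableName + " (\n"
            + cols
            + ") WITH (\n"
            + "  'connector.type' = 'kafka',\n"
            + "  'connector.version' = 'universal',\n"
            + "  'connector.topic' = '" + topic + "',\n"
            + "  'connector.startup-mode' = 'latest-offset',\n"
            + "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'connector.properties.group.id' = 'test',\n"
            + "  'format.type' = 'json',\n"
            + "  'update-mode' = 'append'\n"
            + ")";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the declared column order
        Map<String, String> sinkColumns = new LinkedHashMap<>();
        sinkColumns.put("medallion", "STRING");
        sinkColumns.put("TimeStamp", "TIMESTAMP(3)");
        String ddl = createTable("KafkaSink", "enriched", sinkColumns);
        System.out.println(ddl);
        // In the Flink application this string would be passed to fsTableEnv.sqlUpdate(ddl)
    }
}
```

Calling the same helper with "TaxiRides", "rawInput", and the seventeen columns from the source schema would produce the source table declaration.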

Write to Kafka Topic From Flink Query

  • In Flink, the following code will write the query results to a Kafka topic that was established in the previous step.
// select each medallion together with the current timestamp
        Table result = fsTableEnv.sqlQuery(
                "SELECT " +
                " medallion, CURRENT_TIMESTAMP " +
                " FROM TaxiRides"
        );

        result.insertInto("KafkaSink");

        // nothing is written to Kafka until the job is executed
        fsTableEnv.execute("flink2kafka");
  • The following output is expected in the application. Please note that the last value shown in this image was removed from the code example.

KafkaRead

Apache Kafka to NiFi

Read from Kafka Topic "enriched"

  • In the NiFi UI, find the following section of the flow.

conFromKafka1

  • Verify that the Kafka settings are correct.

conKConfig

  • Activate the Kafka consumer processor and validate the results.

Helpful Installation Links

Apache NiFi

Apache Kafka

Apache Flink

IntelliJ

Additional Helpful Links

Apache Flink

Apache Kafka + Apache Flink

Apache Flink and Apache Kafka Code Examples

Apache Kafka + Apache Druid

Additional Apache Project Install Links

Additional Apache Projects On Docker
