
Apache Flink SQL Cookbook

The Apache Flink SQL Cookbook is a curated collection of examples, patterns, and use cases of Apache Flink SQL. Many of the recipes are completely self-contained and can be run in Ververica Platform as is.

The cookbook is a living document. 🌱

Table of Contents

Foundations

  1. Creating Tables
  2. Inserting Into Tables
  3. Working with Temporary Tables
  4. Filtering Data
  5. Aggregating Data
  6. Sorting Tables
  7. Encapsulating Logic with (Temporary) Views
  8. Writing Results into Multiple Tables
  9. Convert timestamps with timezones

Aggregations and Analytics

  1. Aggregating Time Series Data
  2. Watermarks
  3. Analyzing Sessions in Time Series Data
  4. Rolling Aggregations on Time Series Data
  5. Continuous Top-N
  6. Deduplication
  7. Chained (Event) Time Windows
  8. Detecting Patterns with MATCH_RECOGNIZE
  9. Maintaining Materialized Views with Change Data Capture (CDC) and Debezium
  10. Hopping Time Windows
  11. Window Top-N
  12. Retrieve previous row value without self-join

Other Built-in Functions & Operators

  1. Working with Dates and Timestamps
  2. Building the Union of Multiple Streams
  3. Filtering out Late Data
  4. Overriding table options
  5. Expanding arrays into new rows
  6. Split strings into maps

User-Defined Functions (UDFs)

  1. Extending SQL with Python UDFs

Joins

  1. Regular Joins
  2. Interval Joins
  3. Temporal Table Join between a non-compacted and compacted Kafka Topic
  4. Lookup Joins
  5. Star Schema Denormalization (N-Way Join)
  6. Lateral Table Join

Former Recipes

  1. Aggregating Time Series Data (Before Flink 1.13)

About Apache Flink

Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.

Learn more about Flink at https://flink.apache.org/.

License

Copyright © 2020-2022 Ververica GmbH

Distributed under the Apache License, Version 2.0.

Contributors

airblader, ftisiot, genert, jingge, jneless, joemoe, knaufk, mans2singh, martijnvisser, morsapaes, sjwiesman, snuyanzin


Issues

Connector "Faker" doesn't seem to work

Hi, and thanks for this repo; it's very interesting.

I'm trying to execute Day 2 on the community edition of VVP (Ververica Platform), but I'm getting the error below:

[screenshot of the error]

Is this something with my environment, or something else? Please help me troubleshoot.

Thanks in advance,

/Dan
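
For reference, a minimal Faker source, as the recipes define one, looks roughly like the sketch below. The field names here are illustrative, and this assumes the flink-faker connector jar is available to the cluster (a missing connector jar is one common cause of errors like this):

    -- Minimal sketch of a 'faker' source table; field names are illustrative.
    CREATE TABLE server_logs (
      client_ip STRING,
      user_agent STRING
    ) WITH (
      'connector' = 'faker',
      -- Faker expressions generate the value for each field
      'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
      'fields.user_agent.expression' = '#{Internet.userAgentAny}'
    );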

Examples in "01 Working with Dates and Timestamps" don't work

Flink 1.14.0. Here's the error I get:

Caused by: org.apache.flink.table.planner.codegen.CodeGenException: TIMESTAMP_LTZ only supports diff between the same type.
	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$$anonfun$generateTimestampLtzMinus$2.apply(ScalarOperatorGens.scala:320)
	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$$anonfun$generateTimestampLtzMinus$2.apply(ScalarOperatorGens.scala:314)
	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$$anonfun$generateOperatorIfNotNull$1.apply(ScalarOperatorGens.scala:2409)
	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$$anonfun$generateOperatorIfNotNull$1.apply(ScalarOperatorGens.scala:2409)
	at org.apache.flink.table.planner.codegen.GenerateUtils$$anonfun$generateCallIfArgsNotNull$1.apply(GenerateUtils.scala:68)
	at org.apache.flink.table.planner.codegen.GenerateUtils$$anonfun$generateCallIfArgsNotNull$1.apply(GenerateUtils.scala:68)
	at org.apache.flink.table.planner.codegen.GenerateUtils$.generateCallWithStmtIfArgsNotNull(GenerateUtils.scala:96)
	at org.apache.flink.table.planner.codegen.GenerateUtils$.generateCallIfArgsNotNull(GenerateUtils.scala:67)
	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateOperatorIfNotNull(ScalarOperatorGens.scala:2408)
	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateTimestampLtzMinus(ScalarOperatorGens.scala:313)
	at org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateTemporalPlusMinus(ScalarOperatorGens.scala:286)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:550)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:500)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:491)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:491)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:500)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:491)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:491)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:500)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:491)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:491)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:500)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$9.apply(ExprCodeGenerator.scala:491)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:491)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:157)
	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:168)
	at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:48)
	at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.java:75)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank.translateToPlanInternal(StreamExecRank.java:185)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit.translateToPlanInternal(StreamExecSortLimit.java:99)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:250)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToTransformation(CommonExecLegacySink.java:172)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacySink.translateToPlanInternal(CommonExecLegacySink.java:89)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:71)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:70)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1657)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1607)
	at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:166)

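For context, CURRENT_TIMESTAMP returns TIMESTAMP_LTZ since Flink 1.13, so taking the difference between it and a plain TIMESTAMP column triggers exactly this kind of planner error. A minimal sketch of one workaround, casting both operands to the same type ('orders' and 'order_time' are placeholder names, not from the recipe):

    SELECT
      TIMESTAMPDIFF(
        MINUTE,
        order_time,                              -- a TIMESTAMP(3) column
        CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3))  -- align TIMESTAMP_LTZ with it
      ) AS minutes_since_order
    FROM orders;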

Recipe #21 (lateral join) is problematic

Recipe #21 has issues.

This line

    ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time) AS rownum

should be

    ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum

Otherwise it only takes the first arrival for each person and never considers any of their subsequent moves. This is why the final query, as published, quickly reaches a steady state and ceases to produce updates: it only shows the initial placement of the 100 people as they are born.
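
For clarity, the corrected latest-row-per-key pattern would look roughly like this (table and column names are assumed from the recipe, not copied verbatim):

    SELECT id, state, arrival_time
    FROM (
      SELECT *,
        -- DESC ordering makes rownum = 1 the *latest* arrival per person
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY arrival_time DESC) AS rownum
      FROM People
    )
    WHERE rownum = 1;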

However, the results from this updated query are messy. Once the people start to move around, the number of results displayed in the SQL client begins to rise. While there are 8 results at the beginning (the top 2 for each of the 4 states), if you leave it running there are more and more. It's as though the query switches from retracting to appending, but I'm not sure what's going on.

Also, I note the animated GIF for this example doesn't go very far; it doesn't show any results.

Missing info for handling time zones

Hi there,

thank you for these great examples for Flink SQL. They are a great source of useful tips.

I am missing some information about time zones. I have a date as a string (without a zone identifier) in a CSV file, but I know it is meant to be UTC. In Flink SQL I would create the timestamp from the string using this function:

    eventtime AS TO_TIMESTAMP(message_date, 'yyyy/MM/dd HH:mm:ss')

How can I specify that this string should be interpreted as UTC?
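
One possible approach (a sketch, not from the cookbook): TO_TIMESTAMP produces a zone-less TIMESTAMP, so the UTC interpretation only matters once the value is converted to TIMESTAMP_LTZ, which is interpreted in the session time zone. Setting the session time zone to UTC before the cast should give the intended semantics. The table name, path, and connector options below are illustrative:

    SET 'table.local-time-zone' = 'UTC';

    CREATE TABLE messages (
      message_date STRING,
      -- the cast interprets the zone-less timestamp in the session
      -- time zone configured above (UTC here)
      eventtime AS CAST(
        TO_TIMESTAMP(message_date, 'yyyy/MM/dd HH:mm:ss') AS TIMESTAMP_LTZ(3)
      )
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///path/to/data.csv',
      'format' = 'csv'
    );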
