confluentinc / ksql

The database purpose-built for stream processing applications.

Home Page: https://ksqldb.io

License: Other

Java 99.62% ANTLR 0.11% Shell 0.09% HTML 0.04% Dockerfile 0.01% JavaScript 0.01% Python 0.12%
stream-processing real-time sql interactive kafka kafka-connect materialized-views ksqldb ksqldb-documentation ksqldb-tutorials

ksql's Introduction

KSQL 🚀 ksqlDB

The database purpose-built for stream processing applications

Overview

ksqlDB is a database for building stream processing applications on top of Apache Kafka. It is distributed, scalable, reliable, and real-time. ksqlDB combines the power of real-time stream processing with the approachable feel of a relational database through a familiar, lightweight SQL syntax. ksqlDB offers these core primitives:

  • Streams and tables - Create relations with schemas over your Apache Kafka topic data
  • Materialized views - Define real-time, incrementally updated materialized views over streams using SQL
  • Push queries - Continuous queries that push incremental results to clients in real time
  • Pull queries - Query materialized views on demand, much like with a traditional database
  • Connect - Integrate with any Kafka Connect data source or sink, entirely from within ksqlDB

Composing these powerful primitives enables you to build a complete streaming app with just SQL statements, minimizing complexity and operational overhead. ksqlDB supports a wide range of operations including aggregations, joins, windowing, sessionization, and much more. You can find more ksqlDB tutorials and resources here.
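For reference, the page_views stream used in the materialized view example below would first be declared over an existing Kafka topic. A minimal sketch; the topic name, columns, and format here are illustrative rather than taken from this README:

CREATE STREAM page_views (user_id VARCHAR, url VARCHAR)
  WITH (kafka_topic='page_views_topic', value_format='JSON');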

Getting Started

Documentation

See the ksqlDB documentation for the latest stable release.

Use Cases and Examples

Materialized views

ksqlDB allows you to define materialized views over your streams and tables. Materialized views are defined by what is known as a "persistent query". These queries are known as persistent because they maintain their incrementally updated results using a table.

CREATE TABLE hourly_metrics AS
  SELECT url, COUNT(*)
  FROM page_views
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY url EMIT CHANGES;

Results may be "pulled" from materialized views on demand via SELECT queries. The following query will return a single row:

SELECT * FROM hourly_metrics
  WHERE url = 'http://myurl.com' AND WINDOWSTART = '2019-11-20T19:00';

Results may also be continuously "pushed" to clients via streaming SELECT queries. The following streaming query will push to the client all incremental changes made to the materialized view:

SELECT * FROM hourly_metrics EMIT CHANGES;

Streaming queries will run perpetually until they are explicitly terminated.
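Persistent queries can be listed and stopped from ksqlDB itself. A short sketch; the query ID shown is illustrative, since real IDs are assigned by the server and reported by SHOW QUERIES:

SHOW QUERIES;
TERMINATE CTAS_HOURLY_METRICS_0;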

Streaming ETL

Apache Kafka is a popular choice for powering data pipelines. ksqlDB makes it simple to transform data within the pipeline, readying messages to cleanly land in another system.

CREATE STREAM vip_actions AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id
  WHERE u.level = 'Platinum' EMIT CHANGES;
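The users relation joined above has to be declared first. A minimal sketch of defining it as a ksqlDB table; the topic name and columns are assumptions:

CREATE TABLE users (user_id VARCHAR PRIMARY KEY, level VARCHAR)
  WITH (kafka_topic='users', value_format='JSON');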

Anomaly Detection

ksqlDB is a good fit for identifying patterns or anomalies in real-time data. By processing the stream as data arrives, you can identify and properly surface out-of-the-ordinary events with millisecond latency.

CREATE TABLE possible_fraud AS
  SELECT card_number, count(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 5 SECONDS)
  GROUP BY card_number
  HAVING count(*) > 3 EMIT CHANGES;

Monitoring

Kafka's ability to provide scalable, ordered records with stream processing makes it a common solution for log data monitoring and alerting. ksqlDB lends a familiar syntax for tracking, understanding, and managing alerts.

CREATE TABLE error_counts AS
  SELECT error_code, count(*)
  FROM monitoring_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  WHERE type = 'ERROR'
  GROUP BY error_code EMIT CHANGES;

Integration with External Data Sources and Sinks

ksqlDB includes native integration with Kafka Connect data sources and sinks, effectively providing a unified SQL interface over a broad variety of external systems.

The following query is a simple persistent streaming query that will produce all of its output into a topic named clicks_transformed:

CREATE STREAM clicks_transformed AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id EMIT CHANGES;

Rather than simply sending all continuous query output to a Kafka topic, it is often useful to route the output into another datastore. ksqlDB's Kafka Connect integration makes this pattern very easy.

The following statement will create a Kafka Connect sink connector that continuously sends all output from the above streaming ETL query directly into Elasticsearch:

 CREATE SINK CONNECTOR es_sink WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'topics'          = 'clicks_transformed',
  'key.ignore'      = 'true',
  'schema.ignore'   = 'true',
  'type.name'       = '',
  'connection.url'  = 'http://elasticsearch:9200');
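Source connectors can be created the same way. A sketch using the Confluent JDBC source connector; the connection settings and column name are illustrative:

CREATE SOURCE CONNECTOR users_source WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'  = 'jdbc:postgresql://localhost:5432/mydb',
  'mode'            = 'incrementing',
  'incrementing.column.name' = 'id',
  'topic.prefix'    = 'jdbc_');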

Join the Community

For user help, questions, or queries about ksqlDB, please use our user Google Group or our public Slack channel #ksqldb in Confluent Community Slack. Everyone is welcome!

You can get help, learn how to contribute to ksqlDB, and find the latest news by connecting with the Confluent community.

For more general questions about the Confluent Platform please post in the Confluent Google group.

Contributing and building from source

Contributions to the code, examples, documentation, etc. are very much appreciated.

License

The project is licensed under the Confluent Community License.

Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation.

ksql's People

Contributors

agavra, alanconfluent, andrewegel, apurvam, big-andy-coates, bvarghese1, cadonna, confluentjenkins, confluentsemaphore, cprasad1, dguy, gerrrr, hjafarpour, jimgalasyn, jnh5y, joel-hamill, lihaosky, maxzheng, michaeldrogalis, miguno, mjsax, nateab, purplefox, rodesai, spena, stevenpyzhang, vcrfxia, vpapavas, vvcephei, wcarlson5


ksql's Issues

Queries involving multiple joins fail

The following test fails with a NullPointerException when added to the KQLParserTest.java class:

  @Test
  public void testMultipleJoins() throws Exception {
    MetaStore tempMetaStore = new MetaStoreImpl();

    for (KQLTopic kqlTopic : metaStore.getAllKQLTopics().values()) {
      tempMetaStore.putTopic(kqlTopic);
    }
    for (StructuredDataSource dataSource : metaStore.getAllStructuredDataSources().values()) {
      tempMetaStore.putSource(dataSource);
    }

    SchemaBuilder schemaBuilder = SchemaBuilder.struct()
        .field("COL0", SchemaBuilder.INT64_SCHEMA)
        .field("COL1", SchemaBuilder.FLOAT64_SCHEMA)
        .field("COL2", SchemaBuilder.STRING_SCHEMA)
        .field("COL3", SchemaBuilder.INT64_SCHEMA);

    KQLTopic
        kqlTopic =
        new KQLTopic("TEST3", "test3", new KQLJsonTopicSerDe());

    KQLStream kqlStream = new KQLStream("TEST3", schemaBuilder, schemaBuilder.field("COL0"),
        kqlTopic);

    tempMetaStore.putTopic(kqlTopic);
    tempMetaStore.putSource(kqlStream);

    String
        queryStr =
        "SELECT t1.col1, t2.col2, t3.col3 FROM "
            + "(test1 t1 LEFT JOIN test2 t2 ON t1.col1 = t2.col1) "
            + "LEFT JOIN test3 t3 ON t1.col1 = t3.col1;";
    Statement statement = kqlParser.buildAST(queryStr, tempMetaStore).get(0);
    Assert.assertTrue("testSimpleQuery fails", statement instanceof Query);
    Query query = (Query) statement;
    Assert.assertTrue("testTripleJoin fails", query.getQueryBody() instanceof QuerySpecification);
  }

The problem appears to be in the DataSourceExtractor.visitAliasedRelation() method, which assumes that the relationPrimary component of an aliasedRelation rule is a qualifiedName. In fact, it can also be another relation contained between parentheses, as in (test1 t1 LEFT JOIN test2 t2 ON t1.col1 = t2.col1). The result is that this relationPrimary is visited as a joinRelation, so the visitJoinRelation() method is called, which returns null all the way up the call chain back to the original invocation of visitAliasedRelation. The NPE then occurs when the result (assumed to be a Table, but actually null) is accessed.

start ksql throws FileNotFoundException

When I used the command bin/ksql-cli local to start ksql, it threw a FileNotFoundException, which told me that it can't find xxx/ksql-cli-0.1-SNAPSHOT-package/config/log4j-file.properties. I checked the project and found that there was no config directory in ksql-cli-0.1-SNAPSHOT-package, only an etc directory, after I packaged the project. So what can I do to solve this problem?

Python/Go/Node examples

After reading the documentation, I still have no idea how I'd make a connection to KSQL from another non-JVM language. Would a PostgreSQL driver for Node work to connect? The top FAQ question speaks to KSQL being useful to non-JVM language, yet there's no indication on how this is possible.

ksql standalone mode error

Hello, I am running the pageviews example. I don't get the same behaviour in local mode and in standalone mode. Did I miss something?

$ bin/ksql-cli local
.........

ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_58
Page_30
Page_74
LIMIT reached for the partition.
Query terminated
ksql> exit
Exiting KSQL.
$ cat ./ksql-examples/src/main/resources/simpleQuery.sql
SELECT pageid FROM pageviews_original LIMIT 3;
$ bin/ksql-cli standalone ./ksql-examples/src/main/resources/simpleQuery.sql
io.confluent.ksql.exception.ParseFailedException: Parsing failed on KsqlEngine msg:PAGEVIEWS_ORIGINAL does not exist.
at io.confluent.ksql.KsqlEngine.parseQueries(KsqlEngine.java:204)
at io.confluent.ksql.cli.StandaloneExecutor.executeStatements(StandaloneExecutor.java:50)
at io.confluent.ksql.cli.commands.Standalone.run(Standalone.java:74)

Bests,
phil

Roadmap for additional serialization formats

Hi there, I'm super excited for the potential ksql can bring. I'm curious to hear what the roadmap looks like for adding new serialization formats. In particular, we would require protobuf support to integrate with our existing topics.

Table creation throws io.confluent.ksql.exception.KafkaResponseGetFailedException: Failed to guarantee existence of topic

On creating tables in the CLI, we ran into

io.confluent.ksql.exception.KafkaResponseGetFailedException: Failed to guarantee existence of topic PAGEVIEWS_AVG_1

when creating a table like so:

CREATE TABLE pageviews_avg_1b \
    AS SELECT \
        userid, \
      SUM(viewtime) AS sum_viewtime, \
      MIN(viewtime) AS min_viewtime, \
      MAX(viewtime) AS max_viewtime, \
      COUNT(*) AS view_count \
    FROM pageviews \
    WINDOW SESSION (60 SECONDS) GROUP BY userid;

If we re-issued that statement, the table would be created fine and SELECT worked.

This could be reproduced within the session (CREATE TABLE pageviews_avg_1c etc.), and it also happened in another CLI session by another user. Exiting the CLI and starting it again healed the issue, and the tables were then created straight away.

After a while, the issue started happening again in the session where it had just worked.

The CLI log showed

[2017-09-22 14:35:34,795] ERROR io.confluent.ksql.exception.KafkaResponseGetFailedException: Failed to guarantee existence of topic PAGEVIEWS_AVG_1A
	at io.confluent.ksql.util.KafkaTopicClientImpl.createTopic(KafkaTopicClientImpl.java:63)
	at io.confluent.ksql.structured.SchemaKStream.createSinkTopic(SchemaKStream.java:336)
	at io.confluent.ksql.structured.SchemaKTable.into(SchemaKTable.java:70)
	at io.confluent.ksql.structured.SchemaKTable.into(SchemaKTable.java:51)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.buildOutput(PhysicalPlanBuilder.java:208)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:127)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:100)
	at io.confluent.ksql.QueryEngine.buildQueryPhysicalPlan(QueryEngine.java:221)
	at io.confluent.ksql.QueryEngine.buildPhysicalPlans(QueryEngine.java:199)
	at io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:139)
	at io.confluent.ksql.KsqlEngine.buildMultipleQueries(KsqlEngine.java:125)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.startQuery(StatementExecutor.java:331)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.executeStatement(StatementExecutor.java:274)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatementWithTerminatedQueries(StatementExecutor.java:215)
	at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatement(StatementExecutor.java:114)
	at io.confluent.ksql.rest.server.computation.CommandRunner.executeStatement(CommandRunner.java:109)
	at io.confluent.ksql.rest.server.computation.CommandRunner.run(CommandRunner.java:75)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'PAGEVIEWS_AVG_1A' already exists.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
	at io.confluent.ksql.util.KafkaTopicClientImpl.createTopic(KafkaTopicClientImpl.java:61)
	... 18 more
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'PAGEVIEWS_AVG_1A' already exists.
 (io.confluent.ksql.rest.server.computation.StatementExecutor:220)

Can't build

There's a dependency for KQL:kql-build-tools:1.0-SNAPSHOT specified in the top-level pom.xml file, but we don't appear to have anything like that in our internal Nexus server. Commenting out the surrounding element allows things to build just fine.

I'm guessing the solution is to deploy kql-build-tools to Nexus, which sounds pretty simple, but I don't know how to do that yet (sorry) so here's an issue instead.

No value present when joining two tables

Given two tables

CREATE TABLE pageviews_avg_1b \
    AS SELECT \
        userid, \
      SUM(viewtime) AS sum_viewtime, \
      MIN(viewtime) AS min_viewtime, \
      MAX(viewtime) AS max_viewtime, \
      COUNT(*) AS view_count \
    FROM pageviews \
    WINDOW SESSION (60 SECONDS) GROUP BY userid;
    
CREATE TABLE pageviews_avg_5b \
    AS SELECT \
        userid, \
      SUM(viewtime) AS sum_viewtime, \
      MIN(viewtime) AS min_viewtime, \
      MAX(viewtime) AS max_viewtime, \
      COUNT(*) AS view_count \
    FROM pageviews \
    WINDOW SESSION (300 SECONDS) GROUP BY userid;

which individually return results

We want to compare them with a query

SELECT \
    p1.view_count AS view_count_p1, \
    p5.view_count AS view_count_p5 \
FROM \
    pageviews_avg_1b p1 \
    LEFT JOIN pageviews_avg_5b p5 \
    ON p1.user_id   = p5.user_id;

The query returns immediately and the user interface returns "No value present".

Is this not supported?

[2017-09-22 15:27:02,565] ERROR No value present
java.util.Optional.get(Optional.java:135)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildJoin(PhysicalPlanBuilder.java:506)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:112)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildProject(PhysicalPlanBuilder.java:363)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:119)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildOutput(PhysicalPlanBuilder.java:137)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:127)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:100)
io.confluent.ksql.QueryEngine.buildQueryPhysicalPlan(QueryEngine.java:221)
io.confluent.ksql.QueryEngine.buildPhysicalPlans(QueryEngine.java:199)
io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:139)
io.confluent.ksql.KsqlEngine.buildMultipleQueries(KsqlEngine.java:125)
io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter.<init>(QueryStreamWriter.java:53)
io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource.streamQuery(StreamedQueryResource.java:68)
sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:160)
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)
org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)
org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)
org.glassfish.jersey.internal.Errors.process(Errors.java:315)
org.glassfish.jersey.internal.Errors.process(Errors.java:297)
org.glassfish.jersey.internal.Errors.process(Errors.java:267)
org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)
org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)
org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)
org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)
org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:408)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:583)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:524)
org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:461)
org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
org.eclipse.jetty.server.Server.handle(Server.java:499)
org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)
org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
java.lang.Thread.run(Thread.java:748)
 (io.confluent.ksql.cli.console.Console:130)

Does ksql support user defined functions?

Hi all,

We are using Siddhi now, and some of our logic can't be written as SQL, so we use Siddhi's UDFs to deal with it.

I have read ksql's documentation but couldn't find UDFs, so I want to know whether ksql has plans to support UDFs.

Case (in)sensitivity

Right now, the core code base is full of redundant calls to toUpperCase() and equalsIgnoreCase() so that case insensitivity can be supported. These calls should be greatly reduced, but first we should decide on how far we actually want to go with case-insensitivity in KSQL.

I can think of two ways to approach case-sensitivity:

  1. Everything except string literals and anything that references the external filesystem (names of Kafka source topics, locations of Avro schema files, possibly names of state stores, etc.) is treated as case-insensitive.
  2. Only keywords are case-insensitive. This would mean that stream/table names, all fields, KQL topics, etc. are all case-sensitive.

From what I've seen of the code base, it looks like door number 2 would be easier to implement--it might be as simple as changing SqlBase.g4 and flat-out removing virtually every call to toUpperCase and equalsIgnoreCase. However, door number 1 seems closer to idiomatic SQL syntax and may be more familiar to the data scientists who may end up using this some day.

I'm leaning slightly towards door number 2, but I'd love to hear everyone else's thoughts on the matter.

I've already implemented rough drafts of both options, which can be seen here and here for options 1 and 2, respectively, but I'd rather not open an official PR until we decide which way we want to go.

@ewencp, @hjafarpour, @blueedgenick, thoughts?

Ksql-cli: When a message with a wrong format or a blank line is published, ksql stops processing the entire stream/table altogether

If any one message with a wrong format or a blank line is published, ksql stops processing the entire stream/table altogether unless the faulty entry is removed by recreating the topic. Only recreating the stream/table fixes the issue; otherwise ksql keeps throwing exceptions like the one below.

Steps to reproduce:
Create a table from another stream whose messages are in JSON format. Publish a blank line to the topic and observe the exception in ksql.

Exception in thread "ksql_query_10-f8dae1f2-2fc1-4d1e-a8d2-927da88bbab7-StreamThread-67" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=beacons-input2, partition=0, offset=3000046
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: [B@7f3b5dc3; line: 1, column: 0]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: [B@7f3b5dc3; line: 1, column: 0]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3838)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:74)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:66)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)

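For what it's worth, later Kafka Streams releases added a configurable handler for exactly this failure mode. A sketch of the relevant Streams property; whether and how ksql exposes it is an assumption:

# Log and skip records that fail to deserialize instead of killing the stream thread
default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler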

ABS, ROUND, FLOOR, CEIL return NULL values for BIGINT data types

Given a BIGINT column, or an integer value, null is returned in the result set unless a mathematical operation is applied.

For example, looking at the example pageviews stream

ksql> select viewtime, abs(viewtime), abs (viewtime+0.0), abs (-1), abs (1), abs (-1.1), round (1), floor (1), ceil (1)  from pageviews;
1506088116031 | null | 1.506088116031E12 | null | null | 1.1 | null | null | null

We would expect values to be returned for all columns - even if a value is already an absolute number (in the case of ABS) or an integer (in the case of FLOOR/CEIL/ROUND).

The CLI log shows:

[2017-09-22 13:48:36,283] ERROR Error calculating column with index 1 : KSQL_COL_1 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 3 : KSQL_COL_3 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 4 : KSQL_COL_4 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 6 : KSQL_COL_6 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 7 : KSQL_COL_7 (io.confluent.ksql.structured.SchemaKStream:180)
[2017-09-22 13:48:36,284] ERROR Error calculating column with index 8 : KSQL_COL_8 (io.confluent.ksql.structured.SchemaKStream:180)

KSQL risk estimation

I read a warning about not using KSQL in a production environment. I would like to know what the risks involved are. I would love to use KSQL in a production environment on an isolated topic with debug data not related to the operational status of production. I am completely fine with KSQL not behaving as expected, but would love reassurance that using KSQL is not a threat to the Kafka cluster itself or to other streams.

Creating a stream associated with a new topic appears to attempt to associate with another topic

Existing topic, with data, is mysite. All data is flowing through, with two associated streams:

ksql> SHOW STREAMS;

 Stream Name         | Kafka Topic | Format 
---------------------------------------------
 BIRF_BROWSER_SESSID | mysite      | JSON   
 RUM_TTL_SESSID      | mysite      | JSON 
ksql> CREATE STREAM ttl_browser_request WITH (kafka_topic='browser_requests', value_format='JSON') AS SELECT r.rum_ttl AS ttl, b.brand_name AS browser, r.sessid AS sessid, b.camg_reqid AS requestid FROM rum_ttl_sessid r LEFT JOIN birf_browser_sessid b ON r.requestid=b.camg_reqid;
Invalid topology building: Topic mysite has already been registered by another source.
ksql> LIST TOPICS;

 Kafka Topic        | Registered | Partitions | Partition Replicas 
-------------------------------------------------------------------
 _confluent-metrics | false      | 12         | 1                  
 _schemas           | false      | 1          | 1                  
 browser_requests   | true       | 2          | 1                  
 mysite             | true       | 1          | 1                  
 ksql__commands     | true       | 1          | 1                  
 pageviews          | false      | 1          | 1                  
 users              | false      | 1          | 1                  
ksql> 

Am I reading this correctly? How can I associate ttl_browser_request with browser_requests?

EDIT: Formatting

Possible to avoid running tests?

When compiling/packaging using mvn, it is not possible to skip compiling and running the unit tests (via mvn package -Dmaven.test.skip=true) as attempts to do so yield the following:

[ERROR] Failed to execute goal on project ksql-cli: Could not resolve dependencies for project io.confluent.ksql:ksql-cli:jar:0.1-SNAPSHOT: Could not find artifact io.confluent.ksql:ksql-core:jar:tests:0.1-SNAPSHOT in confluent (http://packages.confluent.io/maven/) -> [Help 1]

It's a bit tedious to have to run the unit tests each time as they take a few minutes for each run.

Let me know if there's a way to get around having to sit through the test execution each time.

Thanks.
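One workaround that may help with Maven test-jar dependencies like this is to skip test execution while still compiling and packaging the test classes, so the io.confluent.ksql:ksql-core:jar:tests artifact can still be built locally:

mvn package -DskipTests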

Need a way to sample a topic

It's natural to want to create a stream, but what data exists in a topic? It may be completely free form without a schema.

How do I sample the topic a bit to see what data is coming through in order to know what I want to create?

Perhaps to start just something simple like:

select * from mytopic limit 5;

A more powerful query would be something like:

select random_sample(*, 100, 0) from mytopic;

selecting all the columns and 100 randomly sampled messages starting at offset 0
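KSQL's PRINT statement is close to the first suggestion, since it dumps raw topic contents without requiring a stream to be declared first. A sketch; LIMIT support on PRINT in this version is an assumption:

PRINT 'mytopic' FROM BEGINNING LIMIT 5;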

Cli exec command

Issue with the CLI exec command -- docker-compose exec ksql-cli ksql-cli local --bootstrap-server kafka:29092 returned an OCI path error.

This command worked: docker-compose exec ksql-cli java -jar /usr/share/confluent/ksql-cli-0.1-SNAPSHOT-standalone.jar local --bootstrap-server kafka:29092

After starting the docker-quickstart, I can't see any Kafka topics

After completing the quickstart, I run SHOW TOPICS; within KSQL and get the following output:

Kafka Topic        | Registered | Partitions | Partition Replicas
-------------------------------------------------------------------
 _confluent-metrics | false      | 12         | 1
 _schemas           | false      | 1          | 1
 ksql__commands     | true       | 1          | 1

The two demonstration topics - pageviews and users - don't appear. Running docker container ls from bash, I get the following output:

CONTAINER ID        IMAGE                                     COMMAND                  CREATED             STATUS              PORTS                                              NAMES
d2853fffdfac        confluentinc/ksql-cli:latest              "perl -e 'while(1)..."   13 minutes ago      Up 13 minutes                                                          quickstart_ksql-cli_1
6f54121daf3e        confluentinc/cp-schema-registry:latest    "/etc/confluent/do..."   13 minutes ago      Up 13 minutes       0.0.0.0:8081->8081/tcp                             quickstart_schema-registry_1
7dedcd9fba36        confluentinc/cp-enterprise-kafka:latest   "/etc/confluent/do..."   13 minutes ago      Up 13 minutes       0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp   quickstart_kafka_1
4337617d71f6        confluentinc/cp-zookeeper:latest          "/etc/confluent/do..."   13 minutes ago      Up 13 minutes       2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   quickstart_zookeeper_1

The remedy turns out to be running docker-compose up -d again from the /ksql/docs/quickstart directory:

user@server:~/ksql/docs/quickstart$ docker-compose up -d
quickstart_zookeeper_1 is up-to-date
quickstart_kafka_1 is up-to-date
quickstart_schema-registry_1 is up-to-date
Starting quickstart_ksql-datagen-users_1
Starting quickstart_ksql-datagen-pageviews_1
quickstart_ksql-cli_1 is up-to-date

I then get:

CONTAINER ID        IMAGE                                     COMMAND                  CREATED             STATUS              PORTS                                              NAMES
d2853fffdfac        confluentinc/ksql-cli:latest              "perl -e 'while(1)..."   15 minutes ago      Up 15 minutes                                                          quickstart_ksql-cli_1
27a412186e20        confluentinc/ksql-examples:latest         "bash -c 'echo Wai..."   15 minutes ago      Up 18 seconds                                                          quickstart_ksql-datagen-pageviews_1
82ffabc651e3        confluentinc/ksql-examples:latest         "bash -c 'echo Wai..."   15 minutes ago      Up 17 seconds                                                          quickstart_ksql-datagen-users_1
6f54121daf3e        confluentinc/cp-schema-registry:latest    "/etc/confluent/do..."   15 minutes ago      Up 15 minutes       0.0.0.0:8081->8081/tcp                             quickstart_schema-registry_1
7dedcd9fba36        confluentinc/cp-enterprise-kafka:latest   "/etc/confluent/do..."   15 minutes ago      Up 15 minutes       0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp   quickstart_kafka_1
4337617d71f6        confluentinc/cp-zookeeper:latest          "/etc/confluent/do..."   15 minutes ago      Up 15 minutes       2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   quickstart_zookeeper_1

And can see the topics from ksql:

ksql> show topics;

 Kafka Topic        | Registered | Partitions | Partition Replicas
-------------------------------------------------------------------
 _confluent-metrics | false      | 12         | 1
 _schemas           | false      | 1          | 1
 ksql__commands     | true       | 1          | 1
 pageviews          | false      | 1          | 1
 users              | false      | 1          | 1

I'm running this on Ubuntu Server 16.04.

Looks like the scripts for the pageviews and users topics need to retry creation a few times?

Add stop and clear commands to introduction

I figured out the stop command, but after going through the demo, how to clean the Grafana entries and how to stop the ksql server were the first things I looked for but couldn't find in the docs.

Differentiate streams from tables

Currently, the LIST STREAMS; statement lists both streams and tables, and includes a column in the output that specifies which kind each of the listed items is. Couldn't we limit the LIST STREAMS; statement to only listing streams, and add a new LIST TABLES; statement which then lists tables (and also includes state store information)? @hjafarpour maybe you can provide some insight into why LIST STREAMS; works as it currently does?

cannot load data in Grafana

OS: SUSE 12 SP2
NKG1000114449:/home/ksql/ksql-0.1.x/ksql-clickstream-demo/demo # ./ksql-tables-to-grafana.sh
Loading Clickstream-Demo TABLES to Confluent-Connect => Elastic => Grafana datasource
Logging to: /tmp/ksql-connect.log
Charting CLICK_USER_SESSIONS_TS
Charting USER_IP_ACTIVITY_TS
Charting CLICKSTREAM_STATUS_CODES_TS
Charting ENRICHED_ERROR_CODES_TS
Charting ERRORS_PER_MIN_ALERT_TS
Charting ERRORS_PER_MIN_TS
Charting EVENTS_PER_MIN_MAX_AVG_TS
Charting EVENTS_PER_MIN_TS
Charting PAGES_PER_MIN_TS
Navigate to http://localhost:3000/dashboard/db/click-stream-analysis
NKG1000114449:/home/ksql/ksql-0.1.x/ksql-clickstream-demo/demo # ./clickstream-analysis-dashboard.sh
Loading Grafana ClickStream Dashboard
{"slug":"click-stream-analysis","status":"success","version":5}

ksql> SELECT * FROM EVENTS_PER_MIN_TS LIMIT 5;
1505526580000 | 4^�� | 1505526580000 | 4 | 1
1505526580000 | 27^�� | 1505526580000 | 27 | 2
1505526580000 | 2^�� | 1505526580000 | 2 | 1
1505526580000 | 15^�� | 1505526580000 | 15 | 1
1505526580000 | 39^�`� | 1505526580000 | 39 | 1
LIMIT reached for the partition.
Query terminated

It seems there is no index in Elasticsearch:
NKG1000114449:/home/ksql/ksql-0.1.x/ksql-clickstream-demo/demo # curl -XGET 'localhost:9200/_cat/indices?v&pretty'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size

Clickstream Demo: Recovering from install errors

After a permission error I had to restart the ksql clickstream installation.

On re-running ksql-clickstream-demo/demo/clickstream-schema.sql, the exception



io.confluent.ksql.exception.KafkaTopicException: Topic 'EVENTS_PER_MIN' does not conform to the requirements Partitions:1 v 4. Replication: 1 v 1

was raised. At that point, I had no tables

ksql> show tables;

Table Name | Kafka Topic | Format | Windowed

but plenty of leftover topics

ksql> show topics;

 Kafka Topic                 | Registered | Partitions | Partition Replicas
----------------------------------------------------------------------------
 _schemas                    | false      | 1          | 1
 CLICK_USER_SESSIONS         | false      | 1          | 1
 CLICK_USER_SESSIONS_TS      | false      | 1          | 1
 clickstream                 | true       | 1          | 1
 clickstream_codes           | false      | 1          | 1
 CLICKSTREAM_STATUS_CODES_TS | false      | 1          | 1
 clickstream_users           | false      | 1          | 1
 connect-configs             | false      | 1          | 1
 connect-offsets             | false      | 25         | 1
 connect-statuses            | false      | 5          | 1
 CUSTOMER_CLICKSTREAM        | false      | 1          | 1
 ENRICHED_ERROR_CODES        | false      | 1          | 1
 ENRICHED_ERROR_CODES_COUNT  | false      | 1          | 1
 ENRICHED_ERROR_CODES_TS     | false      | 1          | 1
 ERRORS_PER_MIN              | false      | 1          | 1
 ERRORS_PER_MIN_ALERT        | false      | 1          | 1
 ERRORS_PER_MIN_ALERT_TS     | false      | 1          | 1
 ERRORS_PER_MIN_TS           | false      | 1          | 1
 EVENTS_PER_MIN              | false      | 1          | 1
 EVENTS_PER_MIN_MAX_AVG      | false      | 1          | 1
 EVENTS_PER_MIN_MAX_AVG_TS   | false      | 1          | 1
 EVENTS_PER_MIN_TS           | false      | 1          | 1
 ksql__commands              | true       | 1          | 1
 PAGES_PER_MIN               | false      | 1          | 1
 PAGES_PER_MIN_TS            | false      | 1          | 1
 USER_IP_ACTIVITY            | false      | 1          | 1
 USER_IP_ACTIVITY_TS         | false      | 1          | 1

That could be recovered from by removing the topic and restarting the script; however, it then ran into a similar error with EVENTS_PER_MIN_TS. This time we attempted to simply increase the partitions to 4.

On the next run of the script, we now seem to fail when the EVENTS_PER_MIN table appears to be dropped - in this case we run into



io.confluent.ksql.exception.ParseFailedException: Parsing failed on KsqlEngine msg:Cannot add the new data source. Another data source with the same name already exists: KsqlStream name:EVENTS_PER_MIN

In the previous step, the script attempts to drop the table; when you execute that manually, you fail with



io.confluent.ksql.util.KsqlException: No topic with name true was registered.

in a scenario where



ksql> show tables;

 Table Name     | Kafka Topic    | Format | Windowed
-----------------------------------------------------
 EVENTS_PER_MIN | EVENTS_PER_MIN | JSON   | true

and

ksql> show topics; 

 Kafka Topic                 | Registered | Partitions | Partition Replicas
----------------------------------------------------------------------------
...
...
 EVENTS_PER_MIN              | false      | 4          | 1
...

Are these messages expected? Is there a better way to recover once you've got a similar failure?

Implement protobuf format reader

According to the documentation, ksql supports JSON and CSV as VALUE_FORMAT.

It would be really nice to start testing the concept with higher-velocity streams of protobuf data. Any plans to support this?

CREATE STREAM fails when field name is "group"

Hi,

I tried to let KSQL loose on a topic that contains RSVP data from Meetup.com in JSON format.

One of the fields in this dataset is called "group", which is also a keyword in KSQL (GROUP BY). The parser fails with

ksql> CREATE STREAM rsvps2 (group VARCHAR) WITH (value_format='json', kafka_topic='rsvps');
line 1:23: extraneous input 'group' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}

I tried to escape the name by using double quotes, but then KSQL seems to include those quotes in the name and never finds any data.
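The token list in the error message above includes BACKQUOTED_IDENTIFIER, which suggests backquotes rather than double quotes may be the intended escape mechanism. A sketch, untested against this version:

CREATE STREAM rsvps2 (`group` VARCHAR) WITH (value_format='json', kafka_topic='rsvps');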

DROP STREAM can be used to drop a TABLE

create a table:
command: CREATE table users_count as select user, count(*) from users group by user;
response: Table created and running

drop:
command: drop stream users_count;
response: Source USERS_COUNT was dropped

result: users_count table is dropped

cannot run client-server mode with docker-compose

I have a few issues running ksql-server-start in a Docker environment.

  1. The first argument is mandatory, i.e., ksql-server-start /etc/ksql/ksqlserver.properties.
    It should be optional and, if not supplied, should use a default file based on the KSQL_CONFIG_DIR environment variable.
  2. We should be able to provide the bootstrap.servers property via an environment variable. STREAMS_BOOTSTRAP_SERVERS has no effect.

This is inconsistent with the statement at https://github.com/confluentinc/ksql/blob/master/docs/concepts.md:

Use with default settings:

  ./bin/ksql-server-start

Test docker-compose.yml

version: '2.1'

services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    volumes:
      - zookeeper:/var/lib/zookeeper
    networks:
      - reactive-network

  kafka:
    image: confluentinc/cp-kafka
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_DELETE_TOPIC_ENABLE: "true"
    volumes:
      - kafka:/var/lib/kafka
    networks:
      - reactive-network
    links:
      - zookeeper
    depends_on:
      - zookeeper

  ksql-server:
    image: confluentinc/ksql-cli
    ports:
      - 9098:8080
    networks:
      - reactive-network
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: kafka:9092
    entrypoint: 'ksql-server-start /etc/ksql/ksqlserver.properties'

networks:
  reactive-network:
    driver: bridge

volumes:
  zookeeper:
  kafka:

Support for REST APIs

Once you start a ksql-server, you can do cool REST API calls such as

Running a simple request

## Simple KSQL
curl -X "POST" "http://server:port/ksql";; \
     -H "Content-Type: application/json; charset=utf-8" \
     -d $'{
  "ksql": "LIST STREAMS;",
  "streamsProperties": {}
}'

or retrieving streaming data

## Stream Query
curl -X "POST" "http://server:port/query";; \
     -H "Content-Type: application/json; charset=utf-8" \
     -d $'{
  "ksql": "SELECT * FROM TEST_A;",
  "streamsProperties": {}
}'

This is a neat way to query KSQL from elsewhere, particularly for ad-hoc information. Even better when it runs under https. Some authorization/authentication capabilities in future would also be valuable.

In the documentation, the REST component is only getting a short mention, and it's used internally within the CLI and on quickstart.html. Is it planned to support it in the production release?

Self join throws Invalid topology building: Topic has already been registered by another source.

In SQL, we often use self joins to obtain data from the same table multiple times in the same query, for example on parent-child relationships etc.

This statement for example

SELECT \
    p1.view_count AS view_count_p1, \
    p5.view_count AS view_count_p5 \
FROM \
    pageviews_avg_1b p1 \
    LEFT JOIN pageviews_avg_1b p5 \
    ON p1.min_viewtime  = p5.max_viewtime;

throws "Invalid topology building: Topic PAGEVIEWS_AVG_1B has already been registered by another source.".

Are self joins not supported yet?

The log shows


 ERROR Invalid topology building: Topic PAGEVIEWS_AVG_1B has already been registered by another source.
org.apache.kafka.streams.processor.TopologyBuilder.validateTopicNotAlreadyRegistered(TopologyBuilder.java:659)
org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:538)
org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:240)
org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:182)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildSource(PhysicalPlanBuilder.java:422)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:110)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildJoin(PhysicalPlanBuilder.java:497)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:112)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildProject(PhysicalPlanBuilder.java:363)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:119)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildOutput(PhysicalPlanBuilder.java:137)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:127)
io.confluent.ksql.physical.PhysicalPlanBuilder.kafkaStreamsDsl(PhysicalPlanBuilder.java:104)
io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:100)
io.confluent.ksql.QueryEngine.buildQueryPhysicalPlan(QueryEngine.java:221)
io.confluent.ksql.QueryEngine.buildPhysicalPlans(QueryEngine.java:199)
io.confluent.ksql.KsqlEngine.planQueries(KsqlEngine.java:139)
io.confluent.ksql.KsqlEngine.buildMultipleQueries(KsqlEngine.java:125)
io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter.<init>(QueryStreamWriter.java:53)
io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource.streamQuery(StreamedQueryResource.java:68)

DESCRIBE provides ksql definition

It would be great to have the ability to get the KSQL definition of existing tables and streams.
Use case: experimenting in development and moving structures to production.

Unexpected token "@"

ksql> CREATE STREAM my_logs_stream ( "@timestamp" varchar, "@version" bigint, message varchar, logger_name varchar, thread_name varchar, level varchar, level_value bigint, HOSTNAME varchar, app varchar) WITH (kafka_topic='applogs', value_format='JSON');

SELECT app,  HOSTNAME, "@version" FROM my_logs_stream LIMIT 3;

Error:

Line 1, Column 17: Unexpected token "@"

Unfortunately, my JSON data looks like this:

{  
   "@timestamp":"2017-08-30T06:48:05.149+00:00",
   "@version":1,
   "message":"cancel()",
   "logger_name":"ss-QuoteGenerator",
   "thread_name":"reactor-http-nio-4",
   "level":"INFO",
   "level_value":20000,
   "HOSTNAME":"weqeqe",
   "app":"stream-service"
}

LEFT JOIN query results in java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed

Initially we ran into this on one of our own queries. We have reproduced it with the pageviews table from the quickstart:

CREATE STREAM pageviews \
   (viewtime BIGINT, \
    userid VARCHAR, \
    pageid VARCHAR) \
   WITH (kafka_topic='pageviews', \
         value_format='DELIMITED', \
         key='pageid', \
         timestamp='viewtime');

create table pageviews_avg as select userid, sum(viewtime) / count(*) as avg_viewtime, min(viewtime) as min_viewtime, max(viewtime) as max_viewtime, count(*) as view_count from pageviews window session (60 seconds) group by userid;

SELECT p.userid, p.viewtime, pa.avg_viewtime FROM pageviews p LEFT JOIN pageviews_avg pa ON p.userid = pa.userid;

On the SELECT statement, KSQL errors with

java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
Query terminated


and the CLI log shows these errors

[2017-09-22 12:23:20,633] ERROR stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Streams application error during processing:  (org.apache.kafka.streams.processor.internals.StreamThread:527)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
	at org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
	at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
	at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
[2017-09-22 12:23:20,660] WARN stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread:985)
[2017-09-22 12:23:21,028] ERROR Exception occurred while writing to connection stream:  (io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter:105)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
	at org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
	at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
	at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
[2017-09-22 12:23:21,032] WARN stream-thread [ksql_transient_7722472958265653440_1506082995954-66bcd070-20e1-4769-a599-43dfcaa2d723-StreamThread-47] Unexpected state transition from DEAD to PENDING_SHUTDOWN. (org.apache.kafka.streams.processor.internals.StreamThread:985)
[2017-09-22 12:23:21,036] ERROR java.lang.String cannot be cast to org.apache.kafka.streams.kstream.Windowed
org.apache.kafka.streams.kstream.internals.WindowedSerializer.serialize(WindowedSerializer.java:33)
org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:173)
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:113)
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:190)
org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:671)
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
 (io.confluent.ksql.cli.console.Console:130)

Question about query

Apologies if this is not the right channel to ask, but I would like to know whether it is possible to create a query such as:

CREATE STREAM active_users AS
  SELECT user_id, user_name
  FROM user_created_events
  WHERE user_id NOT IN (SELECT user_id FROM user_deleted_events)

or to implement this behaviour in some other way.
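
For context, the closest workaround I can think of is to approximate the anti-join with a LEFT JOIN plus a NULL filter, along these lines (untested sketch; it assumes a table can be materialized from user_deleted_events keyed by user_id):

-- Untested sketch: materialize the set of deleted users,
-- then keep only created users that found no match.
CREATE TABLE deleted_users AS
  SELECT user_id, COUNT(*) AS deletion_count
  FROM user_deleted_events
  GROUP BY user_id;

CREATE STREAM active_users AS
  SELECT c.user_id, c.user_name
  FROM user_created_events c
  LEFT JOIN deleted_users d ON c.user_id = d.user_id
  WHERE d.user_id IS NULL;

Note that this would only exclude users whose deletion has already been processed when the creation event arrives, so it is not a true retroactive NOT IN.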

CREATE TABLE hangs with null key in the source topic

I am trying to create tables from my own data via Connect / JDBC (JSON, converted with the default /etc/kafka/connect-standalone.properties), and it seems that the Connect output topic I use as the source for the KSQL CREATE TABLE statement needs a key for this to work (I had to use a ValueToKey transform in Connect so that the topic's key is not null). Otherwise, the table is created but SELECT * FROM table hangs.
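
For reference, the transform configuration I added to the Connect properties looks roughly like this (the id field name is just illustrative of my schema):

# Copy a value field into the record key, then extract it as a primitive key.
transforms=createKey,extractKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field=id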

I have also tried the WITH KEY option at table creation, without success. If I use CREATE STREAM, I don't need this key.

Can you confirm whether a key in the source topic is required for table creation?

Running 'ksql-cli local' on a different Kafka port from the default

Hi all,

I'm trying to run 'ksql-cli local' against another Kafka server instance that runs on a different port than the default (port 10092), and it is not working.

Every time ksql-cli starts, it uses the default port for bootstrap.servers (TCP:9092), even though I set another port (TCP:10092) in the config file.

I applied the setting both before and after compiling ksql, and in both cases the result was the same.

The contents of the ksqlserver.properties config file:

cat config/ksqlserver.properties

bootstrap.servers=localhost:10092
application.id=ksql_server_quickstart
ksql.command.topic.suffix=commands

listeners=http://localhost:8080

The output of ./bin/ksql-run-class io.confluent.ksql.Ksql "local":

[2017-10-03 17:30:22,204] INFO KsqlConfig values:
        application.id = ksql_
        application.server =
        bootstrap.servers = [localhost:9092]
        buffered.records.per.partition = 1000
        cache.max.bytes.buffering = 10000000
        client.id =
        commit.interval.ms = 2000
        connections.max.idle.ms = 540000
        default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
        default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
        default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
        key.serde = null
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        num.standby.replicas = 0
        num.stream.threads = 4
        partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
        poll.ms = 100
        processing.guarantee = at_least_once
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        replication.factor = 1
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        rocksdb.config.setter = null
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        state.cleanup.delay.ms = 600000
        state.dir = /tmp/kafka-streams
        timestamp.extractor = null
        value.serde = null
        windowstore.changelog.additional.retention.ms = 86400000
        zookeeper.connect =
 (io.confluent.ksql.util.KsqlConfig:223)
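
For completeness, I also tried passing the properties file to the CLI explicitly, assuming the CLI's --properties-file option is the right mechanism, with the same result:

# Same config file as shown above.
./bin/ksql-cli local --properties-file config/ksqlserver.properties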

ksql-cli hangs after server maintenance, WARN No records received after 30 seconds of polling in log

We had to cycle our KSQL dev box. KSQL hadn't been used for a few days; after the server maintenance we tried to go back into the CLI in local mode (non-dockerized), but it showed no response or error after launching.

The CLI started to repeat

[2017-09-21 15:17:33,781] WARN No records received after 30 seconds of polling; something may be wrong (io.confluent.ksql.rest.server.computation.CommandStore:153)

After updating our build to the latest release, the issue persisted. We then deleted test topics in this order:

ksql_transient*

ksql_transient_528162554338760763_1505245884471-TEST_Atransient_-changelog
ksql_transient_528162554338760763_1505245884471-TEST_Atransient_-repartition
ksql_transient_6259632277308348632_1505245661426-TEST_Atransient_-changelog
ksql_transient_6259632277308348632_1505245661426-TEST_Atransient_-repartition
ksql_transient_7529857592400492031_1505246139152-TEST_Atransient_-changelog
ksql_transient_7529857592400492031_1505246139152-TEST_Atransient_-repartition

ksql_query_1-KSQL_Agg_Query*

ksql_query_1-KSQL_Agg_Query_1505245639135-changelog	
ksql_query_1-KSQL_Agg_Query_1505245639135-repartition	

and finally
ksql__commands

The CLI was stopped and restarted after each of the three groups of topics. After deleting ksql__commands (the topic was empty) and restarting the CLI, we got back to the welcome screen and command prompt.
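
For reference, each topic was deleted with the stock topic tool, along these lines (the ZooKeeper address is illustrative):

# Run once per topic listed above; ksql__commands was the last one deleted.
./bin/kafka-topics --zookeeper localhost:2181 --delete --topic ksql__commands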

A full log file is attached: ksql_hung.txt

I could not reproduce the issue after that; I will add more info if a pattern emerges.

CREATE STREAM AS SELECT does not support the KEY property

First, I execute a CREATE STREAM AS SELECT statement but receive an error message:

ksql> CREATE STREAM test_timestamp2  WITH (kafka_topic='test_time',key='flag',partitions=3,replications=3,timestamp='genTime') as select flag,id,username,money,stringtotimestamp(create_time,'yyyy-MM-dd HH:mm:ss') as genTime, random from test1;
io.confluent.ksql.util.KsqlException: Invalid config variable in the WITH clause: KEY

Then I reviewed the source code and found that the actual property is PARTITION_BY.
The documentation is inconsistent with the code.
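
Based on that reading of the code, I would expect something like the following to be the accepted form, with the key set via a PARTITION BY clause instead of the KEY property (untested sketch):

-- Untested: replaces key='flag' from the failing statement with PARTITION BY flag.
CREATE STREAM test_timestamp2
  WITH (kafka_topic='test_time', partitions=3, replications=3, timestamp='genTime')
  AS SELECT flag, id, username, money,
            STRINGTOTIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss') AS genTime,
            random
  FROM test1
  PARTITION BY flag;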

Kafka Avro schema data not reflected from topic

When creating a stream from a Kafka topic whose records use an Avro schema, values are not mapped to column names properly. Some fields read no values, and others have their first character truncated.

"Query terminated" not appropriate

When running a query in ksql such as:

SELECT * FROM users

and then pressing Ctrl-C, the message "Query terminated" appears. The wording is inappropriate because the query continues to run, as documented and as can be seen using "show queries".

A better wording would be: "Query output interrupted" or maybe "Query output stopped".
