
flink-siddhi's People

Contributors

aagupta1, aparup, dr0na, haoch, lizhizhou, pranjal0811, tammypi, wittyameta, wujinhu

flink-siddhi's Issues

Control Event is broadcast twice (Bug)

The control stream is broadcast twice to the downstream operator.

The control stream is broadcast first, then unioned with the event stream, and the union stream goes to the AddRoute operator, where all the mapping between the event and control streams is done.

Then dynamic partitioning takes place: if the upstream is a NamedControlStream, it is broadcast again to the Siddhi CEP operator. This degrades performance and increases the number of Siddhi manager threads on each parallel instance.

I am raising a PR to solve this issue.

Pattern with group by query (bug)

I have a query like this:

"from every e1=firewallStream[name == 'A']<3> -> e2=firewallStream [ name == 'B' ] within 40 seconds select 'AAAA' as ruleId group by e1[0].signatureId insert into outputstream"

When I execute it, I get the error "e1 does not exist". The same query works well with the Siddhi simulator.

A similar pattern is tested here: https://github.com/siddhi-io/siddhi/blob/0c6b8b514e4f163c41fe8872f97040e923d0433f/modules/siddhi-query-api/src/test/java/io/siddhi/query/api/PatternQueryTestCase.java

Support dynamically updating CEP rules

Does flink-siddhi support dynamically updating CEP rules? What is the usual approach? Is there an example? Thanks!
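For reference, the general pattern (drawn from the test case further down this page) is to feed a stream of ControlEvents into cql(), registering and enabling execution plans at runtime; a minimal sketch:

    // Control stream carrying dynamic rule updates
    DataStream<ControlEvent> controlStream = env.addSource(new SourceFunction<ControlEvent>() {
        @Override
        public void run(SourceContext<ControlEvent> ctx) {
            // register execution plan "1" ...
            ctx.collect(MetadataControlEvent.builder()
                    .addExecutionPlan("1", "from input select * insert into output;")
                    .build());
            // ... then enable it
            ctx.collect(OperationControlEvent.enableQuery("1"));
        }

        @Override
        public void cancel() {
        }
    });

    SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
    cep.registerStream("input", input, "id", "name");
    cep.from("input").cql(controlStream).returnAsRow("output").print();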

ArrayIndexOutOfBoundsException

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinition(SiddhiStreamSchema.java:49)
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinitionExpression(SiddhiStreamSchema.java:64)
at org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext.getAllEnrichedExecutionPlan(SiddhiOperatorContext.java:113)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.validate(AbstractSiddhiOperator.java:295)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.<init>(AbstractSiddhiOperator.java:182)
at org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator.<init>(SiddhiStreamOperator.java:42)
at org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory.createDataStream(SiddhiStreamFactory.java:36)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:431)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:417)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnAsRow(SiddhiStream.java:366)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnAsRow(SiddhiStream.java:355)

Dynamic CEP doesn't work on Flink 1.8.2

Running with AWS Kinesis (Flink 1.8).
A Flink application built on Flink 1.8.2 with flink-siddhi 0.2.2-SNAPSHOT or 0.2.1 fails on both versions with the error below:

(logged at INFO on thread flink-akka.actor.default-dispatcher-2)

java.lang.AbstractMethodError
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.siddhi.router.AddRouteOperator.processElement(AddRouteOperator.java:65)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
    at java.lang.Thread.run(Thread.java:748)

Partition query in flink siddhi

I am defining a query like this:

partition with ( deviceID of TempStream) begin from every e1=TempStream[roomNo == 5]<3> -> e2=TempStream [ roomNo == 6] within 30 seconds select 'partition' as str, e1.roomNo as room insert into DeviceTempStream; end;

and got this exception:
throw new Exception("Unhandled execution element: " + executionElement.toString());

Since the wrapper only supported query execution elements, I have made some changes to support the partition execution element from the Siddhi manager itself, and the above query now runs fine. My question is: would it be better to use partitioning from the query itself?

I am raising a PR for the code change.

Dynamically creating Siddhi Streams

Hi Haoch,

We are using Flink and Siddhi to build an alerting framework for the security domain.

One thing that we'd like to be able to do is create ad-hoc Siddhi streams based on the rule that a user defines - I want to be able to pull in fields from multiple input streams / schemas and create a new Siddhi stream on the fly which consists of the field names mentioned in the rule that the user created.

We've researched both Apache Eagle and the Flink-Siddhi project and from what we can tell, this is not supported functionality.

Any idea how we could achieve this?

--Aarti

Does flink-siddhi support the full Siddhi feature set and extensions?

I currently want to build a UI plus Siddhi engine whose functionality matches siddhi-distribution + runner, with the runner replaced by Flink. But does flink-siddhi only implement a subset of Siddhi / product-sp functionality?

How can flink-siddhi support the full Siddhi feature set? For example:

@source(type = 'http',
receiver.url='http://localhost:8006/productionStream',
basic.auth.enabled='false',
@map(type='json'))
define stream SweetProductionStream (name string, amount double);

@sink(type='log')
define stream TotalCountStream (totalCount long);

-- Count the incoming events
@info(name='query1')
from SweetProductionStream
select count() as totalCount
insert into TotalCountStream;

Returning Query Id in Output

When multiple QueryHandlers sharing the same output stream are registered with Siddhi, is there a way to tell which query triggered the output results?

Based on the query that triggered the output, I would like to take further action on the query results.

If this feature is not currently supported by the library, how can I add support? Pointers on getting started would be helpful.
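One workaround visible in other queries on this page is to tag each query with a constant identifier in its select clause, so every output row carries the id of the query that produced it; a SiddhiQL sketch (stream and field names illustrative):

    from firewallStream[name == 'FAILURE']
    select 'rule-1' as ruleId, externalId
    insert into outputStream;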

Defining Tables

Hi,

Is it possible to define tables in the Flink Siddhi wrapper?

For example, I have a query like "define table test_table(name string, id string)". With the provided CEP context I can define a stream via cep.registerStream(streamId, stream, fields), but I don't see an equivalent for tables.
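For comparison, this is what defining and joining a table looks like in standalone SiddhiQL; whether flink-siddhi's cql() passes such a plan through to the Siddhi manager unchanged is an assumption to verify:

    define stream TestStream (id string, value double);
    define table test_table (name string, id string);

    -- join incoming stream events against the table
    from TestStream join test_table on TestStream.id == test_table.id
    select TestStream.id, test_table.name, TestStream.value
    insert into OutputStream;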

Any suggestions would be appreciated.

Thanks

Running under Flink, a custom function throws "XXX is neither a function extension nor an aggregated attribute extension"

I defined a custom function geo:getGeohashCode, which runs fine in a pure Siddhi environment. Under flink-siddhi I registered the function as the documentation requires:

SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerExtension("geo:getGeohashCode", GetGeohashCode.class);

It runs fine inside the IDEA IDE, but executing the packaged jar throws:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error on '1136e420-2a7d-4f2b-82df-7f674737add3' @ Line: 3. Position: 128, near 'geo:getGeohashCode(lat, lon, 10)'. 'getGeohashCode' is neither a function extension nor an aggregated attribute extension
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)

How can this problem be solved?

A null pointer exception occurred, but I don't know why.

details:
2019-10-11 13:42:26,364 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job (ab76398582b1ffb8b58edb1a3bcd8dcd) switched from state RUNNING to FAILING.

java.lang.NullPointerException
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.processElement(AddRouteOperator.java:88)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
2019-10-11 13:42:26,377 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: rocketmq-source -> Filter -> Flat Map -> Map (1/1) (232afd5328528608f6531fed6b4f1179) switched from RUNNING to CANCELING.

2019-10-11 13:42:26,384 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/1) (a16760312d3a947f709749bfc727a4d8) switched from RUNNING to CANCELING.

2019-10-11 13:42:26,385 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CEP: Unnamed (0ed54446-5e10-4431-9b49-e612a20c0a4d) -> Map -> Flat Map -> Sink: Unnamed (1/1) (f945babb746258fd0ede36a4550efbe7) switched from RUNNING to CANCELING.

2019-10-11 13:42:26,399 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/1) (a16760312d3a947f709749bfc727a4d8) switched from CANCELING to CANCELED.

2019-10-11 13:42:26,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CEP: Unnamed (0ed54446-5e10-4431-9b49-e612a20c0a4d) -> Map -> Flat Map -> Sink: Unnamed (1/1) (f945babb746258fd0ede36a4550efbe7) switched from CANCELING to CANCELED.

2019-10-11 13:42:26,518 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: rocketmq-source -> Filter -> Flat Map -> Map (1/1) (232afd5328528608f6531fed6b4f1179) switched from CANCELING to CANCELED.

Integrate with kubernetes

  • Runtime: use Kubernetes as the Flink resource manager.
  • Metadata: use a ConfigMap as the Siddhi execution plan store and for coordination.

Is there a test demo?

Is there a small demo of flink-siddhi? I got an error, Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/typeutils/TypeInfoParser, and I don't know what causes it. Thanks!

Are there any flink-siddhi test demos?

I am using flink-siddhi, and no matter how I write the CQL, the result is simply the data I put in; the filter conditions do not take effect. I have only just started with flink-siddhi, so I would appreciate it if someone could share a test demo they have written. Thanks a lot.
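For reference, a minimal end-to-end sketch assembled from examples elsewhere on this page (RandomEventSource and the field names come from the project's tests, so treat them as assumptions):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Event> input = env.addSource(new RandomEventSource(30));
    SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
    cep.registerStream("inputStream", input, "id", "name");
    // Filter condition: only events whose id exceeds 10 should pass
    cep.from("inputStream")
        .cql("from inputStream[id > 10] select id, name insert into outputStream")
        .returnAsRow("outputStream")
        .print();
    env.execute();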

Sample JSON of a ControlEvent

Hi,

Can someone provide a sample ControlEvent JSON that can be pushed to, for example, a Kafka topic?
I'm not able to make sense of MetadataControlEvent.builder.
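While I can't confirm the exact JSON wire format, the builder itself is exercised in the test case further down this page; a sketch of constructing the control events in Java before serializing them to your topic:

    // Register a Siddhi execution plan under id "1" ...
    ControlEvent addRule = MetadataControlEvent.builder()
            .addExecutionPlan("1", "from input select * insert into output;")
            .build();

    // ... and enable the query afterwards
    ControlEvent enableRule = OperationControlEvent.enableQuery("1");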

Thanks,
Calvin

[Question] Why do we need to take a Siddhi snapshot when processing each element?

hi, haoch

With AbstractSiddhiOperator we already have snapshotState via Flink's checkpoint mechanism, so why do we still need to call checkpointSiddhiRuntimeState or checkpointRecordQueueState when processing every element?

The code is:

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    if (isControlStream(element.getValue())) {
        this.onEventReceived(getControlEvent(element.getValue()));
        return;
    }
    String streamId = getStreamId(element.getValue());
    StreamSchema schema = siddhiPlan.getInputStreamSchema(streamId);

    if (isProcessingTime) {
        processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
        this.checkpointSiddhiRuntimeState();
    } else {
        PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
        // event time processing
        // we have to buffer the elements until we receive the proper watermark
        if (getExecutionConfig().isObjectReuseEnabled()) {
            // copy the StreamRecord so that it cannot be changed
            priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
        } else {
            priorityQueue.offer(element);
        }
        this.checkpointRecordQueueState();
    }
}

Control event pipeline does not take effect

I found a strange problem: when the control event data source runs continuously (e.g., Kafka, or a simple source with an infinite loop), the execution plan added by the control event does not take effect, and rules cannot be dynamically added or deleted.
When the control event source is in the finished state, the rule does take effect. I don't know if this is a problem with my usage.

Getting error "no viable alternative at input"

I have a rule defined like this:

"from inputstream[ data contains 'x' ] select data insert into outputstream;"

Can you please help me understand what the error is here?

Exception:

Caused by: io.siddhi.query.compiler.exception.SiddhiParserException: Error between @ Line: 1. Position: 0 and @ Line: 1. Position: 63. Syntax error in SiddhiQL, no viable alternative at input 'inputstream[ data contains'.
at io.siddhi.query.compiler.internal.SiddhiErrorListener.syntaxError(SiddhiErrorListener.java:36)
at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)
at org.antlr.v4.runtime.DefaultErrorStrategy.reportNoViableAlternative(DefaultErrorStrategy.java:310)
at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:136)
at io.siddhi.query.compiler.SiddhiQLParser.query_input(SiddhiQLParser.java:2818)
at io.siddhi.query.compiler.SiddhiQLParser.query(SiddhiQLParser.java:2718)
at io.siddhi.query.compiler.SiddhiQLParser.execution_element(SiddhiQLParser.java:510)
at io.siddhi.query.compiler.SiddhiQLParser.siddhi_app(SiddhiQLParser.java:444)
at io.siddhi.query.compiler.SiddhiQLParser.parse(SiddhiQLParser.java:178)
at io.siddhi.query.compiler.SiddhiCompiler.parse(SiddhiCompiler.java:73)
at org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner.parse(SiddhiExecutionPlanner.java:77)
at org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner.getStreamPartitions(SiddhiExecutionPlanner.java:249)
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.handleExecutionPlan(AddRouteOperator.java:161)
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.handleMetadataControlEvent(AddRouteOperator.java:123)
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.processElement(AddRouteOperator.java:62)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.base/java.lang.Thread.run(Thread.java:830)
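For what it's worth, SiddhiQL has no contains operator inside filter brackets, which would explain the parser stopping at that token. A sketch of the same filter using the str:contains function from the siddhi-execution-string extension (assuming that extension is on the classpath and registered):

    from inputstream[str:contains(data, 'x')]
    select data
    insert into outputstream;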

Add support for Flink 1.9

I have been running some experiments lately using flink-siddhi and saw that Flink 1.9 is not supported. Is supporting Flink 1.9 on the near-future roadmap? If so, I might be interested in collaborating on this.

custom function in an external jar

In my case, users can define custom functions in their own project, package them as a jar file, and upload it to a common local path (~/plugins). The Flink application (we call it the engine project) should load all the jars and register the custom functions from the plugin jars.

Following is the code:

SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

String pluginSql = "select class_path,function_name,file_path from plugins";

List<Map<String, Object>> pluginRtn = PgUtil.getInstance(host, port, database, username, password).query(pluginSql);

for (int i = 0; i < pluginRtn.size(); i++) {
    URL url = new URL("file:" + pluginRtn.get(i).get("file_path"));
    URLClassLoader classLoader = new URLClassLoader(new URL[]{url},
            Thread.currentThread().getContextClassLoader());
    Class myClass = classLoader.loadClass(String.valueOf(pluginRtn.get(i).get("class_path")));
    cep.registerExtension(String.valueOf(pluginRtn.get(i).get("function_name")), (Class<FunctionExecutor>) myClass);
}

My Flink application runs on YARN, so my command looks like this:
./flink run -m yarn-cluster -ynm xxtest -ys 4 -yn 10 -ytm 5120 -p 1 -yt ~/plugins -c com.xxxx.MyMainClass ~/xxxxx.jar

The parameter "-yt ~/plugins" specifies that all files in ~/plugins should be shipped.

But I have encountered a problem: if I upload a plugin (slen()) and use it, the application throws an exception:
Error on '25d79ef0-b5e5-40ad-9014-0a70ae1851eb' @ Line: 1. Position: 5613, near 'slen(sip)'. 'slen' is neither a function extension nor an aggregated attribute extension

If I copy all the function code from the plugin project into the engine project, it works very well.

So could you please tell me whether there is any way to register and use a custom function from an external jar in flink-siddhi?

Looking forward to your reply, thanks!

Restoring ControlEvent state from a savepoint or checkpoint

Hi Haoch,

I am using the Flink Siddhi wrapper for dynamic rules. When I try to restore my job from a savepoint, or restart it after a failure via checkpointing, I am not able to recover the rules from state.

Does this wrapper support state management, so that we can recover the rules from the savepoint?

Thanks,
Pranjal

Support Siddhi QueryCallback

Hi @haoch, Siddhi supports both StreamCallback and QueryCallback.

The flink-siddhi library only has support for StreamCallback, as far as I can tell.

I'd like to be able to register a QueryCallback as well.

Do you mind if I work on this?
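For context, this is the callback in standalone Siddhi that the wrapper would need to expose; a sketch against the plain Siddhi 5 API (stream, query, and field names illustrative):

    import io.siddhi.core.SiddhiAppRuntime;
    import io.siddhi.core.SiddhiManager;
    import io.siddhi.core.event.Event;
    import io.siddhi.core.query.output.callback.QueryCallback;
    import io.siddhi.core.util.EventPrinter;

    SiddhiManager siddhiManager = new SiddhiManager();
    SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(
            "define stream inputStream (name string); "
                    + "@info(name = 'query1') "
                    + "from inputStream select name insert into outputStream;");

    // Unlike a StreamCallback (bound to a stream), a QueryCallback is bound to
    // a query name, so events received here are known to come from 'query1'.
    runtime.addCallback("query1", new QueryCallback() {
        @Override
        public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
            EventPrinter.print(timestamp, inEvents, removeEvents);
        }
    });
    runtime.start();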

Why does the last test-case example throw a NullPointerException?

The test is:

   @Test
    public void testDynamicalStreamSimplePatternMatch2() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event> input1 = env.addSource(new RandomEventSource(30).setName("event_stream_1"));

        DataStream<ControlEvent> controlStream = env.addSource(new SourceFunction<ControlEvent>() {
            @Override
            public void run(SourceContext<ControlEvent> sourceContext) throws InterruptedException {
                sourceContext.collect(MetadataControlEvent.builder()
                        .addExecutionPlan("1", "from input select  *  insert into output;")
                        .build());
                sourceContext.collect(OperationControlEvent.enableQuery("1"));

            }

            @Override
            public void cancel() {
            }
        });
        SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
        cep.registerStream("input",input1,"id","name");
        cep.from("input")
                .cql(controlStream).returnAsRow("output").print();

        //SiddhiCEP.define("input", input1,"id","name")

        env.execute();
    }

If input is mixed with delayed data, CEP cannot output correctly

Below is the code:

package org.apache.flink.streaming.siddhi.testdelaydata;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import javax.annotation.Nullable;
import java.io.Serializable;
class MyTimestampExtractor implements AssignerWithPeriodicWatermarks<MyData>,Serializable{
    private long currentMaxTimestamp;
    private long maxOutOfOrderness = 10000L;
    private long preWaterMark = 0L;
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long currentWaterMark = 0L;
        currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
        if(preWaterMark > 0L && currentWaterMark == preWaterMark){
            currentMaxTimestamp = currentMaxTimestamp + 1000;
            currentMaxTimestamp = Math.min(System.currentTimeMillis(), currentMaxTimestamp);
            currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
        }
        preWaterMark = currentWaterMark;
        return new Watermark(currentWaterMark);
    }
    @Override
    public long extractTimestamp(MyData element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp()*1000L;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }
}
public class TestDelayData {
    public static void main(String[] args){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        ExecutionConfig executionConfig = env.getConfig();
        executionConfig.setAutoWatermarkInterval(1000L);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<MyData> inputstream = env.addSource(new SourceFunction<MyData>() {
            @Override
            public void run(SourceContext<MyData> ctx) throws Exception {
                for(int j=0;j<10;j++){
                    int time = (int) (System.currentTimeMillis() / 1000);
                    MyData mydata1 = new MyData(time, "1.1.1.1", "1.1.1.2", 1);
                    ctx.collect(mydata1);
                    Thread.sleep(1000L);
                    for(int i=0;i<10000;i++){
                        MyData mydata2 = new MyData(1552890430, "1.1.1.1", "1.1.1.2", 2);
                        ctx.collect(mydata2);
                    }
                    MyData mydata3 = new MyData(time+3, "1.1.1.1", "1.1.1.2", 3);
                    ctx.collect(mydata3);
                }
                while (true){
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
            }
        });
        DataStream outstream = SiddhiCEP.define("inputstream", inputstream.keyBy("sip", "dip").assignTimestampsAndWatermarks(new MyTimestampExtractor()),
                "timestamp", "sip", "dip", "logtype")
                .cql("from every s1=inputstream[logtype==1] -> s2=inputstream[logtype==3 and sip==s1.sip and dip==s1.dip] within 10 second " +
                        "select s1.timestamp as timestamp,s1.sip as sip,s1.dip as dip,s1.logtype as logtype,'mycep' as name insert into outstream")
                .returnAsRow("outstream");
        outstream.print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

CEP output is less than expected

Hi. I use a CEP pattern like "every 1 followed by 2" (every(1) -> 2).
The input data is:
1 1 1 2
And my expected output is:
{1,2} {1,2} {1,2}

But the actual output is only:
{1,2}

The standalone Siddhi stream processor's output matches my expectation, but the output of flink-siddhi does not. Now I'm very confused.

Could you please tell me how to get my expected output?
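For reference, "every(1) -> 2" written out in SiddhiQL, matching the pattern syntax used elsewhere on this page (stream and field names illustrative). With every, each arriving 1 starts a new match attempt, which is why three {1,2} pairs are expected here:

    from every e1=inputStream[val == 1] -> e2=inputStream[val == 2]
    select e1.val as first, e2.val as second
    insert into outputStream;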

haoch/flink-siddhi vs bahir-flink/flink-library-siddhi

The early version of https://github.com/apache/bahir-flink/tree/master/flink-library-siddhi was contributed from https://github.com/haoch/flink-siddhi, but haoch/flink-siddhi, which is under active development, has more advanced features that are pending community approval to be merged into apache/bahir-flink/flink-library-siddhi. Since the Flink community may not actively maintain third-party libraries, the flink-siddhi library will eventually be maintained in the current repository directly, so that all the latest features can be published as soon as possible.

KryoSerializer Exception

StreamOutputHandler.receive throws an exception when executing multiple queries.
Is Output<StreamRecord> thread-safe?

java.lang.ArrayIndexOutOfBoundsException: -1
	at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
	at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
	at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
	at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
	at org.apache.flink.streaming.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:70)

Is it possible to make flink-siddhi work on Flink 1.4.x?

hi, haoch.
My company uses Flink 1.4.x now. Since flink-siddhi declares Flink 1.7.0 as a prerequisite, I am wondering whether it is possible to make it work on 1.4.x. Does it use some advanced feature only supported by Flink 1.7.0?
thank you.

Dynamic Logical Partitioning on event and control stream

Hi @haoch

How does the operator state behave under parallelism?

As I can see in the class org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator, we are using managed operator state.

Is there a way to partition the rules and events based on some key and use keyed managed state?
I'm just curious about the parallelism and performance. Do we have any performance measures?

For example, I have a Siddhi query which expects 3 FAILURE events followed by 1 SUCCESS event, like this: from every e1=firewallStream[name == 'FAILURE']<3> -> e2=firewallStream [ name == 'SUCCESS'] within 1 min select 'AAAAAAAA-AAAA-AAAA-BBBB-AAAAAAAAAAAA' as ruleId, e1.externalId as eventIds insert into outputStream

If I set the Siddhi CEP operator parallelism to 1, the above query generates the correct result, as all the events go to one operator instance. But if I increase the operator parallelism beyond 1, the events are distributed across multiple instances of the operator, and the query fails to generate the result because state is not shared between the parallel instances (see the sketch below).
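One pattern already used by the delayed-data example further down this page is to key the input stream before defining it, so that all events sharing a key land on the same parallel instance; a sketch (the externalId key field is illustrative, and whether the operator could then use keyed managed state is exactly the open question here):

    DataStream<Event> input = ...;
    // Route all events with the same externalId to the same parallel instance
    SiddhiCEP.define("firewallStream", input.keyBy("externalId"), "name", "externalId")
        .cql("from every e1=firewallStream[name == 'FAILURE']<3> "
                + "-> e2=firewallStream[name == 'SUCCESS'] within 1 min "
                + "select 'AAAAAAAA-AAAA-AAAA-BBBB-AAAAAAAAAAAA' as ruleId, e1.externalId as eventIds "
                + "insert into outputStream")
        .returnAsRow("outputStream")
        .print();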

Slow processing speed

Hello, we tested flink-siddhi on a Flink cluster of nine machines (each with 8 cores and 16 GB of RAM). We found that CPU usage on all nine machines is quite high, almost all above 95%. We investigated the high-CPU tasks and found that the operator below uses a lot of CPU.
I wonder whether you have noticed this problem.
[screenshot of the hot operator omitted]
We also tested 4 million records per minute with 1 rule and parallelism 40 across the nine machines, and even then we could not consume 4 million records within one minute. I wonder if we are using it incorrectly.

Flink 1.2 Snapshot

Hello,

The current release of flink-siddhi is using Flink 1.2-SNAPSHOT, right?
I'm getting the following error, which seems to involve a class from previous versions (this class existed until 1.1.4):

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer
        at org.apache.flink.contrib.siddhi.operator.SiddhiStreamOperator.createStreamRecordSerializer(SiddhiStreamOperator.java:46)
        at org.apache.flink.contrib.siddhi.operator.AbstractSiddhiOperator.registerStreamRecordSerializers(AbstractSiddhiOperator.java:120)
        at org.apache.flink.contrib.siddhi.operator.AbstractSiddhiOperator.<init>(AbstractSiddhiOperator.java:112)
        at org.apache.flink.contrib.siddhi.operator.SiddhiStreamOperator.<init>(SiddhiStreamOperator.java:40)
        at org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory.createDataStream(SiddhiStreamFactory.java:31)
        at org.apache.flink.contrib.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:276)
        at org.apache.flink.contrib.siddhi.SiddhiStream$ExecutionSiddhiStream.returns(SiddhiStream.java:239)

Thank you

Custom function cannot be used in returns()

If I register a custom function like this:
cep.registerExtension("str:groupConcat", GroupConcatFunctionExtension.class);

the code below throws an exception like "groupConcat() is neither a function nor an aggregate":

DataStream outstream2 = cep.from("inputstream2")
    .cql("from inputstream2#window.timeBatch(1 sec) "
        + "select str:groupConcat(dip) as related_alerts "
        + "group by sip "
        + "insert into outstream2")
    .returns("outstream2");

But if I change returns() to returnAsMap(), it works very well:

DataStream outstream2 = cep.from("inputstream2")
    .cql("from inputstream2#window.timeBatch(1 sec) "
        + "select str:groupConcat(dip) as related_alerts "
        + "group by sip "
        + "insert into outstream2")
    .returnAsMap("outstream2");

Flink + Siddhi vs. plain Siddhi: performance comparison

Setting scalability aside: with Flink + Siddhi, Flink mainly does data distribution while the real computation happens inside Siddhi. This incurs the network overhead of distributing the data (serialization and deserialization) plus the conversion of Flink's data structures into Siddhi's.
If we used only Siddhi, all of this could be avoided. Would performance be better? As a single-machine program, the operational cost would also be lower.

For example, with five 64 GB machines, which performs better: the application running on Flink + Siddhi, or Siddhi split into five applications running on the five machines (setting data correctness aside)?
Thanks!

Data TTL

We are using Siddhi to process real-time events collected online. I want to know the TTL of a processed event.
For example, when an event belonging to stream testStream arrives, will the event remain available until the Siddhi cluster is restarted or shut down? If events are grouped by a time window, will an event be discarded once it has expired?

I guess the strategy above affects things like memory use and the performance of the Docker container running Siddhi.
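For the window case specifically, Siddhi retains an event in window state only for the window duration and then emits it as an expired event, so windowed events do not live until shutdown; a SiddhiQL sketch (stream and field names illustrative):

    define stream testStream (id string, value double);

    -- each event is held in the window for at most 1 minute,
    -- after which it is expired and removed from state
    from testStream#window.time(1 min)
    select id, avg(value) as avgValue
    group by id
    insert into outputStream;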

TimeWindowSample does NOT work as expected

I rewrote the TimeWindowSample example from the WSO2 Siddhi repo using the flink-siddhi library and ran it, and the output differs from the native example's. Did I write something wrong?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple4<String, Float, Long, Long>> stockStream = env.fromElements(
        Tuple4.of("IBM", 100f, 100L, 1000L),
        Tuple4.of("IBM", 200f, 300L, 2000L),
        Tuple4.of("WSO2", 60f, 200L, 2000L),
        Tuple4.of("WSO2", 70f, 400L, 3000L),
        Tuple4.of("GOOG", 50f, 30L, 3000L),
        Tuple4.of("IBM", 200f, 400L, 4000L),
        Tuple4.of("WSO2", 70f, 50L, 6000L),
        Tuple4.of("WSO2", 80f, 400L, 8000L),
        Tuple4.of("GOOG", 60f, 30L, 8000L)
);
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
SiddhiStream.SingleSiddhiStream stream = cep.define("StockStream", stockStream, "symbol", "price", "volume", "timestamp");
DataStream output = stream
        .cql("from StockStream#window.time(5 sec) select symbol, sum(price) as price, sum(volume) as volume group by symbol insert into AggregateStockStream")
        .returnsTransformRow("AggregateStockStream");
output.print();
env.execute();

The output is:

IBM,100.0,100
GOOG,60.0,30
WSO2,80.0,400
WSO2,150.0,450
IBM,300.0,500
GOOG,110.0,60
WSO2,220.0,850
WSO2,280.0,1050
IBM,500.0,800

The native Siddhi output is:

[Event{timestamp=1576396964357, data=[IBM, 100.0, 100], isExpired=false}]
[Event{timestamp=1576396965368, data=[IBM, 300.0, 400], isExpired=false}]
[Event{timestamp=1576396965369, data=[WSO2, 60.0, 200], isExpired=false}]
[Event{timestamp=1576396966373, data=[WSO2, 130.0, 600], isExpired=false}]
[Event{timestamp=1576396966373, data=[GOOG, 50.0, 30], isExpired=false}]
[Event{timestamp=1576396967373, data=[IBM, 500.0, 800], isExpired=false}]
[Event{timestamp=1576396969375, data=[WSO2, 200.0, 650], isExpired=false}]
[Event{timestamp=1576396971380, data=[WSO2, 150.0, 450], isExpired=false}]
[Event{timestamp=1576396971380, data=[GOOG, 60.0, 30], isExpired=false}]

Bug: ArrayIndexOutOfBoundsException thrown when using a Siddhi event stream of type DataStream<Map<String, Object>>

Stack trace:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinition(SiddhiStreamSchema.java:49)
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinitionExpression(SiddhiStreamSchema.java:64)
at org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext.getAllEnrichedExecutionPlan(SiddhiOperatorContext.java:113)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.validate(AbstractSiddhiOperator.java:291)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.<init>(AbstractSiddhiOperator.java:178)
at org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator.<init>(SiddhiStreamOperator.java:41)
at org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory.createDataStream(SiddhiStreamFactory.java:31)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:331)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:327)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnAsMap(SiddhiStream.java:287)
