haoch / flink-siddhi
A CEP library to run Siddhi within Apache Flink™ Streaming Application (Not maintained)
License: Apache License 2.0
The control stream is broadcast twice to the downstream operators.
The control stream is first broadcast and then unioned with the event stream; the union stream then goes to the AddRouteOperator, where all the mapping between the event and control streams is done.
Then dynamic partitioning takes place: if the upstream is a NamedControlStream, it is broadcast again to the Siddhi CEP operator. This degrades performance and increases the number of SiddhiManager threads on each parallel instance.
I am raising a PR to solve this issue.
I have a query like this: "from every e1=firewallStream[name == 'A']<3> -> e2=firewallStream [ name == 'B' ] within 40 seconds select 'AAAA' as ruleId group by e1[0].signatureId insert into outputstream"
When I execute it, I get the error "e1 does not exist". The same query works well in the Siddhi simulator.
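Extensions under the official org.wso2.extension.siddhi.execution package can be used without being registered
Does flink-siddhi support dynamically updating CEP rules? What is the usual approach? Is there an example? Thanks!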
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinition(SiddhiStreamSchema.java:49)
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinitionExpression(SiddhiStreamSchema.java:64)
at org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext.getAllEnrichedExecutionPlan(SiddhiOperatorContext.java:113)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.validate(AbstractSiddhiOperator.java:295)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.<init>(AbstractSiddhiOperator.java:182)
at org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator.<init>(SiddhiStreamOperator.java:42)
at org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory.createDataStream(SiddhiStreamFactory.java:36)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:431)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:417)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnAsRow(SiddhiStream.java:366)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnAsRow(SiddhiStream.java:355)
With AWS Kinesis Flink 1.8
A Flink application built on Flink 1.8.2 with flink-siddhi 0.2.2-SNAPSHOT or 0.2.1 fails on both versions with the error below:
messageType: INFO, threadName: flink-akka.actor.default-dispatcher-2
java.lang.AbstractMethodError
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.processElement(AddRouteOperator.java:65)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
at java.lang.Thread.run(Thread.java:748)
I am defining the query like this:
partition with ( deviceID of TempStream) begin from every e1=TempStream[roomNo == 5]<3> -> e2=TempStream [ roomNo == 6] within 30 seconds select 'partition' as str, e1.roomNo as room insert into DeviceTempStream; end;
and got this exception:
throw new Exception("Unhandled execution element: " + executionElement.toString());
Support existed only for executing queries, so I have made some changes to support the partition execution element via the SiddhiManager itself, and the above query now runs fine. My question is: would it be better to use partitioning from the query itself?
I am raising a PR for the code change.
Hi Haoch,
We are using Flink and Siddhi to build an alerting framework for the security domain.
One thing that we'd like to be able to do is create ad-hoc Siddhi streams based on the rule that a user defines - I want to be able to pull in fields from multiple input streams / schemas and create a new Siddhi stream on the fly which consists of the field names mentioned in the rule that the user created.
We've researched both Apache Eagle and the Flink-Siddhi project and from what we can tell, this is not supported functionality.
Any idea how we could achieve this?
--Aarti
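I don't know how to choose between Siddhi and flink-cep.
I currently want to build a UI + Siddhi engine whose functionality matches siddhi-distribution + runner, with the runner replaced by Flink. Does flink-siddhi support only part of the Siddhi or product-sp functionality?
How can flink-siddhi support the full Siddhi feature set? For example: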
@source(type = 'http',
receiver.url='http://localhost:8006/productionStream',
basic.auth.enabled='false',
@Map(type='json'))
define stream SweetProductionStream (name string, amount double);
@sink(type='log')
define stream TotalCountStream (totalCount long);
-- Count the incoming events
@info(name='query1')
from SweetProductionStream
select count() as totalCount
insert into TotalCountStream;
From OperationControlEvent.java (each factory method creates the opposite action to its name):
public static OperationControlEvent enableQuery(String queryId) {
return new OperationControlEvent(Action.DISABLE_QUERY, queryId);
}
public static OperationControlEvent disableQuery(String queryId) {
return new OperationControlEvent(Action.ENABLE_QUERY, queryId);
}
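In the snippet above, enableQuery builds an event with Action.DISABLE_QUERY and disableQuery builds one with Action.ENABLE_QUERY. A minimal self-contained sketch of the corrected factories follows. OperationControlEvent and Action here are stand-ins that only reproduce the constructor shape the snippet shows; the getAction() and getQueryId() accessors are hypothetical and not taken from the library.

```java
public class OperationControlEventSketch {
    // Stand-in for the library's Action enum (only the two constants used above).
    enum Action { ENABLE_QUERY, DISABLE_QUERY }

    // Minimal stand-in for OperationControlEvent: an action plus a query id.
    static final class OperationControlEvent {
        private final Action action;
        private final String queryId;

        OperationControlEvent(Action action, String queryId) {
            this.action = action;
            this.queryId = queryId;
        }

        Action getAction() { return action; }      // hypothetical accessor
        String getQueryId() { return queryId; }    // hypothetical accessor

        // Each factory now produces the action its name promises.
        static OperationControlEvent enableQuery(String queryId) {
            return new OperationControlEvent(Action.ENABLE_QUERY, queryId);
        }

        static OperationControlEvent disableQuery(String queryId) {
            return new OperationControlEvent(Action.DISABLE_QUERY, queryId);
        }
    }

    public static void main(String[] args) {
        System.out.println(OperationControlEvent.enableQuery("1").getAction());  // ENABLE_QUERY
        System.out.println(OperationControlEvent.disableQuery("1").getAction()); // DISABLE_QUERY
    }
}
```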
When multiple QueryHandlers sharing the same Output Stream are registered with Siddhi, is there a way to tell which Query triggered the output results ?
Based on the query that triggered the output, I would like to take further action on the query results.
If this feature is not currently supported by the library, how can I add support? Pointers on getting started would be helpful.
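One workaround that needs no library change, assuming you can edit the queries: have every query that inserts into the shared output stream select a constant identifier, the same way queries elsewhere on this page select 'AAAA' as ruleId, and dispatch on that column downstream. The router below is a hypothetical pure-Java sketch; it assumes the identifier is the first attribute of each output row (for example "select 'q1' as queryId, ...").

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class QueryIdRoutingSketch {
    // Handlers keyed by the constant query id each query selects as its first attribute.
    private final Map<String, Consumer<Object[]>> handlers = new HashMap<>();

    void register(String queryId, Consumer<Object[]> handler) {
        handlers.put(queryId, handler);
    }

    // Looks up the handler for row[0]; rows with an unknown id are ignored.
    void dispatch(Object[] row) {
        Consumer<Object[]> handler = handlers.get(String.valueOf(row[0]));
        if (handler != null) {
            handler.accept(row);
        }
    }

    public static void main(String[] args) {
        QueryIdRoutingSketch router = new QueryIdRoutingSketch();
        router.register("q1", row -> System.out.println("q1 fired: " + row[1]));
        router.register("q2", row -> System.out.println("q2 fired: " + row[1]));
        router.dispatch(new Object[] {"q1", "alert-a"});
        router.dispatch(new Object[] {"q2", "alert-b"});
    }
}
```

Each query keeps writing to the same output stream; only the selected constant differs, so the action taken downstream can depend on which query produced the row.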
Use this topic to discuss the use cases you are using flink-siddhi for.
You are very welcome to share your project or experience with it.
Thanks very much!
Hi,
Is it possible to define tables in the Flink Siddhi wrapper?
For example, I have a query like "define table test_table(name string,id string)".
With the provided CEP context, I can define a stream like cep.registerStream(streamId, stream,fields).
Any suggestions would be appreciated.
Thanks
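I defined a custom function, geo:getGeohashCode, which runs fine in a pure Siddhi environment. In the flink-siddhi environment, I registered the function as the documentation requires: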
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); cep.registerExtension("geo:getGeohashCode", GetGeohashCode.class);
It runs fine inside IntelliJ IDEA, but running the packaged jar throws an exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Error on '1136e420-2a7d-4f2b-82df-7f674737add3' @ Line: 3. Position: 128, near 'geo:getGeohashCode(lat, lon, 10)'. 'getGeohashCode' is neither a function extension nor an aggregated attribute extension
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Could anyone tell me how to solve this problem?
details:
2019-10-11 13:42:26,364 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job (ab76398582b1ffb8b58edb1a3bcd8dcd) switched from state RUNNING to FAILING.
java.lang.NullPointerException
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.processElement(AddRouteOperator.java:88)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
2019-10-11 13:42:26,377 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: rocketmq-source -> Filter -> Flat Map -> Map (1/1) (232afd5328528608f6531fed6b4f1179) switched from RUNNING to CANCELING.
2019-10-11 13:42:26,384 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/1) (a16760312d3a947f709749bfc727a4d8) switched from RUNNING to CANCELING.
2019-10-11 13:42:26,385 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CEP: Unnamed (0ed54446-5e10-4431-9b49-e612a20c0a4d) -> Map -> Flat Map -> Sink: Unnamed (1/1) (f945babb746258fd0ede36a4550efbe7) switched from RUNNING to CANCELING.
2019-10-11 13:42:26,399 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map (1/1) (a16760312d3a947f709749bfc727a4d8) switched from CANCELING to CANCELED.
2019-10-11 13:42:26,403 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CEP: Unnamed (0ed54446-5e10-4431-9b49-e612a20c0a4d) -> Map -> Flat Map -> Sink: Unnamed (1/1) (f945babb746258fd0ede36a4550efbe7) switched from CANCELING to CANCELED.
2019-10-11 13:42:26,518 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: rocketmq-source -> Filter -> Flat Map -> Map (1/1) (232afd5328528608f6531fed6b4f1179) switched from CANCELING to CANCELED.
ConfigMap as siddhi execution plan store and coordination.
Is there a small demo for flink-siddhi? I got the error Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/java/typeutils/TypeInfoParser and don't know what causes it. Thanks.
I am using flink-siddhi, but no matter how I write the CQL, the result is just the data I input; the conditions I set have no effect. I have only just started working with flink-siddhi, so I would like to ask more experienced users to share a test demo they have written. Thanks a lot.
Hi,
Could someone provide a sample ControlEvent JSON that can be pushed to, for example, a Kafka topic?
I'm not able to make sense of MetadataControlEvent.builder.
Thanks,
Calvin
hi, haoch
AbstractSiddhiOperator already implements snapshotState using Flink checkpoints, so why do we still need to call checkpointSiddhiRuntimeState or checkpointRecordQueueState when processing every element?
The code is:
@Override
public void processElement(StreamRecord element) throws Exception {
if (isControlStream(element.getValue())) {
this.onEventReceived(getControlEvent(element.getValue()));
return;
}
String streamId = getStreamId(element.getValue());
StreamSchema schema = siddhiPlan.getInputStreamSchema(streamId);
if (isProcessingTime) {
processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
this.checkpointSiddhiRuntimeState();
} else {
PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
// event time processing
// we have to buffer the elements until we receive the proper watermark
if (getExecutionConfig().isObjectReuseEnabled()) {
// copy the StreamRecord so that it cannot be changed
priorityQueue.offer(new StreamRecord<>(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
} else {
priorityQueue.offer(element);
}
this.checkpointRecordQueueState();
}
}
Move AddRouteOperator members into Flink state so they can be restored after a failure.
I found a strange problem: when the control event data source is continuously running (for example Kafka, or a simple data source with an infinite loop), the execution plans added by control events do not take effect, and rules cannot be dynamically added or deleted.
When the control event source is in the finished state, the rules do take effect. I don't know whether this is a problem with how I am using it.
I have defined a rule like this:
"from inputstream[ data contains 'x' ] select data insert into outputstream;"
Can you please help me understand what the error is here?
Exception:
Caused by: io.siddhi.query.compiler.exception.SiddhiParserException: Error between @ Line: 1. Position: 0 and @ Line: 1. Position: 63. Syntax error in SiddhiQL, no viable alternative at input 'inputstream[ data contains'.
at io.siddhi.query.compiler.internal.SiddhiErrorListener.syntaxError(SiddhiErrorListener.java:36)
at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41)
at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:544)
at org.antlr.v4.runtime.DefaultErrorStrategy.reportNoViableAlternative(DefaultErrorStrategy.java:310)
at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:136)
at io.siddhi.query.compiler.SiddhiQLParser.query_input(SiddhiQLParser.java:2818)
at io.siddhi.query.compiler.SiddhiQLParser.query(SiddhiQLParser.java:2718)
at io.siddhi.query.compiler.SiddhiQLParser.execution_element(SiddhiQLParser.java:510)
at io.siddhi.query.compiler.SiddhiQLParser.siddhi_app(SiddhiQLParser.java:444)
at io.siddhi.query.compiler.SiddhiQLParser.parse(SiddhiQLParser.java:178)
at io.siddhi.query.compiler.SiddhiCompiler.parse(SiddhiCompiler.java:73)
at org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner.parse(SiddhiExecutionPlanner.java:77)
at org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner.getStreamPartitions(SiddhiExecutionPlanner.java:249)
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.handleExecutionPlan(AddRouteOperator.java:161)
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.handleMetadataControlEvent(AddRouteOperator.java:123)
at org.apache.flink.streaming.siddhi.router.AddRouteOperator.processElement(AddRouteOperator.java:62)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.base/java.lang.Thread.run(Thread.java:830)
I have been running some experiments with flink-siddhi lately and saw that Flink 1.9 is not supported. Is supporting Flink 1.9 on the near-future roadmap? If so, I might be interested in collaborating on it.
In my case, users can define custom functions in their own project, package them into a jar file, and upload it to a common local path (~/plugins). The Flink application (which we call the engine project) should load all the jars and register the custom functions from the plugin jars.
Here is the code:
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
String pluginSql = "select class_path,function_name,file_path from plugins";
List<Map<String,Object>> pluginRtn = PgUtil.getInstance(host,port,database,username,password).query(pluginSql);
for(int i=0;i<pluginRtn.size();i++){
URL url = new URL("file:"+pluginRtn.get(i).get("file_path"));
URLClassLoader classLoader = new URLClassLoader(new URL[] { url }, Thread.currentThread().getContextClassLoader());
Class myClass = classLoader.loadClass(String.valueOf(pluginRtn.get(i).get("class_path")));
cep.registerExtension(String.valueOf(pluginRtn.get(i).get("function_name")), (Class<FunctionExecutor>)myClass);
}
My Flink application runs on YARN, so my command looks like this:
./flink run -m yarn-cluster -ynm xxtest -ys 4 -yn 10 -ytm 5120 -p 1 -yt ~/plugins -c com.xxxx.MyMainClass ~/xxxxx.jar
The parameter "-yt ~/plugins" define that should ship all files in ~/plugins.
But I encounter a problem that If I upload a plugin(slen()) and use it, the application will throw an exception:
Error on '25d79ef0-b5e5-40ad-9014-0a70ae1851eb' @ Line: 1. Position: 5613, near 'slen(sip)'. 'slen' is neither a function extension nor an aggregated attribute extension
If I copy all the function code from the plugin project into the engine project, it works very well.
So could you please tell me whether there is any way to register and use a custom function from an external jar in flink-siddhi?
Looking forward to your reply, thanks!
Hi Haoch,
I am using the flink-siddhi wrapper for dynamic rules. When I try to restore my job from a savepoint, or restart the job after a failure using checkpointing, I am not able to recover the rules from the state.
Does this wrapper support state management, so that we can recover the rules from a savepoint?
Thanks,
Pranjal
Hi @haoch, Siddhi supports both StreamCallback and QueryCallback.
The flink-siddhi library only has support for StreamCallback as far as I can tell.
I'd like to be able to register a QueryCallback as well.
Do you mind if I work on this?
Why does the last example in the test cases throw a NullPointerException?
@Test
public void testDynamicalStreamSimplePatternMatch2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input1 = env.addSource(new RandomEventSource(30).setName("event_stream_1"));
DataStream<ControlEvent> controlStream = env.addSource(new SourceFunction<ControlEvent>() {
@Override
public void run(SourceContext<ControlEvent> sourceContext) throws InterruptedException {
sourceContext.collect(MetadataControlEvent.builder()
.addExecutionPlan("1", "from input select * insert into output;")
.build());
sourceContext.collect(OperationControlEvent.enableQuery("1"));
}
@Override
public void cancel() {
}
});
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerStream("input",input1,"id","name");
cep.from("input")
.cql(controlStream).returnAsRow("output").print();
//SiddhiCEP.define("input", input1,"id","name")
env.execute();
}
Thank you very much for providing this tool. In current testing, with 60 CEP rules and 3.5 million events, processing takes about an hour to complete.
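For scale, the reported figures work out as follows, assuming the hour is wall-clock time for the whole run. The second figure additionally assumes that every event is evaluated against every one of the 60 rules; that assumption is ours, not the reporter's.

```latex
\frac{3.5 \times 10^{6}\ \text{events}}{3600\ \text{s}} \approx 972\ \text{events/s},
\qquad
\frac{60 \times 3.5 \times 10^{6}\ \text{rule evaluations}}{3600\ \text{s}} \approx 5.8 \times 10^{4}\ \text{rule evaluations/s}
```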
Below is the code:
package org.apache.flink.streaming.siddhi.testdelaydata;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import javax.annotation.Nullable;
import java.io.Serializable;
class MyTimestampExtractor implements AssignerWithPeriodicWatermarks<MyData>,Serializable{
private long currentMaxTimestamp;
private long maxOutOfOrderness = 10000L;
private long preWaterMark = 0L;
@Nullable
@Override
public Watermark getCurrentWatermark() {
long currentWaterMark = 0L;
currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
if(preWaterMark > 0L && currentWaterMark == preWaterMark){
currentMaxTimestamp = currentMaxTimestamp + 1000;
currentMaxTimestamp = Math.min(System.currentTimeMillis(), currentMaxTimestamp);
currentWaterMark = currentMaxTimestamp - maxOutOfOrderness;
}
preWaterMark = currentWaterMark;
return new Watermark(currentWaterMark);
}
@Override
public long extractTimestamp(MyData element, long previousElementTimestamp) {
long timestamp = element.getTimestamp()*1000L;
currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
return timestamp;
}
}
public class TestDelayData {
public static void main(String[] args){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(1000L);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyData> inputstream = env.addSource(new SourceFunction<MyData>() {
@Override
public void run(SourceContext<MyData> ctx) throws Exception {
for(int j=0;j<10;j++){
int time = (int) (System.currentTimeMillis() / 1000);
MyData mydata1 = new MyData(time, "1.1.1.1", "1.1.1.2", 1);
ctx.collect(mydata1);
Thread.sleep(1000L);
for(int i=0;i<10000;i++){
MyData mydata2 = new MyData(1552890430, "1.1.1.1", "1.1.1.2", 2);
ctx.collect(mydata2);
}
MyData mydata3 = new MyData(time+3, "1.1.1.1", "1.1.1.2", 3);
ctx.collect(mydata3);
}
while (true){
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
DataStream outstream = SiddhiCEP.define("inputstream", inputstream.keyBy("sip", "dip").assignTimestampsAndWatermarks(new MyTimestampExtractor()),
"timestamp", "sip", "dip", "logtype")
.cql("from every s1=inputstream[logtype==1] -> s2=inputstream[logtype==3 and sip==s1.sip and dip==s1.dip] within 10 second " +
"select s1.timestamp as timestamp,s1.sip as sip,s1.dip as dip,s1.logtype as logtype,'mycep' as name insert into outstream")
.returnAsRow("outstream");
outstream.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
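In TestDelayData, mydata1 and mydata3 carry the current epoch second, while the 10,000 mydata2 records all carry the fixed timestamp 1552890430, which is far in the past. The pure-Java sketch below reproduces MyTimestampExtractor's core arithmetic (seconds to milliseconds, watermark = max timestamp minus 10,000 ms) and leaves out its "advance when stalled" branch. It shows that a watermark emitted after mydata1 (for example during the 1 s sleep) already lies far beyond the mydata2 timestamps. Under Flink's watermark contract, a watermark W says no further elements with timestamp <= W are expected, so such records are late; how flink-siddhi's operator treats late records is not shown here. The "now" value is a hypothetical fixed epoch second so the result is deterministic.

```java
public class WatermarkSketch {
    private final long maxOutOfOrderness = 10_000L; // same bound as MyTimestampExtractor
    private long currentMaxTimestamp = 0L;

    // Same conversion as MyTimestampExtractor.extractTimestamp: seconds -> ms, track the max.
    long extractTimestamp(long epochSeconds) {
        long timestamp = epochSeconds * 1000L;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    // Bounded-out-of-orderness watermark, without the extractor's stall-advance branch.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // A record is late if its timestamp is at or below the watermark already emitted.
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    public static void main(String[] args) {
        WatermarkSketch extractor = new WatermarkSketch();
        long now = 1_700_000_000L;                                 // hypothetical "current" second
        extractor.extractTimestamp(now);                           // mydata1
        long watermark = extractor.currentWatermark();             // emitted before mydata2 arrives
        long old = extractor.extractTimestamp(1552890430L);        // one of the mydata2 records
        long closing = extractor.extractTimestamp(now + 3);        // mydata3
        System.out.println(isLate(old, watermark));                // true
        System.out.println(isLate(closing, watermark));            // false
    }
}
```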
Hi. I use a CEP query like every(1) followed by 2 (every(1)->2).
The input data is as follows:
1 1 1 2
And my expected output is:
{1,2} {1,2} {1,2}
But the actual output is only:
{1,2}
The plain Siddhi stream processor's output matches my expectation, but flink-siddhi's output does not. I'm very confused.
Could you please tell me how to get my expected output?
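A simplified pure-Java model of the semantics the reporter expects (and reports getting from plain Siddhi) for every e1 -> e2: each matching 1 starts its own partial match, and the next 2 completes every partial match that is still open. It ignores within clauses, filter conditions and multiple streams, so it is an illustration of the expected output, not Siddhi's engine.

```java
import java.util.ArrayList;
import java.util.List;

public class EveryPatternSketch {
    // Models "every e1=S[v==1] -> e2=S[v==2]": every 1 opens a partial match,
    // and the next 2 completes all partial matches that are still pending.
    static List<String> match(int[] input) {
        List<Integer> pending = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (int v : input) {
            if (v == 1) {
                pending.add(v);
            } else if (v == 2) {
                for (int first : pending) {
                    out.add("{" + first + "," + v + "}");
                }
                pending.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(match(new int[] {1, 1, 1, 2})); // [{1,2}, {1,2}, {1,2}]
    }
}
```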
The early version of https://github.com/apache/bahir-flink/tree/master/flink-library-siddhi was contributed from https://github.com/haoch/flink-siddhi, but haoch/flink-siddhi, which is under active development, has more advanced features that are pending community approval to be merged into apache/bahir-flink/flink-library-siddhi.
As the Flink community may not actively maintain third-party libraries, the flink-siddhi library will eventually be maintained directly in the current repository so that all the latest features can be published as soon as possible.
StreamOutputHandler.receive exception when executing multiple queries
Is Output<StreamRecord> output thread safe?
java.lang.ArrayIndexOutOfBoundsException: -1
at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
at org.apache.flink.streaming.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:70)
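Kryo documents itself as not thread-safe, and an ArrayIndexOutOfBoundsException: -1 from IntArray.pop inside Kryo.copy is consistent with one Kryo instance being entered by two threads at once. If StreamOutputHandler.receive can be called from more than one Siddhi thread, one common remedy is to funnel every emit through a single lock. The sketch below is a hypothetical pure-Java stand-in: LockedOutput wraps a java.util.function.Consumer instead of Flink's Output, and an ArrayList plays the non-thread-safe downstream. In a real operator the lock would have to be the same one the task thread holds while emitting; in Flink versions of that era this was typically the task's checkpoint lock, which is worth verifying against your Flink version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SynchronizedOutputSketch {
    // Hypothetical wrapper: every emit goes through one shared lock, so the
    // non-thread-safe delegate is never entered by two threads at once.
    static final class LockedOutput<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private final Object lock;

        LockedOutput(Consumer<T> delegate, Object lock) {
            this.delegate = delegate;
            this.lock = lock;
        }

        @Override
        public void accept(T value) {
            synchronized (lock) {
                delegate.accept(value);
            }
        }
    }

    // Emits from several threads into an ArrayList (not thread-safe by itself)
    // and returns how many elements arrived.
    static int emitConcurrently(int threadCount, int perThread) {
        List<Integer> sink = new ArrayList<>();
        LockedOutput<Integer> out = new LockedOutput<>(sink::add, new Object());
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread thread = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    out.accept(i);
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sink.size(); // join() makes the workers' writes visible here
    }

    public static void main(String[] args) {
        System.out.println(emitConcurrently(4, 10_000)); // 40000
    }
}
```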
hi, haoch.
My company uses Flink 1.4.x now. Since flink-siddhi declares Flink 1.7.0 as a prerequisite, I am wondering whether it is possible to make it work on 1.4.x. Does it use any advanced features that only Flink 1.7.0 supports?
thank you.
Hi @haoch
How does the operator state behave under parallelism?
As I can see in the class org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator, we are using managed operator state.
Is there a way to partition the rules and events by some key and use keyed managed state?
I'm also curious about parallelism and performance. Do we have any performance measurements?
For example, I have a Siddhi query that expects 3 FAILURE events followed by 1 SUCCESS event, like this: from every e1=firewallStream[name == 'FAILURE']<3> -> e2=firewallStream [ name == 'SUCCESS'] within 1 min select 'AAAAAAAA-AAAA-AAAA-BBBB-AAAAAAAAAAAA' as ruleId, e1.externalId as eventIds insert into outputStream
If I set the Siddhi CEP operator's parallelism to 1, the above query generates the correct result, because all the events go to one operator instance. But if I increase the operator's parallelism to more than 1, the events are distributed across multiple instances of the operator, and the query fails to generate a result because the state is not shared between the parallel instances.
Source ~> <Route, Event> ~> RoutePartitioner ~> CepOperator
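Pattern state lives inside each parallel operator instance, so the three FAILURE events and the SUCCESS event can only be correlated if they reach the same instance. With more than one instance, that means partitioning by a key the pattern is meant to correlate on, as the TestDelayData program elsewhere on this page does with keyBy("sip", "dip"); if the pattern really is global across all events, there is no such key and parallelism 1 remains the only option. The sketch below is a simplified, hypothetical model of that routing guarantee in plain Java: the hashCode-modulo formula and the host-a/host-b keys are illustrative only (Flink's keyBy actually hashes keys into key groups), but it shares the property that matters, namely that equal keys always land on the same instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyPartitionSketch {
    // Stand-in for key-based routing: equal keys always map to the same instance.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Hypothetical events as {key, name}; the key might be a host or user id.
    static final String[][] EVENTS = {
        {"host-a", "FAILURE"}, {"host-b", "FAILURE"}, {"host-a", "FAILURE"},
        {"host-a", "FAILURE"}, {"host-b", "SUCCESS"}, {"host-a", "SUCCESS"}
    };

    // Groups events by the parallel instance they would be routed to.
    static Map<Integer, List<String>> route(String[][] events, int parallelism) {
        Map<Integer, List<String>> byInstance = new TreeMap<>();
        for (String[] e : events) {
            byInstance.computeIfAbsent(instanceFor(e[0], parallelism), k -> new ArrayList<>())
                      .add(e[0] + ":" + e[1]);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        // All four host-a events share one list, so one instance sees the whole pattern.
        System.out.println(route(EVENTS, 4));
    }
}
```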
Hello,
Is the current release of flink-siddhi using Flink 1.2-SNAPSHOT?
I'm getting the following error, which seems to involve a class from previous versions (this class existed up to 1.1.4):
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer
at org.apache.flink.contrib.siddhi.operator.SiddhiStreamOperator.createStreamRecordSerializer(SiddhiStreamOperator.java:46)
at org.apache.flink.contrib.siddhi.operator.AbstractSiddhiOperator.registerStreamRecordSerializers(AbstractSiddhiOperator.java:120)
at org.apache.flink.contrib.siddhi.operator.AbstractSiddhiOperator.<init>(AbstractSiddhiOperator.java:112)
at org.apache.flink.contrib.siddhi.operator.SiddhiStreamOperator.<init>(SiddhiStreamOperator.java:40)
at org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory.createDataStream(SiddhiStreamFactory.java:31)
at org.apache.flink.contrib.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:276)
at org.apache.flink.contrib.siddhi.SiddhiStream$ExecutionSiddhiStream.returns(SiddhiStream.java:239)
Thank you
In StreamSchema there are methods isRowType() and isCompositeType(), but they are not used in StreamSerializer, which makes it impossible to create a SiddhiStream for those types.
If I register a custom function like this:
cep.registerExtension("str:groupConcat", GroupConcatFunctionExtension.class);
The code below throws an exception like 'groupConcat() is neither a function or aggregate':
DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returns("outstream2");
But if I change returns() to returnAsMap(), it works very well:
DataStream outstream2 = cep.from("inputstream2").cql("from inputstream2#window.timeBatch(1 sec) " + "select str:groupConcat(dip) as related_alerts " + "group by sip " + "insert into outstream2") .returnAsMap("outstream2");
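Setting scalability aside:
With flink+siddhi, Flink mainly handles data distribution while Siddhi does the actual computation. This process incurs network overhead for distributing the data (serialization and deserialization), as well as converting Flink data structures into Siddhi structures.
If only Siddhi were used, all of this could be avoided. Would performance be better? As a single-machine program, operational cost would also be lower.
For example, with five 64 GB machines, which performs better, setting data accuracy aside: running the application on flink-siddhi, or running Siddhi as five separate applications on the five machines?
Thanks!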
We are using Siddhi to process real-time events collected online, and I want to know the TTL of processed events.
For example, when an event belonging to the stream testStream arrives, will it remain available until the Siddhi cluster is restarted or shut down? If events are grouped by a time window, is an event discarded once it expires?
I guess this strategy affects things like memory usage and the performance of running Siddhi in Docker.
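I rewrote the TimeWindowSample example from the WSO2 Siddhi repo using the flink-siddhi library and ran it, but the output differs from that of the native example. Did I write something wrong?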
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple4<String, Float, Long, Long>> stockStream = env.fromElements(
Tuple4.of("IBM", 100f, 100L, 1000L),
Tuple4.of("IBM", 200f, 300L, 2000L),
Tuple4.of("WSO2", 60f, 200L, 2000L),
Tuple4.of("WSO2", 70f, 400L, 3000L),
Tuple4.of("GOOG", 50f, 30L, 3000L),
Tuple4.of("IBM", 200f, 400L, 4000L),
Tuple4.of("WSO2", 70f, 50L, 6000L),
Tuple4.of("WSO2", 80f, 400L, 8000L),
Tuple4.of("GOOG", 60f, 30L, 8000L)
);
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
SiddhiStream.SingleSiddhiStream stream = cep.define("StockStream", stockStream, "symbol", "price", "volume", "timestamp");
DataStream output = stream
.cql("from StockStream#window.time(5 sec) select symbol, sum(price) as price, sum(volume) as volume group by symbol insert into AggregateStockStream")
.returnsTransformRow("AggregateStockStream");
output.print();
env.execute();
The output is:
IBM,100.0,100
GOOG,60.0,30
WSO2,80.0,400
WSO2,150.0,450
IBM,300.0,500
GOOG,110.0,60
WSO2,220.0,850
WSO2,280.0,1050
IBM,500.0,800
The native output is:
[Event{timestamp=1576396964357, data=[IBM, 100.0, 100], isExpired=false}]
[Event{timestamp=1576396965368, data=[IBM, 300.0, 400], isExpired=false}]
[Event{timestamp=1576396965369, data=[WSO2, 60.0, 200], isExpired=false}]
[Event{timestamp=1576396966373, data=[WSO2, 130.0, 600], isExpired=false}]
[Event{timestamp=1576396966373, data=[GOOG, 50.0, 30], isExpired=false}]
[Event{timestamp=1576396967373, data=[IBM, 500.0, 800], isExpired=false}]
[Event{timestamp=1576396969375, data=[WSO2, 200.0, 650], isExpired=false}]
[Event{timestamp=1576396971380, data=[WSO2, 150.0, 450], isExpired=false}]
[Event{timestamp=1576396971380, data=[GOOG, 60.0, 30], isExpired=false}]
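The nine native results can be reproduced from the tuples' fourth field alone. The sketch below is a minimal pure-Java model, assuming a sliding 5-second window over that field (read as event time in milliseconds), an event leaving the window once an arrival is at least 5,000 ms newer, and only current (non-expired) results being printed, as in the native callback output above. It is not Siddhi's implementation, and it does not explain the flink-siddhi output; note only that the flink-siddhi program sets TimeCharacteristic.EventTime but never calls assignTimestampsAndWatermarks on stockStream, which may be worth checking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class TimeWindowSketch {
    static final class Stock {
        final String symbol;
        final double price;
        final long volume;
        final long timestamp; // event time in milliseconds

        Stock(String symbol, double price, long volume, long timestamp) {
            this.symbol = symbol;
            this.price = price;
            this.volume = volume;
            this.timestamp = timestamp;
        }
    }

    // The nine StockStream tuples from the flink-siddhi program above.
    static List<Stock> sampleEvents() {
        return Arrays.asList(
            new Stock("IBM", 100, 100, 1000), new Stock("IBM", 200, 300, 2000),
            new Stock("WSO2", 60, 200, 2000), new Stock("WSO2", 70, 400, 3000),
            new Stock("GOOG", 50, 30, 3000), new Stock("IBM", 200, 400, 4000),
            new Stock("WSO2", 70, 50, 6000), new Stock("WSO2", 80, 400, 8000),
            new Stock("GOOG", 60, 30, 8000));
    }

    // Sliding window; assumes input sorted by timestamp. On each arrival, drop events
    // whose window lifetime has ended, then emit sum(price), sum(volume) for its symbol.
    static List<String> run(List<Stock> events, long windowMs) {
        Deque<Stock> window = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Stock e : events) {
            while (!window.isEmpty() && window.peekFirst().timestamp + windowMs <= e.timestamp) {
                window.pollFirst();
            }
            window.addLast(e);
            double price = 0;
            long volume = 0;
            for (Stock s : window) {
                if (s.symbol.equals(e.symbol)) {
                    price += s.price;
                    volume += s.volume;
                }
            }
            out.add(e.symbol + "," + price + "," + volume);
        }
        return out;
    }

    public static void main(String[] args) {
        run(sampleEvents(), 5000L).forEach(System.out::println);
    }
}
```

Running it prints the same symbol, price and volume sequence as the native output, which gives a fixed reference to diff the flink-siddhi results against.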
Stack trace -
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinition(SiddhiStreamSchema.java:49)
at org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema.getStreamDefinitionExpression(SiddhiStreamSchema.java:64)
at org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext.getAllEnrichedExecutionPlan(SiddhiOperatorContext.java:113)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.validate(AbstractSiddhiOperator.java:291)
at org.apache.flink.streaming.siddhi.operator.AbstractSiddhiOperator.<init>(AbstractSiddhiOperator.java:178)
at org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator.<init>(SiddhiStreamOperator.java:41)
at org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory.createDataStream(SiddhiStreamFactory.java:31)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:331)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnsInternal(SiddhiStream.java:327)
at org.apache.flink.streaming.siddhi.SiddhiStream$ExecutionSiddhiStream.returnAsMap(SiddhiStream.java:287)