hmsonline / storm-cassandra-cql
Storm Cassandra Bridge built on CQL
License: Apache License 2.0
Hi,
I'm using storm-cassandra-cql 0.1.8 from the Maven Central repository. I'm having an issue using partitionPersist() followed by .newValuesStream() so that I can further process the key-value pairs: nothing is emitted afterward. My topology is as simple as this:
myTopo.newStream(...)
    .each(new Fields("key", "value"), new Debug()) // It's OK right here.
    .partitionPersist(new CassandraCqlStateFactory(), new Fields("key", "value"), new CassandraCqlStateUpdater<>(new MyTableMapper()), new Fields("key", "value")) // Tuples are persisted to Cassandra.
    .newValuesStream()
    .each(new Fields("key", "value"), new Debug()) // But nothing seems to be emitted here.
Am I doing something wrong?
All the members of CassandraCqlMapState and CassandraCqlMapStateFactory are either private or public (nothing is protected). I need to extend these classes to do something specific, but I can't, because all the attributes are private and there are no accessors.
Guys,
I am quite new to Trident and Cassandra. While trying to run the word-count example, I am getting this exception midway through the run. Any ideas, please?
DEBUG (com.hmsonline.trident.cql.CassandraCqlMapState:165) - Putting the following keys: [[went, spout1], [and, spout2]] with values: [73, 73]
ERROR (clojure.tools.logging$eval37$fn__43:16) - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745)
at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
at com.hmsonline.trident.cql.CassandraCqlMapState.checkCassandraException(CassandraCqlMapState.java:227)
at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:157)
at storm.trident.state.map.CachedMap.multiGet(CachedMap.java:52)
at storm.trident.state.map.NonTransactionalMap.multiGet(NonTransactionalMap.java:39)
at storm.trident.state.map.SnapshottableMap.multiGet(SnapshottableMap.java:37)
at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:31)
at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:28)
at storm.trident.planner.processor.StateQueryProcessor.finishBatch(StateQueryProcessor.java:84)
at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:152)
at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:252)
at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:285)
at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:364)
at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630)
at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398)
at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
... 6 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at storm.trident.tuple.TridentTupleView.getValue(TridentTupleView.java:227)
at storm.trident.tuple.TridentTupleView.get(TridentTupleView.java:222)
at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:41)
at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:15)
at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:137)
... 20 more
When statements are added to a batch, the statement-level attributes are ignored.
We should make it apparent that we are constructing a BatchStatement under the hood, whose consistency level overrides that of the individual statement objects.
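A minimal, self-contained toy model (plain Java classes, not the DataStax driver API) of the surprise this causes: the batch carries a single consistency level and applies it to the whole write, so whatever was set on an individual statement is silently ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Statement/Batch here are stand-ins that mimic the override behavior only.
public class BatchLevelDemo {
    static class Statement {
        final String cql;
        final String consistency; // what the caller asked for, e.g. "QUORUM"
        Statement(String cql, String consistency) {
            this.cql = cql;
            this.consistency = consistency;
        }
    }

    static class Batch {
        final List<Statement> children = new ArrayList<>();
        String consistency = "ONE"; // the batch-level setting

        void add(Statement s) { children.add(s); }

        // The level a child statement actually executes at:
        String effectiveLevel(Statement s) {
            return consistency; // the child's own level is ignored
        }
    }

    public static String run() {
        Batch batch = new Batch();
        Statement s = new Statement("UPDATE counters SET v = v + 1", "QUORUM");
        batch.add(s);
        return batch.effectiveLevel(s);
    }
}
```

The statement asked for "QUORUM" but executes at the batch's "ONE", which is exactly why the wrapping should be made apparent.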
1.) Am I correct in saying that if I define a CassandraCqlMapState with a parallelism of N, then N different instances of the Session object will be created? If so, does this not go against what DataStax recommends: http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra where they say that a single Session object should be used across the application? Or does that not make sense in the Storm context, where the state could be distributed across different physical nodes?
2.) For all queries, CassandraCqlMapState prepares a BatchStatement to submit to Cassandra. Is this guaranteed to perform better than executing the individual statements asynchronously? Ref: https://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/. Basically, what I am trying to get at is: does the micro-batching in Trident really offer any amortisation of costs, given how Cassandra behaves with batch queries?
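On question 1, one way to follow the DataStax "one Session per application" guidance within a worker JVM would be a shared holder that all state instances consult: the first caller creates the session, later callers reuse it. This is a hypothetical sketch (SharedSessionHolder is not part of storm-cassandra-cql), written generically so it compiles without the driver:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical per-JVM holder: lazily creates one instance and reuses it,
// even when several state objects are prepared concurrently.
public class SharedSessionHolder<S> {
    private final AtomicReference<S> ref = new AtomicReference<>();

    public S getOrCreate(Supplier<S> factory) {
        S existing = ref.get();
        if (existing != null) {
            return existing;
        }
        S created = factory.get();
        // If another thread won the race, use its instance instead of ours.
        return ref.compareAndSet(null, created) ? created : ref.get();
    }
}
```

This only deduplicates within one worker JVM; with tasks spread over different physical nodes, each worker would still (necessarily) hold its own Session.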
Thanks
Gireesh
Hi Guys,
Do you have any plans to implement a regular CassandraBolt built on CQL?
Hi
I use CassandraCqlMapState and CassandraCqlMapStateFactory in my project. I need two stateQuery calls in my topology. When I try to run the second stateQuery, I get this:
java.lang.RuntimeException: The same metric name cassandra/readCount was registered twice.
at backtype.storm.task.TopologyContext.registerMetric(TopologyContext.java:255) ~[storm-core-0.9.3.jar:0.9.3]
at com.hmsonline.trident.cql.CassandraCqlMapState.registerMetrics(CassandraCqlMapState.java:218) ~[storm-cassandra-cql-0.2.5.jar:na]
at com.hmsonline.trident.cql.CassandraCqlMapStateFactory.makeState(CassandraCqlMapStateFactory.java:56) ~[storm-cassandra-cql-0.2.5.jar:na]
at storm.trident.planner.SubtopologyBolt.prepare(SubtopologyBolt.java:69) ~[storm-core-0.9.3.jar:0.9.3]
at storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:231) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]
I looked into the code and I think it's a bug, because we always register the same metric names on the same MetricsContext. I haven't found a StateFactory that uses the metrics, so I don't have a patch for now (and I'm not sure how to fix it). But if someone can confirm it's a real bug, I'll work on it.
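One possible shape for a fix, sketched with hypothetical names (UniqueMetricNames is not in the project, and the Map below stands in for Storm's per-context metric registry): give every state instance its own id and suffix the metric names with it, so two stateQuery operations no longer both register "cassandra/readCount".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqueMetricNames {
    private static final AtomicInteger NEXT_ID = new AtomicInteger();
    // Shared registry, mimicking TopologyContext, which throws on duplicates.
    private static final Map<String, Object> REGISTERED = new HashMap<>();

    private final int id = NEXT_ID.incrementAndGet();

    public String register(String baseName, Object metric) {
        String name = baseName + "-" + id; // per-instance suffix avoids the clash
        if (REGISTERED.containsKey(name)) {
            throw new RuntimeException("The same metric name " + name + " was registered twice.");
        }
        REGISTERED.put(name, metric);
        return name;
    }
}
```

With this scheme, two state instances would register "cassandra/readCount-1" and "cassandra/readCount-2" instead of colliding; the real fix would apply the same idea inside CassandraCqlMapState.registerMetrics.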
Hello,
We are seeing performance degradation over time, so we believe we have a leak somewhere.
Shouldn't we reset the this.statements field at the end of the commit method, e.g.:
this.statements.clear();
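A simplified stand-in (CqlStateSketch is not the project's actual class) showing the proposed fix: clear the accumulated statements once the batch has been committed, so a long-lived state object does not retain every statement it has ever been given.

```java
import java.util.ArrayList;
import java.util.List;

public class CqlStateSketch {
    private final List<String> statements = new ArrayList<>();

    public void addStatement(String cql) {
        statements.add(cql);
    }

    // Returns how many statements were flushed in this commit.
    public int commit(Long txid) {
        int flushed = statements.size();
        // ... execute the batch against Cassandra here ...
        statements.clear(); // the proposed reset at the end of commit()
        return flushed;
    }

    public int pendingCount() {
        return statements.size();
    }
}
```

Without the clear() call, the list would only ever grow across commits, which matches the gradual degradation described above.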
Hi!
Thanks for your driver. It is great.
Query timeout configuration does not seem to be implemented.
When do you plan to implement it?
Do you need help?
Cheers,
Lucas
Hi,
I'm using CassandraCqlIncrementalState to update the counter.
Here is the commit method:
@Override
public void commit(Long txid) {
    boolean applied = false;
    DriverException lastException = null;
    // Read current value.
    // If we failed to apply the update, maybe the state has changed already;
    // we need to calculate the new state and apply it again.
    for (Map.Entry<K, V> entry : aggregateValues.entrySet()) {
        int attempts = 0;
        while (!applied && attempts < maxAttempts) {
            try {
                applied = updateState(entry, txid);
            } catch (QueryExecutionException e) {
                lastException = e;
                LOG.warn("Catching {} attempt {}" + txid + "-" + partitionIndex, e.getMessage(), attempts);
            }
            attempts++;
        }
        if (!applied) {
            if (lastException != null) {
                throw new CassandraCqlIncrementalStateException("Ran out of attempts [" + attempts + "] max of [" + maxAttempts + "] " + txid + "-" + partitionIndex, lastException);
            } else {
                throw new CassandraCqlIncrementalStateException("Ran out of attempts [" + attempts + "] max of [" + maxAttempts + "] " + txid + "-" + partitionIndex);
            }
        }
    }
}
When aggregateValues has multiple entries, only the first entry is updated in Cassandra: once the first invocation of updateState succeeds, applied is set to true, and the
while (!applied && attempts < maxAttempts)
loop is never entered for the following entries.
Do we need to reset applied to false after each entry is updated successfully?
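A self-contained sketch of that reset, with updateState stubbed to always succeed so the effect is countable: declaring applied inside the for-loop gives every entry its own retry cycle instead of only the first.

```java
import java.util.Map;

public class CommitLoopFix {
    // Stand-in for the real conditional write to Cassandra: always succeeds.
    static boolean updateState(Map.Entry<String, Integer> entry, long txid) {
        return true;
    }

    // Returns the number of entries that were actually applied.
    public static int commit(Map<String, Integer> aggregateValues, long txid, int maxAttempts) {
        int appliedEntries = 0;
        for (Map.Entry<String, Integer> entry : aggregateValues.entrySet()) {
            boolean applied = false; // reset per entry: this is the proposed fix
            int attempts = 0;
            while (!applied && attempts < maxAttempts) {
                applied = updateState(entry, txid);
                attempts++;
            }
            if (!applied) {
                throw new RuntimeException("Ran out of attempts [" + attempts + "] max of [" + maxAttempts + "]");
            }
            appliedEntries++;
        }
        return appliedEntries;
    }
}
```

With the flag declared outside the loop, as in the pasted commit(), only the first entry would enter the while-loop; moved inside, every entry is written.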
Downstream projects log a WARN on startup because the DataStax Cassandra Java driver (at least 2.1.x) emits it automatically if the LZ4 library isn't available, even when LZ4 compression isn't used. Since we allow the compression to be set when building the cluster configuration, should we include the dependency here so downstream projects don't need to declare it explicitly?
Here's an example WARN log:
WARN [2015-02-05 20:56:54,568] com.datastax.driver.core.FrameCompressor: Error loading LZ4 library (java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4Factory.fastDecompressor()Lnet/jpountz/lz4/LZ4FastDecompressor;). LZ4 compression will not be available for the protocol.
Example POM entry that would fix it:
<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.2.0</version>
</dependency>
Please let me know if you'd like me to submit a PR here. Personally, I think upstream shouldn't be logging a WARN for an optional dependency when it's not used.