storm-cassandra-cql's People

Contributors

baontq, bflad, boneill42, irieksts, lbruand, leerobert, lrenn, pabrahamsson, ylorenza-poctu

storm-cassandra-cql's Issues

[Help needed] partitionPersist().newValuesStream() emits nothing

Hi,

I'm using storm-cassandra-cql 0.1.8 from the Maven Central repository. I'm having an issue using partitionPersist() followed by .newValuesStream() to further process the key-value pairs: nothing is emitted afterward. My topology is as simple as this:

myTopo.newStream(...)
      .each(new Fields("key", "value"), new Debug()) // It's ok right here.
      .partitionPersist(new CassandraCqlStateFactory(), new Fields("key", "value"), new CassandraCqlStateUpdater<>(new MyTableMapper()), new Fields("key", "value")) // Tuples are persisted to Cassandra.
      .newValuesStream()
      .each(new Fields("key", "value"), new Debug()) // But nothing seems to be emitted here

Am I doing something wrong?
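In Trident, the stream returned by newValuesStream() carries only the tuples that the StateUpdater emits through the TridentCollector it receives in updateState(state, tuples, collector); writing to the state does not re-emit the input tuples on its own. If CassandraCqlStateUpdater persists without calling collector.emit(...) (not verified here against 0.1.8), the downstream each(...) would see nothing. The dependency-free sketch below models that contract; ValuesCollector, BatchUpdater and ValuesStreamDemo are hypothetical stand-ins, not Storm or storm-cassandra-cql classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Trident's TridentCollector: whatever is emitted
// here is what a downstream newValuesStream() would receive.
interface ValuesCollector {
    void emit(List<Object> values);
}

// Hypothetical stand-in for Trident's StateUpdater<S>.
interface BatchUpdater<S> {
    void updateState(S state, List<List<Object>> tuples, ValuesCollector collector);
}

final class ValuesStreamDemo {
    // Writes every tuple to the state but never emits: the values stream stays empty.
    static final BatchUpdater<List<List<Object>>> PERSIST_ONLY =
            (state, tuples, collector) -> state.addAll(tuples);

    // Writes every tuple and then re-emits it, so downstream operations see it.
    static final BatchUpdater<List<List<Object>>> PERSIST_AND_EMIT =
            (state, tuples, collector) -> {
                for (List<Object> tuple : tuples) {
                    state.add(tuple);
                    collector.emit(tuple);
                }
            };

    // Runs one batch through an updater and returns what the values stream would carry.
    static List<List<Object>> runBatch(BatchUpdater<List<List<Object>>> updater,
                                       List<List<Object>> state,
                                       List<List<Object>> batch) {
        List<List<Object>> valuesStream = new ArrayList<>();
        updater.updateState(state, batch, valuesStream::add);
        return valuesStream;
    }
}
```

With the real API, the equivalent would be a custom StateUpdater (or a subclass of the library's updater, if it allows that) that calls collector.emit(...) after each write.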

Allow extending CassandraCqlMapState

Currently, every member of CassandraCqlMapState and CassandraCqlMapStateFactory is either private or public. I need to extend these classes to do something specific, but I can't, because all the attributes are private and there are no accessors.
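One common way to open a class for extension without exposing its fields publicly is to keep the fields private but add protected accessors and an overridable hook. The sketch below illustrates that pattern only; ExtensibleMapState, PrefixedMapState, tableName, maxBatchSize and describeWrite are hypothetical names, not members of CassandraCqlMapState.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: fields stay private, but subclasses get
// protected read access and a protected hook they can override.
class ExtensibleMapState {
    private final String tableName;
    private final int maxBatchSize;

    ExtensibleMapState(String tableName, int maxBatchSize) {
        this.tableName = tableName;
        this.maxBatchSize = maxBatchSize;
    }

    protected String getTableName() { return tableName; }

    protected int getMaxBatchSize() { return maxBatchSize; }

    // Overridable hook, called once per key inside multiPut.
    protected String describeWrite(String key) {
        return "write " + key + " to " + tableName;
    }

    // The overall algorithm stays final; only the hook varies.
    final List<String> multiPut(List<String> keys) {
        List<String> log = new ArrayList<>();
        for (String key : keys) {
            log.add(describeWrite(key));
        }
        return log;
    }
}

// Hypothetical subclass that relies only on the protected API.
class PrefixedMapState extends ExtensibleMapState {
    PrefixedMapState(String tableName, int maxBatchSize) {
        super(tableName, maxBatchSize);
    }

    @Override
    protected String describeWrite(String key) {
        return "[" + getTableName() + "/" + getMaxBatchSize() + "] " + key;
    }
}
```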

Error while running example WordCountTopology

Guys,

I am quite new to Trident and Cassandra. While trying to run the WordCount example, I get this exception midway through the run. Any ideas, please?

DEBUG (com.hmsonline.trident.cql.CassandraCqlMapState:165) - Putting the following keys: [[went, spout1], [and, spout2]] with values: [73, 73]
ERROR (clojure.tools.logging$eval37$fn__43:16) - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
    at backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745)
    at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.hmsonline.trident.cql.CassandraCqlMapState.checkCassandraException(CassandraCqlMapState.java:227)
    at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:157)
    at storm.trident.state.map.CachedMap.multiGet(CachedMap.java:52)
    at storm.trident.state.map.NonTransactionalMap.multiGet(NonTransactionalMap.java:39)
    at storm.trident.state.map.SnapshottableMap.multiGet(SnapshottableMap.java:37)
    at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:31)
    at storm.trident.operation.builtin.MapGet.batchRetrieve(MapGet.java:28)
    at storm.trident.planner.processor.StateQueryProcessor.finishBatch(StateQueryProcessor.java:84)
    at storm.trident.planner.SubtopologyBolt.finishBatch(SubtopologyBolt.java:152)
    at storm.trident.topology.TridentBoltExecutor.finishBatch(TridentBoltExecutor.java:252)
    at storm.trident.topology.TridentBoltExecutor.checkFinish(TridentBoltExecutor.java:285)
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:364)
    at backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398)
    at backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
    ... 6 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at storm.trident.tuple.TridentTupleView.getValue(TridentTupleView.java:227)
    at storm.trident.tuple.TridentTupleView.get(TridentTupleView.java:222)
    at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:41)
    at com.hmsonline.trident.cql.example.wordcount.WordCountAndSourceMapper.retrieve(WordCountAndSourceMapper.java:15)
    at com.hmsonline.trident.cql.CassandraCqlMapState.multiGet(CassandraCqlMapState.java:137)
    ... 20 more
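One reading of this trace: the puts log two-field keys such as [went, spout1], but the failure happens in multiGet reached through MapGet and StateQueryProcessor, i.e. from a stateQuery. WordCountAndSourceMapper.retrieve then asks for index 1 of the key tuple and gets ArrayIndexOutOfBoundsException: 1, which suggests the query passes fewer key fields than the mapper reads. If so, the stateQuery input fields would need to match the fields the state was grouped by. The hypothetical WordSourceKey below sketches a defensive key check that turns this into a descriptive error; it is not code from the example.

```java
import java.util.List;

// Hypothetical key type for a mapper keyed on (word, source). The arity check
// runs before any indexing, so a query that passes too few key fields fails
// with a clear message instead of ArrayIndexOutOfBoundsException: 1.
final class WordSourceKey {
    final String word;
    final String source;

    private WordSourceKey(String word, String source) {
        this.word = word;
        this.source = source;
    }

    static WordSourceKey fromKey(List<Object> key) {
        if (key.size() != 2) {
            throw new IllegalArgumentException(
                    "Expected key fields [word, source] but got "
                            + key.size() + " field(s): " + key);
        }
        return new WordSourceKey(String.valueOf(key.get(0)), String.valueOf(key.get(1)));
    }
}
```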

Questions on cassandra querying scheme

1.) Am I correct in saying that if I define a CassandraCqlMapState with a parallelism of N, then N different instances of the Session object will be created? If this is correct, does this not go against what is recommended by DataStax: http://www.datastax.com/dev/blog/4-simple-rules-when-using-the-datastax-drivers-for-cassandra where they say that a single Session object should be used across the application? Or does that not make sense in the storm context where the state could be distributed across different physical nodes?

2.) For all queries, CassandraCqlMapState prepares a BatchStatement to submit to Cassandra. Is this guaranteed to perform better than executing the individual statements asynchronously? Ref: https://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/. Basically, what I am trying to get at is: does the micro-batching in Trident really amortise any costs, given how Cassandra behaves with batch queries?

Thanks
Gireesh
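On question 1: each Storm worker is a separate JVM, and the executors of a worker run as threads inside it, so the closest Storm equivalent of "one Session per application" is one Session per worker JVM. A static, thread-safe cache keyed by cluster settings would let every state instance in the same worker share a client. The sketch below keeps the client type generic; SharedClients is a hypothetical helper, and whether the library's state factory already shares sessions this way has not been checked here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-JVM cache: one client per key (e.g. contact points plus
// keyspace). With the DataStax driver, C would be a Session; that wiring is an
// assumption of this sketch and is not shown.
final class SharedClients<K, C> {
    private final Map<K, C> clients = new ConcurrentHashMap<>();
    private final Function<K, C> factory;

    SharedClients(Function<K, C> factory) {
        this.factory = factory;
    }

    // computeIfAbsent applies the factory at most once per key, even when
    // several threads ask for the same key concurrently.
    C get(K key) {
        return clients.computeIfAbsent(key, factory);
    }

    int size() {
        return clients.size();
    }
}
```

Held in a static field, one SharedClients instance would be shared by all tasks in a worker, while separate workers (separate JVMs) would still each open their own client.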

RuntimeException: The same metric name `cassandra/readCount` was registered twice

Hi

I use CassandraCqlMapState and CassandraCqlMapStateFactory in my project. I need two stateQuery calls in my topology. When I try to run the second stateQuery, I get this:

java.lang.RuntimeException: The same metric name cassandra/readCount was registered twice.
at backtype.storm.task.TopologyContext.registerMetric(TopologyContext.java:255) ~[storm-core-0.9.3.jar:0.9.3]
at com.hmsonline.trident.cql.CassandraCqlMapState.registerMetrics(CassandraCqlMapState.java:218) ~[storm-cassandra-cql-0.2.5.jar:na]
at com.hmsonline.trident.cql.CassandraCqlMapStateFactory.makeState(CassandraCqlMapStateFactory.java:56) ~[storm-cassandra-cql-0.2.5.jar:na]
at storm.trident.planner.SubtopologyBolt.prepare(SubtopologyBolt.java:69) ~[storm-core-0.9.3.jar:0.9.3]
at storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:231) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]

I looked into the code and I think it's a bug: the same metric names are always registered on the same context, so the second state collides with the first.

I haven't found any other StateFactory that uses metrics, so I don't have a patch yet (and I'm not sure how to fix it).

But if someone can confirm it's a real bug, I'll work on it.
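The trace shows the failure inside makeState, called from SubtopologyBolt.prepare, so both states appear to be created in the same bolt and to share one TopologyContext, which rejects a second registration under an existing name. One possible fix is to scope the metric name per state instance. In the sketch below, DemoMetricContext only mimics the duplicate check, and the stateName parameter of MetricNames.scoped is a hypothetical addition, not an existing option of the library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical context that, like the check in the stack trace, refuses to
// register a metric name twice.
final class DemoMetricContext {
    private final Map<String, Object> metrics = new HashMap<>();

    void register(String name, Object metric) {
        if (metrics.containsKey(name)) {
            throw new RuntimeException("The same metric name " + name + " was registered twice.");
        }
        metrics.put(name, metric);
    }

    boolean contains(String name) {
        return metrics.containsKey(name);
    }
}

final class MetricNames {
    // Scopes a metric under a per-state name, e.g. "cassandra/wordCounts/readCount",
    // so two states prepared in the same bolt no longer collide.
    static String scoped(String stateName, String metric) {
        return "cassandra/" + stateName + "/" + metric;
    }
}
```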

CassandraCqlIncrementalState commit only one entry

Hi,
I'm using CassandraCqlIncrementalState to update the counter.

https://github.com/hmsonline/storm-cassandra-cql/blob/master/src/main/java/com/hmsonline/trident/cql/incremental/CassandraCqlIncrementalState.java

Here is the commit method:

@Override
public void commit(Long txid) {
    boolean applied = false;
    DriverException lastException = null;
    // Read current value.
    // If we failed to apply the update, maybe the state has changed already; we need to calculate the new state and apply it again.
    for (Map.Entry<K, V> entry : aggregateValues.entrySet()) {
        int attempts = 0;
        while (!applied && attempts < maxAttempts) {
            try {
                applied = updateState(entry, txid);
            } catch (QueryExecutionException e) {
                lastException = e;
                LOG.warn("Catching {} attempt {}" + txid + "-" + partitionIndex, e.getMessage(), attempts);
            }
            attempts++;
        }
        if (!applied) {
            if (lastException != null) {
                throw new CassandraCqlIncrementalStateException("Ran out of attempts [" + attempts + "] max of [" + maxAttempts + "] " + txid + "-" + partitionIndex, lastException);
            } else {
                throw new CassandraCqlIncrementalStateException("Ran out of attempts [" + attempts + "] max of [" + maxAttempts + "] " + txid + "-" + partitionIndex);
            }
        }
    }
}

When aggregateValues has multiple entries, only the first entry is written to Cassandra. The applied variable is set to true as soon as the first updateState call succeeds and is never reset, so for every following entry the loop condition

while (!applied && attempts < maxAttempts)

is already false and the loop body never runs.

Shouldn't applied be reset to false for each entry?
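A minimal sketch of the loop with applied (and lastException) scoped to each entry, so every entry gets its own retry budget. UpdateFn is a hypothetical stand-in for updateState(entry, txid), the caught RuntimeException stands in for QueryExecutionException, and IllegalStateException stands in for CassandraCqlIncrementalStateException; this is not the library's actual patch.

```java
import java.util.Map;

final class PerEntryCommit {
    // Hypothetical stand-in for updateState(entry, txid): returns true when the
    // conditional update was applied, or throws to signal a retryable failure.
    interface UpdateFn<K, V> {
        boolean apply(Map.Entry<K, V> entry, long txid);
    }

    static <K, V> void commit(Map<K, V> aggregateValues, long txid, int maxAttempts,
                              UpdateFn<K, V> update) {
        for (Map.Entry<K, V> entry : aggregateValues.entrySet()) {
            boolean applied = false;                // reset for every entry
            RuntimeException lastException = null;  // likewise scoped to this entry
            int attempts = 0;
            while (!applied && attempts < maxAttempts) {
                try {
                    applied = update.apply(entry, txid);
                } catch (RuntimeException e) {
                    lastException = e;
                }
                attempts++;
            }
            if (!applied) {
                // A null cause is allowed, so one constructor covers both cases.
                throw new IllegalStateException("Ran out of attempts [" + attempts
                        + "] max of [" + maxAttempts + "] for txid " + txid, lastException);
            }
        }
    }
}
```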

Add LZ4 Dependency?

Downstream projects log a WARN on startup because the DataStax Cassandra Java Driver (at least 2.1.x) emits it automatically if the LZ4 library isn't available, even when LZ4 compression isn't used. Since we allow compression to be set when building the cluster configuration, should we include the dependency here so downstream projects don't need to declare it explicitly?

Here's an example WARN log:

WARN  [2015-02-05 20:56:54,568] com.datastax.driver.core.FrameCompressor: Error loading LZ4 library (java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4Factory.fastDecompressor()Lnet/jpountz/lz4/LZ4FastDecompressor;). LZ4 compression will not be available for the protocol.

Example POM entry that would fix it:

<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.2.0</version>
</dependency>

Please let me know if you'd like me to submit a PR here. Personally, I think upstream shouldn't log a WARN for an optional dependency when it's not used.
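A NoSuchMethodError means the class was found but lacked the requested method, so the log above may point to an older lz4 jar on the classpath rather than a missing one. The hypothetical diagnostic below distinguishes the two cases via reflection; pointed at net.jpountz.lz4.LZ4Factory and fastDecompressor it reports "missing", "incompatible" or "ok". It is a sketch for troubleshooting, not part of the driver.

```java
import java.lang.reflect.Method;

final class ClasspathCheck {
    // Loads the class without initializing it, so static initializers of an
    // incompatible jar do not run.
    private static Class<?> load(String className) {
        try {
            return Class.forName(className, false, ClasspathCheck.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    // "missing" if the class is absent, "incompatible" if it lacks a public
    // method with that name, "ok" otherwise.
    static String status(String className, String methodName) {
        Class<?> cls = load(className);
        if (cls == null) {
            return "missing";
        }
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                return "ok";
            }
        }
        return "incompatible";
    }
}
```

For example, ClasspathCheck.status("net.jpountz.lz4.LZ4Factory", "fastDecompressor") could be logged once at startup to see which case applies.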