
druid-io / tranquility

Tranquility helps you send real-time event streams to Druid and handles partitioning, replication, service discovery, and schema rollover, seamlessly and without downtime.

License: Apache License 2.0

Scala 87.06% Java 12.94%


tranquility's Issues

org.hyperic:sigar:jar dependency cannot be resolved

I added the tranquility dependency to pom.xml and got this in MyEclipse:

Failed to execute goal on project xxxxxxx: Could not resolve dependencies for project xxxxxxxx:jar:0.0.1-SNAPSHOT: Could not find artifact org.hyperic:sigar:jar:1.6.5.132 in central (http://repo.maven.apache.org/maven2) -> [Help 1]
[ERROR] 

I did some digging and found the dependency is pulled in like this:
[image of the sigar dependency tree]

DruidBeamMaker should save entire serviceName

DruidBeamMaker should save the entire serviceName in the state dict, not just the unique firehoseId part. This way, if you change your firehoseServicePattern, communication with already-created tasks does not break down.

Allow ignoring the latest close date from ZooKeeper metadata

While trying to insert old data into Druid (for example, data I did not have before, or data delayed because a realtime node got stuck), I hit the exception "Global latestCloseTime[%s] for identifier[%s] has moved past timestamp[%s], not creating merged beam", because the latest closed beam in the ZooKeeper metadata store is newer than the timestamp of the data I want to insert.
I didn't find a way to override this metadata from DruidBeam; it only works if I delete the metadata in ZooKeeper or manually change it back to the old date. I don't like either option. :)
So there should be a way to set an override so that beam creation ignores

} else if (timestamp <= prev.latestCloseTime) {

inside ClusteredBeam
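
A hedged sketch of what such an opt-out might look like (ignoreLatestCloseTime is a hypothetical flag, not existing Tranquility API):

import org.joda.time.DateTime

// Sketch of the guard with an opt-out flag. In the real ClusteredBeam the
// timestamp and latestCloseTime come from the beam state; here they are
// parameters so the intended control flow is visible.
def shouldCreateBeam(
    timestamp: DateTime,
    latestCloseTime: DateTime,
    ignoreLatestCloseTime: Boolean // hypothetical new tuning flag
): Boolean =
  ignoreLatestCloseTime || timestamp.isAfter(latestCloseTime)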

Thanks

Spawn replacements for defunct beams

Currently, when a beam is defunct (i.e. all Druid tasks for a particular interval are failed), tranquility will just drop all data for that interval. It should be fine to actually spawn a replacement set in this case.

NB: It's not fine to replace individual failed tasks, since this causes the tasks to become out of sync. But it should be okay to spawn a new set if they all fail.

Support full tuningConfig

This could potentially be done by making the tuning options more "untyped" in the base layer of the DruidBeams builder and just passing along whatever the user provides.
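
A hedged sketch of what an untyped pass-through might look like (names illustrative, not the actual DruidBeams API):

// Sketch: keep the user's tuningConfig as a raw, untyped map and pass it
// through verbatim into the generated task spec, rather than modeling each
// field. Druid itself would validate the values.
case class TuningSketch(options: Map[String, Any] = Map.empty) {
  def withOptions(opts: Map[String, Any]): TuningSketch =
    TuningSketch(options ++ opts) // last write wins
}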

Reported issues with stock storm builds

There was a report on the list of trouble getting tranquility to work with stock and mostly-stock storm builds: https://groups.google.com/forum/#!topic/druid-development/VV0jKRTzRc4.

Copy/paste of the original report:

Hi,

We're just starting with tranquility and we've encountered some problems getting the data into druid.
It seems that using the current version of storm (master, 0.9.2-incubating-SNAPSHOT) nothing is sent to druid (ngrep on the overlord's port doesn't show a thing and nothing appears on the coordinator console).
The topology does not show any error though.
After looking around, we've decided to try out your version of storm (0.9.0-mmx4) and there it works.

We haven't been able to find your fork of the storm project on github so there's no way for us to determine what differences may influence the behavior of tranquility.
Do you have any idea what may cause this?

Thanks
Regards
Laurent

Spurious "Beam defunct" alerts after increasing windowPeriod

Tranquility won't realize that the increased windowPeriod does not apply to existing tasks, and will try to send them data according to the new, longer windowPeriod rather than the shorter one they were created with. When this is past the shutoffTime, Tranquility gets upset that the tasks have gone away and emits an alert.

Tranquility should be able to push events to replicated beam if one of the partitioned replica is unreachable

We faced an issue in our metrics cluster where one of the druid workers (1 partition out of 30) was unreachable via the network but its task status was still running; tranquility kept trying to send messages to the unreachable task and kept getting exceptions:

2015-06-11 14:20:17,692 WARN  [finagle/netty3-3] com.metamx.tranquility.finagle.FutureRetry$ - Transient error, will try again in 29007 ms
java.io.IOException: Unable to push events to task: index_realtime_mmx_metrics_2015-06-11T14:00:00.000Z_10_0 (status = TaskRunning)
    at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:160)
    at com.metamx.tranquility.druid.DruidBeam$$anonfun$4$$anonfun$apply$4$$anonfun$apply$6$$anonfun$apply$7$$anonfun$apply$3$$anonfun$applyOrElse$2.apply(DruidBeam.scala:146)
    at com.twitter.util.Future$$anonfun$map$1$$anonfun$apply$6.apply(Future.scala:863)
    at com.twitter.util.Try$.apply(Try.scala:13)
    at com.twitter.util.Future$.apply(Future.scala:90)
    at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863)
    at com.twitter.util.Future$$anonfun$map$1.apply(Future.scala:863)
    at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:824)

However, the replica task and other partitions were running fine; things returned to normal after forcefully terminating the worker.

Tranquility should be able to handle such a scenario: instead of getting stuck, it could blacklist the unreachable partition and keep sending data to the replica only.

ClusteredBeam.scala - mistake about windowPeriod

Hi again Gian,

In ClusteredBeam.scala class

https://github.com/metamx/tranquility/blob/master/src/main/scala/com/metamx/tranquility/beam/ClusteredBeam.scala#L447

I think line 447 must be like this:

apply(segmentGranularity, warmingPeriod, windowPeriod, partitions, replicants)

instead of:

apply(segmentGranularity, warmingPeriod, warmingPeriod, partitions, replicants)

object ClusteredBeamTuning
{
  /**
   * Factory method for ClusteredBeamTuning objects.
   *
   * @param segmentGranularity Each sub-beam will cover blocks of this size in the timeline. This controls how often
   *                           segments are closed off and made immutable. {{{Granularity.HOUR}}} is usually reasonable.
   * @param warmingPeriod If nonzero, create sub-beams this early. This can be useful if sub-beams take a long time
   *                      to start up.
   * @param windowPeriod Accept events this far outside of their timeline block. e.g. with a windowPeriod of 10 minutes,
   *                     and segmentGranularity of HOUR, we will accept an event timestamped for 4:15PM anywhere from
   *                     3:50PM to 4:10PM.
   * @param partitions Create this many logically distinct sub-beams per timeline block. This is used to scale
   *                   ingestion up to handle larger streams.
   * @param replicants Create this many replicants per sub-beam. This is used to provide higher availability and
   *                   parallelism for queries.
   */
  def create(
    segmentGranularity: Granularity,
    warmingPeriod: Period,
    windowPeriod: Period,
    partitions: Int,
    replicants: Int
  ): ClusteredBeamTuning = {
    apply(segmentGranularity, warmingPeriod, warmingPeriod, partitions, replicants)
  }

Is it correct?

Andrés

[Feature] Auto fill discovery path from config file

druid.discovery.curator.path is mentioned in the properties documentation, but it is actually not loaded. I don't know if there is any connection to #62.

Currently, I have to add lines that manually set the discovery path:

val dataSource = getDataSource(file)

instance =
  DruidBeams.fromConfig(dataSource)
    .discoveryPath(dataSource.propertiesBasedConfig.discoPath)
    .buildBeam()

I think it would be nice to add it to DruidBeams, in the function

  private def fromConfigInternal[InputType, MessageType](
    inputFnFn: (DruidRollup, TimestampSpec) => (InputType => MessageType),
    timestamperFn: TimestampSpec => Timestamper[MessageType],
    config: DataSourceConfig[_ <: PropertiesBasedConfig]
  ): Builder[InputType, MessageType]

Maybe just one line is needed.
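
A hedged sketch of that one-line change, with stand-in types (PropertiesLike and ConfigBuilderSketch are illustrative, not the Tranquility types):

// Fold the manual workaround above into the config path, so
// fromConfigInternal applies the discovery path once for all callers.
trait PropertiesLike { def discoPath: String }

case class ConfigBuilderSketch(discoveryPath: Option[String] = None) {
  def withDiscoveryPath(p: String): ConfigBuilderSketch = copy(discoveryPath = Some(p))
}

def fromConfigSketch(props: PropertiesLike): ConfigBuilderSketch =
  ConfigBuilderSketch().withDiscoveryPath(props.discoPath) // the "one line" needed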

Compile fails - dependencies not found

Hello,

I am very interested in your library for Storm, but when I try to compile it, I cannot resolve these dependencies; they have been removed from Maven Central:

  • com.sun.jdmk:jmxtools:jar:1.2.1
  • com.sun.jmx:jmxri:jar:1.2.1

Are they needed? Where can I find them?

regards,
thank you!

Need to bump Curator version

I recently faced an issue where the Curator connect string had multiple comma-separated hosts: there must not be any spaces between the comma and the host. If there are, we get an UnknownHostException, because the string isn't trimmed after splitting on the comma.

Looks like this was fixed a long time ago: https://issues.apache.org/jira/browse/ZOOKEEPER-1619.

Can we please have the curator version bumped?
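
Until the bump, a minimal client-side workaround is to normalize the connect string before handing it to Curator (a sketch, assuming you build the string yourself):

// Strip stray whitespace around hosts so older ZooKeeper clients don't
// throw UnknownHostException on "host1:2181, host2:2181".
def normalizeConnectString(raw: String): String =
  raw.split(",").map(_.trim).filter(_.nonEmpty).mkString(",")

// normalizeConnectString("zk1:2181, zk2:2181") == "zk1:2181,zk2:2181"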

tranquility-kafka

An app that is both a kafka consumer and a tranquility sender. Ideally it should work with only configuration, without users having to write code.

tranquility-server

This would be an HTTP server that runs a tranquility client. The idea is to provide a way for folks to use tranquility without embedding a Scala library in their app.

[tranquility-spark] lazy val makeBeam creates a new ZK connection for each RDD of a StreamingContext

Hello,

I'm trying to figure out how to use tranquility properly in a Spark Streaming context.

In the Tranquility Spark Documentation, nothing is said regarding the batchDuration of the Spark Streaming Context.

I'd like to know if there is a good practice for that batchDuration, i.e. should the duration be smaller/larger than segmentGranularity (+ windowPeriod?) or queryGranularity?

From what I understand, they should not need to be correlated (beyond thinking about avoiding too much data loss/duplication in case a task fails), but I'd like to get some confirmation about that.

That brings me to the recommended and documented pattern for creating a BeamFactory (i.e., makeBeam as a lazy val) in a StreamingContext:

  • The BeamFactory instance is created on the SparkDriver
  • It is then serialized and sent to the Spark Executors

Then, for each partition of each RDD :

  • That BeamFactory is deserialized
  • BeamFactory.makeBeam creates a new Curator/ZooKeeper connection (I wasn't sure about this at first, thinking all RDDs on a given executor shared the same deserialized object, but adding a log line inside lazy val makeBeam = { ... } shows that it is executed for each partition of each new RDD)

Now, given a batchDuration of 1 second and some events filling 4 partitions, we'll get 4 new Curator/Zookeeper connections each second :

  • Is that the desired behavior?
  • If yes, are these Curator/ZooKeeper clients closed properly by the tranquilizer? (According to the curator.close() call in the Scala example, it is the user's responsibility to close the curator. How do you do that with the lazy val pattern?) And then, why ask the user to start the curator in a lazy val, when it may be better to start it in the foreachPartition of the BeamRDD.propagate method and close it properly at the end?
  • If no, it seems we have an issue here. I'm thinking about writing a CuratorPool object to avoid this problem (not sure it will work; a sketch follows this list).
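
A hedged sketch of sharing one Curator client per executor JVM (SharedCurator and the connect string are illustrative, not Tranquility API):

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.BoundedExponentialBackoffRetry

// A Scala object initializes lazily, once per JVM, so every deserialized
// BeamFactory on the executor reuses the same connection instead of opening
// a new one per partition of each RDD.
object SharedCurator {
  lazy val instance: CuratorFramework = {
    val curator = CuratorFrameworkFactory.newClient(
      "zookeeper:2181",
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()
    sys.addShutdownHook(curator.close()) // close on JVM shutdown, not per RDD
    curator
  }
}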

For the record, here are some spark executor logs (with 2 batches, given batchDuration = 10 seconds) showing the problem (each RDD is empty).

Please take a look at the "EventBeamFactory: Make Beam" and "EventBeamFactory: Start Curator" messages, which are log lines added respectively at the very start of the lazy val makeBeam initialization and just before the curator.start() call.

After a while, we get a "too many connections" error on ZooKeeper.

16/02/05 11:41:30 INFO CoarseGrainedExecutorBackend: Got assigned task 41
16/02/05 11:41:30 INFO Executor: Running task 1.0 in stage 10.0 (TID 41)
16/02/05 11:41:30 INFO TorrentBroadcast: Started reading broadcast variable 10
16/02/05 11:41:30 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 2.7 KB, free 33.7 KB)
16/02/05 11:41:30 INFO TorrentBroadcast: Reading broadcast variable 10 took 73 ms
16/02/05 11:41:30 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 5.0 KB, free 38.7 KB)
16/02/05 11:41:30 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic1 0
16/02/05 11:41:30 INFO EventBeamFactory: Make Beam
16/02/05 11:41:30 INFO EventBeamFactory: Start Curator
16/02/05 11:41:30 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:30 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@2752d6ff
16/02/05 11:41:30 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:30 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:30 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d1, negotiated timeout = 40000
16/02/05 11:41:30 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:30 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:30 INFO Executor: Finished task 1.0 in stage 10.0 (TID 41). 875 bytes result sent to driver
16/02/05 11:41:30 INFO CoarseGrainedExecutorBackend: Got assigned task 42
16/02/05 11:41:30 INFO Executor: Running task 2.0 in stage 10.0 (TID 42)
16/02/05 11:41:30 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic2 0
16/02/05 11:41:30 INFO EventBeamFactory: Make Beam
16/02/05 11:41:30 INFO EventBeamFactory: Start Curator
16/02/05 11:41:30 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:30 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@77b39141
16/02/05 11:41:30 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:30 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:30 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:30 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d3, negotiated timeout = 40000
16/02/05 11:41:30 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:30 INFO Executor: Finished task 2.0 in stage 10.0 (TID 42). 875 bytes result sent to driver
16/02/05 11:41:30 INFO BlockManager: Removing RDD 29
16/02/05 11:41:30 INFO BlockManager: Removing RDD 28
16/02/05 11:41:30 INFO BlockManager: Removing RDD 27
16/02/05 11:41:40 INFO CoarseGrainedExecutorBackend: Got assigned task 45
16/02/05 11:41:40 INFO Executor: Running task 1.0 in stage 11.0 (TID 45)
16/02/05 11:41:40 INFO TorrentBroadcast: Started reading broadcast variable 11
16/02/05 11:41:40 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 2.7 KB, free 41.4 KB)
16/02/05 11:41:40 INFO TorrentBroadcast: Reading broadcast variable 11 took 96 ms
16/02/05 11:41:40 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 5.0 KB, free 46.4 KB)
16/02/05 11:41:40 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic1 0
16/02/05 11:41:40 INFO EventBeamFactory: Make Beam
16/02/05 11:41:40 INFO EventBeamFactory: Start Curator
16/02/05 11:41:40 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:40 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@364ae98f
16/02/05 11:41:40 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:40 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:40 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:40 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d6, negotiated timeout = 40000
16/02/05 11:41:40 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:40 INFO Executor: Finished task 1.0 in stage 11.0 (TID 45). 875 bytes result sent to driver
16/02/05 11:41:40 INFO CoarseGrainedExecutorBackend: Got assigned task 47
16/02/05 11:41:40 INFO Executor: Running task 3.0 in stage 11.0 (TID 47)
16/02/05 11:41:40 INFO KafkaRDD: Beginning offset 0 is the same as ending offset skipping example-topic2 0
16/02/05 11:41:40 INFO EventBeamFactory: Make Beam
16/02/05 11:41:40 INFO EventBeamFactory: Start Curator
16/02/05 11:41:40 INFO CuratorFrameworkImpl: Starting
16/02/05 11:41:40 INFO ZooKeeper: Initiating client connection, connectString=zookeeper:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@2dd6535a
16/02/05 11:41:40 INFO ClientCnxn: Opening socket connection to server zookeeper_1/172.17.0.6:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/05 11:41:40 INFO LoggingEmitter: Start: started [true]
16/02/05 11:41:40 INFO ClientCnxn: Socket connection established to zookeeper_1/172.17.0.6:2181, initiating session
16/02/05 11:41:40 INFO ClientCnxn: Session establishment complete on server zookeeper_1/172.17.0.6:2181, sessionid = 0x152b0a3979807d8, negotiated timeout = 40000
16/02/05 11:41:40 INFO ConnectionStateManager: State change: CONNECTED
16/02/05 11:41:40 INFO Executor: Finished task 3.0 in stage 11.0 (TID 47). 875 bytes result sent to driver
16/02/05 11:41:40 INFO BlockManager: Removing RDD 32
16/02/05 11:41:40 INFO BlockManager: Removing RDD 31
16/02/05 11:41:41 INFO BlockManager: Removing RDD 30

And here is some of the BeamFactory code:

class EventBeamFactory extends BeamFactory[Map[String,Any]] {

  @transient lazy val log = Logger.getLogger(getClass.getName)

  lazy val makeBeam: Beam[Map[String,Any]] = {

    log.info("Make Beam")
    val curator = CuratorFrameworkFactory.newClient(
      ...,
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    log.info("Start Curator")
    curator.start()

    DruidBeams.builder(...)
      .druidBeamConfig(new DruidBeamConfig(randomizeTaskId=true))
      .curator(curator)
      .discoveryPath(...)
      .location(...)
      .rollup(...)
      .tuning(...)
      .buildBeam()
  }
}

Config file should be usable by Core API

Instead of just Server and Kafka. This should make it easier for people to use whichever module makes more sense for them, without changing a bunch of things.

Druid firehose quiet period is not respected

Tranquility immediately complains that tasks are not found, spewing warnings into the log. It should wait for a configurable quiet period (default 1 minute) before doing that, since tasks take a nonzero amount of time to spin up.
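
A minimal sketch of such a guard, assuming the task's creation time is tracked (names hypothetical):

// Only warn about a missing task once it has had a quiet period to start up.
val quietPeriodMillis: Long = 60000L // configurable; default 1 minute

def shouldWarnTaskNotFound(taskCreatedAtMillis: Long, nowMillis: Long): Boolean =
  nowMillis - taskCreatedAtMillis > quietPeriodMillis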

Support for flattenSpec

Tranquility should support passing along the flattenSpecs supported by Druid.

This is particularly useful when you want tranquility-server or tranquility-kafka to handle nested data, but could also be useful for core.

ClusteredBeamTuning rejectionPolicy

Hi,
I tried to set the window period to 1 year (windowPeriod = new Period("P1Y")).
We need this because sometimes we might need to re-index our data.

I got the following error from druid: java.lang.UnsupportedOperationException, exceptionMessage=Cannot convert to Duration as this period contains years and years vary in length}

I guess it's not valid to send 1 year.

So I wanted to set rejectionPolicy to 'none' in the tuning, but this option is not available.
Is there a plan to add rejectionPolicy to the tuning?

Thanks

DataSchema constructor mismatch with druid 0.8.2

Just recording that I'm seeing this error when running tranquility with druid 0.9 jars:

 2015-11-04T23:00:50,557 ERROR [ClusteredBeam-ZkFuturePool-5998d9e6-6fc8-45c4-9769-163ed08247f7] com.metamx.tranquility.beam.ClusteredBeam - Failed to update cluster state: overlord/tranquility_test
java.lang.NoSuchMethodError: io.druid.segment.indexing.DataSchema.<init>(Ljava/lang/String;Lio/druid/data/input/impl/InputRowParser;[Lio/druid/query/aggregation/AggregatorFactory;Lio/druid/segment/indexing/granularity/GranularitySpec;)V
        at com.metamx.tranquility.druid.DruidBeamMaker.com$metamx$tranquility$druid$DruidBeamMaker$$taskObject(DruidBeamMaker.scala:84) ~[tranquility-core_2.11-0.6.2.jar:0.6.2]

This calls "new DataSchema" with 4 arguments, but the DataSchema constructor takes 5 arguments as of druid PR #1695, which I think is in druid 0.8.2.

Cannot set two metrics on a single dimension

Hi,
I created two metrics on a single dimension (sum and histogram), and Tranquility 0.4.0 keeps rejecting all messages because of "Duplicate column entries found". I tried Tranquility 0.3.1 and it worked perfectly without any error. Our beam:

protected void buildBeam() {
    druidBeam = DruidBeams
            .builder(new JsonTimestamper())
            .curator(curator)
            .discoveryPath(discoveryPath)
            .location(DruidLocation.create(
                            indexService,
                            firehosePattern,
                            config.getDatasource()
                    )
            )
            .rollup(DruidRollup.create(
                    DruidDimensions.schemalessWithExclusions(ImmutableList.of("responseTime")),
                    ImmutableList.of(
                            new CountAggregatorFactory("rows"),
                            new DoubleSumAggregatorFactory("publisherPriceSum", "publisherPrice"),
                            new ApproximateHistogramAggregatorFactory("publisherPriceHistogram", "publisherPrice", HISTOGRAM_RESOLUTION, HISTOGRAM_OUTPUT_BUCKETS, 0F, 2000F)
                    ),
                    QueryGranularity.MINUTE))
            .tuning(ClusteredBeamTuning
                            .builder()
                            .segmentGranularity(config.getSegmentGranularity())
                            .windowPeriod(config.getWindowPeriod())
                            .partitions(config.getPartitions())
                            .replicants(config.getReplicants())
                            .build()
            )
            .objectWriter(new JSONBytesWriter())
            .buildBeam();

}

Our BeamPacketizer:

protected BeamPacketizer<JSON> buildBeamPacketizer(MessageAcknowledgeable messageAcknowledgeable) {
    packetizer = new BeamPacketizer<>(
            druidBeam,
            new TranquilityBatchListener(messageAcknowledgeable, config.getDatasource()),
            BATCH_SIZE,
            MAX_PENDING_BATCHES
    );
    return packetizer;
}

Our batch listener:

@Override
public void fail(Throwable throwable, JSON message) {
    this.messageAcknowledgeable.nack(throwable);
    statsClient.increment("nack");

    nackCounter++;
    logger.warn("Message wasn't sent", throwable);
}

Error from batch listener:

06:53:33.865 [Thread-3] WARN  com.ibillboard.data.charger.chargers.tranquility.TranquilityBatchListener - Message wasn't sent
java.lang.IllegalStateException: Failed to save new beam for identifier[druid:prod:overlord/ssp-auction-9] timestamp[2015-03-23T06:00:00.000Z]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$2.applyOrElse(ClusteredBeam.scala:264) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$2.applyOrElse(ClusteredBeam.scala:261) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) ~[scala-library-2.10.4.jar:?]
    at com.twitter.util.Future$$anonfun$rescue$1.apply(Future.scala:809) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Future$$anonfun$rescue$1.apply(Future.scala:808) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:96) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Promise$Transformer.k(Promise.scala:96) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Promise$Transformer.apply(Promise.scala:106) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Promise$Transformer.apply(Promise.scala:87) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Promise$$anon$2.run(Promise.scala:328) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:186) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:157) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:212) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:86) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Promise.runq(Promise.scala:314) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.Promise.updateIfEmpty(Promise.scala:609) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at com.twitter.util.ExecutorServiceFuturePool$$anon$2.run(FuturePool.scala:111) ~[util-core_2.10-6.20.0.jar:6.20.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_25]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_25]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_25]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_25]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_25]
Caused by: com.metamx.common.parsers.ParseException: Duplicate column entries found : [publisherprice]
    at com.metamx.common.parsers.ParserUtils.validateFields(ParserUtils.java:84) ~[java-util-0.26.14.jar:?]
    at io.druid.data.input.impl.DimensionsSpec.verify(DimensionsSpec.java:99) ~[druid-api-0.3.4.jar:0.3.4]
    at io.druid.data.input.impl.DimensionsSpec.<init>(DimensionsSpec.java:46) ~[druid-api-0.3.4.jar:0.3.4]
    at io.druid.data.input.impl.DimensionsSpec.withDimensionExclusions(DimensionsSpec.java:79) ~[druid-api-0.3.4.jar:0.3.4]
    at io.druid.segment.indexing.DataSchema.<init>(DataSchema.java:67) ~[druid-server-0.7.0.jar:0.7.0]
    at com.metamx.tranquility.druid.DruidBeamMaker.com$metamx$tranquility$druid$DruidBeamMaker$$taskObject(DruidBeamMaker.scala:83) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.druid.DruidBeamMaker$$anonfun$4.apply(DruidBeamMaker.scala:139) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.druid.DruidBeamMaker$$anonfun$4.apply(DruidBeamMaker.scala:137) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.4.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) ~[scala-library-2.10.4.jar:?]
    at com.metamx.tranquility.druid.DruidBeamMaker.newBeam(DruidBeamMaker.scala:137) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.druid.DruidBeamMaker.newBeam(DruidBeamMaker.scala:40) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10$$anonfun$14$$anonfun$16$$anonfun$apply$3.apply(ClusteredBeam.scala:243) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10$$anonfun$14$$anonfun$16$$anonfun$apply$3.apply(ClusteredBeam.scala:243) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) ~[scala-library-2.10.4.jar:?]
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) ~[scala-library-2.10.4.jar:?]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10$$anonfun$14$$anonfun$16.apply(ClusteredBeam.scala:240) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10$$anonfun$14$$anonfun$16.apply(ClusteredBeam.scala:239) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.immutable.Range.foreach(Range.scala:141) ~[scala-library-2.10.4.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) ~[scala-library-2.10.4.jar:?]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10$$anonfun$14.apply(ClusteredBeam.scala:238) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10$$anonfun$14.apply(ClusteredBeam.scala:229) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.immutable.List.foreach(List.scala:318) ~[scala-library-2.10.4.jar:?]
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) ~[scala-library-2.10.4.jar:?]
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) ~[scala-library-2.10.4.jar:?]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10.apply(ClusteredBeam.scala:229) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$10.apply(ClusteredBeam.scala:194) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anon$1$$anonfun$modify$1.apply(ClusteredBeam.scala:152) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.metamx.tranquility.beam.ClusteredBeam$$anon$1$$anonfun$modify$1.apply(ClusteredBeam.scala:141) ~[tranquility_2.10-0.4.0.jar:0.4.0]
    at com.twitter.util.Try$.apply(Try.scala:13) ~[util-core_2.10-6.20.0.jar:6.20.0]
    ... 6 more

Server: Allow In Memory Cache of Events

Having an asynchronous endpoint on the server would be great for use cases where you want to post without having to block for task creation, or getting the metrics to a peon. This would obviously create a few caveats that need to be understood:

  1. The client will not receive a positive acknowledgement for how many metrics were delivered to druid.
  2. Metrics can be lost if the server jvm dies while metrics are still in the memory cache.

There are cases where we are sending 10K+ metrics every second from a single threaded HTTP post routine, we are bumping into some blocking for these posts on occasion and timing out. Asynchronous posting would be a nice fix.

message dropped

The tranquility version I'm using is 0.8.3.

All messages are dropped, and I get the error message below:

com.metamx.tranquility.tranquilizer.MessageDroppedException: null
at com.twitter.finagle.NoStacktrace(Unknown Source) ~[na:na]

firehoseServicePattern should be optional

We could have a default. It should be based on the overlord service, so things still work if people have the same-named dataSource in two different druid clusters.

Default DruidBeams beamMergeFn should be rollup-aware

The default DruidBeams beam merger is just a HashPartitionBeam, which partitions based on the hashCode of the event type. This can lead to sub-optimal rollup, because events with the same dimension values may end up in different druid partitions just because they have different metrics.

Instead, the default should probably partition based on dimensions and truncated timestamp, using the user-provided DruidRollup.

The workaround for this issue is to define a custom beamMergeFn, or to override the event type's hashCode so that it is the same for events that should roll up together (a sketch follows).
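
A hedged sketch of the hashCode workaround, assuming a simple event type (field names illustrative):

// Hash only the truncated timestamp and the dimension values, not the
// metrics, so rows that should roll up together land in one partition.
// equals is left untouched; only hashCode-based partitioning is affected.
case class MetricEvent(
  timestampMillis: Long,
  dimensions: Map[String, String],
  metrics: Map[String, Double]
) {
  override def hashCode(): Int = {
    val truncated = timestampMillis - (timestampMillis % 60000L) // minute granularity
    (truncated, dimensions).hashCode()
  }
}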

SimpleTranquilizerAdapter.flush() should force a flush even if lingerMillis has not elapsed

Currently, flush() doesn't actively force Tranquility to send buffered messages to Druid; rather, it blocks until lingerMillis elapses and the send thread does its thing. This means the method can block for up to lingerMillis + sendTime when called. It would be great if calling flush() immediately began flushing the buffer, regardless of what lingerMillis is set to.

Friendlier behavior when checking overlord for task failures

Currently, tranquility talks to the overlord in two situations:

  • When creating new tasks
  • When a task seems to have disappeared, and tranquility wants to check if the overlord considers it FAILED or not (if it is FAILED, tranquility will give up)

The second one can cause a thundering-herd problem if there are lots of tranquility instances checking lots of tasks. There should be better behavior there; sleeping a random amount before checking could be enough (there's already exponential backoff if the initial overlord call fails). A sketch follows.
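
A minimal sketch of that randomized sleep (checkOverlordForStatus is a hypothetical stand-in for the existing status check):

import scala.util.Random

// Spread simultaneous status checks across instances so they don't all hit
// the overlord at once.
val maxJitterMillis = 15000 // illustrative bound
Thread.sleep(Random.nextInt(maxJitterMillis))
// checkOverlordForStatus(taskId) // then perform the existing check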

Missing third parameter in TimestampSpec

Hi, I am using Tranquility 0.4.2 and currently I am unable to start my Tranquility app because of this error:

2015-06-04 12:18:27,095 [main] ERROR org.apache.samza.container.SamzaContainerExceptionHandler - Uncaught exception in thread (name=main). Exiting process now.
java.lang.NoSuchMethodError: io.druid.data.input.impl.TimestampSpec.<init>(Ljava/lang/String;Ljava/lang/String;)V
    at com.metamx.tranquility.druid.DruidBeams$.<init>(DruidBeams.scala:78) ~[tranquility_2.10-0.4.2.jar:0.4.2]
    at com.metamx.tranquility.druid.DruidBeams$.<clinit>(DruidBeams.scala) ~[tranquility_2.10-0.4.2.jar:0.4.2]
    at com.metamx.tranquility.druid.DruidBeams.builder(DruidBeams.scala) ~[tranquility_2.10-0.4.2.jar:0.4.2]
    at com.ibillboard.data.charger.tranquility.chargers.TranquilityCharger.buildBeam(TranquilityCharger.java:81) ~[samzajobs.jar:?]
    at com.ibillboard.data.charger.tranquility.chargers.TranquilityCharger.init(TranquilityCharger.java:59) ~[samzajobs.jar:?]
    at com.ibillboard.data.charger.tasks.VideoImpressionChargerTask.initTask(VideoImpressionChargerTask.java:14) ~[samzajobs.jar:?]
    at com.ibillboard.data.common.BaseTask.init(BaseTask.java:26) ~[samzajobs.jar:?]
    at org.apache.samza.container.TaskInstance.initTask(TaskInstance.scala:100) ~[samza-core_2.10-0.9.0.jar:0.9.0]
    at org.apache.samza.container.SamzaContainer$$anonfun$startTask$2.apply(SamzaContainer.scala:606) ~[samza-core_2.10-0.9.0.jar:0.9.0]
    at org.apache.samza.container.SamzaContainer$$anonfun$startTask$2.apply(SamzaContainer.scala:606) ~[samza-core_2.10-0.9.0.jar:0.9.0]
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:?]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:?]
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) ~[scala-library-2.10.4.jar:?]
    at org.apache.samza.container.SamzaContainer.startTask(SamzaContainer.scala:606) ~[samza-core_2.10-0.9.0.jar:0.9.0]
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:545) ~[samza-core_2.10-0.9.0.jar:0.9.0]
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93) ~[samza-core_2.10-0.9.0.jar:0.9.0]
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67) ~[samza-core_2.10-0.9.0.jar:0.9.0]
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) ~[samza-core_2.10-0.9.0.jar:0.9.0]

I think the problem is with this commit, because it adds an additional third parameter, missingValue, but at DruidBeams.scala:78 there is no third argument:

val DefaultTimestampSpec = new TimestampSpec("timestamp", "iso")

I tried the code from the documentation and it doesn't work either, because of the missing third argument:

// The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
// Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto");

EDIT: I tried it with io.druid:druid-api:0.3.6 instead of io.druid:druid-api:0.3.8 and it worked.
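
For reference, a hedged sketch of calling the newer three-argument constructor directly (assuming null is acceptable as the missingValue default; verify against your druid-api version):

import io.druid.data.input.impl.TimestampSpec

// The third argument is the missingValue added by the commit referenced
// above; null means "no default timestamp for rows that lack one".
val timestampSpec = new TimestampSpec("timestamp", "auto", null)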

Tranquility causes data loss due to re-use of Finagle service name

I have hit a bug where tranquility causes data loss due to re-use of the Finagle service name, for certain combinations of segment granularity and window period.

Here are the details of my setup:
Segment granularity: MINUTE
Window period: 120 minutes
Datasource name: ds1

When segment granularity is MINUTE, tranquility uses only the minute part of the task interval, along with the replicant number and partition number, to form the name of the Finagle service.

For an interval of [2015-11-19T07:00 - 2015-11-19T07:01], replicant number 0, and partition number 0, the service name would be "ds1-00-0000-0000", where the firehose pattern is a static value.

Tranquility uses this service name to query ZooKeeper for the host and port it should send data to, and in scenarios where the same service name is used to connect to more than one firehose, it gets multiple destinations for a single service.

In the scenario given above, the same service name is used to connect to the task running at 07:00 and the task running at 06:00. Since the window period is 2 hours, when a DruidBeam is constructed to post events to the 07:00 task, it gets the host:port details of two different peons, because the 06:00 task is still alive.
If a Finagle client is created with more than one destination, it load-balances by sending data to the destinations in round-robin fashion, causing half the data for 07:00 to go to the 06:00 task, which eventually drops it; hence the data loss.

I have verified that in my setup I see no data loss when the window period is less than 60 minutes and segment granularity is MINUTE, whereas I see almost half the data getting lost in the scenario explained above.
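
An illustration of the collision, using hypothetical formatting code (not the actual Tranquility service-name logic): keying the name on minute-of-hour alone makes the 06:00 and 07:00 tasks resolve to the same Finagle service.

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

val minuteOnly = DateTimeFormat.forPattern("mm") // minute part of the interval
val sevenOClock = "ds1-%s-0000-0000".format(minuteOnly.print(new DateTime("2015-11-19T07:00")))
val sixOClock = "ds1-%s-0000-0000".format(minuteOnly.print(new DateTime("2015-11-19T06:00")))
assert(sevenOClock == sixOClock) // both are "ds1-00-0000-0000"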

Not able to compile tranquility source code

I cloned tranquility and tried to build it with "sbt compile", but I keep getting the error below.

[info] Packaging /Users/saurav/dev-env/tranquility/target/scala-2.10/root_2.10-0.7.4-SNAPSHOT.jar ...
[info] Done packaging.
[info] Compiling 1 Java source to /Users/saurav/quantiply/dev-env/tranquility/kafka/target/scala-2.10/classes...
[error] /Users/saurav/dev-env/tranquility/kafka/src/main/java/com/metamx/tranquility/kafka/writer/WriterController.java:58: error: cannot find symbol
[error] this.dataSourceConfigList.sort(
[error] ^
[error] symbol: method sort(<anonymous Comparator<DataSourceConfig>>)
[error] location: variable dataSourceConfigList of type List<DataSourceConfig>
[error] 1 error

[warn] No main class detected
[warn] No main class detected
[warn] No main class detected
[warn] No main class detected

The following information may prove useful:

Scala version (scala -version):
Scala code runner version 2.11.7 -- Copyright 2002-2013, LAMP/EPFL

sbt version:
[info] server/:sbtVersion
[info] 0.13.7
[info] samza/:sbtVersion
[info] 0.13.7
[info] kafka/:sbtVersion
[info] 0.13.7
[info] spark/:sbtVersion
[info] 0.13.7
[info] flink/:sbtVersion
[info] 0.13.7
[info] core/:sbtVersion
[info] 0.13.7
[info] storm/:sbtVersion
[info] 0.13.7
[info] root/:sbtVersion
[info] 0.13.7

Schema validations

Check for (a validation sketch follows the list):

  • Duplicate dimensions
  • Duplicate metric output names
  • Metrics and dimensions with the same name
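
A minimal sketch of these checks, assuming plain lists of dimension names and metric output names are available at builder time:

def validateSchema(dimensions: Seq[String], metricNames: Seq[String]): Unit = {
  def duplicates(xs: Seq[String]): Set[String] =
    xs.groupBy(identity).collect { case (name, vs) if vs.size > 1 => name }.toSet

  require(duplicates(dimensions).isEmpty,
    s"Duplicate dimensions: ${duplicates(dimensions)}")
  require(duplicates(metricNames).isEmpty,
    s"Duplicate metric output names: ${duplicates(metricNames)}")

  val shared = dimensions.toSet intersect metricNames.toSet
  require(shared.isEmpty, s"Metrics and dimensions with the same name: $shared")
}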

Transient failure on KafkaConsumerTest.testStartConsumersNoCommit

https://travis-ci.org/druid-io/tranquility/builds/103996525

[error] Test com.metamx.tranquility.kafka.KafkaConsumerTest.testStartConsumersNoCommit failed: org.junit.runners.model.TestTimedOutException: test timed out after 60000 milliseconds, took 60.003 sec
[error]     at sun.misc.Unsafe.park(Native Method)
[error]     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
[error]     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
[error]     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
[error]     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
[error]     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
[error]     at com.metamx.tranquility.kafka.KafkaConsumerTest.testStartConsumersNoCommit(KafkaConsumerTest.java:220)
[error]     ...
