pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.

Home Page: http://streamparse.readthedocs.io/

License: Apache License 2.0

Shell 0.15% Python 99.61% Clojure 0.24%
apache-storm storm python

streamparse's Introduction

Pystorm lets you run Python code against real-time streams of data via Apache Storm. With pystorm you can create Storm bolts and spouts in Python without having to write a single line of Java. It is meant to be used under-the-hood by Storm Python libraries that will provide the command-line tools for actually building/submitting the topologies (e.g., streamparse).

streamparse's People

Contributors

abelsonlive, amontalenti, codywilbourn, cyhsutw, dan-blanchard, darkless012, emmettbutler, gallamine, hellais, hodgesds, isms, j-bennet, jeffgodwyll, kbourgoin, macheins, mbande, mineo, motazreda, msukmanowsky, omus, rduplain, roadhead, rwatler, saeedesfandi, sylvainde, tanaysoni, tdhopper, thedrow, timgates42, vshlapakov

streamparse's Issues

Async loop interrupted!:

I'm using the streamparse framework to create a Storm topology. When I run my topology in local mode, it runs for about five seconds and then shuts down.
Is there any solution, please? Thanks in advance.
(I'm using my topology to read the Twitter stream.)

log:
topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/tmp/d47409c7-af6c-461c-a6a3-52fd9d283435", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2001, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 2, "streamparse.log.path" "/home/bena/sahbi/logs", "topology.kryo.decorators" (), "topology.name" "wordcount", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, 
"storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" 5000, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 60, "task.refresh.poll.secs" 10, "topology.workers" 2, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144, "streamparse.log.level" "debug"}
3287 [Thread-4] INFO backtype.storm.daemon.worker - Worker 102ed550-1958-4494-9042-4b553336c01f for storm wordcount-1-1411485247 on ddd46eee-f950-41ae-b4db-ed3997a22d27:1024 has finished loading
3293 [Thread-22-__acker] INFO backtype.storm.daemon.executor - Preparing bolt __acker:(1)
3293 [Thread-22-__acker] INFO backtype.storm.daemon.executor - Prepared bolt __acker:(1)
3294 [Thread-23-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Starting receive-thread: [stormId: wordcount-1-1411485247, port: 1024, thread-id: 0 ]
3606 [Thread-14-word-spout] INFO backtype.storm.spout.ShellSpout - Launched subprocess with pid 29050
3607 [Thread-9-count-bolt] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 29041
3607 [Thread-14-word-spout] INFO backtype.storm.daemon.executor - Opened spout word-spout:(5)
3607 [Thread-12-count-bolt] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 29043
3609 [Thread-9-count-bolt] INFO backtype.storm.daemon.executor - Prepared bolt count-bolt:(3)
3610 [Thread-14-word-spout] INFO backtype.storm.daemon.executor - Activating spout word-spout:(5)
3610 [Thread-12-count-bolt] INFO backtype.storm.daemon.executor - Prepared bolt count-bolt:(4)
6965 [main] INFO backtype.storm.daemon.nimbus - Shutting down master
6997 [main] INFO backtype.storm.daemon.nimbus - Shut down master
7015 [main] INFO backtype.storm.daemon.supervisor - Shutting down ddd46eee-f950-41ae-b4db-ed3997a22d27:102ed550-1958-4494-9042-4b553336c01f
7015 [main] INFO backtype.storm.process-simulator - Killing process b35fe761-8f07-4b7d-98ee-9e11dff1941c
7015 [main] INFO backtype.storm.daemon.worker - Shutting down worker wordcount-1-1411485247 ddd46eee-f950-41ae-b4db-ed3997a22d27 1024
7015 [main] INFO backtype.storm.daemon.worker - Shutting down receive thread
7015 [main] INFO backtype.storm.messaging.loader - Shutting down receiving-thread: [wordcount-1-1411485247, 1024]
7016 [main] INFO backtype.storm.messaging.loader - Waiting for receiving-thread:[wordcount-1-1411485247, 1024] to die
7016 [Thread-23-worker-receiver-thread-0] INFO backtype.storm.messaging.loader - Receiving-thread:[wordcount-1-1411485247, 1024] received shutdown notice
7017 [main] INFO backtype.storm.messaging.loader - Shutdown receiving-thread: [wordcount-1-1411485247, 1024]
7017 [main] INFO backtype.storm.daemon.worker - Shut down receive thread
7017 [main] INFO backtype.storm.daemon.worker - Terminating messaging context
7017 [main] INFO backtype.storm.daemon.worker - Shutting down executors
7017 [main] INFO backtype.storm.daemon.executor - Shutting down executor count-bolt:[3 3]
7017 [Thread-9-count-bolt] INFO backtype.storm.util - Async loop interrupted!
7017 [Thread-7-disruptor-executor[3 3]-send-queue] INFO backtype.storm.util - Async loop interrupted!
7018 [main] INFO backtype.storm.daemon.executor - Shut down executor count-bolt:[3 3]
7018 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-spout:[5 5]
7018 [Thread-13-disruptor-executor[5 5]-send-queue] INFO backtype.storm.util - Async loop interrupted!

Deprecate emit_many

Since the latest refactor of bolt/spout.py, emit_many just makes multiple calls to emit under the hood, which confuses users. We should deprecate emit_many now and remove it entirely in a future release.
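
A deprecation shim along these lines is one way to do it. This is only a minimal sketch: SketchBolt and its method signatures are hypothetical stand-ins, not streamparse's actual classes.

```python
import warnings


class SketchBolt(object):
    """Hypothetical stand-in for a bolt; not streamparse's real class."""

    def __init__(self):
        self.emitted = []

    def emit(self, tup):
        # Stand-in for sending a single tuple to Storm.
        self.emitted.append(tup)

    def emit_many(self, tuples):
        # Warn the caller, then delegate to emit() once per tuple.
        warnings.warn('emit_many is deprecated; call emit in a loop instead',
                      DeprecationWarning, stacklevel=2)
        for tup in tuples:
            self.emit(tup)
```

Passing stacklevel=2 makes the warning point at the caller's line rather than at the shim itself.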

Topology Name Collision Reporting Sucks

If I try to deploy to a cluster and there's a topology running with the same name, streamparse throws an error. This is the correct behavior, but the error that's reported is a pain to sift through to figure out what just happened:

Traceback (most recent call last):
  File "/data/vRouting Python logging to /opt/storm/current/logs.
irtualenvs/storm/bin/sparse", line 9, in <module>
 Running lein command to submit topology to nimbus:
   load_entry_point('streamparse==0.0.13', 'console_scripts', 'sparse')()
  File "/home/kfb/src/ct/streamparse/streamparse/cmdln.py", line 93, in main
    args["--force"], args["--debug"])
  File "/data/virtualenvs/storm/local/lib/python2.7/site-packages/invoke/tasks.py", line 111, in __call__
    result = self.body(*args, **kwargs)
  File "/home/kfb/src/ct/streamparse/streamparse/ext/invoke.py", line 272, in submit_topology
    run(full_cmd)
  File "/data/virtualenvs/storm/local/lib/pytholein run -m streamparse.commands.submit_topology/-main topologies/visits-prod.clj --option 'topology.workers=8' --option 'topology.acker.executors=8' --option 'topology.python.path="/data/virtualenvs/visits-prod/bin/python"' --option 'streamparse.log.path="/opt/storm/current/logs"' --option 'streamparse.log.level="info"' --option topology.max.spout.pending=1500 --option topology.message.timeout.secs=300 --option supervisor.worker.timeout.secs=300 --option 'parsely.deployment_stage="prod"'
n2.7/site-packages/invoke/runner.py", line 160, in run
    raise Failure(result)
invoke.exceptions.Failure: Command execution failure!

Exit code: 1

Stderr:

Exception in thread "main" java.lang.NullPointerException
    at streamparse.commands.submit_topology$submit_topology_BANG_.invoke(submit_topology.clj:27)
    at streamparse.commands.submit_topology$_main.doInvoke(submit_topology.clj:{:option {parsely.deployment_stage prod, supervisor.worker.timeout.secs 300, topology.message.timeout.secs 300, topology.max.spout.pending 1500, streamparse.log.level info, streamparse.log.path /opt/storm/current/logs, topology.python.path /data/virtualenvs/visits-prod/bin/python, topology.acker.executors 8, topology.workers 8}, :debug false, :port 6627, :host localhost, :help false}
74)
    at clojure.lang.RestFn.invoke(RestFn.java:3204)
    at clojure.lang.VaCaught exception: Topology with name `visits` already exists on cluster
r
.invoke(Var.java:510)
    at user$eval5.invoke(form-init5814132293844863193.clj:1)
    ajava.lang.RuntimeException: Topology with name `visits` already exists on cluster
t clojure.lang.Compiler.eval(Compiler.java:6619)
    at clojure.lang.Compiler. at backtype.storm.StormSubmitter.submitTopology (StormSubmitter.java:102)
eval(Compiler.java:6609)
    at clojure.lang.Compiler.load(Compiler.java:7064    backtype.storm.StormSubmitter.submitTopology (StormSubmitter.java:70)
)
    at clojure.lang.Compiler.loadFile(Compiler.java:7020)
    at clojure.main$load_script.invoke(ma    streamparse.commands.submit_topology$submit_topology_BANG_.invoke (submit_topology.clj:22)
in.clj:294)
    at clojure.main$init_opt.invoke(main.clj:299)
    at clojure.main$initi    streamparse.commands.submit_topology$_main.doInvoke (submit_topology.clj:74)
alize.invoke(main.clj:327)
    at clojure.main$null_o    clojure.lang.RestFn.invoke (RestFn.java:3204)
pt.invoke(main.clj:362)
    at clojure.main$ma    clojure.lang.Var.invoke (Var.java:510)
in.doInvoke(main.clj:440)
    at clojure.lang.RestFn.invoke(Re    user$eval5.invoke (form-init5814132293844863193.clj:1)
stFn.java:421)
    at clojure.lang.Var.invoke(Var.java:    clojure.lang.Compiler.eval (Compiler.java:6619)
419)
    at clojure.lang.AFn.applyToHelper(AFn.java:163    clojure.lang.Compiler.eval (Compiler.java:6609)
)
    at clojure.lang.Var.applyTo(Var.java:532)
    at clo    clojure.lang.Compiler.load (Compiler.java:7064)
jure.main.main(main.java:37)



    clojure.lang.Compiler.loadFile (Compiler.java:7020)
    clojure.main$load_script.invoke (main.clj:294)
    clojure.main$init_opt.invoke (main.clj:299)
    clojure.main$initialize.invoke (main.clj:327)
    clojure.main$null_opt.invoke (main.clj:362)
    clojure.main$main.doInvoke (main.clj:440)
    clojure.lang.RestFn.invoke (RestFn.java:421)
    clojure.lang.Var.invoke (Var.java:419)
    clojure.lang.AFn.applyToHelper (AFn.java:163)
    clojure.lang.Var.applyTo (Var.java:532)
    clojure.main.main (main.java:37)

Provide example of mixed Java + Python topology using storm-kafka

Requested on the mailing list:

I am looking forward to understanding how I can use the Clojure DSL to submit a topology which has a spout written in Java and bolts written in Python.

The best first example would be to use the new official Kafka spout (storm-kafka) that is now included in Storm's 0.9.2 release. This combined with Python bolts processing the data from the Kafka spout would be the best first Python + Java interop example for streamparse, I think.

Passing logging path causes wordcount example to fail

I was just trying to run the wordcount example on master (22a6d90). However, I get the following exception:

tdhopper@~/wordcount (streamparse-dev) $ sparse run
Running wordcount topology...
Routing Python logging to /Users/tdhopper/wordcount/logs.
Running lein command to run local cluster:
lein run -m streamparse.commands.run/-main topologies/wordcount.clj -t 5 --option 'topology.workers=2' --option 'topology.acker.executors=2' --option 'streamparse.log.path="/Users/tdhopper/wordcount/logs"' --option 'streamparse.log.level="debug"'
891  [main] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.RuntimeException: Invalid token: /Users/tdhopper/wordcount/logs
    at clojure.lang.Util.runtimeException(Util.java:219) ~[clojure-1.5.1.jar:na]
    at clojure.lang.LispReader.interpretToken(LispReader.java:326) ~[clojure-1.5.1.jar:na]
    at clojure.lang.LispReader.read(LispReader.java:211) ~[clojure-1.5.1.jar:na]
    at clojure.lang.RT.readString(RT.java:1738) ~[clojure-1.5.1.jar:na]
    at clojure.core$read_string.invoke(core.clj:3427) ~[clojure-1.5.1.jar:na]
    at streamparse.commands.run$_parse_topo_option.invoke(run.clj:46) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
    at streamparse.cli$apply_specs.invoke(cli.clj:134) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
    at streamparse.cli$cli.doInvoke(cli.clj:205) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:486) ~[clojure-1.5.1.jar:na]
    at streamparse.commands.run$_main.doInvoke(run.clj:58) ~[streamparse-0.0.4-SNAPSHOT.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:930) ~[clojure-1.5.1.jar:na]
    at clojure.lang.Var.invoke(Var.java:460) ~[clojure-1.5.1.jar:na]
    at user$eval5$fn__7.invoke(form-init4991295292826695665.clj:1) ~[na:na]
    at user$eval5.invoke(form-init4991295292826695665.clj:1) ~[na:na]
    at clojure.lang.Compiler.eval(Compiler.java:6619) ~[clojure-1.5.1.jar:na]
    at clojure.lang.Compiler.eval(Compiler.java:6609) ~[clojure-1.5.1.jar:na]
    at clojure.lang.Compiler.load(Compiler.java:7064) ~[clojure-1.5.1.jar:na]
    at clojure.lang.Compiler.loadFile(Compiler.java:7020) ~[clojure-1.5.1.jar:na]
    at clojure.main$load_script.invoke(main.clj:294) ~[clojure-1.5.1.jar:na]
    at clojure.main$init_opt.invoke(main.clj:299) ~[clojure-1.5.1.jar:na]
    at clojure.main$initialize.invoke(main.clj:327) ~[clojure-1.5.1.jar:na]
    at clojure.main$null_opt.invoke(main.clj:362) ~[clojure-1.5.1.jar:na]
    at clojure.main$main.doInvoke(main.clj:440) ~[clojure-1.5.1.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:421) ~[clojure-1.5.1.jar:na]
    at clojure.lang.Var.invoke(Var.java:419) ~[clojure-1.5.1.jar:na]
    at clojure.lang.AFn.applyToHelper(AFn.java:163) ~[clojure-1.5.1.jar:na]
    at clojure.lang.Var.applyTo(Var.java:532) ~[clojure-1.5.1.jar:na]
    at clojure.main.main(main.java:37) ~[clojure-1.5.1.jar:na]
Traceback (most recent call last):
  File "/Users/tdhopper/.virtualenvs/streamparse-dev/bin/sparse", line 9, in <module>
    load_entry_point('streamparse==0.0.13', 'console_scripts', 'sparse')()
  File "/Users/tdhopper/repos/parsely/streamparse/streamparse/cmdln.py", line 82, in main
    run_local_topology(args["--name"], time, par, options, args["--debug"])
  File "/Users/tdhopper/.virtualenvs/streamparse-dev/lib/python2.7/site-packages/invoke-0.8.1-py2.7.egg/invoke/tasks.py", line 108, in __call__
    result = self.body(*args, **kwargs)
  File "/Users/tdhopper/repos/parsely/streamparse/streamparse/ext/invoke.py", line 174, in run_local_topology
    run(full_cmd, pty=True)
  File "/Users/tdhopper/.virtualenvs/streamparse-dev/lib/python2.7/site-packages/invoke-0.8.1-py2.7.egg/invoke/runner.py", line 152, in run
    raise Failure(result)
invoke.exceptions.Failure: Command execution failure!

Exit code: 1

Stderr:

If I comment out these logging statements, the topology runs fine.

Even when it fails, if I run the Clojure command printed here by hand (including the logging bits), the topology runs fine.

Here's the command that works fine:

lein run -m streamparse.commands.run/-main topologies/wordcount.clj -t 5 --option 'topology.workers=2' --option 'topology.acker.executors=2' --option 'streamparse.log.path="/Users/tdhopper/wordcount/logs"' --option 'streamparse.log.level="debug"'

So that led me to think the problem has to do with invoke.run. I tried changing pty=True to False and suddenly things worked. @dan-blanchard added the pty flag for Python 3 compatibility. I haven't tried running the wordcount example in Python 3 to see if it works with this change. @dan-blanchard, do you know if this is an easy fix?

streamparse virtualenv_path

I'm just beginning with Storm topologies and I get this error. Does anyone have any idea? Thanks in advance!

sparse submit --name topology
Traceback (most recent call last):
  File "/usr/local/bin/sparse", line 9, in <module>
    load_entry_point('streamparse==1.0.1', 'console_scripts', 'sparse')()
  File "/usr/local/lib/python2.7/dist-packages/streamparse/cmdln.py", line 93, in main
    args["--force"], args["--debug"])
  File "/usr/local/lib/python2.7/dist-packages/invoke/tasks.py", line 111, in __call__
    result = self.body(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/invoke.py", line 202, in submit_topology
    name, "{}/{}.txt".format(config["virtualenv_specs"], name)
  File "/usr/local/lib/python2.7/dist-packages/fabric/tasks.py", line 171, in __call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/fabric/tasks.py", line 174, in run
    return self.wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/fabric.py", line 122, in create_or_update_virtualenvs
    with open(requirements_file, "r") as fp:
IOError: [Errno 2] No such file or directory: 'virtualenvs/topology.txt'
bena@bena-HP-EliteDesk-800-G1-TWR:~/wikipedia_editLogs_trends$ sparse submit --name topology
Traceback (most recent call last):
  File "/usr/local/bin/sparse", line 9, in <module>
    load_entry_point('streamparse==1.0.1', 'console_scripts', 'sparse')()
  File "/usr/local/lib/python2.7/dist-packages/streamparse/cmdln.py", line 93, in main
    args["--force"], args["--debug"])
  File "/usr/local/lib/python2.7/dist-packages/invoke/tasks.py", line 111, in __call__
    result = self.body(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/invoke.py", line 202, in submit_topology
    name, "{}/{}.txt".format(config["virtualenv_specs"], name)
  File "/usr/local/lib/python2.7/dist-packages/fabric/tasks.py", line 171, in __call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/fabric/tasks.py", line 174, in run
    return self.wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/fabric.py", line 138, in create_or_update_virtualenvs
    hosts=env.storm_workers)
  File "/usr/local/lib/python2.7/dist-packages/fabric/tasks.py", line 424, in execute
    results['<local-only>'] = task.run(*args, **new_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/fabric/tasks.py", line 174, in run
    return self.wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/fabric/decorators.py", line 181, in inner
    return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/fabric.py", line 94, in _create_or_update_virtualenv
    virtualenv_path = os.path.join(virtualenv_root, virtualenv_name)
  File "/usr/lib/python2.7/posixpath.py", line 77, in join
    elif path == '' or path.endswith('/'):
AttributeError: 'NoneType' object has no attribute 'endswith'
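
The second traceback ends with os.path.join receiving None, which suggests a setting was never supplied. A minimal sketch of failing earlier with a readable message might look like this. It assumes the configuration is a plain dict; require_setting and build_virtualenv_path are hypothetical helpers, and the key name virtualenv_root is only guessed from the variable name in the traceback.

```python
import os


def require_setting(config, key):
    """Return config[key], or raise a readable error if it is missing or None."""
    value = config.get(key)
    if value is None:
        raise ValueError('Missing required setting: {0!r}'.format(key))
    return value


def build_virtualenv_path(config, virtualenv_name):
    # 'virtualenv_root' is an assumed key name, taken from the variable
    # that appears in the traceback above.
    root = require_setting(config, 'virtualenv_root')
    return os.path.join(root, virtualenv_name)
```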

`sparse visualize` command

Use an existing topology visualization tool like:

or some mix of these, add a visualization command for topologies:

sparse visualize [(--name | -n) <topology>] (--format | -f (png|jpg|gif)) output_filename 

Add support for remote deployment of Storm topologies

In order to properly deploy a streamparse topology, we have a few steps:

  1. Create/update virtualenvs on all Storm workers in the cluster.
  2. Somehow create a new Clojure topology definition which modifies all ShellSpout/ShellBolt Python calls from python myscript.py to /path/to/virtualenv/bin/python myscript.py.
  3. Package up required resources into an uberjar via lein uberjar (Storm requires that ShellBolt resources are in the JAR's /resources directory). In streamparse's case, this effectively means having a project's entire src directory inside the uberjar.
  4. Using invoke, create a local tunnel to the Storm cluster's Nimbus server: ssh <user>@<host> -NL 6627:localhost:<remote_nimbus_port>.
  5. Check to see if a topology with the same name is currently running and if so, kill it using something like this to list topologies and something like this to kill them.
  6. Deploy topology via stormlocal.clj to cluster using StormSubmitter (clojure example).
  7. Kill local tunnel to Nimbus
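
Steps 2 and 4 above can be sketched as small helpers. This is a rough illustration only: build_tunnel_command and use_virtualenv_python are hypothetical names, and the real implementation would rewrite the Clojure topology definition rather than a plain string.

```python
def build_tunnel_command(user, host, remote_nimbus_port, local_port=6627):
    """Build the argument list for the SSH tunnel described in step 4."""
    forward = '{0}:localhost:{1}'.format(local_port, remote_nimbus_port)
    return ['ssh', '{0}@{1}'.format(user, host), '-NL', forward]


def use_virtualenv_python(command, virtualenv_python):
    """Swap a leading 'python' for the virtualenv interpreter (step 2)."""
    parts = command.split()
    if parts and parts[0] == 'python':
        parts[0] = virtualenv_python
    return ' '.join(parts)
```

Returning the tunnel command as a list keeps it safe to hand to a process runner without shell quoting.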

Current thinking is that we'll expose this in the command line as

$ sparse package
$ sparse deploy <topology_name> <environment_name>

One simplification worth mentioning is that package doesn't take a topology name, because we actually package all resources in the project's src directory.

Make virtualenvs created on remote servers "relocatable"

Apparently virtualenv has added this very handy --relocatable option.

This makes it possible to copy, move, rename virtualenvs. Between machines/architectures -- not so safe. But within the same machine, safe.

What's the benefit of this? Well, we could set up virtualenvs that have a lot of dependencies, then relocate them to another directory, and then treat them as a "base" venv on which to install additional dependencies. This could help us avoid the "pandas is constantly recompiling" problem when you decide to spin up a new topology with a new venv and the whole requirements.txt needs to be installed all over again.

Document how to develop on streamparse

Currently, I have some internal developer documentation at Parse.ly in our project readmes about how to develop on streamparse, including:

  • setting up local lein checkouts for working on the JVM interop library from a dependent project
  • using setup.py develop
  • building and installing pre-release Storm versions locally

These would be good to have somewhere in our developer wiki.

submit topology

I get this error when I try to submit my topology (sparse submit --environment prod --name wordcount -dv).

Do you have any idea, please? Thanks.

Traceback (most recent call last):
  File "/usr/local/bin/sparse", line 9, in <module>
    load_entry_point('streamparse==1.0.1', 'console_scripts', 'sparse')()
  File "/usr/local/lib/python2.7/dist-packages/streamparse/cmdln.py", line 93, in main
    args["--force"], args["--debug"])
  File "/usr/local/lib/python2.7/dist-packages/invoke/tasks.py", line 111, in __call__
    result = self.body(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/invoke.py", line 183, in submit_topology
    config = get_config()
  File "/usr/local/lib/python2.7/dist-packages/streamparse/decorators.py", line 22, in __call__
    value = self.func(*args)
  File "/usr/local/lib/python2.7/dist-packages/streamparse/ext/util.py", line 26, in get_config
    config = json.load(fp)
  File "/usr/lib/python2.7/json/__init__.py", line 290, in load
    **kw)
  File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python2.7/json/decoder.py", line 384, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded
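
The traceback shows json.load failing inside get_config without saying which file was being read. A minimal sketch of a loader that names the offending file is below. load_config is a hypothetical helper, not the function streamparse ships.

```python
import json


def load_config(path):
    """Load a JSON config file, naming the file when parsing fails."""
    with open(path, 'r') as fp:
        try:
            return json.load(fp)
        except ValueError as exc:
            # On Python 3, json.JSONDecodeError is a subclass of ValueError,
            # so this handler covers both major versions.
            raise ValueError('Could not parse {0}: {1}'.format(path, exc))
```

An empty or malformed file then produces an error that points straight at the path to fix.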

Support Python 3 using six/tox and __future__ imports

I think it's important for streamparse to be a single-source codebase with Python 2.7 and 3.4 support. To do this, we should pull in six and also use from __future__ import absolute_import, print_function in our code. Would also be good to use some sort of linter / test suite to guarantee Python 3 is working correctly, since we tend to develop on Python 2.7 still. I guess the best tool here is tox?

support for python2.6 needed

I'm using centos6, which has python2.6 installed by default.

streamparse fails to set up with Python 2.6.

error messages:

$ sudo python setup.py setup.py
Traceback (most recent call last):
  File "setup.py", line 42, in <module>
    if sys.version_info.major < 3:
AttributeError: 'tuple' object has no attribute 'major'

$ cd wordcount && sparse run
print(' {:<18} {}'.format(green('create'), path))
ValueError: zero length field name in format
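
Both errors come from features that first appeared in Python 2.7: named attributes on sys.version_info and auto-numbered {} format fields. If 2.6 support were wanted, forms like the following run on 2.6 as well. This is a sketch only; format_created is a hypothetical helper, and the label is passed as plain text rather than through the green() call from the original line.

```python
import sys

# Index access works on every Python version; .major needs 2.7 or later.
IS_PY2 = sys.version_info[0] < 3


def format_created(path, label='create'):
    # Explicit field numbers ({0}, {1}) are accepted by Python 2.6,
    # unlike the auto-numbered '{}' form.
    return ' {0:<18} {1}'.format(label, path)
```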

MAX_MESSAGE_SIZE check doesn't agree with comment

In ipc.read_message_lines() there's a section like:

        message_size = len(line)

        # If message size exceeds MAX_MESSAGE_SIZE, we assume that the
        # Storm worker has died, and we would be reading an infinite
        # series of blank lines. Throw an error to halt processing,
        # otherwise the task will use 100% CPU and will quickly consume
        # a huge amount of RAM.
        if message_size >= _MAX_MESSAGE_SIZE:
            raise StormIPCException(('Message {} exceeds '
                                     '{:,}'.format(line, _MAX_MESSAGE_SIZE)))

The comment makes it sound like this should be checking the total size of the message, but it's only checking the length of the current line in the message. Furthermore, if we're worried about an infinite series of blank lines, won't the MAX_BLANK_MSGS check get fired before this ever would?
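
For comparison, a reader that bounds the whole message rather than a single line could look roughly like this. It is a sketch under assumptions: messages end with a line containing only end (as in Storm's multi-lang protocol), the limits are made-up values, and IPCError is a stand-in for StormIPCException.

```python
# Hypothetical limits, chosen only for illustration.
MAX_MESSAGE_SIZE = 16 * 1024 * 1024
MAX_BLANK_LINES = 500


class IPCError(Exception):
    """Stand-in for StormIPCException."""


def read_message_lines(stream, max_size=MAX_MESSAGE_SIZE,
                       max_blank=MAX_BLANK_LINES):
    """Collect lines up to an 'end' line, bounding the whole message."""
    lines = []
    total_size = 0
    blank_lines = 0
    while True:
        line = stream.readline().rstrip('\n')
        if line == 'end':
            return lines
        if not line:
            # A closed stream yields empty reads forever, so cap them.
            blank_lines += 1
            if blank_lines >= max_blank:
                raise IPCError('{0} blank lines in a row'.format(blank_lines))
            continue
        blank_lines = 0
        # Accumulate across lines so the limit applies to the full message.
        total_size += len(line)
        if total_size >= max_size:
            raise IPCError('Message exceeds {0:,} characters'.format(max_size))
        lines.append(line)
```

In this version the blank-line counter handles a dead worker, while the size check only fires for messages that really are too large.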

Support topology settings per environment

When submitting topologies to different streamparse environments, there should be a recommended way for users to have settings available to components that are environment specific.
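
One possible shape for this, purely as a sketch: shared settings at the top level of the project configuration, with per-environment overrides merged on top. The settings and envs keys and the settings_for helper are assumptions made for illustration, not an existing streamparse layout.

```python
def settings_for(config, env_name):
    """Merge shared settings with one environment's overrides."""
    # Start from a copy of the shared defaults so the input is not mutated.
    merged = dict(config.get('settings', {}))
    # Environment-specific values win over the shared ones.
    merged.update(config['envs'][env_name].get('settings', {}))
    return merged
```

The merged dict could then be passed to components at submit time, so the same code reads different values in each environment.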

Idea: change how `sparse run` deals with verbose/debug output

A member of Parse.ly's team, @dfdeshom, thought that the behavior of sparse run was a little confusing, because it includes debug output for initialization of the Storm topology, but doesn't include debug output for the Python layer or tuples being emitted throughout the topology. For that, you need to flip on --debug or -d, which will enable Storm's topology debug mode.

His suggestion is that we should either make -d flipped on by default, or make sparse run's default output mostly silent, perhaps only saying things like "Running topology..." and "Topology completed running."

He also thought that we should somehow catch uses of the "print" keyword and issue warnings so users know that you can't actually use "print" within Storm bolts/spouts due to the way ShellBolts work.

It'd be great to get some opinions from the community on this. @dan-blanchard, do you have any thoughts, since you recently tried streamparse out for yourself?

Utility for using Python logging during testing instead of printing JSON messages

For the project I'm working on, we have a bunch of unit tests that exercise Bolt methods without actually starting up Storm. However, since we're sending all of our debug logging to Storm via streamparse, debugging was becoming difficult because nose2 was not properly capturing the output. In addition, the JSON-formatted messages weren't exactly the easiest to read.

To make things a bit easier to work with, I whipped up a little patch function that replaces Component.log with a version that uses Python logging for tests. The following is essentially what we have at the top of our test module now.

import logging

from streamparse.base import Component


LOGGING_LEVELS = {'trace': logging.DEBUG,
                  'debug': logging.DEBUG,
                  'info': logging.INFO,
                  'warn': logging.WARN,
                  'error': logging.ERROR}


def log_replacement(self, message, level=None):
    '''
    Replacement for streamparse.Component.log that will use Python logger
    instead of sending messages.  Useful for unit tests.
    '''
    klass = self.__class__
    logger = logging.getLogger(klass.__module__ + '.' + klass.__name__)
    level = LOGGING_LEVELS.get(level, logging.INFO)
    logger.log(level, str(message))

Component.log = log_replacement

I thought this might be useful for other people, but I wasn't sure where exactly you'd want this in streamparse (or if you'd want it at all 😄), so here it is as an issue. If someone wants this and tells me where to put it, I'll create a proper PR.
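
A variation on the same idea is to scope the replacement to a single test so the original method is restored afterwards. The sketch below uses unittest.mock for that. The Component class here is a stand-in so the example runs without streamparse installed, and python_log is a hypothetical, simplified replacement.

```python
import logging
from unittest import mock


class Component(object):
    """Stand-in for streamparse.base.Component, so the sketch runs alone."""

    def log(self, message, level=None):
        raise RuntimeError('would write a JSON message to Storm')


def python_log(self, message, level=None):
    # Simplified replacement: send everything to Python logging at INFO.
    logging.getLogger(type(self).__name__).info(str(message))


# The patch applies only inside the with block and is undone on exit.
with mock.patch.object(Component, 'log', python_log):
    Component().log('hello from a test')
```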

Separate IPC Unit Testing from Bolt/Spout Tests

Unit tests are great, and the IPC functionality needs to be thoroughly tested. However, having every bolt test spin up a subprocess is making it hard to maintain, especially in light of streamparse.run which is sensitive to the current PYTHONPATH in a way that running a file isn't.
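
As a rough illustration of what a subprocess-free bolt test could look like, the sketch below patches emit() on a bolt instance and records what process() tried to send. WordCountBolt is a stand-in written for this example, not a real streamparse class, and collect_emits is a hypothetical helper.

```python
from unittest import mock


class WordCountBolt:
    """Stand-in for a streamparse bolt; only process() and emit() matter here."""

    def __init__(self):
        self.counts = {}

    def emit(self, values):
        # In a real component this would write a multilang message to stdout.
        raise RuntimeError("emit() needs a running Storm worker")

    def process(self, tup):
        word = tup[0]
        self.counts[word] = self.counts.get(word, 0) + 1
        self.emit([word, self.counts[word]])


def collect_emits(bolt, tuples):
    """Feed tuples to bolt.process() and return everything it tried to emit."""
    with mock.patch.object(bolt, "emit") as fake_emit:
        for tup in tuples:
            bolt.process(tup)
    return [call.args[0] for call in fake_emit.call_args_list]
```

With something like this, the IPC layer can keep its own subprocess-based tests while bolt logic is checked in-process.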

Should use storm.yaml in some way

Storm includes a .yaml file which specifies a bunch of configuration options. The storm CLI reads this file to figure out things like nimbus and worker hosts/ports. It's also important for us to autodetect things like log directories.

The defaults.yaml file in the source tree outlines what's on the menu in this file. The Configuration documentation page has some more information, and the Clojure codebase has a convenient (read-storm-config) function. We could probably whip together support for this in our CLI tooling. It seems like this will be especially important for working with remote clusters where we can't control the config.
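
A minimal sketch of what the Python side might do, assuming PyYAML is available and that the user's file lives at ~/.storm/storm.yaml. The two default keys are an assumed subset of defaults.yaml, and the function names are hypothetical.

```python
import os

# Assumed subset of Storm's defaults.yaml; the real file has many more keys.
STORM_DEFAULTS = {
    "nimbus.host": "localhost",
    "nimbus.thrift.port": 6627,
}


def merge_storm_config(defaults, overrides):
    """Return a copy of defaults with user overrides layered on top."""
    merged = dict(defaults)
    merged.update(overrides or {})  # an empty storm.yaml parses to None
    return merged


def read_storm_config(path=None):
    """Load storm.yaml if present and layer it over STORM_DEFAULTS."""
    import yaml  # PyYAML, a third-party dependency (assumption)

    path = path or os.path.expanduser("~/.storm/storm.yaml")
    if not os.path.exists(path):
        return dict(STORM_DEFAULTS)
    with open(path) as handle:
        return merge_storm_config(STORM_DEFAULTS, yaml.safe_load(handle))
```

The CLI could then read values such as the nimbus host from the merged dict instead of requiring them in config.json.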

Streamparse doesn't play well with python warnings

See traceback. It continues like that until it hits maximum recursion depth and dies.

Traceback (most recent call last):
  File "/data/virtualenvs/kfb-test/local/lib/python2.7/site-packages/streamparse/bolt.py", line 127, in run
    self.process(tup)
  File "bolts_cassandra_store.py", line 101, in process
    metric, apikey, url, tmsp))
  File "/data/virtualenvs/kfb-test/local/lib/python2.7/site-packages/cassandra/query.py", line 361, in bind
    return BoundStatement(self).bind(values)
  File "/data/virtualenvs/kfb-test/local/lib/python2.7/site-packages/cassandra/query.py", line 460, in bind
    self.values.append(col_type.serialize(value))
  File "/data/virtualenvs/kfb-test/local/lib/python2.7/site-packages/cassandra/cqltypes.py", line 563, in serialize
    warnings.warn("timestamp columns in Cassandra hold a number of "
  File "/data/virtualenvs/kfb-test/lib/python2.7/warnings.py", line 29, in _show_warning
    file.write(formatwarning(message, category, filename, lineno, line))
  File "/data/virtualenvs/kfb-test/local/lib/python2.7/site-packages/streamparse/ipc.py", line 39, in write
    self.logger.error(line)
  File "/usr/lib/python2.7/logging/__init__.py", line 1166, in error
    self._log(ERROR, msg, args, **kwargs)
  File "/usr/lib/python2.7/logging/__init__.py", line 1258, in _log
    self.handle(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 1268, in handle
    self.callHandlers(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 1315, in callHandlers
    " \"%s\"\n" % self.name)
  File "/data/virtualenvs/kfb-test/local/lib/python2.7/site-packages/streamparse/ipc.py", line 39, in write
    self.logger.error(line)
  File "/usr/lib/python2.7/logging/__init__.py", line 1166, in error
    self._log(ERROR, msg, args, **kwargs)
  File "/usr/lib/python2.7/logging/__init__.py", line 1258, in _log
    self.handle(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 1268, in handle
    self.callHandlers(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 1315, in callHandlers
    " \"%s\"\n" % self.name)

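Reading the traceback above, write() in ipc.py calls logger.error(), and the logging module then appears to write its own message back to the same stream, which calls write() again. One possible mitigation is a re-entrancy guard on the stream wrapper. The sketch below is not streamparse's actual code; GuardedLogStream and its fallback parameter are hypothetical.

```python
import logging
import sys
import threading


class GuardedLogStream:
    """File-like object that logs writes but never re-enters itself."""

    def __init__(self, logger, fallback=None):
        self.logger = logger
        # Where nested writes go; defaults to the interpreter's original stderr.
        self.fallback = fallback if fallback is not None else sys.__stderr__
        self._state = threading.local()

    def write(self, message):
        if getattr(self._state, "busy", False):
            # Called while already logging: bypass the logger to stop recursion.
            self.fallback.write(message)
            return
        self._state.busy = True
        try:
            for line in message.splitlines():
                if line.strip():
                    self.logger.error(line)
        finally:
            self._state.busy = False

    def flush(self):
        pass
```

With a guard like this, a warning emitted inside process() would be logged once, and anything logging itself prints during that call would land on the fallback stream rather than looping.
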
Clean up the examples/ folder

There is some old cruft in this folder. We also have some new examples (e.g. using official Kafka spout via Clojure) that are worth highlighting. I added a nice simple example of an upgraded project that does WordCount using Redis:

https://github.com/Parsely/streamparse/tree/master/examples/redis

I also wonder what our general approach to examples/ should be. Should we just avoid storing these in the repo and instead include the useful ones as "templates" for sparse quickstart? Or relocate them to another streamparse-examples repo and have sparse quickstart download its bootstrap stuff directly from Github? I'm not sure what a good approach is here. There must be some other project that we can copy. 😁

Storm / Java / Clojure serialization-upon-submit problem (.ser files)

When we issue the sparse submit command, a local Clojure process runs that scripts the StormSubmitter class and talks to our remote nimbus host over an SSH tunnel.

This means that it's technically possible for the content submitted from the local machine to have been built by a Java/Clojure/Storm dependency version that does not match the one running on the server.

Normally, I'd think this doesn't matter -- after all, it's just a Thrift API, it should work as long as the bindings didn't change. The problem, however, is that the nimbus actually receives not just a built JAR with the topology submission, but also two serialized Java files, e.g. stormcode.ser. Because these files are written with plain old Java serialization, Storm nimbus/supervisor/workers will crash when trying to process them if e.g. there is a Clojure version mismatch (as happened in the 0.9.1-incubating -> 0.9.2-incubating upgrade) or if any classes changed in a way that updated their serialVersionUID.

The way this exception looks in the case of a Clojure mismatch is as follows:

java.lang.RuntimeException: java.io.InvalidClassException: clojure.lang.APersistentMap; local class incompatible: stream classdesc serialVersionUID = 270281984708184947, local class serialVersionUID = 8648225932767613808

I have a couple of ideas about how to fix this issue. One idea is to offer a way to "sync" your local repo to a remote Storm cluster by introspecting that Storm cluster to see what Storm version is running and rewriting the local project.clj file appropriately. Seems a little hacky.

The other way could be to change the way submission works altogether. Instead of using StormSubmitter, it could actually ssh into the nimbus machine and use storm jar using the storm CLI on that machine. Though this has the benefit of probably working with every version of Storm under the sun, it has more moving parts to worry about.
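
To make the second idea a bit more concrete, here is a sketch that only builds the scp and ssh command lines and does not run them. The function name, the /tmp upload directory, and passing the topology name as the single argument to the main class are all assumptions for illustration.

```python
import posixpath
import shlex


def remote_submit_commands(nimbus_host, user, local_jar, main_class,
                           topology_name, remote_dir="/tmp"):
    """Build (but do not run) commands for a `storm jar` on the nimbus host."""
    remote_jar = posixpath.join(remote_dir, posixpath.basename(local_jar))
    target = "{}@{}".format(user, nimbus_host)
    # Quote each part so the remote shell sees exactly these arguments.
    storm_cmd = " ".join(
        shlex.quote(part)
        for part in ["storm", "jar", remote_jar, main_class, topology_name]
    )
    return [
        ["scp", local_jar, "{}:{}".format(target, remote_jar)],
        ["ssh", target, storm_cmd],
    ]
```

Because the JAR is handed to the storm CLI that is installed on the nimbus machine, the serialized files would be produced by the cluster's own Storm and Clojure versions.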

Rename stormlocal.clj and remove from bootstrap project

Since the command-line tool in Clojure will be used for both local & remote topology submission, we should rename it. I'm thinking something as simple as com.parsely/streamparse to match this project.

Also, right now, stormlocal.clj is included in the bootstrap project. However, this should really be source code managed by our Github project (we should make streamparse a hybrid clj lein + py setuptools project) and formally published to clojars. Then we'll just modify the bootstrap project's project.clj to include a dependency to com.parsely/streamparse and use lein run to execute the main CLI handler there.

Investigate automating lein installation with distutils

It might be possible to make lein installation an automated step using distutils. This is worth investigating because it could make setup that much easier. java would still be a requirement, but that can safely be assumed to be handled by operating system package managers. Alternatively, we could just bundle a shell script with the quickstart project that downloads & installs lein.

Remove File-Per-Component Approach

There's no real reason for this approach except that it made things a bit easier when you could simply run python my_spout.py. However, we should just as easily be able to pull off python -m streamparse.runner spouts.my_spout, or something similar, naming of things being up for debate.
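
A runner along those lines could resolve the dotted name with importlib and pick the single component class defined in that module. The sketch below assumes one component per module; load_component is a hypothetical helper, and the base class is passed in rather than imported from streamparse.

```python
import importlib
import inspect


def load_component(dotted_module, base_class):
    """Import dotted_module and return the one base_class subclass it defines."""
    module = importlib.import_module(dotted_module)
    found = [
        obj
        for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, base_class)
        and obj is not base_class
        # Skip classes that were merely imported into the module.
        and obj.__module__ == module.__name__
    ]
    if len(found) != 1:
        raise ImportError(
            "expected exactly one component in {}, found {}".format(
                dotted_module, len(found)
            )
        )
    return found[0]
```

The runner's entry point would then instantiate the returned class and call its run() method, so components no longer need their own `if __name__ == "__main__"` boilerplate.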

Rename virtualenv_path to virtualenv_root

It's ambiguous because virtualenv_path could either be the path to the specific virtualenv to use, or the root for all virtualenvs (e.g. WORKON_HOME in virtualenvwrapper).

virtualenv_root sounds better to me than virtualenv_home so I'm going to go with that.

Add pre and post submit hooks to sparse submit

Ideal for tasks like "create a DB table if not exists" or "warm up a cache" before the topology submits. After submission, users can do things like send a notification that the topology ran properly or failed.
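
One possible shape for this, sketched with plain callables. The function name and the (succeeded, error) signature of the post hook are assumptions, not an agreed design.

```python
def submit_with_hooks(submit, pre_submit=None, post_submit=None):
    """Run submit() between optional hooks; post_submit sees the outcome."""
    if pre_submit is not None:
        # e.g. create a DB table or warm a cache
        pre_submit()
    try:
        result = submit()
    except Exception as error:
        if post_submit is not None:
            post_submit(False, error)  # notify about the failure
        raise
    if post_submit is not None:
        post_submit(True, None)  # notify about the success
    return result
```

A failing pre hook aborts the submission before anything reaches the cluster, while the post hook always learns whether submission raised.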

`sparse tail` subcommand

The sparse tail command should let you tail the log files across your storm worker machines, based on configuration in config.json.

`sparse stats` sub-command for fetching cluster/topo statistics

This Clojure file implements the whole Storm UI, basically:

https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/ui/core.clj

We could use a similar recipe in our Clojure interop project to fetch cluster/topology statistics from the Nimbus over the Thrift interface. We could then expose this in the command-line tool via sparse stats. I'm imagining an ideal implementation might be an htop-like curses interface that live refreshes stats from the cluster, but even some basic pretty-printed tables could be a good start.

Retool streamparse remote install

We currently run a pip install streamparse on all Storm workers when submitting a topology. This sucks because users should be in control of the version of streamparse their topologies use.

We should instead expect streamparse to be a requirement in their virtualenv files and display an error if it isn't.

Perhaps we should also check if the streamparse version they have in their active local venv matches the one they're using to submit and then display a warning.
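
The check described above could be as small as the sketch below, which looks for an exact `streamparse==X` pin in the text of a virtualenv requirements file. The function names are hypothetical, and requirement forms other than `==` pins are deliberately not handled.

```python
import re
import warnings


def pinned_version(requirements_text, package="streamparse"):
    """Return the version pinned with '==' for package, or None if absent."""
    pattern = re.compile(
        r"^\s*%s\s*==\s*([^\s#;]+)" % re.escape(package), re.IGNORECASE
    )
    for line in requirements_text.splitlines():
        match = pattern.match(line)
        if match:
            return match.group(1)
    return None


def check_streamparse_pin(requirements_text, local_version):
    """Raise if streamparse is not pinned; warn if the pin differs locally."""
    pinned = pinned_version(requirements_text)
    if pinned is None:
        raise ValueError("streamparse must be listed in the virtualenv file")
    if pinned != local_version:
        warnings.warn(
            "topology pins streamparse %s but %s is submitting it"
            % (pinned, local_version)
        )
    return pinned
```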

sparse upload fails on Caught exception: org.apache.thrift7.transport.TTransportException

I tried to sparse upload example from streamparse==0.0.13 (pip)

  1. edited ~/.venv/venv/storm/lib/python2.7/site-packages/streamparse/ext/fabric.py (then removed fabric.pyc):
    in activate_env(env_name=None):
    commented out # env.disable_known_hosts = True
    commented out # env.forward_agent = True
    added env.use_ssh_config = True

(I made these modifications because changes in fabfile.py don't seem to be applied.)
After that, sparse run works on localhost; sparse submit looks like it works but fails with org.apache.thrift7.transport.TTransportException (log file link at the end of this issue).

  1. My config.json
    {
        "library": "",
        "topology_specs": "topologies/",
        "virtualenv_specs": "virtualenvs/",
        "envs": {
            "prod": {
                "user": "ubuntu",
                "nimbus": "monster2",
                "workers": ["monster2"],
                "log_path": "",
                "virtualenv_root": "/home/ubuntu/.venv/venv"
            }
        }
    }
  2. My virtualenvs/wordcount.txt
    streamparse==0.0.13

Logs from run:
http://paste.org/73963

Host is EC2 Amazon machine

$ java -version
java version "1.7.0_51"
Java(TM) SE Runtime Environment (build 1.7.0_51-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.51-b03, mixed mode)

$ lein -version
Leiningen 2.4.2 on Java 1.7.0_51 Java HotSpot(TM) 64-Bit Server VM

`sparse tail` fails when logs are missing (make it fail more gracefully?)

sparse tail fails if any of the bash patterns (worker* supervisor* access* metrics* topo_name_*) is empty. This is probably an indication that something bad has happened, so maybe failure is the correct behavior.

Even if failure is the correct behavior, perhaps we could improve the error message to something clearer than:

 Fatal error: run() received nonzero return code 2 while executing!

Requested: cd /opt/storm/current/logs && ls worker* supervisor* access* metrics* topo_name_*
Executed: /bin/bash -l -c "cd /opt/storm/current/logs && ls worker* supervisor* access* metrics* topo_name_*"

For example, streamparse might tell which log files are missing?
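
As a sketch of that idea, the helper below reports which glob patterns match nothing in a log directory. It assumes the check runs on the machine that holds the logs (sparse tail would have to run it remotely), and missing_log_patterns is a hypothetical name.

```python
import glob
import os


def missing_log_patterns(log_dir, patterns):
    """Return the patterns that match no file inside log_dir."""
    return [
        pattern
        for pattern in patterns
        if not glob.glob(os.path.join(log_dir, pattern))
    ]
```

The tail command could then print something like "no files match: supervisor*, metrics*" and tail only the patterns that do exist, or still fail but with a readable message.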

Add unit testing

Most critical sections are those that deal with the multilang protocol: base.py, bolt.py, ipc.py and spout.py.

Of course we should test all the things, but those are the most mission-critical parts. I'm not 100% sure yet what we should do for testing here, but it will likely involve using multiprocessing to create a fake Python spout and bolt and then communicating with those processes using multilang from the Python test process.
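
Before reaching for multiprocessing, the message framing itself can be exercised with in-memory streams. Multilang messages are JSON followed by a line containing only "end"; the two helpers below are hypothetical test utilities written for this sketch, not streamparse functions.

```python
import io
import json


def write_message(stream, message):
    """Write one multilang message: JSON, then a line containing 'end'."""
    stream.write(json.dumps(message) + "\nend\n")


def read_message(stream):
    """Read lines up to the next 'end' marker and decode them as JSON."""
    lines = []
    for line in stream:
        if line.strip() == "end":
            break
        lines.append(line)
    else:
        # The loop ran out of input without seeing the terminator.
        raise EOFError("stream closed before 'end' marker")
    return json.loads("".join(lines))
```

A test can fill an io.StringIO with messages using write_message, hand it to the component in place of stdin, and then parse what the component wrote with read_message.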

Add sparse list command line support

$ sparse list <env_name>

Topology Name            | Status | Uptime | Workers | Executors | Tasks
============================================================================
enricher                 | Active | 52m    | 4       | 52        | 52
something-else           | Active | 1d 52m | 4       | 80        | 80
aggregator               | Killed |        |         |           |
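
The table rendering could be sketched as below. The column names come from the mock-up above, while format_topology_table and the row format (a list of six values per topology) are assumptions; fetching the data from Nimbus is out of scope here.

```python
HEADERS = ["Topology Name", "Status", "Uptime", "Workers", "Executors", "Tasks"]


def format_topology_table(rows):
    """Render rows (six values each) as a pipe-separated text table."""
    table = [HEADERS] + [[str(cell) for cell in row] for row in rows]
    # Each column is as wide as its longest cell, header included.
    widths = [max(len(row[i]) for row in table) for i in range(len(HEADERS))]

    def render(row):
        return " | ".join(cell.ljust(width) for cell, width in zip(row, widths))

    header = render(table[0])
    lines = [header, "=" * len(header)]
    lines.extend(render(row) for row in table[1:])
    return "\n".join(lines)
```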

Support Storm's custom serialization support for multi-lang. [msgpack]

A recently merged pull request 84 on the Storm project implements custom serialization support for the multi-lang protocol. This means that you can technically register other faster serializers than JSON for Storm's tuple data, e.g. we could use msgpack.

In addition, thanks to Storm's built-in support for serialization hooks with Kryo, we could actually switch Tuple serialization from Kryo's built-in JVM-based serializer to something like msgpack. This would potentially allow us to avoid the encode/decode cost associated with using ShellBolts and ShellSpouts.

It would be great for someone from the community to do a prototype of this using a streamparse project, and prove its viability. Then we could merge the support into the core library, build a support library in Java for the custom serializers, and offer documentation for how to set up Storm to leverage them. This will likely fit in very well with our lein-based Storm project management.
