robinhood / faust

faust's Introduction


Deprecation Notice

This library has been deprecated and is no longer maintained or supported. The currently active community project can be found at https://github.com/faust-streaming/faust

Python Stream Processing


Version

1.10.4

Web

http://faust.readthedocs.io/

Download

http://pypi.org/project/faust

Source

http://github.com/robinhood/faust

Keywords

distributed, stream, async, processing, data, queue, state management

python

# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust

Faust is a stream processing library, porting the ideas from Kafka Streams to Python.

It is used at Robinhood to build high performance distributed systems and real-time data pipelines that process billions of events every day.

Faust provides both stream processing and event processing, sharing similarities with tools such as Kafka Streams, Apache Spark, Storm, Samza, and Flink.

It does not use a DSL; it's just Python! This means you can use all your favorite Python libraries when stream processing: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, and more.

Faust requires Python 3.6 or later for the new async/await syntax, and variable type annotations.

Here's an example processing a stream of incoming orders:

python

app = faust.App('myapp', broker='kafka://localhost')

# Models describe how messages are serialized:
# {"account_id": "3fae-...", "amount": 3}
class Order(faust.Record):
    account_id: str
    amount: int

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')

The Agent decorator defines a "stream processor" that essentially consumes from a Kafka topic and does something for every event it receives.

The agent is an async def function, so it can also perform other operations asynchronously, such as web requests.

This system can persist state, acting like a database. Tables are named distributed key/value stores you can use as regular Python dictionaries.

Tables are stored locally on each machine using a super fast embedded database written in C++, called RocksDB.

Tables can also store aggregate counts that are optionally "windowed" so you can keep track of "number of clicks from the last day" or "number of clicks in the last hour," for example. Like Kafka Streams, we support tumbling, hopping and sliding windows of time, and old windows can be expired to stop data from filling up.
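As a minimal sketch of the windowed-table API just described (the table name, topic name, and local broker URL are assumptions for the example):

python

from datetime import timedelta

import faust

app = faust.App('clicks-demo', broker='kafka://localhost')
clicks_topic = app.topic('clicks', key_type=str, value_type=int)

# Tumbling one-hour windows; windows older than a day are expired.
hourly_clicks = app.Table('hourly_clicks', default=int).tumbling(
    timedelta(hours=1),
    expires=timedelta(days=1),
)

@app.agent(clicks_topic)
async def count_clicks(clicks):
    async for url, n in clicks.items():
        hourly_clicks[url] += n
        # .current() reads the window that the current event falls into.
        print(f'{url}: {hourly_clicks[url].current()} clicks this hour')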

For reliability we use a Kafka topic as a "write-ahead log". Whenever a key is changed we publish to the changelog. Standby nodes consume from this changelog to keep an exact replica of the data, enabling instant recovery should any of the nodes fail.

To the user a table is just a dictionary, but data is persisted between restarts and replicated across nodes so on failover other nodes can take over automatically.

You can count page views by URL:

python

# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with default=int
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

The data sent to the Kafka topic is partitioned, which means the clicks will be sharded by URL in such a way that every count for the same URL will be delivered to the same Faust worker instance.

Faust supports any type of stream data: bytes, Unicode and serialized structures, but also comes with "Models" that use modern Python syntax to describe how keys and values in streams are serialized:

python

# Order is a json serialized dictionary,
# having these fields:
class Order(faust.Record):
    account_id: str
    product_id: str
    price: float
    quantity: float = 1.0

orders_topic = app.topic('orders', key_type=str, value_type=Order)

@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders:
        # process each order using regular Python
        total_price = order.price * order.quantity
        await send_order_received_email(order.account_id, order)

Faust is statically typed, using the mypy type checker, so you can take advantage of static types when writing applications.

The Faust source code is small, well organized, and serves as a good resource for learning the implementation of Kafka Streams.

Learn more about Faust in the introduction page to read about system requirements, installation instructions, community resources, and more,

or go directly to the quickstart tutorial to see Faust in action by programming a streaming application,

then explore the User Guide for in-depth information organized by topic.

Faust is...

Simple

Faust is extremely easy to use. Getting started with other stream processing solutions typically involves complicated hello-world projects and heavy infrastructure requirements. Faust only requires Kafka; the rest is just Python, so if you know Python you can already use Faust to do stream processing, and it can integrate with just about anything.

Here's one of the easier applications you can make:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

You're probably a bit intimidated by the async and await keywords, but you don't have to know how asyncio works to use Faust: just mimic the examples, and you'll be fine.

The example application starts two tasks: one is processing a stream, the other is a background thread sending events to that stream. In a real-life application, your system will publish events to Kafka topics that your processors can consume from, and the background thread is only needed to feed data into our example.

Highly Available

Faust is highly available and can survive network problems and server crashes. In the case of node failure, it can automatically recover, and tables have standby nodes that will take over.

Distributed

Start more instances of your application as needed.

Fast

A single-core Faust worker instance can already process tens of thousands of events every second, and we are reasonably confident that throughput will increase once we can support a more optimized Kafka client.

Flexible

Faust is just Python, and a stream is an infinite asynchronous iterator. If you know how to use Python, you already know how to use Faust, and it works with your favorite Python libraries like Django, Flask, SQLAlchemy, NLTK, NumPy, SciPy, TensorFlow, etc.

Installation

You can install Faust either via the Python Package Index (PyPI) or from source.

To install using `pip`:

console

$ pip install -U faust

Bundles

Faust also defines a group of setuptools extensions that can be used to install Faust and the dependencies for a given feature.

You can specify these in your requirements or on the pip command-line by using brackets. Separate multiple bundles using the comma:

console

$ pip install "faust[rocksdb]"

$ pip install "faust[rocksdb,uvloop,fast,redis]"

The following bundles are available:

Stores

faust[rocksdb]

for using RocksDB for storing Faust table state.

Recommended in production.

Caching

faust[redis]

for using Redis as a simple caching backend (Memcached-style).

Codecs

faust[yaml]

for using YAML and the PyYAML library in streams.

Optimization

faust[fast]

for installing all the available C speedup extensions to Faust core.

Sensors

faust[datadog]

for using the Datadog Faust monitor.

faust[statsd]

for using the Statsd Faust monitor.

Event Loops

faust[uvloop]

for using Faust with uvloop.

faust[eventlet]

for using Faust with eventlet.

Debugging

faust[debug]

for using aiomonitor to connect and debug a running Faust worker.

faust[setproctitle]

when the setproctitle module is installed the Faust worker will use it to set a nicer process name in ps/top listings. Also installed with the fast and debug bundles.

Downloading and installing from source

Download the latest version of Faust from http://pypi.org/project/faust

You can install it by doing:

console

$ tar xvfz faust-0.0.0.tar.gz
$ cd faust-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you are not currently using a virtualenv.

Using the development version

With pip

You can install the latest snapshot of Faust using the following pip command:
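For example, installing the snapshot directly from the GitHub repository (the exact URL form is an assumption based on the project's source location):

console

$ pip install https://github.com/robinhood/faust/zipball/master#egg=faust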

FAQ

Can I use Faust with Django/Flask/etc.?

Yes! Use eventlet as a bridge to integrate with asyncio.

Using eventlet

This approach works with any blocking Python library that can work with eventlet.

Using eventlet requires you to install the aioeventlet module, and you can install this as a bundle along with Faust:

console

$ pip install -U faust[eventlet]

Then, to actually use eventlet as the event loop, you have to either pass the -L (--loop) argument to the faust program:

console

$ faust -L eventlet -A myproj worker -l info

or add import mode.loop.eventlet at the top of your entry point script:

python

#!/usr/bin/env python3
import mode.loop.eventlet  # noqa

Warning

It's very important this is at the very top of the module, and that it executes before you import libraries.

Can I use Faust with Tornado?

Yes! Use the tornado.platform.asyncio bridge: http://www.tornadoweb.org/en/stable/asyncio.html

Can I use Faust with Twisted?

Yes! Use the asyncio reactor implementation: https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html

Will you support Python 2.7 or Python 3.5?

No. Faust requires Python 3.6 or later, since it heavily uses features that were introduced in Python 3.6 (async, await, variable type annotations).

I get a maximum number of open files exceeded error by RocksDB when running a Faust app locally. How can I fix this?

You may need to increase the limit for the maximum number of open files. The following post explains how to do so on OS X: https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan/
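For example, a typical fix (assuming a bash-compatible shell; the limit value is illustrative) is to raise the open-file limit for the current session before starting the worker:

console

$ ulimit -n 4096
$ faust -A myproj worker -l info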

Which Kafka versions does Faust support?

Faust supports Kafka versions 0.10 or later.

Getting Help

Slack

For discussions about the usage, development, and future of Faust, please join the fauststream Slack.

Resources

Bug tracker

If you have any suggestions, bug reports, or annoyances please report them to our issue tracker at https://github.com/robinhood/faust/issues/

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

Contributing

Development of Faust happens at GitHub: https://github.com/robinhood/faust

You're highly encouraged to participate in the development of Faust.

Be sure to also read the Contributing to Faust section in the documentation.

Code of Conduct

Everyone interacting in the project's code bases, issue trackers, chat rooms, and mailing lists is expected to follow the Faust Code of Conduct.

As contributors and maintainers of these projects, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.

We are committed to making participation in these projects a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery
  • Personal attacks
  • Trolling or insulting/derogatory comments
  • Public or private harassment
  • Publishing others' private information, such as physical or electronic addresses, without explicit permission
  • Other unethical or unprofessional conduct.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team.

This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community.

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers.

This Code of Conduct is adapted from the Contributor Covenant, version 1.2.0 available at http://contributor-covenant.org/version/1/2/0/.


faust's Issues

Changelog should check earliest offset

Since tables expire records in the changelog via both log compaction and time-based expiry, we should also take the maximum of each stored offset and the earliest available offset.
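In pseudocode, the intended clamp might look like this (illustrative names, not actual Faust internals):

python

def recovery_start_offset(stored_offset: int, earliest_available: int) -> int:
    # Offsets below the log's earliest retained offset no longer exist
    # (log compaction / retention), so start from whichever is larger.
    return max(stored_offset, earliest_available)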

RebalanceInProgressError

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Ran an application with a relatively large number of messages being transferred (larger than the simple example). Tried to rebalance the cluster of workers by adding another worker.

Expected behavior

Worker addition/removal should be graceful.

Actual behavior

Getting a RebalanceInProgressError exception from Kafka upon adding/removing workers.

Full traceback

[2017-08-08 22:34:18,945: ERROR] OffsetCommit failed for group referrals-device-check due to group error ([Error 27] RebalanceInProgressError: referrals-device-check), will rejoin
[2017-08-08 22:34:18,945: ERROR] User provided subscription listener <faust.transport.aiokafka.ConsumerRebalanceListener object at 0x1058c5ac8> for group referrals-device-check failed on_partitions_revoked
Traceback (most recent call last):
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/group_coordinator.py", line 308, in _on_join_prepare
    yield from res
  File "/Users/vineet/faust/faust/transport/aiokafka.py", line 77, in on_partitions_revoked
    cast(Iterable[TopicPartition], revoked))
  File "/Users/vineet/faust/faust/transport/base.py", line 146, in on_partitions_revoked
    await self._on_partitions_revoked(revoked)
  File "/Users/vineet/faust/faust/app.py", line 647, in on_partitions_revoked
    await self.consumer.wait_empty()
  File "/Users/vineet/faust/faust/transport/base.py", line 180, in wait_empty
    await self.commit()
  File "/Users/vineet/faust/faust/transport/base.py", line 221, in commit
    await self._do_commit(tp, offset, meta='')
  File "/Users/vineet/faust/faust/transport/base.py", line 263, in _do_commit
    await self._commit({tp: self._new_offsetandmetadata(offset, meta)})
  File "/Users/vineet/faust/faust/transport/aiokafka.py", line 210, in _commit
    await self._consumer.commit(offsets)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/consumer.py", line 397, in commit
    yield from self._coordinator.commit_offsets(offsets)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/aiokafka/group_coordinator.py", line 583, in commit_offsets
    raise error
kafka.errors.RebalanceInProgressError: [Error 27] RebalanceInProgressError: referrals-device-check

Versions

  • Python version - 3.6.1
  • Faust version - 1.0.0 (master)
  • Operating system
  • Kafka version - 0.10.2.1

Windowed table expires memory leak

Currently we do not add window ends for keys into the timestamps heap upon changelog recovery. Hence, keys added during recovery (and never updated afterwards) would never get garbage collected by the Faust app.

Note: this isn't an issue with the actual Kafka topic, as it uses Kafka's retention.

Create agent topics appropriately

Agent topics should have the following properties:

  • Should use the partition count from the app configuration (bug) - We should just create the topic using create_topic.
  • Should use the app id (with version) in the name (enhancement)

Make standby buffer size configurable

Currently, while reading from the changelog, we update the Faust table only after reading 1000 events. While this is configurable, we should be able to have separate configurations for the changelog reader and the standby reader.

Hang on stream wait empty

Our apps are currently using stream_wait_empty=False, so for the release we should either fix the problem or make it the default.

Application unable to recover from Kafka outage/broker failure

This is reproducible by running the simple example locally and taking down a broker and bringing it back.

Looks like the consumer is able to come back up in most cases (except when there is an OffsetCommit error). However, the producer stalls and we don't produce any more messages at this point.

These are some of the alternatives in mind:

  • Killing the application when the consumer/producer gets into a bad state
    • For starters we can look at the message send future for exceptions (these don't get handled by us)
    • See if any catchable exceptions are thrown; I did see a few getting logged
  • Restarting a new producer instance, not sure if that will help
    • In this case, is it still necessary to send all the messages in the correct order? (aiokafka seems to have some issue around this)

Should not commit offsets upon crash

I added a random KafkaTimeoutError in App._commit_attached to simulate a KafkaTimeoutError while producing. I caught this error and triggered a crash: self.crash(exc). This should ideally crash the application, but it looks like upon crash we end up committing offsets, as can be seen in the log trace below. This is bad because a crash means something bad happened, and we should not commit offsets.

[2018-03-03 17:51:07,840: INFO]: [^--TableManager]: Restore complete!
[2018-03-03 17:51:07,841: INFO]: [^--Consumer]: Waiting for lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Acquired lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Released resume partitions lock
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: Attempting to start standbys
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: New assignments handled
[2018-03-03 17:51:07,871: INFO]: [^--Table: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^--Table: country_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: country_to_total]: Starting...
[2018-03-03 17:51:07,913: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 744: k=b'"country_4"' v=b'{"user": "user_434", "country": "country_4", "amount": 1832.8454878058342, "date": "2018-03-04T01:41:26.137348Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 1428: k=b'"country_1"' v=b'{"user": "user_246", "country": "country_1", "amount": 16163.629349934505, "date": "2018-03-04T01:39:52.217088Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 747: k=b'"country_2"' v=b'{"user": "user_114", "country": "country_2", "amount": 7532.398390989598, "date": "2018-03-04T01:41:42.050016Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 32392: k=b'"country_0"' v=b'{"user": "user_239", "country": "country_0", "amount": 24807.034890744173, "date": "2018-03-04T01:40:31.821356Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8506: k=b'"user_220"' v=b'{"user": "user_220", "country": "country_1", "amount": 11502.542493932206, "date": "2018-03-04T01:41:49.087564Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8327: k=b'"user_243"' v=b'{"user": "user_243", "country": "country_0", "amount": 8941.192697487695, "date": "2018-03-04T01:41:51.035743Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8600: k=b'"user_410"' v=b'{"user": "user_410", "country": "country_0", "amount": 22052.862798686932, "date": "2018-03-04T01:40:21.664204Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8973: k=b'"user_396"' v=b'{"user": "user_396", "country": "country_2", "amount": 18885.378500303657, "date": "2018-03-04T01:40:22.124953Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,919: INFO]: [^Worker]: Ready
[2018-03-03 17:51:08,770: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
โ”‚ TP                                                                                         โ”‚ Offset โ”‚ Metadata โ”‚
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) โ”‚ 751    โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) โ”‚ 1466   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) โ”‚ 754    โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) โ”‚ 32546  โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=3)                                          โ”‚ 8712   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=1)                                          โ”‚ 8532   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=2)                                          โ”‚ 8805   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=0)                                          โ”‚ 9178   โ”‚          โ”‚
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:11,787: ERROR]: [^-App]: Crashed reason=KafkaTimeoutError()
Traceback (most recent call last):
  File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
    raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
[2018-03-03 17:51:11,806: INFO]: [^Worker]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^-App]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^--Fetcher]: Stopping...
[2018-03-03 17:51:11,807: INFO]: [^--Consumer]: Consumer shutting down for user cancel.
[2018-03-03 17:51:11,817: INFO]: [^--Fetcher]: -Stopped!
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Stopping...
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Aborting ongoing recovery to start over
[2018-03-03 17:51:11,817: INFO]: [^--Table: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: -Stopped!
[2018-03-03 17:51:11,828: INFO]: [^--Table: country_to_total]: -Stopped!
[2018-03-03 17:51:11,831: INFO]: [^--TableManager]: -Stopped!
[2018-03-03 17:51:11,832: INFO]: [^--TopicConductor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^--TopicConductor]: -Stopped!
[2018-03-03 17:51:11,837: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: Stopping...
[2018-03-03 17:51:11,840: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,842: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: Stopping...
[2018-03-03 17:51:11,864: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--ReplyConsumer]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--ReplyConsumer]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--Consumer]: Stopping...
[2018-03-03 17:51:12,082: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
โ”‚ TP                                                                                         โ”‚ Offset โ”‚ Metadata โ”‚
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) โ”‚ 767    โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) โ”‚ 1506   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) โ”‚ 776    โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) โ”‚ 33289  โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=3)                                          โ”‚ 9071   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=1)                                          โ”‚ 8833   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=2)                                          โ”‚ 9274   โ”‚          โ”‚
โ”‚ TopicPartition(topic='withdrawals2', partition=0)                                          โ”‚ 9652   โ”‚          โ”‚
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:12,084: INFO]: LeaveGroup request succeeded
[2018-03-03 17:51:12,085: INFO]: [^--Consumer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: Stopping...
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--MonitorService]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--MonitorService]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-App]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-Website]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--Web]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^---ServerThread]: Stopping...
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Closing server
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for server to close handle
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Shutting down web application
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for handler to shut down
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Cleanup
[2018-03-03 17:51:12,087: INFO]: [^---ServerThread]: -Stopped!
[2018-03-03 17:51:12,087: INFO]: [^--Web]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^-Website]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering service tasks...
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering all futures...
[2018-03-03 17:51:17,227: INFO]: [^Worker]: Closing event loop
[2018-03-03 17:51:17,228: CRITICAL]: [^Worker]: We experienced a crash! Reraising original exception...
Traceback (most recent call last):
  File "/Users/vineet/.virtualenvs/faust/bin/faust", line 11, in <module>
    load_entry_point('faust', 'console_scripts', 'faust')()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/Users/vineet/faust/faust/cli/base.py", line 317, in _inner
    return cmd()
  File "/Users/vineet/faust/faust/cli/worker.py", line 106, in __call__
    **{**self.kwargs, **kwargs})
  File "/Users/vineet/faust/faust/cli/worker.py", line 128, in start_worker
    return worker.execute_from_commandline()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 155, in execute_from_commandline
    self.stop_and_shutdown()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 160, in stop_and_shutdown
    self._shutdown_loop()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 188, in _shutdown_loop
    raise self._crash_reason from self._crash_reason
  File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
    raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError

Hard shutdown app on RebalanceError

Upon receiving a rebalance error, we should hard shutdown the application.

Currently, the application tries to recover, which results in zombie assignments.

Cannot update tables in asyncio.wait

I get the following error if I try to update a Faust Table from an agent when using await asyncio.gather(fut) or await asyncio.wait([fut]), where fut updates the table.

Here is the exception traceback:

[2018-03-04 18:39:34,215: ERROR]: Task exception was never retrieved
future: <Task finished coro=<ActivitiesStore.add() done, defined at /Users/vineet/scroll/scroll/store.py:75> exception=RuntimeError('Cannot modify table outside of agent/stream.',)>
Traceback (most recent call last):
  File "/Users/vineet/scroll/scroll/store.py", line 76, in add
    self._add_activity(activity)
  File "/Users/vineet/scroll/scroll/store.py", line 85, in _add_activity
    self.activities[activity.key] = activity
  File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/utils/collections.py", line 296, in __setitem__
    self.on_key_set(key, value)
  File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/tables/table.py", line 58, in on_key_set
    self._send_changelog(key, value)
  File "/Users/vineet/.virtualenvs/scroll/lib/python3.6/site-packages/faust/tables/base.py", line 161, in _send_changelog
    raise RuntimeError('Cannot modify table outside of agent/stream.')
RuntimeError: Cannot modify table outside of agent/stream.
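A minimal sketch of the pattern that triggers this error (hypothetical names, assuming a local broker):

python

import asyncio

import faust

app = faust.App('table-repro', broker='kafka://localhost')
activities = app.Table('activities', default=int)
topic = app.topic('activity-events', value_type=str)

async def add_activity(key: str) -> None:
    # Wrapped in gather()/wait(), this coroutine runs in a separate task,
    # outside the stream's current event context, so Faust raises the
    # RuntimeError shown above.
    activities[key] += 1

@app.agent(topic)
async def process(events):
    async for key in events:
        await asyncio.gather(add_activity(key))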

Maximum recursion depth exceeded error resulting in app crash

In Trebuchet I found the following stack trace, which crashed the app:

[2018-07-06 22:18:51,007: ERROR]: [^-App]: Crashed reason=RecursionError('maximum recursion depth exceeded in comparison',)
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/base.py", line 845, in _on_partitions_revoked
    await self.consumer.wait_empty()
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 305, in wait_empty
    await self.commit()
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 345, in commit
    return await self.force_commit(topics)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 374, in force_commit
    did_commit = await self._commit_tps(commit_tps)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 384, in _commit_tps
    await self._handle_attached(commit_offsets)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 408, in _handle_attached
    pending = await attachments.publish_for_tp_offset(tp, offset)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/_attached.py", line 140, in publish_for_tp_offset
    for fut in attached
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/app/_attached.py", line 140, in <listcomp>
    for fut in attached
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/topics.py", line 303, in publish_message
    topic, key, value, partition=message.partition)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/drivers/aiokafka.py", line 668, in send
    topic, value, key=key, partition=partition))
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 311, in send
    timestamp_ms=timestamp_ms)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  [Previous line repeated 934 more times]
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/message_accumulator.py", line 252, in add_message
    yield from batch.wait_drain(timeout)
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/tasks.py", line 301, in wait
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/coroutines.py", line 270, in iscoroutine
    return isinstance(obj, _COROUTINE_TYPES)
  File "/home/robinhood/env/lib/python3.6/abc.py", line 188, in __instancecheck__
    subclass in cls._abc_negative_cache):
  File "/home/robinhood/env/lib/python3.6/_weakrefset.py", line 75, in __contains__
    return wr in self.data
RecursionError: maximum recursion depth exceeded in comparison

Faust consumes from different topics at different and highly variable rates

Actual Behavior

Trebuchet consumes from both the 'goku' and 'adjust_data' topics. When running in production, however, the rates at which Faust consumes from these topics vary wildly, sometimes even going to zero. See the graph below:

[Screenshot: per-topic consumption rate graphs, 2018-03-06]

What's notable about these graphs is that they are the exact inverses of each other. When one spikes, the other drops, and vice versa, so Faust is overall consuming at approximately a constant rate. Goku events sometimes even go to zero for a while.

Expected Behavior

Faust should be consuming from both topics at a reasonable, constant rate.

Steps to reproduce

To reproduce this issue locally, set up trebuchet locally (https://github.com/robinhoodmarkets/trebuchet), and then add print("logging event") to line 14 and print("adjust event") to line 24 of trebuchet/agents.py. Also, change the replication factor in trebuchet/config.py to 1. After building and installing, start trebuchet with trebuchet worker -l info. tests/sample contains sample adjust_data and goku events. To produce sample events, run the following two commands on two separate shells at the same time:
kafka-console-producer --broker-list localhost:9092 --topic adjust_data < tests/local/adjust_data
kafka-console-producer --broker-list localhost:9092 --topic goku < tests/local/goku

This should start producing to Kafka, and you will see a stream of print statements on Trebuchet. If reproduced correctly, you should see periods when only adjust events (and no goku/logging events) are processed, followed by periods when both topics are processed (with a lower rate for adjust events), then the cycle repeats, similar to what the graphs above show.

Versions

  • Python version: 3.6.4
  • Faust version: 0.9.36
  • Operating system: macOS High Sierra
  • Kafka version: 0.11.0.1

Application keeps crashing causing the group to continuously try and rebalance

Found the following error in the logs:

TypeError: __init__() got an unexpected keyword argument 'resolve'
[2018-06-05 16:33:01,436: ERROR]: ["/home/robinhood/python-3.6.3/lib/python3.6/asyncio/base_events.py:1411: RuntimeWarning: coroutine 'Fetcher._fetcher' was never awaited\n  handle = self._ready.popleft()"]
[2018-06-05 16:33:01,427: WARNING]: /home/robinhood/python-3.6.3/lib/python3.6/asyncio/base_events.py:1411: RuntimeWarning: coroutine 'Fetcher._fetcher' was never awaited
  handle = self._ready.popleft()

The app (trebuchet) was continuously crashing and trying to rebalance, so a proper group was never actually formed.

Add support for SASL authentication

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

N/A

Expected behavior

Ideally, there would be the (documented) ability to connect to a remote Kafka broker via SASL authentication.

Actual behavior

I searched through the docs & spent a while grepping, and I found no way to authenticate/connect to a remote Kafka broker.

Versions

N/A

Standbys should write to rocksdb and persist offsets

It seems that upon reading standbys we do write to RocksDB but do not persist offsets. As a result, we go through the entire changelog anyway when promoted to active. We should persist offsets whenever we write standbys to RocksDB.

Error while releasing lock

Saw this issue in one of the production boxes. This is not reproducible.

[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Waiting for lock to pause partitions
[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Acquired lock to pause partitions
[2018-05-09 13:51:45,694: INFO]: [^--Consumer]: Released pause partitions lock
[2018-05-09 13:51:45,694: INFO]: [^--Fetcher]: Starting...
[2018-05-09 13:51:45,695: INFO]: [^-App]: Restarted fetcher
[2018-05-09 13:51:45,696: INFO]: [^--TableManager]: Triggered recovery in background
[2018-05-09 13:51:45,697: INFO]: [^--TableManager]: New assignments found
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Waiting for lock to pause partitions
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Acquired lock to pause partitions
[2018-05-09 13:51:45,697: INFO]: [^--Consumer]: Released pause partitions lock
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Restoring state from changelog topics...
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Waiting for restore to finish...
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Done reading all changelogs
[2018-05-09 13:51:45,698: INFO]: [^--TableManager]: Done reading from changelog topics
[2018-05-09 13:51:45,699: INFO]: [^--TableManager]: Stopped restoring
[2018-05-09 13:51:45,699: INFO]: [^--TableManager]: Restore complete!
[2018-05-09 13:51:45,706: INFO]: [^--TableManager]: Attempting to start standbys
[2018-05-09 13:51:45,706: INFO]: [^--TableManager]: New assignments handled
[2018-05-09 13:51:45,706: INFO]: [^--Consumer]: Waiting for lock to resume partitions
[2018-05-09 13:51:45,707: INFO]: [^--Consumer]: Acquired lock to resume partitions
[2018-05-09 13:51:45,707: INFO]: [^--Consumer]: Released resume partitions lock
[2018-05-09 13:51:45,729: ERROR]: Unexpected error in fetcher routine
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 328, in _fetch_requests_routine
    yield from task
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 624, in _update_fetch_positions
    node_id, topic_data)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 747, in _proc_offset_request
    response = yield from self._client.send(node_id, request)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 403, in send
    if not (yield from self.ready(node_id, group=group)):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 381, in ready
    conn = yield from self._get_conn(node_id, group=group)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 369, in _get_conn
    max_idle_ms=self._connections_max_idle_ms)
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/locks.py", line 38, in __exit__
    self._lock.release()
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/locks.py", line 201, in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.

Restarting the app worked.

The first message read after consumer group rebalance is potentially dropped

Checklist

  • I have included information about relevant versions
    We saw this issue on Faust 1.0.19.

  • I have verified that the issue persists when using the master branch of Faust.
We have only seen this issue once and haven't reproduced it. But after talking to Vineet, it seems possible that, given the way Faust uses consumer offsets, we may be dropping the first message read after a consumer rebalance.

Steps to reproduce

We saw this issue in the combiner, which should hold onto messages it receives from a stream for long periods of time without committing them. Unfortunately, we have seen this issue only once but will provide an update if we see it again.

Expected behavior

This agent receives a special type of event which should trigger it to stop consuming and hold onto the event for a long period of time. This event should not be committed so if the consumer group is rebalanced or the agent is restarted, it should receive the message again and continue waiting.

Actual behavior

We saw this special event being dropped with a message "DROPPED MESSAGE ROFF: n". We believe this may be caused by how Faust manages and uses consumer offsets.

Full traceback

2018-06-29 21:04:57,722: INFO]: [^--TableManager]: Stopped restoring
[2018-06-29 21:04:57,723: INFO]: [^--TableManager]: Restore complete!
[2018-06-29 21:04:57,738: INFO]: [^--TableManager]: Attempting to start standbys
[2018-06-29 21:04:57,738: INFO]: [^--TableManager]: New assignments handled
[2018-06-29 21:04:57,739: INFO]: [^--Fetcher]: Starting...
[2018-06-29 21:04:57,739: INFO]: [^--TableManager]: Worker ready
[2018-06-29 21:04:57,740: INFO]: [^Worker]: Ready
[2018-06-29 21:04:57,741: INFO]: No forwarders are waiting. Going to sleep.
[2018-06-29 21:04:57,812: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 2: k=None v=b'{"details": {"date": "2018-06-29"}, "id": "d3a8acf5-eb24-461
0-abc1-ce0b6fff8254", "type": "end_of_day"}'
[2018-06-29 21:04:57,838: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 302: k=None v=b'{"order_id": 168, "id": "3e66bc59-49d3-4388-b9f9-81476f7f6
9bb", "key_field": "0", "batch_id": null, "created_at": "2018-06-30T00:02:59.192929+00:00", "updated_at": "2018-06-30T00:02:59.192943+00:00", "
type": "end_of_day", "sent": false, "account_id": null, "details": {"date": "2018-06-29"}, "meta": null}'
[2018-06-29 21:04:57,838: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 29: k=None v=b'{"type": "end_of_day", "created_at": "2018-06-30T00:03:06.8
22620+00:00", "updated_at": "2018-06-30T00:03:06.822649+00:00", "account": null, "batch_id": null, "id": "5817539f-2f34-46d4-b89b-be7be15cfdba"
, "key_field": "0", "order_id": 42, "trigger_id": null, "sent": false, "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:57,840: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:04:57,841: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:04:57,873: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 4069: k=None v=b'{"created_at": "2018-06-30T00:02:59.160290+00:00", "updat
ed_at": "2018-06-30T00:02:59.160321+00:00", "key_field": "8088457b-fdcb-44ed-a829-e59a12100eb0", "batch_id": null, "order_id": 65146, "type": "
end_of_day", "id": "001d8063-fb9d-4325-87c5-87ff11d57d09", "sent": false, "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:57,875: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 908: k=None v=b'{"type":"end_of_day","details":{"date":"2018-06-29"}}'
[2018-06-29 21:04:57,875: INFO]: Got end_of_day event on topic mainst_pinnacle_order_updates
[2018-06-29 21:04:57,876: INFO]: Received end of day event on topic mainst_pinnacle_order_updates
[2018-06-29 21:04:57,970: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:04:57,982: INFO]: Fetch offset 0 is out of range for partition TopicPartition(topic='nummus_transfers', partition=18), resetting offset
[2018-06-29 21:04:57,999: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 18: k=None v=b'{"type": "end_of_day", "id": "19c13f26-824b-4b37-8d9e-fd6775e405f1", "details": {"date": "2018-06-29"}}'
[2018-06-29 21:04:58,014: INFO]: [^--Consumer]: COMMITTING OFFSETS:
+Commit Offsets-------------------------------------------------------+--------+----------+
| TP | Offset | Metadata |
+---------------------------------------------------------------------+--------+----------+
| TopicPartition(topic='cashier-transfers_fundingevent', partition=0) | 304 | |
+---------------------------------------------------------------------+--------+----------+
[2018-06-29 21:05:57,973: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:06:57,976: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:07:57,978: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:08:57,981: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:09:35,888: INFO]: Got funding event on topic cashier-transfers_fundingevent
[2018-06-29 21:09:38,123: INFO]: [^--Consumer]: COMMITTING OFFSETS:
+Commit Offsets-------------------------------------------------------+--------+----------+
| TP | Offset | Metadata |
+---------------------------------------------------------------------+--------+----------+
| TopicPartition(topic='cashier-transfers_fundingevent', partition=0) | 305 | |
+---------------------------------------------------------------------+--------+----------+
[2018-06-29 21:09:57,983: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:10:57,987: INFO]: Waiting for the day cycle to begin.
[2018-06-29 21:11:57,990: INFO]: Waiting for the day cycle to begin.

Versions

  • Python version
  • Faust version: 1.0.19
  • Operating system:
  • Kafka version
  • RocksDB version (if applicable)

Potential deadlock

I noticed that when new boxes came up for scroll, it went into a heartbeat-failure loop, which is typical of deadlocks during rebalance. I was able to confirm that there was a deadlock in on_partitions_revoked, as can be seen here:

[2018-06-06 17:38:41,498: INFO]: [^-App]: [Flight Recorder-3] (started at Wed Jun  6 17:37:41 2018) Replaying logs...
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) fetcher.stop()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) topics.on_partitions_revoked()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) tables.on_partitions_revoked()
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] (Wed Jun  6 17:37:41 2018) +TABLES: maybe_abort_ongoing_recovery
[2018-06-06 17:38:41,499: INFO]: [^-App]: [Flight Recorder-3] -End of log-

Modify Offsets committed to next expected offset instead of current consumed offset

Steps to reproduce

Regarding the offset that Faust commits to Kafka during consumption: traditionally, most Kafka consumers, by convention, write back to offset storage (ZooKeeper or Kafka) the offset they expect to read next. Faust seems to write back the last offset it has consumed. What we're afraid of here is that most open-source tooling is written with the usual convention in mind; one such tool, released by LinkedIn for consumer group monitoring, is Burrow (https://github.com/linkedin/burrow).
For reference, from the javadoc of the Java Kafka reference consumer:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Offsets and Consumer Position
Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer.

The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives data calls poll(long) and receives messages.

The committed position is the last offset that has been saved securely. Should the process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling commitSync, which will block until the offsets have been successfully committed or fatal error has happened during the commit process, or commitAsync which is non-blocking and will trigger OffsetCommitCallback upon either successfully committed or fatally failed.

This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.

Expected behavior

The offset written for each partition should be the next offset the consumer expects to read, not the offset it last consumed.

Actual behavior

Off by one: the current consumed offset is written, not the current offset + 1, which is the next offset the consumer expects to consume.
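As a sketch of the convention (an illustrative helper, not Faust code):

python

def offset_to_commit(last_consumed: int) -> int:
    # Kafka convention: commit the position, i.e. the next offset the
    # consumer expects to read, which is the last consumed offset + 1.
    return last_consumed + 1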

Assertion error in aiokafka sender routine causing app to stall

The app seems to stall after the following errors. We should make the app crash instead of stalling, so that it is restarted by the supervisor:

[2018-05-29 14:08:03,304: ERROR]: Future exception was never retrieved
future: <Future finished exception=NodeNotReadyError('Attempt to send a request to node which is not ready (node id 42).',)>
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 419, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 406, in send
    " which is not ready (node id {}).".format(node_id))
kafka.errors.NodeNotReadyError: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:03,315: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:03,315: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:03,715: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:03,716: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,117: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:04,117: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,517: ERROR]: Unable connect to node with id 42: [Errno 111] Connect call failed ('10.1.30.204', 9092)
[2018-05-29 14:08:04,518: WARNING]: Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 42).
[2018-05-29 14:08:04,917: ERROR]: Unexpected error in sender routine
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 374, in _sender_routine
    task.result()
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/producer/producer.py", line 419, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 403, in send
    if not (yield from self.ready(node_id, group=group)):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 381, in ready
    conn = yield from self._get_conn(node_id, group=group)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/client.py", line 354, in _get_conn
    assert broker, 'Broker id %s not in current metadata' % node_id
AssertionError: Broker id 42 not in current metadata
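
A hedged sketch of one way to turn these unretrieved background failures into a hard crash (the handler name is illustrative, not existing faust configuration):

```python
import asyncio
import logging
import os

def crash_on_unhandled(loop: asyncio.AbstractEventLoop, context: dict) -> None:
    # Called for exceptions no task ever retrieved, like the
    # "Future exception was never retrieved" error above.
    logging.error('Unhandled error in background task: %s',
                  context.get('exception') or context.get('message'))
    os._exit(70)  # EX_SOFTWARE: exit immediately so the supervisor restarts us

loop = asyncio.get_event_loop()
loop.set_exception_handler(crash_on_unhandled)
```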

Error on startup with faust==1.0.12

I get the following error when running faust 1.0.12:

Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 685, in _execute_task
    await task
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 292, in _commit_handler
    await self.commit()
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 331, in commit
    return await self.force_commit(topics)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 417, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 345, in force_commit
    did_commit = await self._commit_tps(commit_tps)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 355, in _commit_tps
    await self._handle_attached(commit_offsets)
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/consumer.py", line 389, in _handle_attached
    await producer.wait_many(pending)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 600, in wait_many
    return await self._wait_one(coro, timeout=timeout)
  File "/home/robinhood/env/lib/python3.6/site-packages/mode/services.py", line 625, in _wait_one
    f.result()  # propagate exceptions
  File "/home/robinhood/python-3.6.3/lib/python3.6/asyncio/tasks.py", line 304, in wait
    raise ValueError('Set of coroutines/Futures is empty.')
ValueError: Set of coroutines/Futures is empty.

The app crashes after this error.

faust.Record isodates argument doesn't work in 1.0.16

Getting the following error with varys code upon upgrading from faust 1.0.11 to faust 1.0.16:

Traceback (most recent call last):
  File "examples/psql.py", line 7, in <module>
    from varys.shovels.psql import PSQLShovel
  File "/Users/vineet/.virtualenvs/varys/lib/python3.6/site-packages/varys/shovels/psql/__init__.py", line 1, in <module>
    from .shovel import PSQLShovel, PSQLTask
  File "/Users/vineet/.virtualenvs/varys/lib/python3.6/site-packages/varys/shovels/psql/shovel.py", line 94, in <module>
    class PSQLTask(Task, isodates=True):
TypeError: __new__() got an unexpected keyword argument 'isodates'
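
For context, a minimal standalone reproduction of what the flag is meant to do (this mirrors the failing class definition above without the varys code):

```python
import faust
from datetime import datetime

class Event(faust.Record, isodates=True):
    occurred_at: datetime

# With isodates=True, faust is expected to parse ISO-8601 strings back
# into datetime objects when records are deserialized. On faust 1.0.16
# the class definition above already raises:
#   TypeError: __new__() got an unexpected keyword argument 'isodates'
```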

App cannot recover from NodeNotReadyError

The app is stuck, repeatedly logging the following errors:

[2018-06-19 10:18:54,652: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,654: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,654: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,656: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,656: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).
[2018-06-19 10:18:54,658: ERROR]: Unable connect to node with id 102: [Errno 111] Connect call failed ('10.1.65.249', 9092)
[2018-06-19 10:18:54,658: ERROR]: Failed fetch messages from 102: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 102).

These errors appear to come from the aiokafka fetcher; I see a large number of them.

This is with faust 1.0.14.

Garbage collect old windowed table entries by partition

Currently we garbage collect windowed table entries based on the latest timestamp we have seen across all partitions. This should be changed to garbage collect a partition's entries based on the latest timestamp seen for that partition, which helps when a partition is lagging behind the others (see the sketch below).
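
A rough sketch of the proposed bookkeeping (class and method names here are hypothetical): keep a watermark per partition and expire a window only against its own partition's watermark:

```python
from typing import Dict

class PerPartitionExpiry:
    """Expire windowed table entries per partition (hypothetical sketch)."""

    def __init__(self, retention: float) -> None:
        self.retention = retention              # seconds to keep closed windows
        self.latest_ts: Dict[int, float] = {}   # partition -> highest timestamp seen

    def on_event(self, partition: int, timestamp: float) -> None:
        # Advance only this partition's watermark.
        if timestamp > self.latest_ts.get(partition, 0.0):
            self.latest_ts[partition] = timestamp

    def is_expired(self, partition: int, window_end: float) -> bool:
        # Compare against this partition's own watermark, so a lagging
        # partition does not have its windows garbage collected early.
        return window_end < self.latest_ts.get(partition, 0.0) - self.retention
```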

Sentry + Faust not playing well together

On the latest version of Faust, I get the following error upon enabling Sentry:

```
[2018-06-05 16:31:39,400: ERROR]: Sentry responded with an error: __init__() got an unexpected keyword argument 'resolve' (url: https://sentry-internal.robinhood.com/api/38/store/)
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/raven/base.py", line 722, in send_remote
    transport = self.remote.get_transport()
  File "/home/robinhood/env/lib/python3.6/site-packages/raven/conf/remote.py", line 71, in get_transport
    self._transport = self._transport_cls(**self.options)
  File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 174, in __init__
    super().__init__(*args, **kwargs)
  File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 57, in __init__
    self._client_session = self._client_session_factory()
  File "/home/robinhood/env/lib/python3.6/site-packages/raven_aiohttp.py", line 77, in _client_session_factory
    loop=self._loop)
TypeError: __init__() got an unexpected keyword argument 'resolve'
```

I am using the following versions:

raven==6.6.0
raven-aiohttp==0.6.0
aiohttp==3.1.3

High commit latency + slow web server on faust>=0.9.41

Steps to reproduce

Check out the trebuchet-local branch in trebuchet. After setting up trebuchet, run it with trebuchet worker -l info. Make sure the faust version is >= 0.9.41.

Then, feed data into Kafka with the following command:
kafka-console-producer --broker-list localhost:9092 --topic adjust_data < tests/sample/adjust_data.
Navigate your browser to localhost:6066.

Expected behavior

In faust versions 0.9.39 and below, commit latencies stabilize at around 3-4 seconds while data is being consumed. The web server loads very quickly.

Actual behavior

When faust is consuming data, commit latency spikes to roughly 30-60 seconds, and the web server (localhost:6066) takes a long time to load.

[Documentation] Overview: Faust vs. Celery

Although I have no experience with Kafka whatsoever, I'm pretty sure the comparison to Celery was meant to read:

await add.send(value=AddOperation(2, 2))

But I could be wrong...

CommitFailedError when a new generation may have formed

A new generation may have formed in the consumer group, and we see this error:

[2018-05-09 09:01:27,183: ERROR]: [^--Consumer]: Committing raised exception: CommitFailedError('Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member',)
Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 820, in commit_offsets
    loop=self._loop)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 940, in _do_commit_offsets
    raise first_error
kafka.errors.RebalanceInProgressError: [Error 27] RebalanceInProgressError: risk-faust-v15

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/robinhood/env/lib/python3.6/site-packages/faust/transport/drivers/aiokafka.py", line 332, in _commit
    await self._consumer.commit(commitable)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/consumer.py", line 436, in commit
    yield from self._coordinator.commit_offsets(assignment, offsets)
  File "/home/robinhood/env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py", line 825, in commit_offsets
    "Commit cannot be completed since the group has already "
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already rebalanced and may have assigned the partitions to another member

We should die hard on this error (see the sketch below).
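
A hedged sketch of what "dying hard" could look like (the wrapper name is hypothetical; this is not faust's actual commit path):

```python
from kafka.errors import CommitFailedError

async def commit_offsets_or_die(consumer, offsets) -> None:
    try:
        await consumer.commit(offsets)
    except CommitFailedError:
        # Our group generation is stale: the partitions may already be
        # assigned to another member, so continuing risks duplicates.
        # Exit so the supervisor restarts us as a fresh group member.
        raise SystemExit(1)
```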

Table Partition Garbage Collection

Currently we do not garbage collect a table's in-memory state for a partition once that partition is unassigned from a client. This may leak memory when partitions move frequently between clients.

A proposed approach is to divide a table internally by partition: multiple underlying tables behind a single object implementing the table interface. This might make the dictionary's O(1) operations O(p) (p = number of partitions), but should make garbage collection fairly quick and simple (see the sketch below).

The other approach could be to maintain a partition-to-key index, but that would have a space complexity of O(n) (n = number of keys).
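
A minimal sketch of the first approach (class name hypothetical): shard the backing dict by partition so that revoking a partition drops its whole shard:

```python
from typing import Any, Dict

class PartitionShardedTable:
    """One backing dict per partition (hypothetical sketch)."""

    def __init__(self) -> None:
        self._shards: Dict[int, Dict[Any, Any]] = {}

    def set(self, partition: int, key: Any, value: Any) -> None:
        self._shards.setdefault(partition, {})[key] = value

    def get(self, key: Any, default: Any = None) -> Any:
        # Reads become O(p): we may probe every partition's shard.
        for shard in self._shards.values():
            if key in shard:
                return shard[key]
        return default

    def revoke(self, partition: int) -> None:
        # Garbage collection on rebalance is a single pop of the shard.
        self._shards.pop(partition, None)
```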

JSON load error with string values in topics

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run the following hello_world app:

import faust

app = faust.App('hello-world', url='kafka://localhost:9092')

greetings_topic = app.topic('greetings')

@app.actor(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)

Expected behavior

Print the greetings (strings)

Actual behavior

Got a json.load error

Full traceback

[2017-08-18 15:46:03,252: ERROR] Cannot decode value for key=None (b'sadkjfh'): ValueDecodeError('Expecting value: line 1 column 1 (char 0)',)
Traceback (most recent call last):
  File "/Users/vineet/faust/faust/topics.py", line 353, in deliver
    v = await loads_value(value_type, message.value)
  File "/Users/vineet/faust/faust/serializers/registry.py", line 94, in loads_value
    str(exc)).with_traceback(sys.exc_info()[2]) from None
  File "/Users/vineet/faust/faust/serializers/registry.py", line 88, in loads_value
    await self._loads(serializer, value))
  File "/Users/vineet/faust/faust/serializers/registry.py", line 71, in _loads
    return loads(serializer, data)
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 321, in loads
    return get_codec(codec).loads(s) if codec else s
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 215, in loads
    reversed(self.nodes), s)
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 214, in <lambda>
    lambda s, d: cast(Codec, d)._loads(s),
  File "/Users/vineet/faust/faust/serializers/codecs.py", line 241, in _loads
    return _json.loads(want_str(s))
  File "/Users/vineet/faust/faust/utils/json.py", line 97, in loads
    return json.loads(s, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None

Versions

  • Python version - 3.6.1
  • Faust version - master
  • Operating system
  • Kafka version - 0.10.2.1
  • RocksDB version (if applicable)
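
A possible workaround, not part of the original report: declare the topic with a raw value serializer (assuming the value_serializer topic option) so plain string payloads are delivered as bytes instead of being JSON-decoded:

```python
import faust

app = faust.App('hello-world', url='kafka://localhost:9092')

# 'raw' skips JSON decoding; values arrive as bytes.
greetings_topic = app.topic('greetings', value_serializer='raw')

@app.actor(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)
```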

Command to delete state for old app version

The app version flag makes it easy to start applications from scratch. However, this leaves behind state in RocksDB/Kafka for older app versions that sticks around for a long time. We should add a command to clean this up (see the sketch below).
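
A hedged sketch of the local half of such a command (the data directory naming scheme below is an assumption, not faust's documented layout; changelog topics in Kafka would need separate cleanup via the broker's admin tooling):

```python
import re
import shutil
from pathlib import Path

def delete_old_version_state(datadir_root: Path, app_name: str,
                             current_version: int) -> None:
    # Assumed layout: one '<app>-v<N>-data' directory per app version.
    pattern = re.compile(rf'^{re.escape(app_name)}-v(\d+)-data$')
    for path in datadir_root.iterdir():
        match = pattern.match(path.name)
        if match and int(match.group(1)) < current_version:
            shutil.rmtree(path)  # drop RocksDB state for the old version
```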
