
Python Stream Processing. A Faust fork

Home Page: https://faust-streaming.github.io/faust/

License: Other

Makefile 0.31% Python 98.40% Shell 0.14% HTML 0.43% Cython 0.72%
asyncio python-streaming kafka redis distributed-systems

faust's Introduction

faust

Python Stream Processing Fork


Installation

pip install faust-streaming

Documentation

Why the fork

We decided to fork the original Faust project because releases of the original had stalled, which created uncertainty in the community. Everyone is welcome to contribute to this fork, and you can be added as a maintainer.

We want to:

  • Ensure continuous releases
  • Improve code quality
  • Use the latest versions of Kafka drivers (for now, only aiokafka)
  • Support Kafka transactions
  • Update the documentation

and more...

Usage

# Python Streams
# Forever scalable event processing & in-memory durable K/V store;
# as a library w/ asyncio & static typing.
import faust

Faust is a stream processing library, porting the ideas from Kafka Streams to Python.

It is used at Robinhood to build high performance distributed systems and real-time data pipelines that process billions of events every day.

Faust provides both stream processing and event processing, sharing similarity with tools such as Kafka Streams, Apache Spark, Storm, Samza, and Flink.

It does not use a DSL, it's just Python! This means you can use all your favorite Python libraries when stream processing: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++

Faust requires Python 3.8 or later for the new async/await syntax and variable type annotations.

Here's an example processing a stream of incoming orders:

app = faust.App('myapp', broker='kafka://localhost')

# Models describe how messages are serialized:
# {"account_id": "3fae-...", amount": 3}
class Order(faust.Record):
    account_id: str
    amount: int

@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')

The Agent decorator defines a "stream processor" that essentially consumes from a Kafka topic and does something for every event it receives.

The agent is an async def function, so can also perform other operations asynchronously, such as web requests.

This system can persist state, acting like a database. Tables are named distributed key/value stores you can use as regular Python dictionaries.

Tables are stored locally on each machine using a super fast embedded database written in C++, called RocksDB.

Tables can also store aggregate counts that are optionally "windowed" so you can keep track of "number of clicks from the last day," or "number of clicks in the last hour." for example. Like Kafka Streams, we support tumbling, hopping and sliding windows of time, and old windows can be expired to stop data from filling up.
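
As a rough sketch of a windowed count (the app name, topic, and window sizes below are illustrative, not part of the original examples):

from datetime import timedelta

import faust

app = faust.App('page-views', broker='kafka://localhost')
views_topic = app.topic('page_views', key_type=str, value_type=int)

# Tumbling one-hour windows; windows older than a day are expired
# so the table does not grow without bound.
hourly_views = app.Table('hourly_views', default=int).tumbling(
    timedelta(hours=1),
    expires=timedelta(days=1),
)

@app.agent(views_topic)
async def aggregate_views(views):
    async for page, count in views.items():
        hourly_views[page] += count
        # .now() reads the value for the window containing the current time.
        print(f'{page}: {hourly_views[page].now()} views this hour')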

For reliability, we use a Kafka topic as a "write-ahead log". Whenever a key is changed, we publish to the changelog. Standby nodes consume from this changelog to keep an exact replica of the data, enabling instant recovery should any of the nodes fail.

To the user a table is just a dictionary, but data is persisted between restarts and replicated across nodes so on failover other nodes can take over automatically.

You can count page views by URL:

# data sent to 'clicks' topic sharded by URL key.
# e.g. key="http://example.com" value="1"
click_topic = app.topic('clicks', key_type=str, value_type=int)

# default value for missing URL will be 0 with `default=int`
counts = app.Table('click_counts', default=int)

@app.agent(click_topic)
async def count_click(clicks):
    async for url, count in clicks.items():
        counts[url] += count

The data sent to the Kafka topic is partitioned, which means the clicks will be sharded by URL in such a way that every count for the same URL will be delivered to the same Faust worker instance.
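
For example, producing to the topic with an explicit key routes the event by that key; this sketch reuses the click_topic defined above, and the URL is just a placeholder:

@app.timer(interval=10.0)
async def produce_click(app):
    # Keyed sends are hashed to a partition, so every count for
    # this URL is delivered to the same worker instance.
    await click_topic.send(key='http://example.com', value=1)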

Faust supports any type of stream data: bytes, Unicode and serialized structures, but also comes with "Models" that use modern Python syntax to describe how keys and values in streams are serialized:

# Order is a json serialized dictionary,
# having these fields:

class Order(faust.Record):
    account_id: str
    product_id: str
    price: float
    quantity: float = 1.0

orders_topic = app.topic('orders', key_type=str, value_type=Order)

@app.agent(orders_topic)
async def process_order(orders):
    async for order in orders:
        # process each order using regular Python
        total_price = order.price * order.quantity
        await send_order_received_email(order.account_id, order)

Faust is statically typed, using the mypy type checker, so you can take advantage of static types when writing applications.
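
For example, streams and agents can carry type annotations that mypy will check; this is a sketch reusing the Order model from above, with faust.StreamT as the stream type:

import faust

class Order(faust.Record):
    account_id: str
    amount: int

app = faust.App('typed-app', broker='kafka://localhost')
orders_topic = app.topic('orders', value_type=Order)

@app.agent(orders_topic)
async def audit_orders(orders: faust.StreamT[Order]) -> None:
    async for order in orders:
        # mypy knows `order` is an Order, so order.amount is an int here.
        total: int = order.amount
        print(order.account_id, total)

Running mypy over the module then checks these annotations like any other typed Python code.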

The Faust source code is small, well organized, and serves as a good resource for learning the implementation of Kafka Streams.

Learn more about Faust on the introduction page, which covers system requirements, installation instructions, community resources, and more.

Or go directly to the quickstart tutorial to see Faust in action by programming a streaming application.

Then explore the User Guide for in-depth information organized by topic.

Local development

  1. Clone the project
  2. Create a virtualenv: python3.8 -m venv venv && source venv/bin/activate
  3. Install the requirements: ./scripts/install
  4. Run lint: ./scripts/lint
  5. Run tests: ./scripts/tests

Faust key points

Simple

Faust is extremely easy to use. Getting started with other stream processing solutions means complicated hello-world projects and heavy infrastructure requirements. Faust only requires Kafka; the rest is just Python, so if you know Python you can already use Faust to do stream processing, and it can integrate with just about anything.

Here's one of the simplest applications you can make:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

You're probably a bit intimidated by the async and await keywords, but you don't have to know how asyncio works to use Faust: just mimic the examples, and you'll be fine.

The example application starts two tasks: one processes a stream, the other is a background timer sending events to that stream. In a real-life application your system will publish events to Kafka topics that your processors can consume from, and the background timer is only needed to feed data into our example.
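
Since the example calls app.main(), it can be started directly with Python or through the faust command-line program; hello_world.py below is just an assumed filename for the example above:

python hello_world.py worker

faust -A hello_world worker -l info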

Highly Available

Faust is highly available and can survive network problems and server crashes. In the case of node failure, it can automatically recover, and tables have standby nodes that will take over.

Distributed

Start more instances of your application as needed.
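
For example, additional workers for the same app can typically be started side by side on one host by giving each its own web port (the port numbers below are arbitrary):

faust -A myapp worker -l info --web-port 6066

faust -A myapp worker -l info --web-port 6067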

Fast

A single-core Faust worker instance can already process tens of thousands of events every second, and we are reasonably confident that throughput will increase once we can support a more optimized Kafka client.

Flexible

Faust is just Python, and a stream is an infinite asynchronous iterator. If you know how to use Python, you already know how to use Faust, and it works with your favorite Python libraries like Django, Flask, SQLAlchemy, NLTK, NumPy, SciPy, TensorFlow, etc.
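
As a small sketch of that flexibility, an agent body is ordinary Python, so it can call into libraries such as NumPy directly; the topic name and the computation here are illustrative:

import faust
import numpy as np

app = faust.App('numbers-app', broker='kafka://localhost')
numbers_topic = app.topic('numbers', value_type=float)

recent = []  # most recent values seen by this worker

@app.agent(numbers_topic)
async def rolling_mean(numbers):
    async for value in numbers:
        recent.append(value)
        if len(recent) >= 10:
            # Plain NumPy call inside the streaming agent.
            print('mean of last 10:', np.mean(recent[-10:]))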

Bundles

Faust also defines a group of setuptools extensions that can be used to install Faust and the dependencies for a given feature.

You can specify these in your requirements or on the pip command-line by using brackets. Separate multiple bundles using the comma:

pip install "faust-streaming[rocksdb]"

pip install "faust-streaming[rocksdb,uvloop,fast,redis,aerospike]"

The following bundles are available:

Faust with extras

Stores

RocksDB

For using RocksDB for storing Faust table state. Recommended in production.

pip install faust-streaming[rocksdb] (uses RocksDB 6)

pip install faust-streaming[rocksdict] (uses RocksDB 8, not backwards compatible with 6)
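
A minimal sketch of turning RocksDB on as the table store through the app's store setting (the app name and broker URL are placeholders):

import faust

# With faust-streaming[rocksdb] installed, tables are persisted to local
# RocksDB databases instead of being kept only in memory.
app = faust.App(
    'myapp',
    broker='kafka://localhost',
    store='rocksdb://',
)

click_counts = app.Table('click_counts', default=int)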

Aerospike

pip install faust-streaming[aerospike] for using Aerospike for storing Faust table state. Recommended if supported.

Aerospike Configuration

Aerospike can be enabled as the state store by specifying store="aerospike://"

By default, all tables backed by Aerospike use use_partitioner=True and generate changelog topic events, similar to a state store backed by RocksDB. The following configuration options should be passed as keys to the options parameter of Table:

namespace: the Aerospike namespace

ttl: TTL for all KVs in the table

username: username to connect to the Aerospike cluster

password: password to connect to the Aerospike cluster

hosts: the hosts parameter as specified in the aerospike client

policies: the different policies for reads/writes/scans

client: a dict of hosts and policies defined above
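
A hedged sketch of passing those options to a table; all values are placeholders and the exact option structure should be verified against the Aerospike store code, but the key names follow the list above:

import faust

app = faust.App('myapp', broker='kafka://localhost', store='aerospike://')

aerospike_options = {
    'namespace': 'test',                      # Aerospike namespace
    'ttl': 3600,                              # TTL for all KVs in the table
    'username': 'user',
    'password': 'secret',
    'client': {                               # dict of hosts and policies
        'hosts': [('aerospike-host', 3000)],
        'policies': {'timeout': 1000},        # placeholder policy values
    },
}

table = app.Table('my_table', default=str, options=aerospike_options)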

Caching

faust-streaming[redis] for using Redis as a simple caching backend (Memcached-style).
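
The cache backend is typically selected through the app's cache setting; treat the setting name and URL below as an assumption to verify against the configuration reference:

import faust

# Assumes faust-streaming[redis] is installed; the Redis URL is a placeholder.
app = faust.App(
    'myapp',
    broker='kafka://localhost',
    cache='redis://localhost:6379',
)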

Codecs

faust-streaming[yaml] for using YAML and the PyYAML library in streams.

Optimization

faust-streaming[fast] for installing all the available C speedup extensions to Faust core.

Sensors

faust-streaming[datadog] for using the Datadog Faust monitor.

faust-streaming[statsd] for using the Statsd Faust monitor.

faust-streaming[prometheus] for using the Prometheus Faust monitor.

Event Loops

faust-streaming[uvloop] for using Faust with uvloop.

faust-streaming[eventlet] for using Faust with eventlet.

Debugging

faust-streaming[debug] for using aiomonitor to connect and debug a running Faust worker.

faust-streaming[setproctitle]: when the setproctitle module is installed, the Faust worker will use it to set a nicer process name in ps/top listings. Also installed with the fast and debug bundles.

Downloading and installing from source

Download the latest version of Faust from https://pypi.org/project/faust-streaming/

You can install it by doing:

$ tar xvfz faust-streaming-0.0.0.tar.gz
$ cd faust-streaming-0.0.0
$ python setup.py build
# python setup.py install

The last command must be executed as a privileged user if you are not currently using a virtualenv.

Using the development version

With pip

You can install the latest snapshot of Faust using the following pip command:

pip install https://github.com/faust-streaming/faust/zipball/master#egg=faust

FAQ

Can I use Faust with Django/Flask/etc

Yes! Use eventlet as a bridge to integrate with asyncio.

Using eventlet

This approach works with any blocking Python library that can work with eventlet.

Using eventlet requires you to install the faust-aioeventlet module, and you can install this as a bundle along with Faust:

pip install -U faust-streaming[eventlet]

Then, to actually use eventlet as the event loop, you have to either pass the -L (--loop) argument to the faust program:

faust -L eventlet -A myproj worker -l info

or add import mode.loop.eventlet at the top of your entry point script:

#!/usr/bin/env python3
import mode.loop.eventlet  # noqa

It's very important this is at the very top of the module, and that it executes before you import libraries.

Can I use Faust with Tornado

Yes! Use the tornado.platform.asyncio bridge

Can I use Faust with Twisted

Yes! Use the asyncio reactor implementation: https://twistedmatrix.com/documents/current/api/twisted.internet.asyncioreactor.html

Will you support Python 2.7 or Python 3.5

No. Faust requires Python 3.8 or later, since it heavily uses features introduced in Python 3.6 and later (async/await and variable type annotations).

I get a maximum number of open files exceeded error by RocksDB when running a Faust app locally. How can I fix this

You may need to increase the limit for the maximum number of open files. On macOS and Linux you can use:

ulimit -n max_open_files to increase the open files limit to max_open_files.

On docker, you can use the --ulimit flag:

docker run --ulimit nofile=50000:100000 <image-tag>, where 50000 is the soft limit and 100000 is the hard limit.

Which Kafka versions does Faust support?

Faust supports Kafka versions 0.10 or newer.

Getting Help

Slack

For discussions about the usage, development, and future of Faust, please join the fauststream Slack.

Resources

Bug tracker

If you have any suggestions, bug reports, or annoyances please report them to our issue tracker at https://github.com/faust-streaming/faust/issues/

License

This software is licensed under the New BSD License. See the LICENSE file in the top distribution directory for the full license text.

Contributing

Development of Faust happens on GitHub: https://github.com/faust-streaming/faust

You're highly encouraged to participate in the development of Faust.

Code of Conduct

Everyone interacting in the project's code bases, issue trackers, chat rooms, and mailing lists is expected to follow the Faust Code of Conduct.

As contributors and maintainers of these projects, and in the interest of fostering an open and welcoming community, we pledge to respect all people who contribute through reporting issues, posting feature requests, updating documentation, submitting pull requests or patches, and other activities.

We are committed to making participation in these projects a harassment-free experience for everyone, regardless of level of experience, gender, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, or nationality.

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery
  • Personal attacks
  • Trolling or insulting/derogatory comments
  • Public or private harassment
  • Publishing others' private information, such as physical or electronic addresses, without explicit permission
  • Other unethical or unprofessional conduct.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct. By adopting this Code of Conduct, project maintainers commit themselves to fairly and consistently applying these principles to every aspect of managing this project. Project maintainers who do not follow or enforce the Code of Conduct may be permanently removed from the project team.

This code of conduct applies both within project spaces and in public spaces when an individual is representing the project or its community.

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by opening an issue or contacting one or more of the project maintainers.

faust's People

Contributors

aoberegg, ask, billaram, bobh66, bryantbiggs, cbrand, cdeil, cesarpantoja, dada-engineer, dependabot[bot], dhruvapatil98, ekerstens, forsberg, godtamit, jerrylinew, jsurloppe, lsabi, marcosschroh, martinmaillard, mihatroha, omarrayward, patkivikram, ran-ka, richardhundt, robertzk, rubyw, taybin, trauter, wbarnha, yiurule


faust's Issues

Fix faust to not resume flow on rebalance if rocksdb file is unavailable

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

On repartition, if the RocksDB instance is owned by another worker, the requesting worker should wait and not resume its streams.

Expected behavior

It should wait until the rocksdb instance is available

Actual behavior

The worker started to process and crashed when it could not access the rocksdb instance

Full traceback

Paste the full traceback (if there is any)

Versions

  • Python version
  • Faust version
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

Fix that released all fetch waiters introduced error messages for idle Faust worker

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run a faust worker that waits for messages on a topic with no or very little traffic.

Expected behavior

Worker should sit and do nothing.

Actual behavior

I get ERROR level messages as follows:

[2020-11-17 16:19:03,695] [708969] [ERROR] Exception in callback Fetcher._create_fetch_waiter.<locals>.<lambda>(<Future cancelled>) at /home/forsberg/.virtualenvs/ferrostream/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py:446
handle: <Handle Fetcher._create_fetch_waiter.<locals>.<lambda>(<Future cancelled>) at /home/forsberg/.virtualenvs/ferrostream/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py:446> 
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/forsberg/.virtualenvs/ferrostream/lib/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 446, in <lambda>
    lambda f, waiters=self._fetch_waiters: waiters.remove(f))
KeyError: <Future cancelled>

Problem goes away if I revert 361b09d (releasing all fetch waiters).

Problem also seems to appear during rebalance for busy workers, i.e. workers where the input topic is busy.

Versions

  • Python version 3.8.5
  • Faust version faust-streaming 18230a7
  • Operating system Linux
  • Kafka version 2.5.0
  • RocksDB version (if applicable) N/A

Recovery hangs in Fetcher

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

We have noticed that on a restart, recovery hangs with only the message from the Fetcher shown below, and then it just hangs indefinitely.

Expected behavior

Recovery should not hang

Actual behavior

Hangs with the following log message
[^----Fetcher]: Starting...

Full traceback

Paste the full traceback (if there is any)

Versions

  • Python version 3.8
  • Faust version 1.11.0a1
  • Operating system centos
  • Kafka version 2.4
  • RocksDB version (if applicable) 6.7

Exception thrown on faust-streaming/faust but not on robinhood/faust

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I have the same single-module app that works correctly with the robinhood version of faust but does not work with this fork. Namely, the exception quoted in the title, on_rebalance() takes 5 positional arguments but 6 were given, is thrown on app startup when using faust-streaming but not faust. The relevant versions used are 0.4.8 466dbf2 (git master at time of writing) and 1.10.4. The single-module app and exception are as follows:

from typing import Any

import faust


class UserUpdate(faust.Record):
    user_id: int
    event_type: str
    event_data: Any


app = faust.App("faust-demo", broker="kafka://kafka-1:9092")
user_updates = app.topic("user_updates", value_type=UserUpdate, partitions=8)
users = app.Table("user", partitions=8)


@app.agent(user_updates)
async def process_user_updates(updates: faust.StreamT[UserUpdate]):
    async for update in updates:
        user = users.get(update.user_id)
        if user:
            user[update.event_type] = update.event_data
            users[update.user_id] = user
        else:
            users[update.user_id] = {update.event_type: update.event_data}


@app.page("/users")
async def get_users(self, request):
    return self.json(users)

Expected behavior

Exception is not thrown

Actual behavior

Exception is thrown

Full traceback

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/faust/app/base.py", line 1720, in _on_partitions_assigned
    await T(self.tables.on_rebalance)(
  File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.9/site-packages/faust/tables/manager.py", line 194, in on_rebalance
    await T(table.on_rebalance)(
  File "/usr/local/lib/python3.9/site-packages/faust/utils/tracing.py", line 133, in corowrapped
    await_ret = await ret
  File "/usr/local/lib/python3.9/site-packages/faust/tables/base.py", line 571, in on_rebalance
    await self.data.on_rebalance(
TypeError: on_rebalance() takes 5 positional arguments but 6 were given

Versions

  • python: 3.9.1
  • faust: 0.4.8 466dbf2 (master)
  • os: debian (python:3.9 docker image)
  • kafka: 2.7.0

Tests using deprecated "yield_fixture"

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

I have a branch with an update ready to push, but will need to be added as a contributor first.

Steps to reproduce

Tell us what you did to cause something to happen.

I ran tests via ./scripts/tests.

Expected behavior

Tell us what you expected to happen.

I would expect to see few (ideally, 0) deprecation warning while running tests.

Actual behavior

Tell us what happened instead.

Running tests reveals multiple deprecation warnings, including the following:

PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

See the pytest documentation for the deprecation note: https://docs.pytest.org/en/stable/yieldfixture.html

Full traceback

====================================================================== warnings summary ======================================================================
tests/conftest.py:84
  /Users/kristenfoster-marks/Desktop/faust/tests/conftest.py:84: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/conftest.py:199
  /Users/kristenfoster-marks/Desktop/faust/tests/conftest.py:199: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture(autouse=True)

tests/functional/conftest.py:30
  /Users/kristenfoster-marks/Desktop/faust/tests/functional/conftest.py:30: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/functional/conftest.py:64
  /Users/kristenfoster-marks/Desktop/faust/tests/functional/conftest.py:64: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

venv/lib/python3.7/site-packages/colorclass/codes.py:4
  /Users/kristenfoster-marks/Desktop/faust/venv/lib/python3.7/site-packages/colorclass/codes.py:4: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
    from collections import Mapping

tests/unit/livecheck/conftest.py:46
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/livecheck/conftest.py:46: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/unit/livecheck/conftest.py:53
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/livecheck/conftest.py:53: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/functional/web/conftest.py:7
  /Users/kristenfoster-marks/Desktop/faust/tests/functional/web/conftest.py:7: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

faust/livecheck/exceptions.py:43
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:43: PytestCollectionWarning: cannot collect test class 'TestFailed' because it has a __init__ constructor (from: tests/unit/livecheck/test_app.py)
    class TestFailed(LiveCheckError):

faust/livecheck/models.py:49
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/models.py:49: PytestCollectionWarning: cannot collect test class 'TestExecution' because it has a __init__ constructor (from: tests/unit/livecheck/test_app.py)
    class TestExecution(Record, isodates=True):

faust/livecheck/models.py:132
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/models.py:132: PytestCollectionWarning: cannot collect test class 'TestReport' because it has a __init__ constructor (from: tests/unit/livecheck/test_app.py)
    class TestReport(Record):

tests/unit/livecheck/test_case.py:159
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/livecheck/test_case.py:159: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

faust/livecheck/models.py:132
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/models.py:132: PytestCollectionWarning: cannot collect test class 'TestReport' because it has a __init__ constructor (from: tests/unit/livecheck/test_case.py)
    class TestReport(Record):

faust/livecheck/exceptions.py:39
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:39: PytestCollectionWarning: cannot collect test class 'TestSkipped' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
    class TestSkipped(LiveCheckError):

faust/livecheck/exceptions.py:43
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:43: PytestCollectionWarning: cannot collect test class 'TestFailed' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
    class TestFailed(LiveCheckError):

faust/livecheck/exceptions.py:47
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:47: PytestCollectionWarning: cannot collect test class 'TestRaised' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
    class TestRaised(LiveCheckError):

faust/livecheck/exceptions.py:51
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:51: PytestCollectionWarning: cannot collect test class 'TestTimeout' because it has a __init__ constructor (from: tests/unit/livecheck/test_runners.py)
    class TestTimeout(LiveCheckError):

faust/livecheck/exceptions.py:51
  /Users/kristenfoster-marks/Desktop/faust/faust/livecheck/exceptions.py:51: PytestCollectionWarning: cannot collect test class 'TestTimeout' because it has a __init__ constructor (from: tests/unit/livecheck/test_signals.py)
    class TestTimeout(LiveCheckError):

tests/unit/stores/test_rocksdb.py:77
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/stores/test_rocksdb.py:77: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/unit/stores/test_rocksdb.py:82
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/stores/test_rocksdb.py:82: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/unit/stores/test_rocksdb.py:212
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/stores/test_rocksdb.py:212: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/unit/tables/test_objects.py:27
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/tables/test_objects.py:27: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

tests/unit/tables/test_wrappers.py:62
  /Users/kristenfoster-marks/Desktop/faust/tests/unit/tables/test_wrappers.py:62: PytestDeprecationWarning: @pytest.yield_fixture is deprecated.
  Use @pytest.fixture instead; they are the same.
    @pytest.yield_fixture()

Versions

  • Python version 3.7.6
  • Faust version 0.4.0
  • Operating system macOS Catalina 10.15.7
  • Kafka version
  • RocksDB version (if applicable)

IllegalStateException on seek to offset of a partition that was removed by a rebalance

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

faust-streaming 0.3.0
aiokafka 0.7.0

In this example, partitions 1, 5 and 10 were originally assigned but after the rebalance partition 1 is no longer assigned. The code tries to seek to the committed offset for partition 1 and crashes:

[2020-12-06 15:31:12,106] [11] [WARNING] Heartbeat failed for group spe-5 because it is rebalancing 
[2020-12-06 15:31:12,106] [11] [INFO] Revoking previously assigned partitions 
+Topic Partition Set----------+
| topic          | partitions |
+----------------+------------+
| spe_requests_5 | {1, 5, 10} |
+----------------+------------+ for group spe-5 
[2020-12-06 15:31:12,108] [11] [INFO] (Re-)joining group spe-5 
[2020-12-06 15:31:12,108] [11] [INFO] [^---Recovery]: Resuming flow... 
[2020-12-06 15:31:12,109] [11] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2020-12-06 15:31:12,121] [11] [INFO] Joined group 'spe-5' (generation 968) with member_id faust-0.3.0-9126a240-a759-4e9f-bca9-cc0ad3f20f7e 
[2020-12-06 15:31:12,128] [11] [INFO] Successfully synced group spe-5 with generation 968 
[2020-12-06 15:31:12,129] [11] [INFO] Setting newly assigned partitions 
+Topic Partition Set----------+
| topic          | partitions |
+----------------+------------+
| spe_requests_5 | {5, 10}    |
+----------------+------------+ for group spe-5 
[2020-12-06 15:31:12,131] [11] [ERROR] [^---Recovery]: Crashed reason=IllegalStateError("No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)") 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 349, in _restart_recovery
    await T(self._resume_streams)()
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 297, in _resume_streams
    consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 556, in _wait
    wait_result = await self.wait_first(coro, signal, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 715, in wait_first
    f.result()  # propagate exceptions
  File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 534, in perform_seek
    _committed_offsets = await self.seek_to_committed()
  File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 1315, in seek_to_committed
    return await self._thread.seek_to_committed()
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 509, in seek_to_committed
{"version": "1.0", "type": "log", "log": {"message": "[^---Recovery]: Crashed reason=IllegalStateError(\"No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)\")"}, "time": "2020-12-06T15:31:12.136Z", "level": "err"}
    return await self.call_thread(self._ensure_consumer().seek_to_committed)
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.7/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 868, in seek_to_committed
    self._fetcher.seek_to(tp, offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to
    self._subscriptions.seek(tp, offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 239, in seek
    self._assigned_state(tp).seek(offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 120, in _assigned_state
    "No current assignment for partition {}".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)

Expected behavior

Should not attempt to seek on a partition that has been removed. Maybe catch the exception and restart the fetcher?

Actual behavior

Process crashes on exception

Full traceback

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 349, in _restart_recovery
    await T(self._resume_streams)()
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 297, in _resume_streams
    consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
  File "/usr/local/lib/python3.7/site-packages/faust/tables/recovery.py", line 556, in _wait
    wait_result = await self.wait_first(coro, signal, timeout=timeout)
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 715, in wait_first
    f.result()  # propagate exceptions
  File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 534, in perform_seek
    _committed_offsets = await self.seek_to_committed()
  File "/usr/local/lib/python3.7/site-packages/faust/transport/consumer.py", line 1315, in seek_to_committed
    return await self._thread.seek_to_committed()
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 509, in seek_to_committed
{"version": "1.0", "type": "log", "log": {"message": "[^---Recovery]: Crashed reason=IllegalStateError(\"No current assignment for partition TopicPartition(topic='spe_requests_5', partition=1)\")"}, "time": "2020-12-06T15:31:12.136Z", "level": "err"}
    return await self.call_thread(self._ensure_consumer().seek_to_committed)
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.7/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.7/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 868, in seek_to_committed
    self._fetcher.seek_to(tp, offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to
    self._subscriptions.seek(tp, offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 239, in seek
    self._assigned_state(tp).seek(offset)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/subscription_state.py", line 120, in _assigned_state
    "No current assignment for partition {}".format(tp))

Versions

  • Python version - 3.7.3
  • Faust version - 0.3.0
  • Operating system - Centos7
  • Kafka version - 4.0.0-5.3.1
  • RocksDB version (if applicable)

Recovery thread updating standby partition writes in single writes instead of using writeBatch

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

The issue is that after recovery is complete, the standby partition views should be updated in batches, but they are not, due to the check in _may_signal_recovery_end().

Expected behavior

Tell us what you expected to happen.

Actual behavior

Tell us what happened instead.

Full traceback

Paste the full traceback (if there is any)

Versions

  • Python version 3.8
  • Faust version 0.3.0
  • Operating system Centos75
  • Kafka version 2.4
  • RocksDB version (if applicable) 6.10.3

Allow multiple workers to share rocksdb data dir

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Faust does not support sharing the RocksDB data directory amongst workers. This drastically reduces the application's performance when it has to scale out, as on scale-out every worker has to rebuild state by replaying partition events from Kafka.

Unable to subscribe to topics by pattern

Steps to reproduce

Created a topic specifying pattern= with a valid regex that should have matched multiple topics on the kafka cluster. Running the worker with -l info shows everything starting up, but the only topic subscription is to the base assignor topic.

This is a known, existing issue dating back to at least July 25, 2019 as detailed in robinhood/faust:
robinhood/faust#390

I suspect, but have not verified, that the issue occurs in 'faust/faust/transport/drivers/aiokafka:subscribe' given the comment XXX pattern does not work :/ from a prior dev.

Still occurs on master

Expected behavior

  • Topic is subscribed to all Kafka topics matching the regex
  • Topic subscription updates when new topic matching regex is created
  • In general, should match behavior documented in aiokafka under 'Topic Subscription by Pattern' here https://aiokafka.readthedocs.io/en/stable/consumer.html

Actual behavior

  • The topic subscribes to nothing, silently

Full traceback

No traceback - silently fails to do anything

Versions

  • Python version = 3.8
  • Faust version = v0.2.1
  • Operating system = whatever python:3.8 docker image uses
  • Kafka version = 2.6
  • RocksDB version (if applicable) = N/A

Reproduction script:

import faust
import io
import json
from datetime import datetime
import random
import os
import ssl
import sys

from decimal import *

import typing


class Greeting(faust.Record):
    from_name: str
    to_name: str


app = faust.App('faust-dev', broker='kafka://broker:29092')
topic = app.topic('MyGreatTopic-1', value_type=Greeting)
topic2 = app.topic('MyGreatTopic-2', value_type=Greeting)
topic3 = app.topic('MyGreatTopic-3', value_type=Greeting)

@app.task
async def create_topics():
    await topic.maybe_declare()
    await topic2.maybe_declare()
    await topic3.maybe_declare()

regex_topic = app.topic(pattern="^MyGreatTopic-.*$", value_type=Greeting)

@app.agent(regex_topic)
async def hello(greetings):
    async for event in greetings.events():
        greeting = event.value
        print(f'{event.message.topic} says: Hello from {greeting.from_name} to {greeting.to_name}')


@app.timer(interval=1.0)
async def example_sender(app):
    await topic.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )
    await topic2.send(value=Greeting(from_name='Faust 2', to_name='you'))
    await topic3.send(value=Greeting(from_name='Faust 3', to_name='you'))

if __name__ == '__main__':
    app.main()

v0.4.2 regression breaks channels

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

It looks like the regression was added in faust/agents/agent.py, specifically this chunk of code:

        if cur_version >= req_version:
            task = asyncio.Task(
                self._execute_actor(coro, aref),
                loop=self.loop,
                name=f"{str(aref)}-{self.channel.get_topic_name()}",
            )
        else:
            task = asyncio.Task(
                self._execute_actor(coro, aref),
                loop=self.loop,
            )

Steps to reproduce

Technically nothing - yesterday the pytest testing code I was writing was running fine; today that same code errored out on every test I was writing - even previously passing tests. I pulled it out and created a minimal set of code to reproduce the error here (agent.test_context() returns a Channel, which is how I noticed it):

import faust

app = faust.App('faust-bug-reproduction', broker='kafka://broker:29092')

topic = app.channel()
topic.maybe_declare()
@app.agent(topic)
async def split_sentence(data):
    async for key, value in data.items():
        print(key)
        print(value)

app.k = 1
app.v = 1

# Sends a message to the topic on a timer
@app.timer(interval=2.0)
async def example_sender(app):
    await topic.send(
        value=app.v,
        key=app.k
    )
    app.k *= 2
    app.v *= 3


# When ran standalone, start it up
if __name__ == '__main__':
    app.main()

Expected behavior

Just a bunch of repeating messages of the form:

[2021-01-21 17:38:42,774] [1] [WARNING] 1 
[2021-01-21 17:38:42,774] [1] [WARNING] 1 
[2021-01-21 17:38:44,774] [1] [WARNING] 2 
[2021-01-21 17:38:44,774] [1] [WARNING] 3 
[2021-01-21 17:38:46,774] [1] [WARNING] 4 
[2021-01-21 17:38:46,774] [1] [WARNING] 9 

Actual behavior

Crash on name=f"{str(aref)}-{self.channel.get_topic_name()}". Error: NotImplementedError('Channels are unnamed topics')

Full traceback

name=f"{str(aref)}-{self.channel.get_topic_name()}",
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
[2021-01-21 17:34:54,688] [1] [ERROR] [^Worker]: Error: NotImplementedError('Channels are unnamed topics')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/mode/worker.py", line 276, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
    await self._default_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 790, in _actually_start
    await child.maybe_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 818, in maybe_start
    await self.start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
    await self._default_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 790, in _actually_start
    await child.maybe_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 818, in maybe_start
    await self.start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
    await self._default_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 783, in _actually_start
    await self.on_start()
  File "/usr/local/lib/python3.8/site-packages/faust/agents/manager.py", line 57, in on_start
    await agent.maybe_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 818, in maybe_start
    await self.start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 759, in start
    await self._default_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
    await self._actually_start()
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 783, in _actually_start
    await self.on_start()
  File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 284, in on_start
    await self._on_start_supervisor()
  File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 315, in _on_start_supervisor
    res = await self._start_one(
  File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 253, in _start_one
    return await self._start_task(
  File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 648, in _start_task
    return await self._prepare_actor(
  File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 668, in _prepare_actor
    name=f"{str(aref)}-{self.channel.get_topic_name()}",
  File "/usr/local/lib/python3.8/site-packages/faust/channels.py", line 197, in get_topic_name
    raise NotImplementedError("Channels are unnamed topics")
NotImplementedError: Channels are unnamed topics                    

Versions

  • Python version = 3.8
  • Faust version = 0.4.2
  • Operating system = whatever python3.8-slim docker uses
  • Kafka version

Do you plan to also fork mode, since it seems abandoned?

Hi,

Thank you for your work.

Do you plan to also fork mode ?

It is another project formerly maintained by a robinhood employee and today it looks abandoned.

It is an important dependency of Faust so it could be valuable to keep it updated in parallel of your Faust fork.

If there is anything I can do help you in this process, just ask.

Thank you very much.

Missing sensor state for rebalance #1

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

MWE:

import faust

app = faust.App('myapp')

if __name__ == "__main__":
    app.main()

Run with:

python test.py worker

Expected behavior

No warnings.

Actual behavior

starting➒ 😊
[2021-01-22 23:50:16,167] [414000] [WARNING] [^-App]: Missing sensor state for rebalance #1
[2021-01-22 23:50:16,181] [414000] [WARNING] [^-App]: Missing sensor state for rebalance #1 

The same warning also appeared in #44.

Full traceback

Versions

  • Python version 3.8.5
  • Faust version 0.4.3
  • Operating system Fedora 32 5.10.8-100.fc32.x86_64
  • Kafka version 2.13-2.6.0
  • RocksDB version (if applicable)

This was introduced in 0.2.0; it won't happen in 0.1.1. The signal

self.signal_recovery_start.set()

was changed from reset to start in 0.2.0, somehow causing the following block in faust/app/base.py (lines 1611 to 1612 in fda7e52):

finally:
    self._rebalancing_sensor_state = None

to clear the sensor state.

Recovery hang due to backpressure changes

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Tell us what you did to cause something to happen.

Expected behavior

Tell us what you expected to happen.

Actual behavior

If the stream buffer gets full while the worker is still in recovery, there is a chance of it getting stuck in the recovery loop forever.

Full traceback

This keeps going on forever
+Remaining for active recovery---------------+-----------+-------------+-------------+-----------+
| topic                                      | partition | need offset | have offset | remaining |
+--------------------------------------------+-----------+-------------+-------------+-----------+
| ifos0.5-correletable_keys_table-changelog  | 0         | 4574        | 4256        | 318       |
| ifos0.5-kv_table-changelog                 | 2         | 26857       | 26617       | 240       |
| ifos0.5-triggers_table-changelog           | 0         | 3691        | 1347        | 2344      |
+--------------------------------------------+-----------+-------------+-------------+-----------+

Paste the full traceback (if there is any)

Versions

  • Python version
  • Faust version
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

Worker fails to recover table with exactly_once guarantee

Checklist

  • [] I have included information about relevant versions
  • [] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

  • Application is configured with processing_guarantee="exactly_once"

  • publish 5 messages to a topic, keyed by id

  • repartition the topic using group_by(new_id)

  • increment count on table with keys that are new_id

  • Initially, the worker is up, processes the messages and stores the correct data in the changelog topic.

  • Then I send SIGTERM to stop the worker

  • When restarting the worker, it gets stuck on recovering per the logs below.

Tell us what you did to cause something to happen.

Possibly some issue with the transaction producer and a transaction potentially getting aborted leads to worker not able to recover.
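
A rough sketch of the pipeline described in the steps above; the model, topic, and table names are inferred from the log below and are only illustrative:

import faust

class Submission(faust.Record):
    id: str
    workflow_id: str

app = faust.App(
    'meteor',
    broker='kafka://localhost',
    processing_guarantee='exactly_once',
)

submissions = app.topic('submissions', value_type=Submission)
counts = app.Table('submission-count-by-workflow', default=int)

@app.agent(submissions)
async def count_by_workflow(stream):
    # Repartition by workflow id, then increment a per-workflow count.
    async for submission in stream.group_by(Submission.workflow_id):
        counts[submission.workflow_id] += 1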

Expected behavior

Tell us what you expected to happen.

Worker recovers and is able to process events.

Actual behavior

Tell us what happened instead.

Worker hangs on recovery.

Full traceback

Log showing this behavior.

[2020-11-29 15:55:45,969] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.04 minute ago) 
[2020-11-29 15:55:50,974] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.13 minute ago) 
[2020-11-29 15:55:55,970] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.21 minute ago) 
[2020-11-29 15:56:00,976] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.29 minute ago) 
[2020-11-29 15:56:05,972] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.38 minute ago) 
[2020-11-29 15:56:10,977] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.46 minute ago) 
[2020-11-29 15:56:15,974] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.54 minute ago) 
[2020-11-29 15:56:20,979] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.63 minute ago) 
[2020-11-29 15:56:25,975] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.71 minute ago) 
[2020-11-29 15:56:30,980] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.79 minute ago) 
[2020-11-29 15:56:35,977] [114] [WARNING] [^---Recovery]: No event received for active tp TP(topic='meteor-submission-count-by-workflow-changelog', partition=0) in the last 30.0 seconds (last event received 1.88 minute ago) 
[2020-11-29 15:56:40,982] [114] [WARNING] [^---Recovery]: Recovery has not flushed buffers in the last 120.0 seconds (last flush was 2.00 minutes ago). Current total buffer size: 5 

Versions

  • Python version: 3.7
  • Faust version: 0.3.0
  • Operating system: Ubuntu 18:04
  • Kafka version: 2.6.0
  • RocksDB version (if applicable)

Question: Is there any way to prevent agents from starting until a task is complete?

Hey there guys, I'm sorry I didn't fill in the template as this is more of a question.

Our application must read some data from a database before starting to process streams, as the loaded data is correlated with certain fields of a message that arrives.

At present, I don't seem to see an elegant way of doing this other than having a global bool and making the agent sleep until this bool changes to indicate loading has completed.

Is there any other way to accomplish this?

Huge thanks
Fotis
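
A minimal sketch of the workaround described above (a module-level flag plus a short sleep loop in the agent); load_reference_data and fetch_from_database are hypothetical stand-ins for the real startup logic:

import asyncio

import faust

app = faust.App('myapp', broker='kafka://localhost')
topic = app.topic('events')

data_loaded = False     # global flag flipped once loading completes
reference_data = {}


async def fetch_from_database():
    # Hypothetical stand-in for the real database query.
    await asyncio.sleep(1)
    return {'some_key': 'some_value'}


@app.task
async def load_reference_data():
    global data_loaded
    reference_data.update(await fetch_from_database())
    data_loaded = True


@app.agent(topic)
async def process(events):
    # Agents may start before the task finishes, so sleep until the flag flips.
    while not data_loaded:
        await asyncio.sleep(0.1)
    async for event in events:
        print(event, len(reference_data))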

'Cannot find stack of coroutine' on wait_empty

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I struggle to reproduce this consistently, as I suspect it is the result of a very tight race condition. I can however say that it tends to occur around rebalances, and generally with agents which return naturally.

The problem stems from the wait_empty method on a consumer, which waits for streams to be consumed before exiting (assuming stream_wait_empty is not overridden).

This method logs human tracebacks of the agents, which in turn relies on mode to produce those tracebacks - raising the error here.

This error is raised if the agent coroutine has coro.cr_frame == None, which is the valid state for a closed coroutine (i.e. one which has finished processing and returned). This could presumably be raised for agen.ag_frame etc too.

I think it's valid that mode raises here (although a custom exception would be useful), because the traceback cannot be found.

My current workaround for this is as follows:

from typing import List

from faust import App
from faust.agents import Agent

class CustomAgent(Agent):
    def actor_tracebacks(self) -> List[str]:
        tracebacks: List[str] = []
        for actor in self._actors:
            try:
                tracebacks.append(actor.traceback())
            except RuntimeError as exc:
                if "cannot find stack of coroutine" in str(exc):
                    tracebacks.append(f"Could not find stack of coroutine for actor: {actor}")
                    continue
                raise exc
        return tracebacks


app = App(
    ...,
    Agent=CustomAgent,
    ...,
)

Expected behavior

No exception to be raised when collecting actor traces for the purpose of logging.

Actual behavior

A RuntimeError("cannot find stack of coroutine") is raised.

Full traceback

Error trace
  File \"/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py\", line 52, in print_coro_stack
    tb = Traceback.from_coroutine(coro, limit=limit)
         |         |              |           -> 125
         |         |              -> <coroutine object my_agent at 0x7f6d09d361c0>
         |         -> <classmethod object at 0x7f6d0d48ce20>
         -> <class 'mode.utils.tracebacks.Traceback'>
  File \"/usr/local/lib/python3.8/site-packages/mode/utils/tracebacks.py\", line 231, in from_coroutine
    raise RuntimeError('cannot find stack of coroutine')

RuntimeError: cannot find stack of coroutine

Versions

  • Python version: 3.8
  • Faust version: 1.10.4
  • Operating system: Ubuntu 20.10
  • Kafka version: confluentinc/cp-kafka:5.5.1
  • RocksDB version (if applicable): N/A

Recovery crash

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Recovery crashes on a rebalance

Expected behavior

no crash
Tell us what you expected to happen.

Actual behavior

[2020-11-20 20:25:29,427] [22] [ERROR] [^---Recovery]: Crashed reason=IllegalStateError("No current assignment for partition TopicPartition(topic='p999105-enriched-ad', partition=10)")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.7/dist-packages/faust/tables/recovery.py", line 349, in _restart_recovery
    await T(self._resume_streams)()
  File "/usr/local/lib/python3.7/dist-packages/faust/tables/recovery.py", line 297, in _resume_streams
    consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
  File "/usr/local/lib/python3.7/dist-packages/faust/tables/recovery.py", line 556, in _wait
    wait_result = await self.wait_first(coro, signal, timeout=timeout)
  File "/usr/local/lib/python3.7/dist-packages/mode/services.py", line 715, in wait_first
    f.result()  # propagate exceptions
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 534, in perform_seek
    _committed_offsets = await self.seek_to_committed()
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/consumer.py", line 1315, in seek_to_committed
    return await self._thread.seek_to_committed()
  File "/usr/local/lib/python3.7/dist-packages/faust/transport/drivers/aiokafka.py", line 509, in seek_to_committed
    return await self.call_thread(self._ensure_consumer().seek_to_committed)
  File "/usr/local/lib/python3.7/dist-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.7/dist-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.7/dist-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/consumer.py", line 868, in seek_to_committed
    self._fetcher.seek_to(tp, offset)
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/fetcher.py", line 1162, in seek_to
    self._subscriptions.seek(tp, offset)
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/subscription_state.py", line 239, in seek
    self._assigned_state(tp).seek(offset)
  File "/usr/local/lib/python3.7/dist-packages/aiokafka/consumer/subscription_state.py", line 120, in _assigned_state
    "No current assignment for partition {}".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: No current assignment for partition TopicPartition(topic='p999105-enriched-ad', partition=10)
[2020-11-20 20:25:29,436] [22] [WARNING] [^-App]: Missing sensor state for rebalance #13
[2020-11-20 20:25:29,439] [22] [INFO] [^Worker]: Stopping...
[2020-11-20 20:25:29,440] [22] [INFO] [^-App]: Stopping...
[2020-11-20 20:25:29,440] [22] [INFO] [^---Fetcher]: Stopping...
[2020-11-20 20:25:29,441] [22] [INFO] [^--Consumer]: Consumer shutting down for user cancel.
[2020-11-20 20:25:29,441] [22] [INFO] [^-App]: Flush producer buffer...
[2020-11-20 20:25:29,442] [22] [INFO] [^--TableManager]: Stopping...


Full traceback


Versions

  • Python version 3.7
  • Faust version 0.2.2
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

Rocksdb Exceptions on rebalance

On rebalance the worker should not try to reopen rocksdb from old partitions

Expected behavior

Close rocksdb and not crash

Actual behavior

The worker keeps trying to open rocksdb and eventually times out and crashes

Full traceback


Versions

  • Python version
  • Faust version
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

Deploying Faust applications via Circus or Supervisord, not possible due to lack of web fd support?

Hey there guys, great job on the fork.

The Faust docs do reference tools like Circus and Supervisord for use when running in production. If we wish to fire off multiple Faust workers from a single Circus config, we encounter a problem.

Let's take an example Circus config:

[watcher:processor]
cmd = faust -A boo --datadir=/data/worker$(circus.wid) worker -l info --web-transport=unix://...
numprocesses = 2
copy_env = True
use_sockets = True

[socket:web]
host = 127.0.0.1
port = 8888

Circus makes Unix sockets available to child applications by opening the socket itself and passing the file descriptor (an integer) to be used. However, Faust only seems to support specifying the full path to a Unix socket file, which won't work here.

Other web servers like Uvicorn allow passing an fd like so:

[watcher:web]
cmd = uvicorn --fd $(circus.sockets.web) example:App
numprocesses = 4
use_sockets = True
copy_env = True

[socket:web]
host = 0.0.0.0
port = 8888

Is it possible to support use of a file descriptor in Faust or alternatively, is there another way that this can be made to work with Circus or Supervisor when running multiple workers?

Thanks a lot
Fotis

Default replication for replicas is 0 if replicas is None, is this a valid choice?

I am not very familiar with Kafka; however, when used with single instance Kafka and Redpanda on my development environment, RPC requests (agent.ask()) causes this error:

[^Worker]: Error: InvalidReplicationFactorError('Cannot create topic: f-reply-664f489d-06e6-4f66-b8ff-14fbb3cd8b67 (38): Replication factor must be greater than 0')

The same error occurs if app.conf.reply_create_topic is set to True, even with topic_replication_factor set to 1:

replication=replicas or 0,

When I change the default value to 1 from 0, RPC works as intended.
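
To illustrate why the default ends up as 0, a plain Python sketch (not the faust source):

# With replicas left as None, `replicas or 0` falls back to 0, which Kafka
# rejects as a replication factor; `replicas or 1` yields a value that works.
replicas = None
print(replicas or 0)  # 0
print(replicas or 1)  # 1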

Am I missing something here? Will this change break things in production deployments with Kafka clusters?

Most unit tests are not running due to class naming issues

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run unit tests using the scripts/tests script

Expected behavior

All unit tests should run

Actual behavior

75% of the unit tests do not run because they are class-based and the class names are test_Foo instead of Test_Foo

Full traceback

Versions

  • Python version - 3.8
  • Faust version - 0.4.1
  • Operating system - Linux
  • Kafka version N/A
  • RocksDB version (if applicable) N/A

New-offset is off between 0.2.0 and 0.3.0, resulting in reprocessing last record (or many records) on worker restart

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

  • Pretty much any time you restart a worker, it will replay the last message it received. So if a worker processed messages [0,1,2,3,4] and is then restarted, it will re-process item 4. This will mess up any analytics that are based on stateful counts. With a trivial case of incrementing a counter in a table, this can be consistently reproduced by simply stopping and starting a worker and finding the counter continue to increment even though there were no new messages on the underlying topic (see the sketch after this list).

  • If using the group_by functionality to re-partition a stream, I am finding that it will replay ALL of the messages, resulting in many more duplicates than simply +1 to the counts.
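
A minimal sketch of the counter-table reproduction described in the first bullet (topic and table names are placeholders):

import faust

app = faust.App('counter-app', broker='kafka://localhost:9092')
events_topic = app.topic('events', value_serializer='raw')
counts = app.Table('counts', default=int)

@app.agent(events_topic)
async def count_events(stream):
    async for _event in stream:
        counts['total'] += 1
        # After stopping and restarting the worker, the last event is replayed
        # and this prints one extra increment with no new messages on the topic.
        print(counts['total'])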

Expected behavior

  • Do not replay the most recent message.

Actual behavior

  • Replays messages on restart.

Full traceback


Versions

  • Python version: 3.7
  • Faust version: 0.3.0
  • Operating system: ubuntu 18.04
  • Kafka version: latest
  • RocksDB version (if applicable)

scripts/install script forces use of ./venv virtualenv, impossible to use with virtualenvwrapper

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Check out faust-streaming 40237be, then

./scripts/install
./scripts/lint
./scripts/tests
<runs a lot of tests>
scripts/coverage: 11: codecov: not found

I'm not familiar with codecov, but I guess it should be run only if the environment variable CODECOV_TOKEN is set?

Update metadata in pypi

Currently the metadata that we have for faust-streaming on PyPI points to the original faust. We need to update setup.py and probably MANIFEST.in.

Define CI/CD

Which CI provider shall we use? Our options are Travis, GitLab, and Circle CI.

Messages are dropped by the Consumer when buffer full condition occurs

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Running the following code will only process some of the messages generated by the task. The problem is that the stream buffer fills up and removes the topic from the active partitions; when Consumer.getmany() iterates over the list of already-received messages and sees that the topic is no longer in the list of active partitions, it drops the remaining messages.

The fix is to include self._buffered_partitions in the check in Consumer.getmany() so that it will continue to process the already-received messages even when the Consumer is no longer actively fetching new messages from the topic because the buffer is full. When the buffer full condition clears, new messages will be processed as usual.

import faust

app = faust.App(
    'hello-world3',
    broker='kafka://localhost:29092',
    value_serializer='raw',
#    stream_buffer_maxsize=1000000,
)

greetings_topic = app.topic('greetings')

@app.agent(greetings_topic)
async def greet(greetings):
    count = 0
    async for greeting in greetings:
        count += 1
        print(count)

@app.task()
async def say_hello():
    count = 0
    for i in range(0, 40000):
        count += 1
        await greetings_topic.send(key='key', value=f'hello{count}')
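
To make the proposed check concrete, here is a rough conceptual sketch (not the actual faust source; the names are illustrative):

# A buffered message should still be delivered even when its partition has been
# paused (removed from the active set) because the stream buffer is full.
def should_deliver(tp, active_partitions, buffered_partitions):
    return tp in active_partitions or tp in buffered_partitions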

Expected behavior

The agent should continue to receive messages when the Consumer buffer is considered "full" and backpressure has been applied

Actual behavior

The Consumer stops sending the already-received messages to the agents/Stream when backpressure is being applied to the topic

Full traceback

Versions

  • Python version - 3.8
  • Faust version - 0.4.3
  • Operating system - Linux
  • Kafka version - N/A
  • RocksDB version (if applicable) N/A

Prevent stream buffer overflow by lowering the rate of incoming partitions

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Have an agent that processes slowly. When the number of events on its queue exceeds stream_buffer_maxsize, it causes starvation of other stream processors (agents) that are processing other topics.

Expected behavior

Stop fetching data from the slow topic/partition when the buffer reaches a threshold and then resume it after the pressure drops

Full traceback


Versions

  • Python version 3.8
  • Faust version 0.3.0
  • Operating system centos 7.5
  • Kafka version 2.4
  • RocksDB version (if applicable) 6.10.3

Sending batch messages to topic

Hello, just wanted to say that we really enjoy Faust, and many thanks to everyone who helps develop and maintain the project. I've not run into any issues with Faust, but I have a small question regarding how to send messages to a topic more efficiently (e.g. as a batch).

I have an agent which processes messages from a topic, whose goal it is to split a single message into many messages and send the new messages to the next topic. The code looks something like this:

@app.agent(input_topic)
async def split_messages(stream):
    async for event in stream:
        # each event contains a list of readings
        for reading in event.readings:
            # reading is an object like {value: 1, id: 34}

            individual_event = IndividualEvent(
                id=reading.id,
                value=reading.value
            )

            await output_topic.send(key=individual_event.id, value=individual_event)

Is it possible to send the new messages more efficiently? I want to do something like this instead:

@app.agent(input_topic)
async def split_messages(stream):
    async for event in stream:
        messages_to_publish = [IndividualEvent(
            id=reading.id,
            value=reading.value
        ) for reading in event.readings]
       
        await output_topic.send_batch(messages_to_publish)

Is there a preferred way to handle this case and is it possible to do this with the map/reduce functions?

Thank you for the help !
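
There is no send_batch on a Topic as far as I know, but one common way to make this cheaper is to fire the sends concurrently instead of awaiting each one in turn. A hedged sketch, assuming the same app, IndividualEvent, input_topic and output_topic as above:

import asyncio

@app.agent(input_topic)
async def split_messages(stream):
    async for event in stream:
        sends = [
            output_topic.send(key=reading.id, value=IndividualEvent(
                id=reading.id,
                value=reading.value,
            ))
            for reading in event.readings
        ]
        # Schedule all sends concurrently; the underlying producer can then
        # batch them instead of waiting for each acknowledgement in turn.
        await asyncio.gather(*sends)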

Note on web views exposing tables

Just opening an issue for documentation purposes, to note that the documentation has this snippet of text talking about future plans for faust. Given that this is a fork under new ownership, it seems worth noting so it can be allocated to either "actual future plans of the fork" or "things to remove when updating the documentation".

https://faust.readthedocs.io/en/latest/userguide/tasks.html#http-verbs-get-post-put-delete

Then under "Exposing Tables":

"A frequent requirement is the ability to expose table values in a web view, and while this is likely to be built-in to Faust in the future, you will have to implement this manually for now."

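For anyone landing here, a minimal sketch of the manual approach the docs allude to, exposing a table through a web view (the table and route names are placeholders, not from the docs):

import faust

app = faust.App('page-example', broker='kafka://localhost:9092')
clicks_topic = app.topic('clicks', value_serializer='raw')
click_counts = app.Table('click_counts', default=int)

@app.agent(clicks_topic)
async def count_clicks(stream):
    async for key in stream:
        click_counts[key] += 1

@app.page('/counts/')
async def get_counts(web, request):
    # Dump the whole table as JSON. Fine for small tables; for partitioned
    # tables you would normally route requests with app.table_route instead.
    return web.json(dict(click_counts))
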
[^---AIOKafkaConsumerThread]: Thread keepalive is not responding ERROR

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Set up a normal streaming application with RocksDB and an internal timer that deletes older rows from RocksDB.

Expected behavior

I expected the script to continue working normally.

Actual behavior

Randomly, in the log I found:

[2020-12-21 15:36:51,857] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:34:04,708] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:37:01,560] [60692] [WARNING] Heartbeat failed: local member_id was not recognized; resetting and re-joining group
[2020-12-21 15:37:03,509] [60692] [WARNING] Timer commit is overlapping (interval=2.8 runtime=178.82933128997684)
[2020-12-21 15:38:05,538] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:14,381] [60692] [WARNING] Timer commit is overlapping (interval=2.8 runtime=17.595341868989635)
[2020-12-21 15:38:22,749] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:31,374] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:41,520] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:38:59,004] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:39:18,977] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:39:38,909] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:39:50,086] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:40:22,655] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:40:41,770] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:40:52,244] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:00,917] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:19,617] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:29,388] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:36,996] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:41:48,920] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:42:16,437] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:42:30,207] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:42:38,045] [60692] [WARNING] Timer commit is overlapping (interval=2.8 runtime=117.13050575199304)
[2020-12-21 15:42:47,174] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:43:11,633] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
[2020-12-21 15:43:19,542] [60692] [ERROR] [^---AIOKafkaConsumerThread]: Thread keepalive is not responding...
simulate-rt-odd.service: Main process exited, code=killed, status=9/KILL
simulate-rt-odd.service: Failed with result 'signal'.

So basically the application starts to raise these errors and eventually it is killed, I suppose by systemd (status=9/KILL).

Full traceback

for the full traceback see previous step

Versions

  • Python version: Python 3.8.5
  • Faust version: faust-streaming 0.3.1
  • Operating system: Ubuntu 20.04.1 LTS (GNU/Linux 5.4.0-1032-gcp x86_64)
  • Kafka version: 2.4.1
  • RocksDB version (if applicable): 6.15.0 (11/13/2020)

Problems with offset commits with filter/group_by pattern

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I have a typical filter/group_by setup. Simplified to the maximum, it goes along these lines:

import faust


class Value(faust.Record):
    ...


app = faust.App(
    "app",
    broker=...,
    broker_credentials=...,
    consumer_auto_offset_reset="latest",
    topic_partitions=3,
    topic_replication_factor=3,
)

source_topic = app.topic("source-topic", value_type=Value)


@app.agent(source_topic)
async def my_agent(stream):
    async for value in stream.filter(
        lambda v: v > 1000
    ).group_by(
        lambda v: v.some_other_attr,
        name="custom-key",
    ):
        pass

This automatically creates topic app.my_agent-source-topic-custom-key-repartition (I might be slightly wrong here, doesn't matter).

Expected behavior

  • The repartitioned topic receives regular offset commits, so that the lag is always 1 or close to 1 on all partitions.
  • Upon every application restart, the most recent offset is picked up and processing starts where it left off before shutdown.

Actual behavior

  • The repartitioned topic's offsets are committed only during a brief period right after application start (or restart); then no offset commits take place whatsoever and the lag grows.
  • Upon every application restart, the repartitioned topic is read all over again.

Observations

  • No tracebacks, no explicit failures.
  • The application keeps processing incoming data just fine; it doesn't seem like there is a problem there. Tested on a low-traffic staging environment and in isolation on one of the mid-traffic production environments. The point is that performance is not the issue.
  • The body of the stream operation is literally pass. I have stripped all the functionalities one by one and got to "bare bones" Faust.
  • Source topic receives proper commits, no problem here.
  • Tried pre-generated and auto-generated repartition topics - no difference.
  • Tried original Faust and the fork - no difference.
  • We are using Kafka clusters on Confluent Cloud, observing offsets in their UI. I don't expect major differences if I'd check in CLI.
  • And the most important part: if I remove filter and leave only group_by, the problem is gone and offset commits are tight, back where they should be. The thing is that we really need this filter, as only about 30% of the traffic is eligible for repartition.

Versions

  • Python version 3.8.6
  • Faust version 0.3.1
  • Operating system Debian Buster
  • Kafka version (Can't find it now probably most recent. Tried on Basic and Standard Confluent Cloud clusters)

First message is ignored after resetting Kafka log offsets to the latest offset

Issue copied from robinhood/faust#620

Checklist

Issue can be reproduced on the latest stable commit in master - 38d61a78c76a096597421a8e9db2878d4381dd6a

Steps to reproduce

  • Create fresh python 3.8 virtualenv
  • Install Faust pip install git+https://github.com/robinhood/faust@38d61a78c76a096597421a8e9db2878d4381dd6a
  • Create an app with agent. The example from the quick start is enough.
    import faust
    
    app = faust.App(
        'hello-world',
        broker='kafka://localhost:9092',
        value_serializer='raw',
    )
    
    greetings_topic = app.topic('greetings')
    
    @app.agent(greetings_topic)
    async def greet(greetings):
        async for greeting in greetings:
            print(greeting)
    
  • Launch the worker and let it create Kafka topic.
    faust -A hello_world worker -l warn
    
  • Send a message to the Kafka topic
    kafka-console-producer  --broker-list localhost:9092 --topic greetings
    >hello
    
  • Stop the worker, and reset log offset to the latest
    kafka-consumer-groups --bootstrap-server localhost:9092 --group hello-world --reset-offsets --to-latest --execute --all-topics
    
  • Launch the worker again
    faust -A hello_world worker -l warn
    
  • Produce 2 more messages to the Kafka topic
    kafka-console-producer  --broker-list localhost:9092 --topic greetings
    >hello 1, will be ignored
    >hello 2, will be printed
    

Expected behavior

  • Both messages should be printed in the worker console.
    ┌ƒaµS† v1.11.0a1────────────────────────────────────────────────────────┐
    │ id          │ hello-world                                             │
    │ transport   │ [URL('kafka://localhost:9092')]                         │
    │ store       │ memory:                                                 │
    │ web         │ http://maxims-macbook-pro.local:6066                    │
    │ log         │ -stderr- (warn)                                         │
    │ pid         │ 49793                                                   │
    │ hostname    │ Maxims-MacBook-Pro.local                                │
    │ platform    │ CPython 3.8.0 (Darwin x86_64)                           │
    │ drivers     │                                                         │
    │   transport │ aiokafka=1.1.6                                          │
    │   web       │ aiohttp=3.6.2                                           │
    │ datadir     │ /Users/shamanu4/projects/faust_test/hello-world-data    │
    │ appdir      │ /Users/shamanu4/projects/faust_test/hello-world-data/v1 │
    └─────────────┴─────────────────────────────────────────────────────────┘
    starting➢ 😊
    [2020-07-21 12:51:38,541] [49793] [WARNING] b'hello 1, will be ignored'
    [2020-07-21 12:51:39,399] [49793] [WARNING] b'hello 2, will be printed'
    

Actual behavior

  • First message is missing
    ┌ƒaµS† v1.11.0a1────────────────────────────────────────────────────────┐
    │ id          │ hello-world                                             │
    │ transport   │ [URL('kafka://localhost:9092')]                         │
    │ store       │ memory:                                                 │
    │ web         │ http://maxims-macbook-pro.local:6066                    │
    │ log         │ -stderr- (warn)                                         │
    │ pid         │ 49793                                                   │
    │ hostname    │ Maxims-MacBook-Pro.local                                │
    │ platform    │ CPython 3.8.0 (Darwin x86_64)                           │
    │ drivers     │                                                         │
    │   transport │ aiokafka=1.1.6                                          │
    │   web       │ aiohttp=3.6.2                                           │
    │ datadir     │ /Users/shamanu4/projects/faust_test/hello-world-data    │
    │ appdir      │ /Users/shamanu4/projects/faust_test/hello-world-data/v1 │
    └─────────────┴─────────────────────────────────────────────────────────┘
    starting➢ 😊
    [2020-07-21 12:51:39,399] [49793] [WARNING] b'hello 2, will be printed'
    

Versions

Silent Failure / OOM on startup with large topic/partition lag

I have a consumer group where 15 of the partitions are lagged by approximately 16M records each. The faust workers fail on startup due to an OOM condition.

faust-streaming[uvloop,fast,redis,rocksdb]==0.4.1

This issue originated in the faust codebase and is still present in the 0.4.1 faust-streaming release.
robinhood/faust#453

The _add_gap function in faust/transport/consumer.py is being called with a VERY large offset_from / offset_to delta. In the screenshot below, a list is populated from 1 to 2,288,752,002, which results in Python running out of memory and triggering the failure. Note that in the screenshot I added an exception handler to see if I could trap the error (I could not - Python fails silently).
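
An illustrative sketch of the memory blow-up being described (this is not the faust source, just the equivalent arithmetic; the offsets are taken from the report above):

offset_from, offset_to = 1, 2_288_752_002

# Materialising the gap eagerly needs one int per missing offset, i.e. billions
# of objects, which is what exhausts memory:
# gap = list(range(offset_from, offset_to))   # ~2.3e9 entries -> OOM

# A lazy representation of the same gap is constant-size:
gap = range(offset_from, offset_to)
print(len(gap))  # 2288752001, without allocating the elements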

I have observed faust intermittently failing silently on startup with the 1.8.x series and have been able to trace it back to PR #403, commit c0daac1.

Connecting to Confluent Cloud

Hey all, the default Faust package wouldn't let my app connect to Confluent Cloud, I'm guessing because it was using an older aiokafka driver. Since this fork uses the updated version, can I use the aiokafka URI to connect to my Confluent Cloud instance? And if by any chance someone can provide an example, that would be greatly appreciated.

Many thanks!
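
Not an authoritative answer, but a minimal sketch of how SASL/SSL credentials are usually passed to Faust, which is what Confluent Cloud requires (the bootstrap server, API key, and secret below are placeholders):

import ssl

import faust

ssl_context = ssl.create_default_context()

app = faust.App(
    'my-app',
    broker='kafka://pkc-xxxxx.region.provider.confluent.cloud:9092',
    broker_credentials=faust.SASLCredentials(
        username='<API_KEY>',
        password='<API_SECRET>',
        ssl_context=ssl_context,
    ),
)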

Stream processing stops due to StaleLeaderEpochCodeError exception

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Run faust-streaming in a pod in a Kubernetes cluster watching Azure Eventhubs through their Kafka interface.

After a period of several days aiokafka will raise StaleLeaderEpochCodeError several times. As this error is not caught anywhere, the Faust application stops processing streams but does not crash or restart.

N.B. I didn't see this behaviour when using robinhood/faust; I suspect the change to aiokafka might be part of it.

Something that my intuition suggests is related is that Eventhubs only retain messages in the queue for at most 7 days. So the ****-__assignor-__leader topic will eventually drain and appear empty from Faust's point of view.

I understand that working with Azure Eventhubs rather than proper Kafka is unsupported and appreciate any advice you can give. Even if there isn't a way to stop the exception happening (given my non-standard usage), if there was a way to catch the exception so I could cause the pod to restart that would be fine too.

I have to say that Faust is much better than any of the native Microsoft libraries for dealing with Eventhub message streams.

Expected behavior

Faust pods continue to process Eventhub message streams without interruption.

Actual behavior

StaleLeaderEpochCodeError exception occurs somewhat randomly after several days of operation but the app does not crash and so the Kubernetes pod does not restart.

Full traceback

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 66, in run
    self.service._start_thread()
  File "/usr/local/.venv/lib/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
    self.thread_loop.run_until_complete(self._serve())
> File "/usr/local/.venv/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 550, in commit
    await self._coordinator.commit_offsets(assignment, offsets)
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 964, in commit_offsets
    raise err
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 953, in commit_offsets
    await asyncio.shield(
  File "/usr/local/.venv/lib/python3.8/site-packages/aiokafka/consumer/group_coordinator.py", line 1066, in _do_commit_offsets
    raise first_error
kafka.errors.StaleLeaderEpochCodeError: [Error 13] StaleLeaderEpochCodeError

Versions

  • Python version; 3.8.6
  • Faust version; faust-streaming 0.4.0
  • Operating system; Debian Slim in Python Docker image
  • Kafka version; Azure Eventhubs
  • RocksDB version (if applicable); librocksdb5.17

Faust release version not being updated on GitHub

Checklist

  • I have included information about relevant versions
  • (N/A) I have verified that the issue persists when using the master branch of Faust.

Currently GitHub shows v0.2.1 as the latest release as shown in the attached screenshot.


IllegalStateError causes consumer crash when commit is pending and a rebalance happens

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

When the Consumer has requested a commit on one or more partitions and a rebalance is triggered before the aiokafka consumer can perform the commit, it is possible that one or more of the partitions being committed is revoked by the rebalance and not re-assigned. This will cause an IllegalStateError exception from the kafka-python client code due to the attempt to commit a partition that isn't owned by the consumer.

The fix is to filter the commit partitions again in the aiokafka consumer commit code to ensure that they were not revoked during the await break.
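
A rough sketch of the filtering step being proposed (a hypothetical helper, not the actual patch):

def filter_committable(offsets, current_assignment):
    # Drop offsets for partitions that were revoked while the commit was
    # pending, so the commit only covers partitions this consumer still owns.
    return {tp: offset for tp, offset in offsets.items() if tp in current_assignment}

# e.g. commit only filter_committable(pending_offsets, consumer.assignment())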

Expected behavior

Commits after a rebalance should not cause an IllegalStateError exception

Actual behavior

Intermittent IllegalStateError exceptions after a rebalance

Full traceback

[2021-01-21 14:59:16,201] [9] [ERROR] [^---AIOKafkaConsumerThread]: Got exception: IllegalStateError("Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned")
Current assignment: '\n+Topic Partition Set-------+\n| topic     | partitions   |\n+-----------+--------------+\n| spe_kxe_5 | {25, 52, 67} |\n+-----------+--------------+'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 523, in _commit
    await consumer.commit(aiokafka_offsets)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 548, in commit
    "Partition {} is not assigned".format(tp))
kafka.errors.IllegalStateError: IllegalStateError: Partition TopicPartition(topic='spe_kxe_5', partition=40) is not assigned

Versions

  • Python version 3.7
  • Faust version 0.4.6
  • Operating system - CentOS
  • Kafka version N/A
  • RocksDB version (if applicable) N/A

newer version of open tracing

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Install faust together with a current version of jaeger-client (which requires opentracing>=2.1)

Expected behavior

No version conflict should arise

Actual behavior

faust claims to be unable to work with opentracing>=2.0.0

Full traceback

ERROR: jaeger-client 4.3.0 has requirement opentracing<3.0,>=2.1, but you'll have opentracing 1.3.0 which is incompatible.

Versions

  • Python version
    3.7
  • Faust version
    faust-streaming-0.3.0
  • Operating system
    linux

According to this issue in the original project, just bumping the version number here should be enough to enable opentracing>2.0.0.

The reason it was not upgraded in the original project was a possible incompatibility with Robinhood's closed-source internal tracing framework.

I tried it locally and all tests work.

Rebalance can leave some workers with no partitions

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

When the number of partitions for an agent topic is not evenly divisible by the number of workers, the PartitionAssignor can leave one or more workers with no partitions assigned. This wastes CPU and memory resources, causes other workers to carry a heavier load, and potentially reduces throughput.

The initial deployment will distribute the partitions across all workers. For example, a topic with 100 partitions will be spread across 40 workers with 20 workers having three partitions and 20 workers having 2 partitions. The CopartitionedAssignor will calculate the (maximum) capacity for each worker to be 3 partitions, using the formula ceil(num_partitions / num_workers).

Now if one worker gets rebooted or leaves the group for any reason, a rebalance is triggered and the 2 or 3 partitions for that worker get moved to other workers that have 2 partitions, leaving 22 workers with 3 partitions and 17 workers with 2 partitions (100 partitions across 39 workers).

When the rebooted worker recovers and rejoins the group, it will probably not receive any partitions because there are no "extra" partitions on any of the workers. The maximum capacity is still 3, no worker has more than 3 partitions, so no partitions are "available" for assignment. Rebooting the worker has no impact for the same reason.
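
A worked example of the capacity arithmetic described above (illustrative only):

from math import ceil

num_partitions, num_workers = 100, 40
capacity = ceil(num_partitions / num_workers)
print(capacity)  # 3

# After one worker drops out, the remaining 39 workers can hold up to
# 39 * 3 = 117 partitions between them, so all 100 fit without anyone
# exceeding capacity, and no partition is ever "spare" for the worker
# that rejoins.
print(39 * capacity)  # 117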

The worker with no partitions will leave the consumer group after 5 minutes as the aiokafka Fetcher has been idle due to no assignment, which means that future rebalances of the group will NOT include this consumer/worker, and it will be idle forever, or until the group is redeployed.

Expected behavior

Partitions should be assigned to all available workers, as balanced as possible.

Actual behavior

Workers can receive no partition assignment and leave the group.

Full traceback


Versions

  • Python version - 3.7
  • Faust version - 0.4.6
  • Operating system - Centos
  • Kafka version N/A
  • RocksDB version (if applicable) N/A

SetManager.flush timer overlapping when using SetGlobalTable

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Define a SetGlobalTable and start a faust worker.

my_table = app.SetGlobalTable('some_name', start_manager=True)

Expected behavior

The worker runs without any warnings.

Actual behavior

A warning appears every few seconds:

faust_1  | [2021-01-20 14:49:11,275] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0038752279942855) 
faust_1  | [2021-01-20 14:49:19,277] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.003296997048892) 
faust_1  | [2021-01-20 14:49:27,277] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0008818010101095) 
faust_1  | [2021-01-20 14:49:35,279] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0001070860307664) 
faust_1  | [2021-01-20 14:49:39,280] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000795218977146) 
faust_1  | [2021-01-20 14:49:43,282] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0005710140103474) 
faust_1  | [2021-01-20 14:49:47,283] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000350568909198) 
faust_1  | [2021-01-20 14:49:51,285] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0012008629273623) 
faust_1  | [2021-01-20 14:49:55,286] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.00038573902566) 
faust_1  | [2021-01-20 14:49:59,289] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.001224332023412) 
faust_1  | [2021-01-20 14:50:03,290] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000285888905637) 
faust_1  | [2021-01-20 14:50:07,292] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0006646789843217) 
faust_1  | [2021-01-20 14:50:15,295] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.0010148769943044) 
faust_1  | [2021-01-20 14:50:19,296] [1] [WARNING] Timer SetManager.flush is overlapping (interval=2.0 runtime=2.000219620997086) 

Is the sleep timer here necessary?

await self.sleep(sleep_time)

That sleep should already be handled by itertimer, right?

Versions

  • Python version: 3.8.6
  • Faust version: 0.4.1
  • Operating system: Ubuntu 20.04
  • Kafka version: 2.6.0
  • python-rocksdb version: 0.7.0

InvalidReplicationFactorError is raised if reply_create_topic is set

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Create a Faust app and set the reply_create_topic=True flag.

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
)

greetings_topic = app.topic('greetings', value_type=str)


@app.agent(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)
        yield 'resp ' + greeting


@app.timer(5)
async def produce():
    for i in range(100):
        resp = await print_greetings.ask(value=f'hello {i}')
        print(resp)

if __name__ == '__main__':
    app.main()

Expected behavior

Reply topic is created.

Actual behavior

InvalidReplicationFactorError is raised.

Full traceback

hello-word_1  | [2021-01-19 18:25:19,690] [8] [INFO] [^--Producer]: Creating topic 'f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9'
hello-word_1  | [2021-01-19 18:25:19,726] [8] [ERROR] [^-App]: Crashed reason=InvalidReplicationFactorError('Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.')
hello-word_1  | Traceback (most recent call last):
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
hello-word_1  |     await task
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 966, in _wrapped
hello-word_1  |     return await task()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 1019, in around_timer
hello-word_1  |     await fun(*args)
hello-word_1  |   File "/main.py", line 31, in produce
hello-word_1  |     resp = await print_greetings.ask(value=f'hello {i}')
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 797, in ask
hello-word_1  |     await app._reply_consumer.add(p.correlation_id, p)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 167, in add
hello-word_1  |     await self._start_fetcher(reply_topic)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 176, in _start_fetcher
hello-word_1  |     await topic.maybe_declare()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1  |     result = await self.fun(*self.args, **self.kwargs)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 476, in maybe_declare
hello-word_1  |     await self.declare()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 491, in declare
hello-word_1  |     await producer.create_topic(
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1072, in create_topic
hello-word_1  |     await cast(Transport, self.transport)._create_topic(
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1284, in _create_topic
hello-word_1  |     await wrap()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1  |     result = await self.fun(*self.args, **self.kwargs)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1371, in _really_create_topic
hello-word_1  |     raise for_code(code)(f"Cannot create topic: {topic} ({code}): {reason}")
hello-word_1  | kafka.errors.InvalidReplicationFactorError: [Error 38] InvalidReplicationFactorError: Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.

Workaround

class MyTopic(faust.Topic):

    def __init__(self, *args, **kwargs):
        if kwargs.get('replicas') == 0:
            kwargs['replicas'] = 1
        super().__init__(*args, **kwargs)

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
    Topic=MyTopic
)

Proposed solution

I think changing this line https://github.com/faust-streaming/faust/blob/master/faust/agents/replies.py#L190 to replicas=1 or replicas=None will solve the problem

Versions

  • Python version 3.8
  • Faust version 0.4.1
  • Operating system
    tested on python:3.8 docker container
root@0d7a8b87eab1:/# cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
  • Kafka version bitnami/kafka:2.7.0
I have no name!@9e91917f6f27:/$ kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
  • RocksDB version (if applicable) None

RocksDB recovery crashes with KeyError

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

We created a simple test for Faust streaming and ran two or more parallel workers, each using its own RocksDB.

Under high traffic, stopping one worker sometimes triggers crashes in the other one.

Expected behavior

Workers can be stopped and restarted, and even in the unfortunate event that they crash, it should not impact other workers operating in parallel.

Actual behavior

Other workers may crash.


Full traceback

[^---Recovery]: Crashed reason=KeyError(TP(topic='<our-topic>', partition=1)) 
Traceback (most recent call last):
  File "<our venv>/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "<our venv>/lib/python3.8/site-packages/faust/tables/recovery.py", line 375, in _restart_recovery
    await self._wait(
  File "<our venv>/lib/python3.8/site-packages/faust/tables/recovery.py", line 562, in _wait
    wait_result = await self.wait_first(coro, signal, timeout=timeout)
  File "<our venv>/lib/python3.8/site-packages/mode/services.py", line 715, in wait_first
    f.result()  # propagate exceptions
  File "<our venv>/lib/python3.8/site-packages/faust/tables/recovery.py", line 666, in _build_offsets
    new_value = earliest[tp]
KeyError: TP(topic='<our topic>', partition=1)

Versions
