broadcaster's Issues

MemoryBackend does not get event from asyncio.Queue, when app is running from PyCharm

Consider the following code from the example, which works fine when run from the command line:
uvicorn main:app

The same code has issues when run from PyCharm.
The issue occurs at broadcaster/_backends/memory.py:30, where it cannot get an event from the asyncio.Queue.
I'm using Python 3.8.

from broadcaster import Broadcast
from starlette.applications import Starlette
from starlette.concurrency import run_until_first_complete
from starlette.routing import Route, WebSocketRoute
from starlette.templating import Jinja2Templates


broadcast = Broadcast("memory:///")
templates = Jinja2Templates("templates")


async def homepage(request):
    template = "index.html"
    context = {"request": request}
    return templates.TemplateResponse(template, context)


async def chatroom_ws(websocket):
    await websocket.accept()
    await run_until_first_complete(
        (chatroom_ws_receiver, {"websocket": websocket}),
        (chatroom_ws_sender, {"websocket": websocket}),
    )


async def chatroom_ws_receiver(websocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="chatroom", message=message)


async def chatroom_ws_sender(websocket):
    async with broadcast.subscribe(channel="chatroom") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


routes = [
    Route("/", homepage),
    WebSocketRoute("/", chatroom_ws, name='chatroom_ws'),
]


app = Starlette(
    routes=routes, on_startup=[broadcast.connect], on_shutdown=[broadcast.disconnect],
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app)

Request: more documentation (having trouble grokking)

Sorry for the newbishness, I'm having trouble understanding how to use the project. Main points I struggle with:

  • Coming from FastAPI/WebSockets, the docs have Depends(...) injected to routes. They use fastapi.app.websocket rather than starlette.routing.WebSocketRoute. Wondering what the simple translation from Starlette->FastAPI is to allow Depends() in the functions?
  • If it's as simple as replacing WebSocketRoute("/", ...) with app.websocket("/"), where do I put my dependencies? Is it at the async def chatroom_ws(websocket, <add deps here>) level?
  • Where/how do I handle on_connect, on_disconnect?
  • What's going on with run_until_first_complete? I can't find documentation in Starlette, and when I dive into asyncio.wait(..., FIRST_COMPLETED) it seems to imply the first task to complete is solid, and what's left is cancelled? Which confuses me on the setup of receiver/sender, don't we want both?
  • Looking at all the above, fastapi-websocket-broadcast has a pretty solid kitchen-sink example of subclassing WebSocketEndpoint with on_connect|disconnect|message, keeping websockets around for later use, etc. Maybe a more robust broadcaster sample in /examples could be in order for us FastAPI newbies?
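On the run_until_first_complete question, the traceback in a later issue on this page shows the helper ending with `[task.result() for task in done]`, which fits the asyncio.wait(..., FIRST_COMPLETED) reading: both tasks run concurrently, and once either finishes the other is cancelled. The following is a minimal standard-library sketch of that behaviour, not Starlette's actual source; `receiver` and `sender` are hypothetical stand-ins for the two websocket loops.

```python
import asyncio


async def run_until_first_complete(*args):
    # Each argument is a (coroutine_function, kwargs) pair, as in the example app.
    tasks = [asyncio.ensure_future(handler(**kwargs)) for handler, kwargs in args]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Whatever has not finished yet is cancelled...
    for task in pending:
        task.cancel()
    # ...and exceptions raised by finished tasks surface here.
    for task in done:
        task.result()


async def receiver(log):
    # Stand-in for the receive loop: it ends when the client disconnects.
    await asyncio.sleep(0.01)
    log.append("receiver finished")


async def sender(log):
    # Stand-in for the subscribe loop: left alone it would run forever.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("sender cancelled")
        raise


async def main():
    log = []
    await run_until_first_complete((receiver, {"log": log}), (sender, {"log": log}))
    # Give the cancelled task a moment to process the cancellation.
    await asyncio.sleep(0.01)
    return log


log = asyncio.run(main())
print(log)
```

Under this reading both loops are wanted while the connection is open; the cancellation only matters at the end, when the client disconnects and the sender loop would otherwise keep running.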

Create documentation

Similar to other encode projects. Feel free to suggest how best to organize it.

set up password, NOAUTH Authentication required

Hello, and thank you for building this great module.
I would like to use it on Heroku with the heroku-redis add-on, but when I try to send a message over the websocket I get this error: asyncio_redis.exceptions.ErrorReply: NOAUTH Authentication required.
I think it's because I need to configure the password, but I don't know how, maybe at initialization with broadcast = Broadcast(REDIS_URL, password="xxx")? There is no documentation on this.
Thank you in advance for your help.
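A Redis URL can carry the password in its userinfo part, and another issue on this page quotes a repository version of connect() that passes a password keyword to asyncio_redis.Connection.create. Whether an installed release reads the password from the URL depends on the version. The sketch below only shows, with the standard library, how such a URL splits into connection arguments; the URL is a made-up placeholder, not a real credential.

```python
from urllib.parse import urlparse

# Hypothetical URL in the "redis://:password@host:port" shape; placeholder values.
REDIS_URL = "redis://:s3cret@example-host:6379"

parsed = urlparse(REDIS_URL)
connection_kwargs = {
    "host": parsed.hostname,
    "port": parsed.port or 6379,   # fall back to the default Redis port
    "password": parsed.password,   # None when the URL has no password
}
print(connection_kwargs)
```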

Postgres: Cannot perform operation: another operation is in progress

Using broadcaster w/ fastapi and seeing an exception when using broadcaster via websockets.

Relevant parts:

    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
    # ...
    'cannot perform operation: another operation is in progress')

Full trace:

INFO:     ('127.0.0.1', 52654) - "WebSocket /viewers" [accepted]
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File ".../site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 154, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
  File ".../site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File ".../site-packages/fastapi/applications.py", line 179, in __call__
    await super().__call__(scope, receive, send)
  File ".../site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".../site-packages/starlette/middleware/errors.py", line 146, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/exceptions.py", line 58, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 283, in handle
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 57, in app
    await func(session)
  File ".../site-packages/fastapi/routing.py", line 228, in app
    await dependant.call(**values)
  File "./main.py", line 198, in events_ws
    (viewers_ws_sender, {"websocket": websocket}),
  File ".../site-packages/starlette/concurrency.py", line 18, in run_until_first_complete
    [task.result() for task in done]
  File ".../site-packages/starlette/concurrency.py", line 18, in <listcomp>
    [task.result() for task in done]
  File "./main.py", line 204, in viewers_ws_receiver
    await broadcast.publish(channel="viewers", message=message)
  File ".../site-packages/broadcaster/_base.py", line 72, in publish
    await self._backend.publish(channel, message)
  File ".../site-packages/broadcaster/_backends/postgres.py", line 25, in publish
    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
  File ".../site-packages/asyncpg/connection.py", line 297, in execute
    _, status, _ = await self._execute(query, args, 0, timeout, True)
  File ".../site-packages/asyncpg/connection.py", line 1444, in _execute
    with self._stmt_exclusive_section:
  File ".../site-packages/asyncpg/connection.py", line 1891, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
INFO:     ('127.0.0.1', 52655) - "WebSocket /events" [accepted]

Doing this via code that looks something like this, where I think the lock is just slowing things down and exposing the contention.

@app.websocket("/viewers", name="viewers_ws")
async def events_ws(websocket: WebSocket):
    await websocket.accept()
    await run_until_first_complete(
        (viewers_ws_receiver, {"websocket": websocket}),
        (viewers_ws_sender, {"websocket": websocket}),
    )

my_lock = threading.Lock()

async def viewers_ws_receiver(websocket: WebSocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="viewers", message=message)

async def viewers_ws_sender(websocket: WebSocket):
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            counter = 0
            with my_lock:
                # do something with event.message
                counter = ...
            await websocket.send_json({"viewers": counter})

Update: Refactored down to a single websocket and not using a lock for anything and saw this exception again. 🤷
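The asyncpg error is raised when a second operation starts on a connection that is still busy with another one. One possible workaround, offered as an assumption rather than as the library's fix, is to serialize calls on the shared connection with an asyncio.Lock (not a threading.Lock, which does not cooperate with the event loop). The sketch below uses a fake connection so it runs without a database; `FakeConnection` and `SerializedPublisher` are hypothetical names.

```python
import asyncio


class FakeConnection:
    """Stand-in for a single database connection that rejects overlapping calls."""

    def __init__(self):
        self._busy = False
        self.executed = []

    async def execute(self, query, *args):
        if self._busy:
            raise RuntimeError("cannot perform operation: another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulate a network round trip
            self.executed.append((query, args))
        finally:
            self._busy = False


class SerializedPublisher:
    """Hypothetical wrapper: one asyncio.Lock guards every call on the connection."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = asyncio.Lock()

    async def publish(self, channel, message):
        async with self._lock:
            await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)


async def main():
    # Without the lock, two concurrent calls collide.
    bare = FakeConnection()
    results = await asyncio.gather(
        bare.execute("SELECT pg_notify($1, $2);", "viewers", "a"),
        bare.execute("SELECT pg_notify($1, $2);", "viewers", "b"),
        return_exceptions=True,
    )
    collisions = sum(isinstance(r, Exception) for r in results)

    # With the lock, the same two calls run one after the other.
    guarded = SerializedPublisher(FakeConnection())
    await asyncio.gather(guarded.publish("viewers", "a"), guarded.publish("viewers", "b"))
    return collisions, len(guarded._conn.executed)


collisions, published = asyncio.run(main())
print(collisions, published)
```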

Question regarding `__aenter__` and `__aexit__` in class `Broadcast`

Hi, I'm developing an open-source project using FastAPI that involves WebSocket functionality, essentially a pub/sub model that broadcasts events to connected clients. Following the recommendation in the FastAPI documentation, I found this neat, elegant, yet robust library, which really helps me in developing my code base. I'm learning this code base and would like to contribute if possible.

I get the general idea of the code base, except for one thing:

Why do we handle connecting to and disconnecting from the backend via async context manager magic methods in class Broadcast? In the example application in the README, the instance method Broadcast.subscribe is the only place an async with statement is used, and instantiating a class doesn't call the __aenter__ or __aexit__ methods.

Async/await in Python sometimes still makes me scratch my head, and I hope someone can give me some guidance on this part of the code base. Thanks!
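One plausible reading is that the magic methods offer an alternative to wiring connect and disconnect into on_startup and on_shutdown: the object itself can be used in an async with block. The toy class below illustrates only that general pattern; `ToyBroadcast` is a hypothetical stand-in and not the library's implementation.

```python
import asyncio


class ToyBroadcast:
    """Hypothetical stand-in for Broadcast; only the lifecycle is modelled."""

    def __init__(self):
        self.connected = False

    async def connect(self):
        self.connected = True

    async def disconnect(self):
        self.connected = False

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, *exc_info):
        await self.disconnect()


async def main():
    states = []
    broadcast = ToyBroadcast()          # instantiation alone does not connect
    states.append(broadcast.connected)
    async with broadcast:               # __aenter__ runs here
        states.append(broadcast.connected)
    states.append(broadcast.connected)  # __aexit__ has run by now
    return states


states = asyncio.run(main())
print(states)
```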

can't define redis database number

I reviewed the source code of the Redis backend.
I can't connect to a specific database in Redis; the database number is ignored when the URI is parsed.

In broadcaster/_backends/redis.py, RedisBackend is initialized from the host and port only. The database number should be read here as well:

redis://HOST[:PORT][?db=DATABASE[&password=PASSWORD]]

The db parameter is missing from the URI parsing.
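As a rough illustration of the request, the query-string form above can be parsed with the standard library alone. This is a sketch of what the parsing might look like, not the backend's code; the URL values are placeholders, and passing the result on to the Redis client is left out.

```python
from urllib.parse import parse_qs, urlparse

# Placeholder URL in the redis://HOST[:PORT][?db=DATABASE[&password=PASSWORD]] shape.
url = "redis://localhost:6379?db=2&password=example"

parsed = urlparse(url)
query = parse_qs(parsed.query)  # maps each key to a list of values

options = {
    "host": parsed.hostname or "localhost",
    "port": parsed.port or 6379,
    "db": int(query.get("db", ["0"])[0]),         # default to database 0
    "password": query.get("password", [None])[0],
}
print(options)
```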

Test hangs with Redis backend in Starlette/FastAPI

Using the testing example from the Starlette docs with a route that uses a Redis websocket just hangs indefinitely:

def test_websocket(client):
    with client.websocket_connect("/ws") as websocket:
        websocket.send_text("Hello WebSocket")
        data = websocket.receive_text()
        assert data == "Hello WebSocket"

This, however, works fine:

def test_websocket(client):
    with client.websocket_connect("/ws") as websocket:
        assert True

Maybe it's related to this #2 (comment) and #42?

I think it's the same with a memory backend. Has anyone successfully tested a Starlette websocket route?

UPDATE: I tried running ./scripts/test locally from the broadcaster directory and it actually hangs for me on the redis test. Anyone else getting that?

Support for pattern based topic in kafka

A single consumer cannot read from a pattern and a normal topic.
So at least 2 consumers must be created. This can be done internally, no problem here.

Now... how should the interface look?

async with broadcast.subscribe(channel="logs/*") as subscriber:

If we want to use the current channel parameter, then a Consumer should be created each
time .subscribe is called (if not mistaken).

async with broadcast.subscribe(pattern="logs/*") as subscriber:

In this case we can have 2 consumers only, but we'd have a pattern keyword
which may not be supported by every backend.

Support `history`

Something that'd be really valuable would be to allow subscriptions to start up with some initial history.

For example:

  • Subscribe to "logs", with the last 1000 records, so that a front-end can display a log console, that's populated with the latest set of logs.
  • Subscribe to "http-stats" with the last 24 hours of records, so that a front-end can display a realtime status monitoring graph, that's populated with a window of the latest history.

Not all backends will support this, but Redis Streams, and Apache Kafka do. I think the implementation may be a bit fiddly to get right, but before we even get there let's discuss the API.

One option here might be...

async with broadcaster.subscribe("logs", history=1000):
    ...

async with broadcaster.subscribe("http-stats", history=timedelta(hours=24)):
    ...
  • Are there any API alternatives we ought to be considering?
  • Would Kafka and Redis Streams properly support both the int and the timedelta styles, and what commands would we need to be mapping them onto?
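To make the two proposed argument styles concrete, here is an in-memory sketch of how an int count and a timedelta window could each select the records to replay. It says nothing about how Kafka or Redis Streams would implement this; `HistoryBuffer` and its methods are hypothetical names.

```python
import time
from collections import deque
from datetime import timedelta


class HistoryBuffer:
    """Hypothetical in-memory record of recent messages for one channel."""

    def __init__(self, maxlen=10_000):
        # (timestamp, message) pairs, oldest first; old entries fall off the left.
        self._items = deque(maxlen=maxlen)

    def append(self, message, now=None):
        self._items.append((time.time() if now is None else now, message))

    def replay(self, history, now=None):
        """Return the messages selected by an int count or a timedelta window."""
        if isinstance(history, timedelta):
            cutoff = (time.time() if now is None else now) - history.total_seconds()
            return [msg for ts, msg in self._items if ts >= cutoff]
        if history <= 0:
            return []
        return [msg for _, msg in list(self._items)[-history:]]


buffer = HistoryBuffer()
for second, line in enumerate(["boot", "ready", "request", "response"]):
    buffer.append(line, now=1000.0 + second)  # fixed timestamps keep the demo deterministic

last_two = buffer.replay(2)
last_window = buffer.replay(timedelta(seconds=1.5), now=1003.0)
print(last_two, last_window)
```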

Please release lib version

I need the feature that passes the password from the Redis URL to asyncio_redis. It is missing from version 0.2.0, but I cloned your repository and saw that it is already available there. I have an app in production that really needs this. Otherwise I'll end up having to fork your library or use another one.

0.2.0:

    async def connect(self) -> None:
        self._pub_conn = await asyncio_redis.Connection.create(self._host, self._port)
        self._sub_conn = await asyncio_redis.Connection.create(self._host, self._port)
        self._subscriber = await self._sub_conn.start_subscribe()

current version repo:

    async def connect(self) -> None:
        kwargs = {"host": self._host, "port": self._port, "password": self._password}
        self._pub_conn = await asyncio_redis.Connection.create(**kwargs)
        self._sub_conn = await asyncio_redis.Connection.create(**kwargs)
        self._subscriber = await self._sub_conn.start_subscribe()

How can I use broadcaster to subscribe to multiple channels?

I know I can get a dynamic channel by putting the chatroom id in the URL, /ws/{chat_id}

I want to send {'type': 'subscribe', 'chat_id': 'dsfjkhdskjfhdskjhfkjdsahfa'}

@app.websocket("/ws")
async def ws_manager(websocket: WebSocket):
    await websocket.accept()
    async for message in websocket.iter_json():
        if message['type'] == 'subscribe':
            chat_id = message['chat_id']
            async with broadcast.subscribe(channel=chat_id) as subscriber:
                async for event in subscriber:
                    await websocket.send_text(event.message)

How can I have multiple subscribers on the same socket connection?
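In the handler above, the inner `async for event in subscriber` loop never returns, so the outer loop never gets to read a second subscribe message. One possible approach, sketched here with plain asyncio queues in place of the websocket and the broadcaster, is to start one forwarding task per requested channel and cancel them all on disconnect. Every name in the sketch is hypothetical.

```python
import asyncio


async def forward(channel_queue, outbox, chat_id):
    # Stand-in for: async with broadcast.subscribe(channel=chat_id) as subscriber: ...
    while True:
        message = await channel_queue.get()
        await outbox.put((chat_id, message))  # stand-in for websocket.send_text


async def main():
    channels = {"room-a": asyncio.Queue(), "room-b": asyncio.Queue()}
    outbox = asyncio.Queue()
    subscriptions = {}  # chat_id -> forwarding task

    # Each "subscribe" message starts a task instead of blocking the receive loop.
    requests = [
        {"type": "subscribe", "chat_id": "room-a"},
        {"type": "subscribe", "chat_id": "room-b"},
    ]
    for request in requests:
        chat_id = request["chat_id"]
        if chat_id not in subscriptions:
            subscriptions[chat_id] = asyncio.ensure_future(
                forward(channels[chat_id], outbox, chat_id)
            )

    await channels["room-b"].put("hello b")
    await channels["room-a"].put("hello a")
    received = sorted([await outbox.get(), await outbox.get()])

    # On disconnect, cancel every forwarding task and wait for them to finish.
    for task in subscriptions.values():
        task.cancel()
    await asyncio.gather(*subscriptions.values(), return_exceptions=True)
    return received


received = asyncio.run(main())
print(received)
```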

No notifications received with Postgres as backend

I'm trying to get broadcaster to work with FastAPI and a PostgreSQL database with notifications enabled.

Here is the code I use

FastAPI (test_broadcaster.py)

from broadcaster import Broadcast

broadcast = Broadcast("postgres://<username>:<password>@localhost:5432/<database>")

async def broadcast_ws_receiver(websocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="account", message=message)

async def broadcast_ws_sender(websocket):
    async with broadcast.subscribe(channel="account") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

FastAPI (router.py)

from fastapi.routing import APIRouter
from fastapi.websockets import WebSocket
from fastapi.concurrency import run_until_first_complete

from test_broadcaster import broadcast_ws_sender, broadcast_ws_receiver

router = APIRouter()

@router.websocket("/broadcast")
async def broadcast_ws(websocket: WebSocket):
    await websocket.accept()
    
    await run_until_first_complete(
        (broadcast_ws_receiver, {"websocket": websocket}),
        (broadcast_ws_sender, {"websocket": websocket}),
    )

PostgreSQL (trigger name) # on the table account
account

PostgreSQL (trigger_function)

BEGIN
  PERFORM pg_notify(
    'account',
    json_build_object(
      'operation', TG_OP,
      'record', row_to_json(NEW)
    )::text
  );

  RETURN NEW;
END;

When I make the websocket connection and update the table to trigger a notification, nothing happens. The same is true when using

NOTIFY account, 'hello world';

After switching to Redis it works flawlessly, from Redis to websocket and from websocket to Redis.

Is this a bug, or am I missing something in the code?

Asyncio_redis - "Maintainers needed!" - Update asyncio_redis library to redis-py and add SSL connection support

Currently, the asyncio_redis library is not receiving regular maintenance. This library is essential for handling asynchronous operations related to Redis. However, its lack of maintenance may lead to security issues and delays in resolving bugs and incompatibilities.

Additionally, the library does not natively support SSL connections, which is crucial for environments where security is a priority.

I suggest considering migrating the asyncio_redis library to redis-py, which is well-maintained and widely used by the community. Furthermore, adding support for SSL connections would be highly beneficial to increase the security and compatibility of the library with various environments.

Redis PubSub does not close subscriptions

Hi
I'm using broadcaster for websocket connections. When a new websocket connection opens, I assign the client a new unique id and subscribe to that id.
Another application publishes messages to the websocket client through this unique id.
My application works properly, and this part is fine.
But I have one issue:
when the connection closes, the Redis subscription stays open until the application shuts down.

async def ws_sender(websocket):
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)

The context manager exits, but the subscription is not closed (no unsubscribe).

Unsubscribing issue in Broadcast class

Hi,

I was working with your Broadcast class and noticed a bug related to websocket disconnection. In the subscribe method of the Broadcast class, when a user disconnects from the websocket, the code after yield Subscriber(queue) will never execute. This could potentially lead to increased memory usage if items are never removed from the _subscribers dictionary.
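The general pattern for guaranteeing cleanup in an async context manager is to wrap the yield in try/finally, so the cleanup also runs when the body raises or its task is cancelled. Whether the library's subscribe method already does this depends on the version; the sketch below only demonstrates the pattern, with a module-level `subscribers` dict as a hypothetical stand-in for `_subscribers`.

```python
import asyncio
from contextlib import asynccontextmanager

subscribers = {}  # channel -> set of queues; hypothetical stand-in for _subscribers


@asynccontextmanager
async def subscribe(channel):
    queue = asyncio.Queue()
    subscribers.setdefault(channel, set()).add(queue)
    try:
        yield queue
    finally:
        # Runs on normal exit, on an exception, and on task cancellation.
        subscribers[channel].discard(queue)
        if not subscribers[channel]:
            del subscribers[channel]


async def sender():
    async with subscribe("chatroom") as queue:
        await queue.get()  # blocks forever: nothing is ever published


async def main():
    task = asyncio.ensure_future(sender())
    await asyncio.sleep(0.01)  # let the sender register itself
    registered_while_running = "chatroom" in subscribers
    task.cancel()  # roughly what happens when the websocket handler is torn down
    await asyncio.gather(task, return_exceptions=True)
    return registered_while_running, "chatroom" in subscribers


registered_while_running, registered_after_cancel = asyncio.run(main())
print(registered_while_running, registered_after_cancel)
```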

Thank you for your great code!

Import Error

When I run your example code, it raises an ImportError.

[Screenshot: tempsnip]

Django Channels Layer compatibility

Django Channels Layers perform a similar task to broadcaster, but currently only work with Redis.

Would be a nice addition if there was a built-in wrapper around broadcaster to be used as a drop-in replacement. Perhaps the dotted path would look something like broadcaster.django.KafkaChannelLayer?

This is definitely a task for > 1.0.0 release.

BROADCAST_URL of Kafka cluster not getting parsed

Issue: A BROADCAST_URL for a Kafka cluster isn't parsed correctly and raises KafkaConnectionError.

Sample URL:
BROADCAST_URL = 'kafka://server1:port,server2:port,server3:port'

Description:
The aiokafka client uses the collect_hosts method of kafka.conn, which
collects a comma-separated set of hosts (host:port):
https://github.com/dpkp/kafka-python/blob/master/kafka/conn.py#L1485

In broadcaster._backends.kafka, the bootstrap servers are prepared by wrapping the entire netloc string in a single-element list, as shown below:
https://github.com/encode/broadcaster/blob/master/broadcaster/_backends/kafka.py#L13
self._servers = [urlparse(url).netloc]
Changing that line to the following statement would support a Kafka cluster connection string:
self._servers = urlparse(url).netloc.split(',')

Do let me know your thoughts on this.
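The difference between the two statements can be checked with the standard library alone. The cluster URL below is a placeholder with made-up host names and ports.

```python
from urllib.parse import urlparse

# Hypothetical cluster URL; host names and ports are placeholders.
url = "kafka://server1:9092,server2:9092,server3:9092"

current = [urlparse(url).netloc]            # one element holding all three hosts
proposed = urlparse(url).netloc.split(",")  # one element per broker

print(current)
print(proposed)
```

A single-broker URL gives the same one-element list under both statements, so the proposed change would not alter the existing single-host case.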

Setup CI

So I've actually only used Travis CI in the past, but I'd be up for trying out GitHub Actions in this repo.

If anyone's got experience here and can provide a quick first pointer on how the GitHub Actions file might need to look to start docker-compose and run the test suite, then that'd be fab. Otherwise I'll try to dig into it sometime.
