encode / broadcaster
Broadcast channels for async web apps. 📢
License: BSD 3-Clause "New" or "Revised" License
Consider the following code from example, which works fine when running the file with command-line:
uvicorn main:app
The same code has issues when run from PyCharm.
The issue occurs at broadcaster/_backends/memory.py:30, where it cannot get an event from asyncio.Queue.
I'm using python3.8.
from broadcaster import Broadcast
from starlette.applications import Starlette
from starlette.concurrency import run_until_first_complete
from starlette.routing import Route, WebSocketRoute
from starlette.templating import Jinja2Templates

broadcast = Broadcast("memory:///")
templates = Jinja2Templates("templates")


async def homepage(request):
    template = "index.html"
    context = {"request": request}
    return templates.TemplateResponse(template, context)


async def chatroom_ws(websocket):
    await websocket.accept()
    await run_until_first_complete(
        (chatroom_ws_receiver, {"websocket": websocket}),
        (chatroom_ws_sender, {"websocket": websocket}),
    )


async def chatroom_ws_receiver(websocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="chatroom", message=message)


async def chatroom_ws_sender(websocket):
    async with broadcast.subscribe(channel="chatroom") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)


routes = [
    Route("/", homepage),
    WebSocketRoute("/", chatroom_ws, name='chatroom_ws'),
]

app = Starlette(
    routes=routes, on_startup=[broadcast.connect], on_shutdown=[broadcast.disconnect],
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app)
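In the example above, chatroom_ws passes its receiver and sender to run_until_first_complete. The snippet below is a rough stand-in for that pattern using only asyncio, assuming it boils down to asyncio.wait with FIRST_COMPLETED followed by cancelling whatever is still pending. The helper name first_complete and the two toy coroutines are made up for illustration; this is not Starlette's code.

```python
import asyncio


async def first_complete(*handlers):
    """Hypothetical helper: run (func, kwargs) pairs until one of them finishes."""
    tasks = [asyncio.ensure_future(func(**kwargs)) for func, kwargs in handlers]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Whatever is still running gets cancelled, then awaited so it can unwind.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Re-raise any exception from the finished task(s) and collect results.
    return [task.result() for task in done]


async def receiver(label):
    # Stands in for the websocket receive loop ending when the client leaves.
    await asyncio.sleep(0.01)
    return f"{label} finished"


async def sender(label):
    # Stands in for a subscriber loop that would otherwise never end.
    await asyncio.sleep(3600)
    return f"{label} finished"


results = asyncio.run(
    first_complete((receiver, {"label": "receiver"}), (sender, {"label": "sender"}))
)
```

Under that assumption both coroutines do run side by side; the cancellation only means that once one side ends (for example the client disconnects and the receiver loop exits), the other side is stopped instead of being left running forever.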
Sorry for the newbishness, I'm having trouble understanding how to use the project. Main points I struggle with:
Depends(...) injected to routes. They use fastapi.app.websocket rather than starlette.routing.WebSocketRoute. Wondering what the simple translation from Starlette->FastAPI is to allow Depends() in the functions?
WebSocketRoute("/", ...) with app.websocket("/"), where do I put my dependencies? Is it at the async def chatroom_ws(websocket, <add deps here>) level?
on_connect, on_disconnect?
run_until_first_complete? I can't find documentation in Starlette, and when I dive into asyncio.wait(..., FIRST_COMPLETED) it seems to imply the first task to complete is solid, and what's left is cancelled? Which confuses me on the setup of receiver/sender, don't we want both?
WebSocketEndpoint with on_connect|disconnect|message, keeping websockets around for later use, etc. Maybe a more robust broadcaster sample in /examples could be in order for us FastAPI newbies?
Kafka accepts n + 1 servers to bootstrap the initial cluster metadata.
https://aiokafka.readthedocs.io/en/stable/api.html#aiokafkaconsumer-class
Similar to other encode projects. Feel free to think how the best organization would be.
Hello, and thank you for building this great module.
I would like to use it on Heroku with the heroku-redis addon, but when I try to send a message over the websocket I get this error: asyncio_redis.exceptions.ErrorReply: NOAUTH Authentication required.
I think it's because I need to configure the password, perhaps at initialization with something like broadcast = Broadcast(REDIS_URL, password="xxx"), but I don't know how to do it as there is no documentation on this.
Thank you in advance for your help
Google Cloud Pub/Sub is widely used; it would be useful to be able to use it as a backend.
Using broadcaster w/ fastapi and seeing an exception when using broadcaster via websockets.
Relevant parts:
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
# ...
'cannot perform operation: another operation is in progress')
Full trace:
INFO: ('127.0.0.1', 52654) - "WebSocket /viewers" [accepted]
ERROR: Exception in ASGI application
Traceback (most recent call last):
  File ".../site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 154, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
  File ".../site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
    return await self.app(scope, receive, send)
  File ".../site-packages/fastapi/applications.py", line 179, in __call__
    await super().__call__(scope, receive, send)
  File ".../site-packages/starlette/applications.py", line 111, in __call__
    await self.middleware_stack(scope, receive, send)
  File ".../site-packages/starlette/middleware/errors.py", line 146, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/exceptions.py", line 58, in __call__
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 566, in __call__
    await route.handle(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 283, in handle
    await self.app(scope, receive, send)
  File ".../site-packages/starlette/routing.py", line 57, in app
    await func(session)
  File ".../site-packages/fastapi/routing.py", line 228, in app
    await dependant.call(**values)
  File "./main.py", line 198, in events_ws
    (viewers_ws_sender, {"websocket": websocket}),
  File ".../site-packages/starlette/concurrency.py", line 18, in run_until_first_complete
    [task.result() for task in done]
  File ".../site-packages/starlette/concurrency.py", line 18, in <listcomp>
    [task.result() for task in done]
  File "./main.py", line 204, in viewers_ws_receiver
    await broadcast.publish(channel="viewers", message=message)
  File ".../site-packages/broadcaster/_base.py", line 72, in publish
    await self._backend.publish(channel, message)
  File ".../site-packages/broadcaster/_backends/postgres.py", line 25, in publish
    await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
  File ".../site-packages/asyncpg/connection.py", line 297, in execute
    _, status, _ = await self._execute(query, args, 0, timeout, True)
  File ".../site-packages/asyncpg/connection.py", line 1444, in _execute
    with self._stmt_exclusive_section:
  File ".../site-packages/asyncpg/connection.py", line 1891, in __enter__
    'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
INFO: ('127.0.0.1', 52655) - "WebSocket /events" [accepted]
Doing this via code that looks something like this, where I think the lock is just slowing things down and exposing the contention.
@app.websocket("/viewers", name="viewers_ws")
async def events_ws(websocket: WebSocket):
    await websocket.accept()
    await run_until_first_complete(
        (viewers_ws_receiver, {"websocket": websocket}),
        (viewers_ws_sender, {"websocket": websocket}),
    )


my_lock = threading.Lock()


async def viewers_ws_receiver(websocket: WebSocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="viewers", message=message)


async def viewers_ws_sender(websocket: WebSocket):
    async with broadcast.subscribe(channel="viewers") as subscriber:
        async for event in subscriber:
            counter = 0
            with my_lock:
                # do something with event.message
                counter = ...
            await websocket.send_json({"viewers": counter})
Update: Refactored down to a single websocket and not using a lock for anything and saw this exception again. 🤷
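One plausible reading of the traceback is that two coroutines end up calling execute on the same connection at the same moment, which asyncpg refuses to do. The sketch below reproduces that shape with a made-up FakeConnection (not asyncpg) and shows how an asyncio.Lock around each call would serialize access. All names here are hypothetical, and whether this matches the real cause in the Postgres backend is an assumption.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection that rejects overlapping calls."""

    def __init__(self):
        self._busy = False

    async def execute(self, query):
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # pretend to talk to the database
            return "ok"
        finally:
            self._busy = False


async def publish_unguarded(conn, count):
    # All calls start at once and share the one connection.
    calls = (conn.execute("SELECT 1") for _ in range(count))
    return await asyncio.gather(*calls, return_exceptions=True)


async def publish_guarded(conn, lock, count):
    async def one_call():
        # Only one coroutine at a time may use the connection.
        async with lock:
            return await conn.execute("SELECT 1")

    return await asyncio.gather(*(one_call() for _ in range(count)), return_exceptions=True)


async def demo():
    conn = FakeConnection()
    unguarded = await publish_unguarded(conn, 3)
    guarded = await publish_guarded(conn, asyncio.Lock(), 3)
    return unguarded, guarded


unguarded, guarded = asyncio.run(demo())
```

Note that this uses asyncio.Lock rather than threading.Lock: holding a threading.Lock with a plain with statement inside a coroutine does not coordinate coroutines the way async with on an asyncio.Lock does.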
Hi, I'm developing an OSS project using FastAPI that involves WebSocket functionality, essentially a PUB/SUB model that broadcasts events to the connected clients. Following the recommendation in the FastAPI documentation, I found this neat, elegant, yet robust library, which really helps me in developing my code base. I'm learning this code base and trying to make a contribution if possible.
I now pretty much get the general idea of the code base, except for one thing:
Why do we handle the connect/disconnect of the backend via the async context manager magic methods in class Broadcast? In the example application in the README, the instance method Broadcast.subscribe is the only place an async with statement is used, and instantiating a class doesn't call __aenter__ or __aexit__.
async/await in Python sometimes still makes me scratch my head, and I hope someone can give me some guidance on this part of the code base. Thanks!
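For what it's worth, here is a small sketch of what those magic methods buy you. MiniBroadcast is a made-up toy, not the real Broadcast class; it only assumes that __aenter__ and __aexit__ delegate to connect and disconnect, as the question describes.

```python
import asyncio


class MiniBroadcast:
    """Toy class (hypothetical) that records when it connects and disconnects."""

    def __init__(self):
        self.events = []

    async def connect(self):
        self.events.append("connect")

    async def disconnect(self):
        self.events.append("disconnect")

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.disconnect()


async def demo():
    broadcast = MiniBroadcast()
    after_init = list(broadcast.events)  # instantiation alone calls nothing
    async with broadcast:
        inside = list(broadcast.events)  # __aenter__ has run connect()
    return after_init, inside, list(broadcast.events)


after_init, inside, final = asyncio.run(demo())
```

So the methods only matter when the instance itself is used in async with, for example in a script or a test; an app that registers connect and disconnect as startup and shutdown hooks never triggers them.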
I reviewed the source code of the Redis backend.
I can't connect to a specific database in Redis, because the database number is ignored when the URI is parsed.
In broadcaster/_backends/redis.py, RedisBackend is initialized with only the host and port; the database number should be handled there as well:
redis://HOST[:PORT][?db=DATABASE[&password=PASSWORD]]
The db parameter is missing from the URI handling.
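As a rough illustration of the URI format above, this sketch pulls host, port, db and password out of such a URL with the standard library. The function name parse_redis_url and the fallback defaults (localhost, 6379, db 0) are assumptions for the example, not what the backend currently does.

```python
from urllib.parse import parse_qs, urlparse


def parse_redis_url(url):
    """Hypothetical helper: split a redis:// URL into connection settings."""
    parsed = urlparse(url)
    query = parse_qs(parsed.query)
    return {
        "host": parsed.hostname or "localhost",
        "port": parsed.port or 6379,
        # db and password come from the query string, as in the format above.
        "db": int(query.get("db", ["0"])[0]),
        "password": query.get("password", [None])[0],
    }


settings = parse_redis_url("redis://localhost:6379?db=2&password=secret")
defaults = parse_redis_url("redis://localhost")
```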
Using the testing example from the Starlette docs with a route that uses a Redis websocket just hangs indefinitely:
def test_websocket(client):
    with client.websocket_connect("/ws") as websocket:
        websocket.send_text("Hello WebSocket")
        data = websocket.receive_text()
        assert data == "Hello WebSocket"
This, however, works fine:
def test_websocket(client):
    with client.websocket_connect("/ws") as websocket:
        assert True
Maybe it's related to this #2 (comment) and #42?
I think it's the same with a memory backend. Has anyone successfully tested a Starlette websocket route?
UPDATE: I tried running ./scripts/test locally from the broadcaster directory and it actually hangs for me on the redis test. Anyone else getting that?
A single consumer cannot read from a pattern and a normal topic.
So at least 2 consumers must be created. This can be done internally, no problem here.
Now... how should the interface look?
async with broadcast.subscribe(channel="logs/*") as subscriber:
If we want to use the current channel parameter, then a Consumer should be created each time .subscribe is called (if not mistaken).
async with broadcast.subscribe(pattern="logs/*") as subscriber:
In this case we can have 2 consumers only, but we'd have a pattern keyword which may not be supported by every backend.
Something that'd be really valuable would be to allow subscriptions to start up with some initial history.
For example:
Not all backends will support this, but Redis Streams, and Apache Kafka do. I think the implementation may be a bit fiddly to get right, but before we even get there let's discuss the API.
One option here might be...
async with broadcaster.subscribe("logs", history=1000):
    ...
async with broadcaster.subscribe("http-stats", history=timedelta(hours=24)):
    ...
Doesn't look like there's any way to create a Broadcast class providing my own BroadcastBackend, which is a shame because it's so easy to extend
I need the feature that passes the password from the Redis URL to asyncio_redis. Version 0.2.0 does not have it, but I cloned your repository and saw that it is already available there. I have an app in production that really needs this. I'll end up having to fork your project or use another lib if that doesn't work.
0.2.0:
async def connect(self) -> None:
    self._pub_conn = await asyncio_redis.Connection.create(self._host, self._port)
    self._sub_conn = await asyncio_redis.Connection.create(self._host, self._port)
    self._subscriber = await self._sub_conn.start_subscribe()
Current version in the repo:
async def connect(self) -> None:
    kwargs = {"host": self._host, "port": self._port, "password": self._password}
    self._pub_conn = await asyncio_redis.Connection.create(**kwargs)
    self._sub_conn = await asyncio_redis.Connection.create(**kwargs)
    self._subscriber = await self._sub_conn.start_subscribe()
As best I can tell, the URL format is redis://username:password@host:port (source).
I know I can get a dynamic channel by putting the chatroom id in the URL, e.g. /ws/{chat_id}.
Instead, I want to send a message such as {'type': 'subscribe', 'chat_id': 'dsfjkhdskjfhdskjhfkjdsahfa'} over the socket:
@app.websocket("/ws")
async def ws_manager(websocket: WebSocket):
    await websocket.accept()
    async for message in websocket.iter_json():
        if message['type'] == 'subscribe':
            chat_id = message['chat_id']
            async with broadcast.subscribe(channel=chat_id) as subscriber:
                async for event in subscriber:
                    await websocket.send_text(event.message)
How can I have multiple subscribers for the same socket connection?
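In the handler above, the inner async for never returns, so the outer loop cannot read a second subscribe message. One possible approach is to start a background task per subscription and let each task forward into the one socket. The sketch below shows only that task-per-channel shape, using plain asyncio queues in place of broadcaster and the websocket; forward, channels and outbox are made-up names.

```python
import asyncio


async def forward(channel, source, outbox):
    """Copy every message from one channel's queue into the shared outbox."""
    while True:
        message = await source.get()
        await outbox.put((channel, message))


async def demo():
    # Toy stand-ins: one queue per channel, one outbox playing the websocket.
    channels = {"room-a": asyncio.Queue(), "room-b": asyncio.Queue()}
    outbox = asyncio.Queue()

    # One background task per subscription, keyed by channel name so a later
    # "unsubscribe" message could cancel just that task.
    tasks = {
        name: asyncio.create_task(forward(name, queue, outbox))
        for name, queue in channels.items()
    }

    await channels["room-a"].put("hello")
    first = await outbox.get()
    await channels["room-b"].put("world")
    second = await outbox.get()

    # Stop all forwarding tasks when the connection goes away.
    for task in tasks.values():
        task.cancel()
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    return [first, second]


received = asyncio.run(demo())
```

In a real handler each task would wrap its own async with broadcast.subscribe(...) block, and the tasks would need to be cancelled when the websocket closes.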
The idea here is to:
Move setup.cfg to pyproject.toml.
Replace isort and flake8 by ruff.
As an example, you can check this PR.
Ok
I'm trying to get broadcaster work with FastAPI and a PostgreSQL database with notifications enabled.
Here is the code I use
FastAPI (test_broadcaster.py)
from broadcaster import Broadcast

broadcast = Broadcast("postgres://<username>:<password>@localhost:5432/<database>")


async def broadcast_ws_receiver(websocket):
    async for message in websocket.iter_text():
        await broadcast.publish(channel="account", message=message)


async def broadcast_ws_sender(websocket):
    async with broadcast.subscribe(channel="account") as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)
FastAPI (router.py)
from fastapi.routing import APIRouter
from fastapi.websockets import WebSocket
from fastapi.concurrency import run_until_first_complete
from test_broadcaster import broadcast_ws_sender, broadcast_ws_receiver

router = APIRouter()


@router.websocket("/broadcast")
async def broadcast_ws(websocket: WebSocket):
    await websocket.accept()
    await run_until_first_complete(
        (broadcast_ws_receiver, {"websocket": websocket}),
        (broadcast_ws_sender, {"websocket": websocket}),
    )
PostgreSQL (trigger name) # on the table account
account
PostgreSQL (trigger_function)
BEGIN
    PERFORM pg_notify(
        'account',
        json_build_object(
            'operation', TG_OP,
            'record', row_to_json(NEW)
        )::text
    );
    RETURN NEW;
END;
When I make the websocket connection and update the table to trigger a notification, nothing happens. The same is true when using
NOTIFY account, 'hello world';
When I switch to Redis it works flawlessly, from Redis to websocket and from websocket to Redis.
Is this a bug? Or am I missing something in the code?
The title says it all.
We now have a test suite courtesy of #11, but the test for the Kafka backend hangs indefinitely on await subscriber.get(), causing the test suite to time out:
broadcaster/tests/test_broadcast.py
Line 43 in 435c35e
Sadly, I'm out of my depth here, and I'm unable to diagnose whether this is a problem with the Kafka backend or the test setup.
Help is welcome.
Currently, the asyncio_redis library is not receiving regular maintenance. This library is essential for handling asynchronous operations related to Redis. However, its lack of maintenance may lead to security issues and delays in resolving bugs and incompatibilities.
Additionally, the library does not natively support SSL connections, which is crucial for environments where security is a priority.
I suggest considering a migration from asyncio_redis to redis-py, which is well-maintained and widely used by the community. Furthermore, adding support for SSL connections would be highly beneficial to increase the security and compatibility of the library with various environments.
Hi
I'm using broadcaster for websocket connections. When a new websocket connection opens, I assign the client a new unique id and subscribe on that id.
Another application publishes messages to the websocket client using this unique id.
My application works properly, and this part is fine.
But I have one issue:
the connection closes, but the Redis subscription is not closed until the application goes down.
async def ws_sender(websocket):
    async with broadcast.subscribe(channel=websocket.channel_name) as subscriber:
        async for event in subscriber:
            await websocket.send_text(event.message)
The context manager exits, but the subscription is not closed (no unsubscribe).
The idea is to do the same as encode/starlette#2148.
Hi,
I was working with your Broadcast class and noticed a bug related to websocket disconnection. In the subscribe method of the Broadcast class, when a user disconnects from the websocket, the code after yield Subscriber(queue) will never execute. This could potentially lead to increased memory usage if items are never removed from the _subscribers dictionary.
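To illustrate the general pattern being described (not a claim about how Broadcast.subscribe is written today), here is a toy comparison of cleanup placed after yield versus cleanup in a finally block. The names subscribe_leaky, subscribe_safe and the module-level subscribers dict are invented for this sketch.

```python
import asyncio
from contextlib import asynccontextmanager

subscribers = {}  # hypothetical registry: channel name -> set of queues


@asynccontextmanager
async def subscribe_leaky(channel):
    queue = asyncio.Queue()
    subscribers.setdefault(channel, set()).add(queue)
    yield queue
    # Skipped when the body raises: the exception surfaces at the yield.
    subscribers[channel].discard(queue)


@asynccontextmanager
async def subscribe_safe(channel):
    queue = asyncio.Queue()
    subscribers.setdefault(channel, set()).add(queue)
    try:
        yield queue
    finally:
        # Runs on normal exit, on exceptions, and on cancellation.
        subscribers[channel].discard(queue)


async def use(subscribe, channel):
    try:
        async with subscribe(channel):
            raise ConnectionError("client went away")
    except ConnectionError:
        pass
    return len(subscribers[channel])


leaked = asyncio.run(use(subscribe_leaky, "a"))
cleaned = asyncio.run(use(subscribe_safe, "b"))
```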
Thank you for your great code!
I am compiling a report on 3rd party library usage at my company and noticed that broadcaster doesn't have a LICENSE file.
asyncpg doesn't support pgpool2, whereas aiopg seems to have enough support for using broadcaster.
I will try and find the time to create a PR.
Django Channels Layers perform a similar task to broadcaster, but currently only work with redis.
Would be a nice addition if there was a built-in wrapper around broadcaster to be used as a drop-in replacement. Perhaps the dotted path would look something like broadcaster.django.KafkaChannelLayer?
This is definitely a task for > 1.0.0 release.
Issue: BROADCAST_URL for Kafka cluster doesn't get parsed correctly and throws KafkaConnectionError
Sample URL:
BROADCAST_URL = 'kafka://server1:port,server2:port,server3:port'
Description:
The aiokafka client uses the collect_hosts method of kafka.conn, and kafka.conn.collect_hosts accepts a comma-separated set of hosts (host:port):
https://github.com/dpkp/kafka-python/blob/master/kafka/conn.py#L1485
In broadcaster._backends.kafka, the bootstrap servers are prepared by wrapping the entire netloc string in a single-element list, as shown below:
https://github.com/encode/broadcaster/blob/master/broadcaster/_backends/kafka.py#L13
self._servers = [urlparse(url).netloc]
If we replace the line above with the statement below, it will support a Kafka cluster connection string:
self._servers = urlparse(url).netloc.split(',')
Do let me know your thoughts on this.
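A quick standard-library check of the proposed change, using made-up host names and ports. The helper name bootstrap_servers is only for this sketch.

```python
from urllib.parse import urlparse


def bootstrap_servers(url):
    """Split the netloc of a kafka:// URL into a list of host:port strings."""
    netloc = urlparse(url).netloc
    return [server for server in netloc.split(",") if server]


single = bootstrap_servers("kafka://localhost:9092")
cluster = bootstrap_servers("kafka://server1:9092,server2:9092,server3:9092")
current = [urlparse("kafka://server1:9092,server2:9092,server3:9092").netloc]
```

Here current mirrors the existing line and ends up as one comma-joined string inside a list, while cluster holds three separate entries; a single-server URL behaves the same either way.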
So I've actually only used Travis CI in the past, but I'd be up for trying out GitHub Actions in this repo.
If anyone's got experience here, and can provide a quick first pointer on how the GitHub Actions file might need to look to start docker-compose and run the test suite, then that'd be fab. Otherwise I'll try to take a dig sometime.
Requires Python >= 3.7.
On lower versions, from contextlib import asynccontextmanager cannot be imported.
Thanks