nilcoalescing / djangochannelsrestframework
A Rest-framework for websockets using Django channels-v4
Home Page: https://djangochannelsrestframework.readthedocs.io/en/latest/
License: MIT License
In some cases the server can know in advance which objects the client is likely to request next.
For paginated responses, it would be nice to have the option to pre-emptively send the next page down to the client after returning the requested page.
Describe the bug
To Reproduce
Steps to reproduce the behavior:
class Message(models.Model):
    text = models.TextField()
    user = models.ForeignKey(User, related_name="messages", on_delete=models.CASCADE)

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = Message.objects.all()
    serializer_class = MessageSerializer
    chat_room_name: Optional[str]

    @action()
    async def join_chat(self, chat: int, **kwargs):
        self.chat_room_name = f"chat_{chat}"
        await self.channel_layer.group_add(
            self.chat_room_name,
            self.channel_name,
        )
        # Only the constructor runs in a worker thread; .data is read later on the event loop.
        user_serializer = await database_sync_to_async(UserSerializer)(self.scope["user"], many=False)
        await self.channel_layer.group_send(
            self.chat_room_name,
            {
                "type": "notification",
                "message": f"{self.scope['user'].username} joined the chat",
                "user": user_serializer.data,
                "chat": chat
            }
        )

    async def notification(self, event):
        content = dict(
            action="notification",
            message=event["message"],
            chat=event["chat"],
            user=event["user"],
            status=status.HTTP_200_OK
        )
        await self.send_json(content)
When I send the action to the websocket, it should add the connection to a group of users and send each of them the notification, with the key user holding the user who executed the action. But when I try to create the serializer in the async context, the call never completes.
LOG
# Test join chat.
await communicator.send_json_to({
"action": "join_chat",
"chat": chat.pk,
"request_id": now().timestamp(),
})
> response = await communicator.receive_json_from()
backend\tests\test_channels_rest_consumers.py:103:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
venv\lib\site-packages\channels\testing\websocket.py:93: in receive_json_from
payload = await self.receive_from(timeout)
venv\lib\site-packages\channels\testing\websocket.py:72: in receive_from
response = await self.receive_output(timeout)
venv\lib\site-packages\asgiref\testing.py:78: in receive_output
self.future.result()
venv\lib\site-packages\channels\routing.py:150: in __call__
return await application(
venv\lib\site-packages\channels\consumer.py:94: in app
return await consumer(scope, receive, send)
venv\lib\site-packages\channels\consumer.py:58: in __call__
await await_many_dispatch(
venv\lib\site-packages\channels\utils.py:51: in await_many_dispatch
await dispatch(result)
venv\lib\site-packages\channels\consumer.py:73: in dispatch
await handler(message)
venv\lib\site-packages\channels\generic\websocket.py:196: in websocket_receive
await self.receive(text_data=message["text"])
venv\lib\site-packages\channels\generic\websocket.py:259: in receive
await self.receive_json(await self.decode_json(text_data), **kwargs)
venv\lib\site-packages\djangochannelsrestframework\consumers.py:160: in receive_json
await self.handle_action(action, request_id=request_id, **content)
venv\lib\site-packages\djangochannelsrestframework\consumers.py:151: in handle_action
await self.handle_exception(exc, action=action, request_id=request_id)
venv\lib\site-packages\djangochannelsrestframework\consumers.py:117: in handle_exception
raise exc
venv\lib\site-packages\djangochannelsrestframework\consumers.py:144: in handle_action
response = await method(request_id=request_id, action=action, **kwargs)
backend\consumers.py:52: in join_chat
"user": user_serializer.data,
venv\lib\site-packages\rest_framework\serializers.py:548: in data
ret = super().data
venv\lib\site-packages\rest_framework\serializers.py:246: in data
self._data = self.to_representation(self.instance)
venv\lib\site-packages\rest_framework\serializers.py:515: in to_representation
ret[field.field_name] = field.to_representation(attribute)
venv\lib\site-packages\rest_framework\serializers.py:663: in to_representation
return [
venv\lib\site-packages\django\db\models\query.py:280: in __iter__
self._fetch_all()
venv\lib\site-packages\django\db\models\query.py:1324: in _fetch_all
self._result_cache = list(self._iterable_class(self))
venv\lib\site-packages\django\db\models\query.py:51: in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
venv\lib\site-packages\django\db\models\sql\compiler.py:1167: in execute_sql
cursor = self.connection.cursor()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (<django.db.backends.sqlite3.base.DatabaseWrapper object at 0x0000011228A21400>,), kwargs = {}, event_loop = <_WindowsSelectorEventLoop running=False closed=False debug=False>
@functools.wraps(func)
def inner(*args, **kwargs):
if not os.environ.get('DJANGO_ALLOW_ASYNC_UNSAFE'):
# Detect a running event loop in this thread.
try:
event_loop = asyncio.get_event_loop()
except RuntimeError:
pass
else:
if event_loop.is_running():
> raise SynchronousOnlyOperation(message)
E django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
venv\lib\site-packages\django\utils\asyncio.py:24: SynchronousOnlyOperation
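The traceback above shows the failure when user_serializer.data is read, not when the serializer is constructed, so wrapping only the constructor leaves the lazy query running on the event loop. Below is a minimal stdlib-only sketch of that pattern. FakeSerializer, SyncOnlyError and serialize_user are hypothetical stand-ins, and asyncio.to_thread (Python 3.9+) plays the role that database_sync_to_async would play in a Channels project.

```python
import asyncio


class SyncOnlyError(Exception):
    """Stand-in for django.core.exceptions.SynchronousOnlyOperation."""


class FakeSerializer:
    """Hypothetical serializer: cheap to construct, does its 'query' in .data."""

    def __init__(self, username):
        self.username = username

    @property
    def data(self):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread, so the "query" is allowed.
            return {"username": self.username}
        raise SyncOnlyError("You cannot call this from an async context")


def serialize_user(username):
    # Build the serializer AND read .data inside the same sync function.
    return FakeSerializer(username).data


async def join_chat(username):
    # Pattern from the issue: only the constructor is moved to a thread.
    serializer = await asyncio.to_thread(FakeSerializer, username)
    try:
        serializer.data  # evaluated on the event loop
        constructor_only_works = True
    except SyncOnlyError:
        constructor_only_works = False

    # Alternative: the whole serialization runs in the worker thread.
    payload = await asyncio.to_thread(serialize_user, username)
    return constructor_only_works, payload


result = asyncio.run(join_chat("alice"))
```

In a real consumer the equivalent would be a small sync helper that returns serializer.data, awaited through database_sync_to_async; this is a sketch of the idea, not the library's prescribed fix.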
Describe the bug
djangochannelsrestframework's (DCRF) action decorator looks in Django's settings for ATOMIC_REQUESTS. Since at least Django 1.8, ATOMIC_REQUESTS has been a setting specific to a connection defined in the DATABASES setting. When ATOMIC_REQUESTS is set up as described in the Django documentation, DCRF's action decorator neither wraps sync actions in an atomic block nor throws errors on async actions that cannot be wrapped.
See:
To Reproduce
Steps to reproduce the behavior:
# DATABASES
# ------------------------------------------------------------------------------
# https://docs.djangoproject.com/en/dev/ref/settings/#databases
DATABASES = {"default": env.db("DATABASE_URL", default="postgres:///project")}
DATABASES["default"]["ATOMIC_REQUESTS"] = True
@action
async def change_password(self, current_password="", new_password="", confirm_password="", **kwargs):
    ...
Expected behavior
The custom async action, wrapped using @action, should throw the exception from this line:
https://github.com/hishnash/djangochannelsrestframework/blob/master/djangochannelsrestframework/decorators.py#L54
DCRF's action decorator should take a using kwarg, similar to atomic's using kwarg, which is used to check if a specific db connection wants to be wrapped in atomic blocks. Barring support for the using kwarg, the default connection should at least be checked for ATOMIC_REQUESTS.
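To make the difference concrete, here is a small sketch of a per-connection lookup next to a top-level one. The helper atomic_requests_enabled, the "replica" alias and the SimpleNamespace settings object are hypothetical and are not part of DCRF or Django; in a real project the per-connection value would come from the connection's settings rather than a plain dict.

```python
from types import SimpleNamespace

DEFAULT_DB_ALIAS = "default"  # the alias Django uses for the default connection


def atomic_requests_enabled(databases, using=DEFAULT_DB_ALIAS):
    """Return True if the connection named `using` sets ATOMIC_REQUESTS."""
    return bool(databases.get(using, {}).get("ATOMIC_REQUESTS", False))


# Settings shaped like the reproduction above, plus a hypothetical second alias.
settings = SimpleNamespace(
    DATABASES={
        "default": {"ENGINE": "django.db.backends.postgresql", "ATOMIC_REQUESTS": True},
        "replica": {"ENGINE": "django.db.backends.postgresql"},
    }
)

# A top-level lookup finds nothing, because the key lives inside a connection.
top_level = getattr(settings, "ATOMIC_REQUESTS", False)

# Per-connection lookups, with and without an explicit alias.
default_wrapped = atomic_requests_enabled(settings.DATABASES)
replica_wrapped = atomic_requests_enabled(settings.DATABASES, using="replica")
```

A using keyword on the decorator could be passed straight through to a helper like this one.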
Hi,
I am actively using your project in mine, and I would like to help improve it. So far I am not really able to contribute, as I don't understand everything yet. I just wanted to report a problem I have, and suggest a (crude) solution.
My Django app is running on Heroku with the minimum of resources (free when possible). I am using the free plan of RedisCloud with 30 connections, but still, it seems I am reaching the max.
Traceback (most recent call last):
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/celery/app/trace.py", line 382, in trace_task
Mar 06 22:41:40 R = retval = fun(*args, **kwargs)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
Mar 06 22:41:40 return self.run(*args, **kwargs)
Mar 06 22:41:40 File "/app/arcsecond/activities/tasks.py", line 52, in parse_ESO_archive_latest_rows
Mar 06 22:41:40 target_name=row.target_name)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/manager.py", line 82, in manager_method
Mar 06 22:41:40 return getattr(self.get_queryset(), name)(*args, **kwargs)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/query.py", line 413, in create
Mar 06 22:41:40 obj.save(force_insert=True, using=self.db)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/base.py", line 718, in save
Mar 06 22:41:40 force_update=force_update, update_fields=update_fields)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/base.py", line 758, in save_base
Mar 06 22:41:40 update_fields=update_fields, raw=raw, using=using,
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/django/dispatch/dispatcher.py", line 175, in send
Mar 06 22:41:40 for receiver in self._live_receivers(sender)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/django/dispatch/dispatcher.py", line 175, in <listcomp>
Mar 06 22:41:40 for receiver in self._live_receivers(sender)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/djangochannelsrestframework/observer/observer.py", line 154, in post_save_receiver
Mar 06 22:41:40 **kwargs
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/djangochannelsrestframework/observer/observer.py", line 221, in post_change_receiver
Mar 06 22:41:40 **kwargs
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/djangochannelsrestframework/observer/observer.py", line 233, in send_messages
Mar 06 22:41:40 async_to_sync(channel_layer.group_send)(group_name, message)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/asgiref/sync.py", line 64, in __call__
Mar 06 22:41:40 return call_result.result()
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/concurrent/futures/_base.py", line 425, in result
Mar 06 22:41:40 return self.__get_result()
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
Mar 06 22:41:40 raise self._exception
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/asgiref/sync.py", line 78, in main_wrap
Mar 06 22:41:40 result = await self.awaitable(*args, **kwargs)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/channels_redis/core.py", line 559, in group_send
Mar 06 22:41:40 async with self.connection(self.consistent_hash(group)) as connection:
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/channels_redis/core.py", line 742, in __aenter__
Mar 06 22:41:40 self.conn = await self.pool.pop()
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/channels_redis/core.py", line 49, in pop
Mar 06 22:41:40 conns.append(await aioredis.create_redis(**self.host, loop=loop))
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/aioredis/commands/__init__.py", line 178, in create_redis
Mar 06 22:41:40 loop=loop)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/aioredis/connection.py", line 129, in create_connection
Mar 06 22:41:40 await conn.auth(password)
Mar 06 22:41:40 File "/app/.heroku/python/lib/python3.7/site-packages/aioredis/util.py", line 48, in wait_ok
Mar 06 22:41:40 res = await fut
Mar 06 22:41:40 aioredis.errors.MaxClientsError: ERR max number of clients reached
When going right into the problematic line (line 233 of observer/observer.py), I see a loop over group_names. My very crude proposition would be to extract async_to_sync(channel_layer.group_send) and reuse it for all group_names. Would it save connections?
Thanks again.
Django supports multiple databases. It was raised in #64 that it would be nice to be able to define the DB that is used for the @action decorator. See https://docs.djangoproject.com/en/3.2/topics/db/multi-db/#topics-db-multi-db-routing
Describe the bug
TypeError: has_permission() got an unexpected keyword argument 'scope' is raised when using permission_classes = [IsAuthenticated]
To Reproduce
Steps to reproduce the behaviour:
permission_classes = [IsAuthenticated]
Exception inside application: has_permission() got an unexpected keyword argument 'scope'
Traceback (most recent call last):
File "/home/rufs/Bugs/eclass-api/utils/channel_token.py", line 38, in __call__
return await inner(receive, send)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/sessions.py", line 183, in __call__
return await self.inner(receive, self.send)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/middleware.py", line 41, in coroutine_call
await inner_instance(receive, send)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/consumer.py", line 58, in __call__
await await_many_dispatch(
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
await dispatch(result)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/consumer.py", line 73, in dispatch
await handler(message)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/generic/websocket.py", line 196, in websocket_receive
await self.receive(text_data=message["text"])
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/generic/websocket.py", line 259, in receive
await self.receive_json(await self.decode_json(text_data), **kwargs)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 164, in receive_json
await self.handle_action(action, request_id=request_id, **content)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 155, in handle_action
await self.handle_exception(exc, action=action, request_id=request_id)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 117, in handle_exception
raise exc
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 136, in handle_action
await self.check_permissions(action, **kwargs)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 93, in check_permissions
if not await ensure_async(permission.has_permission)(
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/asgiref/sync.py", line 296, in __call__
ret = await asyncio.wait_for(future, timeout=None)
File "/home/rufs/.pyenv/versions/3.8.1/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
return await fut
File "/home/rufs/.pyenv/versions/3.8.1/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/db.py", line 14, in thread_handler
return super().thread_handler(loop, *args, **kwargs)
File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/asgiref/sync.py", line 334, in thread_handler
return func(*args, **kwargs)
TypeError: has_permission() got an unexpected keyword argument 'scope'
Expected behaviour
It should check whether the user is authenticated and raise an exception only if not.
LOG
class IsAuthenticated(BasePermission):
    async def has_permission(
        self, scope: Dict[str, Any], consumer: AsyncConsumer, action: str, **kwargs
    ) -> bool:
        user = scope.get("user")
        if not user:
            return False
        return user.pk and user.is_authenticated
When I remove all the parameters from the call, it leads to:
TypeError: has_permission() missing 2 required positional arguments: 'request' and 'view'
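The two error messages point at two different has_permission signatures: one taking (request, view) and one taking (scope, consumer, action), which suggests the IsAuthenticated class in use came from rest_framework rather than from the consumer-side permissions module. The sketch below reproduces the mismatch with stand-in classes only. The class names and FakeUser are hypothetical, and the exact keyword arguments the consumer passes besides scope are an assumption.

```python
class DRFStyleIsAuthenticated:
    """Stand-in with the (request, view) signature named in the second error."""

    def has_permission(self, request, view):
        return bool(request.user and request.user.is_authenticated)


class ConsumerStyleIsAuthenticated:
    """Stand-in with the (scope, consumer, action) signature shown in the issue."""

    def has_permission(self, scope, consumer, action, **kwargs):
        user = scope.get("user")
        return bool(user and user.is_authenticated)


class FakeUser:
    is_authenticated = True


def check(permission):
    # Assumed call shape: keyword arguments, as the 'scope' error implies.
    return permission.has_permission(
        scope={"user": FakeUser()}, consumer=None, action="list"
    )


try:
    check(DRFStyleIsAuthenticated())
    drf_error = None
except TypeError as exc:
    drf_error = str(exc)

consumer_result = check(ConsumerStyleIsAuthenticated())
```

If this diagnosis applies, importing the permission class from the websocket framework's own permissions module instead of rest_framework.permissions should make the signatures line up.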
Could you complement the examples in the readme with the appropriate client-side code and websocket send/receive messages?
For example, I don't fully understand how to work with custom actions or subscribing to a filtered list of models on the client side.
Currently, if you have an open transaction and update/create/delete a record, the subscribing channels will be notified before you commit your transaction.
We should instead buffer these notifications and only submit them after the transaction completes.
--
This is also a bug since users do not expect to receive notifications for events that did not happen (eg if the transaction is rolled back)
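The buffering idea can be sketched without Django as follows. NotificationBuffer and its method names are hypothetical; a real implementation would more likely hang the flush on Django's transaction.on_commit hook than on explicit commit and rollback calls.

```python
class NotificationBuffer:
    """Collect notifications during a transaction and send them only on commit."""

    def __init__(self, send):
        self._send = send      # callable that actually delivers a message
        self._pending = []     # messages recorded since the transaction began

    def notify(self, message):
        # Record the change instead of sending it immediately.
        self._pending.append(message)

    def commit(self):
        # Deliver everything recorded, in order, then start fresh.
        pending, self._pending = self._pending, []
        for message in pending:
            self._send(message)

    def rollback(self):
        # The changes never happened, so subscribers must not hear about them.
        self._pending.clear()


sent = []
buffer = NotificationBuffer(sent.append)

# First transaction is rolled back: nothing should reach subscribers.
buffer.notify({"action": "create", "pk": 1})
buffer.rollback()

# Second transaction commits: the message is sent only at commit time.
buffer.notify({"action": "update", "pk": 2})
sent_before_commit = list(sent)
buffer.commit()
```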
Hi, Hishnash.
I believe your project would be very promising for beginners like me if it provided slightly more detailed documentation.
Actually, I'm trying to create a chat application with Django Channels, which is very confusing. Yours seems like it would ease the burden if there were detailed documentation.
N.B.: I'm doing my final-year project with Django Rest Framework, which may be why your short documentation is confusing for me to apply in my project.
Please help me with any resources for using your project in my solution.
Describe the bug
In a project of mine, I discovered that after subscribing to multiple instances of a model, all updates being sent by the backend for any of the subscribed instances contain the latest request_id that was sent and not the request_id that they were initially sent with.
Therefore, e.g. when using the dcrf-client in the frontend, the frontend callbacks are not triggered, as the request_id of the update does not match the request_id that was initially sent with the subscription.
To Reproduce
Steps to reproduce the behavior:
I subscribed to two instances of an example Model Test with the following requests:
request: {"stream":"test","payload":{"action":"subscribe_instance","request_id":"beabe327-3e51-4538-b5c8-ab32ae2c7cd8","pk":1}}
response: {"stream": "test", "payload": {"errors": [], "data": null, "action": "subscribe_instance", "response_status": 201, "request_id": "beabe327-3e51-4538-b5c8-ab32ae2c7cd8"}}
request: {"stream":"test","payload":{"action":"subscribe_instance","request_id":"d6a4a829-ab00-492c-9bdb-01ef77dbab15","pk":2}}
response: {"stream": "test", "payload": {"errors": [], "data": null, "action": "subscribe_instance", "response_status": 201, "request_id": "d6a4a829-ab00-492c-9bdb-01ef77dbab15"}}
One can see that the request_id of the direct answer is correct.
When updating the Test instance with pk = 1, I'd expect the response to contain the request_id = 'beabe327-3e51-4538-b5c8-ab32ae2c7cd8', but instead it returns
{"stream": "test", "payload": {"errors": [], "data": {"id": 1, "name": "Test", "description": "This is a test"}, "action": "update", "response_status": 200, "request_id": "d6a4a829-ab00-492c-9bdb-01ef77dbab15"}}
The error
I dug a little into the corresponding code and added a print statement to the method subscribe_instance of class ObserverModelInstanceMixin in observer/generics.py:
print(self.subscribed_requests)
right after the
self.subscribed_requests[self.__class__.handle_instance_change] = request_id
I could then see on the console the reason for the unexpected behaviour:
{<djangochannelsrestframework.observer.model_observer.ModelObserver object at 0x7f85801b86d0>: 'beabe327-3e51-4538-b5c8-ab32ae2c7cd8'}
{<djangochannelsrestframework.observer.model_observer.ModelObserver object at 0x7f85801b86d0>: 'd6a4a829-ab00-492c-9bdb-01ef77dbab15'}
The subscribed_requests only contains the latest request_id, though in the second print it should contain two entries. As one can see, the keys are identical, however, which is why the request_id is being replaced.
I'm not sure about this, but it seems that it's a mistake to set the ModelObserver object as the key in subscribed_requests, and that the intent was to use the result of the other method with the same name
@handle_instance_change.groups
def handle_instance_change(self: ModelObserver, instance, *args, **kwargs):
    # one channel for all updates.
    yield "{}-model-{}-pk-{}".format(
        self.func.__name__.replace("_", "."), self.model_label, instance.pk
    )
which returns a string that would be unique for the model instance, but instead the method
@_GenericModelObserver
async def handle_instance_change(self, message: Dict, **kwargs):
    message = deepcopy(message)
    action = message.pop("action")
    message.pop("type")
    await self.handle_observed_action(
        action=action,
        request_id=self.subscribed_requests.get(
            self.__class__.handle_instance_change
        ),
        **message,
    )
is being used.
As I'm not so good at async Python right now, I don't know exactly how to fix this, but I think that by addressing this issue the correct request_ids would be stored.
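The overwrite described above, and one possible way around it, can be shown with plain dictionaries. This is only a sketch of the reporter's suggestion: the group_name helper copies the string format from the groups method quoted in the issue, while the "app.test" model label and the idea of keying by group name are assumptions, not the library's actual fix.

```python
# Behaviour as reported: every subscription uses the same key (the observer
# object), so the second request_id replaces the first.
observer_key = "handle_instance_change observer"  # stand-in for the shared object
by_observer = {}
by_observer[observer_key] = "beabe327-3e51-4538-b5c8-ab32ae2c7cd8"
by_observer[observer_key] = "d6a4a829-ab00-492c-9bdb-01ef77dbab15"


def group_name(model_label, pk):
    """Build a per-instance key in the format used by the groups method."""
    return "{}-model-{}-pk-{}".format("handle.instance.change", model_label, pk)


# Suggested alternative: one entry per subscribed instance.
by_group = {}
by_group[group_name("app.test", 1)] = "beabe327-3e51-4538-b5c8-ab32ae2c7cd8"
by_group[group_name("app.test", 2)] = "d6a4a829-ab00-492c-9bdb-01ef77dbab15"

request_id_for_pk1 = by_group[group_name("app.test", 1)]
```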
LOG
0.2.0
Please explain the uses for view_as_consumer.
There's no documentation to explain why one would use this, and the advantages gained from doing so.
Currently, I create a url pattern that uses the view_as_consumer. I import a DRF Model Viewset from .views
from djangochannelsrestframework.consumers import view_as_consumer
from .multiplexing import OpsDemultiplexer
from .views import ProfileViewSet
websocket_urlpatterns = [
path('ws/multi/', OpsDemultiplexer),
url(r"testing123/", view_as_consumer(ProfileViewSet)),
]
Then I import the url_pattern to the router.
application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter(
common.routing.websocket_urlpatterns + testing.routing.websocket_urlpatterns
)
),
})
I'm not sure of what to expect at this point. How do I access the view? What are the perks of connecting to the view via websockets? Is it more efficient? Am I even doing this right?
I make use of the multiplexing, connecting multiple consumers via 1 path. I would like to incorporate the view_as_consumer if it will somehow make this process more efficient.
As of now, I use my DRF views for my HTTP requests. Does this wrapper replace that at all? Can one at least do GET requests? The only apparent feature I can think of is being able to have temporary views that are only accessible for the duration of the socket connection.
Using ObserverModelInstanceMixin locally, everything works perfectly; when it is deployed via gunicorn/daphne/nginx, messages are not received.
I wrote a post on stackoverflow about it.
It has been solved: it was a routing problem with nginx when using daphne + gunicorn. Thanks all the same.
Thank you
Describe the bug
I have the following consumer:
class Consumer(AsyncAPIConsumer):
    permission_classes = (permissions.IsAuthenticated, )

    @model_observer(SomeModel)
    async def model_changed(self, message, observer=None, **kwargs):
        raise NotImplementedError()

    @action()
    async def subscribe_all(self, **kwargs):
        await self.model_changed.subscribe()
        return {}, 200
I'm sending the following message
{
"stream": "stream",
"payload": {
"action": "subscribe_all",
"request_id": 42
}
}
I can see that the subscribe_all() method is called and a 200 status is sent back to the client. Then, when I create a SomeModel instance or change one, nothing happens. No method is called and the NotImplementedError in model_changed is never raised.
How is observing all models supposed to work? What am I missing here?
Thank you!
Hello there! Just wanted to mention an npm package I published to provide a websocket client to django-channels-rest-framework: https://github.com/theY4Kman/dcrf-client
It's Promise-based, and supports subscriptions, which can be canceled. The test/integration/dcrf_client_test directory contains a Django Channels v2 project as an example of how to set up the Django side of things (with the important thing being the use of pk to return a model's ID from a serializer).
If you wanna check it out, it's available on npm
npm install dcrf-client
Hello! Thank you for the work you did creating djangochannelsrestframework!
I want to know what your plans are for further development. I studied the code, and I have a few questions regarding Observers.
Why not implement CreateModelMixin, ListObserverModel, RetrieveObserverModel, UpdateModelMixin, PatchModelMixin, DeleteModelMixin, which would provide more obvious functionality?
Based on what I wrote in point 1, this logic can be divided, and by inheriting mixins, the user can get everything they need.
I will gladly take part in the development of the project, if you allow it. However, I don't have as much experience as you do.
Where should I start?
Can this library be integrated with React JS, or does one need to use dcrf-client only?
In ModelObserver.send_message, the message object is passed as a mutable object. So if you change it in self.func for one group, the other calls of self.func (for other groups) will receive the modified message.
If you don't want to send "type" to the websocket and call message.pop("type", None) in self.func, the other groups will not receive this message.
I think message should be immutable, or self.func should receive a copy of message as its parameter.
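A small stdlib sketch of the problem and of the copy-per-call remedy follows. The handler and both dispatch functions are hypothetical stand-ins for self.func and the send loop, not the library's code.

```python
from copy import deepcopy


def handler(message):
    """Stand-in for self.func: reports whether 'type' was present, then strips it."""
    had_type = "type" in message
    message.pop("type", None)
    return had_type


def dispatch_shared(message, groups):
    # Every group's call receives the very same dict object.
    return [handler(message) for _ in groups]


def dispatch_copied(message, groups):
    # Every group's call receives its own private copy.
    return [handler(deepcopy(message)) for _ in groups]


groups = ["group-a", "group-b"]

# With a shared dict, the second group no longer sees "type".
shared_result = dispatch_shared({"type": "model.change", "pk": 1}, groups)

# With a copy per call, both groups see the original message.
copied_result = dispatch_copied({"type": "model.change", "pk": 1}, groups)
```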
Describe the bug
I have one simple Consumer + Multiplexer as follows:
class TaskStatusConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    queryset = TaskStatus.objects.all()
    serializer_class = TaskStatusSerializer
    permission_classes = (permissions.AllowAny,)

class StatusDemultiplexerAsyncJson(AsyncJsonWebsocketDemultiplexer):
    applications = {
        'task-status': TaskStatusConsumer
    }
I subscribe to events using dcrf-client on the JavaScript side of things. I see that the WebSocket connection successfully subscribes to a newly created model instance using subscribe_instance.
I have a view similar to this:
class TaskStatusViewSet(viewsets.ModelViewSet):
    queryset = TaskStatus.objects.all()
    serializer_class = serializers.TaskStatusSerializer

    @action(methods=['post'], detail=True)
    def some_action(self, request, pk=None):
        task = TaskStatus.objects.get(pk=pk)
        task.some_attr = 'some-value'
        task.save()
When sending a request to that view, I do see an update message for the task.save() call being sent down on the WebSocket.
Yet updates for any subsequent requests are not sent over the WebSocket connection. Somehow the model observer "forgets" about the model instance after the initial request is made.
Am I using the Observer wrong?
GenericModelObserver is instantiated as a decorator of methods on ObserverModelInstanceMixin. This means that once you have implemented one consumer with this mixin, all others will use the same ModelCLS regardless of the queryset's model type.
This is a critical bug.
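The sharing effect can be illustrated with plain classes. Everything here is a hypothetical stand-in: Observer, InstanceMixin and the two consumers are not the library's classes, and the "first bind wins" caching is an assumption chosen to mirror the reported symptom.

```python
class Observer:
    """Stand-in observer that remembers a single model class."""

    def __init__(self):
        self.model_cls = None

    def bind(self, model_cls):
        # Assumed behaviour: the first model seen is cached and reused.
        if self.model_cls is None:
            self.model_cls = model_cls
        return self.model_cls


class InstanceMixin:
    # Created once, at class-definition time, and inherited by every consumer.
    handle_instance_change = Observer()


class BoardConsumer(InstanceMixin):
    model = "Board"


class TaskConsumer(InstanceMixin):
    model = "Task"


board_model = BoardConsumer.handle_instance_change.bind(BoardConsumer.model)
task_model = TaskConsumer.handle_instance_change.bind(TaskConsumer.model)
same_observer = (
    BoardConsumer.handle_instance_change is TaskConsumer.handle_instance_change
)
```

Because both consumers inherit the same attribute object, the second consumer ends up observing the first consumer's model in this sketch.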
Maybe this is not a bug. I am testing this library, but I am having problems.
This exception appears when my websocket client sends the JSON: {'action': 'list', 'request_id': 18}
To Reproduce
My model is very simple. I am testing:
class Stock(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=128)
    price = models.FloatField()
    created = models.DateTimeField(auto_now_add=True)
    update = models.DateTimeField(auto_now=True)

    class Meta:
        verbose_name = "stock"
        verbose_name_plural = "stocks"
        ordering = ['-id', ]

    def __str__(self):
        return self.name
And this is the restconsumer:
from djangochannelsrestframework.mixins import ListModelMixin
class StockConsumer(ListModelMixin, GenericAsyncAPIConsumer):
    queryset = Stock.objects.all()
    serializer_class = StockSerializer
    permission_classes = (permissions.IsAuthenticated,)
And the serializer is a standard ModelSerializer.
My websocket client (JavaScript) does this:
<script>
  let socket = new WebSocket('ws://localhost:8000/ws/stock/');
  let timer = null;
  socket.onopen = () => {
    timer = setInterval(() => {
      // Declare the request locally instead of leaking an implicit global.
      const request = {
        "action": "list",
        "request_id": 18
      };
      socket.send(JSON.stringify(request));
    }, 1000);
  };
  socket.onclose = socket.onerror = () => {
    clearInterval(timer);
  };
  socket.onmessage = function(event) {
    let pMsg = document.getElementById("id123");
    if (pMsg == null) {
      // Assign to the outer variable; a second `let` here would shadow it.
      pMsg = document.createElement("p");
      pMsg.id = "id123";
      document.body.appendChild(pMsg);
    }
    pMsg.innerHTML = event.data;
  };
</script>
LOG
127.0.0.1:55509 - - [01/Feb/2021:21:05:00] "WSCONNECT /ws/stock/" - -
2021-02-01 21:05:01,958 ERROR Exception inside application: has_permission() got an unexpected keyword argument 'scope'
Traceback (most recent call last):
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\routing.py", line 71, in __call__
return await application(scope, receive, send)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\sessions.py", line 47, in __call__
return await self.inner(dict(scope, cookies=cookies), receive, send)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\sessions.py", line 254, in __call__
return await self.inner(wrapper.scope, receive, wrapper.send)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\auth.py", line 181, in __call__
return await super().__call__(scope, receive, send)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\middleware.py", line 26, in __call__
return await self.inner(scope, receive, send)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\routing.py", line 150, in __call__
return await application(
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\consumer.py", line 58, in __call__
await await_many_dispatch(
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\utils.py", line 51, in await_many_dispatch
await dispatch(result)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\consumer.py", line 73, in dispatch
await handler(message)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\generic\websocket.py", line 196, in websocket_receive
await self.receive(text_data=message["text"])
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\generic\websocket.py", line 259, in receive
await self.receive_json(await self.decode_json(text_data), **kwargs)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 198, in receive_json
await self.handle_action(action, request_id=request_id, **content)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 217, in handle_action
await self.handle_exception(exc, action=action, request_id=request_id)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 117, in handle_exception
raise exc
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 205, in handle_action
await self.check_permissions(action, **kwargs)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 93, in check_permissions
if not await ensure_async(permission.has_permission)(
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\asgiref\sync.py", line 296, in __call__
ret = await asyncio.wait_for(future, timeout=None)
File "c:\programdata\miniconda3\envs\django_test\lib\asyncio\tasks.py", line 455, in wait_for
return await fut
File "c:\programdata\miniconda3\envs\django_test\lib\concurrent\futures\thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\db.py", line 13, in thread_handler
return super().thread_handler(loop, *args, **kwargs)
File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\asgiref\sync.py", line 334, in thread_handler
return func(*args, **kwargs)
TypeError: has_permission() got an unexpected keyword argument 'scope'
127.0.0.1:55509 - - [01/Feb/2021:21:05:01] "WSDISCONNECT /ws/stock/" - -
Packages related:
django-rest-auth 0.9.5
djangochannelsrestframework 0.2.1
djangorestframework 3.12.2
djangorestframework-gis 0.15
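The TypeError in the log above is what happens when a permission class written with the DRF (request, view) signature is called with a scope keyword argument. The following is a minimal sketch of that mismatch using stand-in classes only. The traceback confirms only the scope keyword; the consumer and action keywords, and both class names, are assumptions made for illustration.

```python
import asyncio


class DrfStylePermission:
    """Mimics the (request, view) signature of a DRF permission class."""

    def has_permission(self, request, view) -> bool:
        return True


class ConsumerStylePermission:
    """Accepts the keyword arguments a consumer-side check would pass.

    Only `scope` is confirmed by the traceback; `consumer` and `action`
    are assumed names for this sketch.
    """

    async def has_permission(self, scope, consumer, action, **kwargs) -> bool:
        return scope.get("user") is not None


async def check(permission, scope: dict) -> str:
    """Call the permission with keywords, as the traceback suggests the consumer does."""
    try:
        result = permission.has_permission(scope=scope, consumer=None, action="list")
        if asyncio.iscoroutine(result):
            result = await result
        return "allowed" if result else "denied"
    except TypeError as exc:
        return f"TypeError: {exc}"


scope = {"user": "alice"}
drf_result = asyncio.run(check(DrfStylePermission(), scope))
consumer_result = asyncio.run(check(ConsumerStylePermission(), scope))
```

If this reading is right, permission classes listed on a consumer need to accept scope-based arguments rather than being reused unchanged from DRF views.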
Is your feature request related to a problem? Please describe.
Currently this library uses Django signals to detect changes. However, Django does not send these signals for bulk updates or raw SQL operations, and if other clients connect to the database and make changes, you do not get any events either.
Describe the solution you'd like
PostgreSQL supports the concept of NOTIFY and LISTEN. Since commit hooks can send NOTIFY messages, it would be possible to have PG automatically inform all LISTENers of DB changes regardless of where the changes came from.
I would like to investigate a little more how NOTIFY and LISTEN work in PG, and how easy or hard it would be to add something to Django's DB manager that automatically adds migrations for ObservableModel classes to trigger the NOTIFY on commit.
As a first pass, it could be interesting to implement a PG-backed channel layer that uses the LISTEN and NOTIFY API. This could be an interesting option for people who do not want to deploy an additional Redis.
Multiple connections not working on Observer
We are working on a Django project and are trying to implement Channels connections. When we observe a change to our model instance on multiple connections, we see the change on only one of them; the others report "DISCONNECTED" and ValueError("Incoming message has no 'type' attribute").
This is our Consumer:
class BoardObserverConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    queryset = Board.objects.all()
    serializer_class = BoardChannelSerializer
and this is the Serializer:
class BoardChannelSerializer(HyperChannelsApiModelSerializer):
    class Meta:
        model = Board
        fields = ('@id', 'name', 'players_count')
The version we are currently using is 0.0.3 as it is required by HyperMediaChannels.
To Reproduce
We connected several browsers in incognito mode to the same localhost.
Expected behavior
We expect to see the changes on all connections alike.
LOG
Exception inside application: Incoming message has no 'type' attribute
Traceback (most recent call last):
File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
return await self.inner(receive, self.send)
File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
await inner_instance(receive, send)
File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channelsmultiplexer/demultiplexer.py", line 54, in __call__
await future
File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
[receive, self.channel_receive], self.dispatch
File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/utils.py", line 51, in await_many_dispatch
await dispatch(result)
File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/consumer.py", line 71, in dispatch
handler = getattr(self, get_handler_name(message), None)
File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/consumer.py", line 19, in get_handler_name
raise ValueError("Incoming message has no 'type' attribute")
ValueError: Incoming message has no 'type' attribute
WebSocket DISCONNECT /ws/channel/ [127.0.0.1:59317]
WebSocket HANDSHAKING /ws/channel/ [127.0.0.1:59354]
WebSocket CONNECT /ws/channel/ [127.0.0.1:59354]
There really should be some docs covering at least:
GenericAsyncAPIConsumer
AsyncAPIConsumer
Permissions
and how they play with ChannelsObservers, using them to listen to events in Django (e.g. a user-login event)
ObserverModelInstanceMixin
Describe the bug
The model subscription does not pick up changes when a new model object is created from views or from the shell; it only works from the admin panel.
Consumer code :
class LiveNotificationConsumer(ListModelMixin, GenericAsyncAPIConsumer):
    serializer_class = NotificationSerializer
    permission_classes = (permissions.IsAuthenticated,)
    async def accept(self, **kwargs):
        await super().accept(**kwargs)
        await self.model_change.subscribe()
    @model_observer(Notification)
    async def model_change(self, message, action=None, **kwargs):
        if message['user'] == self.scope['user'].id:
            await self.send_json(message)
    @model_change.serializer
    def model_serialize(self, instance, action, **kwargs):
        return NotificationSerializer(instance).data
    def get_queryset(self, *args, **kwargs):
        user = self.scope['user']
        return user.notifications.all()
Client code :
var endpoint = "ws://127.0.0.1:8000/ws/notifications/"
var ws = new WebSocket(endpoint + "?token=" + token)
ws.onmessage = function(e){
    console.log('coming data -->', e.data)
}
var msg = JSON.stringify({
    action: "list",
    request_id: 6,
})
ws.onopen = () => ws.send(msg);
ws.onclose = function (e) {
    console.log(e)
    console.error('Error websocket');
};
Describe the bug
I'm not sure if this is an issue with channels, djangochannelsrestframework, or drf.
I'm currently using the @model_observer to watch for model changes via DRF.
This works as intended on all create/update/delete operations. I'm notified of any changes and immediately perform an object extraction.
I figured I'd make it simple and just extract all objects and send it back to the client(s). The clients immediately update the state once the new object is received.
The problem is that the object extracted in the consumer doesn't include the latest update. So I'm sending the unmodified object back to the client(s).
Either I'm extracting the model object incorrectly or there's an async problem, where my object extraction happens too soon.
UPDATE:
Tried extracting the model object with a SyncConsumer, and the result is still the same. It's definitely not an async issue. Perhaps it's because of the DRF serializer? Does that somehow interrupt the ORM processing?
To Reproduce
Steps to reproduce the behavior:
Expected behavior
The new model results are the same as the old model results, when they should reflect the recent model changes.
I would simply like to be notified of all model changes (Which are made via DRF), so I can instantly send the client(s) back the updated model objects.
LOG
django==2.2.8
djangorestframework==3.10.3
channels==2.4.0
psycopg2==2.8.3 --no-binary psycopg2
Additional context
CODE:
class UserFormConsumer(AsyncAPIConsumer):
    @database_sync_to_async
    def get_userform_object(self, curr_id):
        print('REACHED UserFormConsumer GET USERFORM OBJECT')
        user_form_serializer = serializers.UserFormSerializer(models.UserForm.objects.all(), many=True)
        return user_form_serializer.data
    async def accept(self, subprotocol=None):
        # subscribe to all new changed events
        obj_id = ""
        userform_data = await self.get_userform_object(obj_id)
        print('OLD USERFORM DATA', userform_data)
        await UserFormConsumer.activities_change.subscribe(self)
        await super().accept()
    @model_observer(models.UserForm)  # your Django Model cls goes here.
    async def activities_change(self, message, observer=None, **kwargs):
        # check the message for the action type if you only want to send events for new ones.
        print(observer)
        print('TestingActivitiesChange', message)
        await self.send_json(message)
    async def receive_json(self, content):
        if 'user_id' in content:
            user_id = content['user_id']
            # Get User Group
            current_group_name = 'user_%s' % user_id
            print('USER ID', user_id)
            if content['server_command'] == 'get_all':
                obj_id = ""
                message = content['message']
                userform_data = await self.get_userform_object(obj_id)
                print('Get ALL Message', message)
                print('User Form Data', userform_data)
Describe the bug
from djangochannelsrestframework.mixins import ObserverModelInstanceMixin
ObserverModelInstanceMixin is not found. Have you removed it from the package?
Hello, I have a question about Django Channels testing. Do you have an answer for it, please?
The data created in setUp are not reflected in the project!
APITestCase class:
class WebsocketTests(APITestCase):
    def setUp(self):
        user1 = User.objects.create(username='Alex',email='[email protected]')
        user2 = User.objects.create(username='Sam',email='[email protected]')
        user3 = User.objects.create(username='Clover',email='[email protected]')
    async def test_connect(self):
        communicator = WebsocketCommunicator(application, 'alerts/')
        connected, subprotocol = await communicator.connect()
        assert connected
        await communicator.disconnect()
@database_sync_to_async
def get_user(querys):
    print(User.objects.count())  # <======= this returns 0 while it should return 3
    try:
        token = parse_qs(querys.decode("utf8"))['token'][0]
        token_data = UntypedToken(token)
        user_id = token_data["user_id"]
    except:
        return 'token is invalid'
    try:
        return User.objects.get(id=user_id)
    except User.DoesNotExist:
        return 'AnonymousUser'
class QueryAuthMiddleware:
    def __init__(self, app):
        self.app = app
    async def __call__(self, scope, receive, send):
        scope['user'] = await get_user(scope["query_string"])
        return await self.app(scope, receive, send)
application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": QueryAuthMiddleware(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})
# settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
        'OPTIONS': {
            'timeout': 20,  # in seconds
            # see also
            # https://docs.python.org/3.7/library/sqlite3.html#sqlite3.connect
        },
        'TEST': {
            'NAME': os.path.join(BASE_DIR, "db_test.sqlite3"),
        },
    }
}
APITestCase
with django TestCase
from asgiref.sync import sync_to_async
Sorry for writing here, but I can't find an answer in the docs. The client part of my app expects the backend to create a new model instance. If I use subscribe_instance, I get a 404 response right after subscribing, because the instance does not exist yet. The client has to watch for an instance with an exact pk... I'm new to Channels, so maybe this task is relatively simple.
Is your feature request related to a problem? Please describe.
I feel a bit frustrated when I need to implement a model observer subscription. I want the client to send this:
{
"action": "sub",
"request_id": 42,
"model": "User"
}
instead of
async def websocket_connect(self, message):
    # Super Save
    await super().websocket_connect(message)
    # Initialized operation
    await self.activities_change.subscribe()
Describe the solution you'd like
Um, same as above?
Describe alternatives you've considered
I see https://github.com/theY4Kman/dcrf-client but I want a simpler client for the web browser.
Additional context
Please guide me; I will implement this and make a PR. Currently, I have a dirty hack that manually adds to available_actions, but I don't know how to call subscribe() for model updates.
Describe the bug
I'm trying to subscribe for model changes with no luck. I'm following exactly the guide on https://djangochannelsrestframework.readthedocs.io/en/latest/observer/base_observer.html.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
A clear and concise description of what you expected to happen.
Additional context
I have following consumer:
# consumers.py
from djangochannelsrestframework.generics import GenericAsyncAPIConsumer
from djangochannelsrestframework.observer import model_observer
from djangochannelsrestframework.decorators import action
from .serializers import UserSerializer, CommentSerializer
from .models import User, Comment
class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer
    @model_observer(Comment)
    async def comment_activity(self, message: CommentSerializer, observer=None, **kwargs):
        print("Model observer 1")
        await self.send_json(message.data)
        print("Model observer 1")
    @comment_activity.serializer
    def comment_activity(self, instance: Comment, action, **kwargs) -> CommentSerializer:
        print("Serializer 1")
        '''This will return the comment serializer'''
        return CommentSerializer(instance)
        print("Serializer 2")
    @action()
    async def subscribe_to_comment_activity(self, **kwargs):
        print("Subscribe 1")
        await self.comment_activity.subscribe()
        print("Subscribe 2")
And I have a template index.html:
<script>
    const ws = new WebSocket("ws://localhost:8000/ws/my-consumer/")
    ws.onopen = function(){
        ws.send(JSON.stringify({
            action: "subscribe_to_comment_activity",
            request_id: new Date().getTime(),
        }))
    }
    ws.onmessage = function(e){
        console.log(e)
    }
</script>
When I refresh the browser, I can see that the browser sends the subscribe_to_comment_activity, but nothing is returned back, even when I create the comments from shell.
The server log prints following:
System check identified no issues (0 silenced).
June 06, 2021 - 10:41:31
Django version 3.2.3, using settings 'websocket.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
HTTP GET / 200 [0.01, 127.0.0.1:64881]
WebSocket HANDSHAKING /ws/my-consumer/ [127.0.0.1:64882]
WebSocket CONNECT /ws/my-consumer/ [127.0.0.1:64882]
Subscribe 1
Subscribe 2
The above would indicate that it probably did not reach the model observer or the serializer.
I'm wondering if the imported GenericAsyncAPIConsumer is correct, as it indeed is different from the documentation, as it could not be found from djangochannelsrestframework.consumers. Any ideas?
How can I use this in my application (for example, which settings to use, and whether to use a cache or Redis), and how can I run it in a production environment?
First of all, thanks for this great library. It made a lot of stuff I have to handle much easier. But now I'm confronted with a hard-to-solve problem. I have a many-to-many relationship and want to subscribe to all the children, just like the documentation (https://github.com/hishnash/djangochannelsrestframework#subscribing-to-a-filtered-list-of-models) shows. But I can't think of a way to implement groups_for_signal without database calls.
Let's say my models look like this:
from django.db import models
class Publication(models.Model):
    title = models.CharField(max_length=30)
    class Meta:
        ordering = ['title']
    def __str__(self):
        return self.title
class Article(models.Model):
    headline = models.CharField(max_length=100)
    publications = models.ManyToManyField(Publication)
    class Meta:
        ordering = ['headline']
    def __str__(self):
        return self.headline
(https://docs.djangoproject.com/en/3.2/topics/db/examples/many_to_many/)
To get the groups_for_signal, I would need to do something like this:
for publication in instance.publications.all():
    yield f"-publication__{publication.pk}"
But as stated in the documentation this doesn't seem to be a good idea. Is there a better way that I forgot?
Here is an example from the filtering model observer:
class MyConsumer(GenericAsyncAPIConsumer):
    ...
    @action()
    async def subscribe_to_comment_activity(self, user_pk, **kwargs):
        # We will check if the user is authenticated for subscribing.
        user = await database_sync_to_async(User.objects.get)(pk=user_pk)
        await self.comment_activity.subscribe(user=user)
I want to do some filtering, but that is only possible for my case if I have the instance at hand to filter against.
That means as I see it, I need the user=user value in the observer serializer method.
@comment_activity.serializer
def comment_activity(self, instance: Comment, action, **kwargs) -> CommentSerializer:
    '''This will return the comment serializer'''
    return CommentSerializer(instance)
And here comes my question then, how can I get the user value in the model serializer method?
tl;dr: is it possible to subscribe to objects by a field other than the primary key?
I'm trying to connect an existing database model to the asynchronous websockets.
I tried the pkField setting in connect, like this:
client = dcrf.connect(`${ws}:${window.location.host}${path}`, {
    pkField: "uuid",
});
But when I try to subscribe to an object I can still only refer to it by the primary key in the table, not uuid.
I would like to use the UUID instead of the primary key in my subscriptions, as UUIDs are not sequential and are impossible to guess.
Also the rest of the application works with UUID as the unique identifier.
I tried the 'pkField' setting in the configuration block of the connection, but this didn't seem to make any difference.
I'm subscribing with the following code, this fails:
const prom = client.subscribe('user_node', '932895c1-573e-4552-8703-8a9ca9e958ef', (msg, action) => {
    console.log("msg: ", msg, " action: ", action);
});
What does work is the pk, even though pkField is set to 'uuid':
const prom = client.subscribe('user_node', '1', (msg, action) => {
    console.log("msg: ", msg, " action: ", action);
});
{
    "errors": ["Method \"method name\" not allowed."],
    "data": null,
    "action": "method name",
    "response_status": 405,
    "request_id": 5
}
I get this every time I use custom actions.
Is your feature request related to a problem? Please describe.
When sending messages, if the expected data for a given action is not present, the consumer currently closes rather than responding with an error message.
Describe the solution you'd like
Respond with an error JSON message when:
Hi,
I just found out that the obscure Channels error message
"Group name must be a valid unicode string containing only ASCII alphanumerics, hyphens, or periods."
does not mention the fact that a group name cannot be longer than 100 characters.
I use a UUID as pk, and the generated group name (for the model observer) exceeds that limit:
1dd94a45-6020-4cc3-8e8a-3b5405a62eb2-handle.instance.change-model-establishment.establishment-pk-b63a92b5-4119-4908-8fcc-28e045a2cbbc
Any idea on how to solve that ?
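One possible workaround is to map over-long names to a fixed-length digest before they reach the channel layer. The sketch below assumes the constraints quoted above (ASCII alphanumerics, hyphens, periods, and the 100-character limit); shorten_group_name is a hypothetical helper, not part of Channels or djangochannelsrestframework, and where to plug it in depends on how the observer builds its group names.

```python
import hashlib
import re

# Constraints taken from the error message and the limit reported above;
# the exact rules may differ between Channels versions.
MAX_GROUP_NAME_LENGTH = 100
VALID_GROUP_NAME = re.compile(r"^[a-zA-Z0-9\-.]+$")


def shorten_group_name(name: str, limit: int = MAX_GROUP_NAME_LENGTH) -> str:
    """Return `name` unchanged if short enough, otherwise a stable hash-based name.

    Hypothetical helper for illustration only.
    """
    if len(name) < limit:
        return name
    digest = hashlib.sha256(name.encode("utf-8")).hexdigest()  # 64 hex characters
    return f"grp-{digest}"


long_name = (
    "1dd94a45-6020-4cc3-8e8a-3b5405a62eb2-handle.instance.change"
    "-model-establishment.establishment-pk-b63a92b5-4119-4908-8fcc-28e045a2cbbc"
)
short_name = shorten_group_name(long_name)
```

Because the digest is deterministic, every process derives the same short name for the same long name, so subscribers and senders still meet in the same group.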
Describe the bug
When subscribing to an instance using ObserverModelInstanceMixin in two browsers, the browser that subscribed last will receive all future responses meant for the first browser (after the subscribe_instance acknowledgement). In other words, an update action appears in the websocket messages in the first browser with a certain request_id, but the response with the same request_id ends up in the second browser.
To Reproduce
Steps to reproduce the behavior:
class MyModelConsumer(UpdateModelMixin, ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
With dcrf-client in the first browser, retrieve the model using pk. (My pk is coming from the URL of the page.)
In the second browser, retrieve and subscribe-instance to changes on the same pk.
With dcrf-client in the first browser, update the model using pk.
Observe the update request in the net tab for the socket in the first browser. Observe multiple update responses in the net tab for the socket in the second browser, one with the request id for the update request and one for each subscription request id. There will be more subscription responses on subsequent tests if you don't unsubscribe before page unload, as in my simple case. (The left browser was loaded first in the next image.)
Expected behavior
Requests from one browser should have responses directed to the same browser.
Debugging steps
I am able to observe that channel layers are working using this code from the channel docs.
python3 manage.py shell
>>> import channels.layers
>>> channel_layer = channels.layers.get_channel_layer()
>>> from asgiref.sync import async_to_sync
>>> async_to_sync(channel_layer.send)('test_channel', {'type': 'hello'})
>>> async_to_sync(channel_layer.receive)('test_channel')
{'type': 'hello'}
The issue is reproducible for me when installing djangochannelsrestframework from the branch in #46.
This should wrap the list response so that the items are embedded within a pagination serializer.
offset and limit are logical args that can be passed as part of the request. Since not all DB backends support cursors, for now that is out of scope.
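As a rough illustration of the idea, the sketch below wraps a slice of a plain list in a pagination envelope driven by offset and limit. The envelope field names (count, offset, limit, results) and the paginate helper are assumptions made for this example, not the library's API.

```python
from typing import Any, Dict, List, Sequence


def paginate(items: Sequence[Any], offset: int = 0, limit: int = 10) -> Dict[str, Any]:
    """Wrap a slice of `items` in a pagination envelope (field names are assumptions)."""
    if offset < 0 or limit < 1:
        raise ValueError("offset must be >= 0 and limit must be >= 1")
    page: List[Any] = list(items[offset:offset + limit])
    return {
        "count": len(items),   # total number of items available
        "offset": offset,      # where this page starts
        "limit": limit,        # maximum page size requested
        "results": page,       # the items on this page
    }


response = paginate(list(range(25)), offset=20, limit=10)
```

With a Django queryset, the same slice syntax would typically translate to LIMIT/OFFSET in SQL, which is why no cursor support is needed for this approach.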
Hi. I just followed the path from channels_api to here, and thanks for your efforts! I am using Django 2 and wanted to add websocket REST API endpoints to my backend.
But I am a bit lost among all the possibilities. I have a backend with multiple Django apps inside. Say one of these is called activities and its URLs are accessible under http://api.example.com/activities/.
I would like, for instance, to subscribe to the creation of new activities. I wondered whether I can use my DRF API views over websocket, as the end of your README seems to suggest (with view_as_consumer)?
Or should I use some other combination, for instance with HyperMediaChannels?
Thanks a lot for your help.
It would be great to let users subscribe to a list/filtered list of objects.
It would be nice to have a simple way to fire off a sub-async run loop as and when a request arrives (if this request is going to take some IO time), so that other messages on the same WebSocket connection can be processed.
I'm thinking that this could be done by adding an argument to the action decorator,
@action(blocking=False)
which then starts a nested run loop for this message,
or @nonblocking
which requires a dedicated mixin to be added to the Consumer.
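One way to picture the proposal is a toy dispatcher that schedules non-blocking handlers as asyncio tasks instead of awaiting them. Everything here is hypothetical: the Dispatcher class and its action(blocking=...) decorator only mimic the shape of the suggested API and are not the library's implementation.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Set

Handler = Callable[[dict], Awaitable[None]]


class Dispatcher:
    """Toy message loop; hypothetical, not the library's consumer."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}
        self.nonblocking: Set[str] = set()
        self.tasks: Set[asyncio.Task] = set()

    def action(self, name: str, blocking: bool = True):
        def register(func: Handler) -> Handler:
            self.handlers[name] = func
            if not blocking:
                self.nonblocking.add(name)
            return func
        return register

    async def dispatch(self, message: dict) -> None:
        handler = self.handlers[message["action"]]
        if message["action"] in self.nonblocking:
            # Schedule the handler and return immediately so the next
            # message on the same connection can be processed.
            task = asyncio.create_task(handler(message))
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)
        else:
            await handler(message)

    async def drain(self) -> None:
        # Wait for any still-running background handlers.
        if self.tasks:
            await asyncio.gather(*self.tasks)


async def demo() -> List[str]:
    log: List[str] = []
    dispatcher = Dispatcher()

    @dispatcher.action("slow", blocking=False)
    async def slow(message: dict) -> None:
        await asyncio.sleep(0.05)  # stands in for slow IO
        log.append("slow done")

    @dispatcher.action("fast")
    async def fast(message: dict) -> None:
        log.append("fast done")

    await dispatcher.dispatch({"action": "slow"})
    await dispatcher.dispatch({"action": "fast"})
    await dispatcher.drain()
    return log


result = asyncio.run(demo())
```

Here the fast action completes before the slow one even though it arrived second. A real implementation would also need to decide how errors and disconnects cancel outstanding tasks.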
Describe the bug
Line 1 of generics.py imports:
from typing import Any, Dict, Type, Optional, List, OrderedDict, Union
typing.OrderedDict is only supported from Python 3.7 onwards.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
No error should come
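A common way to keep such an import working on older interpreters is a guarded import with a fallback. This is a minimal sketch, assuming the name is needed only for type annotations; typing.OrderedDict was added in Python 3.7.2, and the fallback to MutableMapping is a choice made for this example rather than the fix adopted by the library.

```python
import collections

try:
    # Available in the typing module from Python 3.7.2 onwards.
    from typing import OrderedDict
except ImportError:
    # Fallback for older interpreters: a generic mapping type, for annotations only.
    from typing import MutableMapping as OrderedDict


def first_key(data: "OrderedDict[str, int]") -> str:
    """Return the first key of an ordered mapping."""
    return next(iter(data))


example = collections.OrderedDict([("a", 1), ("b", 2)])
```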
Describe the bug
if the Observer._serializer is not empty, the .serialize function will try to use it to generate the message_body. Using a standard ModelSerializer from DRF will cause a TypeError because of the arguments.
Removing self from the call will then create another TypeError unexpected keyword because the Field init does not support kwargs.
This part of the code is not covered in the test.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
The serializer should be able to serialize the instance.
LOG
No Log
Additional context
Simply using self._serializer(instance).data will return the serialized instance under a ReturnDict format that can be converted into JSON.
Since this option is not covered in the tests, I would guess that it is a very specific edge case or a placeholder for something else, but the fix seems to be simple.
I am getting KeyError: 'request_id'
I don't know whether this is an issue or whether I am doing something wrong. If I am doing something wrong, please point me to a correction.
Consumer:
class MessageConsumer(ListModelMixin, PatchModelMixin, UpdateModelMixin, DeleteModelMixin, GenericAsyncAPIConsumer):
    '''A Consumer for Notifications'''
    queryset = models.Message.objects.all()
    serializer_class = serializers.MessageSerializer
    def get_queryset(self):
        return models.Message.objects.filter(record__in=models.Record.objects.filter(user=self.request.user)).order_by('-message_date_created')
Routing:
websocket_urlpatterns = [
    path('message', consumers.MessageConsumer),
]
Serializer:
class MessageSerializer(serializers.ModelSerializer):
    '''Serializer for Notification'''
    message_date_created = serializers.DateTimeField(required=False, format="%Y-%m-%d", read_only=True)
    class Meta:
        model = models.Message
        fields = [
            'record',
            'message_title',
            'message_date_created',
            'message_status',
        ]
My request is, e.g.:
{"stream":"message","payload":{"action":"list","data":{},"request_id":"56def654-9087-45f8-9f12-7a7f6c78b04d"}}
I am receiving the following error:
WebSocket HANDSHAKING /message [127.0.0.1:56054]
WebSocket CONNECT /message [127.0.0.1:56054]
Exception inside application: 'request_id'
Traceback (most recent call last):
File "/.../python3.8/site-packages/channels/consumer.py", line 58, in __call__
await await_many_dispatch(
File "/.../python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
await dispatch(result)
File "/.../python3.8/site-packages/channels/consumer.py", line 73, in dispatch
await handler(message)
File "/.../python3.8/site-packages/channels/generic/websocket.py", line 196, in websocket_receive
await self.receive(text_data=message["text"])
File "/.../python3.8/site-packages/channels/generic/websocket.py", line 259, in receive
await self.receive_json(await self.decode_json(text_data), **kwargs)
File "/.../python3.8/site-packages/djangochannelsrestframework/consumers.py", line 158, in receive_json
request_id = content.pop("request_id")
KeyError: 'request_id'
WebSocket DISCONNECT /message [127.0.0.1:56054]
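The traceback shows receive_json popping request_id from the top level of the decoded message, while the request above nests action and request_id under payload. The sketch below shows the difference with plain json; unwrap_envelope is a hypothetical helper written for this illustration, not part of the library.

```python
import json


def unwrap_envelope(text: str) -> dict:
    """Return the inner payload if the message is a {"stream", "payload"} envelope.

    Hypothetical helper for illustration only.
    """
    content = json.loads(text)
    if isinstance(content, dict) and "stream" in content and "payload" in content:
        return content["payload"]
    return content


wrapped = (
    '{"stream":"message","payload":{"action":"list","data":{},'
    '"request_id":"56def654-9087-45f8-9f12-7a7f6c78b04d"}}'
)
flat = unwrap_envelope(wrapped)
# `flat` now carries "action" and "request_id" as top-level keys,
# which is what content.pop("request_id") in the traceback looks for.
```

Sending the flat form directly from the client, or routing the enveloped form through a demultiplexer in front of the consumer, would both avoid the KeyError under this reading.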
Hi. I have few questions about subscribers. As I understand pagination and filtering is not handled yet.
Thank you for your input
Describe the bug
I am getting this bug when using the
from marketplace.consumers import ListConsumer
TestChannel.objects.create(text="user21 creates a new comment", user=user_1)
TestChannel object (7)
An open stream object is being garbage collected; call "stream.close()" explicitly.
Traceback (most recent call last):
File "", line 1, in
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/manager.py", line 85, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/query.py", line 453, in create
obj.save(force_insert=True, using=self.db)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/base.py", line 726, in save
self.save_base(using=using, force_insert=force_insert,
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/base.py", line 774, in save_base
post_save.send(
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/dispatch/dispatcher.py", line 180, in send
return [
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/dispatch/dispatcher.py", line 181, in
(receiver, receiver(signal=self, sender=sender, **named))
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 108, in post_save_receiver
self.database_event(instance, Action.CREATE)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 127, in database_event
connection.on_commit(partial(self.post_change_receiver, instance, action))
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/backends/base/base.py", line 645, in on_commit
func()
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 155, in post_change_receiver
self.send_messages(
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 173, in send_messages
async_to_sync(channel_layer.group_send)(group_name, message_to_send)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/asgiref/sync.py", line 222, in call
return call_result.result()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
raise self._exception
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/asgiref/sync.py", line 287, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/channels_redis/core.py", line 684, in group_send
) = self._map_channel_keys_to_connection(channel_names, message)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/channels_redis/core.py", line 819, in _map_channel_keys_to_connection
channel_key_to_message[key] = self.serialize(value)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/channels_redis/core.py", line 839, in serialize
value = msgpack.packb(message, use_bin_type=True)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/msgpack/init.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'TestChannelSerializer' object
To Reproduce
Steps to reproduce the behavior:
class ListConsumer(ListModelMixin, GenericAsyncAPIConsumer):
    print('got here')
    queryset = TestChannel.objects.none()
    serializer_class = TestChannelSerializer
    permission_classes = [permissions.AllowAny]
    async def accept(self, **kwargs):
        await super().accept(**kwargs)
        await self.model_change.subscribe()
    @model_observer(TestChannel)
    async def model_change(self, message, action=None, **kwargs):
        await self.send_json(message)
    @model_change.serializer
    def model_serialize(self, instance, action, **kwargs):
        return TestChannelSerializer(instance).data
    def get_queryset(self, *args, **kwargs):
        # user = self.scope['user']
        return TestChannel.objects.all()
Expected behavior
I expect the console to log an update when a new comment is created, but the console doesn't log any updates.
LOG
If applicable, add a log to help explain your problem.
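The traceback ends with msgpack refusing to pack a TestChannelSerializer object, which suggests that a serializer instance, rather than its plain data, reached the channel layer. The sketch below illustrates the difference using the standard-library json encoder in place of msgpack and a stand-in FakeSerializer class; both substitutions, and the "model.change" message type, are assumptions made for this example.

```python
import json


class FakeSerializer:
    """Stand-in for a DRF serializer; hypothetical, for illustration only."""

    def __init__(self, instance: dict) -> None:
        self.instance = instance

    @property
    def data(self) -> dict:
        # A real serializer would build this from model fields.
        return dict(self.instance)


def can_encode(message: object) -> bool:
    """Return True if `message` consists only of plain, encodable types."""
    try:
        json.dumps(message)  # json stands in for the channel layer's msgpack encoder
        return True
    except TypeError:
        return False


serializer = FakeSerializer({"id": 7, "text": "user21 creates a new comment"})
object_ok = can_encode({"type": "model.change", "body": serializer})
data_ok = can_encode({"type": "model.change", "body": serializer.data})
```

Only the message built from serializer.data can be encoded, which matches the form used in model_serialize in the consumer above.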
Hi, first of all, I am trying your lib for the very first time and it seems really handy. My issue is probably just a misconfiguration. I would be more than happy to help improve the documentation for this case.
Describe the bug
I am basically giving a try, so I just copied and pasted some piece of code from the documentation but it is not working properly, giving me a 500 (Server Internal error) while subscribing to an action.
To Reproduce
Steps to reproduce the behavior:
views.py
class TestConsumer(ObserverModelInstanceMixin):
    queryset = get_user_model().objects.all()
    serializer_class = UserSerializer
urls.py
urlpatterns = [
    re_path(r'', include(router.urls)),
    re_path('ws', TestConsumer)
]
Request
{
"action": "subscribe_instance",
"pk": 42,
"request_id": 4
}
LOG
Error
Traceback (most recent call last):
File "/lib/python3.8/site-packages/django/core/handlers/exception.py", line 34, in inner
response = get_response(request)
File "lib/python3.8/site-packages/django/core/handlers/base.py", line 115, in _get_response
response = self.process_exception_by_middleware(e, request)
File "lib/python3.8/site-packages/django/core/handlers/base.py", line 113, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "lib/python3.8/site-packages/djangochannelsrestframework/observer/generics.py", line 64, in __init__
super().__init__(*args, **kwargs)
TypeError: object.__init__() takes exactly one argument (the instance to initialize)
Is your feature request related to a problem? Please describe.
Current docs indicate the following case
''' If you want the data serialized instead of pk '''
@model_change.serializer
def model_serialize(self, instance, action, **kwargs):
    return TestSerializer(instance).data
I have a serializer that requires self.context['scope'].get('user'). It is used to return "You" if the user is self.context['scope'].get('user'), else the User instance.
When passing the instance to TestSerializer in model_serialize, I get a KeyError on "scope".
I can do
return TestSerializer(instance, context={'request': request}).data
but I have no access to request to pass to the serializer.
How do I access the request on a model observer?