
nilcoalescing / djangochannelsrestframework

A Rest-framework for websockets using Django channels-v4

Home Page: https://djangochannelsrestframework.readthedocs.io/en/latest/

License: MIT License

Python 100.00%
django channels django-rest-framework

djangochannelsrestframework's People

Contributors

boiyelove, destos, djwoms, gegenschall, herst, hishnash, jakiro2017, jeetpatel9, johnthagen, juliger, lautarodapin, linuxlewis, naresh-khatri, pranav377, they4kman

djangochannelsrestframework's Issues

[Feature] Pre-emptive push for paginated data.

In some cases the server can know in advance which objects the client is likely to request next.

For paginated responses, it would be nice to have the option to pre-emptively send the next page down to the client after returning the requested page.
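
A minimal sketch of what such a pre-emptive push could look like, using only the standard library. Everything here is hypothetical and not part of DCRF: the `send` callable stands in for the consumer's reply method, and the `page` and `preemptive` keys are invented for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


def get_page(items: Sequence, page: int, page_size: int) -> list:
    """Return the 1-indexed page of items (an empty list past the end)."""
    start = (page - 1) * page_size
    return list(items[start:start + page_size])


async def send_page_with_prefetch(
    send: Callable[[dict], Awaitable[None]],
    items: Sequence,
    page: int,
    page_size: int,
) -> None:
    """Reply with the requested page, then push the following page unasked."""
    await send({"action": "list", "page": page, "data": get_page(items, page, page_size)})
    next_data = get_page(items, page + 1, page_size)
    if next_data:  # only push when a next page actually exists
        # "preemptive" is a hypothetical flag so the client can tell the pages apart.
        await send({"action": "list", "page": page + 1, "data": next_data, "preemptive": True})
```

The client would still need a way to match the unrequested page to a later request, which this sketch leaves open.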

[BUG] Creating serializer from instance in async context

Describe the bug
A clear and concise description of what the bug is.

To Reproduce
Steps to reproduce the behavior:

  1. ..
  2. ..
class Message(models.Model):
    text = models.TextField()
    user = models.ForeignKey(User, related_name="messages", on_delete=models.CASCADE)

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = Message.objects.all()
    serializer_class = MessageSerializer
    chat_room_name : Optional[str]

    @action()
    async def join_chat(self, chat: int, **kwargs):
        self.chat_room_name = f"chat_{chat}"
        await self.channel_layer.group_add(
            self.chat_room_name,
            self.channel_name,
        )
        user_serializer = await database_sync_to_async(UserSerializer)(self.scope["user"], many=False)
        await self.channel_layer.group_send(
            self.chat_room_name,
            {
                "type": "notification",
                "message": f"{self.scope['user'].username} joined the chat",
                "user": user_serializer.data,
                "chat": chat
            }
        )
    async def notification(self, event):
        content = dict(
            action="notification",
            message=event["message"],
            chat=event["chat"],
            user=event["user"],
            status=status.HTTP_200_OK
        )
        await self.send_json(content)

    

When I send the action to the websocket, it should add the connection to a group of users and send the notification to each one, with the key user holding the user that executed the action. But when I create the serializer in the async context, the call never completes.

LOG

      # Test join chat.
        await communicator.send_json_to({
            "action": "join_chat",
            "chat": chat.pk,
            "request_id": now().timestamp(),
        })
>       response = await communicator.receive_json_from()

backend\tests\test_channels_rest_consumers.py:103:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
venv\lib\site-packages\channels\testing\websocket.py:93: in receive_json_from
    payload = await self.receive_from(timeout)
venv\lib\site-packages\channels\testing\websocket.py:72: in receive_from
    response = await self.receive_output(timeout)
venv\lib\site-packages\asgiref\testing.py:78: in receive_output
    self.future.result()
venv\lib\site-packages\channels\routing.py:150: in __call__
    return await application(
venv\lib\site-packages\channels\consumer.py:94: in app
    return await consumer(scope, receive, send)
venv\lib\site-packages\channels\consumer.py:58: in __call__
    await await_many_dispatch(
venv\lib\site-packages\channels\utils.py:51: in await_many_dispatch
    await dispatch(result)
venv\lib\site-packages\channels\consumer.py:73: in dispatch
    await handler(message)
venv\lib\site-packages\channels\generic\websocket.py:196: in websocket_receive
    await self.receive(text_data=message["text"])
venv\lib\site-packages\channels\generic\websocket.py:259: in receive
    await self.receive_json(await self.decode_json(text_data), **kwargs)
venv\lib\site-packages\djangochannelsrestframework\consumers.py:160: in receive_json
    await self.handle_action(action, request_id=request_id, **content)
venv\lib\site-packages\djangochannelsrestframework\consumers.py:151: in handle_action
    await self.handle_exception(exc, action=action, request_id=request_id)
venv\lib\site-packages\djangochannelsrestframework\consumers.py:117: in handle_exception
    raise exc
venv\lib\site-packages\djangochannelsrestframework\consumers.py:144: in handle_action
    response = await method(request_id=request_id, action=action, **kwargs)
backend\consumers.py:52: in join_chat
    "user": user_serializer.data,
venv\lib\site-packages\rest_framework\serializers.py:548: in data
    ret = super().data
venv\lib\site-packages\rest_framework\serializers.py:246: in data
    self._data = self.to_representation(self.instance)
venv\lib\site-packages\rest_framework\serializers.py:515: in to_representation
    ret[field.field_name] = field.to_representation(attribute)
venv\lib\site-packages\rest_framework\serializers.py:663: in to_representation
    return [
venv\lib\site-packages\django\db\models\query.py:280: in __iter__
    self._fetch_all()
venv\lib\site-packages\django\db\models\query.py:1324: in _fetch_all
    self._result_cache = list(self._iterable_class(self))
venv\lib\site-packages\django\db\models\query.py:51: in __iter__
    results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
venv\lib\site-packages\django\db\models\sql\compiler.py:1167: in execute_sql
    cursor = self.connection.cursor()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _  

args = (<django.db.backends.sqlite3.base.DatabaseWrapper object at 0x0000011228A21400>,), kwargs = {}, event_loop = <_WindowsSelectorEventLoop running=False closed=False debug=False>

    @functools.wraps(func)
    def inner(*args, **kwargs):
        if not os.environ.get('DJANGO_ALLOW_ASYNC_UNSAFE'):
            # Detect a running event loop in this thread.
            try:
                event_loop = asyncio.get_event_loop()
            except RuntimeError:
                pass
            else:
                if event_loop.is_running():
>                   raise SynchronousOnlyOperation(message)
E                   django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.

venv\lib\site-packages\django\utils\asyncio.py:24: SynchronousOnlyOperation
  • OS: Windows 10
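
The traceback above is raised from `user_serializer.data`, that is, after the wrapped constructor has already returned, so the lazy evaluation happens back on the event loop. The sketch below reproduces that pattern with standard-library stand-ins only. `LazySerializer` and `SyncOnlyError` are hypothetical substitutes for a DRF serializer and Django's `SynchronousOnlyOperation`, and `asyncio.to_thread` (Python 3.9+) plays the role of `database_sync_to_async`.

```python
import asyncio


class SyncOnlyError(RuntimeError):
    """Stand-in for django.core.exceptions.SynchronousOnlyOperation."""


class LazySerializer:
    """Hypothetical serializer: the real work happens in .data, not __init__."""

    def __init__(self, instance):
        self.instance = instance  # cheap, no "database" access yet

    @property
    def data(self):
        # Mimic Django's guard: refuse to run where an event loop is running.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no running loop in this thread, so sync work is allowed
        else:
            raise SyncOnlyError("cannot call this from an async context")
        return {"username": self.instance}


def serialize(instance) -> dict:
    """Build the serializer and read .data inside the same sync function."""
    return LazySerializer(instance).data


async def broken(instance) -> dict:
    serializer = await asyncio.to_thread(LazySerializer, instance)
    return serializer.data  # evaluated on the event loop thread: raises


async def fixed(instance) -> dict:
    return await asyncio.to_thread(serialize, instance)
```

In the consumer from the report, the analogous change would presumably be to wrap a small function that returns `UserSerializer(user).data` with `database_sync_to_async`, rather than wrapping the serializer class itself.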

[BUG] `ATOMIC_REQUESTS` is checked for in the wrong place.

Describe the bug
djangochannelsrestframework's (DCRF) action decorator looks in Django's top-level settings for ATOMIC_REQUESTS. Since at least Django 1.8, ATOMIC_REQUESTS has been a per-connection setting, defined on each entry of the DATABASES setting. When ATOMIC_REQUESTS is set up as described in the Django documentation, DCRF's action decorator does not wrap sync actions in an atomic block, nor does it throw errors on async actions that cannot be wrapped.

See:

To Reproduce
Steps to reproduce the behavior:

  1. Set up the Django databases. (In this case, set up as in cookiecutter-django.)
    # DATABASES
    # ------------------------------------------------------------------------------
    # https://docs.djangoproject.com/en/dev/ref/settings/#databases
    DATABASES = {"default": env.db("DATABASE_URL", default="postgres:///project")}
    DATABASES["default"]["ATOMIC_REQUESTS"] = True
  2. Run a custom async action:
    @action()
    async def change_password(self, current_password="", new_password="", confirm_password="", **kwargs):
        ...

Expected behavior
The custom async action, wrapped using @action, should throw the exception from this line:
https://github.com/hishnash/djangochannelsrestframework/blob/master/djangochannelsrestframework/decorators.py#L54

DCRF's action decorator should take a using kwarg, similar to atomic's using kwarg, which is used to check if a specific db connection wants to be wrapped in atomic blocks. Barring support for the using kwarg, the default connection should at least be checked for ATOMIC_REQUESTS.
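
A minimal sketch of the per-connection check described above, operating on a plain `DATABASES`-style mapping so it runs without Django. The helper name `atomic_requests_enabled` is hypothetical and is not DCRF's API.

```python
from typing import Any, Mapping


def atomic_requests_enabled(
    databases: Mapping[str, Mapping[str, Any]], using: str = "default"
) -> bool:
    """Return the ATOMIC_REQUESTS flag of one connection alias.

    `databases` has the same shape as Django's DATABASES setting;
    a missing alias or a missing key counts as disabled.
    """
    return bool(databases.get(using, {}).get("ATOMIC_REQUESTS", False))


# Usage with settings shaped like the ones in the report.
DATABASES = {"default": {"NAME": "project", "ATOMIC_REQUESTS": True}}
print(atomic_requests_enabled(DATABASES))             # default connection
print(atomic_requests_enabled(DATABASES, "replica"))  # unknown alias
```

Inside a running Django project, the same flag should be readable from `django.db.connections[using].settings_dict`, which is where a decorator with a `using` kwarg could look it up.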

max number of clients reached ?

Hi,

I am actively using your project in mine, and I would like to help improve it. So far I am not really able to contribute, as I don't understand everything yet. I just wanted to report a problem I have, and suggest a (crude) solution.

My Django app is running on Heroku with the minimum of resources (free when possible). I am using the free plan of RedisCloud with 30 connections, but still, it seems I am reaching the max.

Traceback (most recent call last): 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/celery/app/trace.py", line 382, in trace_task 
Mar 06 22:41:40    R = retval = fun(*args, **kwargs) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/celery/app/trace.py", line 641, in __protected_call__ 
Mar 06 22:41:40    return self.run(*args, **kwargs) 
Mar 06 22:41:40  File "/app/arcsecond/activities/tasks.py", line 52, in parse_ESO_archive_latest_rows 
Mar 06 22:41:40    target_name=row.target_name) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/manager.py", line 82, in manager_method 
Mar 06 22:41:40    return getattr(self.get_queryset(), name)(*args, **kwargs) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/query.py", line 413, in create 
Mar 06 22:41:40    obj.save(force_insert=True, using=self.db) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/base.py", line 718, in save 
Mar 06 22:41:40    force_update=force_update, update_fields=update_fields) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/django/db/models/base.py", line 758, in save_base 
Mar 06 22:41:40    update_fields=update_fields, raw=raw, using=using, 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/django/dispatch/dispatcher.py", line 175, in send 
Mar 06 22:41:40    for receiver in self._live_receivers(sender) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/django/dispatch/dispatcher.py", line 175, in <listcomp> 
Mar 06 22:41:40    for receiver in self._live_receivers(sender) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/djangochannelsrestframework/observer/observer.py", line 154, in post_save_receiver 
Mar 06 22:41:40    **kwargs 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/djangochannelsrestframework/observer/observer.py", line 221, in post_change_receiver 
Mar 06 22:41:40    **kwargs 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/djangochannelsrestframework/observer/observer.py", line 233, in send_messages 
Mar 06 22:41:40    async_to_sync(channel_layer.group_send)(group_name, message) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/asgiref/sync.py", line 64, in __call__ 
Mar 06 22:41:40    return call_result.result() 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/concurrent/futures/_base.py", line 425, in result 
Mar 06 22:41:40    return self.__get_result() 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result 
Mar 06 22:41:40    raise self._exception 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/asgiref/sync.py", line 78, in main_wrap 
Mar 06 22:41:40    result = await self.awaitable(*args, **kwargs) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/channels_redis/core.py", line 559, in group_send 
Mar 06 22:41:40    async with self.connection(self.consistent_hash(group)) as connection: 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/channels_redis/core.py", line 742, in __aenter__ 
Mar 06 22:41:40    self.conn = await self.pool.pop() 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/channels_redis/core.py", line 49, in pop 
Mar 06 22:41:40    conns.append(await aioredis.create_redis(**self.host, loop=loop)) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/aioredis/commands/__init__.py", line 178, in create_redis 
Mar 06 22:41:40    loop=loop) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/aioredis/connection.py", line 129, in create_connection 
Mar 06 22:41:40    await conn.auth(password) 
Mar 06 22:41:40  File "/app/.heroku/python/lib/python3.7/site-packages/aioredis/util.py", line 48, in wait_ok 
Mar 06 22:41:40    res = await fut 
Mar 06 22:41:40 aioredis.errors.MaxClientsError: ERR max number of clients reached 

Looking at the problematic line (line 233 of observer/observer.py), I see a loop over group_names. My very crude proposal would be to extract async_to_sync(channel_layer.group_send) and reuse it for all group_names. Would that save connections?

Thanks again.

[BUG]`TypeError: has_permission() got an unexpected keyword argument 'scope' `when use `permission_classes = [IsAuthenticated]`

Describe the bug
TypeError: has_permission() got an unexpected keyword argument 'scope' when use permission_classes = [IsAuthenticated]

To Reproduce
Steps to reproduce the behaviour:

  1. Write a consumer with permission_classes = [IsAuthenticated]
  2. Authenticate via websocket and verify that the scope has a user instance
  3. Observe the exception:
Exception inside application: has_permission() got an unexpected keyword argument 'scope'
Traceback (most recent call last):
  File "/home/rufs/Bugs/eclass-api/utils/channel_token.py", line 38, in __call__
    return await inner(receive, send)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/generic/websocket.py", line 196, in websocket_receive
    await self.receive(text_data=message["text"])
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/generic/websocket.py", line 259, in receive
    await self.receive_json(await self.decode_json(text_data), **kwargs)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 164, in receive_json
    await self.handle_action(action, request_id=request_id, **content)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 155, in handle_action
    await self.handle_exception(exc, action=action, request_id=request_id)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 117, in handle_exception
    raise exc
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 136, in handle_action
    await self.check_permissions(action, **kwargs)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/djangochannelsrestframework/consumers.py", line 93, in check_permissions
    if not await ensure_async(permission.has_permission)(
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/asgiref/sync.py", line 296, in __call__
    ret = await asyncio.wait_for(future, timeout=None)
  File "/home/rufs/.pyenv/versions/3.8.1/lib/python3.8/asyncio/tasks.py", line 455, in wait_for
    return await fut
  File "/home/rufs/.pyenv/versions/3.8.1/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/channels/db.py", line 14, in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
  File "/home/rufs/.pyenv/versions/blank-test/lib/python3.8/site-packages/asgiref/sync.py", line 334, in thread_handler
    return func(*args, **kwargs)
TypeError: has_permission() got an unexpected keyword argument 'scope'

Expected behaviour
It should not raise this exception, whether the user is authenticated or not.

  • OS: Ubuntu
  • Version 18.04
  • Python 3.8.1

Additional context
It's strange, since the function below was never called. I have no idea why this happens.
class IsAuthenticated(BasePermission):
    async def has_permission(
        self, scope: Dict[str, Any], consumer: AsyncConsumer, action: str, **kwargs
    ) -> bool:
        user = scope.get("user")
        if not user:
            return False
        return user.pk and user.is_authenticated

When I comment out all the parameters, it leads to:
TypeError: has_permission() missing 2 required positional arguments: 'request' and 'view'
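
One possible reading of the two error messages, offered as an assumption rather than a confirmed diagnosis: `request` and `view` are the parameters of Django REST framework's `BasePermission.has_permission`, so the consumer may be picking up `rest_framework.permissions.IsAuthenticated` instead of the class shown above. The sketch below uses two hypothetical stand-in classes to show how the mismatch produces the first error, and how a signature check could detect it.

```python
import inspect


class DrfStylePermission:
    """Stand-in with the signature of DRF's BasePermission.has_permission."""

    def has_permission(self, request, view):
        return True


class ConsumerStylePermission:
    """Stand-in with the signature of the IsAuthenticated class in this report."""

    async def has_permission(self, scope, consumer, action, **kwargs):
        return True


def accepts_scope(permission) -> bool:
    """True if has_permission can be called with a `scope` keyword argument."""
    params = inspect.signature(permission.has_permission).parameters
    return "scope" in params or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )


# Calling the DRF-style method with scope= fails before its body ever runs.
try:
    DrfStylePermission().has_permission(scope={}, consumer=None, action="list")
except TypeError as exc:
    print(exc)
```

If this reading is right, checking which module `IsAuthenticated` is imported from in the consumer file would be the first thing to verify.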

Supplement readme

Could you complement the examples in the readme with the appropriate client-side code and websocket send/receive messages?

For example, I don't fully understand how to work with custom actions or subscribing to a filtered list of models on the client side.

[Feature] Buffer `signals` that happen during a transaction

Currently, if you have an open transaction and update/create/delete a record, the subscribing channels will be notified before you commit your transaction.

We should instead buffer these notifications and only submit them after the transaction completes.

--

This is also a bug, since users do not expect to receive notifications for events that did not happen (e.g. if the transaction is rolled back).
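
The buffering idea can be sketched as follows. `NotificationBuffer` is a hypothetical standard-library illustration, not DCRF code: notifications are queued while a transaction is open, delivered on commit, and dropped on rollback.

```python
from typing import Callable, List


class NotificationBuffer:
    """Hypothetical buffer that holds callbacks while a transaction is open."""

    def __init__(self) -> None:
        self._pending: List[Callable[[], None]] = []
        self.in_transaction = False

    def begin(self) -> None:
        self.in_transaction = True

    def notify(self, callback: Callable[[], None]) -> None:
        if self.in_transaction:
            self._pending.append(callback)  # defer until commit
        else:
            callback()  # no open transaction: deliver immediately

    def commit(self) -> None:
        self.in_transaction = False
        pending, self._pending = self._pending, []
        for callback in pending:
            callback()

    def rollback(self) -> None:
        self.in_transaction = False
        self._pending.clear()  # the changes never happened, so drop them
```

Django already offers this behaviour through `django.db.transaction.on_commit`, so an implementation inside the signal receivers could likely register the group send there instead of maintaining its own buffer.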

Detail Documentation

Hi, Hishnash.

I believe your project would be very promising for beginners like me if it came with slightly more detailed documentation.

I'm trying to create a chat application with Django Channels, which is very confusing. Your project looks like it would ease the burden if there were detailed documentation.

N.B.: I'm doing my final-year project with Django Rest Framework, which may be why I find your short documentation confusing to apply in my project.

Please, help me with any resources to implement your project in my solution.

[BUG] Request IDs for multiple model instance subscriptions are stored incorrectly

Describe the bug
In a project of mine, I discovered that after subscribing to multiple instances of a model, all updates being sent by the backend for any of the subscribed instances contain the latest request_id that was sent and not the request_id that they were initially sent with.
Therefore, e.g. when using the dcrf-client in the frontend, the frontend callbacks are not triggered, as the request_id of the update does not match the request_id that was initially sent with the subscription.

To Reproduce
Steps to reproduce the behavior:
I subscribed to two instances of an example Model Test with the following requests:

request: {"stream":"test","payload":{"action":"subscribe_instance","request_id":"beabe327-3e51-4538-b5c8-ab32ae2c7cd8","pk":1}}
response: {"stream": "test", "payload": {"errors": [], "data": null, "action": "subscribe_instance", "response_status": 201, "request_id": "beabe327-3e51-4538-b5c8-ab32ae2c7cd8"}}
request: {"stream":"test","payload":{"action":"subscribe_instance","request_id":"d6a4a829-ab00-492c-9bdb-01ef77dbab15","pk":2}}
response: {"stream": "test", "payload": {"errors": [], "data": null, "action": "subscribe_instance", "response_status": 201, "request_id": "d6a4a829-ab00-492c-9bdb-01ef77dbab15"}}

One can see that the request_id of the direct answer is correct.
When updating the Test instance with the pk = 1, I'd expect the response to contain the request_id = 'beabe327-3e51-4538-b5c8-ab32ae2c7cd8', but instead it returns

{"stream": "test", "payload": {"errors": [], "data": {"id": 1, "name": "Test", "description": "This is a test"}, "action": "update", "response_status": 200, "request_id": "d6a4a829-ab00-492c-9bdb-01ef77dbab15"}}

The error
I dug a little into the corresponding code and added a print statement to the method subscribe_instance of class ObserverModelInstanceMixin in observer/generics.py:

print(self.subscribed_requests)

right after the

self.subscribed_requests[self.__class__.handle_instance_change] = request_id

I could then see on the console the reason for the unexpected behaviour:

{<djangochannelsrestframework.observer.model_observer.ModelObserver object at 0x7f85801b86d0>: 'beabe327-3e51-4538-b5c8-ab32ae2c7cd8'}
{<djangochannelsrestframework.observer.model_observer.ModelObserver object at 0x7f85801b86d0>: 'd6a4a829-ab00-492c-9bdb-01ef77dbab15'}

The subscribed_requests only contains the latest request_id, though in the second print it should contain two entries. As one can see, the keys are identical, however, which is why the request_id is being replaced.

I'm not sure about this, but it seems to be a mistake to use the ModelObserver object as the key in subscribed_requests; the intent was probably to use the result of the other method with the same name

    @handle_instance_change.groups
    def handle_instance_change(self: ModelObserver, instance, *args, **kwargs):
        # one channel for all updates.
        yield "{}-model-{}-pk-{}".format(
            self.func.__name__.replace("_", "."), self.model_label, instance.pk
        )

which returns a string that would be unique for the model instance, but instead the method

    @_GenericModelObserver
    async def handle_instance_change(self, message: Dict, **kwargs):
        message = deepcopy(message)
        action = message.pop("action")
        message.pop("type")

        await self.handle_observed_action(
            action=action,
            request_id=self.subscribed_requests.get(
                self.__class__.handle_instance_change
            ),
            **message,
        )

is being used.

As I'm not so good at async Python right now, I don't know exactly how to fix this, but I think that addressing this issue would make the correct request_ids get stored.
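
To illustrate the collision and one way around it, here is a small standard-library sketch. `SubscriptionRegistry` is hypothetical and not the library's actual fix: it keys the stored request_id by the pair of observer and instance pk, so a second subscription no longer overwrites the first.

```python
from typing import Dict, Hashable, Optional, Tuple


class SubscriptionRegistry:
    """Hypothetical registry mapping (observer, pk) to the subscribing request_id."""

    def __init__(self) -> None:
        self._requests: Dict[Tuple[Hashable, int], str] = {}

    def subscribe(self, observer: Hashable, pk: int, request_id: str) -> None:
        # Including pk in the key keeps one entry per subscribed instance.
        self._requests[(observer, pk)] = request_id

    def request_id_for(self, observer: Hashable, pk: int) -> Optional[str]:
        return self._requests.get((observer, pk))


# Usage with the two subscriptions from the report.
registry = SubscriptionRegistry()
registry.subscribe("handle_instance_change", 1, "beabe327-3e51-4538-b5c8-ab32ae2c7cd8")
registry.subscribe("handle_instance_change", 2, "d6a4a829-ab00-492c-9bdb-01ef77dbab15")
print(registry.request_id_for("handle_instance_change", 1))
```

Keying by the group name string produced by the `groups` method would work the same way, since that string also contains the instance pk.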

  • Using Version 0.2.0

More documentation on the view_as_consumer

Please explain the uses for view_as_consumer.

There's no documentation to explain why one would use this, and the advantages gained from doing so.

Currently, I create a url pattern that uses the view_as_consumer. I import a DRF Model Viewset from .views

from djangochannelsrestframework.consumers import view_as_consumer
from .multiplexing import OpsDemultiplexer
from .views import ProfileViewSet

websocket_urlpatterns = [
    path('ws/multi/', OpsDemultiplexer),
    url(r"testing123/", view_as_consumer(ProfileViewSet)),
]

Then I import the url_pattern to the router.

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            common.routing.websocket_urlpatterns + testing.routing.websocket_urlpatterns
        )
    ),
})

I'm not sure of what to expect at this point. How do I access the view? What are the perks of connecting to the view via websockets? Is it more efficient? Am I even doing this right?

I make use of the multiplexing, connecting multiple consumers via 1 path. I would like to incorporate the view_as_consumer if it will somehow make this process more efficient.

As of now, I use my DRF views for my HTTP requests. Does this wrapper replace that at all? Can one at least do GET requests? The only apparent feature I can think of is being able to have temporary views that are only accessible for the duration of the socket connection.

[BUG] Subscribing to all model instances doesn't work

Describe the bug
I have the following consumer:

class Consumer(AsyncAPIConsumer):
    permission_classes = (permissions.IsAuthenticated, )

    @model_observer(SomeModel)
    async def model_changed(self, message, observer=None, **kwargs):
        raise NotImplementedError()

    @action()
    async def subscribe_all(self, **kwargs):
        await self.model_changed.subscribe()
        return {}, 200

I'm sending the following message

{        
    "stream": "stream",
    "payload": {
        "action": "subscribe_all",
        "request_id": 42
    }
}

I can see that the subscribe_all() method is called and a 200 status is sent back to the client. Then when I create a SomeModel instance or change one nothing happens. No method is called and the NotImplementedError in model_changed is never raised.

How is observing all models supposed to work? What am I missing here?

Thank you!

DCRF websocket client on npm

Hello there! Just wanted to mention an npm package I published to provide a websocket client to django-channels-rest-framework: https://github.com/theY4Kman/dcrf-client

It's Promise-based, and supports subscriptions, which can be canceled. The test/integration/dcrf_client_test directory contains a Django Channels v2 project as an example for how to set up the Django side of things (with the important thing being the use of pk to return a model's ID from a serializer).

If you wanna check it out, it's available on npm

npm install dcrf-client

ObserverModelMixins

Hello! Thank you for the work you did creating djangochannelsrestframework!

I would like to know your plans for further development. I studied the code, and I have a few questions regarding the observers.

  1. ObserverModelInstanceMixin is now implemented, and looking at its code it becomes clear what it does, but the API is not very obvious, in my opinion.

Why not implement CreateModelMixin, ListObserverModel, RetrieveObserverModel, UpdateModelMixin, PatchModelMixin, DeleteModelMixin, which would provide more obvious functionality?

  2. Why is the delete action present in ObserverModelInstanceMixin? This may slightly confuse users of this application.

Based on what I wrote in point 1, this logic can be divided, and by inheriting mixins, the user can get everything he needs.

I will gladly take part in the development of the project, if you allow it. However, I don't have as much experience as you do.

Where should I start?

[BUG/FEATURE] send_message parameter should be immutable

In ModelObserver.send_message, the message object is passed as a mutable object. So if you change it in self.func for one group, other calls of self.func (for other groups) will receive the modified message.
If you don't want to send "type" over the websocket and call message.pop("type", None) in self.func, other groups will not receive this message.
I think message should be immutable, or self.func should receive a copy of message as its parameter.
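
The copy-per-call option can be sketched with the standard library alone. `send_to_groups` and its `handler` argument are hypothetical stand-ins for `ModelObserver.send_message` and `self.func`; the point is only that each call gets its own `deepcopy` of the message.

```python
from copy import deepcopy
from typing import Callable, Iterable, List


def send_to_groups(
    message: dict,
    groups: Iterable[str],
    handler: Callable[[str, dict], dict],
) -> List[dict]:
    """Call handler once per group, giving each call its own copy of message."""
    return [handler(group, deepcopy(message)) for group in groups]


def strip_type(group: str, message: dict) -> dict:
    """Example handler that mutates its argument, as described in the report."""
    message.pop("type", None)
    message["group"] = group
    return message


# Usage: the original message keeps its "type" key after both calls.
original = {"type": "model.changed", "body": {"pk": 1}}
results = send_to_groups(original, ["group-a", "group-b"], strip_type)
print(original)
print(results)
```

A deep copy costs more than passing the same dict around, but it also protects nested values such as `body` from being changed by one handler call and seen by the next.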

[BUG] ObserverModelInstanceMixin seems to not be working across threads/processes

Describe the bug
I have one simple Consumer + Multiplexer as follows:

class TaskStatusConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    queryset = TaskStatus.objects.all()
    serializer_class = TaskStatusSerializer
    permission_classes = (permissions.AllowAny,)


class StatusDemultiplexerAsyncJson(AsyncJsonWebsocketDemultiplexer):
    applications = {
        'task-status': TaskStatusConsumer
    }

I subscribe to events using dcrf-client on the JavaScript side of things. I see that the WebSocket connection successfully subscribes to a newly created model instance using subscribe_instance.

I have a view similar to this:

class TaskStatusViewSet(viewsets.ModelViewSet):
    queryset = TaskStatus.objects.all()
    serializer_class = serializers.TaskStatusSerializer

    @action(methods=['post'], detail=True)
    def some_action(self, request, pk=None):
        task = TaskStatus.objects.get(pk=pk)
        task.some_attr = 'some-value'
        task.save()

When sending a request to that view I do see an update message for the task.save() call being sent down on the WebSocket.

Yet updates from any subsequent requests are not sent over the WebSocket connection. Somehow the model observer "forgets" about the model instance after the initial request is made.

Am I using the Observer wrong?

Exception TypeError: has_permission() got an unexpected keyword argument 'scope'

This may not be a bug. I am testing this library but have run into a problem.
The exception appears when my websocket client sends this JSON: {'action': 'list', 'request_id': 18}

To Reproduce
My model is very simple. I am testing:

class Stock(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=128)
    price = models.FloatField()
    created = models.DateTimeField(auto_now_add=True)
    update = models.DateTimeField(auto_now=True)

    class Meta:
        verbose_name = "stock"
        verbose_name_plural = "stocks"
        ordering = ['-id', ]

    def __str__(self):
        return self.name

And this is the restconsumer:

from djangochannelsrestframework.mixins import ListModelMixin


class StockConsumer(ListModelMixin, GenericAsyncAPIConsumer):
    queryset = Stock.objects.all()
    serializer_class = StockSerializer
    permission_classes = (permissions.IsAuthenticated,)

The serializer is a standard ModelSerializer.

My websocket client (in JavaScript) does this:

<script>
let socket = new WebSocket('ws://localhost:8000/ws/stock/');
let timer = null;
socket.onopen = () => {
    timer = setInterval(() => {

        // Declared with const to avoid creating an implicit global.
        const request = {
            "action": "list",
            "request_id": 18
        }

        socket.send(JSON.stringify(request));

    }, 1000);
};
socket.onclose = socket.onerror = () => {
    clearInterval(timer);
};
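socket.onmessage = function(event) {
  let pMsg = document.getElementById("id123");
  if (pMsg == null) {
      // Create the output element on first use and assign it to the outer
      // variable, so the innerHTML write below does not hit null.
      pMsg = document.createElement("p");
      pMsg.id = "id123";
      document.body.appendChild(pMsg);
  }
  pMsg.innerHTML = event.data;
}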
</script>

LOG

127.0.0.1:55509 - - [01/Feb/2021:21:05:00] "WSCONNECT /ws/stock/" - -
2021-02-01 21:05:01,958 ERROR    Exception inside application: has_permission() got an unexpected keyword argument 'scope'
Traceback (most recent call last):
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\routing.py", line 71, in __call__
    return await application(scope, receive, send)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\sessions.py", line 47, in __call__
    return await self.inner(dict(scope, cookies=cookies), receive, send)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\sessions.py", line 254, in __call__
    return await self.inner(wrapper.scope, receive, wrapper.send)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\auth.py", line 181, in __call__
    return await super().__call__(scope, receive, send)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\middleware.py", line 26, in __call__
    return await self.inner(scope, receive, send)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\routing.py", line 150, in __call__
    return await application(
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\consumer.py", line 73, in dispatch
    await handler(message)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\generic\websocket.py", line 196, in websocket_receive
    await self.receive(text_data=message["text"])
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\generic\websocket.py", line 259, in receive
    await self.receive_json(await self.decode_json(text_data), **kwargs)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 198, in receive_json
    await self.handle_action(action, request_id=request_id, **content)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 217, in handle_action
    await self.handle_exception(exc, action=action, request_id=request_id)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 117, in handle_exception
    raise exc
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 205, in handle_action
    await self.check_permissions(action, **kwargs)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\djangochannelsrestframework\consumers.py", line 93, in check_permissions
    if not await ensure_async(permission.has_permission)(
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\asgiref\sync.py", line 296, in __call__
    ret = await asyncio.wait_for(future, timeout=None)
  File "c:\programdata\miniconda3\envs\django_test\lib\asyncio\tasks.py", line 455, in wait_for
    return await fut
  File "c:\programdata\miniconda3\envs\django_test\lib\concurrent\futures\thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\channels\db.py", line 13, in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
  File "c:\programdata\miniconda3\envs\django_test\lib\site-packages\asgiref\sync.py", line 334, in thread_handler
    return func(*args, **kwargs)
TypeError: has_permission() got an unexpected keyword argument 'scope'
127.0.0.1:55509 - - [01/Feb/2021:21:05:01] "WSDISCONNECT /ws/stock/" - -
  • OS: Windows 10
Packages related:
django-rest-auth            0.9.5
djangochannelsrestframework 0.2.1
djangorestframework         3.12.2
djangorestframework-gis     0.15
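
A note on the traceback above: the final frames show has_permission() being called with a scope keyword argument that the permission class does not accept. The following is a minimal, library-free sketch of that mismatch. The class names are hypothetical, and the consumer and action keyword arguments are assumptions; only scope is confirmed by the log. If this reading is right, a permission class written for the consumer calling convention (the library appears to ship its own permissions module) would be needed instead of one written for (request, view).

```python
class RequestViewPermission:
    """Hypothetical permission written for the (request, view) convention."""

    def has_permission(self, request, view):
        return True


class ScopePermission:
    """Hypothetical permission that accepts the keyword arguments a consumer passes."""

    def has_permission(self, scope, consumer, action, **kwargs):
        # Treat the connection as authenticated if the scope carries a user.
        return scope.get("user") is not None


def check(permission, scope, action):
    # Mirrors the call style implied by the traceback: keyword arguments, starting with scope.
    return permission.has_permission(scope=scope, consumer=None, action=action)
```

Calling check() with RequestViewPermission raises the same kind of TypeError as in the log, while ScopePermission is evaluated normally.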

Investigate using PostgreSQL NOTIFY and commit hooks

Is your feature request related to a problem? Please describe.
Currently this lib uses Django signals to detect changes. However, Django does not send these signals for bulk updates or raw SQL operations, and if other clients connect to the database and make changes, no events are received either.

Describe the solution you'd like
PostgreSQL supports the concept of NOTIFY and LISTEN. Since commit hooks can send NOTIFY messages, it would be possible to have PG automatically inform all LISTENers of db changes regardless of where the changes came from.

I would like to investigate a little more how NOTIFY and LISTEN work in PG and how easy/hard it would be to add something to Django's DB manager that automatically adds migrations for ObservableModel classes to trigger the NOTIFY on commit.

As a first pass it could be interesting to implement a PG-backed channel layer that uses the LISTEN and NOTIFY API. This could be an interesting option for people who do not want to deploy an additional Redis.
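
To make the idea concrete, here is a rough sketch of what a generated migration could emit: a helper that builds the SQL for a row-level trigger calling pg_notify. The function name notify_trigger_sql, the channel name, the JSON payload shape, and the assumption that the primary key column is called id are all illustrative, not part of this library.

```python
import re

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def notify_trigger_sql(table: str, channel: str) -> str:
    """Build SQL that sends a NOTIFY for every row change on `table`.

    Names are interpolated into the SQL, so they are restricted to plain identifiers.
    """
    for name in (table, channel):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    function_name = f"{table}_notify_change"
    return f"""
CREATE OR REPLACE FUNCTION {function_name}() RETURNS trigger AS $$
DECLARE
    changed_row RECORD;
BEGIN
    IF TG_OP = 'DELETE' THEN
        changed_row := OLD;
    ELSE
        changed_row := NEW;
    END IF;
    -- Assumes the primary key column is named "id".
    PERFORM pg_notify(
        '{channel}',
        json_build_object('table', TG_TABLE_NAME, 'op', TG_OP, 'pk', changed_row.id)::text
    );
    RETURN changed_row;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER {function_name}_trigger
AFTER INSERT OR UPDATE OR DELETE ON {table}
FOR EACH ROW EXECUTE PROCEDURE {function_name}();
""".strip()
```

Because PostgreSQL delivers notifications sent inside a transaction only when that transaction commits, listeners would see changes from bulk updates, raw SQL, and other clients alike.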

[BUG] "Incoming message has no 'type' attribute"

Multiple connections not working on Observer
We are working on a Django project and are trying to implement Channels connections. When we observe a model instance change on multiple connections, the change only shows up on one of them; the others report "DISCONNECTED" and ValueError("Incoming message has no 'type' attribute").
This is our Consumer:

class BoardObserverConsumer(ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
    queryset = Board.objects.all()
    serializer_class = BoardChannelSerializer

and this is the Serializer:

class BoardChannelSerializer(HyperChannelsApiModelSerializer):
    class Meta:
        model = Board
        fields = ('@id', 'name', 'players_count')

The version we are currently using is 0.0.3 as it is required by HyperMediaChannels.

To Reproduce
We connected various browsers in incognito mode to the same localhost.

Expected behavior
We expect to be able to see the changes from all connections indistinguishably.

LOG

Exception inside application: Incoming message has no 'type' attribute
Traceback (most recent call last):
  File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/sessions.py", line 183, in __call__
    return await self.inner(receive, self.send)
  File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/middleware.py", line 41, in coroutine_call
    await inner_instance(receive, send)
  File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channelsmultiplexer/demultiplexer.py", line 54, in __call__
    await future
  File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/consumer.py", line 59, in __call__
    [receive, self.channel_receive], self.dispatch
  File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/consumer.py", line 71, in dispatch
    handler = getattr(self, get_handler_name(message), None)
  File "/Users/myusername/Documents/Development/AmbVirt/lib/python3.7/site-packages/channels/consumer.py", line 19, in get_handler_name
    raise ValueError("Incoming message has no 'type' attribute")
ValueError: Incoming message has no 'type' attribute
WebSocket DISCONNECT /ws/channel/ [127.0.0.1:59317]
WebSocket HANDSHAKING /ws/channel/ [127.0.0.1:59354]
WebSocket CONNECT /ws/channel/ [127.0.0.1:59354]
  • OS: macOS 10.15.4 but same issue reproduced on Windows 10 Home

Add sphinx docs

There really should be some docs covering at least

  • Wrapping Django Views (your normal views over WebSocket)
  • Creating consumers using the generic view GenericAsyncAPIConsumer
  • Using all the mixins for GenericAsyncAPIConsumer
  • Custom actions on any AsyncAPIConsumer
  • Permissions and how they play with Channels
  • Observers: using them to listen to events in Django (e.g. user-login event)
  • Subscribing to changes of a model instance ObserverModelInstanceMixin

Model subscription does not handle changes when a new model object is created from views or the shell.

Describe the bug
The model subscription does not pick up changes when a new model object is created from views or the shell; it only works from the admin panel.

Consumer code :

class LiveNotificationConsumer(ListModelMixin, GenericAsyncAPIConsumer):
    serializer_class = NotificationSerializer
    permission_classes = (permissions.IsAuthenticated,)

    async def accept(self, **kwargs):
        await super().accept(** kwargs)
        await self.model_change.subscribe()

    @model_observer(Notification)
    async def model_change(self, message, action=None, **kwargs):
        if message['user'] == self.scope['user'].id:
            await self.send_json(message)

    @model_change.serializer
    def model_serialize(self, instance, action, **kwargs):
        return NotificationSerializer(instance).data

    def get_queryset(self, *args, **kwargs):
        user = self.scope['user']
        return user.notifications.all()

Client code :

var endpoint = "ws://127.0.0.1:8000/ws/notifications/"
        var ws = new WebSocket(endpoint + "?token=" + token)
        ws.onmessage = function(e){
            console.log('comming data -->', e.data)
        }

        var msg = JSON.stringify({
            action: "list",
            request_id: 6,
        })
        ws.onopen = () =>  ws.send(msg);

        ws.onclose = function (e) {
            console.log(e)
            console.error('Error websocket');
        };
  • OS: [Ubuntu]
  • Version [12.04]

Model Object extraction not showing updated value

Describe the bug
I'm not sure if this is an issue with channels, djangochannelsrestframework, or drf.

I'm currently using the @model_observer to watch for model changes via DRF.

This works as intended on all create/update/delete operations. I'm notified of any changes and immediately perform an object extraction.

I figured I'd make it simple and just extract all objects and send it back to the client(s). The clients immediately update the state once the new object is received.

The problem is that the object extracted in the consumer doesn't include the latest update. So I'm sending the unmodified object back to the client(s).

Either I'm incorrectly extracting the model object or there's an async problem, where my object extraction happens too soon.

UPDATE:
Tried extracting the model object with a SyncConsumer, and the result is still the same. It's definitely not an async issue. Perhaps it's because of the DRF serializer? Does that somehow interrupt the ORM processing?

To Reproduce
Steps to reproduce the behavior:

  1. Add model observer to Model
  2. Subscribe to model change events
  3. Extract Model and Print results
  4. Make model changes via DRF, which should trigger another model extraction
  5. Re-print results

Expected behavior
The new model results should reflect the recent model changes. Instead, they are the same as the old model results.

I would simply like to be notified of all model changes (Which are made via DRF), so I can instantly send the client(s) back the updated model objects.

LOG

django==2.2.8
djangorestframework==3.10.3
channels==2.4.0
psycopg2==2.8.3 --no-binary psycopg2

  • OS: [e.g. Ubuntu]
  • Version [e.g. 18.04]

Additional context
CODE:

class UserFormConsumer(AsyncAPIConsumer):

    @database_sync_to_async
    def get_userform_object(self, curr_id):
        print('REACHED UserFormConsumer GET USERFORM OBJECT')
        user_form_serializer = serializers.UserFormSerializer(models.UserForm.objects.all(), many=True)
        return user_form_serializer.data

    async def accept(self, subprotocol=None):
        # subscribe to all new changed events
        obj_id = ""
        userform_data = await self.get_userform_object(obj_id)
        print('OLD USERFORM DATA', userform_data)
        await UserFormConsumer.activities_change.subscribe(self)
        await super().accept()

    @model_observer(models.UserForm)  # you Django Model cls goes here.
    async def activities_change(self, message, observer=None, **kwargs):
        # check the message for the action type if you only want to send events for new ones.
        print(observer)
        print('TestingActivitiesChange', message)
        await self.send_json(message)

    async def receive_json(self, content):

        if 'user_id' in content:
            user_id = content['user_id']

            # Get User Group
            current_group_name = 'user_%s' % user_id

            print('USER ID', user_id)
            if content['server_command'] == 'get_all':
                obj_id = ""
                message = content['message']
                userform_data = await self.get_userform_object(obj_id)
                print('Get ALL Message', message)
                print('User Form Data', userform_data)

Django concurrency: can't create data for testing in async function?

Hello, I have this question about Django Channels testing. Do you have an answer for it, please?

  • Goal: Test async functions with the test database
  • Problem: I have async functions for Django Channels, and when I create data in setUp the data are not reflected in the project.
  • My tests (note: these tests are inside an APITestCase class):

class WebsocketTests(APITestCase):
    def setUp(self):
        user1 = User.objects.create(username='Alex',email='[email protected]')
        user2 = User.objects.create(username='Sam',email='[email protected]')
        user3 = User.objects.create(username='Clover',email='[email protected]')

    async def test_connect(self):
        communicator = WebsocketCommunicator(application, 'alerts/')
        connected, subprotocol = await communicator.connect()
        assert connected
        await communicator.disconnect()

@database_sync_to_async
def get_user(querys):
    print(User.objects.count()) #<======= this returns 0 while it should return 3

    try:
        token = parse_qs(querys.decode("utf8"))['token'][0]
        token_data = UntypedToken(token)
        user_id = token_data["user_id"]
    except:
        return 'token is invalid'

    try:
        return User.objects.get(id=user_id)
    except User.DoesNotExist:
        return 'AnonymousUser'
class QueryAuthMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        scope['user'] = await get_user(scope["query_string"])
        return await self.app(scope, receive, send)


application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": QueryAuthMiddleware(
        URLRouter(
            websocket_urlpatterns
        )
    ),
})

#settings.py
DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
            'OPTIONS': {
                'timeout': 20,  # in seconds
                # see also
                # https://docs.python.org/3.7/library/sqlite3.html#sqlite3.connect
            },
            'TEST': {
                'NAME': os.path.join(BASE_DIR, "db_test.sqlite3"),
            },

        }
    }

How to subscribe to model instance creation?

Sorry for writing here, but I can't find an answer in the docs. The client part of my app expects the backend to create a new model instance. If I use subscribe_instance, I get a 404 response right after subscribing, because the instance does not exist yet. The client has to watch for an instance with an exact pk... I'm new to Channels, so maybe this task is relatively simple.

Change format of request to "subscribe to model change"

Is your feature request related to a problem? Please describe.
I feel a bit frustrated when I need to implement a model observer subscription. I want the client to send this:

{
  "action": "sub",
  "request_id": 42,
  "model": "User"
}

instead of

async def websocket_connect(self, message):

    # Super Save
    await super().websocket_connect(message)

    # Initialized operation
    await self.activities_change.subscribe()

Describe the solution you'd like
Um, same as above?

Describe alternatives you've considered
I have seen https://github.com/theY4Kman/dcrf-client but I want a simpler client for the web browser.

Additional context
Please guide me and I will implement this and open a PR. Currently I have a dirty hack that manually adds the action to available_actions, but I don't know how to call subscribe() for model updates.

[BUG] Subscribing to model

Describe the bug
I'm trying to subscribe for model changes with no luck. I'm following exactly the guide on https://djangochannelsrestframework.readthedocs.io/en/latest/observer/base_observer.html.

To Reproduce
Steps to reproduce the behavior:

  1. Follow guide on https://djangochannelsrestframework.readthedocs.io/en/latest/observer/base_observer.html
  2. Create comments as user 1 and user 2.
  3. Nothing is returned from the server.

Expected behavior
  1. Changes made to the model are returned from the server.
  • OS: Debian
  • Version 10.9 (WSL)

Additional context

I have following consumer:

# consumers.py

from djangochannelsrestframework.generics import GenericAsyncAPIConsumer
from djangochannelsrestframework.observer import model_observer
from djangochannelsrestframework.decorators import action

from .serializers import UserSerializer, CommentSerializer
from .models import User, Comment

class MyConsumer(GenericAsyncAPIConsumer):
    queryset = User.objects.all()
    serializer_class = UserSerializer

    @model_observer(Comment)
    async def comment_activity(self, message: CommentSerializer, observer=None, **kwargs):
        print("Model observer 1")
        await self.send_json(message.data)
        print("Model observer 1")

    @comment_activity.serializer
    def comment_activity(self, instance: Comment, action, **kwargs) -> CommentSerializer:
        print("Serializer 1")
        '''This will return the comment serializer'''
        return CommentSerializer(instance)
        print("Serializer 2")

    @action()
    async def subscribe_to_comment_activity(self, **kwargs):
        print("Subscribe 1")
        await self.comment_activity.subscribe()
        print("Subscribe 2")

And I have a template index.html:

<script>
    const ws = new WebSocket("ws://localhost:8000/ws/my-consumer/")

    ws.onopen = function(){
        ws.send(JSON.stringify({
            action: "subscribe_to_comment_activity",
            request_id: new Date().getTime(),
        }))
    }
    ws.onmessage = function(e){
        console.log(e)
    }
</script>

When I refresh the browser, I can see that the browser sends the subscribe_to_comment_activity, but nothing is returned back, even when I create the comments from shell.

The server log prints following:

System check identified no issues (0 silenced).
June 06, 2021 - 10:41:31
Django version 3.2.3, using settings 'websocket.settings'
Starting ASGI/Channels version 3.0.3 development server at http://127.0.0.1:8000/
Quit the server with CONTROL-C.
HTTP GET / 200 [0.01, 127.0.0.1:64881]
WebSocket HANDSHAKING /ws/my-consumer/ [127.0.0.1:64882]
WebSocket CONNECT /ws/my-consumer/ [127.0.0.1:64882]
Subscribe 1
Subscribe 2

The above would indicate that it probably did not reach the model observer or the serializer.

I'm wondering if the imported GenericAsyncAPIConsumer is correct, as it indeed is different from the documentation, as it could not be found from djangochannelsrestframework.consumers. Any ideas?

[Question] Many-to-Many Relationship

First of all thanks for this great library. It made a lot of stuff I have to handle much easier. But now I'm confronted with a hard-to-solve problem. I have a many-to-many relationship and want to subscribe to all the children. Just like the documentation (https://github.com/hishnash/djangochannelsrestframework#subscribing-to-a-filtered-list-of-models) shows. But I can't think of a way to implement the groups_for_signal without database calls.

Let's say my models look like this:

from django.db import models

class Publication(models.Model):
    title = models.CharField(max_length=30)

    class Meta:
        ordering = ['title']

    def __str__(self):
        return self.title

class Article(models.Model):
    headline = models.CharField(max_length=100)
    publications = models.ManyToManyField(Publication)

    class Meta:
        ordering = ['headline']

    def __str__(self):
        return self.headline

(https://docs.djangoproject.com/en/3.2/topics/db/examples/many_to_many/)

To get the groups_for_signal I would need to do something like this:

for publication in instance.publications.all():
    yield f"-publication__{publication.pk}"

But as stated in the documentation this doesn't seem to be a good idea. Is there a better way that I forgot?

Is there a way to get an action's arguments within consumer methods?

Here is an example from the filtering model observer:

class MyConsumer(GenericAsyncAPIConsumer):
    ...

    @action()
    async def subscribe_to_comment_activity(self, user_pk, **kwargs):
        # We will check if the user is authenticated for subscribing.
        user = await database_sync_to_async(User.objects.get)(pk=user_pk)
        await self.comment_activity.subscribe(user=user)

I want to do some filtering, but that is only possible for my case if I have the instance at hand to filter against.
That means as I see it, I need the user=user value in the observer serializer method.

    @comment_activity.serializer
    def comment_activity(self, instance: Comment, action, **kwargs) -> CommentSerializer:
        '''This will return the comment serializer'''
        return CommentSerializer(instance)

And here comes my question then, how can I get the user value in the model serializer method?

Using another unique id to identify records

tl;dr: is it possible to subscribe to objects using a field other than the primary key?

I'm trying to connect an existing database model to the asynchronous websockets.
I tried the pkField setting in connect, like this:

client = dcrf.connect(`${ws}:${window.location.host}${path}`, {
    pkField: "uuid",
});

But when I try to subscribe to an object I can still only refer to it by the primary key in the table, not uuid.

I would like to use the UUID instead of the primary key in my subscriptions, as UUIDs are not sequential and are impossible to guess.
Also the rest of the application works with UUID as the unique identifier.

I tried the 'pkField' setting in the configuration block of the connection, but this didn't seem to make any difference.

I'm subscribing with the following code, this fails:

const prom = client.subscribe('user_node', '932895c1-573e-4552-8703-8a9ca9e958ef', (msg, action) => {
    console.log("msg: ", msg, " action: ", action);
});

What does work is the pk, even though pkField is set to 'uuid':

const prom = client.subscribe('user_node', '1', (msg, action) => {
    console.log("msg: ", msg, " action: ", action);
});

Error While Using Custom Actions.

{
  "errors": ["Method \"method name\" not allowed."],
  "data": null,
  "action": "method name",
  "response_status": 405,
  "request_id": 5
}
I get this response every time I use custom actions.

Better handling of malformed WS message bodies.

Is your feature request related to a problem? Please describe.
When sending messages, if the expected data for a given action is not present, the consumer currently closes the connection rather than responding with an error message.

Describe the solution you'd like
Respond with an error JSON message when:

  • malformed JSON is sent over the websocket connection
  • the action keyword is missing
  • the action method expects a kwarg that was not provided as part of the action JSON
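
The three cases above could be checked before dispatch roughly as follows. This is a standalone sketch, not the library's code: parse_action_message, missing_arguments, and error_response are hypothetical helpers, and the error payload simply reuses the errors / data / action / response_status / request_id shape seen in responses elsewhere on this page.

```python
import inspect
import json


def error_response(message, status=400, action=None, request_id=None):
    """Build an error payload instead of letting the consumer crash."""
    return {
        "errors": [message],
        "data": None,
        "action": action,
        "response_status": status,
        "request_id": request_id,
    }


def parse_action_message(raw):
    """Return (content, error); exactly one of the two is None."""
    try:
        content = json.loads(raw)
    except json.JSONDecodeError:
        return None, error_response("Malformed JSON.")
    if not isinstance(content, dict):
        return None, error_response("Message must be a JSON object.")
    if "action" not in content:
        return None, error_response(
            "Missing 'action' key.", request_id=content.get("request_id")
        )
    return content, None


def missing_arguments(handler, content):
    """List required parameters of `handler` that the message does not supply."""
    required_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return [
        param.name
        for param in inspect.signature(handler).parameters.values()
        if param.kind in required_kinds
        and param.default is inspect.Parameter.empty
        and param.name not in content
    ]
```

A consumer could call parse_action_message first, then missing_arguments on the resolved (bound) action method, and send the error payload back over the socket rather than closing it.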

max length limit on channels group name

Hi,
I just found out that the obscure channels error message:

"Group name must be a valid unicode string containing only ASCII alphanumerics, hyphens, or periods."

does not mention the fact that a group name cannot be longer than 100 characters.

I use UUIDs as pk and the generated group name (for the model observer) exceeds that limit:

1dd94a45-6020-4cc3-8e8a-3b5405a62eb2-handle.instance.change-model-establishment.establishment-pk-b63a92b5-4119-4908-8fcc-28e045a2cbbc

Any idea on how to solve that?
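
One possible workaround, sketched here under assumptions rather than taken from the library: keep a readable prefix and replace the overflow with a hash so the name stays under the limit. shorten_group_name is a hypothetical helper, and the limit of 100 is the figure reported above (treated as exclusive to be safe).

```python
import hashlib

MAX_GROUP_NAME_LENGTH = 100  # limit reported in this issue; treated as exclusive


def shorten_group_name(name: str, max_length: int = MAX_GROUP_NAME_LENGTH) -> str:
    """Return `name` unchanged if it fits, else a deterministic shortened form."""
    if len(name) < max_length:
        return name
    # Hex digits and hyphens stay within the allowed character set.
    digest = hashlib.sha256(name.encode("utf-8")).hexdigest()[:32]
    prefix_length = max_length - 1 - len(digest) - 1
    return f"{name[:prefix_length]}-{digest}"
```

The same input always maps to the same output, so every process that computes the group name for a given instance would still agree on it.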

[BUG] Action response to the wrong client

Describe the bug
When subscribing to an instance using ObserverModelInstanceMixin in two browsers, the browser that subscribed last will receive all future responses meant for the first browser (after the subscribe_instance acknowledgement). In other words, an update action appears in the websocket messages in the first browser with a certain request_id but the response with the same request_id ends up in the second browser.

To Reproduce
Steps to reproduce the behavior:

  1. Setup a consumer:
    class MyModelConsumer(UpdateModelMixin, ObserverModelInstanceMixin, GenericAsyncAPIConsumer):
        queryset = MyModel.objects.all()
        serializer_class = MyModelSerializer   
  2. On page load, using dcrf-client in first browser, retrieve model using pk. (my pk is coming from the url of the page).
  3. Observe request and response in net tab for the socket.
  4. After retrieve, subscribe-instance to changes on same pk.
  5. Observe request and response in net tab for the socket.
  6. Repeat browser steps using a second browser.
  7. Using dcrf-client in first browser, update the model using pk.
  8. Observe update request in net tab for the socket in the first browser. Observe multiple update responses in net tab for the socket in the second browser, one with request id for the update request and one for each subscription request id. There will be more subscription responses on subsequent tests if you don't unsubscribe before page unload like my simple case. (The left browser was loaded first in the next image.)
    image

Expected behavior
Requests from one browser should have responses directed to the same browser.

Debugging steps
I am able to observe that channel layers are working using this code from the channel docs.

python3 manage.py shell
>>> import channels.layers
>>> channel_layer = channels.layers.get_channel_layer()
>>> from asgiref.sync import async_to_sync
>>> async_to_sync(channel_layer.send)('test_channel', {'type': 'hello'})
>>> async_to_sync(channel_layer.receive)('test_channel')
{'type': 'hello'}

The issue is reproducible for me when installing djangochannelsrestframework from the branch in #46.

[Feature] Support Pagination

This should wrap the list response so that the items are embedded within a pagination serializer.

offset and limit are logical args that can be passed as part of the request. Since not all DB backends support cursors, cursor-based pagination is out of scope for now.
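
As an illustration of the wrapped response, here is a small standalone sketch. The paginate helper and the key names (count, next_offset, results) are assumptions for discussion, not an agreed format.

```python
from typing import Any, Dict, Optional, Sequence


def paginate(items: Sequence[Any], offset: int = 0, limit: int = 10) -> Dict[str, Any]:
    """Wrap a slice of `items` in a pagination envelope."""
    if offset < 0 or limit < 1:
        raise ValueError("offset must be >= 0 and limit must be >= 1")
    end = offset + limit
    # None signals that there is no further page to request.
    next_offset: Optional[int] = end if end < len(items) else None
    return {
        "count": len(items),
        "offset": offset,
        "limit": limit,
        "next_offset": next_offset,
        "results": list(items[offset:end]),
    }
```

With a Django queryset the same slice syntax would translate to LIMIT/OFFSET in SQL, though the count would need a separate query.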

Question on urls and routes

Hi. I just followed the path from channels_api to here, and thanks for your efforts! I am using Django 2, and wanted to add websocket REST API endpoints to my backend.

But I am a bit lost by all the possibilities. I have a backend with multiple django apps inside. Say, one of these is called activities and its urls are accessible under http://api.example.com/activities/.

I would like, for instance, to subscribe to the creation of new activities. I wondered whether I can use my DRF API views over websocket, as the end of your README seems to suggest (with view_as_consumer)?

Or should I use some other combination, for instance with HyperMediaChannels?

Thanks a lot for your help.

[Feature] Api to enable the processing of multiple actions concurrently on a consumer.

It would be nice to have a simple way to fire off a sub-async run loop as and when a request arrives (if this request is going to take some IO time) so that other messages on the same WebSocket connection can be processed.

I'm thinking that this could be done in one of two ways:

@action(blocking=False), an argument on the action decorator that starts a nested run loop for this message,

or @nonblocking, a separate decorator that requires a dedicated mixin to be added to the Consumer.
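
The core mechanism can be sketched with plain asyncio, independent of any consumer class: non-blocking handlers are scheduled as tasks so the dispatcher can move on to the next message. The dispatch function, the (handler, request_id, blocking) tuple format, and the two demo handlers are hypothetical.

```python
import asyncio


async def slow_action(results, request_id):
    await asyncio.sleep(0.05)  # stands in for IO-bound work
    results.append(("slow", request_id))


async def fast_action(results, request_id):
    results.append(("fast", request_id))


async def dispatch(messages):
    """Process messages in order, running non-blocking handlers as background tasks."""
    results = []
    background = set()
    for handler, request_id, blocking in messages:
        if blocking:
            await handler(results, request_id)
        else:
            task = asyncio.create_task(handler(results, request_id))
            # Keep a reference so the task is not garbage collected early.
            background.add(task)
            task.add_done_callback(background.discard)
    if background:
        await asyncio.gather(*background)
    return results


def run_demo():
    return asyncio.run(dispatch([(slow_action, 1, False), (fast_action, 2, True)]))
```

In the demo the slow request arrives first but the fast one completes first, which is the behaviour the feature asks for. A real implementation would also need to cancel outstanding tasks when the socket disconnects.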

[BUG] Support for Python 3.6

Describe the bug
Line 1 of generics.py contains this import:
from typing import Any, Dict, Type, Optional, List, OrderedDict, Union

typing.OrderedDict is only supported from Python 3.7 onward.

To Reproduce
Steps to reproduce the behavior:

  1. Install 3.6
  2. Write an API using GenericAsyncAPIConsumer

Expected behavior
No error should occur.
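
A version-guarded import is one way such an annotation could be kept while still importing on older interpreters. This is a sketch of the general pattern, not the fix that was applied; OrderedDictType and build_payload are illustrative names.

```python
import collections
import sys
from typing import Dict

if sys.version_info >= (3, 7, 2):
    # typing.OrderedDict was added in Python 3.7.2.
    from typing import OrderedDict as OrderedDictType
else:
    # Fallback for older interpreters: annotate as a plain Dict.
    OrderedDictType = Dict


def build_payload() -> "OrderedDictType[str, int]":
    # The runtime object is always collections.OrderedDict, whatever the annotation.
    return collections.OrderedDict([("a", 1), ("b", 2)])
```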

[Improve Docs] Should include documentation for setting custom Observer serializer

Describe the bug
if the Observer._serializer is not empty, the .serialize function will try to use it to generate the message_body. Using a standard ModelSerializer from DRF will cause a TypeError because of the arguments.

Removing self from the call will then create another TypeError unexpected keyword because the Field init does not support kwargs.

This part of the code is not covered in the test.

To Reproduce
Steps to reproduce the behavior:

  1. Create a custom Observer extending ModelObserver (or similar).
  2. Override the init method, while first calling the super() then assigning a Serializer using self.serializer(serializer)
  3. Subscribe to the consumer using the custom Observer

Expected behavior
The serializer should be able to serialize the instance.

LOG
No Log

Additional context
Simply using self._serializer(instance).data will return the serialized instance under a ReturnDict format that can be converted into JSON.

Since this option is not covered in the tests, I would guess that it is a very specific edge case or a placeholder for something else, but the fix seems to be simple.

KeyError: 'request_id'

I am getting KeyError: 'request_id'.
I don't know whether this is an issue or whether I am doing something wrong. If I am doing something wrong, please point me to a correction.

Consumer:

class MessageConsumer(ListModelMixin, PatchModelMixin, UpdateModelMixin, DeleteModelMixin, GenericAsyncAPIConsumer):
    '''A Consumer for Notifications'''

    queryset = models.Message.objects.all()
    serializer_class = serializers.MessageSerializer

    def get_queryset(self):
        return models.Message.objects.filter(record__in=models.Record.objects.filter(user=self.request.user)).order_by('-message_date_created')

Routing:

websocket_urlpatterns = [
    path('message', consumers.MessageConsumer),
]

Serializer:

class MessageSerializer(serializers.ModelSerializer):
    '''Serializer for Notification'''
    message_date_created = serializers.DateTimeField(required = False, format="%Y-%m-%d", read_only=True)

    class Meta:
        model = models.Message
        fields = [
            'record',
            'message_title', 
            'message_date_created',
            'message_status',
        ]

My request is, e.g.:

{"stream":"message","payload":{"action":"list","data":{},"request_id":"56def654-9087-45f8-9f12-7a7f6c78b04d"}}

I am receiving the following error:

WebSocket HANDSHAKING /message [127.0.0.1:56054]
WebSocket CONNECT /message [127.0.0.1:56054]
Exception inside application: 'request_id'
Traceback (most recent call last):
  File "/.../python3.8/site-packages/channels/consumer.py", line 58, in __call__
    await await_many_dispatch(
  File "/.../python3.8/site-packages/channels/utils.py", line 51, in await_many_dispatch
    await dispatch(result)
  File "/.../python3.8/site-packages/channels/consumer.py", line 73, in dispatch
    await handler(message)
  File "/.../python3.8/site-packages/channels/generic/websocket.py", line 196, in websocket_receive
    await self.receive(text_data=message["text"])
  File "/.../python3.8/site-packages/channels/generic/websocket.py", line 259, in receive
    await self.receive_json(await self.decode_json(text_data), **kwargs)
  File "/.../python3.8/site-packages/djangochannelsrestframework/consumers.py", line 158, in receive_json
    request_id = content.pop("request_id")
KeyError: 'request_id'
WebSocket DISCONNECT /message [127.0.0.1:56054]
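
One observation on the log above: the traceback shows receive_json calling content.pop("request_id") on the top-level message, while the request nests request_id inside payload. A possible reading, not confirmed in this thread, is that the stream/payload envelope is meant for a demultiplexer and the consumer itself expects the inner object. The helper below is a hypothetical sketch of unwrapping such an envelope.

```python
import json


def unwrap_stream_message(raw: str) -> dict:
    """Return the inner payload of a {"stream": ..., "payload": ...} envelope.

    Messages without that envelope are returned unchanged.
    """
    message = json.loads(raw)
    if isinstance(message, dict) and "stream" in message and "payload" in message:
        return dict(message["payload"])
    return message
```

Sending only the inner object from the client (action, data, request_id at the top level) would have the same effect without any server-side change.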

Question about subscribing

Hi. I have a few questions about subscribers. As I understand it, pagination and filtering are not handled yet.

  • Can I subscribe to all model changes that match permissions?
  • How is unsubscribe handled?

Thank you for your input

[BUG] An open stream object is being garbage collected; call "stream.close()"

Describe the bug
I am getting this bug when using the

from marketplace.consumers import ListConsumer
TestChannel.objects.create(text="user21 creates a new comment", user=user_1)
TestChannel object (7)
An open stream object is being garbage collected; call "stream.close()" explicitly.
Traceback (most recent call last):
File "", line 1, in
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/manager.py", line 85, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/query.py", line 453, in create
obj.save(force_insert=True, using=self.db)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/base.py", line 726, in save
self.save_base(using=using, force_insert=force_insert,
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/models/base.py", line 774, in save_base
post_save.send(
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/dispatch/dispatcher.py", line 180, in send
return [
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/dispatch/dispatcher.py", line 181, in
(receiver, receiver(signal=self, sender=sender, **named))
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 108, in post_save_receiver
self.database_event(instance, Action.CREATE)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 127, in database_event
connection.on_commit(partial(self.post_change_receiver, instance, action))
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/django/db/backends/base/base.py", line 645, in on_commit
func()
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 155, in post_change_receiver
self.send_messages(
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/djangochannelsrestframework/observer/model_observer.py", line 173, in send_messages
async_to_sync(channel_layer.group_send)(group_name, message_to_send)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/asgiref/sync.py", line 222, in __call__
return call_result.result()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
raise self._exception
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/asgiref/sync.py", line 287, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/channels_redis/core.py", line 684, in group_send
) = self._map_channel_keys_to_connection(channel_names, message)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/channels_redis/core.py", line 819, in _map_channel_keys_to_connection
channel_key_to_message[key] = self.serialize(value)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/channels_redis/core.py", line 839, in serialize
value = msgpack.packb(message, use_bin_type=True)
File "/Users/ericel123/Documents/dev/django/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'TestChannelSerializer' object

To Reproduce
Steps to reproduce the behavior:

class ListConsumer(ListModelMixin, GenericAsyncAPIConsumer, ): 
    print('got here')
    queryset = TestChannel.objects.none()
    serializer_class = TestChannelSerializer
    permission_classes = [permissions.AllowAny]
    async def accept(self, **kwargs):
        await super().accept(** kwargs)
        await self.model_change.subscribe()

    @model_observer(TestChannel)
    async def model_change(self, message, action=None, **kwargs):
        await self.send_json(message)

    @model_change.serializer
    def model_serialize(self, instance, action, **kwargs):
        return TestChannelSerializer(instance).data

    def get_queryset(self, *args, **kwargs):
        #user = self.scope['user']
        return TestChannel.objects.all()

Expected behavior
I expect the console to show an update when a new comment is created, but it logs nothing.

  • OS: [MacOS]
    Can this be a Python version problem? I am using 3.8.0
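
The traceback above ends in `msgpack.packb` rejecting a `TestChannelSerializer` object, which suggests that something other than plain data reached the channel layer. The sketch below is a standard-library debugging aid, not part of the library: `find_unpackable` and `FakeSerializer` are hypothetical names, and the set of "plain" types is an assumption about what the channel layer accepts.

```python
# Types assumed to be safe to hand to the channel layer.
PLAIN_SCALARS = (str, bytes, int, float, bool, type(None))


def find_unpackable(value, path="message"):
    """Return the paths of values that are not plain dict/list/scalar types."""
    if isinstance(value, PLAIN_SCALARS):
        return []
    if isinstance(value, dict):
        problems = []
        for key, item in value.items():
            problems += find_unpackable(item, f"{path}[{key!r}]")
        return problems
    if isinstance(value, (list, tuple)):
        problems = []
        for index, item in enumerate(value):
            problems += find_unpackable(item, f"{path}[{index}]")
        return problems
    # Anything else (for example a serializer instance) is reported by type name.
    return [f"{path}: {type(value).__name__}"]


class FakeSerializer:
    """Hypothetical stand-in for a serializer instance left in the message."""


good = {"type": "model.change", "body": {"id": 7, "text": "hi"}}
bad = {"type": "model.change", "body": FakeSerializer()}

print(find_unpackable(good))
print(find_unpackable(bad))
```

Running a check like this on whatever the serializer function returns would show whether the serializer object itself, and not its `.data`, is what ends up in the message.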

[BUG] Error while subscribing to a model

Hi, first of all, I am trying your lib for the very first time and it seems really handy. My issue is probably just a misconfiguration. I would be more than happy to help improve the documentation for this case.

Describe the bug
I am just giving it a try, so I copied and pasted some code from the documentation, but it is not working properly: I get a 500 (Internal Server Error) while subscribing to an action.

To Reproduce
Steps to reproduce the behavior:
views.py

class TestConsumer(ObserverModelInstanceMixin):
    queryset = get_user_model().objects.all()
    serializer_class = UserSerializer

urls.py

urlpatterns = [
    re_path(r'', include(router.urls)),
    re_path('ws', TestConsumer)
]

Request

{
    "action": "subscribe_instance",
    "pk": 42,  
    "request_id": 4 
}

LOG
Error

Traceback (most recent call last):
  File "/lib/python3.8/site-packages/django/core/handlers/exception.py", line 34, in inner
    response = get_response(request)
  File "lib/python3.8/site-packages/django/core/handlers/base.py", line 115, in _get_response
    response = self.process_exception_by_middleware(e, request)
  File "lib/python3.8/site-packages/django/core/handlers/base.py", line 113, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "lib/python3.8/site-packages/djangochannelsrestframework/observer/generics.py", line 64, in __init__
    super().__init__(*args, **kwargs)
TypeError: object.__init__() takes exactly one argument (the instance to initialize)
  • OS: Ubuntu
  • Version 20.04
  • Python 3.8
  • djangorestframework==3.11.0
  • django==3.0.4
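
The `TypeError` in the traceback above is what Python raises when a mixin forwards its constructor arguments with `super().__init__(*args, **kwargs)` and the only class left in the chain is `object`. The sketch below reproduces that mechanism with the standard library only; every class name in it is a hypothetical stand-in and none of them comes from the library. Whether adding a consumer base class is the right fix for the report is an assumption. The traceback also shows the class being called by Django's HTTP view handling (`django/core/handlers/base.py`), not through a websocket route.

```python
class ObserverMixinSketch:
    """Hypothetical cooperative mixin that forwards its arguments upward."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class BaseConsumerSketch:
    """Hypothetical base class that accepts the forwarded arguments."""

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs


class MixinOnly(ObserverMixinSketch):
    """Mirrors the report: the mixin is the only base class."""


class MixinWithBase(ObserverMixinSketch, BaseConsumerSketch):
    """The mixin combined with a base class that can take the arguments."""


def try_build(cls, *args):
    """Instantiate cls and report whether construction raised TypeError."""
    try:
        cls(*args)
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "ok"


# With only the mixin, the arguments reach object.__init__ and are rejected.
print(try_build(MixinOnly, "request"))
# With a base class behind the mixin, construction succeeds.
print(try_build(MixinWithBase, "request"))
```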

Current user in model observer

Is your feature request related to a problem? Please describe.
Current docs indicate the following case

''' If you want the data serialized instead of pk '''
@model_change.serializer
def model_serialize(self, instance, action, **kwargs):
    return TestSerializer(instance).data

I have a serializer that requires self.context['scope'].get('user'). It returns "You" if the serialized user is self.context['scope'].get('user'), and the User instance otherwise.

When passing the instance to TestSerializer in model_serialize, I get a KeyError on "scope".
I could do

return TestSerializer(instance, context={'request': request}).data

but I have no access to the request to pass to the serializer.

How do I access the request on a model observer?
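
One possible workaround, sketched here under stated assumptions and not confirmed by the maintainers: serialize the instance without any per-user context, then personalize the message in the observer handler, where a Channels consumer has `self.scope` available. The helper below uses only the standard library; `personalize` is a hypothetical name, and it assumes the serialized message carries the author's primary key under a `"user"` key.

```python
def personalize(message: dict, current_user_id) -> dict:
    """Return a copy of message with the author shown as "You" to that author.

    Assumes message["user"] holds the author's primary key.
    """
    result = dict(message)  # Copy so the shared message is not mutated.
    if result.get("user") == current_user_id:
        result["user"] = "You"
    return result


shared = {"id": 1, "text": "hello", "user": 5}

# The author (user 5) sees "You"; any other viewer sees the original value.
print(personalize(shared, 5))
print(personalize(shared, 9))
```

In a handler such as `model_change(self, message, ...)`, the second argument would presumably come from `self.scope["user"]`, so each connected consumer could adjust the message for its own user before calling `send_json`.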
