httpx-ws's Introduction

HTTPX WS

WebSockets support for HTTPX


Documentation: https://frankie567.github.io/httpx-ws/

Source Code: https://github.com/frankie567/httpx-ws


Installation

pip install httpx-ws

Features

  • Sync and async client
  • Helper methods to send text, binary and JSON data
  • Helper methods to receive text, binary and JSON data
  • Automatic ping/pong answers
  • HTTPX transport to test WebSockets defined in ASGI apps
  • Automatic keepalive ping
  • asyncio and Trio support through AnyIO

Contributors ✨

Thanks goes to these wonderful people (emoji key):

  • François Voron 🚧 💻
  • Kousik Mitra 💻
  • David Brochart 📦
  • ysmu 🐛
  • Sam Foreman 🐛
  • Marc-Antoine Parent 🐛 💻
  • Marcelo Trylesinski 🐛 🔬
  • MtkN1 🐛 🔬
  • Tom Christie 🐛 🔬
  • David Montague 🐛
  • Sean Wang 💻

This project follows the all-contributors specification. Contributions of any kind welcome!

Development

Setup environment

We use Hatch to manage the development environment and production build. Ensure it's installed on your system.

Run unit tests

You can run all the tests with:

hatch run test

Format the code

Execute the following command to apply linting and check typing:

hatch run lint

License

This project is licensed under the terms of the MIT license.

httpx-ws's Issues

Connection loss is never detected by `receive_json()` (likely all receive methods)

Describe the bug

Connection loss is never detected by receive_json().

Providing a timeout parameter to ws.receive_json() doesn't help either.

To Reproduce

  async with AsyncClient() as client:
      async with aconnect_ws(self._uri, client, keepalive_ping_timeout_seconds=None) as ws:
          while ws.connection.state is ConnectionState.OPEN:
              await ws.receive_json()

Expected behavior

When connection is lost, ws.receive*() raises an error and the async context manager calls __aexit__.
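
As a generic illustration of the expected behavior, an application can bound each receive with a deadline and treat a silent connection as lost. This is only a sketch: `receive_or_fail`, `ConnectionLost` and the queue standing in for a dead socket are hypothetical names, not part of httpx-ws, and the report above says the library's own timeout parameter did not help.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ConnectionLost(Exception):
    """Hypothetical error raised when no message arrives before the deadline."""


async def receive_or_fail(
    receive: Callable[[], Awaitable[Any]], deadline_seconds: float
) -> Any:
    # Bound the wait so a dead peer cannot block the loop forever.
    try:
        return await asyncio.wait_for(receive(), timeout=deadline_seconds)
    except asyncio.TimeoutError as exc:
        raise ConnectionLost(f"no message within {deadline_seconds}s") from exc


async def demo() -> str:
    # An empty queue stands in for a socket whose peer has gone away.
    silent: "asyncio.Queue[str]" = asyncio.Queue()
    try:
        await receive_or_fail(silent.get, deadline_seconds=0.05)
    except ConnectionLost:
        return "lost"
    return "received"
```

With a real session, `receive` would be a bound method such as `ws.receive_json`; whether that is sufficient depends on how the library reacts to cancellation.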

Can't send close reason correctly

Describe the bug

When closing a WebSocket connection, httpx-ws can correctly send the close code, but it is unable to send the close reason.

To Reproduce

Run the following code:

import anyio
import httpx_ws
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from uvicorn import Config
from uvicorn.server import Server

CODE = 1002
REASON = "Bye"

PORT = 8699

app = FastAPI()

@app.websocket("/")
async def websocket(ws: WebSocket):
    await ws.accept()
    await ws.send_text("Hello, world!")
    try:
        await ws.receive_text()
    except WebSocketDisconnect as e:
        print(e)

        # NOTE: here we find that the close reason is not what we expected
        print("is wrong reason?", e.reason != REASON)

server = Server(Config(app=app, port=PORT))

async def main():
    async with httpx_ws.aconnect_ws(f"ws://127.0.0.1:{PORT}/") as ws:
        print(await ws.receive_text())
        await ws.close(CODE, REASON)

async def run():
    async with anyio.create_task_group() as tg:
        tg.start_soon(server.serve)
        # wait for server to start
        await anyio.sleep(5)
        tg.start_soon(main)
        # wait for main to finish
        await anyio.sleep(5)

        server.should_exit = True

# the task group needs a running event loop, so start one explicitly
anyio.run(run)

You can see:

(<CloseReason.PROTOCOL_ERROR: 1002>, None)
is wrong reason? True

This was originally run as a Colab notebook demo.

Expected behavior

WebSocketDisconnect should be (1002, 'Bye')
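
For reference, RFC 6455 defines the body of a close frame as a 2-byte big-endian status code followed by an optional UTF-8 reason. The sketch below shows that layout with the standard library only; `encode_close_payload` and `decode_close_payload` are illustrative helpers, not httpx-ws functions.

```python
import struct
from typing import Optional, Tuple


def encode_close_payload(code: int, reason: str = "") -> bytes:
    # RFC 6455: 2-byte big-endian status code, then the UTF-8 reason.
    payload = struct.pack("!H", code) + reason.encode("utf-8")
    if len(payload) > 125:
        raise ValueError("control frame payload must not exceed 125 bytes")
    return payload


def decode_close_payload(payload: bytes) -> Tuple[int, Optional[str]]:
    # An empty payload carries no status code; 1005 is the "no status" placeholder.
    if not payload:
        return 1005, None
    (code,) = struct.unpack("!H", payload[:2])
    reason = payload[2:].decode("utf-8")
    return code, reason or None
```

A peer that receives the payload for `(1002, "Bye")` should therefore be able to recover both values, which is what the expected behavior above asks for.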

Configuration

  • fastapi==0.110.1
  • httpx-ws==0.6.0
  • anyio==4.3.0
  • uvicorn==0.29.0
  • hypercorn==0.16.0

receive_bytes returns weird bytes; max_message_size_bytes loses messages

Describe the bug

Received bytes are truncated

To Reproduce

  1. Install the required dependencies:
pip install dag-cbor aiohttp httpx-ws starlette
  2. Create a .py file for the client and copy-paste the code below:
import asyncio
import io

import aiohttp
import dag_cbor

from httpx_ws import aconnect_ws

URI = 'ws://127.0.0.1:8000/ws'  # for aiohttp
URL = 'http://127.0.0.1:8000/ws'  # for httpx


def _decode_bytes(msg: io.BytesIO) -> None:
    header = dag_cbor.decode(msg, allow_concat=True)
    if header.get('t') != '#commit':
        return
    dag_cbor.decode(msg)
    print('Alive')


async def _aiohttp_client() -> None:
    async with aiohttp.ClientSession() as client, client.ws_connect(URI) as ws:
        while True:
            msg = io.BytesIO(await ws.receive_bytes())
            _decode_bytes(msg)


async def _httpx_client() -> None:
    max_bytes = 1024 * 1024 * 5  # 5MB
    async with aconnect_ws(URL, max_message_size_bytes=max_bytes) as ws:
        while True:
            msg = io.BytesIO(await ws.receive_bytes())
            _decode_bytes(msg)


async def main() -> None:
    # UNCOMMENT ME
    # await _aiohttp_client()
    await _httpx_client()


if __name__ == '__main__':
    asyncio.run(main())
  3. Create a .py file for the server and copy-paste the code below:
from starlette.applications import Starlette
from starlette.routing import WebSocketRoute


PAYLOADS = []
for i in range(100):
    with open(f'payloads/{i}.txt', 'rb') as f:
        PAYLOADS.append(f.read())


async def ws_bug(websocket):
    await websocket.accept()
    for p in PAYLOADS:
        await websocket.send_bytes(p)
    await websocket.close()


app = Starlette(
    routes=[
        WebSocketRoute('/ws', ws_bug),
    ],
)
  4. Download payloads.zip and unzip it
  5. Run the server
  6. Run the client and observe the exceptions
  7. Uncomment the await _aiohttp_client() line in main() and run again; this time there are no exceptions

Expected behavior

The decoding should not raise any exceptions (Unexpected EOF, invalid length and other length-related errors). The log should contain only "Alive" messages.

Configuration

  • Python version: 3.8.16
  • httpx-ws version: 0.4.1

Additional context

With the aiohttp WebSocket client it decodes fine. I also tried increasing max_message_size_bytes.
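
In general, truncated payloads like these can appear when one fragment of a large message is handed to the caller as if it were the whole message. Whether that is the cause here is not established by the report; the sketch below only illustrates the buffering technique. `reassemble` and the `(data, is_final)` fragment shape are assumptions made for the example, not httpx-ws internals.

```python
from typing import Iterable, Iterator, Tuple


def reassemble(fragments: Iterable[Tuple[bytes, bool]]) -> Iterator[bytes]:
    """Yield complete messages from (data, is_final) fragments."""
    buffer = bytearray()
    for data, is_final in fragments:
        buffer.extend(data)
        if is_final:
            # Only hand the payload over once the last fragment has arrived.
            yield bytes(buffer)
            buffer.clear()
```

A decoder fed from `reassemble(...)` sees each payload whole, regardless of how the transport split it.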

Connection hangs forever if the WebSocket server just sends a message and closes

Describe the bug

The connection hangs forever if the WebSocket server just sends a message and then closes.

The client waits forever for a message, missing the only message that was sent by the server.

To Reproduce

Server code

import uvicorn
from starlette.applications import Starlette
from starlette.websockets import WebSocket
from starlette.routing import WebSocketRoute


async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await websocket.send_text("FOO")
    await websocket.close()


routes = [
    WebSocketRoute("/ws", endpoint=websocket_endpoint),
]

app = Starlette(debug=True, routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)

Client code

import httpx
from httpx_ws import connect_ws


def my_client():
    with httpx.Client() as client:
        with connect_ws("http://localhost:8000/ws", client) as ws:
            text = ws.receive_text()
            return text


if __name__ == "__main__":
    print(my_client())

Expected behavior

The client should be able to read the message and close properly.

Configuration

  • Python version: 3.7+
  • httpx-ws version: 0.1.0
  • httpx version: 0.23.1

Additional context

  • Doesn't happen if we add an asyncio.sleep or a receive_* operation on the server side
  • Doesn't happen with websockets library

Unhandled WebSocketNetworkError in ping

Traceback (most recent call last):
  File "C:\Python37x86\lib\site-packages\httpx_ws\_api.py", line 978, in _background_keepalive_ping
    pong_callback = await self.ping()
  File "C:\Python37x86\lib\site-packages\httpx_ws\_api.py", line 625, in ping
    await self.send(event)
  File "C:\Python37x86\lib\site-packages\httpx_ws\_api.py", line 660, in send
    raise WebSocketNetworkError() from e
httpx_ws._api.WebSocketNetworkError

ssl.SSLError not handled

Task exception was never retrieved
future: <Task finished coro=<AsyncWebSocketSession._background_receive() done, defined at E:\proj\.venv\lib\site-packages\httpx_ws\_api.py:924> exception=SSLError(1, '[SSL: SSLV3_ALERT_BAD_RECORD_MAC] sslv3 alert bad record mac (_ssl.c:2570)')>
Traceback (most recent call last):
  File "E:\proj\.venv\lib\site-packages\httpx_ws\_api.py", line 941, in _background_receive
    self.stream.read(max_bytes=max_bytes)
  File "E:\proj\.venv\lib\site-packages\httpx_ws\_api.py", line 993, in _wait_until_closed
    return todo_task.result()
  File "E:\proj\.venv\lib\site-packages\anyio\streams\tls.py", line 196, in receive
    data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
  File "E:\proj\.venv\lib\site-packages\anyio\streams\tls.py", line 131, in _call_sslobject_method
    result = func(*args)
  File "C:\python37x86\lib\ssl.py", line 718, in read
    v = self._sslobj.read(len)
ssl.SSLError: [SSL: SSLV3_ALERT_BAD_RECORD_MAC] sslv3 alert bad record mac (_ssl.c:2570)

Tasks not awaited when closing the connection

Describe the bug

In our tests we were getting warnings that tasks _background_receive_task and _background_keepalive_ping_task were terminated while pending.

Expected behavior

All tasks should be awaited to avoid warnings.
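
The general pattern behind this expectation can be sketched with plain asyncio: cancel the background tasks, then await them so that none is left pending when the loop shuts down. `_forever` and `close_tasks` are hypothetical stand-ins for the session's background loops and close logic, not the library's code.

```python
import asyncio
from typing import List


async def _forever() -> None:
    # Stand-in for a background receive or keepalive loop.
    while True:
        await asyncio.sleep(3600)


async def close_tasks(tasks: List["asyncio.Task[None]"]) -> None:
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of re-raising it.
    await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> bool:
    tasks = [asyncio.create_task(_forever()) for _ in range(2)]
    await asyncio.sleep(0)  # let the tasks start
    await close_tasks(tasks)
    return all(task.done() for task in tasks)
```

Awaiting without cancelling first only works if the tasks are guaranteed to finish on their own once the stream is closed.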

Configuration

  • Python version: 3.11
  • httpx-ws version: 0.4.1

Possible fix

The following patch fixes the issue for us:

diff --git a/httpx_ws/_api.py b/httpx_ws/_api.py
index 5abfebe..cb51a34 100644
--- a/httpx_ws/_api.py
+++ b/httpx_ws/_api.py
@@ -924,6 +924,7 @@ class AsyncWebSocketSession:
             except httpcore.WriteError:
                 pass
         await self.stream.aclose()
+        await asyncio.gather(self._background_receive_task, self._background_keepalive_ping_task)

     async def _background_receive(self, max_bytes: int) -> None:
         """

aconnect_ws does not work with AsyncClient bound to an app

Describe the bug

aconnect_ws does not work with an AsyncClient that is bound to an existing FastAPI app.

To Reproduce

@pytest.fixture(name="client")
async def fixture_client(api: FastAPI) -> AsyncIterator[AsyncClient]:
    async with AsyncClient(app=api, base_url="http://test") as client:
        yield client

async def test_websocket(client: AsyncClient) -> None:
    async with aconnect_ws("/ws/", client) as ws:
        data = await ws.receive_json()

I get:

    @contextlib.asynccontextmanager
    async def _aconnect_ws(
        url: str,
        client: httpx.AsyncClient,
        *,
        max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
        queue_size: int = DEFAULT_QUEUE_SIZE,
        keepalive_ping_interval_seconds: typing.Optional[
            float
        ] = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
        keepalive_ping_timeout_seconds: typing.Optional[
            float
        ] = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
        subprotocols: typing.Optional[typing.List[str]] = None,
        **kwargs: typing.Any,
    ) -> typing.AsyncGenerator[AsyncWebSocketSession, None]:
        headers = kwargs.pop("headers", {})
        headers.update(_get_headers(subprotocols))
    
        async with client.stream("GET", url, headers=headers, **kwargs) as response:
            if response.status_code != 101:
>               raise WebSocketUpgradeError(response)
E               httpx_ws._exceptions.WebSocketUpgradeError: <Response [404 Not Found]>

Obviously, the endpoint /ws/ exists within the app.

Expected behavior

It should call the FastAPI app directly; no TCP/HTTP connection should be made.

Configuration

  • Python version: 3.12
  • httpx-ws version: 0.5.1

connect_ws and aconnect_ws always raise WebSocketNetworkError when keepalive_ping_timeout_seconds + keepalive_ping_interval_seconds

connect_ws and aconnect_ws always raise WebSocketNetworkError once keepalive_ping_timeout_seconds + keepalive_ping_interval_seconds have elapsed, even though the actual WebSocket is working normally.

To Reproduce

import time

from httpx import Client
from httpx_ws import WebSocketNetworkError, connect_ws

# url and token are placeholders for the real server address and credentials
start = time.time()
try:
    with Client(base_url=url, params={'token': token}) as client:
        with connect_ws('/traffic', client,
                        keepalive_ping_timeout_seconds=3,
                        keepalive_ping_interval_seconds=4
                        ) as ws:
            n = 0
            while True:
                n += 1
                print(n, ws.receive_json())
except WebSocketNetworkError:
    print('WebSocketNetworkError, total time:', time.time() - start)

Expected behavior

The server sends traffic once per second, which is stable and permanent, and should not trigger errors or timeouts

Configuration

  • Python version: 3.11.4
  • httpx-ws version: 0.4.1
  • httpx version: 0.24.1

Additional context

None

Use `anyio` rather than `asyncio`, to support Trio

httpx is built on the anyio library, in order to support both asyncio and Trio. Unfortunately, httpx-ws uses asyncio directly in a few places, which breaks Trio support!

Using the equivalent APIs from anyio would open your library up for enthusiastic use by those of us who prefer Trio to asyncio.

Using httpx-ws as background task to test websockets

Hi, reposting the issue to this github.

I am struggling with httpx-ws. It works when the server-side websocket connection is closed, but my implementations are based on websockets that stay open until the client breaks them. Could you help me adjust the code to make the implementation work?

Websocket implementation

from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
import asyncio

@router.websocket("/ws")
async def websocket_labtest(websocket: WebSocket):
    await websocket.accept()
    sub_id: uuid.UUID = uuid.uuid4()
    subjects.ADD_LABTEST_QUEUE[sub_id] = asyncio.Queue()

    try:
        while True:
            event: events.LabtestAddedEvent = await subjects.ADD_LABTEST_QUEUE[sub_id].get()
            await websocket.send_json(event.json())
    except WebSocketDisconnect:
        subjects.ADD_LABTEST_QUEUE.pop(sub_id)

Test implementation

@pytest.fixture
async def labtest_client_async() -> AsyncClient:
    async with AsyncClient(transport=ASGIWebSocketTransport(app), base_url='http://test') as client:
        yield client


async def subscribe_to_labtests_ws_async(labtest_client_async: httpx.AsyncClient, queue: asyncio.Queue):
    async with httpx_ws.aconnect_ws("/labtests/ws", labtest_client_async) as ws:
        message = await ws.receive_json()
        queue.put_nowait(message)


@pytest.mark.asyncio
async def test_add_lab_test(labtest_client_async: httpx.AsyncClient):
    sub_queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(subscribe_to_labtests_ws_async(labtest_client_async, sub_queue))

    try:
        await asyncio.sleep(SHORT_WAIT)
        await add_labtest(labtest_client_async)
        await asyncio.sleep(SHORT_WAIT)
    finally:
        task.cancel()

    message_labtest1 = await sub_queue.get()
    print(f"{message_labtest1=}")

The websocket works fine when I use e.g. Postman to connect to the websocket. It seems httpx_ws is unable to close the connection even though I say task.cancel(). That makes the test run until I force quit.

The problem seems to be related to asyncio.create_task. If I don't wait for the "add labtest event" on the server, I manage to get the message sent to me. So it seems the connection struggles to run as a background task. Maybe this is completely out of scope. I am open to ideas on how to do this kind of integration testing.

I know the code is not self-contained. If it's hard to understand, I can make a simpler self-contained example
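
One way to keep such a test from hanging, independent of httpx-ws, is to replace fixed sleeps with a bounded wait on the result queue and to await the background task after cancelling it. The sketch below uses two plain queues in place of the WebSocket; `subscriber` and `run_test` are hypothetical names, and the pattern is an assumption about what may help rather than a confirmed fix.

```python
import asyncio


async def subscriber(source: "asyncio.Queue[str]", sink: "asyncio.Queue[str]") -> None:
    # Stand-in for the WebSocket reader: forward one message, then finish.
    sink.put_nowait(await source.get())


async def run_test() -> str:
    source: "asyncio.Queue[str]" = asyncio.Queue()
    sink: "asyncio.Queue[str]" = asyncio.Queue()
    task = asyncio.create_task(subscriber(source, sink))
    try:
        source.put_nowait("labtest-added")  # stands in for add_labtest(...)
        # A bounded wait fails the test instead of hanging it.
        return await asyncio.wait_for(sink.get(), timeout=1.0)
    finally:
        task.cancel()
        # Await the task so the cancellation has actually been processed.
        await asyncio.gather(task, return_exceptions=True)
```

Reading the message inside the `try` block also avoids waiting on the queue after the producer task has already been cancelled.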

Support `httpcore==1.*`

Describe the bug

The latest httpx version, 0.25.2, depends on httpcore v1 (==1.*), while httpx-ws depends on httpcore v0.17 (>=0.17.3,<0.18).

As a result, httpx cannot be upgraded to 0.25.2 when used together with httpx-ws.

To Reproduce

Steps to reproduce the behavior:

  1. Add httpx==0.25.2 and httpx-ws==0.4.2 to the Poetry dependency file or the Pipfile
  2. Try locking the dependencies (poetry lock --no-update or pipenv install)
  3. See error:
    Because httpx (0.25.2) depends on httpcore (==1.*)
     and httpx-ws (0.4.2) depends on httpcore (>=0.17.3,<0.18), httpx (0.25.2) is incompatible with httpx-ws (0.4.2).
    So, because <your-app> depends on both httpx (0.25.2) and httpx-ws (0.4.2), version solving failed.

Expected behavior

httpx-ws should support httpcore v1, and dependencies should lock without errors.

Configuration

  • Python version: 3.11.6
  • httpx-ws version: 0.4.2

Memory leak on multiple async connections.

Describe the bug

A clear and concise description of what the bug is.

To Reproduce

import asyncio

import httpx
import httpx_ws


async def leak():
    async with httpx.AsyncClient(timeout=21) as client:
        async with httpx_ws.aconnect_ws(
            "https://socketsbay.com/wss/v2/1/demo/",
            client,
        ) as ws_client:
            pass


async def main():
    # asyncio.gather() has to be awaited from inside a running event loop
    await asyncio.gather(*[leak() for _ in range(100)])


input("before")
asyncio.run(main())
input("after")

Expected behavior

No memory leak

Configuration

  • Python version: 3.7 (32 bit) on win 11 (64 bit)
  • httpx-ws version: 0.4.1

Additional context

When I ran the code above, memory usage jumped from 10 MB to 80 MB.

`ModuleNotFoundError: No module named 'httpcore.backends'`

Describe the bug

While trying to run a different project, jpterm, I encountered the following traceback:

Traceback (most recent call last):
  File "/Users/samforeman/projects/saforem2/l2hmc-qcd/venvs/py310/bin/jpterm", line 5, in <module>
    from jpterm.cli import main
  File "/Users/samforeman/projects/jpterm/jpterm/cli.py", line 3, in <module>
    from txl.app import disabled
  File "/Users/samforeman/projects/jpterm/txl/txl/app.py", line 6, in <module>
    components = {
  File "/Users/samforeman/projects/jpterm/txl/txl/app.py", line 7, in <dictcomp>
    ep.name: ep.load()
  File "/Users/samforeman/projects/saforem2/l2hmc-qcd/venvs/py310/lib/python3.10/site-packages/pkg_resources/__init__.py", line 2517, in load
    return self.resolve()
  File "/Users/samforeman/projects/saforem2/l2hmc-qcd/venvs/py310/lib/python3.10/site-packages/pkg_resources/__init__.py", line 2523, in resolve
    module = __import__(self.module_name, fromlist=['__name__'], level=0)
  File "/Users/samforeman/projects/saforem2/l2hmc-qcd/venvs/py310/lib/python3.10/site-packages/txl_remote_contents/components.py", line 9, in <module>
    from httpx_ws import aconnect_ws
  File "/Users/samforeman/projects/saforem2/l2hmc-qcd/venvs/py310/lib/python3.10/site-packages/httpx_ws/__init__.py", line 3, in <module>
    from httpx_ws._api import (
  File "/Users/samforeman/projects/saforem2/l2hmc-qcd/venvs/py310/lib/python3.10/site-packages/httpx_ws/_api.py", line 14, in <module>
    from httpcore.backends.base import AsyncNetworkStream, NetworkStream
ModuleNotFoundError: No module named 'httpcore.backends'

I've identified the problem to be coming from this line in your httpx_ws/_api.py:L14 file

from httpcore.backends.base import AsyncNetworkStream, NetworkStream

Replacing

from httpcore.backends.base import AsyncNetworkStream, NetworkStream

with

from httpcore._backends.base import AsyncNetworkStream, NetworkStream

resolved the issue.
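
A tolerant import is one way to survive a module being renamed between releases. The helper below is a generic sketch built on `importlib`; `import_first` is a hypothetical name, and depending on a private module such as `httpcore._backends` remains fragile whichever path is tried first.

```python
import importlib
from types import ModuleType
from typing import Optional, Sequence


def import_first(candidates: Sequence[str]) -> ModuleType:
    """Return the first importable module among the candidates."""
    last_error: Optional[ImportError] = None
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ImportError as exc:  # ModuleNotFoundError is a subclass
            last_error = exc
    raise ImportError(f"none of {list(candidates)} could be imported") from last_error
```

With the module paths from this report, the call would be `import_first(["httpcore._backends.base", "httpcore.backends.base"])`, after which `AsyncNetworkStream` and `NetworkStream` can be read from the returned module.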

Termination errors out with async client

Describe the bug

Exiting the async context manager async with aconnect_ws(uri) errors out.

To Reproduce

# echo server

import asyncio
from websockets import serve

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def main():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()

asyncio.run(main())

# async client

import asyncio
from httpx_ws import aconnect_ws

async def hello(uri):
    async with aconnect_ws(uri) as ws:
        message = "Hello world!"
        await ws.send_text(message)
        print(f"sent: {message}")
        message = await ws.receive_text()
        print(f"received: {message}")

asyncio.run(hello("ws://localhost:8765"))

Shows the following error:

sent: Hello world!
received: Hello world!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<AsyncIOStream.read() done, defined at /home/david/.local/share/hatch/env/virtual/jpterm/-V8cnvgI/dev/lib/python3.11/site-packages/httpcore/backends/asyncio.py:23> exception=ReadError(ClosedResourceError())>
Traceback (most recent call last):
  File "/home/david/.local/share/hatch/env/virtual/jpterm/-V8cnvgI/dev/lib/python3.11/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/home/david/.local/share/hatch/env/virtual/jpterm/-V8cnvgI/dev/lib/python3.11/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/david/.local/share/hatch/env/virtual/jpterm/-V8cnvgI/dev/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1272, in receive
    raise ClosedResourceError from None
anyio.ClosedResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/david/.local/share/hatch/env/virtual/jpterm/-V8cnvgI/dev/lib/python3.11/site-packages/httpcore/backends/asyncio.py", line 31, in read
    with map_exceptions(exc_map):
  File "/home/david/micromamba/envs/jpterm/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/david/.local/share/hatch/env/virtual/jpterm/-V8cnvgI/dev/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc)
httpcore.ReadError

While it works fine with the non-async version:

# client

from httpx_ws import connect_ws

with connect_ws("ws://localhost:8765") as ws:
    message = "Hello world!"
    ws.send_text(message)
    print(f"sent: {message}")
    message = ws.receive_text()
    print(f"received: {message}")

Unhandled exception of pinging in closing state

Task exception was never retrieved
future: <Task finished coro=<AsyncWebSocketSession._background_keepalive_ping() done, defined at E:\proj\.venv\lib\site-packages\httpx_ws\_api.py:951> exception=LocalProtocolError('Event Ping(payload=b"vaY\\xd4\'\\x13\\x81\\x9ft\\x9a \\x953.a\\xfb\\xc6,\\xcbj\\x8c\\xa6\\xf4s\\x17&\\x80\\x11\\xc5\\x87y\\xfd") cannot be sent in state ConnectionState.LOCAL_CLOSING.')>
Traceback (most recent call last):
  File "E:\proj\.venv\lib\site-packages\httpx_ws\_api.py", line 957, in _background_keepalive_ping
    pong_callback = await self.ping()
  File "E:\proj\.venv\lib\site-packages\httpx_ws\_api.py", line 611, in ping
    await self.send(event)
  File "E:\proj\.venv\lib\site-packages\httpx_ws\_api.py", line 637, in send
    data = self.connection.send(event)
  File "E:\proj\.venv\lib\site-packages\wsproto\connection.py", line 108, in send
    f"Event {event} cannot be sent in state {self.state}."
wsproto.utilities.LocalProtocolError: Event Ping(payload=b"vaY\xd4'\x13\x81\x9ft\x9a \x953.a\xfb\xc6,\xcbj\x8c\xa6\xf4s\x17&\x80\x11\xc5\x87y\xfd") cannot be sent in state ConnectionState.LOCAL_CLOSING.

Newest anyio 4 has moved start_blocking_portal context manager to from_thread module

Describe the bug

When upgrading anyio to version 4, all tests (but one) in tests/test_transport.py fail with AttributeError: module 'anyio' has no attribute 'start_blocking_portal'
There is a call to anyio.start_blocking_portal at transport.py:46.
This should be replaced by anyio.from_thread.start_blocking_portal, which is also compatible with anyio==3.7.1

To Reproduce

Run tests

Expected behavior

Tests pass

Configuration

  • Python version: 3.10, 3.11
  • httpx-ws version: git main head (31b6683)

`anyio.BusyResourceError`: Another task is already writing to this resource

  File "~~~~.py", line 80, in resolve_url
    await self.ws.send_json(data)
  File "~~~~\Lib\site-packages\httpx_ws\_api.py", line 701, in send_json
    await self.send_text(serialized_data)
  File "~~~~\Lib\site-packages\httpx_ws\_api.py", line 659, in send_text
    await self.send(event)
  File "~~~~\Lib\site-packages\httpx_ws\_api.py", line 638, in send
    await self.stream.write(data)
  File "~~~~\Lib\site-packages\httpcore\backends\asyncio.py", line 51, in write
    await self._stream.send(item=buffer)
  File "~~~~\Lib\site-packages\anyio\_backends\_asyncio.py", line 1291, in send
    with self._send_guard:
  File "~~~~\Lib\site-packages\anyio\_core\_synchronization.py", line 584, in __enter__
    raise BusyResourceError(self.action)
anyio.BusyResourceError: Another task is already writing to this resource

I can't quite trace the conditions for the above error, because it only happens occasionally.
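
An error of this kind typically means two tasks wrote to the same stream at once. A common application-side mitigation is to serialize sends behind a lock, as sketched below. `FakeStream` and `SerializedWriter` are hypothetical classes written for the example; the fake stream only imitates the guard seen in the traceback.

```python
import asyncio
from typing import List


class FakeStream:
    """Stand-in stream that rejects overlapping writes."""

    def __init__(self) -> None:
        self.busy = False
        self.written: List[bytes] = []

    async def write(self, data: bytes) -> None:
        if self.busy:
            raise RuntimeError("Another task is already writing to this resource")
        self.busy = True
        try:
            await asyncio.sleep(0)  # yield, as a real socket write may
            self.written.append(data)
        finally:
            self.busy = False


class SerializedWriter:
    def __init__(self, stream: FakeStream) -> None:
        self._stream = stream
        self._lock = asyncio.Lock()

    async def send(self, data: bytes) -> None:
        # Only one task at a time may enter the write.
        async with self._lock:
            await self._stream.write(data)


async def demo() -> List[bytes]:
    stream = FakeStream()
    writer = SerializedWriter(stream)
    await asyncio.gather(*(writer.send(bytes([i])) for i in range(5)))
    return stream.written
```

Calling `stream.write` directly from the five concurrent tasks would trip the guard; going through `SerializedWriter.send` does not.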

Propagate server errors in ASGI testing

Describe the bug

I'm writing some tests with the library and I noticed that it does not propagate server errors the way the httpx test client does; instead, the test hangs indefinitely. Wrapping the test in an asyncio timeout solves the hang, but I'm still unable to see the stack trace of the exception. The issue gets slightly worse if the exception happens before websocket.accept(): it appears to block on thread.join() somewhere. On Linux this can be aborted with Ctrl-C, but on Windows it is uninterruptible :(

To Reproduce

I took the sample almost as-is from the docs. The main change is that I added a 1/0 in ws_hello.

import asyncio

import httpx
from httpx_ws import aconnect_ws
from httpx_ws.transport import ASGIWebSocketTransport
from starlette.applications import Starlette
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute


async def http_hello(request):
    return HTMLResponse("Hello World!")


async def ws_hello(websocket):
    1/0
    await websocket.accept()
    await websocket.send_text("Hello World!")
    await websocket.close()


app = Starlette(
    routes=[
        Route("/http", http_hello),
        WebSocketRoute("/ws", ws_hello),
    ],
)


async def main():
    async with httpx.AsyncClient(transport=ASGIWebSocketTransport(app)) as client:
        http_response = await client.get("http://server/http")
        assert http_response.status_code == 200

        print("connecting to ws")
        async with aconnect_ws("http://server/ws", client) as ws:
            message = await ws.receive_text()
            assert message == "Hello World!"

    print("done")


asyncio.run(main())

Expected behavior

The httpx client should propagate the error the server encountered.
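
The idea can be sketched with plain asyncio: wait on the next message and on the app task at the same time, and re-raise the app's exception if it finishes first. `receive_or_raise` and `crashing_app` are hypothetical names for this illustration; this is not how the transport is implemented.

```python
import asyncio
from typing import Any, Awaitable


async def receive_or_raise(app_task: "asyncio.Task[Any]", receive: Awaitable[Any]) -> Any:
    """Wait for a message, but surface an app crash instead of hanging."""
    receive_task = asyncio.ensure_future(receive)
    done, _ = await asyncio.wait(
        {app_task, receive_task}, return_when=asyncio.FIRST_COMPLETED
    )
    if receive_task in done:
        return receive_task.result()
    receive_task.cancel()
    # The app finished first: re-raise its exception, if any.
    app_task.result()
    raise RuntimeError("app exited without sending a message")


async def crashing_app(outbox: "asyncio.Queue[str]") -> None:
    1 / 0  # mirrors the failing ws_hello in the report
    await outbox.put("Hello World!")


async def demo() -> str:
    outbox: "asyncio.Queue[str]" = asyncio.Queue()
    app_task = asyncio.create_task(crashing_app(outbox))
    try:
        await receive_or_raise(app_task, outbox.get())
    except ZeroDivisionError:
        return "propagated"
    return "no error"
```

The test then fails with the original ZeroDivisionError and its traceback rather than timing out.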

Configuration

Python 3.10.4 (tags/v3.10.4:9d38120, Mar 23 2022, 23:13:41) [MSC v.1929 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import starlette, httpx, httpx_ws
>>> starlette.__version__
'0.26.1'
>>> httpx.__version__
'0.23.3'
>>> httpx_ws.__version__
'0.3.0'

Client instances not closed.

When using connect_ws() or aconnect_ws(), if you don't pass a client instance, then a new client instance is created, but remains unclosed once the call returns.

It's a little awkward to get right, but the safest approach is to stick with `with ...` blocks for handling open/close of resources:

@contextlib.contextmanager
def connect_ws(...):
    if client is None:
        with httpx.Client(...) as client:
            yield _connect_ws(..., client=client)
    else:
        yield _connect_ws(..., client=client)

@contextlib.contextmanager
def _connect_ws(...):
    # main body of `connect_ws()`, but with a *mandatory* client argument.

Although you can also handle it with try...finally:

@contextlib.contextmanager
def connect_ws(...):
    should_close_client = False
    if client is None:
        client = ...
        should_close_client = True

    try:
        ... # main body of connect_ws()
    finally:
        if should_close_client:
            client.close()  # We only want to do this if we've created the client instance ourselves
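
A runnable version of the first approach might look like the sketch below. `FakeClient`, `connect` and `_connect` are hypothetical stand-ins for `httpx.Client`, `connect_ws()` and its inner body; the inner context manager is entered with `with` before its value is yielded, so both resources are released in order on exit.

```python
import contextlib
from typing import Iterator, Optional


class FakeClient:
    """Stand-in for httpx.Client that only tracks whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "FakeClient":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.close()


@contextlib.contextmanager
def _connect(client: FakeClient) -> Iterator[str]:
    # Main body, with a mandatory client argument.
    yield "session"


@contextlib.contextmanager
def connect(client: Optional[FakeClient] = None) -> Iterator[str]:
    if client is None:
        # We created the client, so we are responsible for closing it.
        with FakeClient() as owned, _connect(owned) as session:
            yield session
    else:
        # The caller owns the client; leave it open.
        with _connect(client) as session:
            yield session
```

A client passed in by the caller is left open, while a client created internally is closed as soon as the block exits.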

Missing `subprotocols` field in the websocket scope

Describe the bug

I'm trying to use httpx-ws to test GraphQL subscriptions provided by the fastapi + strawberry combo, using an example from the Testing ASGI section of the docs.

Establishing WebSocket connection always fails with the following error:

self = <httpx_ws.transport.ASGIWebSocketAsyncNetworkStream object at 0x7f0eee7210>

    async def __aenter__(self) -> "ASGIWebSocketAsyncNetworkStream":
        self.exit_stack = contextlib.ExitStack()
        self.portal = self.exit_stack.enter_context(
            anyio.start_blocking_portal("asyncio")
        )
        _: "Future[None]" = self.portal.start_task_soon(self._run)
        await self.send({"type": "websocket.connect"})
        message = await self.receive()
        if message["type"] == "websocket.close":
            await self.aclose()
>           raise WebSocketDisconnect(message["code"], message.get("reason"))
E           httpx_ws._api.WebSocketDisconnect: (<CloseReason.INTERNAL_ERROR: 1011>, "'subprotocols'")

.venv/lib/python3.11/site-packages/httpx_ws/transport.py:53: WebSocketDisconnect

It turns out that Strawberry relies on the Starlette WebSocket scope containing a subprotocols entry: https://github.com/strawberry-graphql/strawberry/blob/c6eb8656670ba859dd2dd959c9f7a84e6d80ef84/strawberry/fastapi/router.py#L271C6-L271C6
So it fails with a KeyError("subprotocols").

Comparing the scopes generated by the httpx-ws test transport with those of actual queries made by GraphQL clients, I found a discrepancy: for the real calls, a subprotocols field is added to the scope, while in the httpx-ws transport it is not:

if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
    scope = {
        "type": "websocket",
        "path": request.url.path,
        "raw_path": request.url.raw_path,
        "root_path": self.root_path,
        "scheme": scheme,
        "query_string": request.url.query,
        "headers": [(k.lower(), v) for (k, v) in request.headers.raw],
        "client": self.client,
        "server": (request.url.host, request.url.port),
    }
    return await self._handle_ws_request(request, scope)

Not being a WebSocket guru, I'm not sure if it is actually an httpx-ws or strawberry bug... 🤔

To Reproduce

Test code:

from httpx import AsyncClient
from httpx_ws import aconnect_ws
from httpx_ws.transport import ASGIWebSocketTransport

@pytest.mark.anyio
async def test_subscriptions(fastapi_app):
    async with AsyncClient(transport=ASGIWebSocketTransport(fastapi_app)) as client:
        async with aconnect_ws("ws://test/graphql", client, subprotocols=["graphql-transport-ws"]) as ws:
            pass

This raises the error described above.

Expected behavior

WebSocket connection is expected to be established successfully.

Potential fix

If I add something like this to the scope creation code in the httpx-ws transport:

scope = {
    ...,
    "subprotocols": [subprotocol for subprotocol in headers.get("sec-websocket-protocol", "").split(",") if subprotocol],
}

the connection is successful.
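
Clients commonly send the header as a comma-separated list with spaces after the commas, so a variant of the expression above that also strips whitespace may be more robust. `parse_subprotocols` is a hypothetical helper name used only for this sketch.

```python
from typing import List


def parse_subprotocols(header_value: str) -> List[str]:
    """Split a Sec-WebSocket-Protocol header value into subprotocol names."""
    # Strip whitespace around each item and drop empty entries.
    return [item.strip() for item in header_value.split(",") if item.strip()]
```

An absent header, represented here by an empty string, yields an empty list, which still gives the scope the key that Strawberry looks up.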

Configuration

  • Python version: 3.11.4
  • httpx-ws version: 0.4.0
  • fastapi version: 0.98.0
  • strawberry-graphql version: 0.190.0

`ASGIWebSocketTransport` does not respect the response headers of the ASGI app during WebSocket handshake.

Describe the bug

When testing with ASGIWebSocketTransport, it does not respect the response headers returned by the ASGI app, which includes the subprotocol header accepted by the app.

To Reproduce

import asyncio

import httpx
from fastapi import FastAPI
from httpx_ws import WebSocketDisconnect, aconnect_ws
from httpx_ws.transport import ASGIWebSocketTransport
from starlette.websockets import WebSocket

app = FastAPI()


@app.websocket("/")
async def _(ws: WebSocket):
    await ws.accept(subprotocol="foo")  # accept `foo` subprotocol
    await asyncio.sleep(0.1)
    await ws.close(1000, "bar")


httpx_client = httpx.AsyncClient(transport=ASGIWebSocketTransport(app=app))

async with aconnect_ws(
    "ws://www.example.com/", subprotocols=["foo", "bar"], client=httpx_client
) as ws:
    print(ws.subprotocol)  # expect `foo` but get `None`
    await asyncio.sleep(0.2)
    await ws.close(1000, "baz")

assert ws.subprotocol == "foo"  # AssertionError

Expected behavior

return Response(101, extensions={"network_stream": stream})

This code should not be hard-coded to generate a fixed response; rather, it should return the response generated by the app.
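
In the ASGI specification, the `websocket.accept` message may carry a `subprotocol` string and a `headers` list of byte pairs. A minimal sketch of turning that message into handshake response headers is shown below; `handshake_headers` is a hypothetical helper, and how the result would be wired into the transport's 101 response is left open.

```python
from typing import Any, Dict, List, Optional, Tuple

Header = Tuple[bytes, bytes]


def handshake_headers(accept_message: Dict[str, Any]) -> List[Header]:
    """Build 101-response headers from an ASGI `websocket.accept` message."""
    headers: List[Header] = [
        (bytes(name), bytes(value))
        for name, value in (accept_message.get("headers") or [])
    ]
    subprotocol: Optional[str] = accept_message.get("subprotocol")
    if subprotocol is not None:
        # Echo the subprotocol chosen by the app back to the client.
        headers.append((b"sec-websocket-protocol", subprotocol.encode("ascii")))
    return headers
```

For the reproduction above, the app's accept message would carry `"subprotocol": "foo"`, so the client side could then see `foo` in the Sec-WebSocket-Protocol response header.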

Configuration

  • Python version: py3.10
  • httpx-ws version: 0.4.2

Additional context

None.
