robertmrk / aiocometd
CometD client for asyncio
License: MIT License
Hi there, we're using this to connect to Salesforce. One thing we've observed is that no exception is raised when Salesforce runs out of platform events for the day. We feel it would make sense for some kind of exception to be thrown when this happens (but we're certainly open to other ways to detect and manage error codes returned from the server). Does it make sense to check response.status in transports/long_polling.py and throw an exception if it is not 200 after the session.post that collects the payload from Salesforce? The code we are thinking of looks like:
if response.status != 200:
    raise TransportError(f"Session post for payload failed with: {response.status}: {response.reason}")
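A minimal sketch of the proposed check, written as a standalone helper so it can be tried without a server. TransportError here is a local stand-in for the aiocometd exception of the same name, and raise_for_bad_status is a hypothetical helper name; where such a check would belong in transports/long_polling.py is for the maintainer to decide.

```python
class TransportError(Exception):
    """Local stand-in for aiocometd's TransportError (assumption)."""


def raise_for_bad_status(status: int, reason: str) -> None:
    """Raise TransportError unless the HTTP status is 200 (hypothetical helper)."""
    if status != 200:
        raise TransportError(
            f"Session post for payload failed with: {status}: {reason}"
        )


# A 200 passes silently, anything else raises.
raise_for_bad_status(200, "OK")
try:
    raise_for_bad_status(403, "Forbidden")
except TransportError as error:
    print(error)
```

Raising from one place keeps the message format consistent for callers that want to log or retry on specific status codes.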
I can't find any other discussion forum for this, so I'm asking here: I am looking for an example of how to authenticate, because I can't seem to figure it out. I need to include an Authorization header with the value "Bearer tokenvalue", and I am stumped. If anyone has an example, I would appreciate it.
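One way this might be approached is through an extension whose outgoing hook adds the header. The sketch below is standalone and does not import aiocometd: BearerAuth is a hypothetical class, and its method names mirror the incoming, outgoing, and authenticate hooks that an AuthExtension subclass implements (as seen in another issue in this listing). It assumes the library passes a mutable headers dict to outgoing and sends whatever the hook leaves in it.

```python
import asyncio
from typing import Any, Dict, List, Optional


class BearerAuth:
    """Hypothetical extension that adds a bearer token to outgoing requests."""

    def __init__(self, token: str) -> None:
        self.token = token

    async def outgoing(self, payload: List[Dict[str, Any]],
                       headers: Dict[str, str]) -> None:
        # Assumption: the transport sends whatever ends up in `headers`.
        headers["Authorization"] = f"Bearer {self.token}"

    async def incoming(self, payload: List[Dict[str, Any]],
                       headers: Optional[Dict[str, str]] = None) -> None:
        # Nothing to do for incoming messages in this sketch.
        pass

    async def authenticate(self) -> None:
        # A real implementation would fetch a fresh token here.
        pass


async def demo() -> Dict[str, str]:
    headers: Dict[str, str] = {}
    await BearerAuth("tokenvalue").outgoing([], headers)
    return headers


print(asyncio.run(demo()))
```

In real use the class would presumably subclass aiocometd's AuthExtension and an instance would be passed to the client through its extensions argument.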
We encountered a bug today that I would have expected mypy to catch. Our code looks something like this:
from aiocometd.typing import JsonObject

def some_func(response_message: JsonObject) -> None:
    reveal_type(response_message)
Mypy output:
file.py: note: In member "some_func":
file.py:203:25: note: Revealed type is 'Any'
I can see that aiocometd defines JsonObject to be a Dict[str, Any], so why does mypy think the response_message is of type Any?
This might be a mypy bug, but I thought I'd ask here first.
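One possible explanation, offered as an assumption and not confirmed in this thread: mypy treats names imported from a module it cannot analyze as Any, for example when ignore_missing_imports is enabled or when an installed package does not ship a py.typed marker. A local alias sidesteps that. The sketch below defines the alias in the calling code, mirroring the Dict[str, Any] definition quoted above; channel_of is a hypothetical function.

```python
from typing import Any, Dict

# Local alias with the same definition the issue quotes for aiocometd's
# JsonObject; defining it locally is a workaround, not the library's advice.
JsonObject = Dict[str, Any]


def channel_of(response_message: JsonObject) -> str:
    # With the local alias, mypy sees Dict[str, Any] here, so a call such as
    # response_message.no_such_method() would be reported.
    return str(response_message["channel"])


print(channel_of({"channel": "/meta/connect", "successful": True}))
```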
Passing the "loop" argument to asyncio APIs was deprecated in Python 3.8, and in 3.10 its use now appears to raise an error.
The good news is that simply removing the use of loop seems to let the code work as is. I'm still trying to wrap my head around the library, but I wanted to at least drop a message that support for 3.10+ may be easy to implement.
All data subscriptions are dropped on reconnect, and I'd like a way to call the resubscribe function.
Hi @robertmrk
I'm trying to use your library to receive notifications from a platform that only supports long-polling. I am rather new to CometD, but I tested the platform's example through Postman. The message order they suggest for setup seems to be /meta/handshake, /meta/subscribe, then /meta/connect, where the actual long poll is held at the connect message until a notification is received.
However, when I adapted your example to receive notifications from the platform, I can see the outgoing handshake message, followed by the incoming handshake message, and finally the outgoing connect message where the long poll is held. As the subscribe method is not called until the client is set up, I'm never able to receive any notification from the channel I'm interested in.
I am not sure whether the order is defined in the protocol spec or whether I've missed something, but this is what I found in Salesforce's documentation (not the platform I'm referring to), where 'subscribe' happens before 'connect'.
https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/using_streaming_api_client_connection.htm
Code:
async def chat():
    # connect to the server
    async with Client('https://platform.com/notifications', extensions=[MyExtension]) as client:
        await client.subscribe("/DecodedEventListener/trigger")
        # listen for incoming messages
        async for message in client:
            print(message)
Output:
Outgoing messages: [{'channel': <MetaChannel.HANDSHAKE: '/meta/handshake'>, 'version': '1.0', 'supportedConnectionTypes': ['websocket', 'long-polling'], 'minimumVersion': '1.0', 'id': '0'}]
response message: {'id': '0', 'minimumVersion': '1.0', 'supportedConnectionTypes': ['long-polling', 'smartrest-long-polling'], 'successful': True, 'channel': '/meta/handshake', 'ext': {'ack': True}, 'clientId': '2918jmzu83wu9gwi7av3thi1h5s', 'version': '1.0'}
Outgoing messages: [{'channel': <MetaChannel.CONNECT: '/meta/connect'>, 'clientId': '2918jmzu83wu9gwi7av3thi1h5s', 'connectionType': 'long-polling', 'id': '1'}]
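To make the sequence the platform suggests concrete, the sketch below builds the three Bayeux messages in that order as plain dicts. The handshake and connect fields follow the log output above, and the subscription field on the subscribe message comes from the Bayeux protocol. setup_messages is a hypothetical helper; it does not use aiocometd and sends nothing.

```python
from typing import Any, Dict, List


def setup_messages(client_id: str, channel: str) -> List[Dict[str, Any]]:
    """Build handshake, subscribe and connect messages in that order.

    In a real exchange client_id is only known once the handshake
    response has arrived; it is a parameter here to keep the sketch short.
    """
    handshake = {
        "channel": "/meta/handshake",
        "version": "1.0",
        "minimumVersion": "1.0",
        "supportedConnectionTypes": ["long-polling"],
        "id": "0",
    }
    subscribe = {
        "channel": "/meta/subscribe",
        "clientId": client_id,
        "subscription": channel,
        "id": "1",
    }
    connect = {
        "channel": "/meta/connect",
        "clientId": client_id,
        "connectionType": "long-polling",
        "id": "2",
    }
    return [handshake, subscribe, connect]


for message in setup_messages("2918jmzu83wu9gwi7av3thi1h5s",
                              "/DecodedEventListener/trigger"):
    print(message["channel"])
```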
Hi,
I'm not very familiar with Python or CometD, but while trying to implement the AuthExtension I'm getting this type error for the outgoing function:
TypeError: outgoing() takes 2 positional arguments but 3 were given
Here is the code for that class:
class MyAuthExtension(AuthExtension):
    async def incoming(payload, headers=None):
        pass

    async def outgoing(payload, headers):
        pass

    async def authenticate():
        return <SOME_VALUE>
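The TypeError is consistent with the methods being declared without self: Python passes the instance as the first positional argument, so a call with a payload and headers delivers three arguments to a function that accepts two. The sketch below reproduces this with plain classes (no aiocometd import; Broken and Fixed are hypothetical names) and shows the corrected signature.

```python
import asyncio


class Broken:
    # Missing `self`: the instance is bound to `payload`, leaving no room
    # for the second real argument.
    async def outgoing(payload, headers):
        pass


class Fixed:
    async def outgoing(self, payload, headers):
        headers["seen"] = "yes"


async def demo() -> str:
    try:
        await Broken().outgoing([], {})
    except TypeError as error:
        message = str(error)
    else:
        message = "no error"
    # The corrected signature accepts the same call without complaint.
    await Fixed().outgoing([], {})
    return message


print(asyncio.run(demo()))
```

The same change would apply to incoming and authenticate, which are also declared without self in the class above.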
Hi,
I'm using your CometD implementation with your aiosfstream to connect to a Salesforce streaming API. It seems that handling of the reconnect advice "handshake" is broken.
The following happens:
I'm not sure whether this is a Salesforce bug (not sending new advice) or an issue with your client (not reconnecting after a handshake).
Here are some log entries:
2019-02-11 13:06:52,761 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '36', 'successful': True}
2019-02-11 13:08:43,359 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '37', 'successful': True}
2019-02-11 13:10:33,381 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '38', 'successful': True}
2019-02-11 13:12:23,401 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '39', 'successful': True}
2019-02-11 13:14:13,434 - DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'xxxx0', 'channel': '/meta/connect', 'id': '40', 'successful': True}
2019-02-11 13:15:20,519 - WARNING:aiocometd.transports.long_polling:Failed to send payload, None
2019-02-11 13:15:20,520 - DEBUG:aiocometd.transports.base:Connect task finished with: TransportError('None')
2019-02-11 13:15:24,738 - DEBUG:aiocometd.transports.base:Connect task finished with: {'advice': {'interval': 0, 'reconnect': 'handshake'}, 'channel': '/meta/connect', 'id': '43', 'error': '403::Unknown client', 'successful': False}
2019-02-11 13:15:24,770 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx1', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}
2019-02-11 13:15:24,791 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx2', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}
.
. 2000 requests later
.
2019-02-11 13:16:03,756 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx1999', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}
2019-02-11 13:16:03,777 - DEBUG:aiocometd.transports.base:Connect task finished with: {'ext': {'replay': True, 'payload.format': True}, 'minimumVersion': '1.0', 'clientId': 'xxxx2000', 'supportedConnectionTypes': ['long-polling'], 'channel': '/meta/handshake', 'id': '0', 'version': '1.0', 'successful': True}
2019-02-11 13:16:04,018 - DEBUG:aiocometd.transports.base:Connect task finished with: KeyError('interval')
2019-02-11 13:16:04,018 - WARNING:aiocometd.transports.base:No reconnect advice provided, no more operations will be scheduled.
2019-02-11 13:16:04,019 - INFO:aiocometd.client:Closing client...
2019-02-11 13:16:04,276 - INFO:aiocometd.client:Client closed.
aiosfstream.exceptions.ServerError: ('Connection closed by the server', {'ext': {'sfdc': {'failureReason': '403::Organization concurrent user limit exceeded'}, 'replay': True, 'payload.format': True}, 'advice': {'reconnect': 'none'}, 'channel': '/meta/handshake', 'id': '1', 'error': '403::Handshake denied', 'successful': False})
Regards Christoph
My code:
async def genesys():
    # connect to the server
    async with Client("https://myserver") as client:
        # subscribe to channels to receive chat messages and
        # notifications about new members
        await client.subscribe("/api/v2/interactions")
        # send initial message
        await client.publish("/api/v2/interactions", {
            "data": ref,
        })
        # listen for incoming messages
        async for message in client:
            #if message["channel"] == "/api/v2/interactions":
            data = message["data"]
            print(f"{data}: {data}")

loop = asyncio.get_event_loop()
loop.run_until_complete(genesys())
Getting error:
>>> CometD Response...
Traceback (most recent call last):
File "test3.py", line 253, in <module>
get_me()
File "test3.py", line 247, in get_me
get_contact_history(id)
File "test3.py", line 233, in get_contact_history
get_cometd(ref)
File "test3.py", line 196, in get_cometd
loop.run_until_complete(genesys())
File "C:\Program Files\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
return future.result()
File "test3.py", line 178, in genesys
async with Client("https://myserver") as client:
File "C:\Program Files\Python37\lib\site-packages\aiocometd\client.py", line 432, in __aenter__
await self.open()
File "C:\Program Files\Python37\lib\site-packages\aiocometd\client.py", line 273, in open
self._transport = await self._negotiate_transport()
File "C:\Program Files\Python37\lib\site-packages\aiocometd\client.py", line 209, in _negotiate_transport
response = await transport.handshake(self._connection_types)
File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 252, in handshake
supportedConnectionTypes=connection_type_values
File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 303, in _send_message
return await self._send_payload_with_auth([message])
File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 318, in _send_payload_with_auth
response = await self._send_payload(payload)
File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 345, in _send_payload
return await self._send_final_payload(payload, headers=headers)
File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\long_polling.py", line 43, in _send_final_payload
find_response_for=payload[0]
File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 451, in _consume_payload
self._update_subscriptions(message)
File "C:\Program Files\Python37\lib\site-packages\aiocometd\transports\base.py", line 394, in _update_subscriptions
if response_message["channel"] == MetaChannel.SUBSCRIBE:
TypeError: string indices must be integers
Any idea?
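One reading of this traceback, offered as an assumption: the server answered the handshake with a single JSON object instead of the JSON array of messages the client iterates over. Iterating a dict yields its string keys, and indexing a string with "channel" raises this TypeError. The sketch shows the mechanism in isolation; channels_in is a hypothetical helper, not aiocometd code.

```python
from typing import Any, List


def channels_in(payload: Any) -> List[str]:
    """Collect the channel of every message in a decoded response payload."""
    return [message["channel"] for message in payload]


# A well-formed Bayeux response is a list of message objects.
print(channels_in([{"channel": "/meta/handshake", "successful": True}]))

# A bare object (for example an error body) iterates over its keys instead.
try:
    channels_in({"error": "not found"})
except TypeError as error:
    print(f"TypeError: {error}")
```

Logging the raw response body for the handshake request would show whether the endpoint is returning a Bayeux payload at all.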
When aiocometd connects to a CometD cluster behind an nginx reverse proxy, the server returns 402. When it connects to a single CometD server behind an nginx reverse proxy, the server returns normal data.
/root/PycharmProjects/tomorrow/venv/bin/python /root/PycharmProjects/client/tt.py
Traceback (most recent call last):
File "/root/PycharmProjects/client/tt.py", line 58, in <module>
loop.run_until_complete(chat())
File "/usr/local/python3.6/lib/python3.6/asyncio/base_events.py", line 473, in run_until_complete
return future.result()
File "/root/PycharmProjects/client/tt.py", line 32, in chat
await client.open()
File "/root/PycharmProjects/tomorrow/venv/lib/python3.6/site-packages/aiocometd/client.py", line 264, in open
self._verify_response(response)
File "/root/PycharmProjects/tomorrow/venv/lib/python3.6/site-packages/aiocometd/client.py", line 357, in _verify_response
self._raise_server_error(response)
File "/root/PycharmProjects/tomorrow/venv/lib/python3.6/site-packages/aiocometd/client.py", line 373, in _raise_server_error
raise ServerError(message, response)
aiocometd.exceptions.ServerError: ('Connect request failed.', {'id': '0', 'error': '402::Unknown client', 'successful': False, 'advice': {'interval': 0, 'reconnect': 'handshake'}, 'channel': '/meta/connect'})
I am getting a TypeError when I run a Python script. The error concerns the loop argument of a semaphore, which was deprecated in recent Python releases.
Are there any alternatives for this issue?
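For context, the loop parameter was removed from asyncio primitives such as Semaphore, Lock, and Queue in Python 3.10; they pick up the running event loop on their own. A minimal sketch of the pattern without loop, assuming the primitives are created inside a coroutine (worker and main are illustrative names):

```python
import asyncio
from typing import List


async def worker(semaphore: asyncio.Semaphore, results: List[int], n: int) -> None:
    # At most two workers hold the semaphore at the same time.
    async with semaphore:
        await asyncio.sleep(0)
        results.append(n)


async def main() -> List[int]:
    # No loop argument: asyncio.Semaphore(2, loop=loop) raises TypeError on 3.10+.
    semaphore = asyncio.Semaphore(2)
    results: List[int] = []
    await asyncio.gather(*(worker(semaphore, results, n) for n in range(4)))
    return sorted(results)


print(asyncio.run(main()))
```

Until the library itself drops the argument, this only helps in code you control; running on an older Python version is the other option.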
Hi @robertmrk ,
I'm the author and maintainer of another repo where I've had to hardcode my own Bayeux implementation. I'd really like to switch to using your aiocometd as it looks fantastic but after days of toiling I've discovered 2 things:
I'd like to help implement these if possible; point 1 is relatively easy for me as I've been digging around and modifying the code, but point 2 has me stumped: I keep getting 402 :: Unknown client from the remote server when I try to 'fake' the service channel requests through publish().
Example below of what my traffic looks like:
2019-02-20 23:39:03,391 Connecting to url: wss://tasty.dxfeed.com/live/cometd
2019-02-20 23:39:03,393 Opening client with connection types ['websocket', 'long-polling'] ...
2019-02-20 23:39:04,325 [bob] sending: [{'channel': <MetaChannel.HANDSHAKE: '/meta/handshake'>, 'version': '1.0', 'supportedConnectionTypes': ['websocket', 'long-polling'], 'minimumVersion': '1.0', 'id': '0', 'ext': {'com.devexperts.auth.AuthToken': '==REDACTED=='}}]
2019-02-20 23:39:04,557 [bob] received: [{"minimumVersion":"1.0","clientId":"==REDACTED==","supportedConnectionTypes":["websocket"],"advice":{"interval":0,"timeout":30000,"reconnect":"retry"},"channel":"/meta/handshake","id":"0","version":"1.0","successful":true}]
2019-02-20 23:39:04,557 Connection types supported by the server: ['websocket']
2019-02-20 23:39:05,982 [bob] sending: [{'channel': <MetaChannel.CONNECT: '/meta/connect'>, 'clientId': '==REDACTED==', 'connectionType': 'websocket', 'id': '0'}]
2019-02-20 23:39:06,213 [bob] received: [{"channel":"/meta/connect","id":"0","successful":true}]
2019-02-20 23:39:06,213 Connect task finished with: {'channel': '/meta/connect', 'id': '0', 'successful': True}
2019-02-20 23:39:06,214 Client opened with connection_type 'websocket'
2019-02-20 23:39:06,214 Connection setup completed!
2019-02-20 23:39:06,214 [bob] sending: [{'channel': <MetaChannel.CONNECT: '/meta/connect'>, 'clientId': '==REDACTED==', 'connectionType': 'websocket', 'id': '1'}]
2019-02-20 23:39:08,525 Adding subscription: {'Quote': ['/ES']}
2019-02-20 23:39:08,525 [dxFeed] sending: {'reset': True, 'add': {'Quote': ['/ES']}}
2019-02-20 23:39:09,459 [bob] sending: [{'channel': '/service/sub', 'clientId': '==REDACTED==', 'data': {'reset': True, 'add': {'Quote': ['/ES']}}, 'id': '2'}]
2019-02-20 23:39:09,690 [bob] received: [{"channel":"/service/sub","id":"2","error":"402::Unknown client","successful":false}]
Any help and advice is greatly appreciated 😄
Hi, I'm trying to set up a connection to the CometD interface of a Logitech Media Server.
I can't get past the handshake; it fails with an exception:
No response message received for the first message in the payload
I used the Extension to intercept the messages and this is the response:
outgoing payload: [{'channel': <MetaChannel.HANDSHAKE: '/meta/handshake'>, 'version': '1.0', 'supportedConnectionTypes': ['websocket', 'long-polling'], 'minimumVersion': '1.0', 'id': '0'}]
outgoing headers: {}
incoming payload: [{'clientId': 'effe08c7', 'version': '1.0', 'supportedConnectionTypes': ['long-polling', 'streaming'], 'channel': '/meta/handshake', 'advice': {'timeout': 60000, 'reconnect': 'retry', 'interval': 0}, 'successful': True}]
incoming headers: <CIMultiDictProxy('Server': 'Logitech Media Server (7.9.2 - 1554701435)', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', 'Vary': 'Accept-Encoding', 'Content-Encoding': 'gzip', 'Content-Length': '174', 'Content-Type': 'application/json', 'Expires': '-1', 'X-Time-To-Serve': '0.00304794311523438')>
Traceback (most recent call last):
File "main.py", line 61, in lms_events
async with Client("http://192.168.1.1:9006/cometd", extensions=[MyExtension()]) as client:
File "/usr/local/lib/python3.7/site-packages/aiocometd/client.py", line 432, in __aenter__
await self.open()
File "/usr/local/lib/python3.7/site-packages/aiocometd/client.py", line 273, in open
self._transport = await self._negotiate_transport()
File "/usr/local/lib/python3.7/site-packages/aiocometd/client.py", line 209, in _negotiate_transport
response = await transport.handshake(self._connection_types)
File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 252, in handshake
supportedConnectionTypes=connection_type_values
File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 303, in _send_message
return await self._send_payload_with_auth([message])
File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 318, in _send_payload_with_auth
response = await self._send_payload(payload)
File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/base.py", line 345, in _send_payload
return await self._send_final_payload(payload, headers=headers)
File "/usr/local/lib/python3.7/site-packages/aiocometd/transports/long_polling.py", line 50, in _send_final_payload
raise TransportError(error_message)
aiocometd.exceptions.TransportError: No response message received for the first message in the payload
Any clue what is going on?
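Comparing the two payloads above, the outgoing handshake carries 'id': '0' while the incoming one has no 'id' field. If the client pairs responses with requests by id (an assumption about aiocometd's internals, suggested by the error text), the response would go unmatched. The sketch below illustrates id-based matching and a more lenient fallback; both functions are hypothetical.

```python
from typing import Any, Dict, List, Optional

Message = Dict[str, Any]


def match_by_id(request: Message, responses: List[Message]) -> Optional[Message]:
    """Return the response whose channel and id both equal the request's."""
    for response in responses:
        if (response.get("channel") == request["channel"]
                and response.get("id") == request["id"]):
            return response
    return None


def match_lenient(request: Message, responses: List[Message]) -> Optional[Message]:
    """Like match_by_id, but accept a same-channel response that omits id."""
    for response in responses:
        if (response.get("channel") == request["channel"]
                and response.get("id", request["id"]) == request["id"]):
            return response
    return None


request = {"channel": "/meta/handshake", "id": "0"}
responses = [{"channel": "/meta/handshake", "clientId": "effe08c7",
              "successful": True}]
print(match_by_id(request, responses))
print(match_lenient(request, responses) is not None)
```

Whether the lenient behavior is appropriate depends on how strictly the server is expected to follow the protocol.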
great project, thank you
in v 0.4.5: BUG: does not handle own CancelledError on client.close():
2020-01-19 08:54:02 INFO aiocometd.client:287 close Closing client...
2020-01-19 08:54:02 ERRO asyncio:199 exception_reactor Exception in callback <bound method TransportBase._connect_done of <aiocometd.transports.websocket.WebSocketTransport object at 0x7f9e62e90fa0>>
handle: <Handle TransportBase._connect_done>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
File "/usr/lib/python3.8/site-packages/aiocometd/transports/base.py", line 541, in _connect_done
result: Union[JsonObject, Exception] = future.result()
asyncio.exceptions.CancelledError
2020-01-19 08:54:02 ERRO asyncio:199 exception_reactor Exception in callback <bound method WebSocketTransport._receive_done of <aiocometd.transports.websocket.WebSocketTransport object at 0x7f9e62e90fa0>>
handle: <Handle WebSocketTransport._receive_done>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 70, in uvloop.loop.Handle._run
File "/usr/lib/python3.8/site-packages/aiocometd/transports/websocket.py", line 261, in _receive_done
result = future.result()
asyncio.exceptions.CancelledError
2020-01-19 08:54:03 INFO aiocometd.client:298 close Client closed.
in aiocometd/transports/base.py:
https://github.com/robertmrk/aiocometd/blob/0.4.5/aiocometd/transports/base.py#L541
in aiocometd/transports/websocket.py:
https://github.com/robertmrk/aiocometd/blob/0.4.5/aiocometd/transports/websocket.py#L261
In both cases these are internal tasks, which are both created and cancelled by aiocometd, and they both propagate their own aiocometd-initiated CancelledError to the event loop exception handler.
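A minimal sketch of a done callback that tolerates cancellation, assuming the fix would live in the callbacks themselves. Since Python 3.8, asyncio.CancelledError derives from BaseException, so an "except Exception" around future.result() no longer catches it; checking future.cancelled() first avoids the raise. on_done and outcomes are hypothetical names, not aiocometd's.

```python
import asyncio
from typing import Any, List

outcomes: List[Any] = []


def on_done(future: "asyncio.Future[Any]") -> None:
    # Calling future.result() on a cancelled future raises CancelledError,
    # which would reach the loop's exception handler from a callback.
    if future.cancelled():
        outcomes.append("cancelled")
        return
    try:
        outcomes.append(future.result())
    except Exception as error:
        outcomes.append(error)


async def main() -> None:
    task = asyncio.ensure_future(asyncio.sleep(10))
    task.add_done_callback(on_done)
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    # Wait for the cancellation to finish without re-raising it here.
    await asyncio.gather(task, return_exceptions=True)


asyncio.run(main())
print(outcomes)
```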