Comments (18)
It appears you have enabled synchronous behavior on gevent transport, and you have a pending read. If you're using the transport in synchronous mode, you need to not have any separate greenlets performing the read_frames
loop.
from haigha.
Many thanks for your quick reponse!
Yes, I'm using the synchronous mode of gevent_transport, and this is code:
rmq.py
import logging
import datetime
import gevent
from gevent import socket
from haigha.connection import Connection
from haigha.message import Message
from trace_error import full_stack
sock_opts = {
(socket.IPPROTO_TCP, socket.TCP_NODELAY) : 1,
}
rmq_con = Connection(host="localhost", debug=2, transport='gevent', sock_opts=sock_opts, synchronous=True, password='passwd')
rmq_ch = rmq_con.channel()
def message_pump(conn):
while conn:
try:
conn.read_frames()
except Exception, e:
trace = full_stack()
print 'message_pump error ', datetime.datetime.now(), '\n', trace
#logger.error(trace)
gevent.sleep()
gevent.spawn(message_pump, rmq_con)
and my server.py
:
from gevent import monkey; monkey.patch_all();
import gevent
from gevent.server import StreamServer
from rmq import rmq_con
def _handle(socket, address):
rmq_ch = rmq_con.channel() # <---- This line will cause the blocking
# declare some new queues and exchanges
....
# adding consumers to these queues
# read data from socket, and publish( channel.basic.publish ) the data to queues
if __name__ == '__main__':
server = StreamServer(('0.0.0.0', 10545), _handle)
server.serve_forever()
It seems that I don't have any other connection.read_frames()
in my codes, is there any chance that I called some haigha apis and these apis called the read_frames
functions which caused this issue?
from haigha.
Yup, kill off that message pump and I think you'll be good. It's only necessary if you want to operate completely asynchronous with callbacks.
from haigha.
It seems that I misunderstood something...
If I delete the codes about message pump, the consumers still will be called once new messages are coming, am I right?
from haigha.
Ah, right, you need it for consumers. In that case, leave it in, remove it from the connection constructor, and then if you need/want channels to operate in a synchronous manner you can create them individually.
It's possible that you're encountering an edge case in how this can be used that needs to be fixed. The goal was to allow for using gevent in either synchronous or asynchronous mode, and separately to make channels synchronous on top of an asynchronous transport.
from haigha.
Sorry, I didn't get your point.
What's the meaning of leave it in,remove it from the connection constructor, and then if you need/want channels to operate in a synchronous manner you can create them individually.
?
Did you mean I need to removed the message_pump
function from the rmq.py
file, and create this message pump in each greenlet?
from haigha.
Leave the message pump in place, remove synchronous=True
from the connection constructor, and then if you need specific channels to operate synchronously try doing that when calling connection.channel
.
from haigha.
OK, I've removed it~
But I didn't understand what the 'channels operate synchronously ' is , did you means some APIs only can be called when the channels are in synchronous
mode?
from haigha.
You can create channels that act synchronously even though the rest of the connection is operating asynchronously. It's up to you, and useful for some kinds of business logic such as a chain of operations that set up your bindings.
from haigha.
I use these codes to define queues and bindings, you mean if the rmq_ch
is working on aysnc mode, these code will fail?
fanout_exchange_name = 'fanout-' + exchange_name
rmq_ch.queue.declare(queue_name, auto_delete=False)
logger.debug('declare queue_name over')
rmq_ch.exchange.declare(exchange_name, 'direct')
logger.debug('declare exchange over')
#rmq_ch.exchange.declare(fanout_exchange_name, 'fanout')
#logger.debug('declare fanout_exchange over')
rmq_ch.queue.bind(queue_name, exchange_name, routing_key_p2p)
logger.debug('binding p2p exchange over')
from haigha.
they won't fail, but you won't get any data returned for operations that return data, and any retry/fail logic is much more complicated if you use callbacks. The above is a good example of where using a synchronous channel is valuable.
from haigha.
Hmmm, I didn't use the rmq_ch.basic.get
to get data.
All I'm using are
rmq_ch.basic.ack
rmq_ch.basic.publish
rmq_ch.basic.cancel
rmq_ch.basic.consume
rmq_ch.close
(rmq_ch is in async mode)
I guess none of them will return data, and currently they all work good :)
from haigha.
In the example I commented on, queue.declare
can return data and exchange.declare
can throw an exception.
from haigha.
....
I just know queue.declare
can return data ....
Anyway, its returned data is useless for my codes~
And for exchange.declare
exception, currently I will just ignore it, it's kind of trade-off~~
I need to keep my channels under async
mode to fix the creating channel blocking issue
from haigha.
Hi,
I found that sometimes when I use
rmq_con = Connection(host="localhost", debug=2, transport='gevent', sock_opts=sock_opts, synchronous=False, password='')
to initialize a connection, rabbitmq would raise:
=ERROR REPORT==== 11-Oct-2014::20:54:34 ===
exception on TCP connection <0.19482.1189> from 127.0.0.1:34205
{handshake_timeout,frame_header}
=INFO REPORT==== 11-Oct-2014::20:54:34 ===
closing TCP connection <0.19482.1189> from 127.0.0.1:34205
Did I do something wrong when declaring the async
rabbitmq connection?
Thanks
from haigha.
Find the root cause: message_pump function is necessary when declaring a new connection~
from haigha.
You are correct. There's also a synchronous_connect option.
from haigha.
Really appreciate your help, this issue never occur again, :)
Many thanks!
from haigha.
Related Issues (20)
- SyntaxError on fresh install with python 7 HOT 5
- Delivery tags in publisher confirm mode are not handled properly
- Synchronous basic.get with consumer=None is broken HOT 1
- Connection blocked by one consumer HOT 2
- cannot publish message to default exchange using gevent transport HOT 1
- Status of Blocking Socket Transport in Haigha HOT 1
- basic.consume doesn't return consumer_tag, so how to get it? HOT 1
- Unroutable message hangs haigha in publisher confirm mode HOT 4
- Can't get close_info from synchronous connection
- TypeError: getsockaddrarg() takes exactly 2 arguments (4 given) HOT 3
- Connection.transport_closed should set self._closed
- BasicClass.get return value is inconsistent in synchronous mode HOT 1
- Message frames might be interleaved when sending from multiple greenlets HOT 8
- Failed to parse timestamp in frame reader HOT 2
- Connection heartbeat arg description doesn't match implementation HOT 13
- Python3.4 support?
- Socket fds are leaked upon connection failure HOT 1
- Is it possible to connect on a rabbitmq cluster? HOT 4
- Haigha write "s" for short int value in tables instead of "U/u"
- Haigha crash when trying to deal with a message with empty body
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from haigha.