
haigha's Introduction

Haigha - Synchronous and asynchronous AMQP client library


Version

0.9.0

Download

http://pypi.python.org/pypi/haigha

Source

https://github.com/agoragames/haigha

Keywords

python, amqp, rabbitmq, event, libevent, gevent

Overview

Haigha provides a simple-to-use client library for interacting with AMQP brokers. It currently supports the 0.9.1 protocol and is integration-tested against the latest RabbitMQ 2.8.1 (see errata). Haigha is a descendant of py-amqplib and owes much to its developers.

The goals of haigha are performance, simplicity, and adherence to the form and function of the AMQP protocol. It adds a few useful features, such as the ChannelPool class and Channel.publish_synchronous, to ease use of powerful features in real-world applications.

By default, Haigha operates in a completely asynchronous mode, relying on callbacks to notify application code of responses from the broker. Where applicable, nowait defaults to True. The application code is welcome to call a series of methods, and Haigha will manage the stack and synchronous handshakes in the event loop.

Starting with the 0.5.0 series, haigha natively supports three transport types: libevent, gevent, and standard sockets. The socket implementation defaults to synchronous mode and is useful for an interactive console or scripting; the gevent transport is the preferred asynchronous backend, though it can also be used synchronously.
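
A brief sketch of how a transport is selected (assuming a local broker with the default guest credentials, as in the example further down; 'gevent' is the transport string used in the gevent examples later in this document):

from haigha.connection import Connection

# Default synchronous socket transport - convenient for consoles and scripts.
sync_conn = Connection(
  user='guest', password='guest',
  vhost='/', host='localhost',
  heartbeat=None, debug=True)

# Asynchronous gevent transport - the preferred asynchronous backend.
async_conn = Connection(
  user='guest', password='guest',
  vhost='/', host='localhost',
  heartbeat=None, debug=True, transport='gevent')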

Documentation

This file and the various files in the scripts directory serve as a simple introduction to haigha. For more complete documentation, see DOCUMENTATION.rst.

Example

See the scripts and examples directories for several examples, in particular the stress_test script, which you can use to test the performance of haigha against your broker. Below is a simple example of a client that connects, processes one message and quits:

from haigha.connection import Connection
from haigha.message import Message

# Connect to a local broker with the default guest credentials.
connection = Connection(
  user='guest', password='guest',
  vhost='/', host='localhost',
  heartbeat=None, debug=True)

# Declare an exchange and a queue, bind them, then publish and fetch a message.
ch = connection.channel()
ch.exchange.declare('test_exchange', 'direct')
ch.queue.declare('test_queue', auto_delete=True)
ch.queue.bind('test_queue', 'test_exchange', 'test_key')
ch.basic.publish( Message('body', application_headers={'hello': 'world'}),
  'test_exchange', 'test_key' )
print ch.basic.get('test_queue')
connection.close()

To use protocol extensions for RabbitMQ, initialize the connection with the haigha.connections.rabbit_connection.RabbitConnection class.
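
For example (a minimal sketch; it assumes RabbitConnection accepts the same constructor arguments as Connection):

from haigha.connections.rabbit_connection import RabbitConnection

# Assumed to accept the same keyword arguments as Connection.
connection = RabbitConnection(
  user='guest', password='guest',
  vhost='/', host='localhost',
  heartbeat=None, debug=True)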

Roadmap

  • Documentation (there's always more)
  • Improved error handling
  • Implementation of error codes in the spec
  • Testing and integration with brokers other than RabbitMQ
  • Identify and improve inefficient code
  • Edge cases in frame management
  • Improvements to usability
  • SSL
  • Allow nowait when an asynchronous transport is used but the Connection has been put into synchronous mode

Haigha has been tested exclusively with Python 2.6 and 2.7, but we intend for it to work with the 3.x series as well. Please report any issues you may have.

Installation

Haigha is available on PyPI and can be installed with pip:

pip install haigha

If installing from source:

  • with development requirements (e.g. testing frameworks):

    pip install -r development.txt
  • without development requirements:

    pip install -r requirements.txt

Note that haigha does not install either gevent or libevent support automatically. For libevent, haigha has been tested and deployed with the event-agora==0.4.1 library.
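
If you want one of those backends, install it separately. For example (a sketch only; the exact packages and version pins depend on your environment, and event-agora==0.4.1 is simply the version named above):

pip install gevent
pip install event-agora==0.4.1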

Testing

Unit tests can be run with either the included script or with nose:

./haigha$ scripts/test 
./haigha$ nosetests

There are two other testing scripts of note. rabbit_table_test is a simple integration test that confirms compliance with RabbitMQ errata. The stress_test script is a valuable tool that offers load-testing capability similar to Apache Bench or Siege; it is used both to confirm the robustness of haigha and to benchmark hardware or a broker configuration.

Bug tracker

If you have any suggestions, bug reports or annoyances, please report them to our issue tracker at https://github.com/agoragames/haigha/issues.

License

This software is licensed under the New BSD License. See the LICENSE.txt file in the top distribution directory for the full license text.


haigha's Issues

Channel.process_frames crashes when channel-close listener raises an exception

[This is related to issues #11 and #12]

  1. Create a channel-close listener function that raises Exception() and register it via Channel.add_close_listener(). The exception doesn't have to come explicitly from the channel-close listener: if the listener calls a gevent API that results in a context switch, and another greenlet then kills the greenlet that was executing the channel-close listener, a GreenletExit (or another exception such as LinkedFailed, etc.) is injected into that greenlet, causing the exception to be raised in the channel-close listener function.
  2. Then do something that causes the broker to close your channel. This happened to me with RabbitMQ when I set qos via ch.basic.qos(prefetch_count=1) but forgot to pass no_ack=False to ch.basic.consume(), and then sent some messages to an exchange that was bound to the consumer's queue; when my code ACK'ed the incoming message, the broker no longer had a matching message in the queue (since no_ack was True by default) and closed the channel to signal an error condition.

If the channel-close listener callback is called from the scope of self.dispatch(), then by the time the exception raised in my channel-close listener is caught by process_frames(), both self._connection and self._logger of the Channel instance are None (cleared by Channel._closed_cb()). Since self.logger maps to self._connection.logger and self.close() maps to self.channel.close(), both of these calls result in an AttributeError. For example:


  File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/build/lib/haigha/connection.py", line 318, in read_frames
    self._transport.process_channels( p_channels )
  File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/build/lib/haigha/transports/transport.py", line 32, in process_channels
    channel.process_frames()
  File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/build/lib/haigha/channel.py", line 148, in process_frames
    self.logger.error(
  File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/build/lib/haigha/channel.py", line 60, in logger
    return self._connection.logger
AttributeError: 'NoneType' object has no attribute 'logger'

def process_frames(self):
  """
  Process the input buffer.
  """
  while len(self._frame_buffer):
    try:
      # It would make sense to call next_frame, but it's technically faster
      # to repeat the code here.
      frame = self._frame_buffer.popleft()
      self.dispatch( frame )
    except ProtocolClass.FrameUnderflow:
      return
    except:
      self.logger.error( 
        "Failed to dispatch %s", frame, exc_info=True )
      self.close( 500, "Failed to dispatch %s"%(str(frame)) )
      return

Connection blocked by one consumer

Hi,
Sorry for the interruption.
I found my server couldn't receive data/messages from the RabbitMQ server for several minutes, and then it recovered by itself.

When I looked into the log, I found that one consumer was trying to send data to a mobile app through the gevent.socket.sendall function, and this call blocked.
Several minutes later, the sendall function raised this exception:

2014-12-01 22:53:30,350 - root.tcp - ERROR - consumer sendall ERROR! Info from full_stack:

Traceback (most recent call last):

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run

    result = self._run(*self.args, **self.kwargs)

  File "/root/python-workspace/p2g_staging/mq-stage2/rmq.py", line 27, in message_pump

    conn.read_frames()

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/connection.py", line 432, in read_frames

    self._transport.process_channels(p_channels)

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/transports/transport.py", line 40, in process_channels

    channel.process_frames()

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 234, in process_frames

    self.dispatch(frame)

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 212, in dispatch

    klass.dispatch(method_frame) 

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/classes/protocol_class.py", line 80, in dispatch

    callback(method_frame)

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/classes/basic_class.py", line 203, in _recv_deliver

    func(msg) 

  File "tcp_server.py", line 79, in rmq_msg_handler

    sock.sendall(raw_msg)  ##########  <---- This line is my code :P

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/socket.py", line 458, in sendall

    data_sent += self.send(_get_memory(data, data_sent), flags)

  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/socket.py", line 443, in send

    return sock.send(data, flags)

error: [Errno 32] Broken pipe

At the same time, this socket's recv function raised a "Connection timed out" error:

2014-12-01 22:53:30,349 - root.tcp - DEBUG - [5] recv error: _role_ None - None : Error: [Errno 110] Connection timed out

So I guess the TCP connection was closed before sendall was called, and sendall blocked until the timeout error. While it was blocked, none of the other consumers could receive data from the RabbitMQ server.
To fix this bug, I'm planning to set the SO_SNDTIMEO option on the socket used by sendall.

_But on haigha's side, could I do something to avoid one blocked consumer blocking the entire RabbitMQ connection? How about running every consumer in a new greenlet, or running all consumers on a greenlet pool?_

Any comments would be helpful, many thanks!
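
For reference, the greenlet-per-message approach asked about above could look roughly like the following sketch (an assumption on my part, not built-in haigha behavior):

import gevent

def rmq_msg_handler(msg):
  # The original, potentially blocking handler (e.g. sock.sendall(raw_msg)).
  pass

def spawning_consumer(msg):
  # Hand each message off to its own greenlet so a blocked send in one
  # handler does not stall the connection's read_frames() loop.
  gevent.spawn(rmq_msg_handler, msg)

# 'ch' is an open haigha channel, as in the README example.
ch.basic.consume('test_queue', spawning_consumer)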

Getting publisher confirms

I am trying to use the publisher confirms extension with the default socket transport.

If my understanding is correct, callback functions for publisher confirms do not run before the channel is closed.

I tried to read the frames explicitly by calling the connection.read_frames() method. While this worked for getting the confirms, the function seems to block when there are no frames to read.
Shouldn't this function have a timeout argument?

What is the appropriate way to process publisher confirms without waiting for the channel to be closed?

how to implement a heartbeat?

I'd like to publish heartbeat messages on a set interval (like every 3 seconds), but at least using greenlets doesn't seem to work (nor does gevent.core.timer). It looks like conn.read_frames() on the connection blocks any other greenlet from running. Any advice on getting something to run on a timeout?
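
One possible shape for this (a sketch only; it assumes the gevent transport so that read_frames() yields to other greenlets rather than blocking the hub, and reuses the exchange and routing key names from the README example):

import gevent
from haigha.message import Message

def heartbeat_publisher(ch):
  # Publish a heartbeat message every 3 seconds; gevent.sleep() yields so the
  # read_frames() greenlet can keep pumping the connection.
  while True:
    ch.basic.publish(Message('heartbeat'), 'test_exchange', 'test_key')
    gevent.sleep(3)

def read_frames_loop(conn):
  while True:
    conn.read_frames()

# 'connection' and 'ch' are assumed to be an open gevent-transport Connection
# and channel, as in the gevent examples elsewhere in this document.
gevent.spawn(read_frames_loop, connection)
gevent.spawn(heartbeat_publisher, ch)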

Exception handling too broad, interferes with GreenletExit and SystemExit

In a couple of places, haigha's exception handling is too broad, resulting in undesirable behavior for system-level exceptions such as SystemExit and GreenletExit. Think of GreenletExit and SystemExit like "kill -9".

Instead of "except:", these should be handling specific error classes that inherit from Exception. Ideally, these should be "except specific-expected-error-classes:", and at the minimum they should be "except Exception:", but never just "except:".

In haigha/channel.py, see the "except:" block below:

  def process_frames(self):
    '''
    Process the input buffer.
    '''
    while len(self._frame_buffer):
      try:
        # It would make sense to call next_frame, but it's technically faster
        # to repeat the code here.
        frame = self._frame_buffer.popleft()
        self.dispatch( frame )
      except ProtocolClass.FrameUnderflow:
        return
      except:
        # Spec says that channel should be closed if there's a framing error.
        # Unsure if we can send close if the current exception is transport
        # level (e.g. gevent.GreenletExit)
        self.close( 500, "Failed to dispatch %s"%(str(frame)) )
        raise

And in haigha/connection.py:

  def disconnect(self):
    '''
    Disconnect from the current host, but do not update the closed state. After
    the transport is disconnected, the closed state will be True if this is 
    called after a protocol shutdown, or False if the disconnect was in error.

    TODO: do we really need closed vs. connected states? this only adds 
    complication and the whole reconnect process has been scrapped anyway.

    '''
    self._connected = False
    if self._transport!=None:
      try:
        self._transport.disconnect()
      except: 
        self.logger.error("Failed to disconnect from %s", self._host, exc_info=True)
        raise
      finally:
        self._transport = None
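
For illustration, the narrower handling proposed above might look like the following sketch (a hypothetical rewrite of the dispatch loop, not haigha's actual code); BaseException subclasses such as SystemExit and gevent.GreenletExit are left to propagate:

  def process_frames(self):
    '''
    Process the input buffer.
    '''
    while len(self._frame_buffer):
      try:
        frame = self._frame_buffer.popleft()
        self.dispatch( frame )
      except ProtocolClass.FrameUnderflow:
        return
      except Exception:
        # Only errors derived from Exception are handled here; SystemExit,
        # GreenletExit and other BaseException subclasses propagate.
        self.close( 500, "Failed to dispatch %s"%(str(frame)) )
        raise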

When publishing a Message created with a None body (the default), publish dies

This is because Message.__len__ calls len(self._body), which fails when the body is None:

File "/Library/Python/2.7/site-packages/haigha/classes/basic_class.py", line 161, in publish
self.send_frame( HeaderFrame(self.channel_id, 60, 0, len(msg), msg.properties) )
File "/Library/Python/2.7/site-packages/haigha/message.py", line 27, in len
return len( self._body )
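
A minimal sketch of the failure and a caller-side workaround (assuming the channel setup from the README example):

from haigha.message import Message

# Message() leaves the body as None, so len(msg) inside basic.publish fails:
#   ch.basic.publish( Message(), 'test_exchange', 'test_key' )
# Caller-side workaround: always pass an explicit (possibly empty) body.
# 'ch' is an open channel, as in the README example.
ch.basic.publish( Message(''), 'test_exchange', 'test_key' )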

AttributeError: 'Channel' object has no attribute 'channel' when closing channel with gevent transport

This happens every time with haigha 0.5.3 with gevent transport when running the following app. However, the app works fine with haigha 0.4.1. I am using gevent 1.0b2, but am pretty sure this failure is not related to gevent.

The traceback looks like this:

$ python haigha_channel_test.py 
WARNING: DEPLOYING GEVENT-FRIENDLINESS BUG WORK-AROUND FOR HAIGHA v0.5.3
Connecting...
/Users/vkrug/Packages/haigha/haigha-0.5.3/build/lib/haigha/transports/gevent_transport.py:12: DeprecationWarning: gevent.coros has been renamed to gevent.lock
  from gevent.coros import Semaphore
Starting message pump greenlet...
Closing channel1...
Traceback (most recent call last):
  File "/Users/vkrug/nta/current/lib/python2.6/site-packages/gevent/greenlet.py", line 328, in run
    result = self._run(*self.args, **self.kwargs)
  File "haigha_channel_test.py", line 57, in readFrames
    conn.read_frames()
  File "/Users/vkrug/Packages/haigha/haigha-0.5.3/build/lib/haigha/connection.py", line 354, in read_frames
    self._transport.process_channels( p_channels )
  File "/Users/vkrug/Packages/haigha/haigha-0.5.3/build/lib/haigha/transports/transport.py", line 38, in process_channels
    channel.process_frames()
  File "/Users/vkrug/Packages/haigha/haigha-0.5.3/build/lib/haigha/channel.py", line 208, in process_frames
    self.close( 500, "Failed to dispatch %s"%(str(frame)) )
  File "/Users/vkrug/Packages/haigha/haigha-0.5.3/build/lib/haigha/channel.py", line 156, in close
    self.channel.close(reply_code, reply_text, class_id, method_id)
AttributeError: 'Channel' object has no attribute 'channel'
<Greenlet at 0x1009d42f8: readFrames(conn=<haigha.connection.Connection object at 0x1009de45)> failed with AttributeError

This is the app that reproduces the failure with haigha 0.5.3:

import sys

import gevent
from gevent.event import AsyncResult
from haigha.connection import Connection
from haigha.message import Message

# NOTE: Work around a bug in Haigha 0.5.1-0.5.3 that breaks gevent
#  compatibility
import haigha
try:
  haigha_version = haigha.__version__
except AttributeError:
  pass
else:
  from distutils import version
  if (version.StrictVersion(haigha_version) >=
      version.StrictVersion("0.5.1")
      and
      version.StrictVersion(haigha_version) <=
      version.StrictVersion("0.5.3")):
    print >>sys.stderr, \
    "WARNING: DEPLOYING GEVENT-FRIENDLINESS BUG WORK-AROUND FOR HAIGHA v%s" % (
      haigha_version)
    from haigha.transports import socket_transport
    import gevent.socket
    socket_transport.socket = gevent.socket


def test_haigha():
  """
  A simple test to check Haigha's connection/channel opening and closing.

  Note that Rabbit MQ must be running
  """

  channel1CloseWaiter = AsyncResult()
  connectionCloseWaiter = AsyncResult()

  def handleChannel1Closed(ch, channelImpl):
    print "CHANNEL1 CLOSED: %r" % (channelImpl.close_info,)
    channel1CloseWaiter.set()

  def handleConnectionClosed():
    print "CONNECTION CLOSED!"
    connectionCloseWaiter.set()

  print "Connecting..."
  connection = Connection(
    user='guest', password='guest',
    vhost='/', host='localhost',
   heartbeat=None, debug=True, transport="gevent",
   close_cb=handleConnectionClosed)

  def readFrames(conn):
    while True:
      conn.read_frames()
      if connectionCloseWaiter.ready():
        break

  # Start Haigha message pump
  print "Starting message pump greenlet..."
  g = gevent.spawn(readFrames, conn=connection)

  ch1 = connection.channel()
  channel1Impl = ch1.channel
  ch1.add_close_listener(lambda ch: handleChannel1Closed(ch, channel1Impl))

  # Close the channels and wait for close-done
  print "Closing channel1..."
  ch1.close()
  with gevent.Timeout(seconds=10) as myTimeout:
    channel1CloseWaiter.wait()

  # Close the connection and wait for close-done
  print "Closing connection..."
  connection.close()
  with gevent.Timeout(seconds=10) as myTimeout:
    connectionCloseWaiter.wait()

  print "Killing message pump..."
  sys.stdout.flush()

  g.kill()



if __name__ == '__main__':
  test_haigha()

Callback hooks for established connection, channel, etc.

Following best practices, I want my daemon to indicate that it's ready for work only after all of its communication channels are ready. It seems that haigha schedules connections (and possibly channel setup?) to happen asynchronously, which is great, but it makes it very difficult to know when connections, channels, and queues are fully established. To that end, how about some more callback hooks?

EDIT: This would also be useful for programs that start up with data to publish to a queue, but have to wait for the message broker (which might be started later) to become available.
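
The hooks that exist today cover part of this; a minimal sketch, assuming the open_cb connection argument and Channel.add_open_listener used in the libevent example further down (the queue callback here just accepts whatever arguments it is given):

from haigha.connection import Connection

class Client(object):
  def __init__(self):
    # open_cb fires once the connection handshake completes.
    self.connection = Connection(transport='event', open_cb=self.on_connect)

  def on_connect(self):
    self.connection.channel().add_open_listener(self.on_channel_open)

  def on_channel_open(self, channel):
    # The broker has confirmed channel.open; declarations can start here,
    # each with its own cb= callback.
    channel.queue.declare(queue='hello', cb=lambda *args: None)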

'event' extension fails to build (OSX and Linux)

When trying to install haigha 0.4.1 on OS X or Arch Linux with Python 2.7, the 'event' extension fails to build with the following output.

Arch Linux

python --version
Python 2.7.2
found system libevent for linux2

running install

running build

running build_ext

building 'event' extension

creating build

creating build/temp.linux-i686-2.7

gcc -pthread -fno-strict-aliasing -march=i686 -mtune=generic -O2 -pipe -fstack-protector --param=ssp-buffer-size=4 -D_FORTIFY_SOURCE=2 -DNDEBUG -march=i686 -mtune=generic -O2 -pipe -fstack-protector --param=ssp-buffer-size=4 -D_FORTIFY_SOURCE=2 -fPIC -I/usr/include -I/usr/include/python2.7 -c event.c -o build/temp.linux-i686-2.7/event.o

event.c: In function ‘__pyx_pf_5event_11bufferevent_read’:

event.c:4137:87: error: dereferencing pointer to incomplete type

event.c:4149:99: error: dereferencing pointer to incomplete type

event.c:4177:109: error: dereferencing pointer to incomplete type

event.c: In function ‘__pyx_pf_5event_11bufferevent_write’:

event.c:4247:3: warning: passing argument 2 of ‘PyObject_AsCharBuffer’ from incompatible pointer type [enabled by default]

/usr/include/python2.7/abstract.h:476:22: note: expected ‘const char **’ but argument is of type ‘char **’

event.c: In function ‘__pyx_pf_5event_dns_resolve_reverse’:

event.c:5307:3: warning: passing argument 1 of ‘evdns_resolve_reverse’ from incompatible pointer type [enabled by default]

/usr/include/event2/dns_compat.h:220:5: note: expected ‘const struct in_addr *’ but argument is of type ‘char *’

event.c: In function ‘__pyx_pf_5event_dns_resolve_reverse_ipv6’:

event.c:5463:3: warning: passing argument 1 of ‘evdns_resolve_reverse’ from incompatible pointer type [enabled by default]

/usr/include/event2/dns_compat.h:220:5: note: expected ‘const struct in_addr *’ but argument is of type ‘char *’

event.c: In function ‘__pyx_f_5event___path_handler’:

event.c:6607:24: warning: assignment discards ‘const’ qualifier from pointer target type [enabled by default]

event.c:6630:23: warning: assignment discards ‘const’ qualifier from pointer target type [enabled by default]

event.c:6653:16: warning: assignment discards ‘const’ qualifier from pointer target type [enabled by default]

error: command 'gcc' failed with exit status 1

OSX

python --version
Python 2.7.1
found installed libevent in /usr/local/lib

running install

running build

running build_ext

building 'event' extension

llvm-gcc-4.2 -fno-strict-aliasing -fno-common -dynamic -g -Os -pipe -fno-common -fno-strict-aliasing -fwrapv -mno-fused-madd -DENABLE_DTRACE -DMACOSX -DNDEBUG -Wall -Wstrict-prototypes -Wshorten-64-to-32 -DNDEBUG -g -fwrapv -Os -Wall -Wstrict-prototypes -DENABLE_DTRACE -arch i386 -arch x86_64 -pipe -I/usr/local/include -I/System/Library/Frameworks/Python.framework/Versions/2.7/include/python2.7 -c event.c -o build/temp.macosx-10.7-intel-2.7/event.o

event.c: In function '__pyx_pf_5event_5event_add':

event.c:2069: warning: implicit conversion shortens 64-bit value into a 32-bit value

event.c:2076: warning: implicit conversion shortens 64-bit value into a 32-bit value

event.c: In function '__pyx_pf_5event_11bufferevent_read':

event.c:4137: error: dereferencing pointer to incomplete type

event.c:4149: error: dereferencing pointer to incomplete type

event.c:4177: error: dereferencing pointer to incomplete type

event.c: In function '__pyx_pf_5event_11bufferevent_write':

event.c:4247: warning: passing argument 2 of 'PyObject_AsCharBuffer' from incompatible pointer type

event.c:4247: warning: passing argument 3 of 'PyObject_AsCharBuffer' from incompatible pointer type

event.c: In function '__pyx_pf_5event_dns_resolve_reverse':

event.c:5307: warning: passing argument 1 of 'evdns_resolve_reverse' from incompatible pointer type

event.c: In function '__pyx_pf_5event_dns_resolve_reverse_ipv6':

event.c:5463: warning: passing argument 1 of 'evdns_resolve_reverse' from incompatible pointer type

event.c: In function '__pyx_pf_5event_12__wsgi_input_read':

event.c:6320: warning: pointer targets in passing argument 1 of 'PyString_FromStringAndSize' differ in signedness

event.c: In function '__pyx_f_5event___path_handler':

event.c:6607: warning: assignment discards qualifiers from pointer target type

event.c:6630: warning: assignment discards qualifiers from pointer target type

event.c:6653: warning: assignment discards qualifiers from pointer target type

event.c: In function '__pyx_pf_5event_5event___simple_callback':

event.c:1689: warning: implicit conversion shortens 64-bit value into a 32-bit value

event.c: In function '__pyx_pf_5event_5event_add':

event.c:2069: warning: implicit conversion shortens 64-bit value into a 32-bit value

event.c:2076: warning: implicit conversion shortens 64-bit value into a 32-bit value

event.c: In function '__pyx_pf_5event_11bufferevent_read':

event.c:4137: error: dereferencing pointer to incomplete type

event.c:4149: error: dereferencing pointer to incomplete type

event.c:4177: error: dereferencing pointer to incomplete type

event.c: In function '__pyx_pf_5event_11bufferevent_write':

event.c:4247: warning: passing argument 2 of 'PyObject_AsCharBuffer' from incompatible pointer type

event.c:4247: warning: passing argument 3 of 'PyObject_AsCharBuffer' from incompatible pointer type

event.c: In function '__pyx_f_5event___evdns_callback':

event.c:4536: warning: cast from pointer to integer of different size

event.c: In function '__pyx_pf_5event_dns_resolve_ipv4':

event.c:4995: warning: cast to pointer from integer of different size

event.c: In function '__pyx_pf_5event_dns_resolve_ipv6':

event.c:5151: warning: cast to pointer from integer of different size

event.c: In function '__pyx_pf_5event_dns_resolve_reverse':

event.c:5307: warning: cast to pointer from integer of different size

event.c:5307: warning: passing argument 1 of 'evdns_resolve_reverse' from incompatible pointer type

event.c: In function '__pyx_pf_5event_dns_resolve_reverse_ipv6':

event.c:5463: warning: cast to pointer from integer of different size

event.c:5463: warning: passing argument 1 of 'evdns_resolve_reverse' from incompatible pointer type

event.c: In function '__pyx_pf_5event_12__wsgi_input_read':

event.c:6320: warning: pointer targets in passing argument 1 of 'PyString_FromStringAndSize' differ in signedness

event.c: In function '__pyx_f_5event___path_handler':

event.c:6607: warning: assignment discards qualifiers from pointer target type

event.c:6630: warning: assignment discards qualifiers from pointer target type

event.c:6653: warning: assignment discards qualifiers from pointer target type

lipo: can't open input file: /var/folders/qv/z4k1qngx15g9bytkl9wlbbvm0000gp/T//cc7gb8Jl.out (No such file or directory)

error: command 'llvm-gcc-4.2' failed with exit status 1

'auto_delete=True' causes an error when declaring an exchange

Hi,
The doc said

exchange.declare Accepts auto_delete and internal keyword arguments

But when I declare an exchange like

rmq_ch.exchange.declare(exchange_name, 'direct', auto_delete=True)

it raises TypeError: declare() got an unexpected keyword argument 'auto_delete'. I read the source code and found that the exchange's declare method doesn't accept these keyword arguments, so is the doc wrong? Or did I miss something?

Thanks!

v0.5.3: TypeError: exchange.declare() got an unexpected keyword argument 'auto_delete'

This used to work for me using Haigha 0.4.1, but using haigha 0.5.3, I get the following exception:

ch.exchange.declare('test_exchange', 'direct', auto_delete=True)
TypeError: declare() got an unexpected keyword argument 'auto_delete'

Code:
from haigha.connection import Connection

connection = Connection(
  user='guest', password='guest',
  vhost='/', host='localhost',
  heartbeat=None, debug=True, transport="gevent")

ch = connection.channel()

ch.exchange.declare('test_exchange', 'direct', auto_delete=True)
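
For what it's worth, a hedged sketch of the workaround suggested by the README note on protocol extensions (it assumes auto_delete on exchange.declare is one of the RabbitMQ extensions exposed via RabbitConnection rather than the plain Connection):

from haigha.connections.rabbit_connection import RabbitConnection

connection = RabbitConnection(
  user='guest', password='guest',
  vhost='/', host='localhost',
  heartbeat=None, debug=True, transport="gevent")

ch = connection.channel()
ch.exchange.declare('test_exchange', 'direct', auto_delete=True)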

event example

I'm evaluating RabbitMQ and Python clients. This library looks much more mature than pika, but I need a little help figuring out how to use it. I am interested in asynchronous operation because I need an event loop in which I can schedule various tasks; however, I prefer event over gevent. We haven't been using gevent, and it seems to add more complexity than I want at the moment. I guess my other option is to use the 'socket' transport and implement my own loop in which I call connection.read_frames().

Here's my attempt at a publisher:

#!/usr/bin/env python
import logging
import sys
import event
from haigha.connection import Connection
from haigha.message import Message

class Client(object):
    def __init__(self):
        print('instantiating Connection')
        self.connection = Connection(transport='event', port=5673, open_cb=self.on_connect)

    def on_connect(self):
        print('on_connect')
        self.connection.channel().add_open_listener(self.on_open)

    def on_open(self, channel):
        print('on_open: %r' % channel)
        self.channel = channel
        self.channel.queue.declare(queue='hello', cb=self.on_declare)

    def on_declare(self):
        print('on_declare')
        self.channel.basic.publish(Message('Hello World!'),
                              exchange='',
                              routing_key='hello',
                              cb=self.on_publish
                              )

    def on_publish(self):
        print('on_publish')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    c = Client()
    event.signal(2, sys.exit)
    event.dispatch()

Here's what I see when I run the code:

$ python send.py 
instantiating Connection
on_connect
on_open: <haigha.channel.Channel object at 0x7f45feab9dd0>
WARNING:root:error on connection to localhost:5673: error processing socket input buffer

Using the tracer supplied with the rabbit java client, I see the following:


1337545216819: <Tracer-0> ch#0 <- {#method<connection.start>(version-major=0, version-minor=9, server-properties={product=RabbitMQ, information=Licensed under the MPL.  See http://www.rabbitmq.com/, platform=Erlang/OTP, capabilities={exchange_exchange_bindings=true, consumer_cancel_notify=true, basic.nack=true, publisher_confirms=true}, copyright=Copyright (C) 2007-2012 VMware, Inc., version=2.8.2}, mechanisms=PLAIN AMQPLAIN, locales=en_US), null, ""}
1337545216819: <Tracer-0> ch#0 -> {#method<connection.start-ok>(client-properties={library_version=0.5.3, library=Haigha}, mechanism=AMQPLAIN, response=�LOGINS�guesPASSWORDS�guest, locale=en_US), null, ""}
1337545216821: <Tracer-0> ch#0 <- {#method<connection.tune>(channel-max=0, frame-max=131072, heartbeat=0), null, ""}
1337545216856: <Tracer-0> ch#0 -> {#method<connection.tune-ok>(channel-max=65535, frame-max=131072, heartbeat=0), null, ""}
1337545216856: <Tracer-0> ch#0 -> {#method<connection.open>(virtual-host=/, capabilities=, insist=true), null, ""}
1337545216896: <Tracer-0> ch#0 <- {#method<connection.open-ok>(known-hosts=), null, ""}
1337545216936: <Tracer-0> ch#1 -> {#method<channel.open>(out-of-band=), null, ""}
1337545216976: <Tracer-0> ch#1 <- {#method<channel.open-ok>(channel-id=), null, ""}
1337545217016: <Tracer-0> ch#1 -> {#method<queue.declare>(ticket=0, queue=hello, passive=false, durable=false, exclusive=false, auto-delete=true, nowait=false, arguments={}), null, ""}
1337545217056: <Tracer-0> ch#1 <- {#method<queue.declare-ok>(queue=hello, message-count=0, consumer-count=1), null, ""}
1337545217099: <Tracer-0> ch#1 -> {#method<channel.close>(reply-code=500, reply-text=Failed to dispatch MethodFrame[channel: 1, class_id: 50, method_id: 11, args: \x00\x32\x00\x0b\x05\x68\x65\x6c\x6c\x6f\x00\x00\x00\x00\x00\x00\x00\x01], class-id=0, method-id=0), null, ""}
1337545217138: <Tracer-0> ch#1 <- {#method<channel.close-ok>(), null, ""}

How do I make it work?

easy_install/pip installation error.

Both complain during installation about a missing README.rst:

gmr-0x12:~ gmr$ sudo easy_install haigha
Searching for haigha
Reading http://pypi.python.org/simple/haigha/
Reading https://github.com/agoragames/haigha
Best match: haigha 0.2.1
Downloading http://pypi.python.org/packages/source/h/haigha/haigha-0.2.1.tar.gz#md5=033124197f3dd61d6b80a957c89b9913
Processing haigha-0.2.1.tar.gz
Running haigha-0.2.1/setup.py -q bdist_egg --dist-dir /tmp/easy_install-yPoY7S/haigha-0.2.1/egg-dist-tmp-CEre80
error: README.rst: No such file or directory

haigha/gevent example

I modified your "hello-world" example to work with gevent. In case you need a gevent example to include with the package, the following worked for me:


import gevent
from haigha.connection import Connection
from haigha.message import Message
 
connection = Connection(
  user='guest', password='guest',
  vhost='/', host='localhost',
 heartbeat=None, debug=True, transport="gevent")
 
def consumer(msg):
  print "CONSUMER: MESSAGE RECEIVED: ", msg 
  gevent.kill(g)
  #NOTE: haigha's hello-world example had connection.close()
  # here (instead of our gevent.kill(g) statement).  The close()
  # resulted in deadlock when running with gevent as we're here.
  # Even Ctrl-c wouldn't budge this deadlock! 
 
ch = connection.channel()
 
ch.exchange.declare('test_exchange', 'direct', auto_delete=True)
 
ch.queue.declare('test_queue', auto_delete=True)
 
ch.queue.bind('test_queue', 'test_exchange', 'test_key')
 
ch.basic.consume('test_queue', consumer)
 
ch.basic.publish( Message('body', application_headers={'hello':'world'}),
   'test_exchange', 'test_key' )
 
 
def readFrames(conn):
  while True:
    conn.read_frames()
 
 
g = gevent.spawn(readFrames, conn=connection)
 
result = g.get()
print "readFrames result: ", repr(result)
 
connection.close()

It produced the following output:

CONSUMER: MESSAGE RECEIVED: Message[body: body, delivery_info: {'exchange': 'test_exchange', 'consumer_tag': 'channel-1-1', 'routing_key': 'test_key', 'redelivered': 0, 'delivery_tag': 1, 'channel': <haigha.channel.Channel object at 0x10dbaeb10>}, properties: {'application_headers': {'hello': 'world'}}]
readFrames result: GreenletExit()

Consumer brings high CPU usage when `close_cb` is not specified in the connection constructor

I use the haigha.connections.rabbit_connection.RabbitConnection class to get a connection instance. I see almost 100% CPU usage (with no messages in the queues) when the following conditions are true:

  • there is no close_cb specified to connection
  • I call self._channel.basic.consume(queue='bad_queue', consumer=self._consume) and bad_queue does not exist in RabbitMQ at that moment

Channel protocol class properties on closed channels

When a closed channel is accessed, the protocol classes are no longer bound, e.g. channel.basic. The reason for this is that they're cleaned up so that circular references can be quickly removed rather than waiting for the garbage collector.

One of the problems, however, is that the user has to either check for the closed state or handle an ambiguous AttributeError before calling channel.basic, etc. A better approach would be to replace the bindings with properties that raise the ChannelClosed exception on access.
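
A standalone sketch of that proposal (hypothetical and heavily simplified from haigha's real Channel; ChannelClosed is assumed to live in haigha.exceptions, so a local stand-in is defined here to keep the sketch self-contained):

class ChannelClosed(Exception):
  pass

class SketchChannel(object):
  def __init__(self, basic_class):
    self._basic = basic_class

  def _closed_cb(self):
    # Clear the binding so circular references are released promptly.
    self._basic = None

  @property
  def basic(self):
    # Raise an explicit error instead of an ambiguous AttributeError when the
    # channel has already been closed and cleaned up.
    if self._basic is None:
      raise ChannelClosed('channel is closed')
    return self._basic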

test_read_timestamp and test_write_timestamp failure on Mac OS

System/library information (TEST LOG is at the bottom):

MacOS: Darwin Kernel Version 11.3.0: Thu Jan 12 18:47:41 PST 2012; root:xnu-1699.24.23~1/RELEASE_X86_64 x86_64

Python 2.6.7

In [5]: haigha.VERSION
Out[5]: '0.4.1'

In [3]: event.version
Out[3]: '0.4.1'

In [11]: gevent.version
Out[11]: '0.13.6'

py_eventsocket-0.1.5

chai-0.1.21

In [9]: termcolor.VERSION
Out[9]: (1, 1, 0)

libevent-1.4.14b

TEST LOG:

$ scripts/test

....................................................................................................................................................................................................................................................................F..............F................................................F..................F

FAIL: test_field_timestamp (haigha.tests.unit.reader_test.ReaderTest)

Traceback (most recent call last):
File "/Users/vkruglikov/Packages/chai/chai-0.1.21/chai/chai.py", line 48, in wrapper
func(self, *args, **kwargs)
File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/haigha/tests/unit/reader_test.py", line 304, in test_field_timestamp
assert_equals( d, b._field_timestamp() )
AssertionError: datetime.datetime(2011, 1, 17, 17, 36, 33) != datetime.datetime(2011, 1, 17, 14, 36, 33)

FAIL: test_read_timestamp (haigha.tests.unit.reader_test.ReaderTest)

Traceback (most recent call last):
File "/Users/vkruglikov/Packages/chai/chai-0.1.21/chai/chai.py", line 48, in wrapper
func(self, *args, **kwargs)
File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/haigha/tests/unit/reader_test.py", line 184, in test_read_timestamp
assert_equals( d, b.read_timestamp() )
AssertionError: datetime.datetime(2011, 1, 17, 17, 36, 33) != datetime.datetime(2011, 1, 17, 14, 36, 33)

FAIL: test_field_timestamp (haigha.tests.unit.writer_test.WriterTest)

Traceback (most recent call last):
File "/Users/vkruglikov/Packages/chai/chai-0.1.21/chai/chai.py", line 48, in wrapper
func(self, *args, **kwargs)
File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/haigha/tests/unit/writer_test.py", line 224, in test_field_timestamp
assert_equals( 'T\x00\x00\x00\x00\x4d\x34\xc4\x71', w._output_buffer )
AssertionError: 'T\x00\x00\x00\x00M4\xc4q' != bytearray(b'T\x00\x00\x00\x00M4\xee\xa1')

FAIL: test_write_timestamp (haigha.tests.unit.writer_test.WriterTest)

Traceback (most recent call last):
File "/Users/vkruglikov/Packages/chai/chai-0.1.21/chai/chai.py", line 48, in wrapper
func(self, *args, **kwargs)
File "/Users/vkruglikov/Packages/haigha/agoragames-haigha-6540e82/haigha/tests/unit/writer_test.py", line 143, in test_write_timestamp
assert_equals( '\x00\x00\x00\x00\x4d\x34\xc4\x71', w._output_buffer )
AssertionError: '\x00\x00\x00\x00M4\xc4q' != bytearray(b'\x00\x00\x00\x00M4\xee\xa1')


Ran 344 tests in 0.156s

FAILED (failures=4)

Unable to use Haigha with python3

When trying to install Haigha from git:

[16:44:11] vvs@randir haigha % python3.2 setup.py clean                          
running clean
[16:44:40] vvs@randir haigha % python3.2 setup.py build         
running build
running build_py
[16:44:44] vvs@randir haigha % python3.2 setup.py install --user
running install
running bdist_egg
running egg_info
writing haigha.egg-info/PKG-INFO
writing top-level names to haigha.egg-info/top_level.txt
writing dependency_links to haigha.egg-info/dependency_links.txt
reading manifest file 'haigha.egg-info/SOURCES.txt'
reading manifest template 'MANIFEST.in'
warning: no files found matching '*' under directory 'scripts/'
warning: no files found matching '*.py' under directory 'haigha/'
no previously-included directories found matching '*.pyc'
writing manifest file 'haigha.egg-info/SOURCES.txt'
installing library code to build/bdist.macosx-10.8-x86_64/egg
running install_lib
running build_py
creating build/bdist.macosx-10.8-x86_64/egg
creating build/bdist.macosx-10.8-x86_64/egg/haigha
copying build/lib/haigha/__init__.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
copying build/lib/haigha/channel.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
copying build/lib/haigha/channel_pool.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
creating build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/classes/__init__.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/classes/basic_class.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/classes/channel_class.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/classes/exchange_class.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/classes/protocol_class.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/classes/queue_class.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/classes/transaction_class.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/classes
copying build/lib/haigha/connection.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
creating build/bdist.macosx-10.8-x86_64/egg/haigha/connections
copying build/lib/haigha/connections/__init__.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/connections
copying build/lib/haigha/connections/rabbit_connection.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/connections
copying build/lib/haigha/exceptions.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
creating build/bdist.macosx-10.8-x86_64/egg/haigha/frames
copying build/lib/haigha/frames/__init__.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/frames
copying build/lib/haigha/frames/content_frame.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/frames
copying build/lib/haigha/frames/frame.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/frames
copying build/lib/haigha/frames/header_frame.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/frames
copying build/lib/haigha/frames/heartbeat_frame.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/frames
copying build/lib/haigha/frames/method_frame.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/frames
copying build/lib/haigha/message.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
copying build/lib/haigha/reader.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
creating build/bdist.macosx-10.8-x86_64/egg/haigha/transports
copying build/lib/haigha/transports/__init__.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/transports
copying build/lib/haigha/transports/event_transport.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/transports
copying build/lib/haigha/transports/gevent_transport.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/transports
copying build/lib/haigha/transports/socket_transport.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/transports
copying build/lib/haigha/transports/transport.py -> build/bdist.macosx-10.8-x86_64/egg/haigha/transports
copying build/lib/haigha/writer.py -> build/bdist.macosx-10.8-x86_64/egg/haigha
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/__init__.py to __init__.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/channel.py to channel.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/channel_pool.py to channel_pool.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/classes/__init__.py to __init__.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/classes/basic_class.py to basic_class.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/classes/channel_class.py to channel_class.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/classes/exchange_class.py to exchange_class.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/classes/protocol_class.py to protocol_class.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/classes/queue_class.py to queue_class.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/classes/transaction_class.py to transaction_class.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/connection.py to connection.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/connections/__init__.py to __init__.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/connections/rabbit_connection.py to rabbit_connection.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/exceptions.py to exceptions.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/frames/__init__.py to __init__.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/frames/content_frame.py to content_frame.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/frames/frame.py to frame.cpython-32.pyc
  File "build/bdist.macosx-10.8-x86_64/egg/haigha/frames/frame.py", line 65
    raise Frame.FormatError, str(e), sys.exc_info()[-1]
                           ^
SyntaxError: invalid syntax

byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/frames/header_frame.py to header_frame.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/frames/heartbeat_frame.py to heartbeat_frame.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/frames/method_frame.py to method_frame.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/message.py to message.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/reader.py to reader.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/transports/__init__.py to __init__.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/transports/event_transport.py to event_transport.cpython-32.pyc
  File "build/bdist.macosx-10.8-x86_64/egg/haigha/transports/event_transport.py", line 13
    print 'Failed to load EventSocket and event modules'
                                                       ^
SyntaxError: invalid syntax

byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/transports/gevent_transport.py to gevent_transport.cpython-32.pyc
  File "build/bdist.macosx-10.8-x86_64/egg/haigha/transports/gevent_transport.py", line 21
    print 'Failed to load gevent modules'
                                        ^
SyntaxError: invalid syntax

byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/transports/socket_transport.py to socket_transport.cpython-32.pyc
  File "build/bdist.macosx-10.8-x86_64/egg/haigha/transports/socket_transport.py", line 25
    def connect(self, (host,port), klass=socket.socket):
                      ^
SyntaxError: invalid syntax

byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/transports/transport.py to transport.cpython-32.pyc
byte-compiling build/bdist.macosx-10.8-x86_64/egg/haigha/writer.py to writer.cpython-32.pyc
creating build/bdist.macosx-10.8-x86_64/egg/EGG-INFO
copying haigha.egg-info/PKG-INFO -> build/bdist.macosx-10.8-x86_64/egg/EGG-INFO
copying haigha.egg-info/SOURCES.txt -> build/bdist.macosx-10.8-x86_64/egg/EGG-INFO
copying haigha.egg-info/dependency_links.txt -> build/bdist.macosx-10.8-x86_64/egg/EGG-INFO
copying haigha.egg-info/top_level.txt -> build/bdist.macosx-10.8-x86_64/egg/EGG-INFO
zip_safe flag not set; analyzing archive contents...
creating 'dist/haigha-0.7.0-py3.2.egg' and adding 'build/bdist.macosx-10.8-x86_64/egg' to it
removing 'build/bdist.macosx-10.8-x86_64/egg' (and everything under it)
Processing haigha-0.7.0-py3.2.egg
Removing /Users/vvs/Library/Python/3.2/lib/python/site-packages/haigha-0.7.0-py3.2.egg
Copying haigha-0.7.0-py3.2.egg to /Users/vvs/Library/Python/3.2/lib/python/site-packages
haigha 0.7.0 is already the active version in easy-install.pth

Installed /Users/vvs/Library/Python/3.2/lib/python/site-packages/haigha-0.7.0-py3.2.egg
Processing dependencies for haigha==0.7.0
Finished processing dependencies for haigha==0.7.0

I've tried to invoke 2to3; the installation succeeded, but the Connection constructor threw an exception:

  File "/Users/vvs/Library/Python/3.2/lib/python/site-packages/haigha-0.7.0-py3.2.egg/haigha/connection.py", line 98, in __init__
  File "/Users/vvs/Library/Python/3.2/lib/python/site-packages/haigha-0.7.0-py3.2.egg/haigha/writer.py", line 186, in write_table
  File "/Users/vvs/Library/Python/3.2/lib/python/site-packages/haigha-0.7.0-py3.2.egg/haigha/writer.py", line 195, in _write_item
  File "/Users/vvs/Library/Python/3.2/lib/python/site-packages/haigha-0.7.0-py3.2.egg/haigha/writer.py", line 143, in write_shortstr
  File "/Users/vvs/Library/Python/3.2/lib/python/site-packages/haigha-0.7.0-py3.2.egg/haigha/writer.py", line 79, in write_octet
TypeError: an integer is required

Add test running via Travis CI

Seems kind of silly to me that there would be tests for the package but they don't run on CI. Does it just need rabbitmq to run tests? If so, I can work on it tomorrow.

Suppression of critical exceptions breaks gevent apps

I am using Haigha with Gevent. Not sure how to work around this cleanly/elegantly:

When a user-specified callback is executed, various functions in Haigha suppress most exceptions, which interferes with robust processing of exceptions in Gevent greenlets. For example, if a Gevent greenlet is executing this block of code:

def haigha_driver_greenthread(conn):
    while True:
        conn.read_frames()

def channel_closed_callback(ch):
   do_something_that_causes_an_exception_or_gevent_context_switch()

def message_consumer_callback(msg):
    do_something_else_that_causes_an_exception_or_gevent_context_switch()

ch = conn.channel()
ch.add_close_listener(channel_closed_callback)

ch.basic.consume(queue="my queue name",
                                consumer=message_consumer_callback

g = gevent.spawn(haigha_driver_greenthread, conn)

and some other code wishes to kill that greenlet: g.kill()

or if an exception occurs in one of the callbacks, then the GreenletExit (the result of g.kill()) or the other exception that occurred in the callback gets suppressed, which breaks the normal flow of the gevent programming model: if GreenletExit (or LinkedFailed, etc.) is suppressed, then either a deadlock occurs (if g.kill was used without a timeout) or the greenlet refuses to die against the wishes of the programmer. Other exceptions raised from the scope of the callbacks also get suppressed, leading to incorrect behavior of the greenlet: the failing greenlet becomes a "zombie" instead of dying and possibly notifying a "parent" greenlet via a LinkedFailed exception.

Here are a couple of examples of functions that exhibit this problem:

channel.py: User callbacks are called from the scope of self.dispatch():


def process_frames(self):
  """
  Process the input buffer.
  """
  while len(self._frame_buffer):
    try:
      # It would make sense to call next_frame, but it's technically faster
      # to repeat the code here.
      frame = self._frame_buffer.popleft()
      self.dispatch( frame )
    except ProtocolClass.FrameUnderflow:
      return
    except:
      self.logger.error( 
        "Failed to dispatch %s", frame, exc_info=True )
      self.close( 500, "Failed to dispatch %s"%(str(frame)) )
      return

connection.py:


def _callback_close(self):
  """Callback to any close handler."""
  if self._close_cb:
    try: self._close_cb()
    except SystemExit: raise
    except: self.logger.error( 'error calling close callback' )

Here, too:
channel_class.py


  def close(self, reply_code=0, reply_text='', class_id=0, method_id=0):
    '''
    Close this channel.  Caller has the option of specifying the reason for
    closure and the class and method ids of the current frame in which an error
    occurred.  If in the event of an exception, the channel will be marked
    as immediately closed.  If channel is already closed, call is ignored.
    '''
    if self._closed: return

    self._close_info = {
      'reply_code'    : reply_code,
      'reply_text'    : reply_text,
      'class_id'      : class_id,
      'method_id'     : method_id
    }

    # exception likely due to race condition as connection is closing
    try:
      args = Writer()
      args.write_short( reply_code )
      args.write_shortstr( reply_text )
      args.write_short( class_id )
      args.write_short( method_id )
      self.send_frame( MethodFrame(self.channel_id, 20, 40, args) )
      
      self.channel.add_synchronous_cb( self._recv_close_ok )
    except:
      self.logger.error("Failed to close channel %d", 
        self.channel_id, exc_info=True)

basic.get is broken

I have a simple test program that is basically the same as the example on PyPI, except that it uses basic.get and no_ack=False. Also, the queues are durable=True and I have a test queue with a few messages in it. When I use basic.consume it all works fine and I can run it again and again. Changing to basic.get, it craps out with the exact same messages (unacked in a durable queue).

In classes/basic_class.py I changed this function

def _recv_get_ok(self, method_frame):
  msg = self._read_gokmsg( method_frame )
  cb = self._get_cb.pop()
  if cb: cb( msg )

_read_gokmsg is the same as _read_msg except for this:

delivery_tag = method_frame.args.read_longlong()
redelivered = method_frame.args.read_bit()
exchange = method_frame.args.read_shortstr()
routing_key = method_frame.args.read_shortstr()
message_count = method_frame.args.read_long()

delivery_info = {
  'channel': self.channel,
  'delivery_tag': delivery_tag,
  'redelivered': redelivered,
  'exchange': exchange,
  'routing_key': routing_key,
  'message_count': message_count,
}

Basically I removed the consumer tag and added the message count. Now I can use basic.get just fine.

P.S. your code is an awful lot easier to dig into than kombu or amqplib

Logic error in SocketTransport.read()

In the code snippet below from SocketTransport.read(), there is an elif block "elif isinstance(e, socket.timeout):" inside the "except EnvironmentError as e:". It's not clear why that elif is inside "except EnvironmentError as e:" since socket.timeout has its own "except socket.timeout as e:" block:

  def read(self, timeout=None):
    '''
    Read from the transport. If timeout>0, will only block for `timeout` 
    seconds.
    '''
    if not hasattr(self,'_sock'):
      return None

    try:
      # Note that we ignore both None and 0, i.e. we either block with a
      # timeout or block completely and let gevent sort it out.
      if timeout:
        self._sock.settimeout( timeout )
      else:
        self._sock.settimeout( None )
      data = self._sock.recv( self._sock.getsockopt(socket.SOL_SOCKET,socket.SO_RCVBUF) )

      if len(data):
        if self.connection.debug > 1:
          self.connection.logger.debug( 'read %d bytes from %s'%(len(data), self._host) )
        if len(self._buffer):
          self._buffer.extend( data )
          data = self._buffer
          self._buffer = bytearray()
        return data

      # Note that no data means the socket is closed and we'll mark that
      # below

    except socket.timeout as e:
      # Note that this is implemented differently and though it would be
      # caught as an EnvironmentError, it has no errno. Not sure whose
      # fault that is.
      return None

    except EnvironmentError as e:
      # thrown if we have a timeout and no data
      if e.errno in (errno.EAGAIN,errno.EWOULDBLOCK):
        return None
      # gevent throws this too, and rather than handle separately just catch
      # that case here
      elif isinstance(e, socket.timeout):
        return None

gevent_transport is broken in v0.5.1 - v0.5.3

The code below worked great with Haigha 0.4.1. When I tried to upgrade to Haigha 0.5.3, I noticed that the test_haigha() function (in the code snippet below) never returns. I investigated GeventTransport and found that it now relies on SocketTransport to create the socket. Since SocketTransport creates a regular built-in socket.socket (not gevent.socket), GeventTransport is no longer gevent-compatible.

The gevent_transport unit test doesn't test for gevent hub compatibility. Something like starting a read from a greenlet, gevent.sleep()'ing for a second, then gevent.kill()'ing that greenlet, and finally waiting for the greenlet to finish via greenlet.get() should do the trick. Also, could you please incorporate the code snippet below into a unit test so that we can test Haigha with real gevent concurrency? Thank you.

import gevent
from haigha.connection import Connection
from haigha.message import Message


def test_haigha():
  """
  A simple test to check Haigha's NuPIC installation
  Note that Rabbit MQ must be running

  Upon success this test should print out:

  CONSUMER: MESSAGE RECEIVED:  Message[body: body, delivery_info: {'exchange': 'test_exchange', 'consumer_tag': 'channel-1-1', 'routing_key': 'test_key', 'redelivered': 0, 'delivery_tag': 1, 'channel': <haigha.channel.Channel object at 0x10ac87b50>}, properties: {'application_headers': {'hello': 'world'}}]
  readFrames result:  GreenletExit()
  """
  connection = Connection(
    user='guest', password='guest',
    vhost='/', host='localhost',
   heartbeat=None, debug=True, transport="gevent")

  def consumer(msg):
    print "CONSUMER: MESSAGE RECEIVED: ", msg
    gevent.kill(g)
    #NOTE: haigha's hello-world example had connection.close()
    # here (instead of our gevent.kill(g) statement).  The close()
    # resulted in deadlock when running with gevent as we're here.
    # Even Ctrl-c wouldn't budge this deadlock!

  ch = connection.channel()

  ch.exchange.declare('test_exchange', 'direct')

  ch.queue.declare('test_queue', auto_delete=True)

  ch.queue.bind('test_queue', 'test_exchange', 'test_key')

  ch.basic.consume('test_queue', consumer)

  ch.basic.publish( Message('body', application_headers={'hello':'world'}),
     'test_exchange', 'test_key' )


  def readFrames(conn):
    while True:
      conn.read_frames()


  g = gevent.spawn(readFrames, conn=connection)

  result = g.get()
  print "readFrames result: ", repr(result)

  connection.close()


if __name__ == '__main__':
  test_haigha()

channel.close() causes the channel_error, "expected 'channel.open'", 'channel.close' error

Hi,
I ran into a connection-closing issue, and after looking into it I found the following records in RabbitMQ's log:

=ERROR REPORT==== 10-Aug-2014::21:35:55 ===
connection <0.9891.128>, channel 2306 - error:
{amqp_error,channel_error,"expected 'channel.open'",'channel.close'}

=INFO REPORT==== 10-Aug-2014::21:35:56 ===
closing TCP connection <0.9891.128> from 127.0.0.1:56872

I googled about it, and found similar case in pika repo:
pika/pika#222
pika/pika#256

I know haigha and pika are different, but is there any chance haigha has this same bug?

Thanks!

connection.channel() blocked

Hi,
I'm using haigha with gevent to build a TCP-based realtime chatroom. Each socket handler (one greenlet) has its own channel, and all of these channels are created from one connection (a singleton).

My issue is that sometimes, when my server is publishing and receiving messages quickly (not even that quickly, actually; it's less than 100 msgs per second), connection.channel() blocks. Once it blocks it never recovers; it seems the whole connection is stuck. gevent's StreamServer still works and handles newly connected sockets, but all of those sockets block at this step (creating the channel) until I terminate (Ctrl-C) my server process and restart it.
Sometimes I get the traceback below after hitting Ctrl-C (terminating the process):

^CTraceback (most recent call last):
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "tcp_server.py", line 237, in _handle
    rmq_ch = rmq_con.channel()  <------ This is the place where it blocks #####################
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/connection.py", line 343, in channel
    rval.open()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 177, in open
    self.channel.open()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/classes/channel_class.py", line 48, in open
    self.channel.add_synchronous_cb(self._recv_open_ok)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 316, in add_synchronous_cb
    self.connection.read_frames()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/connection.py", line 407, in read_frames
    data = self._transport.read(self._heartbeat)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/transports/gevent_transport.py", line 72, in read
    self._read_wait.wait(timeout)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/event.py", line 74, in wait
    timer = Timeout.start_new(timeout)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/timeout.py", line 119, in start_new
    timeout = cls(timeout, exception, ref=ref)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/timeout.py", line 90, in __init__
    self.timer = get_hub().loop.timer(seconds or 0.0, ref=ref, priority=priority)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/hub.py", line 166, in get_hub
    return _threadlocal.hub

Any ideas?
Really appreciate your help!

Haigha doesn't abide by PEP8

Understanding and accepting that there are variations of style across projects, it would be nice to see Haigha updated to respect the guidelines set forth in PEP8. It is very difficult to work on Haigha without altering linter configuration to ignore basically all the warnings and errors.

http://legacy.python.org/dev/peps/pep-0008/

AttributeError: 'Channel' object has no attribute 'basic' when publishing a message

Hi! I'm writing functional tests for my project, where I create a RabbitMQ consumer using haigha =) There are both a test producer and a consumer in the functional tests (started one after the other). The producer randomly fails: the opened channel closes randomly, and when the channel closes, no info is provided to the close callback.

The only error I get is AttributeError: 'Channel' object has no attribute 'basic':

  File "/home/dizpers/work/myproject/.env/local/lib/python2.7/site-packages/haigha/channel.py", line 194, in publish
    self.basic.publish(*args, **kwargs)
AttributeError: 'Channel' object has no attribute 'basic'

What could cause this? How can I get more info from the channel close callback? TIA!

PS: The only cause I can see for now is that the consumer and the producer are started together.
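
As a rough sketch of one way to get more information out of a channel close (hedged; behavior assumed from the add_close_listener API mentioned in a later issue, not verified against this haigha version), a close listener can be registered right after opening the channel. Note, though, that the "Diagnostic info loss" issue further down reports that close_info may already be cleared by the time the listener runs:

ch = connection.channel()

def on_channel_closed(channel):
  # close_info may already be None here; see the issue further down this page
  print "channel closed, close_info: ", channel.close_info

ch.add_close_listener(on_channel_closed)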

Lost connection triggers TypeError exception

When RabbitMQ is restarted while my application is connected, my app dies with the following exception:

TypeError: 'NoneType' object is not callable

Looks like the problem is that ConnectionChannel._recv_close() schedules a call to self.connection._close_cb even if it is None. I can work around the problem by passing a close_cb argument to haigha.connection.Connection().
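
For reference, a minimal sketch of that workaround, assuming the Connection constructor accepts a close_cb keyword as described above (the handler name and no-argument signature are my own):

from haigha.connection import Connection

def on_connection_closed():
  # called when the broker closes the connection; decide here whether to
  # reconnect or shut down, instead of letting a None callback blow up
  print "connection closed by broker"

connection = Connection(
  user='guest', password='guest', vhost='/', host='localhost',
  close_cb=on_connection_closed)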

Diagnostic info loss or app failure results when connection is closed by broker

Channel._closed_cb() sets various member variables to None, such as self.channel. If you later try to check whether the channel is closed via ch.closed (which translates into ch.channel.closed) or ch.close_info (which translates into ch.channel.close_info), you get one of those AttributeError: 'NoneType' object has no attribute ... exceptions.

On a related note, if you registered a channel-close callback via Channel.add_close_listener(), the listener callback gets called after self.channel and friends are set to None, so ch.close_info (which you would want to log) also raises the above-mentioned exception.

I could do something ugly, like save a reference to the channel's internal channel member immediately after creating the channel so that I can access close_info, etc. later (a rough sketch follows the code below), but that has the usual pitfalls of messing with the internals of a class.



  def _closed_cb(self, final_frame=None):
    if final_frame:
      self._connection.send_frame( final_frame )
    self._pending_events = deque()
    self._frame_buffer = deque()

    # clear out other references for faster cleanup
    for protocol_class in self._class_map.values():
      protocol_class._cleanup()
    self._connection = None
    self.channel = None
    self.exchange = None
    self.queue = None
    self.basic = None
    self.tx = None
    self._class_map = None

    for listener in self._close_listeners:
      listener( self )
    self._close_listeners = set()
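
And here is a rough sketch of the "ugly" workaround described above: hold on to the internal channel member before _closed_cb() clears the reference. This leans on private attributes, may break between releases, and whether close_info itself survives the per-class _cleanup() above is not verified:

ch = connection.channel()
ch_internal = ch.channel    # private member; saved so it survives _closed_cb()

def on_close(channel):
  # channel.close_info would raise AttributeError here, so use the saved reference
  print "channel closed: ", ch_internal.close_info

ch.add_close_listener(on_close)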

Silenced errors

Haigha seems to silently discard errors from the server.

from haigha.connection import Connection
from haigha.message import Message

connection = Connection(user='guest', password='guest', vhost='/', host='10.10.1.87', heartbeat=None, debug=True)
ch = connection.channel()
ch.exchange.declare('test_exchange', 'direct', durable=True)
ch.queue.declare('test_queue', durable=True)
ch.queue.bind('test_queue', 'test_exchange', 'test_key')
ch.basic.publish( Message("hello world", application_headers={'hello':'world'}), 'test_exchange', 'test_key' )

connection.close()

connection = Connection(user='guest', password='guest', vhost='/', host='10.10.1.87', heartbeat=None, debug=True)
ch = connection.channel()
ch.exchange.declare('test_exchange', 'direct')
ch.queue.declare('test_queue')
ch.queue.bind('test_queue', 'test_exchange', 'test_key')
ch.basic.publish( Message("hello world", application_headers={'hello':'world'}), 'test_exchange', 'test_key' )

connection.close()

In this example the server reports that the exchange cannot be redeclared with different parameters, but haigha crashes with this traceback:

Traceback (most recent call last):
  File "test2.py", line 15, in <module>
    ch.exchange.declare('test_exchange', 'direct')
  File "/home/santiago/virtualenvs/test/local/lib/python2.7/site-packages/haigha-0.5.11-py2.7.egg/haigha/classes/exchange_class.py", line 60, in declare
    self.channel.add_synchronous_cb( self._recv_declare_ok )
  File "/home/santiago/virtualenvs/test/local/lib/python2.7/site-packages/haigha-0.5.11-py2.7.egg/haigha/channel.py", line 268, in add_synchronous_cb
    self.connection.read_frames()
AttributeError: 'NoneType' object has no attribute 'read_frames'

Haigha with gevent v1.0b2 prints DeprecationWarning to stderr

I started running Haigha with gevent v1.0b2 [gevent-1.0b2.tar.gz from http://code.google.com/p/gevent/downloads/list]. Nothing terribly eventful happened, except that the new gevent prints this deprecation warning to stderr:

/Users/vkruglikov/nta/current/lib/python2.6/site-packages/haigha/transports/gevent_transport.py:10: DeprecationWarning: gevent.coros has been renamed to gevent.lock
from gevent.coros import Semaphore
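
Until the import is switched to gevent.lock, one local workaround (not a haigha change, just a standard warnings filter; sketch only) is to suppress that specific message before importing haigha:

import warnings

# silence only this deprecation message from haigha's gevent transport
warnings.filterwarnings(
    'ignore',
    message='gevent.coros has been renamed to gevent.lock',
    category=DeprecationWarning)

from haigha.connection import Connection  # import after installing the filter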

Unhandled exception when connect() fails

I love that haigha is smart enough to continually retry a failed connect (or reconnect), producing helpful log messages the whole time. On the first failed try, however, an unhandled exception spews a traceback onto my console:

2011-07-20 12:15:10 myapp DEBUG disconnecting connection
2011-07-20 12:15:10 myapp DEBUG Pending connect: None
2011-07-20 12:15:10 myapp DEBUG Scheduling a connection in 0
2011-07-20 12:15:10 myapp DEBUG Connecting to myhost on 5672
2011-07-20 12:15:10 myapp ERROR Failed to connect to myhost:5672, will try again in 2 seconds
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/haigha/connection_strategy.py", line 128, in _connect_cb
    self._connection.connect( self._cur_host.host, self._cur_host.port )
  File "/usr/local/lib/python2.7/dist-packages/haigha/connection.py", line 165, in connect
    self._sock.connect( (host,port) )
  File "/usr/local/lib/python2.7/dist-packages/eventsocket.py", line 243, in connect
    raise socket.error( err, os.strerror(err) )
error: [Errno 111] Connection refused

Deadlock when calling the basic.consume function

Hi awestendorf,

The environment:

keywords: gevent, monkey patch, gevent transport, concurrent

from gevent import monkey; monkey.patch_all();
import gevent
from gevent import socket

from haigha.connection import Connection
from haigha.message import Message


sock_opts = {
    (socket.IPPROTO_TCP, socket.TCP_NODELAY) : 1,
}
rmq_con = Connection(host="localhost", debug=True, transport='gevent', sock_opts=sock_opts)
rmq_ch = rmq_con.channel(synchronous=True)

def message_pump(conn):
    while conn:
        conn.read_frames()
        gevent.sleep()
gevent.spawn(message_pump, rmq_con)

The blocked line:

Sometimes the process blocks; I looked into it and found that the root cause is that this line

rmq_ch.basic.consume(queue_name, rmq_msg_handler, consumer_tag=queue_name)

blocks.

Can you think of a situation that might cause this? Or do you have any ideas about it?
Any comments are welcome~

Thanks !

Can't use Haigha-gevent without installing pyevent

(initially discussed in agoragames/pyevent#3 (comment))

Haigha includes all transports from its connection.py script (from haigha.transports import *), which automatically picks up event_transport.py and crashes if pyevent can't be loaded.

I think this can be overcome by doing lazy imports inside the following code block instead of "import *", but I am not sure what would be best for the tests:

if not isinstance(transport, Transport):
  if transport=='event':
    from haigha.transports.event_transport import EventTransport
    self._transport = EventTransport( self )
  elif transport=='gevent':
    from haigha.transports.gevent_transport import GeventTransport
    self._transport = GeventTransport( self )
  elif transport=='gevent_pool':
    from haigha.transports.gevent_transport import GeventPoolTransport
    self._transport = GeventPoolTransport( self )

Add friendly names to debug output

The debug output channel: 1, class_id: 20, method_id: 40, ... is useful, but not easy to use, since it requires referencing the spec or the code to know what it means. Add a concept of friendly names so that it is easier to debug; in addition to the ids, the example above would also include name: channel.close, ...
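
A minimal sketch of the idea (the mapping below is hand-written from the AMQP 0.9.1 spec and is not part of haigha):

# map (class_id, method_id) pairs to the spec's method names for debug output
AMQP_METHOD_NAMES = {
  (20, 40): 'channel.close',     # class 20 = channel, method 40 = close
  (20, 41): 'channel.close-ok',
  (60, 40): 'basic.publish',     # class 60 = basic
  # ... and so on for the rest of the spec
}

def friendly_name(class_id, method_id):
  return AMQP_METHOD_NAMES.get(
    (class_id, method_id), '%d.%d' % (class_id, method_id))

print friendly_name(20, 40)   # -> channel.close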

How to use the gevent.select module to read from a gevent.socket and a RabbitMQ connection?

Hi,
I'm writing an app server based on gevent.StreamServer. In the handler of each connection (from a user's app), I need to receive and process data from both the app and RabbitMQ, and for high performance I chose gevent.select to take care of these two connections. So I expect it to look something like:

from gevent.select import select

while True:
    select([socket_from_app, socket_from_rabbitmq], [], [])

but I've read the haigha docs and didn't find anything like socket_from_rabbitmq. Should I use an instance of GeventTransport instead?

Thanks~~ :)

Errors don't seem to be handled/reported gracefully

I'm using haigha with the gevent transport and I'm getting a few errors (normal, expected errors), but they don't seem to be reported well. The main one I keep seeing is that the queue property on the channel instance disappears; from the debug logging I can see that RabbitMQ reported an error immediately beforehand, but it would be nice if that error were surfaced in some way. Here's some logging and a stack trace as an example:

2013-11-26 17:28:01,589 DEBUG [haigha][Dummy-2] READ: MethodFrame[channel: 1, class_id: 20, method_id: 40, args: \x00\x14\x00\x28\x01\x94\x33\x4e\x4f\x54\x5f\x46\x4f\x55\x4e\x44\x20\x2d\x20\x6e\x6f\x20\x65\x78\x63\x68\x61\x6e\x67\x65\x20\x27\x61\x6c\x6c\x5f\x6d\x65\x73\x73\x61\x67\x65\x73\x27\x20\x69\x6e\x20\x76\x68\x6f\x73\x74\x20\x27\x2f\x27\x00\x3c\x00\x28]
2013-11-26 17:28:01,590 DEBUG [haigha][Dummy-2] WRITE: MethodFrame[channel: 1, class_id: 20, method_id: 41, args: None]
2013-11-26 17:28:01,590 DEBUG [haigha][Dummy-2] sent 12 bytes to localhost:5672
2013-11-26 17:28:02,090 ERROR [hatchd.utils.scripts.runner][Dummy-1] Uncaught exception running script
Traceback (most recent call last):
  File "/Users/daniel/Projects/myapp/lib/hatchd.utils/hatchd/utils/scripts/runner.py", line 93, in run_script
    func(settings=settings, arguments=args, argparser=parser)
  File "/Users/daniel/Projects/myapp/myapp/scripts/run_component.py", line 15, in run_component
    component.run()
  File "/Users/daniel/Projects/myapp/myapp/components/heartbeat.py", line 22, in run
    self.message_broker.publish(Message('hb', {}))
  File "/Users/daniel/Projects/myapp/myapp/messaging.py", line 34, in publish
    self.channel.queue.declare(
AttributeError: 'Channel' object has no attribute 'queue'
2013-11-26 17:28:02,091 DEBUG [haigha][Dummy-3] WRITE: MethodFrame[channel: 0, class_id: 10, method_id: 50, args: \x00\x00\x00\x00\x00\x00\x00]
2013-11-26 17:28:02,091 DEBUG [haigha][Dummy-3] sent 19 bytes to localhost:5672

and if I print out that last bit of data:

>>> print '\x00\x14\x00\x28\x01\x94\x33\x4e\x4f\x54\x5f\x46\x4f\x55\x4e\x44\x20\x2d\x20\x6e\x6f\x20\x65\x78\x63\x68\x61\x6e\x67\x65\x20\x27\x61\x6c\x6c\x5f\x6d\x65\x73\x73\x61\x67\x65\x73\x27\x20\x69\x6e\x20\x76\x68\x6f\x73\x74\x20\x27\x2f\x27\x00\x3c\x00\x28'
(�3NOT_FOUND - no exchange 'all_messages' in vhost '/'<(

If this is by design then all good; I'm just curious whether I'm doing something wrong, or whether there is something else I should be doing to handle errors.

P.S. Thank you for your work :)

Confused with basic.get no_ack

Hi, guys!

I'm a bit confused with no_ack.
If I call channel.basic.get(q_name, no_ack=False), it will "preserve" the message after getting it.
And when I call channel.basic.get(q_name, no_ack=True), the message is gone from the queue after getting it.

So the question is: wouldn't no_ack=False mean "false no_ack" => "ack the msg" (i.e. a double negative means yes)?
I couldn't figure out what the param value should be until I found some info in issue #6 (the closed one) #6
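
For anyone else hitting this, a hedged illustration of the semantics as I understand them: no_ack means "no acknowledgement required", so False puts the burden of acking on the client.

# no_ack=False: the broker keeps the message until the client acks it explicitly
msg = ch.basic.get('test_queue', no_ack=False)
if msg is not None:
  ch.basic.ack(msg.delivery_info['delivery_tag'])

# no_ack=True: the broker considers the message acknowledged on delivery
msg = ch.basic.get('test_queue', no_ack=True)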
