AMQP 1.0 support for RabbitMQ

Home Page: https://www.rabbitmq.com/

License: Other

rabbitmq-amqp1.0's Introduction

This repository has been moved to the main unified RabbitMQ "monorepo", and all open issues have been transferred with it. The source now lives under /deps/rabbitmq_amqp1_0.

Overview

This plugin adds AMQP 1.0 support to RabbitMQ.

Despite the name, AMQP 0-9-1 and 1.0 are very different protocols. RabbitMQ therefore treats 1.0 as a separate supported protocol, not as a revision of the original protocol that will eventually supersede it.

This plugin is several years old and is moderately mature. It may have certain limitations with its current architecture but most major AMQP 1.0 features should be in place.

This plugin supports 0-9-1 and 1.0 client interoperability with certain limitations.

Configuration

This plugin ships with modern versions of RabbitMQ.

It will listen on the standard AMQP port, 5672. To reconfigure this, do so as you would for 0-9-1. Clients connecting with 0-9-1 will continue to work on the same port.

The following configuration options (which are specific to the AMQP 1.0 adapter) are accepted in the rabbitmq_amqp1_0 section of the configuration file.

AMQP 1.0 conceptually allows connections that are not authenticated with SASL (i.e. where no username and password are supplied). By default these will connect as the "guest" user. To change this, set default_user to a string with the name of the user to use, or the atom none to prevent unauthenticated connections.

{default_user, "guest"}

The default virtual host can be specified using the default_vhost setting. See the "Virtual Hosts" section below for a description.

{default_vhost, <<"/">>}

The protocol_strict_mode setting controls how strictly peers must conform to the specification. By default strictness is not enforced, which tolerates non-fatal byte-count discrepancies in frames and flow-control inaccuracies from peers.

{protocol_strict_mode, false}

Configuration example using the sysctl-style config format (currently only available in RabbitMQ master):

amqp1_0.default_user  = guest
amqp1_0.default_vhost = /
amqp1_0.protocol_strict_mode = false

Clients we have tested

The current field of AMQP 1.0 clients is somewhat limited. Therefore we have not achieved as much interoperability as we might like.

We have tested against:

  • SwiftMQ Java client [1] We have done most of our testing against this client and things seem to work.

  • Qpid / Proton C client [2] We have successfully tested against the "proton" command line tool this client ships with.

  • Qpid / Proton Java client [2] We have not been able to get this client to get as far as opening a network connection (tested against 0.2 and 0.4).

  • Windows Azure Service Bus [3] It seems that the URI scheme used by this client assumes that it is connecting to Azure; it does not seem to be possible to get it to connect to another server.

[1] http://www.swiftmq.com/products/router/swiftlets/sys_amqp/client/index.html

[2] http://qpid.apache.org/proton/

[3] http://www.windowsazure.com/en-us/develop/net/how-to-guides/service-bus-amqp/

As new clients appear we will of course work on interoperability with them.

Interoperability with AMQP 0-9-1

Message payloads

This implementation as a plugin aims for useful interoperability with AMQP 0-9-1 clients. AMQP 1.0 messages can be far more structured than AMQP 0-9-1 messages, which simply have a payload of bytes.

The way we deal with this is that an AMQP 1.0 message with a single data section will be transcoded to an AMQP 0-9-1 message with just the bytes from that section, and vice versa. An AMQP 1.0 message with any other payload will keep exactly that payload (i.e., encoded AMQP 1.0 sections, concatenated), and for AMQP 0-9-1 clients the type field of the basic.properties will contain the value "amqp-1.0".

Thus, AMQP 0-9-1 clients may receive messages that they cannot understand (if they don't have an AMQP 1.0 codec handy, anyway); however, these will at least be labelled. AMQP 1.0 clients shall receive exactly what they expect.
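The transcoding rule above can be sketched in Python. This is an illustrative model only, not the plugin's Erlang code: the Section type, its fields, and the function name to_amqp091 are hypothetical, and the "encoded" bytes merely stand in for real AMQP 1.0 wire encoding.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Section:
    """Hypothetical stand-in for one AMQP 1.0 message section."""
    kind: str       # e.g. "data", "amqp-value", "amqp-sequence"
    payload: bytes  # raw bytes carried by a data section (ignored otherwise)
    encoded: bytes  # the whole section as it would appear on the wire


def to_amqp091(sections: List[Section]) -> Tuple[bytes, Optional[str]]:
    """Return (body, type) as an AMQP 0-9-1 client would see them."""
    if len(sections) == 1 and sections[0].kind == "data":
        # Single data section: pass along just its bytes, with no label.
        return sections[0].payload, None
    # Anything else: keep the encoded sections, concatenated, and label them.
    return b"".join(s.encoded for s in sections), "amqp-1.0"
```

A message with one data section carrying b"hello" would come out as the body b"hello" with no type set, while any other combination would come out as the concatenated encodings with type "amqp-1.0".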

Message properties, annotations, headers, etc.

The headers and properties map as follows:

AMQP 1.0                                 AMQP 0-9-1
Header                                   Properties
  durable              <--------------->   delivery-mode   [1]
  priority             <--------------->   priority
  ttl                  <--------------->   expiration      [2]
  first-acquirer                                           [3]
  delivery-count                                           [4]
Properties
  message-id           <--------------->   message-id      [5]
  user-id              <--------------->   user-id
  to                                                       [6]
  subject                                                  [6]
  reply-to             <--------------->   reply-to        [6]
  correlation-id       <--------------->   correlation-id
  content-type         <--------------->   content-type
  content-encoding     <--------------->   content-encoding
  absolute-expiry-time                                     [7]
  creation-time        <--------------->   timestamp
Application headers    <-------/------->   headers         [8]

[1] durable is true if and only if delivery-mode is 2.

[2] expiration is a shortstr; since RabbitMQ will expect this to be an encoded string, we translate a ttl to the string representation of its integer value.

[3] first-acquirer is true if and only if the basic.deliver field redelivered is false.

[4] delivery-count is left null.

[5] AMQP 0-9-1 expects this to be a shortstr.

[6] See Routing and Addressing below.

[7] absolute-expiry-time has no corresponding field in AMQP 0-9-1, and is not supported in RabbitMQ in any case.

[8] The application headers section and the basic.properties field headers are natural analogues. However, rather than try to transcode an AMQP 1.0 map to an AMQP 0-9-1 field-table, currently we discard application headers (of AMQP 1.0 messages) and headers (of AMQP 0-9-1 messages sent through to AMQP 1.0). In other words, the (AMQP 1.0) application headers section is only available to AMQP 1.0 clients, and the (AMQP 0-9-1) headers field is only available to AMQP 0-9-1 clients.

Note that properties (in both AMQP 1.0 and AMQP 0-9-1) and application properties (in AMQP 1.0) are immutable; however, this can only apply when the sending and receiving clients are using the same protocol.
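As a rough illustration of footnotes [1] to [4], the header mapping could be modelled as below. This is a sketch under stated assumptions, not the plugin's implementation: the function names and the dictionary representation are hypothetical, and delivery-mode 1 for non-durable messages is an assumption (the text only states that durable corresponds to delivery-mode 2).

```python
from typing import Any, Dict


def header_to_properties(header: Dict[str, Any]) -> Dict[str, Any]:
    """Map AMQP 1.0 header fields to AMQP 0-9-1 basic.properties fields."""
    props: Dict[str, Any] = {}
    if "durable" in header:
        # [1] durable <=> delivery-mode 2; mode 1 is assumed otherwise.
        props["delivery-mode"] = 2 if header["durable"] else 1
    if "priority" in header:
        props["priority"] = header["priority"]
    if "ttl" in header:
        # [2] expiration is a shortstr holding the integer ttl as text.
        props["expiration"] = str(header["ttl"])
    return props


def properties_to_header(props: Dict[str, Any], redelivered: bool) -> Dict[str, Any]:
    """Map AMQP 0-9-1 basic.properties fields to an AMQP 1.0 header."""
    header: Dict[str, Any] = {
        "durable": props.get("delivery-mode") == 2,  # [1]
        "first-acquirer": not redelivered,           # [3]
        "delivery-count": None,                      # [4] left null
    }
    if "priority" in props:
        header["priority"] = props["priority"]
    if "expiration" in props:
        header["ttl"] = int(props["expiration"])     # [2], in reverse
    return header
```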

Routing and Addressing

In AMQP 1.0 source and destination addresses are opaque values, and each message may have a subject field value.

For targets, addresses are:

= "/exchange/"  X "/" RK  Publish to exchange X with routing key RK
| "/exchange/"  X         Publish to exchange X with message subject as routing key
| "/topic/"     RK        Publish to amq.topic with routing key RK
| "/amq/queue/" Q         Publish to default exchange with routing key Q
| "/queue/"     Q         Publish to default exchange with routing key Q
| Q (no leading slash)    Publish to default exchange with routing key Q
| "/queue"                Publish to default exchange with message subject as routing key
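The target grammar above can be read as a small parser. The following Python sketch is illustrative only: parse_target is a hypothetical name, the default exchange is represented by the empty string (its name in AMQP 0-9-1), and splitting on the first "/" after the exchange name is an assumption about how a routing key containing slashes would be treated.

```python
from typing import Optional, Tuple


def parse_target(address: str, subject: Optional[str] = None) -> Tuple[str, Optional[str]]:
    """Return (exchange, routing_key) for an AMQP 1.0 target address."""
    if address == "/queue":
        # Default exchange, message subject as routing key.
        return "", subject
    if address.startswith("/exchange/"):
        exchange, sep, routing_key = address[len("/exchange/"):].partition("/")
        if not exchange:
            raise ValueError(f"missing exchange name: {address!r}")
        # Without an explicit routing key, fall back to the message subject.
        return exchange, routing_key if sep else subject
    if address.startswith("/topic/"):
        return "amq.topic", address[len("/topic/"):]
    for prefix in ("/amq/queue/", "/queue/"):
        if address.startswith(prefix):
            return "", address[len(prefix):]
    if not address.startswith("/"):
        # Bare queue name: default exchange, queue name as routing key.
        return "", address
    raise ValueError(f"unsupported target address: {address!r}")
```

For example, parse_target("/exchange/logs", subject="warn") would give ("logs", "warn"), and parse_target("orders") would give ("", "orders").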

For sources, addresses are:

= "/exchange/"  X "/" RK  Consume from temp queue bound to X with routing key RK
| "/topic/"     RK        Consume from temp queue bound to amq.topic with routing key RK
| "/amq/queue/" Q         Consume from Q
| "/queue/"     Q         Consume from Q
| Q (no leading slash)    Consume from Q
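The source grammar can be sketched the same way. Again this is a hypothetical Python model, not the plugin's code: parse_source and the tuple tags "temp_queue" and "queue" are made up for illustration, and an "/exchange/" address without a routing key is rejected here only because the grammar above does not list that form for sources.

```python
from typing import Tuple


def parse_source(address: str) -> Tuple[str, ...]:
    """Classify an AMQP 1.0 source address.

    Returns ("temp_queue", exchange, routing_key) when a temporary queue
    would be bound to an exchange, or ("queue", name) for an existing queue.
    """
    if address.startswith("/exchange/"):
        exchange, sep, routing_key = address[len("/exchange/"):].partition("/")
        if not exchange or not sep:
            raise ValueError(f"expected /exchange/X/RK, got {address!r}")
        return "temp_queue", exchange, routing_key
    if address.startswith("/topic/"):
        return "temp_queue", "amq.topic", address[len("/topic/"):]
    for prefix in ("/amq/queue/", "/queue/"):
        if address.startswith(prefix):
            return "queue", address[len(prefix):]
    if not address.startswith("/"):
        return "queue", address
    raise ValueError(f"unsupported source address: {address!r}")
```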

The intent is that the source and destination address formats should be mostly the same as those supported by the STOMP plugin, to the extent permitted by AMQP 1.0 semantics.

Virtual Hosts

AMQP 1.0 has no equivalent of AMQP 0-9-1 virtual hosts. A virtual host on the broker may be addressed when opening an AMQP 1.0 connection by setting the hostname field, prefixing with "vhost:". Setting the hostname field to "vhost:/" addresses the default virtual host. If the hostname field does not start with "vhost:" then the default_vhost configuration setting will be consulted.
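The hostname rule amounts to a prefix check. A minimal Python sketch, assuming a hypothetical helper named resolve_vhost and treating a missing hostname field as None:

```python
from typing import Optional

VHOST_PREFIX = "vhost:"


def resolve_vhost(hostname: Optional[str], default_vhost: str = "/") -> str:
    """Pick the virtual host for a connection from its open frame hostname."""
    if hostname is not None and hostname.startswith(VHOST_PREFIX):
        # Everything after "vhost:" names the virtual host.
        return hostname[len(VHOST_PREFIX):]
    # No prefix: fall back to the configured default_vhost.
    return default_vhost
```

With this model, "vhost:/" selects the default virtual host "/", "vhost:staging" selects "staging", and an ordinary hostname such as "broker.example.com" selects whatever default_vhost is configured.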

Limitations and unsupported features

At the moment, the RabbitMQ AMQP 1.0 adapter does not support:

  • "Exactly once" delivery [9]
  • Link recovery [9]
  • Full message fragmentation [10]
  • Resuming messages
  • "Modified" outcome
  • Filters [11]
  • Transactions
  • Source/target expiry-policy other than link-detach and timeout other than 0
  • Max message size for links
  • Aborted transfers
  • TLS negotiation via the AMQP2100 handshake (although SSL is supported)

[9] We do not deduplicate as a target, though we may resend as a source (messages that have no settled outcome when an outgoing link is detached will be requeued).

[10] We do fragment messages over multiple frames; however, if this would overflow the session window we may discard or requeue messages.

[11] In principle, filters for consuming from an exchange could translate to AMQP 0-9-1 bindings. This is not implemented, so effectively only consuming from fanout exchanges and queues is useful currently.

rabbitmq-amqp1.0's People

Contributors

acogoluegnes, dcorbacho, dependabot[bot], dumbbell, fenollp, flaper87, gerhard, hairyhum, ingwinlu, kjnilsson, lukebakken, mbuhot, michaelklishin, rade, spring-operator, squaremo

rabbitmq-amqp1.0's Issues

Azure Service Bus

You mention: "Windows Azure Service Bus [3] It seems that the URI scheme used by this client assumes that it is connecting to Azure; it does not seem to be possible to get it to connect to another server."

Does that mean that using Service Bus was unsuccessful? Is support planned?

source outcomes are incorrectly encoded in attach response sent to a producer

When a producer tries to attach a sending link with source outcomes set, the attach response sent by the broker incorrectly encodes the source outcomes as a list of symbols instead of an array of symbols, causing the process to fail.

In contrast, when a consumer attaches a receiving link with source outcomes set the attach response from the broker correctly contains the required array encoding.

This is seen using broker version 3.6.5

unable to install the plugin

Hello,

I have been trying to compile and install rabbitmq-amqp1.0 on my local instance of RabbitMQ 2.5.1 (MacOS 10.7; I tried both brew and MacPorts), and I get the same error during compilation:

ERL_LIBS=./build/dep-apps erlc -Wall +debug_info -I ./include -pa ebin -o ebin src/rabbit_amqp1_0_session.erl
src/rabbit_amqp1_0_session.erl:131: record 'basic.credit_state' undefined
src/rabbit_amqp1_0_session.erl:137: variable 'Available0' is unbound
src/rabbit_amqp1_0_session.erl:141: variable 'CTag' is unbound
src/rabbit_amqp1_0_session.erl:153: variable 'Count' is unbound
src/rabbit_amqp1_0_session.erl:154: variable 'LinkCredit' is unbound
src/rabbit_amqp1_0_session.erl:156: variable 'Drain' is unbound
src/rabbit_amqp1_0_session.erl:571: record 'basic.credit' undefined
src/rabbit_amqp1_0_session.erl:629: record 'basic.credit_ok' undefined
src/rabbit_amqp1_0_session.erl:632: record 'basic.credit' undefined
src/rabbit_amqp1_0_session.erl:636: variable 'Available' is unbound
src/rabbit_amqp1_0_session.erl:283: Warning: variable 'RemoteOutWindow' is unused
src/rabbit_amqp1_0_session.erl:368: Warning: variable 'Txfr' is unused
src/rabbit_amqp1_0_session.erl:468: Warning: variable 'LocalNextIn' is unused
src/rabbit_amqp1_0_session.erl:473: Warning: variable 'RemoteNextOut' is unused
src/rabbit_amqp1_0_session.erl:474: Warning: variable 'RemoteWindowOut' is unused
src/rabbit_amqp1_0_session.erl:567: Warning: variable 'Count' is unused
src/rabbit_amqp1_0_session.erl:628: Warning: variable 'CTag' is unused
src/rabbit_amqp1_0_session.erl:669: Warning: variable 'Content' is unused
src/rabbit_amqp1_0_session.erl:864: Warning: variable 'ExpiryPolicy' is unused
src/rabbit_amqp1_0_session.erl:920: Warning: variable 'ExpiryPolicy' is unused
make: *** [ebin/rabbit_amqp1_0_session.beam] Error 1

Anything you could help me with?

Thank you in advance.

Missing pattern for to_expiration

Apparently, based on my very limited Erlang knowledge (and some Erlangers' help), there's a missing pattern in the to_expiration function.

** Reason for termination == 
** {function_clause,
       [{rabbit_amqp1_0_message,to_expiration,[{uint,5000000}],[]},
        {rabbit_amqp1_0_message,translate_header,2,[]},
        {rabbit_amqp1_0_message,assemble,4,[]},
        {rabbit_amqp1_0_message,assemble,1,[]},
        {rabbit_amqp1_0_incoming_link,transfer,4,[]},
        {rabbit_amqp1_0_session_process,handle_control,2,[]},
        {rabbit_amqp1_0_session_process,handle_cast,2,[]},
        {gen_server2,handle_msg,2,[]}]}

Session is closed while creating link, target: test_purge

Hi,
I'm currently trying to use RabbitMQ with an "Informatica Cloud Real Time" iPaaS platform.
This product implements AMQP 1.0 through Java Qpid (I don't know the version yet but I'll know soon).

I'm able to have a listener getting messages without any problem.
The problem is when it tries to write to a queue (any queue).

In case it helps, here is what the two RabbitMQ log files show.
log:

=INFO REPORT==== 21-Jul-2016::10:42:07 ===
accepting AMQP connection <0.6179.0> (127.0.0.1:56312 -> 127.0.0.1:5672)

=ERROR REPORT==== 21-Jul-2016::10:42:07 ===
** Generic server <0.6185.0> terminating
** Last message in was {'$gen_cast',
                        {frame,
                         {'v1_0.attach',
                          {utf8,
                           <<"test_purge<-79b9fd92-00f1-4a43-849e-7cd71e66c11a">>},
                          {uint,0},
                          false,
                          {ubyte,0},
                          {ubyte,0},
                          {'v1_0.source',
                           {utf8,<<"79b9fd92-00f1-4a43-849e-7cd71e66c11a">>},
                           undefined,undefined,undefined,undefined,undefined,
                           undefined,undefined,
                           {'v1_0.accepted'},
                           {list,
                            [{symbol,<<"amqp:accepted:list">>},
                             {symbol,<<"amqp:rejected:list">>}]},
                           undefined},
                          {'v1_0.target',
                           {utf8,<<"test_purge">>},
                           undefined,undefined,undefined,undefined,undefined,
                           undefined},
                          undefined,undefined,
                          {uint,0},
                          undefined,undefined,undefined,undefined},
                         <0.6179.0>}}
** When Server state == {state,<0.6188.0>,<0.6194.0>,65528,<0.6179.0>,
                            <0.6184.0>,
                            {[],[]},
                            {session,0,2048,2048,0,65535,65535,0,65535,65535,
                                0,0,
                                {0,nil},
                                {0,nil}}}
** Reason for termination == 
** {function_clause,
       [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
            [{list,
                 [{symbol,<<"amqp:accepted:list">>},
                  {symbol,<<"amqp:rejected:list">>}]}],
            []},
        {rabbit_amqp1_0_link_util,outcomes,1,[]},
        {rabbit_amqp1_0_incoming_link,attach,3,[]},
        {rabbit_amqp1_0_session_process,with_disposable_channel,2,[]},
        {rabbit_amqp1_0_session_process,handle_control,2,[]},
        {rabbit_amqp1_0_session_process,handle_cast,2,[]},
        {gen_server2,handle_msg,2,[]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

=ERROR REPORT==== 21-Jul-2016::10:42:10 ===
closing AMQP connection <0.6179.0> (127.0.0.1:56312 -> 127.0.0.1:5672):
{handshake_error,running,<0.6185.0>,
    {{symbol,<<"amqp:internal-error">>},
     "Session error: ~p~n~p~n",
     [function_clause,
      [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
           [{list,
                [{symbol,<<"amqp:accepted:list">>},
                 {symbol,<<"amqp:rejected:list">>}]}],
           []},
       {rabbit_amqp1_0_link_util,outcomes,1,[]},
       {rabbit_amqp1_0_incoming_link,attach,3,[]},
       {rabbit_amqp1_0_session_process,with_disposable_channel,2,[]},
       {rabbit_amqp1_0_session_process,handle_control,2,[]},
       {rabbit_amqp1_0_session_process,handle_cast,2,[]},
       {gen_server2,handle_msg,2,[]},
       {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]]}}

sasl.log

=CRASH REPORT==== 21-Jul-2016::10:42:07 ===
  crasher:
    initial call: gen:init_it/6
    pid: <0.6185.0>
    registered_name: []
    exception exit: {function_clause,
                        [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
                             [{list,
                                  [{symbol,<<"amqp:accepted:list">>},
                                   {symbol,<<"amqp:rejected:list">>}]}],
                             []},
                         {rabbit_amqp1_0_link_util,outcomes,1,[]},
                         {rabbit_amqp1_0_incoming_link,attach,3,[]},
                         {rabbit_amqp1_0_session_process,
                             with_disposable_channel,2,[]},
                         {rabbit_amqp1_0_session_process,handle_control,2,[]},
                         {rabbit_amqp1_0_session_process,handle_cast,2,[]},
                         {gen_server2,handle_msg,2,[]},
                         {proc_lib,init_p_do_apply,3,
                             [{file,"proc_lib.erl"},{line,240}]}]}
      in function  gen_server2:terminate/3 
    ancestors: [<0.6183.0>,<0.6180.0>,<0.6178.0>,<0.6177.0>,
                  rabbit_tcp_client_sup,rabbit_sup,<0.159.0>]
    messages: []
    links: [<0.6183.0>]
    dictionary: [{credit_flow_default_credit,{200,50}},
                  {{credit_to,<0.6179.0>},48}]
    trap_exit: true
    status: running
    heap_size: 987
    stack_size: 27
    reductions: 545
  neighbours:

=SUPERVISOR REPORT==== 21-Jul-2016::10:42:07 ===
     Supervisor: {<0.6183.0>,rabbit_amqp1_0_session_sup}
     Context:    child_terminated
     Reason:     {function_clause,
                     [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
                          [{list,
                               [{symbol,<<"amqp:accepted:list">>},
                                {symbol,<<"amqp:rejected:list">>}]}],
                          []},
                      {rabbit_amqp1_0_link_util,outcomes,1,[]},
                      {rabbit_amqp1_0_incoming_link,attach,3,[]},
                      {rabbit_amqp1_0_session_process,
                          with_disposable_channel,2,[]},
                      {rabbit_amqp1_0_session_process,handle_control,2,[]},
                      {rabbit_amqp1_0_session_process,handle_cast,2,[]},
                      {gen_server2,handle_msg,2,[]},
                      {proc_lib,init_p_do_apply,3,
                          [{file,"proc_lib.erl"},{line,240}]}]}
     Offender:   [{pid,<0.6185.0>},
                  {name,channel},
                  {mfargs,
                      {rabbit_amqp1_0_session_process,start_link,
                          [{0,<0.6179.0>,<0.6184.0>,
                            {user,<<"sfdc">>,
                                [administrator],
                                [{rabbit_auth_backend_internal,none}]},
                            <<"/">>,65528,
                            {amqp_adapter_info,
                                {127,0,0,1},
                                5672,
                                {127,0,0,1},
                                56312,<<"127.0.0.1:56312 -> 127.0.0.1:5672">>,
                                {'AMQP',"1.0"},
                                [{ssl,false}]},
                            <0.6181.0>}]}},
                  {restart_type,intrinsic},
                  {shutdown,4294967295},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 21-Jul-2016::10:42:07 ===
     Supervisor: {<0.6183.0>,rabbit_amqp1_0_session_sup}
     Context:    shutdown
     Reason:     reached_max_restart_intensity
     Offender:   [{pid,<0.6185.0>},
                  {name,channel},
                  {mfargs,
                      {rabbit_amqp1_0_session_process,start_link,
                          [{0,<0.6179.0>,<0.6184.0>,
                            {user,<<"sfdc">>,
                                [administrator],
                                [{rabbit_auth_backend_internal,none}]},
                            <<"/">>,65528,
                            {amqp_adapter_info,
                                {127,0,0,1},
                                5672,
                                {127,0,0,1},
                                56312,<<"127.0.0.1:56312 -> 127.0.0.1:5672">>,
                                {'AMQP',"1.0"},
                                [{ssl,false}]},
                            <0.6181.0>}]}},
                  {restart_type,intrinsic},
                  {shutdown,4294967295},
                  {child_type,worker}]

Hoping to find a way to make it work.
rabbitmq.config.txt

Consider dropping supervisor2

supervisor2 hasn't been updated to support map flags and child specs. While this can be corrected, we should reconsider its use. It seems to be used only in a few places and replacing it with the standard supervisor might be quite straightforward.

crash on ambiguous omission of field in disposition frame

I'm just putting this up to capture what we've learned on the mailing list. Proton sends a disposition frame without a "state" field which causes rabbitmq to crash on that connection with the following output (modified by @simonmacmullen):

=ERROR REPORT==== 20-Mar-2015::13:10:53 ===
** Generic server <0.5794.0> terminating
** Last message in was {'$gen_cast',
{frame,
{'v1_0.disposition',true,
{uint,0},
{uint,0},
true,undefined,undefined},
<0.5788.0>}}

** Reason for termination ==
** {{case_clause,undefined},
[{rabbit_amqp1_0_session_process,'-handle_control/2-fun-2-',3,[]},
{rabbit_amqp1_0_session,'-settle/3-fun-0-',3,[]},
{lists,foldl,3,[{file,"lists.erl"},{line,1248}]},
{rabbit_amqp1_0_session,settle,3,[]},
{rabbit_amqp1_0_session_process,handle_control,2,[]},
{rabbit_amqp1_0_session_process,handle_cast,2,[]},
{gen_server2,handle_msg,2,[]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}

It seems to me that RabbitMQ should at the very least handle this case gracefully rather than crash, and perhaps take an opinionated position (if the client is sending back a disposition frame in the first place, it's probably an ACK, right?).

Cheers,
Matt

Qpid C++ client runs into an exception

Hello All,

RabbitMQ AMQP 1.0 errors out and closes the session when an Apache Qpid C++ client receive() call times out. The RabbitMQ server should not close the session, or throw an error when the Qpid C++ client receive() call times out.

I filed this bug against the Qpid C++ API:
https://issues.apache.org/jira/browse/QPID-8347
I believe that some part of it may be related to the RabbitMQ server.

Source:

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <iostream>

using namespace qpid::messaging;

int main(int argc, char** argv) {
    std::string broker = argc > 1 ? argv[1] : "localhost:5672";
    std::cout << "broker: " << broker << std::endl;
    std::string address = argc > 2 ? argv[2] : "topic.hello.world";
    std::cout << "address: " << address << std::endl;
    std::string connectionOptions = argc > 3 ? argv[3] : "";
    std::cout << "connectionOptions: " << connectionOptions << std::endl;


    try {
        Connection connection(broker, connectionOptions);
        connection.open();
        Session session = connection.createSession();

        Receiver receiver = session.createReceiver(address);
        Message message;
        std::cout << "Pre Receive" << std::endl;
        message = receiver.fetch(Duration::SECOND * 10);
        std::cout << "Post Receive" << std::endl;
        session.acknowledge();

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cerr << error.what() << std::endl;
        return 1;
    }
}

Output:

└─╼ ./hello_world 
broker: localhost:5672
address: topic.hello.world
connectionOptions: 
Pre Receive
Session ended by peer with amqp:internal-error: {{badmatch,{empty,{[],[]}}},[{amqp_channel,rpc_bottom_half,2,[{file,[115,114,99,47,97,109,113,112,95,99,104,97,110,110,101,108,46,101,114,108]},{line,623}]},{amqp_channel,handle_method_from_server1,3,[{file,[115,114,99,47,97,109,113,112,95,99,104,97,110,110,101,108,46,101,114,108]},{line,800}]},{gen_server,try_dispatch,4,[{file,[103,101,110,95,115,101,114,118,101,114,46,101,114,108]},{line,637}]},{gen_server,handle_msg,6,[{file,[103,101,110,95,115,101,114,118,101,114,46,101,114,108]},{line,711}]},{proc_lib,init_p_do_apply,3,[{file,[112,114,111,99,95,108,105,98,46,101,114,108]},{line,249}]}]}

Server Error (Occurs at timeout, 10 seconds in)

  • Interestingly, the first connection attempt is rejected. I am guessing that connection.createSession() opens a connection to the Broker that is rejected, and receiver.fetch(Duration::SECOND * 10); reconnects (successfully). This is not a major concern for me but may be something worth looking into.
2019-07-29 19:57:21.555 [info] <0.4147.0> accepting AMQP connection <0.4147.0> ([::1]:50744 -> [::1]:5672)
2019-07-29 19:57:21.555 [error] <0.4147.0> closing AMQP connection <0.4147.0> ([::1]:50744 -> [::1]:5672):
{bad_version,{1,1,0,10}}
2019-07-29 19:57:21.566 [info] <0.4150.0> accepting AMQP connection <0.4150.0> ([::1]:50746 -> [::1]:5672)
2019-07-29 19:57:31.580 [error] <0.4165.0> ** Generic server <0.4165.0> terminating 
** Last message in was {send_command,{'basic.credit_drained',<<99,116,97,103,45,0,0,0,0>>,1}}
** When Server state == {state,1,<0.4159.0>,<0.4164.0>,direct,{[],[]},false,<0.4168.0>,none,none,0,true,none,{0,nil},{0,nil},true,false}
** Reason for termination == 
** {{badmatch,{empty,{[],[]}}},[{amqp_channel,rpc_bottom_half,2,[{file,"src/amqp_channel.erl"},{line,623}]},{amqp_channel,handle_method_from_server1,3,[{file,"src/amqp_channel.erl"},{line,800}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,637}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,711}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]}
2019-07-29 19:57:31.580 [warning] <0.4156.0> Closing session for connection <0.4150.0>:
{{badmatch,{empty,{[],[]}}},[{amqp_channel,rpc_bottom_half,2,[{file,"src/amqp_channel.erl"},{line,623}]},{amqp_channel,handle_method_from_server1,3,[{file,"src/amqp_channel.erl"},{line,800}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,637}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,711}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]}
2019-07-29 19:57:31.580 [error] <0.4165.0> CRASH REPORT Process <0.4165.0> with 0 neighbours crashed with reason: no match of right hand value {empty,{[],[]}} in amqp_channel:rpc_bottom_half/2 line 623
2019-07-29 19:57:31.580 [error] <0.4163.0> Supervisor {<0.4163.0>,amqp_channel_sup} had child channel started with amqp_channel:start_link(direct, <0.4159.0>, 1, <0.4164.0>, {<<"[::1]:50746 -> [::1]:5672">>,1}) at <0.4165.0> exit with reason no match of right hand value {empty,{[],[]}} in amqp_channel:rpc_bottom_half/2 line 623 in context child_terminated
2019-07-29 19:57:31.580 [error] <0.4163.0> Supervisor {<0.4163.0>,amqp_channel_sup} had child channel started with amqp_channel:start_link(direct, <0.4159.0>, 1, <0.4164.0>, {<<"[::1]:50746 -> [::1]:5672">>,1}) at <0.4165.0> exit with reason reached_max_restart_intensity in context shutdown
2019-07-29 19:57:31.580 [warning] <0.4159.0> Connection (<0.4159.0>) closing: internal error in channel (<0.4165.0>): {{badmatch,{empty,{[],[]}}},[{amqp_channel,rpc_bottom_half,2,[{file,"src/amqp_channel.erl"},{line,623}]},{amqp_channel,handle_method_from_server1,3,[{file,"src/amqp_channel.erl"},{line,800}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,637}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,711}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]}
2019-07-29 19:57:31.581 [error] <0.4168.0> ** Generic server <0.4168.0> terminating
** Last message in was {'EXIT',<0.4165.0>,{{badmatch,{empty,{[],[]}}},[{amqp_channel,rpc_bottom_half,2,[{file,"src/amqp_channel.erl"},{line,623}]},{amqp_channel,handle_method_from_server1,3,[{file,"src/amqp_channel.erl"},{line,800}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,637}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,711}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.4165.0>,<0.4165.0>,<0.4159.0>,<<"[::1]:50746 -> [::1]:5672">>,rabbit_direct,{lstate,<0.4167.0>,false},none,1,{[],[]},{user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]},<<"/">>,<<>>,#{<0.3530.0> => {resource,<<"/">>,queue,<<"topic.hello.world">>}},{state,{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[[<0.3530.0>|#Ref<0.1479989272.2034237442.31559>]]}}},erlang},#{<<99,116,97,103,45,0,0,0,0>> => {{amqqueue,{resource,<<"/">>,queue,<<"topic.hello.world">>},false,false,none,[],<0.3530.0>,[],[],[],undefined,undefined,[],[],live,0,[],<<"/">>,#{user => <<"guest">>}},{false,65535,false,[{<<"x-credit">>,table,[{<<"credit">>,long,0},{<<"drain">>,bool,false}]}]}}},#{<0.3530.0> => {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}},{set,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[<0.3530.0>]}}},<0.4162.0>,{state,fine,5000,#Ref<0.1479989272.2034237441.132594>},false,1,{{0,nil},{0,nil}},[],[],{{0,nil},{0,nil}},[{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,bool,true},{<<"consumer_cancel_notify">>,bool,true},{<<"connection.blocked">>,bool,true},{<<"authentication_failure_close">>,bool,true}],none,65535,none,flow,[]}
** Reason for termination == 
** {{badmatch,{empty,{[],[]}}},[{amqp_channel,rpc_bottom_half,2,[{file,"src/amqp_channel.erl"},{line,623}]},{amqp_channel,handle_method_from_server1,3,[{file,"src/amqp_channel.erl"},{line,800}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,637}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,711}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,249}]}]}
2019-07-29 19:57:31.581 [error] <0.4168.0> CRASH REPORT Process <0.4168.0> with 0 neighbours exited with reason: no match of right hand value {empty,{[],[]}} in amqp_channel:rpc_bottom_half/2 line 623 in gen_server2:terminate/3 line 1172
2019-07-29 19:57:31.581 [error] <0.4166.0> Supervisor {<0.4166.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.4165.0>, <0.4165.0>, <0.4159.0>, <<"[::1]:50746 -> [::1]:5672">>, rabbit_framing_amqp_0_9_1, {user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,...},...], <0.4162.0>, <0.4167.0>) at <0.4168.0> exit with reason no match of right hand value {empty,{[],[]}} in amqp_channel:rpc_bottom_half/2 line 623 in context child_terminated
2019-07-29 19:57:31.581 [error] <0.4166.0> Supervisor {<0.4166.0>,rabbit_channel_sup} had child channel started with rabbit_channel:start_link(1, <0.4165.0>, <0.4165.0>, <0.4159.0>, <<"[::1]:50746 -> [::1]:5672">>, rabbit_framing_amqp_0_9_1, {user,<<"guest">>,[administrator],[{rabbit_auth_backend_internal,none}]}, <<"/">>, [{<<"publisher_confirms">>,bool,true},{<<"exchange_exchange_bindings">>,bool,true},{<<"basic.nack">>,...},...], <0.4162.0>, <0.4167.0>) at <0.4168.0> exit with reason reached_max_restart_intensity in context shutdown
2019-07-29 19:57:31.585 [info] <0.4150.0> closing AMQP connection <0.4150.0> ([::1]:50746 -> [::1]:5672)

Configuration:
OS: Ubuntu Bionic
QPID C++ version 1.39.0
QPID Proton version 0.28.0
RabbitMQ Broker version 3.7.17-1
Erlang version 21.3.x
RabbitMQ AMQP 1.0 plugin version 3.7.17

Gracefully handle refused access (e.g. due to insufficient permissions) in session process

Currently it fails with an exception that makes it hard to spot the root cause:

2019-05-08 08:43:38.977 [debug] <0.995.0> Supervisor {<0.995.0>,amqp_connection_sup} started amqp_gen_connection:start_link(<0.996.0>, {amqp_params_direct,<<"shovel-57">>,none,<<"/">>,rabbit@mercurio,{amqp_adapter_info,{0,0,0,0,0,65535,...},...},...}) at pid <0.997.0>
2019-05-08 08:43:38.977 [error] <0.994.0> CRASH REPORT Process <0.994.0> with 0 neighbours exited with reason: no match of right hand value {error,not_allowed} in rabbit_amqp1_0_session_process:init/1 line 52 in gen_server2:init_it/6 line 597
2019-05-08 08:43:38.977 [error] <0.986.0> AMQP 1.0 connection <0.986.0> (running), channel 0 - error:
<<"Reader error: {badmatch,\n               {error,\n                {'EXIT',\n                 {{badmatch,\n                   {error,\n                    {{{badmatch,{error,not_allowed}},\n                      [{rabbit_amqp1_0_session_process,init,1,\n                        [{file,\"src/rabbit_amqp1_0_session_process.erl\"},\n                         {line,52}]}

Discovered while QA'ing rabbitmq/rabbitmq-shovel#57.

Unable to send messages from QPID JMS client to RabbitMQ with AMQP 1.0 plugin

Hello -

I am trying to connect and send a message from the QPID JMS client to RabbitMQ with the AMQP 1.0 plugin. I can create a durable queue, but sending a message fails, and I observe the following logs:

On the QPID JMS client side (running the HelloWorld example from the QPID JMS example repo):

2018-04-06 16:31:19,377 [68.247.44:5672]] - TRACE NettyTcpTransport              - Netty Transport using NIO mode
2018-04-06 16:31:19,799 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - Channel has become active! Channel is [id: 0x3a680018, L:/217.78.109.51:29431 - R:/47.168.247.44:5672]
2018-04-06 16:31:19,826 [68.247.44:5672]] - TRACE NettyTcpTransport              - Attempted write of: 8 bytes
2018-04-06 16:31:19,841 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - New data read: 51 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 51, cap: 65536)
2018-04-06 16:31:19,841 [68.247.44:5672]] - DEBUG SaslMechanismFinder            - Skipping SASL-PLAIN mechanism because the available credentials are not sufficient
2018-04-06 16:31:19,841 [68.247.44:5672]] - DEBUG SaslMechanismFinder            - Unknown SASL mechanism: [AMQPLAIN]
2018-04-06 16:31:19,841 [68.247.44:5672]] - INFO  SaslMechanismFinder            - Best match for SASL auth was: SASL-ANONYMOUS
2018-04-06 16:31:19,841 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: CONNECTION_INIT
2018-04-06 16:31:19,841 [68.247.44:5672]] - TRACE NettyTcpTransport              - Attempted write of: 42 bytes
2018-04-06 16:31:19,841 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - New data read: 17 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 17, cap: 65536)
2018-04-06 16:31:19,841 [68.247.44:5672]] - TRACE NettyTcpTransport              - Attempted write of: 8 bytes
2018-04-06 16:31:19,857 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - New data read: 8 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
2018-04-06 16:31:19,857 [68.247.44:5672]] - TRACE MetaDataSupport                - Problem generating primary version details
java.lang.NullPointerException
	at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
	at java.util.regex.Matcher.reset(Matcher.java:309)
	at java.util.regex.Matcher.<init>(Matcher.java:229)
	at java.util.regex.Pattern.matcher(Pattern.java:1093)
	at org.apache.qpid.jms.util.MetaDataSupport.<clinit>(MetaDataSupport.java:47)
	at org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder.createEndpoint(AmqpConnectionBuilder.java:116)
	at org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder.createEndpoint(AmqpConnectionBuilder.java:1)
	at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.buildResource(AmqpResourceBuilder.java:76)
	at org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder.buildResource(AmqpConnectionBuilder.java:58)
	at org.apache.qpid.jms.provider.amqp.AmqpProvider$4$1.processConnectionInfo(AmqpProvider.java:412)
	at org.apache.qpid.jms.meta.JmsConnectionInfo.visit(JmsConnectionInfo.java:417)
	at org.apache.qpid.jms.provider.amqp.AmqpProvider$4.run(AmqpProvider.java:371)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)

…

2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE FRAMES                         - SENT: Open{ containerId='ID:6d6fab0c-f5e6-4618-a407-c54a04b0b439:1', hostname='47.168.247.44', maxFrameSize=1048576, channelMax=32767, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, ANONYMOUS-RELAY, SHARED-SUBS], properties={product=QpidJMS, version=0.32.0-SNAPSHOT, platform=JVM: 1.8.0_161, 25.161-b12, Oracle Corporation, OS: Windows 10, 10.0, amd64}}
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE NettyTcpTransport              - Attempted write of: 307 bytes
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: CONNECTION_LOCAL_OPEN
2018-04-06 16:31:19,873 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - New data read: 340 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 340, cap: 65536)
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE FRAMES                         - RECV: Open{ containerId='rabbit@appstacleserver-HP-Compaq-Elite-8300-MT', hostname='null', maxFrameSize=1048576, channelMax=32767, idleTimeOut=60000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={cluster_name=rabbit@appstacleserver-HP-Compaq-Elite-8300-MT, copyright=Copyright (C) 2007-2018 Pivotal Software, Inc., information=Licensed under the MPL.  See http://www.rabbitmq.com/, platform=Erlang/OTP 20.2, product=RabbitMQ, version=3.7.4}}
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: CONNECTION_REMOTE_OPEN
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: SESSION_INIT
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: SESSION_LOCAL_OPEN
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE FRAMES                         - SENT: Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2018-04-06 16:31:19,873 [68.247.44:5672]] - TRACE NettyTcpTransport              - Attempted write of: 32 bytes
2018-04-06 16:31:19,888 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - New data read: 36 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 36, cap: 65536)
2018-04-06 16:31:19,888 [68.247.44:5672]] - TRACE FRAMES                         - RECV: Begin{remoteChannel=0, nextOutgoingId=0, incomingWindow=65535, outgoingWindow=65535, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2018-04-06 16:31:19,904 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: SESSION_REMOTE_OPEN
2018-04-06 16:31:19,904 [68.247.44:5672]] - DEBUG AmqpConnectionBuilder          - AmqpConnection { ID:3c1ad029-0598-40a2-964a-b0f83288ded5:1 } is now open: 
2018-04-06 16:31:19,904 [68.247.44:5672]] - TRACE AmqpProvider                   - IdleTimeoutCheck being initiated, initial delay: 30000
2018-04-06 16:31:19,904 [68.247.44:5672]] - INFO  JmsConnection                  - Connection ID:3c1ad029-0598-40a2-964a-b0f83288ded5:1 connected to remote Broker: amqp://47.168.247.44:5672?amqp.traceFrames=true&org.apache.qpid.jms.provider.amqp.FRAMES=TRACE
2018-04-06 16:31:19,904 [68.247.44:5672]] - TRACE FRAMES                         - SENT: Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=2047, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2018-04-06 16:31:19,904 [68.247.44:5672]] - TRACE NettyTcpTransport              - Attempted write of: 32 bytes
2018-04-06 16:31:19,920 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - New data read: 36 bytes incoming: PooledUnsafeDirectByteBuf(ridx: 0, widx: 36, cap: 65536)
2018-04-06 16:31:19,920 [68.247.44:5672]] - TRACE FRAMES                         - RECV: Begin{remoteChannel=1, nextOutgoingId=0, incomingWindow=65535, outgoingWindow=65535, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2018-04-06 16:31:19,920 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: SESSION_INIT
2018-04-06 16:31:19,920 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: SESSION_LOCAL_OPEN
2018-04-06 16:31:19,920 [68.247.44:5672]] - TRACE AmqpProvider                   - New Proton Event: SESSION_REMOTE_OPEN
2018-04-06 16:31:19,920 [68.247.44:5672]] - DEBUG AmqpProducerBuilder            - Creating AmqpFixedProducer for: helloagain
2018-04-06 16:31:19,935 [68.247.44:5672]] - TRACE FRAMES                         - SENT: Attach{name='qpid-jms:sender:ID:3c1ad029-0598-40a2-964a-b0f83288ded5:1:1:1:helloagain', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='ID:3c1ad029-0598-40a2-964a-b0f83288ded5:1:1:1', durable=CONFIGURATION, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list], capabilities=null}, target=Target{address='helloagain', durable=CONFIGURATION, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=[queue]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=[DELAYED_DELIVERY], properties=null}
2018-04-06 16:31:19,935 [68.247.44:5672]] - TRACE NettyTcpTransport              - Attempted write of: 307 bytes
2018-04-06 16:31:19,935 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - Exception on channel! Channel is [id: 0x3a680018, L:/217.78.109.51:29431 - R:/47.168.247.44:5672]
2018-04-06 16:31:19,935 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - Firing onTransportError listener
2018-04-06 16:31:19,935 [68.247.44:5672]] - INFO  AmqpProvider                   - Transport failed: An existing connection was forcibly closed by the remote host
2018-04-06 16:31:19,935 [ntLoopGroup-2-1] - TRACE NettyTcpTransport              - Channel has gone inactive! Channel is [id: 0x3a680018, L:/217.78.109.51:29431 ! R:/47.168.247.44:5672]
Connection ExceptionListener fired, exiting.
javax.jms.JMSException: An existing connection was forcibly closed by the remote host
	at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:86)
	at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:108)
	at org.apache.qpid.jms.JmsConnection.onAsyncException(JmsConnection.java:1392)
	at org.apache.qpid.jms.JmsConnection.onProviderException(JmsConnection.java:1376)
	at org.apache.qpid.jms.JmsConnection.onConnectionFailure(JmsConnection.java:1244)
	at org.apache.qpid.jms.provider.amqp.AmqpProvider.fireProviderException(AmqpProvider.java:1086)
	at org.apache.qpid.jms.provider.amqp.AmqpProvider$18.run(AmqpProvider.java:859)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: An existing connection was forcibly closed by the remote host
	at sun.nio.ch.SocketDispatcher.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1108)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:345)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more
Caught exception, exiting.
javax.jms.JMSException: An existing connection was forcibly closed by the remote host
	at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:86)
	at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:108)
	at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:615)
	at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:599)
	at org.apache.qpid.jms.JmsMessageProducer.<init>(JmsMessageProducer.java:73)
	at org.apache.qpid.jms.JmsSession.createProducer(JmsSession.java:603)
	at org.apache.qpid.jms.example.HelloWorld.main(HelloWorld.java:54)
Caused by: java.io.IOException: An existing connection was forcibly closed by the remote host
	at sun.nio.ch.SocketDispatcher.read0(Native Method)

On the RabbitMQ side:

2018-04-06 16:31:21.880 [info] <0.17194.6> accepting AMQP connection <0.17194.6> (217.78.109.51:29431 -> 47.168.247.44:5672)
2018-04-06 16:31:21.986 [error] <0.17216.6> CRASH REPORT Process <0.17216.6> with 0 neighbours crashed with reason: no function clause matching rabbit_amqp1_0_binary_generator:generate({array,[{symbol,<<"amqp:accepted:list">>},{symbol,<<"amqp:rejected:list">>},{symbol,<<"amqp:releas...">>}]}) line 42
2018-04-06 16:31:21.987 [error] <0.17215.6> Supervisor {<0.17215.6>,rabbit_amqp1_0_session_sup} had child writer started with rabbit_amqp1_0_writer:start_link(#Port<0.1441079>, 1, 1048568, rabbit_amqp1_0_framing, <0.17194.6>) at <0.17216.6> exit with reason no function clause matching rabbit_amqp1_0_binary_generator:generate({array,[{symbol,<<"amqp:accepted:list">>},{symbol,<<"amqp:rejected:list">>},{symbol,<<"amqp:releas...">>}]}) line 42 in context child_terminated
2018-04-06 16:31:21.987 [error] <0.17215.6> Supervisor {<0.17215.6>,rabbit_amqp1_0_session_sup} had child writer started with rabbit_amqp1_0_writer:start_link(#Port<0.1441079>, 1, 1048568, rabbit_amqp1_0_framing, <0.17194.6>) at <0.17216.6> exit with reason reached_max_restart_intensity in context shutdown
2018-04-06 16:31:21.987 [error] <0.17194.6> CRASH REPORT Process <0.17194.6> with 0 neighbours crashed with reason: no match of right hand value shutdown in rabbit_amqp1_0_reader:handle_dependent_exit/3 line 253
2018-04-06 16:31:21.987 [error] <0.17192.6> Supervisor {<0.17192.6>,rabbit_connection_sup} had child reader started with rabbit_reader:start_link(<0.17193.6>, {acceptor,{0,0,0,0,0,0,0,0},5672}, #Port<0.1441079>) at <0.17194.6> exit with reason no match of right hand value shutdown in rabbit_amqp1_0_reader:handle_dependent_exit/3 line 253 in context child_terminated
2018-04-06 16:31:21.988 [error] <0.17192.6> Supervisor {<0.17192.6>,rabbit_connection_sup} had child reader started with rabbit_reader:start_link(<0.17193.6>, {acceptor,{0,0,0,0,0,0,0,0},5672}, #Port<0.1441079>) at <0.17194.6> exit with reason reached_max_restart_intensity in context shutdown

test/proton/Makefile fails with "ERROR 404: Not Found"

The rabbitmq-amqp1.0/test/proton/Makefile fails with "ERROR 404: Not Found". It tries to download the proton 0.7 client from a mirror that stores newer versions only.

PROTON_VER=0.7
PROTON_URL=http://www.mirrorservice.org/sites/ftp.apache.org/qpid/proton/$(PROTON_VER)/$(PROTON_TARBALL)

However, mirrorservice.org no longer stores v0.7; it stores only 0.9, 0.9.1 and 0.10.

attach: crash if initial-delivery-count is not 0

Hey, running into another parsing error it seems. I was wondering if you could help me figure out what's going on with it from the rabbitmq side (if I can better understand why these errors are happening, I can hopefully start submitting more relevant issues 😄). The error is:

=INFO REPORT==== 26-Mar-2015::13:43:47 ===
accepting AMQP connection <0.2116.0> (192.168.1.100:52559 -> 192.168.1.104:5672)

=ERROR REPORT==== 26-Mar-2015::13:43:47 ===
** Generic server <0.2123.0> terminating
** Last message in was {'$gen_cast',
                           {frame,
                               {'v1_0.attach',
                                   {utf8,<<"/topic/inventory.cell.info_TX">>},
                                   {uint,0},
                                   false,
                                   {ubyte,2},
                                   {ubyte,0},
                                   {'v1_0.source',
                                       {utf8,<<"localhost">>},
                                       {uint,0},
                                       {symbol,<<"session-end">>},
                                       {uint,0},
                                       false,
                                       {map,[]},
                                       undefined,
                                       {map,[]},
                                       undefined,undefined,undefined},
                                   {'v1_0.target',
                                       {utf8,<<"/topic/inventory.cell.info">>},
                                       {uint,0},
                                       {symbol,<<"session-end">>},
                                       {uint,0},
                                       false,
                                       {map,[]},
                                       undefined},
                                   {map,[]},
                                   false,
                                   {uint,1},
                                   {ulong,0},
                                   undefined,undefined,
                                   {map,[]}},
                               <0.2116.0>}}
** When Server state == {state,<0.2126.0>,<0.2132.0>,4294967287,<0.2116.0>,
                            <0.2122.0>,
                            {[],[]},
                            {session,1,100,100,1,65535,65535,0,65535,65535,0,
                                0,
                                {0,nil},
                                {0,nil}}}
** Reason for termination == 
** {{case_clause,
        {ok,{'v1_0.target',
                {utf8,<<"/topic/inventory.cell.info">>},
                {uint,0},
                {symbol,<<"session-end">>},
                {uint,0},
                false,
                {map,[]},
                undefined},
            {incoming_link,
                {utf8,<<"/topic/inventory.cell.info_TX">>},
                <<"amq.topic">>,<<"inventory.cell.info">>,undefined,0,
                undefined,undefined,32768,[],
                {set,0,16,16,8,80,48,
                    {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                    {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}}}},
    [{rabbit_amqp1_0_incoming_link,attach,3,
         [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_incoming_link.erl"},
          {line,47}]},
     {rabbit_amqp1_0_session_process,with_disposable_channel,2,
         [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_session_process.erl"},
          {line,377}]},
     {rabbit_amqp1_0_session_process,handle_control,2,
         [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_session_process.erl"},
          {line,183}]},
     {rabbit_amqp1_0_session_process,handle_cast,2,
         [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_session_process.erl"},
          {line,134}]},
     {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1034}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}

=ERROR REPORT==== 26-Mar-2015::13:43:50 ===
closing AMQP connection <0.2116.0> (192.168.1.100:52559 -> 192.168.1.104:5672):
{handshake_error,running,<0.2123.0>,
    {{symbol,<<"amqp:internal-error">>},
     "Session error: ~p~n~p~n",
     [{case_clause,
          {ok,{'v1_0.target',
                  {utf8,<<"/topic/inventory.cell.info">>},
                  {uint,0},
                  {symbol,<<"session-end">>},
                  {uint,0},
                  false,
                  {map,[]},
                  undefined},
              {incoming_link,
                  {utf8,<<"/topic/inventory.cell.info_TX">>},
                  <<"amq.topic">>,<<"inventory.cell.info">>,undefined,0,
                  undefined,undefined,32768,[],
                  {set,0,16,16,8,80,48,
                      {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                      {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}}}},
      [{rabbit_amqp1_0_incoming_link,attach,3,
           [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_incoming_link.erl"},
            {line,47}]},
       {rabbit_amqp1_0_session_process,with_disposable_channel,2,
           [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_session_process.erl"},
            {line,377}]},
       {rabbit_amqp1_0_session_process,handle_control,2,
           [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_session_process.erl"},
            {line,183}]},
       {rabbit_amqp1_0_session_process,handle_cast,2,
           [{file,"rabbitmq-amqp1.0/src/rabbit_amqp1_0_session_process.erl"},
            {line,134}]},
       {gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1034}]},
       {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]]}}

I think this might be a problem with an invalid match on initial_delivery_count? But to be frank I'm a bit lost, and trying to play catch-up.

Fail to connect to rabbitmq's AMQP 1.0 impl using a proton client

Trying to connect to the AMQP 1.0 impl using a proton client I get the following error:

=INFO REPORT==== 13-Mar-2013::16:09:06 ===
accepting AMQP connection <0.205.0> (127.0.0.1:34679 -> 127.0.0.1:5672)

=ERROR REPORT==== 13-Mar-2013::16:09:09 ===
closing AMQP connection <0.205.0> (127.0.0.1:34679 -> 127.0.0.1:5672):
{handshake_error,starting,0,
               {'v1_0.error',{symbol,"amqp:decode-error"},
                             {utf8,<<"response <<103,117,101,115,116,0,103,117,101,115,116,0>> invalid">>},
                             undefined}}

Any idea what's going wrong?
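
The byte list in the error can be decoded to see what the client actually sent. The following is a minimal Python sketch, assuming the client used the SASL PLAIN mechanism, whose message layout in RFC 4616 is `[authzid] NUL authcid NUL passwd`. The helper name `split_plain` is hypothetical, and how the plugin itself parses the response is not visible in the log.

```python
# Bytes copied from the "response <<...>> invalid" error above.
response = bytes([103, 117, 101, 115, 116, 0, 103, 117, 101, 115, 116, 0])


def split_plain(data):
    """Split a SASL PLAIN response into (authzid, authcid, passwd).

    Hypothetical helper: follows the RFC 4616 layout
    [authzid] NUL authcid NUL passwd and raises ValueError otherwise.
    """
    parts = data.split(b"\x00")
    if len(parts) != 3:
        raise ValueError("expected exactly two NUL separators")
    authzid, authcid, passwd = (p.decode("utf-8") for p in parts)
    return authzid, authcid, passwd


print(response)               # b'guest\x00guest\x00'
print(split_plain(response))  # ('guest', 'guest', '')

# The layout RFC 4616 describes for user "guest", password "guest",
# with an empty authorization identity:
expected = b"\x00guest\x00guest"
print(split_plain(expected))  # ('', 'guest', 'guest')
```

Read this way, the response carries "guest" in the first two fields and an empty password, which suggests the NUL separators follow the fields instead of preceding them. Whether that is the actual cause of the decode error is a guess based on the bytes alone.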

error while receiving transfer message

Hi,
When the server receives the transfer message, the following error is logged:
** Reason for termination ==
** {function_clause,
[{rabbit_amqp1_0_message,to_expiration,
[{uint,1000}],
[{file,"src/rabbit_amqp1_0_message.erl"},{line,173}]},
{rabbit_amqp1_0_message,translate_header,2,
[{file,"src/rabbit_amqp1_0_message.erl"},{line,146}]},
{rabbit_amqp1_0_message,assemble,4,
[{file,"src/rabbit_amqp1_0_message.erl"},{line,35}]},
{rabbit_amqp1_0_message,assemble,1,
[{file,"src/rabbit_amqp1_0_message.erl"},{line,30}]},
{rabbit_amqp1_0_incoming_link,transfer,4,
[{file,"src/rabbit_amqp1_0_incoming_link.erl"},{line,147}]},
{rabbit_amqp1_0_session_process,handle_control,2,
[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,214}]},
{rabbit_amqp1_0_session_process,handle_cast,2,
[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,134}]},
{gen_server2,handle_msg,2,
[{file,"src/gen_server2.erl"},{line,1048}]}]}
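
The trace shows `to_expiration` being called with `{uint,1000}`, which looks like the `ttl` field of the AMQP 1.0 message header (an unsigned integer of milliseconds), and no clause matching it. As a rough illustration of the conversion involved, here is a Python sketch, assuming the target is the 0-9-1 `expiration` property, which RabbitMQ reads as a decimal string of milliseconds. The function below is hypothetical and is not the plugin's code.

```python
def to_expiration(ttl):
    """Convert an AMQP 1.0 header ttl into a 0-9-1 expiration string.

    Hypothetical sketch. Accepts None (no ttl set) or a (type_tag, value)
    pair such as ("uint", 1000), mirroring the {uint,1000} term in the
    stack trace above.
    """
    if ttl is None:
        return None
    type_tag, millis = ttl
    if type_tag != "uint" or millis < 0:
        raise ValueError("ttl must be an unsigned integer of milliseconds")
    # 0-9-1 carries the per-message TTL as a decimal string of milliseconds.
    return str(millis)


print(to_expiration(("uint", 1000)))  # 1000
print(to_expiration(None))            # None
```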

heartbeat frames seem to cause connection failure

I'm working on a node amqp client and trying to run integration tests against rabbitmq, but it seems every time the internal heartbeat timer sends an empty (heartbeat) message rabbitmq immediately closes the connection, resulting in a local ECONNRESET.

Section 2.4.5 of the spec indicates that "Implementations MUST be prepared to handle empty frames arriving on any valid channel", so this seems like a bug to me if that is indeed what is happening. That said, I have no idea whether that is actually what's happening, because there isn't any trace information in the local rabbitmq log. Perhaps you could direct me as to how I might get more information?
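
For reference, an empty frame is just the 8-byte AMQP 1.0 frame header with no body: a 4-byte size, a 1-byte data offset counted in 4-byte words, a 1-byte frame type, and a 2-byte channel. The Python sketch below builds and recognises such a frame; the function names are made up for illustration.

```python
import struct


def empty_frame(channel=0):
    """Build an AMQP 1.0 empty (heartbeat) frame: header only, no body."""
    size = 8   # total frame size in bytes; the header is all there is
    doff = 2   # data offset in 4-byte words, so 2 * 4 = 8 bytes
    ftype = 0  # 0x00 marks an AMQP frame (0x01 would be a SASL frame)
    return struct.pack(">IBBH", size, doff, ftype, channel)


def is_empty_frame(frame):
    """Return True when the frame is a complete header with no body."""
    if len(frame) < 8:
        return False
    size, doff, _ftype, _channel = struct.unpack(">IBBH", frame[:8])
    return size == len(frame) and size == doff * 4


print(empty_frame().hex())            # 0000000802000000
print(is_empty_frame(empty_frame()))  # True
```

Comparing the bytes the client writes on its heartbeat timer against this layout is one way to rule out a malformed frame on the client side before looking at the broker.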

rabbitmq-server with 1.0 plugin does not seem to honour flow control at all

In AMQP 1.0, transfers should not be made unless the receiving peer has granted credit. Using 3.6.16 of the 1.0 plugin, rabbitmq appears not to honour this.
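
The rule can be pictured with a toy model of the sending end of a link. This is a Python sketch for illustration only, not the plugin's logic; the class and method names are invented. The credit formula is the one the AMQP 1.0 specification gives for the sender: its link credit is the receiver's delivery count plus the receiver's link credit, minus the sender's own delivery count.

```python
class SenderLink:
    """Toy model of the sending end of an AMQP 1.0 link (illustrative only)."""

    def __init__(self):
        self.link_credit = 0     # nothing may be sent until credit arrives
        self.delivery_count = 0  # number of transfers sent so far

    def on_flow(self, receiver_delivery_count, receiver_link_credit):
        # Sender-side credit as defined by the AMQP 1.0 flow rules:
        # credit_snd = delivery_count_rcv + link_credit_rcv - delivery_count_snd
        self.link_credit = (
            receiver_delivery_count + receiver_link_credit - self.delivery_count
        )

    def try_send(self, message):
        """Send one message if credit allows; return True when it was sent."""
        if self.link_credit <= 0:
            return False         # no credit: the message must stay queued
        self.link_credit -= 1
        self.delivery_count += 1
        return True


link = SenderLink()
print(link.try_send("m1"))  # False, no flow frame has been received yet
link.on_flow(0, 2)          # receiver grants two credits
print([link.try_send(m) for m in ("m1", "m2", "m3")])  # [True, True, False]
```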

To reproduce using the Apache Qpid Proton Python client, run the following against a queue containing messages ('examples' by default, so you can use the simple_send.py example[1] to fill it up):

from __future__ import print_function
import optparse
from proton.handlers import MessagingHandler
from proton.reactor import Container

class Recv(MessagingHandler):
    def __init__(self, url):
        # prefetch=0 turns off automatic credit, so this receiver never grants any.
        super(Recv, self).__init__(prefetch=0)
        self.url = url

    def on_start(self, event):
        event.container.create_receiver(self.url)

    def on_message(self, event):
        print(event.message)

parser = optparse.OptionParser(usage="usage: %prog [options]")
parser.add_option("-a", "--address", default="localhost:5672/examples",
                  help="address from which messages are received (default %default)")
opts, args = parser.parse_args()

try:
    Container(Recv(opts.address)).run()
except KeyboardInterrupt: pass

This should never receive any messages as it does not issue credit.

Using PN_TRACE_FRM=1 when running that, you see the following trace (queue populated with simple_send.py -m 5):

$ PN_TRACE_FRM=1 ./no_flow.py
[0x5626994241f0]:  -> SASL
[0x5626994241f0]:  <- SASL
[0x5626994241f0]:0 <- @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN, :AMQPLAIN]]
[0x5626994241f0]:0 -> @sasl-init(65) [mechanism=:ANONYMOUS, initial-response=b"[email protected]"]
[0x5626994241f0]:0 <- @sasl-outcome(68) [code=0]
[0x5626994241f0]:  -> AMQP
[0x5626994241f0]:0 -> @open(16) [container-id="d560cae9-d147-472c-9af4-da0278e4336e", hostname="localhost", channel-max=32767]
[0x5626994241f0]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
[0x5626994241f0]:0 -> @attach(18) [name="d560cae9-d147-472c-9af4-da0278e4336e-examples", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
[0x5626994241f0]:  <- AMQP
[0x5626994241f0]:0 <- @open(16) [container-id="[email protected]", channel-max=32767, idle-time-out=60000, properties={:"cluster_name"="[email protected]", :copyright="Copyright (C) 2007-2018 Pivotal Software, Inc.", :information="Licensed under the MPL.  See http://www.rabbitmq.com/", :platform="Erlang/OTP 19.3.6.10", :product="RabbitMQ", :version="3.6.16"}]
[0x5626994241f0]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=65535, outgoing-window=65535, handle-max=4294967295]
[0x5626994241f0]:0 <- @attach(18) [name="d560cae9-d147-472c-9af4-da0278e4336e-examples", handle=0, role=false, snd-settle-mode=0, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false, default-outcome=@released(38) [], outcomes=@PN_SYMBOL[:"amqp:accepted:list", :"amqp:rejected:list", :"amqp:released:list"]], initial-delivery-count=0]
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x01", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x01\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x01"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=1, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x02", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x02\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x02"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=2, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x03", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x03\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x03"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=3, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x04", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x04\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x04"
[0x5626994241f0]:0 <- @transfer(20) [handle=0, delivery-id=4, delivery-tag=b"\x00\x00\x00\x00\x00\x00\x00\x05", message-format=0, settled=false, more=false, resume=false, aborted=false, batchable=false] (49) "\x00Sp\xc0\x06\x05B@@A@\x00Ss\xd0\x00\x00\x00\x06\x00\x00\x00\x01S\x05\x00Sw\xd1\x00\x00\x00\x10\x00\x00\x00\x02\xa1\x08sequenceT\x05"
Message{first_acquirer=1, id=1, body={"sequence"=1}}
Message{first_acquirer=1, id=2, body={"sequence"=2}}
Message{first_acquirer=1, id=3, body={"sequence"=3}}
Message{first_acquirer=1, id=4, body={"sequence"=4}}
Message{first_acquirer=1, id=5, body={"sequence"=5}}
[0x5626994241f0]:0 -> @disposition(21) [role=true, first=0, last=4, settled=true, state=@accepted(36) []]

As you can see, the client never sends a flow performative, meaning no credit has been granted. The server therefore should not send transfer performatives.
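The expected behaviour can be sketched with a toy model of the sending end of a link. This is only an illustration: the class and method names are made up, and the credit calculation is simplified (the receiver's delivery-count is ignored). It shows that a sender holding zero link credit must not emit transfers until a flow arrives.

```python
class SenderLink:
    """Toy model of the sending end of an AMQP 1.0 link (illustrative names)."""

    def __init__(self):
        # A freshly attached link has no credit until the receiver sends a flow.
        self.link_credit = 0
        self.delivery_count = 0

    def on_flow(self, link_credit):
        # Simplification: take the granted credit as-is instead of
        # reconciling it with the receiver's view of delivery-count.
        self.link_credit = link_credit

    def try_send(self, message):
        # Without credit the sender must hold the message back.
        if self.link_credit == 0:
            return False
        self.link_credit -= 1
        self.delivery_count += 1
        return True


link = SenderLink()
print(link.try_send("sequence=1"))  # no flow received yet, so nothing is sent
link.on_flow(link_credit=2)
print(link.try_send("sequence=1"), link.try_send("sequence=2"))
```

In the trace above the broker behaves as if credit had been granted, although no flow frame was ever sent by the client.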

[1] https://gitbox.apache.org/repos/asf?p=qpid-proton.git;a=blob;f=python/examples/simple_send.py;h=7717a16577c940e8fd9ad8650771f52f6e81ba8c;hb=HEAD

Unexpected trailing sections: v1_0.delivery_annotations

I'm getting the error below whenever I consume messages from the cluster:

** {{unexpected_trailing_sections,{{'v1_0.delivery_annotations',[]},<<0,83,114,193,1,0,0,83,115,208,0,0,0,18,0,0,0,3,64,64,161,10,116,101,115,116,45,113,117,101,117,101,0,83,116,193,1,0,0,83,119,161,5,104,101,108,108,111,0,83,120,193,1,0>>}},[{rabbit_amqp1_0_message,assemble,4,[{file,"src/rabbit_amqp1_0_message.erl"},{line,104}]},{rabbit_amqp1_0_message,assemble,1,[{file,"src/rabbit_amqp1_0_message.erl"},{line,21}]},{rabbit_amqp1_0_incoming_link,transfer,4,[{file,"src/rabbit_amqp1_0_incoming_link.erl"},{line,138}]},{rabbit_amqp1_0_session_process,handle_control,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,253}]},{rabbit_amqp1_0_session_process,handle_cast,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,173}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}
2020-09-27 19:01:26.332 [info] <0.1193.0> [{initial_call,{rabbit_amqp1_0_session_process,init,['Argument__1']}},{pid,<0.1193.0>},{registered_name,[]},{error_info,{exit,{{unexpected_trailing_sections,{{'v1_0.delivery_annotations',[]},<<0,83,114,193,1,0,0,83,115,208,0,0,0,18,0,0,0,3,64,64,161,10,116,101,115,116,45,113,117,101,117,101,0,83,116,193,1,0,0,83,119,161,5,104,101,108,108,111,0,83,120,193,1,0>>}},[{rabbit_amqp1_0_message,assemble,4,[{file,"src/rabbit_amqp1_0_message.erl"},{line,104}]},{rabbit_amqp1_0_message,assemble,1,[{file,"src/rabbit_amqp1_0_message.erl"},{line,21}]},{rabbit_amqp1_0_incoming_link,transfer,4,[{file,"src/rabbit_amqp1_0_incoming_link.erl"},{line,138}]},{rabbit_amqp1_0_session_process,handle_control,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,253}]},{rabbit_amqp1_0_session_process,handle_cast,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,173}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]},[{gen_server2,terminate,3,[{file,"src/gen_server2.erl"},{line,1183}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}},{ancestors,[<0.1191.0>,<0.1188.0>,<0.1186.0>,<0.1185.0>,<0.897.0>,<0.896.0>,<0.895.0>,rabbit_sup,<0.272.0>]},{message_queue_len,1},{messages,[{'DOWN',#Ref<0.2318329786.1542979586.132055>,process,<0.1202.0>,normal}]},{links,[<0.1191.0>]},{dictionary,[{gen_server_call_timeout,60000},{{in,{uint,0}},{incoming_link,{utf8,<<"test-channel">>},<<>>,<<"test-queue">>,undefined,0,undefined,{ubyte,0},32768,[],{set,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[<<"test-queue">>],[],[],[],[],[],[],[]}}}}},{{credit_to,<0.1187.0>},197},{credit_flow_default_credit,{400,200}}]},{trap_exit,true},{status,running},{heap_size,2586},{stack_size,28},{reductions,5639}], []
2020-09-27 19:01:26.333 [error] <0.1193.0> CRASH REPORT Process <0.1193.0> with 0 neighbours exited with reason: {unexpected_trailing_sections,{{'v1_0.delivery_annotations',[]},<<0,83,114,193,1,0,0,83,115,208,0,0,0,18,0,0,0,3,64,64,161,10,116,101,115,116,45,113,117,101,117,101,0,83,116,193,1,0,0,83,119,161,5,104,101,108,108,111,0,83,120,193,1,0>>}} in rabbit_amqp1_0_message:assemble/4 line 104 in gen_server2:terminate/3 line 1183
2020-09-27 19:01:26.334 [error] <0.1191.0> Supervisor {<0.1191.0>,rabbit_amqp1_0_session_sup} had child channel started with rabbit_amqp1_0_session_process:start_link({0,<0.1187.0>,<0.1192.0>,{user,<<"rabbit-user">>,[administrator],[{rabbit_auth_backend_internal,...}]},...}) at <0.1193.0> exit with reason {unexpected_trailing_sections,{{'v1_0.delivery_annotations',[]},<<0,83,114,193,1,0,0,83,115,208,0,0,0,18,0,0,0,3,64,64,161,10,116,101,115,116,45,113,117,101,117,101,0,83,116,193,1,0,0,83,119,161,5,104,101,108,108,111,0,83,120,193,1,0>>}} in rabbit_amqp1_0_message:assemble/4 line 104 in context child_terminated
2020-09-27 19:01:26.335 [error] <0.1191.0> Supervisor {<0.1191.0>,rabbit_amqp1_0_session_sup} had child channel started with rabbit_amqp1_0_session_process:start_link({0,<0.1187.0>,<0.1192.0>,{user,<<"rabbit-user">>,[administrator],[{rabbit_auth_backend_internal,...}]},...}) at <0.1193.0> exit with reason reached_max_restart_intensity in context shutdown
2020-09-27 19:01:29.332 [error] <0.1187.0> closing AMQP connection <0.1187.0> (10.1.7.1:60751 -> 10.1.7.199:5672):
{handshake_error,running,<0.1193.0>,{{symbol,<<"amqp:internal-error">>},"Session error: ~p~n~p~n",[{unexpected_trailing_sections,{{'v1_0.delivery_annotations',[]},<<0,83,114,193,1,0,0,83,115,208,0,0,0,18,0,0,0,3,64,64,161,10,116,101,115,116,45,113,117,101,117,101,0,83,116,193,1,0,0,83,119,161,5,104,101,108,108,111,0,83,120,193,1,0>>}},[{rabbit_amqp1_0_message,assemble,4,[{file,"src/rabbit_amqp1_0_message.erl"},{line,104}]},{rabbit_amqp1_0_message,assemble,1,[{file,"src/rabbit_amqp1_0_message.erl"},{line,21}]},{rabbit_amqp1_0_incoming_link,transfer,4,[{file,"src/rabbit_amqp1_0_incoming_link.erl"},{line,138}]},{rabbit_amqp1_0_session_process,handle_control,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,253}]},{rabbit_amqp1_0_session_process,handle_cast,2,[{file,"src/rabbit_amqp1_0_session_process.erl"},{line,173}]},{gen_server2,handle_msg,2,[{file,"src/gen_server2.erl"},{line,1067}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]]}}

Here is the Kubernetes setup and config:

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
data:
  enabled_plugins: |
    [rabbitmq_federation,rabbitmq_management,rabbitmq_peer_discovery_k8s,rabbitmq_amqp1_0].
  rabbitmq.conf: |
    loopback_users.guest = false
    listeners.tcp.default = 5672
    cluster_formation.peer_discovery_backend  = rabbit_peer_discovery_k8s
    cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
    cluster_formation.k8s.address_type = hostname
    cluster_formation.node_cleanup.only_log_warning = true
    ##cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
    ##cluster_formation.classic_config.nodes.1 = [email protected]
    ##cluster_formation.classic_config.nodes.2 = [email protected]
    ##cluster_formation.classic_config.nodes.3 = [email protected]
    ##rabbitmq_amqp1_0
# …

Not really sure what I missed here.

Make AMQP 1.0 plugin production worthy

Hi,

I work at a large enterprise whose architectural principles include conformity to standards and building flexible solutions.
Since AMQP 1.0 is OASIS standardized and nowadays widely supported by many client APIs and message brokers, I really think it is a pity that RabbitMQ only offers it as an "experimental" plugin. It should be treated as a first-class citizen in the RabbitMQ product.

Because it is "experimental", it is hard to convince management to use it in business-critical solutions, and not using it (using AMQP 0.9.1 instead) introduces other issues, such as losing flexibility (mixing RabbitMQ on-premises and Azure SB in the cloud).

I would strongly vote for making the AMQP 1.0 plugin more complete by implementing support for the current major limitations, such as transactions, and publishing it as production ready.
Otherwise I think RabbitMQ will lose "market share" while competitors gain by supporting AMQP 1.0 (like Azure Service Bus, IBM MQ, Apache ActiveMQ and Apache Qpid).

Handshake error for AMQP 1.0

Hi,

I would like to use this plugin with RabbitMQ 3.5, but I get the following error message:

=INFO REPORT==== 28-Jul-2017::13:59:31 ===
accepting AMQP connection <0.5190.0> (172.17.0.1:59058 -> 172.17.0.2:5672)

=ERROR REPORT==== 28-Jul-2017::13:59:31 ===
** Generic server <0.5212.0> terminating
** Last message in was {'$gen_cast',
                        {frame,
                         {'v1_0.attach',
                          {utf8,
                           <<"qpid-jms:sender:ID:c69b06a5-ef8c-4795-a522-0503de00c95f:1:1:1:queue">>},
                          {uint,0},
                          false,
                          {ubyte,0},
                          {ubyte,0},
                          {'v1_0.source',
                           {utf8,
                            <<"ID:c69b06a5-ef8c-4795-a522-0503de00c95f:1:1:1">>},
                           {uint,0},
                           {symbol,<<"session-end">>},
                           {uint,0},
                           false,undefined,undefined,undefined,undefined,
                           {list,
                            [{symbol,<<"amqp:accepted:list">>},
                             {symbol,<<"amqp:rejected:list">>},
                             {symbol,<<"amqp:released:list">>},
                             {symbol,<<"amqp:modified:list">>}]},
                           undefined},
                          {'v1_0.target',
                           {utf8,<<"queue">>},
                           {uint,0},
                           {symbol,<<"session-end">>},
                           {uint,0},
                           false,undefined,
                           {list,[{symbol,<<"queue">>}]}},
                          undefined,false,
                          {uint,0},
                          undefined,undefined,
                          {list,[{symbol,<<"DELAYED_DELIVERY">>}]},
                          undefined},
                         <0.5190.0>}}
** When Server state == {state,<0.5215.0>,<0.5221.0>,1048568,<0.5190.0>,
                            <0.5211.0>,
                            {[],[]},
                            {session,1,2047,2147483647,1,65535,65535,0,65535,
                                65535,0,0,
                                {0,nil},
                                {0,nil}}}
** Reason for termination == 
** {function_clause,
       [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
            [{list,
                 [{symbol,<<"amqp:accepted:list">>},
                  {symbol,<<"amqp:rejected:list">>},
                  {symbol,<<"amqp:released:list">>},
                  {symbol,<<"amqp:modified:list">>}]}],
            []},
        {rabbit_amqp1_0_link_util,outcomes,1,[]},
        {rabbit_amqp1_0_incoming_link,attach,3,[]},
        {rabbit_amqp1_0_session_process,with_disposable_channel,2,[]},
        {rabbit_amqp1_0_session_process,handle_control,2,[]},
        {rabbit_amqp1_0_session_process,handle_cast,2,[]},
        {gen_server2,handle_msg,2,[]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

=CRASH REPORT==== 28-Jul-2017::13:59:31 ===
  crasher:
    initial call: gen:init_it/6
    pid: <0.5212.0>
    registered_name: []
    exception exit: {function_clause,
                        [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
                             [{list,
                                  [{symbol,<<"amqp:accepted:list">>},
                                   {symbol,<<"amqp:rejected:list">>},
                                   {symbol,<<"amqp:released:list">>},
                                   {symbol,<<"amqp:modified:list">>}]}],
                             []},
                         {rabbit_amqp1_0_link_util,outcomes,1,[]},
                         {rabbit_amqp1_0_incoming_link,attach,3,[]},
                         {rabbit_amqp1_0_session_process,
                             with_disposable_channel,2,[]},
                         {rabbit_amqp1_0_session_process,handle_control,2,[]},
                         {rabbit_amqp1_0_session_process,handle_cast,2,[]},
                         {gen_server2,handle_msg,2,[]},
                         {proc_lib,init_p_do_apply,3,
                             [{file,"proc_lib.erl"},{line,240}]}]}
      in function  gen_server2:terminate/3 
    ancestors: [<0.5210.0>,<0.5191.0>,<0.5189.0>,<0.5188.0>,
                  rabbit_tcp_client_sup,rabbit_sup,<0.73.0>]
    messages: []
    links: [<0.5210.0>]
    dictionary: [{credit_flow_default_credit,{200,50}},
                  {{credit_to,<0.5190.0>},48}]
    trap_exit: true
    status: running
    heap_size: 987
    stack_size: 27
    reductions: 533
  neighbours:

=SUPERVISOR REPORT==== 28-Jul-2017::13:59:31 ===
     Supervisor: {<0.5210.0>,rabbit_amqp1_0_session_sup}
     Context:    child_terminated
     Reason:     {function_clause,
                     [{rabbit_amqp1_0_link_util,'-outcomes/1-lc$^0/1-0-',
                          [{list,
                               [{symbol,<<"amqp:accepted:list">>},
                                {symbol,<<"amqp:rejected:list">>},
                                {symbol,<<"amqp:released:list">>},
                                {symbol,<<"amqp:modified:list">>}]}],
                          []},
                      {rabbit_amqp1_0_link_util,outcomes,1,[]},
                      {rabbit_amqp1_0_incoming_link,attach,3,[]},
                      {rabbit_amqp1_0_session_process,
                          with_disposable_channel,2,[]},
                      {rabbit_amqp1_0_session_process,handle_control,2,[]},
                      {rabbit_amqp1_0_session_process,handle_cast,2,[]},
                      {gen_server2,handle_msg,2,[]},
                      {proc_lib,init_p_do_apply,3,
                          [{file,"proc_lib.erl"},{line,240}]}]}
     Offender:   [{pid,<0.5212.0>},
                  {name,channel},
                  {mfargs,
                      {rabbit_amqp1_0_session_process,start_link,
                          [{1,<0.5190.0>,<0.5211.0>,
                            {user,<<"guest">>,
                                [administrator],
                                [{rabbit_auth_backend_internal,none}]},
                            <<"/">>,1048568,
                            {amqp_adapter_info,
                                {0,0,0,0,0,65535,44049,2},
                                5672,
                                {0,0,0,0,0,65535,44049,1},
                                59058,
                                <<"172.17.0.1:59058 -> 172.17.0.2:5672">>,
                                {'AMQP',"1.0"},
                                [{ssl,false}]},
                            <0.5192.0>}]}},
                  {restart_type,intrinsic},
                  {shutdown,4294967295},
                  {child_type,worker}]


=SUPERVISOR REPORT==== 28-Jul-2017::13:59:31 ===
     Supervisor: {<0.5210.0>,rabbit_amqp1_0_session_sup}
     Context:    shutdown
     Reason:     reached_max_restart_intensity
     Offender:   [{pid,<0.5212.0>},
                  {name,channel},
                  {mfargs,
                      {rabbit_amqp1_0_session_process,start_link,
                          [{1,<0.5190.0>,<0.5211.0>,
                            {user,<<"guest">>,
                                [administrator],
                                [{rabbit_auth_backend_internal,none}]},
                            <<"/">>,1048568,
                            {amqp_adapter_info,
                                {0,0,0,0,0,65535,44049,2},
                                5672,
                                {0,0,0,0,0,65535,44049,1},
                                59058,
                                <<"172.17.0.1:59058 -> 172.17.0.2:5672">>,
                                {'AMQP',"1.0"},
                                [{ssl,false}]},
                            <0.5192.0>}]}},
                  {restart_type,intrinsic},
                  {shutdown,4294967295},
                  {child_type,worker}]

My rabbitmq.config looks like this

[
	{ rabbit, [
		{ loopback_users, [ ] },
		{ tcp_listeners, [ 5672 ] },
		{ ssl_listeners, [ ] },
		{ hipe_compile, false }
	] },
	{ rabbitmq_management, [ { listener, [
		{ port, 15672 },
		{ ssl, false }
	] } ] },
	{rabbitmq_amqp1_0,[
	  {default_user,"guest"},
          {protocol_strict_mode, false},
          {default_vhost,<<"/">>}
	]}

]

Could you help me please?

Thanks in advance.

Queue Reconnect on different host is not working

Hi Team,
Reconnecting to the queue after a broker restart does not work.
Here is the infrastructure we have:
RabbitMQ is installed on host A.
A worker using an AMQP client is installed on host B.
We have a durable queue, REQUEST.

1. Assume there are 100 messages in the queue and the client on host B is already connected and consuming them.
2. To test durability, we stopped the RabbitMQ service running on host A.
3. The client on host B was disconnected because the broker shut down.
4. On failure the client keeps retrying, so it gets a connection as soon as the RabbitMQ server comes back up.
5. The client is now retrying...
6. We started the RabbitMQ service on host A.
7. The client on host B got a connection, declared the queue, and bound it to the exchange.
8. When the client tries to consume a message, the AMQP connection is terminated by the RabbitMQ broker on host A.
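
For reference, the retry behaviour described in steps 4 to 7 can be sketched roughly as follows. This is a minimal sketch, not the reporter's actual client code: `connect` is a hypothetical callable that opens a brand-new connection on each attempt, and the exponential backoff is an assumption added for illustration.

```python
import time


def connect_with_retry(connect, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call `connect` until it succeeds, opening a fresh connection each time.

    `connect` is a hypothetical zero-argument callable that either returns a
    connection object or raises ConnectionError while the broker is down.
    """
    last_error = None
    for attempt in range(max_attempts):
        try:
            return connect()
        except ConnectionError as error:
            last_error = error
            # Back off before the next attempt, but not after the final one.
            if attempt < max_attempts - 1:
                sleep(base_delay * 2 ** attempt)
    raise last_error
```

The `sleep` parameter exists only so the delay can be stubbed out when trying the function without a broker.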

Note: this does not happen when the AMQP client and RabbitMQ run on the same host.
The reconnect works well on the same machine; it fails only when the client reconnects from a different host.
During reconnect we always open a new connection from the factory.

Kindly let us know if this is an issue and whether there is any workaround.
Let me know if you need any further information.

Thanks & Regards,
Varun

Write sasl outcome on authentication failure

When PLAIN SASL authentication is used, the connection is simply closed if the user or password is invalid. It would be nicer to return a SASL outcome frame with the code set to 1.
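
As an illustration of what such a frame looks like on the wire, here is a minimal sketch that encodes a sasl-outcome frame carrying only the code field. The function name is made up for this example and this is not the plugin's code. It assumes the AMQP 1.0 encodings for a described list (descriptor 0x44 for sasl-outcome), a list8 and a ubyte, and the SASL frame type 1.

```python
import struct

SASL_OUTCOME_DESCRIPTOR = 0x44  # sasl-outcome
SASL_FRAME_TYPE = 1             # frame type used during the SASL exchange
SASL_CODE_AUTH = 1              # authentication failed (bad credentials)


def encode_sasl_outcome(code):
    """Encode a sasl-outcome frame that carries only the `code` field."""
    fields = bytes([0x50, code])                        # ubyte constructor + value
    # list8: constructor, size (count byte + fields), count
    outcome_list = bytes([0xC0, len(fields) + 1, 1]) + fields
    # Described type: 0x00, then the descriptor as a smallulong (0x53)
    body = bytes([0x00, 0x53, SASL_OUTCOME_DESCRIPTOR]) + outcome_list
    # Frame header: total size, data offset in 4-byte words, type, 2 ignored bytes
    header = struct.pack(">IBBH", 8 + len(body), 2, SASL_FRAME_TYPE, 0)
    return header + body


print(encode_sasl_outcome(SASL_CODE_AUTH).hex())
```

A client that receives this frame can report an authentication failure instead of a bare disconnect.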

Incompatibility with Apache Qpid C++ API

RabbitMQ AMQP 1.0 errors out and closes the session when an Apache Qpid C++ client receive() call times out. The RabbitMQ server should not close the session when the Qpid C++ client receive() call times out.

I filed this bug against the Qpid C++ API:
https://issues.apache.org/jira/browse/QPID-8347
I believe that some part of it may be related to the RabbitMQ server.

Source:

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>

#include <iostream>

using namespace qpid::messaging;

int main(int argc, char** argv) {
    std::string broker = argc > 1 ? argv[1] : "localhost:5672";
    std::cout << "broker: " << broker << std::endl;
    std::string address = argc > 2 ? argv[2] : "topic.hello.world";
    std::cout << "address: " << address << std::endl;
    std::string connectionOptions = argc > 3 ? argv[3] : "";
    std::cout << "connectionOptions: " << connectionOptions << std::endl;


    try {
        Connection connection(broker, connectionOptions);
        connection.open();
        Session session = connection.createSession();

        Receiver receiver = session.createReceiver(address);
        Message message;
        std::cout << "Pre Receive" << std::endl;
        message = receiver.fetch(Duration::SECOND * 10);
        std::cout << "Post Receive" << std::endl;
        session.acknowledge();

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cerr << error.what() << std::endl;
        return 1;
    }
}

Server Error (Occurs after 10 second timeout):

=INFO REPORT==== 23-Jul-2019::17:49:54 ===
accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)

=ERROR REPORT==== 23-Jul-2019::17:49:54 ===
closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):
{bad_version,{1,1,0,10}}

=INFO REPORT==== 23-Jul-2019::17:49:54 ===
accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
** Generic server <0.675.0> terminating 
** Last message in was {send_command,
                           {'basic.credit_drained',
                               <<99,116,97,103,45,0,0,0,0>>,
                               1}}
** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,
                               {[],[]},
                               false,<0.678.0>,none,none,0,true,none,
                               {0,nil},
                               {0,nil},
                               true,false}
** Reason for termination == 
** {{badmatch,{empty,{[],[]}}},
    [{amqp_channel,rpc_bottom_half,2,
                   [{file,"src/amqp_channel.erl"},{line,623}]},
     {amqp_channel,handle_method_from_server1,3,
                   [{file,"src/amqp_channel.erl"},{line,800}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,616}]},
     {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,686}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}

=WARNING REPORT==== 23-Jul-2019::17:50:04 ===
Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch,
                                                                         {empty,
                                                                          {[],
                                                                           []}}},
                                                                        [{amqp_channel,
                                                                          rpc_bottom_half,
                                                                          2,
                                                                          [{file,
                                                                            "src/amqp_channel.erl"},
                                                                           {line,
                                                                            623}]},
                                                                         {amqp_channel,
                                                                          handle_method_from_server1,
                                                                          3,
                                                                          [{file,
                                                                            "src/amqp_channel.erl"},
                                                                           {line,
                                                                            800}]},
                                                                         {gen_server,
                                                                          try_dispatch,
                                                                          4,
                                                                          [{file,
                                                                            "gen_server.erl"},
                                                                           {line,
                                                                            616}]},
                                                                         {gen_server,
                                                                          handle_msg,
                                                                          6,
                                                                          [{file,
                                                                            "gen_server.erl"},
                                                                           {line,
                                                                            686}]},
                                                                         {proc_lib,
                                                                          init_p_do_apply,
                                                                          3,
                                                                          [{file,
                                                                            "proc_lib.erl"},
                                                                           {line,
                                                                            247}]}]}

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
** Generic server <0.678.0> terminating
** Last message in was {'EXIT',<0.675.0>,
                           {{badmatch,{empty,{[],[]}}},
                            [{amqp_channel,rpc_bottom_half,2,
                                 [{file,"src/amqp_channel.erl"},{line,623}]},
                             {amqp_channel,handle_method_from_server1,3,
                                 [{file,"src/amqp_channel.erl"},{line,800}]},
                             {gen_server,try_dispatch,4,
                                 [{file,"gen_server.erl"},{line,616}]},
                             {gen_server,handle_msg,6,
                                 [{file,"gen_server.erl"},{line,686}]},
                             {proc_lib,init_p_do_apply,3,
                                 [{file,"proc_lib.erl"},{line,247}]}]}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,
                         <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,
                         {lstate,<0.677.0>,false},
                         none,1,
                         {[],[]},
                         {user,<<"guest">>,
                          [administrator],
                          [{rabbit_auth_backend_internal,none}]},
                         <<"/">>,<<>>,
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],
                            [[<0.583.0>|
                              {resource,<<"/">>,queue,
                               <<"topic.hello.world">>}]],
                            [],[],[],[],[],[]}}},
                         {state,
                          {dict,1,16,16,8,80,48,
                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                           {{[],[],[],[],[],[],[],[],[],
                             [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],
                             [],[],[],[],[],[]}}},
                          erlang},
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],
                            [[<<99,116,97,103,45,0,0,0,0>>|
                              {{amqqueue,
                                {resource,<<"/">>,queue,
                                 <<"topic.hello.world">>},
                                false,false,none,[],<0.583.0>,[],[],[],
                                undefined,[],[],live,0},
                               {false,65535,false,
                                [{<<"x-credit">>,table,
                                  [{<<"credit">>,long,0},
                                   {<<"drain">>,boolean,false}]}]}}]],
                            [],[],[],[]}}},
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],
                            [[<0.583.0>|
                              {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],
                            [],[],[],[],[],[]}}},
                         {set,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],
                            [<0.583.0>],
                            [],[],[],[],[],[]}}},
                         <0.672.0>,
                         {state,fine,5000,
                          #Ref<0.4039704202.3895984129.147347>},
                         false,1,
                         {{0,nil},{0,nil}},
                         [],
                         {{0,nil},{0,nil}},
                         [{<<"publisher_confirms">>,bool,true},
                          {<<"exchange_exchange_bindings">>,bool,true},
                          {<<"basic.nack">>,bool,true},
                          {<<"consumer_cancel_notify">>,bool,true},
                          {<<"connection.blocked">>,bool,true},
                          {<<"authentication_failure_close">>,bool,true}],
                         none,65535,none,flow,[]}
** Reason for termination == 
** {{badmatch,{empty,{[],[]}}},
    [{amqp_channel,rpc_bottom_half,2,
                   [{file,"src/amqp_channel.erl"},{line,623}]},
     {amqp_channel,handle_method_from_server1,3,
                   [{file,"src/amqp_channel.erl"},{line,800}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,616}]},
     {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,686}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}

Configuration:
OS: Ubuntu Bionic
QPID C++ version 1.39.0
QPID Proton version 0.28.0
RabbitMQ Broker version 3.6.10
Erlang version 20.2.2
RabbitMQ AMQP 1.0 plugin version 3.6.10

Publishing to a queue pre-declared with arguments fails

When I declare a queue with some arguments (e.g. x-max-length or x-expires) and then try to publish to this queue via AMQP 1.0's amqp://localhost/queue, the publish fails and the broker reports "PRECONDITION_FAILED - inequivalent arg".
This is because the AMQP 1.0 plugin tries to declare the queue before publishing the message and declares it without any arguments. Since the queue already exists with arguments, the redeclaration fails.
If the AMQP 1.0 plugin really has to declare the queue before publishing a message, it should ignore error 406, because the queue might have been declared with other arguments.
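
The failure mode can be sketched with a small in-memory model. This is a hypothetical illustration, not the broker's or the plugin's code: it checks only the arguments, whereas a real broker compares other queue properties as well. A plain redeclaration with different arguments fails, while a passive declare only checks that the queue exists.

```python
class PreconditionFailed(Exception):
    """Stands in for a 406 PRECONDITION_FAILED channel error."""


class NotFound(Exception):
    """Stands in for a 404 NOT_FOUND channel error."""


def declare(queues, name, arguments=None, passive=False):
    """Declare `name` in the `queues` dict, mimicking equivalence checks."""
    arguments = arguments or {}
    if passive:
        # A passive declare never creates or compares; it only checks existence.
        if name not in queues:
            raise NotFound(name)
        return queues[name]
    existing = queues.setdefault(name, arguments)
    if existing != arguments:
        raise PreconditionFailed(f"inequivalent arg for queue '{name}'")
    return existing


queues = {}
declare(queues, "queue", {"x-max-length": 10})  # pre-declared with arguments
declare(queues, "queue", passive=True)          # succeeds: the queue exists
try:
    declare(queues, "queue")                    # redeclared without arguments
except PreconditionFailed as error:
    print(error)
```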

Decoding issues: perhaps missing vbin8 support?

Unfortunately, I don't have a good reproducer for this, but since I don't know much Erlang, I prefer to post it here so that you can hopefully track the issue down.

qpid-proton sends the body as a value section of type binary, encoded as vbin8. Reading the log below, it seems that support for this encoding is missing in the implementation. Is that correct?
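
To make the encoding concrete, here is a minimal sketch of decoding such a section. The function name is made up and this is not the plugin's code. It assumes the AMQP 1.0 type codes 0xa0 (vbin8) and 0xb0 (vbin32), and that the amqp-value section is introduced by the bytes 0x00 0x53 0x77, which is the 0,83,119,160,58 sequence visible in the logged frame below.

```python
AMQP_VALUE_PREFIX = bytes([0x00, 0x53, 0x77])  # described type, smallulong, amqp-value
VBIN8 = 0xA0   # binary with a 1-byte length
VBIN32 = 0xB0  # binary with a 4-byte length


def decode_amqp_value_binary(section):
    """Return the payload of an amqp-value section whose value is binary."""
    if section[:3] != AMQP_VALUE_PREFIX:
        raise ValueError("not an amqp-value section")
    constructor = section[3]
    if constructor == VBIN8:
        length, start = section[4], 5
    elif constructor == VBIN32:
        length, start = int.from_bytes(section[4:8], "big"), 8
    else:
        raise ValueError(f"unsupported constructor 0x{constructor:02x}")
    return section[start:start + length]


payload = b'{"request": {"args": {}, "method": "ping"}, "context": {}}'
section = AMQP_VALUE_PREFIX + bytes([VBIN8, len(payload)]) + payload
print(decode_amqp_value_binary(section) == payload)
```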

** Reason for termination == 
** {function_clause,
       [{rabbit_amqp1_0_framing,fill_from_binary,
            [{'v1_0.amqp_value',undefined},
             <<"{\"request\": {\"args\": {}, \"method\": \"ping\"}, \"context\": {}}">>],
            []},
        {rabbit_amqp1_0_message,decode_section,1,[]},
        {rabbit_amqp1_0_message,assemble,4,[]},
        {rabbit_amqp1_0_message,assemble,1,[]},
        {rabbit_amqp1_0_incoming_link,transfer,4,[]},
        {rabbit_amqp1_0_session_process,handle_control,2,[]},
        {rabbit_amqp1_0_session_process,handle_cast,2,[]},
        {gen_server2,handle_msg,2,[]}]}
=ERROR REPORT==== 15-Jun-2015::10:24:24 ===
** Generic server <0.27360.0> terminating
** Last message in was {'$gen_cast',
                           {frame,
                               {{'v1_0.transfer',
                                    {uint,0},
                                    {uint,0},
                                    {binary,<<"pyngus-tag-0">>},
                                    {uint,0},
                                    false,false,undefined,undefined,undefined,
                                    undefined,undefined},
                                <<0,83,112,208,0,0,0,11,0,0,0,5,66,80,4,64,66,
                                  82,0,0,83,115,208,0,0,0,99,0,0,0,13,64,64,
                                  161,64,47,101,120,99,104,97,110,103,101,47,
                                  97,109,113,46,102,97,110,111,117,116,47,46,
                                  98,46,99,51,101,53,99,53,98,98,45,102,50,53,
                                  49,45,52,54,102,102,45,97,48,53,98,45,49,
                                  100,100,102,55,57,54,49,52,100,57,98,46,97,
                                  108,108,64,64,64,64,64,131,0,0,0,0,0,0,0,0,
                                  131,0,0,0,0,0,0,0,0,64,82,0,64,0,83,119,160,
                                  58,123,34,114,101,113,117,101,115,116,34,58,
                                  32,123,34,97,114,103,115,34,58,32,123,125,
                                  44,32,34,109,101,116,104,111,100,34,58,32,
                                  34,112,105,110,103,34,125,44,32,34,99,111,
                                  110,116,101,120,116,34,58,32,123,125,125>>},
                               <0.27088.0>}}
** When Server state == {state,<0.27363.0>,<0.27369.0>,unlimited,<0.27088.0>,
                            <0.27359.0>,
                            {[],[]},
                            {session,4,2147483647,0,0,65535,65535,0,65535,
                                65535,1,0,
                                {0,nil},
                                {0,nil}}}

=ERROR REPORT==== 15-Jun-2015::10:24:27 ===
closing AMQP connection <0.27088.0> (127.0.0.1:60998 -> 127.0.0.1:5672):
{handshake_error,running,<0.27360.0>,
    {{symbol,<<"amqp:internal-error">>},
     "Session error: ~p~n~p~n",
     [function_clause,
      [{rabbit_amqp1_0_framing,fill_from_binary,
           [{'v1_0.amqp_value',undefined},
            <<"{\"request\": {\"args\": {}, \"method\": \"ping\"}, \"context\": {}}">>],
           []},
       {rabbit_amqp1_0_message,decode_section,1,[]},
       {rabbit_amqp1_0_message,assemble,4,[]},
       {rabbit_amqp1_0_message,assemble,1,[]},
       {rabbit_amqp1_0_incoming_link,transfer,4,[]},
       {rabbit_amqp1_0_session_process,handle_control,2,[]},
       {rabbit_amqp1_0_session_process,handle_cast,2,[]},
       {gen_server2,handle_msg,2,[]}]]}}
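
The tail of the transfer payload in the log can be decoded by hand to confirm what the client sent. The following is a minimal sketch, not the plugin's decoder: the function name `describe_section` is made up for illustration, and it only handles a described section with a one-byte descriptor followed by a vbin8 or str8-utf8 value, using the constructor and descriptor codes from the AMQP 1.0 specification.

```python
# Descriptor codes for AMQP 1.0 message sections and two primitive constructors.
SECTIONS = {0x70: "header", 0x73: "properties", 0x75: "data", 0x77: "amqp-value"}
CONSTRUCTORS = {0xA0: "binary (vbin8)", 0xA1: "string (str8-utf8)"}


def describe_section(buf):
    """Return (section name, value type, payload) for a small described section.

    Hypothetical helper for illustration only.
    """
    buf = bytearray(buf)
    # 0x00 introduces a described type; 0x53 is a smallulong carrying the descriptor.
    if buf[0] != 0x00 or buf[1] != 0x53:
        raise ValueError("expected a described type with a smallulong descriptor")
    section = SECTIONS.get(buf[2], "unknown")
    constructor = buf[3]
    if constructor not in CONSTRUCTORS:
        raise ValueError("only vbin8 and str8-utf8 are handled in this sketch")
    length = buf[4]  # both constructors use a one-byte length prefix
    payload = bytes(buf[5:5 + length])
    return section, CONSTRUCTORS[constructor], payload


# Last 63 bytes of the transfer payload shown in the log: 0,83,119,160,58,...
tail = bytearray([0, 83, 119, 160, 58]) + bytearray(
    b'{"request": {"args": {}, "method": "ping"}, "context": {}}')
print(describe_section(tail))
```

The fourth byte, 160 (0xA0), is the vbin8 constructor, so the body is an amqp-value section wrapping a binary value. That matches the arguments of the failing `fill_from_binary` call in the stack trace.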

UPDATE

Here's a reproducer:

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class HelloWorld(MessagingHandler):
    def __init__(self, server, address):
        super(HelloWorld, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        conn = event.container.connect(self.server, reconnect=False)
        event.container.create_receiver(conn, self.address)
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):
        # On Python 2 a plain "..." literal is a byte string, so the body is sent as AMQP binary.
        event.sender.send(Message(body="Hello World"))
        event.sender.close()

    def on_message(self, event):
        print event.message.body
        event.connection.close()

Container(HelloWorld("127.0.0.1:5672", "examples")).run()

The above code will fail. If we use u"Hello World" instead of "Hello World" (note the unicode prefix), the script works as expected.
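
Until binary bodies are handled on the broker side, one possible client-side workaround is to make sure the body is text before it is handed to `Message`. This is only a sketch: `as_text` is a hypothetical helper, and it assumes the payload is valid UTF-8 and that receiving it as an AMQP string rather than binary is acceptable to the consumer.

```python
def as_text(body, encoding="utf-8"):
    """Decode byte strings to text and leave other values untouched.

    Hypothetical helper; assumes byte-string bodies are valid in `encoding`.
    """
    if isinstance(body, bytes):
        return body.decode(encoding)
    return body


# A byte-string body becomes text, which a client would encode as an AMQP string.
print(repr(as_text(b"Hello World")))
```

In the reproducer this would be used as `Message(body=as_text("Hello World"))`. On Python 2, `bytes` is an alias for `str`, so the same helper covers plain string literals there.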

No custom fields can be added to the headers of the properties of an AMQP 1.0 message.

Hello -

I am trying to connect and send a message from a Qpid JMS client to RabbitMQ with the AMQP 1.0 plugin. I can send a message to a queue.
However, I can't add custom fields to the headers of the properties of an AMQP 1.0 message. I can add custom fields when I send an AMQP 0.9.1 message using JMeter.
You can see the custom fields in the following image:

-AMQP 0.9.1
image

My AMQP 1.0 client code that sets the properties and their headers is below:

`JmsMapMessage message = (JmsMapMessage) session.createMapMessage();

AmqpJmsMessageFacade messageFacade = (AmqpJmsMessageFacade) message.getFacade();

Properties properties = new Properties();

properties.setReplyTo("adem1");
properties.setContentType(Symbol.getSymbol("application/json"));

properties.set_headers("arg0111"); // I added this method to the Properties class (org.apache.qpid.proton.amqp.messaging.Properties).
// I am not sure whether this is the correct way to set the headers.

messageFacade.setProperties(properties);

messageProducer.send(message);`

The message is delivered to the queue, but the header is not set.
-AMQP 1.0
image

So does the RabbitMQ AMQP 1.0 plugin drop or override the header fields that are sent?
How did you test this scenario?

Thank you for your support.

Sender (proton) trying to create an exchange gets error: no exchange in vhost "/"

We use Qpid Proton (C++) to create a sender and set its address to "/exchange/TopicGPS/GPS". The exchange TopicGPS does not exist beforehand. We then get an error from RabbitMQ: no exchange in vhost "/".

If a sender publishes messages to an exchange, does the exchange have to be created manually by the user in advance, or can it be created automatically?
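
For reference, here is how the address in this report breaks down, assuming the "/exchange/<name>/<routing key>" form that the plugin uses for publishing to an exchange. The function name is hypothetical and the sketch only splits the string. It does not talk to the broker or declare anything.

```python
def parse_exchange_address(address):
    """Split "/exchange/<name>/<routing key>" into (exchange, routing_key).

    Hypothetical helper for illustration; the routing key may be empty.
    """
    parts = address.split("/")
    # "/exchange/X/RK" splits into ["", "exchange", "X", "RK"].
    if len(parts) < 3 or parts[0] != "" or parts[1] != "exchange" or not parts[2]:
        raise ValueError("not an exchange address: %r" % (address,))
    exchange = parts[2]
    routing_key = "/".join(parts[3:])
    return exchange, routing_key


print(parse_exchange_address("/exchange/TopicGPS/GPS"))
```

Read this way, the sender targets an exchange named TopicGPS with routing key GPS, and the error reported above suggests that the broker looked up that exchange rather than declaring it on the sender's behalf.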

Different AMQP protocols

Hello,

I am on RabbitMQ 3.6.5, Erlang 19.1, with the AMQP 1.0 plugin. I have manually created a virtual host through the Admin console, but under the Connections tab, I see that my virtual host shows a protocol version that is different from that of the "/" host:

"/" is AMQP 1.0
bep_events_host is AMQP 0-9-1

I don't understand why the virtual host has a different version. Is this expected behavior?

I'm using this with Informatica Cloud ICRT, and it cannot connect to my virtual host "bep_events_host" but does seem able to connect to "/", so I'm wondering whether this is due to the mismatched versions.

I also noticed a reference to a patch in this thread: om//issues/31#issuecomment-234960297 and I was wondering how I could get hold of this patch. Perhaps it will resolve my issue.

Thank you in advance,
Kim
