Giter Site home page Giter Site logo

amqpnetlite's Introduction

AMQP.Net Lite

AMQP.Net Lite is a lightweight AMQP 1.0 library for the .Net Framework, .Net Core, Windows Runtime platforms, .Net Micro Framework, .NET nanoFramework and Mono. The library includes both a client and listener to enable peer to peer and broker based messaging.
Documentation

Build status

NuGet Package Status
AMQPNetLite (main package) NuGet Version and Downloads count
AMQPNetLite.Core (.Net Core) NuGet Version and Downloads count
AMQPNetLite.Serialization (.Net Core) NuGet Version and Downloads count
AMQPNetLite.WebSockets (.Net Core) NuGet Version and Downloads count
AMQPNetLite.NetMF (NETMF) NuGet Version and Downloads count
AMQPNetMicro (NETMF) NuGet Version and Downloads count
AMQPNetLite.nanoFramework (nanoFramework) NuGet Version and Downloads count
AMQPNetMicro.nanoFramework (nanoFramework) NuGet Version and Downloads count

Features

  • Full control of AMQP 1.0 protocol behavior.
  • Peer-to-peer and brokered messaging.
  • Secure communication via TLS and SASL.
  • Extensible transport providers.
  • Sync and async API support.
  • Listener APIs to enable wide range of listener applications, including brokers, routers, proxies, and more.
  • A lightweight messaging library that runs on all popular .NET and Windows Runtime platforms.

The following table shows what features are supported on each platform/framework.

TLS SASL2 Txn Task Serializer Listener WebSockets BufferPooling
net45 + + + + + + + +
net40 + + + +3 + + +
net35 + + +
netmf   +1 +    
nanoFramework + +    
uap10 + + +
netcore451 + + +
wpa81 + + +
win8/wp8 + + +
netstandard1.34 + + + + + + +
mono/Xamarin5 + + + + + + +
  1. requires a TLS-capable device.
  2. only SASL PLAIN, EXTERNAL, and ANONYMOUS are currently supported.
  3. requires Microsoft.Bcl.Async.
  4. has 3 packages. Supports WebSocket client but not listener.
  5. projects targeting Mono/Xamarin should be able to consume the netstandard1.3 library.

Tested Platforms

  • .Net Framework 3.5, 4.0 and 4.5+.
  • .NET Micro Framework 4.2, 4.3, 4.4.
  • .NET nanoFramework 1.0.
  • .NET Compact Framework 3.9.
  • Windows Phone 8 and 8.1.
  • Windows Store 8 and 8.1. Universal Windows App 10.
  • .Net Core 1.0 on Windows 10 and Ubuntu 14.04.
  • Mono on Linux (requires v4.2.1 and up. Only the client APIs are verified and state of the listener APIs is unknown).

Getting Started

  • Quick Start Build applications from simple to complex.
  • Examples Please take a minute to look at the examples.
  • .Net Core If you are looking for information about using amqpnetlite on .Net Core (coreclr, dnxcore50, etc.), your can find the code and a Hello AMQP! example here.
  • Interested in the code? Clone and build the projects.

Contributing

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.

If you would like to become an contributor to this project please follow the instructions provided in Microsoft Azure Projects Contribution Guidelines.

References

For more information about the Azure Service Bus and AMQP, refer to:

amqpnetlite's People

Contributors

binzywu avatar bmesetovic avatar clebertsuconic avatar georgeonofrei avatar havret avatar herecydev avatar jiridanek avatar johansme avatar joseandresc avatar josesimoes avatar jwfx avatar lukeabsent avatar markushorstmann avatar mbroadst avatar microsoft-github-policy-service[bot] avatar nfbot avatar nicodeslandes avatar petero-dk avatar petertiedemann avatar ppatierno avatar ramtinkermani avatar rgsholmesr avatar robreeves avatar rusuionut21 avatar scholzj avatar szehetner avatar tabish121 avatar woppa684 avatar xinchen10 avatar zmhh avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amqpnetlite's Issues

Unique link names are not enforced

According to section 2.6.1 of the AMQP 1.0 specification all links in the same direction between two containers must have the a unique name. This is not enforced in ContainerHost/ConnectionListener. I would expect a second link with the same name, in the same direction to be rejected.

If a connection and session is used per link they both are attached and can send data. Here is a test to reproduce it.

    [TestMethod]
    public void TwoLinksSameName_SeparateConnection()
    {
        const string MsgProcName = "msg_processor";
        const string LinkName = "duplicate name";
        const string Endpoint = "amqp://guest:[email protected]:5762";

        var processor = new Mock<IMessageProcessor>();

        processor.Setup(proc => proc.Credit).Returns(300);

        processor.Setup(proc => proc.Process(It.IsAny<MessageContext>()))
            .Callback<MessageContext>(msg =>
            {
                Console.WriteLine("Name: {0}, Handle: {1}", msg.Link.Name, msg.Link.Handle);
                msg.Complete();
            });

        var uri = new Uri(Endpoint);
        var host = new ContainerHost(new List<Uri> { uri }, null, uri.UserInfo);
        host.RegisterMessageProcessor(MsgProcName, processor.Object);
        host.Open();

        var conn1 = new Connection(new Address(Endpoint));
        var sess1 = new Session(conn1);
        var link1 = new SenderLink(sess1, LinkName, MsgProcName);
        link1.Send(new Message("1"), 5000); //output in msg processor: Name: duplicate name, Handle: 0

        var conn2 = new Connection(new Address(Endpoint));
        var sess2 = new Session(conn2);
        var link2 = new SenderLink(sess2, LinkName, MsgProcName);
        link2.Send(new Message("2"), 5000); //output in msg processor: Name: duplicate name, Handle: 0

        link1.Send(new Message("12"), 5000); //output in msg processor: Name: duplicate name, Handle: 0
        link2.Send(new Message("22"), 5000); //output in msg processor: Name: duplicate name, Handle: 0
    }

If two links share a session the second link attaches, but hangs and eventually times out when it tries to send data.

    [TestMethod]
    public void TwoLinksSameName_SharedSession()
    {
        const string MsgProcName = "msg_processor";
        const string LinkName = "duplicate name";
        const string Endpoint = "amqp://guest:[email protected]:5762";

        var processor = new Mock<IMessageProcessor>();

        processor.Setup(proc => proc.Credit).Returns(300);

        processor.Setup(proc => proc.Process(It.IsAny<MessageContext>()))
            .Callback<MessageContext>(msg => 
                {
                    Console.WriteLine("Name: {0}, Handle: {1}", msg.Link.Name, msg.Link.Handle);
                    msg.Complete();
                });

        var uri = new Uri(Endpoint);
        var host = new ContainerHost(new List<Uri> { uri }, null, uri.UserInfo );
        host.RegisterMessageProcessor(MsgProcName, processor.Object);
        host.Open();

        var conn = new Connection(new Address(Endpoint));
        var sess = new Session(conn);
        var link1 = new SenderLink(sess, LinkName, MsgProcName);
        var link2 = new SenderLink(sess, LinkName, MsgProcName);

        link1.Send(new Message("a"), 5000);
        link2.Send(new Message("a"), 5000); //times out
    }

The TcpTransport leaks memory when using SSL

Here are the steps that cause the leak.

  1. TcpTransport.Writer.Write(ByteBuffer) is called. It is not currently writing anything so this.writing is false and the buffer is not added to this.bufferQueue. The buffer is added to this.bufferInProgress.

  2. TcpTransport.SslSocket.SendAsync is called. This calls SslStream.WriteAsync. At the time the method checks the SslStream.WriteAsync return task it is not completed so a continuation is added to the task that calls TcpTransport.Writer.ContinueWriting().

  3. TcpTransport.SslSocket.SendAsync returns to TcpTransport.Writer.Write(ByteBuffer), which does not call TcpTransport.Writer.DisposeWriteBuffers() because pending is true.

  4. When the SslStream.WriteAsync task finishes TcpTransport.Writer.ContinueWriting() is called.
    This returns before calling TcpTransport.Writer.ContinueWriting() because bufferQueue is empty.

This means bufferInProgress does not get cleared. The only way this leak is mitigated is if we get lucky and at the time the SslStream.WriteAsync task is checked, it is completed. Or two writes occur at once and the second message gets added to bufferQueue.

This was a tough one to explain in words so let me know if this is unclear.

Cancelable SenderLink.Send method

I think the library could benefit from the SenderLink.Send methods to be cancelable. For example I may be in the middle of sending a large message and receive a stop request of some sorts (i.e. a Windows service stop). I think it would also be helpful for SenderLink.SendAsync to be cancelable.

What do you think? Is this a feature you'd be open to?

Here is a quick example of what I mean with some borrowed code from the synchronous SenderLink.Send.

public static class SenderLinkExtensions
{
    public static void Send(this SenderLink sender, Message message, CancellationTokenSource cancelToken)
    {
        using (var mres = new ManualResetEvent(false))
        {
            Outcome outcome = null;
            OutcomeCallback callback = (m, o, s) =>
            {
                outcome = o;
                mres.Set();
            };

            sender.Send(message, callback, null);

            while (true)
            {
                //Check if canceled
                //TODO - remove message from outgoingList if canceled?
                cancelToken.Token.ThrowIfCancellationRequested();

                if (mres.WaitOne(100))
                {
                    //Handle error outcomes
                    if (outcome != null)
                    {
                        if (outcome.Descriptor.Code == 38)
                        {
                            Released released = (Released)outcome;
                            throw new AmqpException(ErrorCode.MessageReleased, null);
                        }
                        else if (outcome.Descriptor.Code == 37)
                        {
                            Rejected rejected = (Rejected)outcome;
                            throw new AmqpException(rejected.Error);
                        }
                    }

                    return;
                }
            }
        }
    }
}

SenderLink.Send times out if the message does not set the body

If a sender link sends a message without a body or headers using the Message default constructor, it will timeout. I believe this is because in Session.WriteDelivery the code inside of the while loop is never hit because delivery.Buffer.length > 0 returns false on the first pass. Therefore the callback in Session.Send is never called and it waits at SenderLink.cs line 103 until the timeout is reached.

Admittedly this is an edge case and there is no point to send a message like this, but it would be nice to handle this in a more user friendly way. Session.WriteDelivery could check for this case and throw an exception for example.

Here is a test case to reproduce the issue with the TestAqmpBroker.

    public void TestMethod_SendNoBody()
    {
        string testName = "NoMessageBody";

        Connection connection = new Connection(address);
        Session session = new Session(connection);
        SenderLink sender = new SenderLink(session, "sender-" + testName, "q1");

        sender.Send(new Message());

        sender.Close();
        session.Close();
        connection.Close();
    }

SenderLink can only send a finite number of large messages and then starts throwing TimeoutExceptions

I am seeing some unexplainable behavior. I have a SenderLink repeatedly sending large messages (larger than the frame size) to a peer. After sending a certain amount of messages the SenderLink will no longer send messages and throws a TimeoutException. The number of messages is always the same. For example I can send 31 1MB messages before SenderLink no longer sends messages. I can send 62 512kB messages before sending link stops working. Once I get down to 128kB message sizes it seems to work indefinitely. If I restart the client while leaving the server running it can send another N messages before seeing the issue again. Is this a bug? Am I doing something wrong?

As always, thanks for the help. Let me know if you need anymore information. This is a very important issue to resolve for us.

Here is the code for the client with the issue.

class Program
{
    static void Main(string[] args)
    {
        Trace.TraceLevel = TraceLevel.Frame;
        Trace.TraceListener = (f, a) =>
        {
            Console.WriteLine(DateTime.Now.ToString("[hh:ss.fff]") + " " + string.Format(f, a));
        };

        var connection = new Connection(new Address("amqp://guest:guest@localhost:5762"));
        var session = new Session(connection);
        var sender = new SenderLink(session, "link-name", "data");

        int count = 0;
        while (true)
        {
            int arrayLength = 1024 * 1024; //TimeoutException after 31 messages
            //int arrayLength = 1024 * 512; //TimeoutException after 62 messages
            //int arrayLength = 1024 * 256; //TimeoutException after 120 messages
            //int arrayLength = 1024 * 128; //No issues

            sender.Send(new Message(new byte[arrayLength]), 10000);
            Console.WriteLine(++count);
        }
    }
}

Here is the trace for the two messages sent before the TimeoutException (30 and 31).

30
[05:53.200] SEND (ch=0) transfer(handle:0,delivery-id:30,delivery-tag:0000001E,m
essage-format:0,settled:False,more:True,batchable:True) payload 16353
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.200] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.216] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.231] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.247] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.262] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.263] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.263] SEND (ch=0) transfer(handle:0) payload 1299
[05:53.295] RECV (ch=0) disposition(role:True,first:30,settled:True,state:accept
ed())
31
[05:53.295] SEND (ch=0) transfer(handle:0,delivery-id:31,delivery-tag:0000001F,m
essage-format:0,settled:False,more:True,batchable:True) payload 16353
[05:53.295] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.310] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.326] SEND (ch=0) transfer(handle:0,more:True) payload 16364
[05:53.342] SEND (ch=0) transfer(handle:0,more:True) payload 16364

Here is the server code.

class Program
{
    static void Main(string[] args)
    {
        Trace.TraceLevel = TraceLevel.Frame;
        Trace.TraceListener = (f, a) => 
            {
                Console.WriteLine(DateTime.Now.ToString("[hh:ss.fff]") + " " + string.Format(f, a));
            };

        var uri = new Uri("amqp://guest:guest@localhost:5762");
        var host = new ContainerHost(new List<Uri>() { uri }, null, uri.UserInfo);
        host.RegisterMessageProcessor("data", new MsgProc());
        host.Open();

        Console.WriteLine("running");
        Console.ReadLine();
    }
}

class MsgProc : IMessageProcessor
{
    public int Credit
    {
        get { return 300; }
    }

    public void Process(MessageContext messageContext)
    {
        Console.WriteLine("------------------------Message------------------------------");
        Console.WriteLine("Body Length: {0}", messageContext.Message.GetBody<byte[]>().Length);
        Console.WriteLine("----------------------Message End----------------------------");

        messageContext.Complete();
    }
}

Here is the server frame trace for the last two messages.

------------------------Message------------------------------
Body Length: 1048576
----------------------Message End----------------------------
[06:16.750] SEND (ch=0) disposition(role:True,first:29,settled:True,state:accept
ed())
[06:16.750] RECV (ch=0) transfer(handle:0,delivery-id:30,delivery-tag:0000001E,m
essage-format:0,settled:False,more:True,batchable:True) payload 16353
[06:16.765] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.765] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.765] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.765] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.765] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.765] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.781] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.797] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.797] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.797] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.797] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.797] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.797] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.812] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.812] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.812] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.812] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.812] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.812] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.812] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.828] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.844] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.859] RECV (ch=0) transfer(handle:0) payload 1299
------------------------Message------------------------------
Body Length: 1048576
----------------------Message End----------------------------
[06:16.859] SEND (ch=0) disposition(role:True,first:30,settled:True,state:accept
ed())
[06:16.859] RECV (ch=0) transfer(handle:0,delivery-id:31,delivery-tag:0000001F,m
essage-format:0,settled:False,more:True,batchable:True) payload 16353
[06:16.859] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.859] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.859] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.875] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.890] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.890] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.890] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.890] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.890] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.890] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.890] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364
[06:16.906] RECV (ch=0) transfer(handle:0,more:True) payload 16364

GC

I think here: https://github.com/Azure/amqpnetlite/blob/master/src/Net/AsyncPump.cs#L59 could benefit from reusing memory as opposed to newing each time. I can send a PR for this and could use one of the libraries MS includes https://msdn.microsoft.com/en-us/library/ms405814.aspx but this is not available on all of the supported platforms. I could also include what we use https://github.com/EventStore/EventStore/blob/release-v3.2.0/src/EventStore.BufferManagement/BufferManager.cs but I am not sure this PR would be accepted at that point since I would be including code copied from a BSD project.

Thoughts?

custom types using AmqpContract

I see that you can use AmqpContract and AmqpMember in order to make serializable classes to send over a sender link, however, it seems that they only serialize out to arrays/lists of values. Is it possible to serialize a class as a Map instead?

Add support for LocalCertificateSelectionCallback

Currently, the SSL client authentication seems to work only when the client certificate matches one of the certification authorities published by the broker. However, in some situations, one might want to select the SSL certificate which should be used for client authentication manually. That can be done using the LocalCertificateSelectionCallback.

The callback has to be added to the SSLSettings in ConnectionFactory and used later when creating the SSL stream in TcpTransport.

This would allow to use the callback when needed:
factory.SSL.LocalCertificateSelectionCallback = (a, b, c, d, e) => X509Certificate.CreateFromCertFile(fileName);

Existing clients can still send data when the ContainerHost is closed

If the ContainerHost is closed existing clients can still send data. Is this the expected behavior? If so I think there should be an option to close all existing connections when the host is closed. In the case that we close the listeners because we are shutting the broker down we need to make sure that we stop all incoming data so the broker resources can be cleaned up. Here is an example to reproduce the behavior. Thanks for the help.

[TestClass]
class TestClass
{
    private const string Endpoint = "amqp://guest:guest@localhost:5762";
    private static readonly Uri Uri = new Uri(Endpoint);
    private static readonly Address Address = new Address(Endpoint);
    private const string MsgQueue = "queue";
    private const string LinkName = "link";


    [TestMethod]
    public void Reconnection_ServerGoesDownBeforeSendingData2()
    {
        var host = new ContainerHost(new List<Uri>() { Uri }, null, Uri.UserInfo);
        host.RegisterMessageProcessor(MsgQueue, new MessageProcessor());
        host.Open();

        var connection = new Connection(new Address(Endpoint));
        var session = new Session(connection);
        var sender = new SenderLink(session, LinkName, MsgQueue);

        host.Close();

        sender.Send(new Message("a"));

        sender.Close();
        session.Close();
        connection.Close();
    }
}

class MessageProcessor : IMessageProcessor
{

    public int Credit
    {
        get { return 300; }
    }

    public void Process(MessageContext messageContext)
    {
        Console.WriteLine("Message received: {0}", messageContext.Message.Body as string);
        messageContext.Complete();
    }
}

Connection to ipv4 host from Azure does not work

When connecting to amqp host that uses ipv4, a connection cannot be established from Azure (Cloud service).

I changed to the following in \Net\TcpTransport.cs:66
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

By providing AddressFamily the connection worked. Can you add support for different address families?

Regards

keeping the link active

I have implemented a sender queue and a receiver queue for command and control of a device, however if nothing is sent over the session for a period of time (which would be likely if nothing is wrong with the device) then the link seems to close. Does AMQPnetLite have anything like a heartbeat flag to keep the connection alive or do I have to send something a packet on a regular basis to keep the link open?

Support for lists and dictionary message bodies

Does the library support messages with a body of type list or dictionary? It seems like natural fit since the AMQP specification defines a list and map in its type system. The following tests both throw NullReferenceExceptions.

    [TestMethod]
    public void AmqpListTest()
    {
        var inputList = new List<string>() { "abc", "def" };
        var inputMessage = new Message(inputList);

        //Simulate sending the message
        var msgBytes = inputMessage.Encode();
        var outputMessage = Message.Decode(msgBytes);

        var outputList = outputMessage.GetBody<List<string>>(); //throws NullReferenceException
        CollectionAssert.AreEqual(inputList, outputList);
    }

    [TestMethod]
    public void AmqpDictionaryTest()
    {
        var inputDict = new Dictionary<string, string>() { { "a", "1" }, { "b", "2" } };
        var inputMessage = new Message(inputDict);

        //Simulate sending the message
        var msgBytes = inputMessage.Encode();
        var outputMessage = Message.Decode(msgBytes);

        var outputDict = outputMessage.GetBody<Dictionary<string, string>>(); //throws NullReferenceException
        CollectionAssert.AreEqual(inputDict, outputDict);
    }

Right now my workaround options are to manually iterate over the list or map body and create the typed list and dictionary object myself or wrap the collection in an AMQP serializable type.

    [AmqpContract]
    public class CollectionWrapper
    {
        [AmqpMember]
        public List<string> Collection { get; set; }
    }

Message decoding requires previous warmup for cached types

Hello community,

in reference to my last issue "Amqp.Message is not serializable #72" I have thank you for your fast support. The library is also very helpful, great work!
Today I have found a some behaviour which was a little bit painful for me and I didn't found any documentation about it. It took me some time to figure out that the library needs a "warmup" to prepare cached types for the internal serializers. The problem occur when you encode data by using message.Encode() and try to decode it back in another application instance. In this case the static Message Decode method was not able to cast Encoder.ReadObject(buffer) to RestrictedDescribed.
Maybe you can extend the documentation for this case. I think a additional generic method to register custom types could also be helpful.

Kind regards

ctetzlaff

can't connect to arbitrary ip address (net35)

I have a sample piece of code like so:

string address = "amqp://guest:[email protected]:5672";

Connection connection = new Connection(new Address(address));
Session session = new Session(connection);
SenderLink sender = new SenderLink(session, "message-client", "message_processor");

for (int i = 0; i < nMsgs; ++i)
{
    Message message = new Message("hello");
    message.Properties = new Properties() { MessageId = "msg" + i };
    message.ApplicationProperties = new ApplicationProperties();
    message.ApplicationProperties["sn"] = i;
    sender.Send(message);
    Console.WriteLine("Sent message {0} body {1}", message.Properties, message.Body);
}

sender.Close();
session.Close();
connection.Close();

and consistently get the following exception:

System.Net.Sockets.SocketException (0x80004005): No such host is known
   at System.Net.Dns.InternalGetHostByAddress(IPAddress address, Boolean includeIPv6)
   at System.Net.Dns.GetHostEntry(String hostNameOrAddress)
   at Amqp.TcpTransport.Connect(Connection connection, Address address, Boolean noVerification) in d:\Source\Repos\amqpn
etlite-git\src\Net35\TcpTransport.cs:line 32
   at Amqp.Connection.Connect(SaslProfile saslProfile, Open open) in d:\Source\Repos\amqpnetlite-git\src\Connection.cs:l
ine 265
   at Amqp.Connection..ctor(Address address, SaslProfile saslProfile, Open open, OnOpened onOpened) in d:\Source\Repos\a
mqpnetlite-git\src\Connection.cs:line 120
   at Amqp.Connection..ctor(Address address) in d:\Source\Repos\amqpnetlite-git\src\Connection.cs:line 87
   at ConsoleApplication3.Program.Main(String[] args) in c:\users\mbroadst\documents\visual studio 2010\Projects\Console
Application3\ConsoleApplication3\Program.cs:line 76

Release NuGet package

There have been several changes that would be worth a new NuGet package I think. For example:

#44
#38
#21

Could you release a new version please?

Convert to .net 4.0

I'm trying to convert the Amqp.net project (ContainerHost class and all dependencies) to .net 4.0. I will open a pull request if I can complete this task. Are there some 4.5 features that are you using in it and that you think that are difficult to substitute?
Thanks.

Access to the ContainerHost link creation

Requested by user on CodePlex:
"Is there a way to tie into the link creation of the Amqp.Listener.ContainerHost type? Here is my use case. This application will be acting as an AMQP server. There will be many client connections. Each client will get its own partition of a particular resource on the server. Since the AMQP 1.0 specification states that a link ID must be unique (section 2.6.1) I was thinking of using the link creation event to automatically partition the client resources with its link ID."

SharedLinkEndpoint does not check if the semaphore was entered correctly in Listener.ContainerHost example

SharedLinkEndpoint uses a SemaphoreSlim to limit concurrency in the SharedLinkEndpoint example. It checks to see if the task is canceled or faulted and assumes that if so then the semaphore was not entered. That is incorrect. According to the SemaphoreSlim.WaitAsync documentation if the semaphore is not entered it returns a task with the result set to false.

https://msdn.microsoft.com/en-us/library/hh462740(v=vs.110).aspx

Message.Properties enumerable like a dictionary

My AzureSBLite library that wraps the AMQP .Net Lite for Service Bus access (queues, topics/subscriptions and event hubs) has some methods to convert a BrokeredMessage and EventData instance from/to the Message class.

As you know, the correlation between properties (both application and system properties) is the following :

BrokeredMessage/EventData.Properties <--> Message.ApplicationProperties
BrokeredMessage/EventData.SystemProperties <--> Message.Properties

The Message.ApplicationProperties is enumerable thanks to the exposed Map field but it's not possible for the Message.Properties field that is a DescribedList.
Could be possible to expose Message.Properties like a dictionary to enumerate it ?

Thanks,
Paolo

error deserializing SimpleMap type

Hi, I have the following type I'm trying to deserialize from a node producer:

        [AmqpContract(Encoding = EncodingType.SimpleMap)]
        class DisplayMessageRequest
        {
            [AmqpMember]
            public string title { get; set; }

            [AmqpMember]
            public string message { get; set; }
        }

When I try deserializing it with the following code:

DisplayMessageRequest r = request.GetBody<DisplayMessageRequest>();

I get the following exception (with some debug information above the exception):

2016-01-29 13:37:10.4246 [Debug] executing requested action: MESSAGE
2016-01-29 13:37:10.4406 [Debug] type: Amqp.Types.Map, body: [message:test message,title:test title]
2016-01-29 13:37:10.4558 [Error] Agent.Modules.Host+DisplayMessageAction.Agent.Actions.IAgentAction.Execute System.InvalidOperationException: buffer too small
   at Amqp.ByteBuffer.Validate(Boolean write, Int32 dataSize) in d:\Source\Repos\amqpnetlite-git\src\ByteBuffer.cs:line 158
   at Amqp.AmqpBitConverter.ReadByte(ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\AmqpBitConverter.cs:line 44
   at Amqp.Serialization.SerializableType.CollectionType.ReadObject(ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\SerializableType.cs:line 356

   at Amqp.Serialization.AmqpSerializer.ReadObject[T,TAs](AmqpSerializer serializer, ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\AmqpSerializer.cs:line 135
   at Amqp.Serialization.AmqpSerializer.Deserialize[T](ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\AmqpSerializer.cs:line 66
   at Amqp.Framing.AmqpValue.GetValue[T]() in d:\Source\Repos\amqpnetlite-git\src\Framing\AmqpValue.cs:line 95
   at Amqp.Message.GetBody[T]() in d:\Source\Repos\amqpnetlite-git\src\Message.cs:line 119
   at Agent.Modules.Host.DisplayMessageAction.Agent.Actions.IAgentAction.Execute(SenderLink sender, ReceiverLink receiver, Message request) in C:\Users\mbroadst\Development\hive-agent\Agent\Modules\Host.cs:line 221System.InvalidOperationException: buffer too small
   at Amqp.ByteBuffer.Validate(Boolean write, Int32 dataSize) in d:\Source\Repos\amqpnetlite-git\src\ByteBuffer.cs:line 158
   at Amqp.AmqpBitConverter.ReadByte(ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\AmqpBitConverter.cs:line 44
   at Amqp.Serialization.SerializableType.CollectionType.ReadObject(ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\SerializableType.cs:line 356

   at Amqp.Serialization.AmqpSerializer.ReadObject[T,TAs](AmqpSerializer serializer, ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\AmqpSerializer.cs:line 135
   at Amqp.Serialization.AmqpSerializer.Deserialize[T](ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\AmqpSerializer.cs:line 66
   at Amqp.Framing.AmqpValue.GetValue[T]() in d:\Source\Repos\amqpnetlite-git\src\Framing\AmqpValue.cs:line 95
   at Amqp.Message.GetBody[T]() in d:\Source\Repos\amqpnetlite-git\src\Message.cs:line 119
   at Agent.Modules.Host.DisplayMessageAction.Agent.Actions.IAgentAction.Execute(SenderLink sender, ReceiverLink receiver, Message request) in C:\Users\mbroadst\Development\hive-agent\Agent\Modules\Host.cs:line 221    at Amqp.ByteBuffer.Validate(Boolean write, Int32 dataSize) in d:\Source\Repos\amqpnetlite-git\src\ByteBuffer.cs:line 158
   at Amqp.AmqpBitConverter.ReadByte(ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\AmqpBitConverter.cs:line 44
   at Amqp.Serialization.SerializableType.CollectionType.ReadObject(ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\SerializableType.cs:line 356

   at Amqp.Serialization.AmqpSerializer.ReadObject[T,TAs](AmqpSerializer serializer, ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\AmqpSerializer.cs:line 135
   at Amqp.Serialization.AmqpSerializer.Deserialize[T](ByteBuffer buffer) in d:\Source\Repos\amqpnetlite-git\src\Serialization\AmqpSerializer.cs:line 66
   at Amqp.Framing.AmqpValue.GetValue[T]() in d:\Source\Repos\amqpnetlite-git\src\Framing\AmqpValue.cs:line 95
   at Amqp.Message.GetBody[T]() in d:\Source\Repos\amqpnetlite-git\src\Message.cs:line 119
   at Agent.Modules.Host.DisplayMessageAction.Agent.Actions.IAgentAction.Execute(SenderLink sender, ReceiverLink receiver, Message request) in C:\Users\mbroadst\Development\hive-agent\Agent\Modules\Host.cs:line 221

build script enhancements

The build.cmd script should be updated to do the following:

  1. arguments to specify build configuration (debug / release)
  2. arguments to specify whether to auto-run tests
  3. arguments to specify whether to build NuGet package

support for virtualhosts?

the QPID java broker has a concept of a "virtual host" and im wondering how to go about connecting to this kind of broker with the AMQPNetLite lib.

I asked the same thing of the guys over at noodlefrenzy/node-amqp10#171 and they found that the Qpid Java Broker is expecting the virtualhost to be identified with the hostname field in some init frames. Is there a way in this lib i can specify what hostname value to send during initialization where the value may be different than the actual server hostname?

some amqp1.0 clients are supporting the url patten amqps://user:[email protected]:5671/default where default is the name of the virtual host and as such is the value of the hostname field in the init frames

Valid Values for Error Condition

I am fairly new to AMQP protocol and I am using AMQP.Net Lite to implement my server. This server application might eventually be exposed to third party developers to send data.

My question is on the correct usage of Error Object while returning a response. The Error object has three properties Condition, Description, Info. The Description and Info properties seems like it can be user defined which helps me to set information that my developers would understand around my data.

But I am not sure what are the best practices around using Condition property. The specification ( http://docs.oasis-open.org/amqp/core/v1.0/cos01/amqp-core-transport-v1.0-cos01.html#type-amqp-error ) indicates a set of valid values for this property but I am not sure if I Can use my custom values here. It is hard to map my errors to these preset values in the protocol(I don't have a straight forward one to one mapping).

AsyncPump causes a deadlock.

This happens because System.IO.Stream.ReadAsync and WriteAsync are serialized internally, so that, even though they are asynchronous, they are never concurrent. Which means that SslSocket.SendAsync and SslSocket.ReceiveAsync are also serialized, and since AsyncPump calls ReceiveAsync and waits for it indefinitely, no call to SendAsync can be made.

I have "fixed" hacked this by replacing this line with Task.Run( () => this.sslStream.Read( buffer, offset, count ) ); - essentially reverting back to using blocking Read and wrapping it in a Task, which might look like a stupid hack, but only until you consider that this is exactly what System.IO.Stream does anyway! The original interface of Stream only has a blocking Read for the implementer to override, so when the async versions were added later, they had no choice but to call the blocking Read on a separate thread.

Repro:

      var addr = new Address( "amqps://host" );
      var conn = new ConnectionFactory().CreateAsync( addr ).Result;
      var sender = new SenderLink( new Session( conn ), "x", "/NS/Q" );
      sender.Send( new Message( "Body" ) );

The important bit is creating the connection through ConnectionFactory. This creates the AsyncPump, which introduces the problem. If I use the Connection constructor directly, it will create Connection.Pump and the problem will not manifest.

The above code produces the following trace, and then hangs indefinitely (or until timeout):

SEND AMQP 3 1 0 0
SEND sasl-init(mechanism:PLAIN,initial-response:00777373627573003365646324524656,hostname:host)
RECV sasl-mechanisms(sasl-server-mechanisms:[PLAIN,EXTERNAL])
RECV sasl-outcome(code:0,additional-data:57656C636F6D6521)
SEND AMQP 0 1.0.0
SEND (ch=0) open(container-id:AMQPLite-64eabebce78e4b06b78c4845aa992624,host-name:host,max-frame-size:262144,channel-max:7)
RECV AMQP 0 1 0 0
RECV (ch=0) open(container-id:5aed86f8aa2644c69fbbbbf42d6fccd9_Ghost,max-frame-size:65536,channel-max:7,idle-time-out:240000)
SEND (ch=0) begin(next-outgoing-id:4294967293,incoming-window:2048,outgoing-window:2048,handle-max:63)
RECV (ch=0) begin(remote-channel:0,next-outgoing-id:1,incoming-window:2048,outgoing-window:2048,handle-max:7)
SEND (ch=0) attach(name:x,handle:0,role:False,source:source(),target:target(address:/NS/Q),initial-delivery-count:0)
Sending...
RECV (ch=0) attach(name:x,handle:0,role:True,source:source(),target:target(address:/NS/Q),max-message-size:5242880,properties:[com.microsoft:tracking-id:5aed86f8aa2644c69fbbbbf42d6fccd9_Ghost_Bhost;332:94:95])
RECV (ch=0) flow(next-in-id:4294967293,in-window:2048,next-out-id:1,out-window:2048,handle:0,delivery-count:0,link-credit:50,available:0,echo:False)

SenderLink leaks memory when a timeout sending a message occurs

There is a memory leak when a timeout occurs calling SenderLink.Send. I am sending messages that will timeout at roughly a constant interval. Several objects appear to grow linearly with time. If the message sends successfully without a timeout no leak occurs. Here are the object types that are leaking.

Microsoft.Win32.SafeHandles.SafeWaitHandle
ManualResetEvent
Amqp.Message
Amqp.Framing.AmqpValue
Amqp.SenderLink+<>c__DisplayClass3
Amqp.OutcomeCallback
Amqp.Delivery
Amqp.ByteBuffer

The problem is the delivery object gets added to the outgoingList in SenderLink, but never gets removed when a timeout occurs.

Here is my test client.

class Program
{
    static void Main(string[] args)
    {
        var address = "amqp://guest:[email protected]:5672";
        var conn = new Connection(new Address(address));
        var sess = new Session(conn);
        var link = new SenderLink(sess, "test-client", "msg-queue");

        while (true)
        {
            try
            {
                link.Send(new Message("hi"), 10);
            }
            catch (TimeoutException)
            {
            }
        }
    }
}

And here is my test server.

class Program
{
    static void Main(string[] args)
    {
        var address = new Uri("amqp://guest:[email protected]:5672");
        var host = new ContainerHost(new List<Uri>() { address }, null, address.UserInfo);
        host.RegisterMessageProcessor("msg-queue", new MsgProcessor());
        host.Open();

        Console.WriteLine("ready");
        Console.Read();
    }
}

class MsgProcessor : IMessageProcessor
{
    public int Credit
    {
        get { return 1; }
    }

    public void Process(MessageContext messageContext)
    {
        Thread.Sleep(20);
        messageContext.Complete();
    }
}

Here are my findings. All object counts grow at the same rate.

amqp_timeout_leak

Doing Request/Response initiated from IoT Core device

Hi,
I want to send a message from a Raspberry Pi (Windows 10 IoT Core) to a normal service running on a normal PC (Win 10/.Net 4.5.2). I already consumed this part from the amqp specification: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-transport

My question is: Is there a buildin way to send a message and receive the response correlated to it? Is this the Listener stuff? I can get the "Outcome" if I send a message, but it does not have a Message anywhere?

thx

Flow control drain flag

I am trying to understand the drain flag described in 2.6.7 Flow Control of the AMQP 1.0 spec.

http://docs.oasis-open.org/amqp/core/v1.0/cos01/amqp-core-transport-v1.0-cos01.html#doc-flow-control

From what I understand if the drain flag is true and the sender link does not have any messages to send it should basically “return” the credit by consuming the credit and sending the flow state to the receiver link. I am not seeing that behavior with SenderLink and I’m not sure if my understanding of the drain flag is incomplete or if this is a possible bug.

In SenderLink.OnFlow I see nowhere that the Flow.Drain property is honored. I see no place in the entire Amqp.Net project where the Flow.Drain property is used besides being set actually. Instead if SenderLink.OnFlow has no outgoing messages it just returns and keeps the credit. Is this a bug?

Here is a sample application where I updated the sender link’s credit with drain set to true and the sender keeps the credit.

class Program
{
    const string Address = "amqp://localhost:5462";
    const string MsgQueue = "queue";

    static void Main(string[] args)
    {
        var host = new ContainerHost(new Uri(Address));
        host.RegisterMessageProcessor(MsgQueue, new MessageProcessor());
        host.Open();

        var conn = new Connection(new Address(Address));
        var sess = new Session(conn);
        var sender = new SenderLink(sess, "client", MsgQueue);

        sender.Send(new Message("1"));

        //Shouldn't sender.credit be 0 here?
    }
}

class MessageProcessor : IMessageProcessor
{

    public int Credit
    {
        get { return 1; }
    }

    public void Process(MessageContext messageContext)
    {
        Console.WriteLine(messageContext.Message.Body);
        messageContext.Link.SetCredit(10, true);
        messageContext.Complete();
    }
}

serialization of enum types

Is it possible to have an [AmqpMember] for an enum type? I'm getting a NotSupportedException, but I'm not sure if that's because the enum is actually a region from a DllImport (I'm unfortunately maintaining a legacy codebase here and C# isn't my strongest suit!)

I couldn't find any examples of serializing an enum type in the tests, so I'm thinking maybe it's not supported (yet)

SASL EXTERNAL not working with Apache Qpid C++ broker

The SASL EXTERNAL mechanism is currently not working against the Apache Qpid broker. The client sends the sasl-init without the initial response. That triggers the Qpid C++ broker to send sasl-challenge, to which the client never responds.

This can be fixed in two ways:
a) send the sasl-init with (empty) initial response (not sure what impact will this have e.g. on Azure Service Bus)
b) respond to sasl-challenge with sasl-response containing empty response

Maximum number of concurrent messages processed by broker

Is there a way to limit the number of messages being processed concurrently on the broker? Here is my use case. I am using a ConnectionListener for my broker. There are many clients each creating a SenderLink, which creates a ReceiverLink on the broker side. In most cases they send messages at a slow enough rate where all messages can be handled in a timely manner. However let’s say the broker becomes unavailable and each client buffers data. When the broker becomes available they all send messages at one time. The broker will create a new thread for each incoming messages. I have seen that 100 clients (100 SenderLinks from 100 independent connections) will use about 100 threads to receive messages on the broker (I counted in a memory dump) when each SenderLink is sending messages as fast as they can to catch up.

I was thinking of keeping track of the number of messages being processed in IMessageProcessor.Process and returning error “amqp:resource-limit-exceeded” when it exceeds a threshold to minimize the problem. However this doesn't stop the messages from being sent over the network (1MB each) or AMQP.NET Lite from doing the work to receive and decode the messages. It would be nice to stop the incoming message as soon as possible to avoid this expensive work.

I can limit the number of messages per ReceiverLink using the credit, but even at a credit of 1 if I have a lot of ReceiverLinks this can still result in many threads handling incoming messages from each ReceiverLink.

Since each client creates its own connection and session I could limit the total number of connected clients by using AmqpSettings.MaxSessionsPerConnection. However I don’t want to limit the number of connections just how many messages can be processed at once time.

Thanks for the help.

Any plans for a WCF Binding?

I'm really impressed with this framework. Looks like it hits almost all the possible use cases.

Any plans to build a WCF binding? Or do you know if anyone else is?

System.Net.Sockets.SocketException

Hi,

I'm trying to run the Interop.Server/Client examples in DNXCore5.0, but I get the following System.Net.Sockets.SocketException: "No connection could be made because the target machine actively refused it".

Is this just because websockets aren't supported by DNXCore5.0?
Googling this error told me to make firewall rules for the 5672 port, but this doesn't help. I also looked if the port is already used by any application by running the "netstat" command in cmd, but this is not the case. I'm just trying to run the examples locally for now.

Curious to hear your responses!

customize attach for ContainerHost outgoing links

I see that it's possible to customize attaches for incoming links using a LinkProcessor, however I don't see how it's possible to customize it for outgoing links. For instance, the amqp spec indicates that I can specify a apache.org:legacy-amqp-topic-binding:string property to match subjects to a given endpoint. How would I attach this information to the link created for a RegisterMessageProcessor?

When will the next NuGet package be released?

There have been a few important issues that have been addressed since the last NuGet package release (#10, #5, #4 and soon possibly #9). When do you plan to release the next version? Do you have any general guidelines for changes that justify a release?

Forcefully close a client connection

Here is the use case I have in mind. One application is acting as the broker or server using Amqp.Listener.ContainerHost. Another application is acting as the client using an Amqp.Connection (or a different AMQP client library). The client must authenticate with any SASL authentication mechanism (i.e. plain or external). The client authenticates successfully and starts sending messages. At some point down the road the server needs to revoke the permissions from the client (i.e. client is sending malicious data, is stale, no longer is allowed to send data to this server, etc). At this point the server needs to force the client connection to close. This should close the connection and clean up the links associated with the connection.

Session flow control

NextIncomingId is not updated on receiving begin. If the peer uses a non-zero next-outgoing-id session could stop.

Small memory devices

The current implementation for .NET Micro framework eats an incredible amount of RAM and FLASH. This is a killer issue in resource constrained devices.
That is mostly because of the way exception handling and trace messages are implemented: they are too verbose.
Also because of the very tidy organization of the classes and derived classes it often ends up with huge call stacks. This is an issue in devices with small memory because the argument objects are copied over and over eating a lot of precious RAM space.

I've send a pull request to address these issues. Basically it reworks the implementation of the exception, error codes and several method declarations where I've changed the arguments to be passed by reference.

To use this just add SMALL_MEMORY to the project compiler definitions and reload the solution.
(this is required because I'm using that definition to include - or not - some files and the resource file in the project tree)

Follows the comparison of FLASH and RAM usage in the app I've tested this with (no other changes in code except for the recompilation of the AMQP library).
Current implementation:

  • deployment (FLASH) 170732 bytes
  • free RAM after boot 19262 bytes

Using the SMALL_MEMORY compilation:

  • deployment (FLASH) 154732 bytes
  • free RAM after boot 31904 bytes

This is my contribution to this awesome library that is a great addition to the .NET Micro framework for IoT projects.
Please review and comment! 😄

Amqp.Message is not serializable

Hello,
is there a way to serialize Amqp.Message objects to reach persistence? The Binary Formatter failed because the type isn't marked with the Serializable attribute. The XmlSerializer which doesn't care about the attribute failed too because Amqp.Types.Map implements IDictionary.

Maybe you can advise a different approach.

Kind regards

ctetzlaff

Links are not assigned a handle ID

According to section 2.6.2 of the AMQP 1.0 specification each link should be assigned a numeric handle. All handles return a value of 0. Here is a test to reproduce the issue.

    [TestMethod]
    public void LinkHandleTest()
    {
        const string MsgProcName = "msg_processor";
        const string Endpoint = "amqp://guest:[email protected]:5762";

        var processor = new Moq.Mock<IMessageProcessor>();

        processor.Setup(proc => proc.Credit).Returns(300);

        processor.Setup(proc => proc.Process(It.IsAny<MessageContext>()))
            .Callback<MessageContext>(msg =>
            {
                Console.WriteLine("Name: {0}, Handle: {1}", msg.Link.Name, msg.Link.Handle);
                msg.Complete();
            });

        var uri = new Uri(Endpoint);
        var host = new ContainerHost(new List<Uri> { uri }, null, uri.UserInfo);
        host.RegisterMessageProcessor(MsgProcName, processor.Object);
        host.Open();

        var conn1 = new Connection(new Address(Endpoint));
        var sess1 = new Session(conn1);
        var link1 = new SenderLink(sess1, "link1", MsgProcName);
        link1.Send(new Message("1"), 5000); //output in msg processor: Name: link1, Handle: 0

        var conn2 = new Connection(new Address(Endpoint));
        var sess2 = new Session(conn2);
        var link2 = new SenderLink(sess2, "link2", MsgProcName);
        link2.Send(new Message("2"), 5000); //output in msg processor: Name: link2, Handle: 0
    }

How to send a message with a body that is an object of a custom type

I am having trouble figuring out how to send a message with custom types. I can see that the serializer supports custom types by using AmqpContract and AmqpMember. However I cannot figure out how to deserialize the message body back to the type instead of the described type. Is this possible? I have started an example here.

https://github.com/rr118/amqpnetlite/tree/master/Examples/PeerToPeer/PeerToPeer.CustomType

At line 15 of MessageProcessor I do not know how to deserialize the message body back to a person type. Could you explain what I am missing please? Once I can finish this example I will submit it with a pull request.

Thanks,

Rob

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.