
vertx-stomp's Introduction

vert.x-stomp


A vert.x implementation of the STOMP specification. It provides a STOMP server and client.

vertx-stomp's People

Contributors

afloarea, cbdyzj, cescoffier, deepsleep, doernemt, pmlopes, slinkydeveloper, tsegismont, vietj, wdroste

vertx-stomp's Issues

JWTAuth Provider not working?

How can I parse the JWT token sent by a STOMP JS client in the header using the JWTAuth provider?

  • Java
 JWTAuth authProvider = JWTAuth.create(vertx, new JWTAuthOptions()
            .setPermissionsClaimKey("")
            .addPubSecKey(
                  new PubSecKeyOptions()
                        .setPublicKey(config().getString("public_key"))
                        .setAlgorithm(config().getString("algorithm"))
            ));

 StompServer server = StompServer.create(vertx, new StompServerOptions()
            .setSecured(true)
            .setPort(-1) // Disable the TCP port, optional
            .setWebsocketBridge(true) // Enable the web socket support
            .setWebsocketPath("/ws")) // Configure the web socket path, /stomp by default
            .handler(StompServerHandler.create(vertx).authProvider(authProvider));
  • Javascript
 const client = new Stomp.Client({
         brokerURL : 'url',
         connectHeaders : {
            Authorization : 'Bearer ' + <TOKEN>
         }
      });

The JWTAuth provider works fine with my HTTP routes, but not with my WebSocket.

Thanks in advance.
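
For illustration, here is a minimal sketch of pulling the bearer token out of the CONNECT headers before handing it to an auth provider. It assumes the headers are available as a plain Map<String, String>. The BearerTokens class and its fromHeaders method are hypothetical helpers, not part of vertx-stomp or of the JWTAuth API.

```java
import java.util.Map;
import java.util.Optional;

final class BearerTokens {
    private static final String PREFIX = "Bearer ";

    // Returns the raw token from an "Authorization: Bearer <token>" header, if present.
    // The header name mirrors the JavaScript snippet above; the map type is an assumption.
    static Optional<String> fromHeaders(Map<String, String> connectHeaders) {
        String value = connectHeaders.get("Authorization");
        // Compare the scheme case-insensitively; a too-short value simply does not match.
        if (value == null || !value.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
            return Optional.empty();
        }
        String token = value.substring(PREFIX.length()).trim();
        return token.isEmpty() ? Optional.empty() : Optional.of(token);
    }
}
```

The extracted string could then be passed to whatever token-based authentication call the provider exposes. How the server side wires this into the CONNECT handling is left open here.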

StompClientConnection closeHandler is called twice

It looks like the handler is attached to both the TCP socket close event and the ponger timeout. This can cause the client to reconnect twice if, for example, you have configured a closeHandler to automatically reconnect on close.

Thanks
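
One possible workaround on the application side, sketched here under the assumption that the reconnect logic can be wrapped in a Runnable, is to guard the callback so that it only fires once per connection. OnceOnly is a hypothetical helper, not a vertx-stomp class.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class OnceOnly {
    // Wraps an action so that only the first invocation runs it.
    // Later invocations (e.g. a second close notification) are ignored.
    static Runnable once(Runnable action) {
        AtomicBoolean fired = new AtomicBoolean(false);
        return () -> {
            if (fired.compareAndSet(false, true)) {
                action.run();
            }
        };
    }
}
```

A fresh guard would be created for every new connection, so that each connection can still trigger exactly one reconnect.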

Stomp Client connect to non root endpoint

Can the StompClient be extended to take a requestURI in addition to the host and port number? This would enable usage with a server that hosts a websocket connection under a request URI rather than at the default root of the domain.

i.e.

HttpClient client = vertx.createHttpClient().websocket(80, "localhost", "/foo", ...

The above code connects correctly to the server hosting the stomp websocket.

Is there an option to specify /foo when using StompClient?

Handler for logging STOMP frames sent/received

Make it possible to add a handler that can log the STOMP frames being sent/received through the APIs.
For debugging purposes it would be really great to be able to sequentially log the messages that are being routed (either received or sent) through the APIs whether that be CONNECT, MESSAGE, DISCONNECT or any other type of STOMP frame (even heartbeat pings). If the handler takes the frame as a parameter and it is possible to check the type of frame (CONNECT, MESSAGE, ...) then it would be really great as we would be able to log the different frames in different ways (colours etc).

See discussion here: https://groups.google.com/forum/?fromgroups#!topic/vertx/pE5E4RKXueg
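
To make the request concrete, here is a rough sketch of what such a logging hook could format. Everything in it is hypothetical: the Direction and Command enums and the format method are stand-ins invented for this example, not existing vertx-stomp types.

```java
final class FrameLogSketch {
    // Whether the frame was written to or read from the connection.
    enum Direction { SENT, RECEIVED }

    // A small subset of frame types, plus PING for heartbeats.
    enum Command { CONNECT, CONNECTED, SEND, MESSAGE, DISCONNECT, PING }

    // Builds one log line per frame; destination may be null for frames without one.
    static String format(Direction direction, Command command, String destination) {
        String arrow = direction == Direction.SENT ? ">>>" : "<<<";
        return destination == null
            ? arrow + " " + command
            : arrow + " " + command + " " + destination;
    }
}
```

Because the handler would receive both the direction and the command, the caller could pick a different log level or colour per frame type.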

Only reproducible on a Mac

Java 1.8_144

Tried against 3.4.0, 3.4.1 and 3.5.0-SNAPSHOT.

We built a STOMP client that reliably hits this error on Mac but not on our Linux boxes. It looks like there is a logic error or some odd behaviour in a data handler.

SEVERE: Unhandled exception
java.lang.IllegalArgumentException: end must be greater or equal than start
	at io.vertx.core.impl.Arguments.require(Arguments.java:34)
	at io.vertx.core.buffer.impl.BufferImpl.getBytes(BufferImpl.java:169)
	at io.vertx.core.buffer.impl.BufferImpl.getBuffer(BufferImpl.java:198)
	at io.vertx.core.parsetools.impl.RecordParserImpl.parseDelimited(RecordParserImpl.java:182)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:159)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:218)
	at io.vertx.ext.stomp.impl.FrameParser.handle(FrameParser.java:221)
	at io.vertx.ext.stomp.impl.StompClientConnectionImpl.lambda$new$2(StompClientConnectionImpl.java:105)
	at io.vertx.core.net.impl.NetSocketImpl.handleDataReceived(NetSocketImpl.java:325)
	at io.vertx.core.net.impl.NetClientImpl.handleMsgReceived(NetClientImpl.java:101)
	at io.vertx.core.net.impl.NetClientImpl.handleMsgReceived(NetClientImpl.java:45)
	at io.vertx.core.net.impl.NetClientBase$1.handleMsgReceived(NetClientBase.java:189)
	at io.vertx.core.net.impl.VertxNetHandler.lambda$channelRead$0(VertxNetHandler.java:63)

Futurisation

We are providing future-returning methods for this component, e.g.

Future<StompClientConnection> fut = StompClient.create(vertx).connect(61613, "0.0.0.0");

Can't capture exceptions from connectAwait

The way StompClient.connectAwait is implemented doesn't seem correct to me, because it doesn't allow the caller to catch the exception that awaitResult might throw.

A connection attempt can fail for various reasons, and the caller should be able to handle those failures. As implemented now, does connectAwait either return a StompClientConnection or swallow the exception?

This WON'T work:

        try {
            stompClient.connectAwait()
        } catch( e: Exception) {
            println(e)
        }

So what happens if I can't connect? Do I get an exception on the main event loop? This is what I see:

io.vertx.core.impl.NoStackTraceThrowable: CONNECTED frame not receive in time

I think connectAwait should really be more like this:

    val connectionResult = awaitEvent<AsyncResult<StompClientConnection>> {
            stompClient.connect(it)
        }

Please correct me if I am wrong.
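
The underlying point, that the failure should reach the caller as something it can catch, can be sketched in plain Java as well. This is only an analogy using CompletableFuture from the JDK. AwaitSketch and describe are hypothetical names, and nothing here reflects how connectAwait is actually implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AwaitSketch {
    // Waits for the connection result and turns a failure into a value
    // the caller can inspect, instead of letting it escape elsewhere.
    static String describe(CompletableFuture<String> connect) {
        try {
            return "connected: " + connect.join();
        } catch (CompletionException e) {
            // join() wraps the original failure; the cause carries the real message.
            return "failed: " + e.getCause().getMessage();
        }
    }
}
```

With this shape, a timeout while waiting for the CONNECTED frame would surface inside the try/catch of the calling code.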

bodyLength not reset between frames

When I use StompClient, the FrameParser's bodyLength keeps growing from one frame to the next, even though the STOMP frames are distinct.

After a while, it throws a "Body size exceeded" exception.
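
A minimal sketch of the behaviour I would expect, assuming the parser keeps a running byte count per frame. BodyLimitSketch is a hypothetical stand-in written for this report, not the actual FrameParser code.

```java
final class BodyLimitSketch {
    private final int maxBodyLength;
    private int bodyLength;

    BodyLimitSketch(int maxBodyLength) {
        this.maxBodyLength = maxBodyLength;
    }

    // Accounts for a chunk of body bytes belonging to the current frame.
    void onBodyBytes(int count) {
        bodyLength += count;
        if (bodyLength > maxBodyLength) {
            throw new IllegalStateException("Body size exceeded");
        }
    }

    // Called when a frame ends; without this reset, sizes accumulate across frames.
    void onFrameEnd() {
        bodyLength = 0;
    }

    int currentBodyLength() {
        return bodyLength;
    }
}
```

With the reset in place, any number of small frames can pass through. Without it, the limit is eventually hit even though no single frame is too large.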

Cannot configure Auth with WebSocket server

I have added an HttpServer with a WebSocket handler for the StompServer like this:

    Vertx vertx = Vertx.vertx();

    StompServerHandler handler = StompServerHandler.create(vertx).authProvider(new AuthenticationProvider());
    StompServer server = StompServer.create(vertx,
            new StompServerOptions()
                .setSecured(true)
                .setWebsocketBridge(true)
                // .setPort(-1)
                .setWebsocketPath("/broker/stomp"))
        .handler(handler)
        .listen(61613, "0.0.0.0");

    HttpServer http = vertx.createHttpServer(
            new HttpServerOptions().setWebsocketSubProtocols("v10.stomp, v11.stomp, v12.stomp"))
        .websocketHandler(server.webSocketHandler())
        .listen(61614);

But I get this exception:

SEVERE: Unhandled exception
java.lang.NullPointerException
	at io.vertx.ext.stomp.impl.DefaultStompHandler.onAuthenticationRequest(DefaultStompHandler.java:437)
	at io.vertx.ext.stomp.DefaultConnectHandler.authenticate(DefaultConnectHandler.java:86)
	at io.vertx.ext.stomp.DefaultConnectHandler.handle(DefaultConnectHandler.java:70)
	at io.vertx.ext.stomp.DefaultConnectHandler.handle(DefaultConnectHandler.java:41)
	at io.vertx.ext.stomp.impl.DefaultStompHandler.handleConnect(DefaultStompHandler.java:379)
	at io.vertx.ext.stomp.impl.DefaultStompHandler.handle(DefaultStompHandler.java:235)
	at io.vertx.ext.stomp.impl.DefaultStompHandler.handle(DefaultStompHandler.java:66)
	at io.vertx.ext.stomp.impl.StompServerImpl.lambda$null$13(StompServerImpl.java:237)
	at io.vertx.ext.stomp.impl.FrameParser.handleLine(FrameParser.java:164)
	at io.vertx.core.parsetools.impl.RecordParserImpl.parseDelimited(RecordParserImpl.java:202)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:176)
	at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:235)
	at io.vertx.ext.stomp.impl.FrameParser.handle(FrameParser.java:221)
	at io.vertx.ext.stomp.impl.FrameParser.handle(FrameParser.java:35)
	at io.vertx.core.http.impl.WebSocketImplBase.handleFrame(WebSocketImplBase.java:268)
	at io.vertx.core.http.impl.Http1xServerConnection.handleOther(Http1xServerConnection.java:499)
	at io.vertx.core.http.impl.Http1xServerConnection.processMessage(Http1xServerConnection.java:460)
	at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:144)
	at io.vertx.core.http.impl.HttpServerImpl$ServerHandlerWithWebSockets.handleMessage(HttpServerImpl.java:676)
	at io.vertx.core.http.impl.HttpServerImpl$ServerHandlerWithWebSockets.handleMessage(HttpServerImpl.java:619)
	at io.vertx.core.net.impl.VertxHandler.lambda$channelRead$1(VertxHandler.java:146)
	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:337)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:195)
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:144)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:141)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

when I try to log in via STOMP over WebSocket.

Unit test failing in CI

This failure has been reported by CI:

Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 2.489 sec <<< FAILURE! - in io.vertx.ext.stomp.impl.WebSocketBridgeTest
testWebSocketsWhenTCPDisabled(io.vertx.ext.stomp.impl.WebSocketBridgeTest)  Time elapsed: 0.024 sec  <<< FAILURE!
java.lang.AssertionError: Received a failed result Address already in use
	at io.vertx.ext.stomp.impl.AsyncLock.waitForSuccess(AsyncLock.java:62)
	at io.vertx.ext.stomp.impl.WebSocketBridgeTest.testWebSocketsWhenTCPDisabled(WebSocketBridgeTest.java:319)

It is either a faulty test or a bug.

More info here: https://travis-ci.org/vert-x3/vertx-stomp/jobs/461273463

Check Frame if it is a Received or a Sent frame

I'd like to be able to check if a frame is a received frame or a frame that is sent. I can check on the frame.getCommand() for all kinds of frame types and based on that I can make some logic that tells whether it is a received frame or a sent frame based on whether I do client logic or server logic. However, I can't find a good way around checking if a PING frame is a received one or a sent one.

Cannot determine if a client cert has been provided

I have a vertx-stomp server that should authenticate an encrypted connection with a client certificate or, if none is provided, with a user name / password.

When I set clientAuth to REQUEST it works.

See: https://vertx.io/docs/apidocs/io/vertx/core/http/ClientAuth.html#REQUEST

It works meaning I am able to accept encrypted connections when cert is provided and also when no cert is provided by the client. The problem is that on the server I am unable to determine if the client has sent a cert or has not sent a cert. Does anyone know how this can be done?

To me it looks like the problem is that StompServerConnection (https://vertx.io/docs/apidocs/io/vertx/ext/stomp/StompServerConnection.html) and its implementation StompServerTCPConnectionImpl.java are overly restrictive: they don't give access to the NetSocket, so there is no way to look at the certificate. The handler gets a ServerFrame, and from that you can get the connection, but from the connection you cannot get the socket, which you would need in order to look at the certificate. The server needs the certificate to determine whether it cares about the username or password.

Deadlock sometimes occurs when sending and listening in the same process (e.g. in a test case)

The sender and listener are using separate StompClient instances. This is the deadlock info:

Found one Java-level deadlock:
=============================
"vert.x-eventloop-thread-1":
  waiting to lock monitor 0x00007fa184003c98 (object 0x00000007733f9808, a io.vertx.ext.stomp.impl.StompClientConnectionImpl),
  which is held by "main"
"main":
  waiting to lock monitor 0x00007fa184003be8 (object 0x00000007733f8cc0, a io.vertx.core.net.impl.NetSocketImpl),
  which is held by "vert.x-eventloop-thread-1"

Java stack information for the threads listed above:
===================================================
"vert.x-eventloop-thread-1":
        at io.vertx.ext.stomp.impl.StompClientConnectionImpl.handle(StompClientConnectionImpl.java:520)
        - waiting to lock <0x00000007733f9808> (a io.vertx.ext.stomp.impl.StompClientConnectionImpl)
        at io.vertx.ext.stomp.impl.StompClientConnectionImpl.handle(StompClientConnectionImpl.java:40)
        at io.vertx.ext.stomp.impl.FrameParser.handleLine(FrameParser.java:91)
        at io.vertx.ext.stomp.impl.FrameParser$$Lambda$341/803919044.handle(Unknown Source)
        at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:197)
        at io.vertx.core.parsetools.impl.RecordParserImpl.handle(RecordParserImpl.java:259)
        at io.vertx.ext.stomp.impl.FrameParser.handle(FrameParser.java:221)
        - locked <0x00000007733f98f8> (a io.vertx.ext.stomp.impl.FrameParser)
        at io.vertx.ext.stomp.impl.StompClientConnectionImpl.lambda$new$2(StompClientConnectionImpl.java:107)
        at io.vertx.ext.stomp.impl.StompClientConnectionImpl$$Lambda$342/818696377.handle(Unknown Source)
        at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:393)
        at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:230)
        at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:120)
        at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:371)
        - locked <0x00000007733f8cc0> (a io.vertx.core.net.impl.NetSocketImpl)
        at io.vertx.core.net.impl.ConnectionBase.handleRead(ConnectionBase.java:390)
        at io.vertx.core.net.impl.VertxHandler$$Lambda$331/9417091.handle(Unknown Source)
        at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
        at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:188)
        at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:174)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
"main":
        at io.vertx.core.net.impl.NetSocketImpl.writeMessage(NetSocketImpl.java:113)
        - waiting to lock <0x00000007733f8cc0> (a io.vertx.core.net.impl.NetSocketImpl)
        at io.vertx.core.net.impl.NetSocketImpl.write(NetSocketImpl.java:159)
        at io.vertx.core.net.impl.NetSocketImpl.write(NetSocketImpl.java:137)
        at io.vertx.ext.stomp.impl.StompClientConnectionImpl.send(StompClientConnectionImpl.java:202)
        - locked <0x00000007733f9808> (a io.vertx.ext.stomp.impl.StompClientConnectionImpl)
        at io.vertx.ext.stomp.impl.StompClientConnectionImpl.send(StompClientConnectionImpl.java:233)
        at io.vertx.ext.stomp.impl.StompClientConnectionImpl.send(StompClientConnectionImpl.java:208)
        at com.example.testSimplePubSub(Test.java:105)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:628)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:117)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:184)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor$$Lambda$223/124734309.execute(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:180)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:127)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$150/1043208434.execute(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$149/215219944.invoke(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$148/1243806178.execute(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService$$Lambda$154/776700275.accept(Unknown Source)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$150/1043208434.execute(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$149/215219944.invoke(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$148/1243806178.execute(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService$$Lambda$154/776700275.accept(Unknown Source)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$150/1043208434.execute(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$149/215219944.invoke(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask$$Lambda$148/1243806178.execute(Unknown Source)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:142)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:117)
        at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:383)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:344)
        at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:125)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:417)

Found 1 deadlock.
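
For test cases like this one, a small helper can make the test fail fast instead of hanging. The sketch below uses the JDK's ThreadMXBean to ask the JVM for deadlocked threads. DeadlockCheck is a hypothetical helper, and it only detects the problem; it does not address the lock ordering between StompClientConnectionImpl and NetSocketImpl shown in the dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

final class DeadlockCheck {
    // Returns the number of threads the JVM currently reports as deadlocked
    // on object monitors or ownable synchronizers.
    static int deadlockedThreadCount() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] ids = threads.findDeadlockedThreads(); // null when no deadlock is found
        return ids == null ? 0 : ids.length;
    }
}
```

A watchdog thread in the test could poll this method and abort the run once the count becomes non-zero.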

Only half of the available ports can be bound when both localAddress and setReuseAddress(true) are set in StompClientOptions

Hi there, I was doing performance testing on a vertx-stomp based application, where we use StompClient to create a large number of STOMP clients.

I found a weird issue: when both setLocalAddress(localAddress) and setReuseAddress(true) are set, the StompClient can only bind about half of the available ports on Linux (CentOS 7 and Ubuntu 16.4). I also tested on macOS, which does not have this issue. Setting setReuseAddress(false) does not have this issue either.
All of the tests were run on HotSpot JDK 8 build #162.

You can reproduce this issue with the attached jar by following these steps.

  1. Download the source code attached in the zip and build with maven. A jar file test-stomp-0.0.1-jar-with-dependencies.jar will be generated.

  2. Start up the MyStompServer
    java -jar test-stomp-0.0.1-jar-with-dependencies.jar

  3. Narrow the ip_local_port_range to 10 available ports by running the command below on the client machine
    sysctl -w 'net.ipv4.ip_local_port_range=61000 61009'

  4. Connect 10 clients to the STOMP server
    java -Djava.net.preferIPv4Stack=true -cp test-stomp-0.0.1-jar-with-dependencies.jar test.stomp.MyStompClient 10 true <clientIPAddr> <serverIPAddr>
    Here, 10 is the number of clients to start, and true sets setReuseAddress to true in the StompClientOptions.

Only 5 clients start successfully, as can be checked by running netstat -nt|grep 61613 on the client machine (192.168.0.178):

tcp        0      0 192.168.0.178:61000     192.168.0.84:61613      ESTABLISHED
tcp        0      0 192.168.0.178:61004     192.168.0.84:61613      ESTABLISHED
tcp        0      0 192.168.0.178:61001     192.168.0.84:61613      ESTABLISHED
tcp        0      0 192.168.0.178:61002     192.168.0.84:61613      ESTABLISHED
tcp        0      0 192.168.0.178:61003     192.168.0.84:61613      ESTABLISHED

The following exceptions are thrown:

io.netty.channel.AbstractChannel$AnnotatedSocketException: Cannot assign requested address: /192.168.0.84:61613
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:454)
    at sun.nio.ch.Net.connect(Net.java:446)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:83)
    at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:80)
    at java.security.AccessController.doPrivileged(Native Method)
    at io.netty.util.internal.SocketUtils.connect(SocketUtils.java:80)
    at io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:310)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:254)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
    at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:1000)
    at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:264)
    at io.netty.bootstrap.Bootstrap$3.run(Bootstrap.java:254)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.BindException: Cannot assign requested address
    ... 22 more
Failed to connect to the STOMP server: io.netty.channel.AbstractChannel$AnnotatedSocketException: Cannot assign requested address: /192.168.0.84:61613
io.netty.channel.AbstractChannel$AnnotatedSocketException: Cannot assign requested address: /192.168.0.84:61613
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:454)
    at sun.nio.ch.Net.connect(Net.java:446)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:83)
    at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:80)
    at java.security.AccessController.doPrivileged(Native Method)
    at io.netty.util.internal.SocketUtils.connect(SocketUtils.java:80)
    at io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:310)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:254)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
    at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:1000)
    at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:264)
    at io.netty.bootstrap.Bootstrap$3.run(Bootstrap.java:254)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.BindException: Cannot assign requested address
    ... 22 more
Failed to connect to the STOMP server: io.netty.channel.AbstractChannel$AnnotatedSocketException: Cannot assign requested address: /192.168.0.84:61613
io.netty.channel.AbstractChannel$AnnotatedSocketException: Cannot assign requested address: /192.168.0.84:61613
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:454)
    at sun.nio.ch.Net.connect(Net.java:446)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:83)
    at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:80)
    at java.security.AccessController.doPrivileged(Native Method)
    at io.netty.util.internal.SocketUtils.connect(SocketUtils.java:80)
    at io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:310)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:254)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1291)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:545)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:530)
    at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:1000)
    at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:264)
    at io.netty.bootstrap.Bootstrap$3.run(Bootstrap.java:254)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.BindException: Cannot assign requested address
    ... 22 more
Failed to connect to the STOMP server: io.netty.channel.AbstractChannel$AnnotatedSocketException: Cannot assign requested address: /192.168.0.84:61613

3 Receive ping from server at 5186310942605173
1 Receive ping from server at 5186310946579101
0 Receive ping from server at 5186310947877513
4 Receive ping from server at 5186310949324149
2 Receive ping from server at 5186310950959071

When run as below, all 10 clients start and no exceptions are printed.
java -Djava.net.preferIPv4Stack=true -cp test-stomp-0.0.1-jar-with-dependencies.jar test.stomp.MyStompClient 10 false <clientIPAddr> <serverIPAddr>

FYI, the attached jar contains the STOMP server and client used above, and the zip file contains the full source code with the Maven POM. For easier reading, I have copied the startAClient method from MyStompClient below.

    public void startAClient(String localIP, String remoteIP, int id, boolean reuseAddress) throws UnknownHostException {
        StompClientOptions options = new StompClientOptions();
        // Heartbeat settings in milliseconds (the x and y values of the heart-beat header).
        options.setHeartbeat(new JsonObject().put("x", 1000).put("y", 1000));
        options.setLocalAddress(localIP);
        options.setReuseAddress(reuseAddress);
        StompClient client = StompClient.create(vertx, options).receivedFrameHandler(frame -> {
            // Log every heartbeat (PING) frame received from the server.
            if (Frame.Command.PING.equals(frame.getCommand())) {
                System.out.println(id + " Receive ping from server at " + System.nanoTime());
            }
        });
        client.connect(61613, remoteIP, ar -> {
            if (ar.succeeded()) {
                StompClientConnection connection = ar.result();
            } else {
                // cause() returns a Throwable, so print it directly rather than casting to Exception.
                ar.cause().printStackTrace();
                System.out.println("Failed to connect to the STOMP server: " + ar.cause());
            }
        });
    }

teststomp.zip

Cannot set droppedHandler until after a CONNECTED frame is received

The outcome is that if you connect to a socket which immediately resets the connection, the error cannot be handled.

An example of this would be connecting to a socket which immediately closes the socket (e.g. due to a firewall rule).

You can simulate this with iptables like so:

-A INPUT -p tcp -m state --state NEW -m tcp --dport 61613 -j REJECT --reject-with tcp-reset

One dirty fix would be for the default droppedHandler to call resultHandler with a failed future (instead of just doing nothing). However, I think the code would benefit from a slight refactor instead.

Frame Ack null value cause Queue/Topic transform method NPE

According to the API, the getAck() method in io.vertx.ext.stomp.Frame.java may return null.

   /**
   * Gets the value of the {@code ack} header.
   *
   * @return the {@code ack} header value, {@code null} if not set
   */
  public String getAck() {
    return headers.get(ACK);
  }

So io.vertx.ext.stomp.impl.Topic.Subscription constructor -->
protected class Subscription {
...
protected Subscription(StompServerConnection connection, Frame frame) {
...
this.ackMode = frame.getAck();
...
}
...
}

And, io.vertx.ext.stomp.impl.Queue.Subscription constructor -->
private class Subscription {
...
private Subscription(StompServerConnection connection, Frame frame) {
...
this.ackMode = frame.getAck();
...
}
}

So this.ackMode may be null.

When I used the Netty 4.1.x sample code io.netty.example.stomp.StompClient.java, it caused the exceptions below:


java.lang.NullPointerException
at io.vertx.ext.stomp.impl.Queue.transform(Queue.java:90)
That code is at "...if (!subscription.ackMode.equals("auto")) {..."


java.lang.NullPointerException
at io.vertx.ext.stomp.impl.Topic.transform(Topic.java:77)
That code is at "... if (!subscription.ackMode.equals("auto")) { ..."
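The STOMP 1.2 specification states that a missing ack header on SUBSCRIBE defaults to auto, so one way to avoid the NPE is to apply that default before comparing. The following is a minimal sketch of that idea, not the library's actual fix; the class and method names (AckModes, effective, requiresAck) are hypothetical.

```java
import java.util.Arrays;

final class AckModes {
    static final String AUTO = "auto";

    /** Returns the effective ack mode, treating a missing header as "auto". */
    static String effective(String ackHeader) {
        return ackHeader == null ? AUTO : ackHeader;
    }

    /** Null-safe equivalent of !ackMode.equals("auto"). */
    static boolean requiresAck(String ackHeader) {
        return !AUTO.equals(effective(ackHeader));
    }

    public static void main(String[] args) {
        // null models a SUBSCRIBE frame that carries no ack header.
        for (String header : Arrays.asList(null, "auto", "client", "client-individual")) {
            System.out.println(header + " -> requiresAck=" + requiresAck(header));
        }
    }
}
```

Putting the constant on the left of equals() would already prevent the exception; normalising the value once, for example in the Subscription constructor, keeps later comparisons simple.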

Server will still allow requests when client does not send a CONNECT frame

If you connect a misbehaving client that never sends a CONNECT frame, the Connect Handler is never called and any configured AuthProvider is skipped. The server will still allow the client to send messages unless this is explicitly handled by the individual handlers. I am unsure whether this is intended or a bug.

Stomp Websocket fallbacks

Currently, if you use STOMP over WebSocket, will it fall back to SockJS if WebSocket is unavailable?

Heartbeat configuration in the default stomp server handler appears to be incorrect

If a client connects to the STOMP server with this heartbeat header:
heart-beat:30000,300000

The server then might respond with:
heart-beat:300000,30000

This should result in the client sending a heartbeat every 30 seconds (max of c.x, s.y), and the server sending a heartbeat every 5 minutes (max of c.y, s.x).

By default it appears that the Stomp server handler will in this situation send a heartbeat every 30 seconds and expect one from the client every 5 minutes. I think this is because the API is slightly confusing. The following call will compute the heartbeat interval for the STOMP client.

Frame.Heartbeat.computePingPeriod(
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)),
Frame.Heartbeat.create(connection.server().options().getHeartbeat()));

The resulting value is then passed to the connection.configureHeartbeat function as the first parameter, which is the server heartbeat ping time. Basically, these need to be reversed, either by swapping the parameters passed to configureHeartbeat or by changing the way the handler calculates ping/pong.
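The negotiation rule quoted above (client sends every max(c.x, s.y), server sends every max(c.y, s.x), with 0 disabling a direction) can be checked with a small standalone sketch. It does not use the vertx-stomp Frame.Heartbeat class; HeartbeatNegotiation and its methods are hypothetical names introduced only for illustration.

```java
final class HeartbeatNegotiation {

    /** One side's heart-beat header: x = what it can send, y = what it wants to receive (ms). */
    static final class Heartbeat {
        final long x;
        final long y;

        Heartbeat(long x, long y) {
            this.x = x;
            this.y = y;
        }

        /** Parses a header value such as "30000,300000". */
        static Heartbeat parse(String header) {
            String[] parts = header.split(",");
            return new Heartbeat(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
        }
    }

    /** Interval at which the client sends heartbeats to the server; 0 means none. */
    static long clientToServer(Heartbeat client, Heartbeat server) {
        return (client.x == 0 || server.y == 0) ? 0 : Math.max(client.x, server.y);
    }

    /** Interval at which the server sends heartbeats to the client; 0 means none. */
    static long serverToClient(Heartbeat client, Heartbeat server) {
        return (server.x == 0 || client.y == 0) ? 0 : Math.max(server.x, client.y);
    }

    public static void main(String[] args) {
        Heartbeat client = Heartbeat.parse("30000,300000");
        Heartbeat server = Heartbeat.parse("300000,30000");
        // prints "client -> server every 30000 ms"
        System.out.println("client -> server every " + clientToServer(client, server) + " ms");
        // prints "server -> client every 300000 ms"
        System.out.println("server -> client every " + serverToClient(client, server) + " ms");
    }
}
```

With the headers from this report, the server should ping every 300000 ms and expect client activity every 30000 ms, which is the opposite of the behaviour described.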

Handling authentication errors in StompClient version 3.2.1

My application supplied incorrect credentials to the StompClient and so the server responded to the CONNECT frame with an ERROR frame.

However, because the application hadn't yet been given a chance to set StompClientConnectionImpl.errorHandler the authentication failure wasn't passed to the application.

StompClientConnection reconnect

I use vertx-stomp (from Java) to connect to ActiveMQ successfully.
However, when the ActiveMQ server is shut down or the network is disconnected,
the client does not reconnect automatically. How can I re-establish the connection once the server is back up or the network is restored?
Setting stompClientOptions.setReconnectInterval(1000) has no effect.
Thanks very much!

STOMP server tasks(pinger/ponger) runs endlessly

Hi, I am seeing a large number of log entries about idle connections on the STOMP server (version 3.7.1) with the WebSocket bridge.

Example from my logs:

11:35:18.486 WARN Disconnecting client io.vertx.ext.stomp.impl.StompServerWebSocketConnectionImpl@6f5765e1 - no client activity in the last 329665005 ms

11:35:18.442 WARN Disconnecting client io.vertx.ext.stomp.impl.StompServerWebSocketConnectionImpl@6f5765e1 - no client activity in the last 329664961 ms

11:35:13.486 WARN Disconnecting client io.vertx.ext.stomp.impl.StompServerWebSocketConnectionImpl@6f5765e1 - no client activity in the last 329660005 ms

As you can see, the hash code is the same in all of these log entries.

Log count (for this connection only):

  1. last 8 hours: 11,541
  2. all time: 133,148

Clients use the following heart-beat value: 5000,10000

UPD:
I reproduced this issue: it happens when the client sends more than one CONNECT command. A quick fix is to change StompServerTCPConnectionImpl#configureHeartbeat to:

public synchronized void configureHeartbeat(long ping, long pong, Handler<StompServerConnection> pingHandler) {
    cancelHeartbeat();

    if (ping > 0) {
      pinger = server.vertx().setPeriodic(ping, l -> pingHandler.handle(this));
    }
    if (pong > 0) {
      ponger = server.vertx().setPeriodic(pong, l -> {
        long delta = System.nanoTime() - lastClientActivity;
        final long deltaInMs = TimeUnit.MILLISECONDS.convert(delta, TimeUnit.NANOSECONDS);
        if (deltaInMs > pong * 2) {
          log.warn("Disconnecting client " + this + " - no client activity in the last " + deltaInMs + " ms");
          close();
        }
      });
    }
  }

But this case should really be handled properly.
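The quick fix above amounts to making the heartbeat configuration idempotent: cancel whatever is already scheduled before scheduling again. The sketch below shows that pattern in isolation, using java.util.concurrent.ScheduledExecutorService as a stand-in for Vert.x timers; ReconfigurableHeartbeat is a hypothetical class, not part of vertx-stomp.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ReconfigurableHeartbeat {
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> pinger;

    ReconfigurableHeartbeat(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /** Cancels any previously scheduled ping task before scheduling a new one. */
    synchronized ScheduledFuture<?> configure(long periodMs, Runnable ping) {
        cancel();
        if (periodMs > 0) {
            pinger = scheduler.scheduleAtFixedRate(ping, periodMs, periodMs, TimeUnit.MILLISECONDS);
        }
        return pinger;
    }

    synchronized void cancel() {
        if (pinger != null) {
            pinger.cancel(false);
            pinger = null;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            ReconfigurableHeartbeat heartbeat = new ReconfigurableHeartbeat(scheduler);
            // Two calls model a client that sends CONNECT twice.
            ScheduledFuture<?> first = heartbeat.configure(5000, () -> { });
            ScheduledFuture<?> second = heartbeat.configure(5000, () -> { });
            System.out.println("first cancelled: " + first.isCancelled());   // true
            System.out.println("second cancelled: " + second.isCancelled()); // false
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Without the cancel() call in configure, the first task would keep running with no remaining reference to it, which matches the endless warnings seen in the logs.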

First Time setPeriodic on PING fails to get canceled

On application startup, for the first WebSocket connection, the timer id assigned to pinger is zero.

code reference:

io.vertx.ext.stomp.impl.StompServerTCPConnectionImpl#pinger:132

@Override
  public synchronized void configureHeartbeat(long ping, long pong, Handler<StompServerConnection> pingHandler) {
    if (ping > 0) {
      pinger = server.vertx().setPeriodic(ping, l -> pingHandler.handle(this));
    }
    if (pong > 0) {
      ponger = server.vertx().setPeriodic(pong, l -> {
        long delta = System.nanoTime() - lastClientActivity;
        final long deltaInMs = TimeUnit.MILLISECONDS.convert(delta, TimeUnit.NANOSECONDS);
        if (deltaInMs > pong * 2) {
          log.warn("Disconnecting client " + this + " - no client activity in the last " + deltaInMs + " ms");
          close();
        }
      });
    }
  }

Therefore pinger, which holds the timer id, is zero.

When the connection is closed, cancelHeartbeat checks pinger > 0; that check fails, so the timer is never removed.

io/vertx/ext/stomp/impl/StompServerTCPConnectionImpl.java:113

public synchronized void cancelHeartbeat() {
   if (pinger > 0) {
     server.vertx().cancelTimer(pinger);
     pinger = 0;
   }

   if (ponger > 0) {
     server.vertx().cancelTimer(ponger);
     ponger = 0;
   }
 }

A workaround is to create a one-off timer when Vert.x starts:

server.vertx().setTimer(1, id -> LOG.info("Cancelling {} Timer as a Workaround for StompServer", id));
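An alternative to the workaround is to stop using 0 as the "no timer" marker. The sketch below illustrates this with a sentinel of -1, under the assumption that timer ids are never negative. FakeTimers is a hypothetical stand-in that hands out ids starting at 0, mirroring what this report observes; it is not the Vert.x timer API.

```java
import java.util.HashSet;
import java.util.Set;

final class TimerIdSentinel {
    /** Marker for "no timer scheduled"; assumes real timer ids are never negative. */
    static final long NO_TIMER = -1L;

    /** Hypothetical timer service whose first id is 0. */
    static final class FakeTimers {
        private long nextId = 0;
        private final Set<Long> active = new HashSet<>();

        long setPeriodic() {
            long id = nextId++;
            active.add(id);
            return id;
        }

        boolean cancelTimer(long id) {
            return active.remove(id);
        }

        int activeCount() {
            return active.size();
        }
    }

    private final FakeTimers timers;
    private long pinger = NO_TIMER;

    TimerIdSentinel(FakeTimers timers) {
        this.timers = timers;
    }

    void configureHeartbeat() {
        pinger = timers.setPeriodic();
    }

    void cancelHeartbeat() {
        // Compare against the sentinel instead of testing pinger > 0, so id 0 is cancelled too.
        if (pinger != NO_TIMER) {
            timers.cancelTimer(pinger);
            pinger = NO_TIMER;
        }
    }

    public static void main(String[] args) {
        FakeTimers timers = new FakeTimers();
        TimerIdSentinel connection = new TimerIdSentinel(timers);
        connection.configureHeartbeat(); // receives id 0
        connection.cancelHeartbeat();
        System.out.println("active timers after cancel: " + timers.activeCount()); // 0
    }
}
```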

Add support for StompJS

StompJS (http://jmesnil.net/stomp-websocket/doc) defines how a JavaScript client can connect directly to a STOMP server without having to handle a TCP socket (a web socket is used instead).

To support StompJS, we would need to abstract how we handle connections on the server side so that a web socket can be used instead, and also provide a websocket handler that can be passed to an HTTP server.

The STOMP server won't be responsible for creating the HTTP server; it would just provide a handler.

CONNECT frame not handled by writingFrameHandler

I've tried the following code:

StompClient stompClient = StompClient.create(vertx, stompClientOptions);
	stompClient
		.receivedFrameHandler(frame -> {
			printFrame(frame);
		})
		.writingFrameHandler(frame -> {
			printFrame(frame);
		})
		.connect(ar -> {
			// ....
		});

The first frame that gets printed (by the printFrame method) is the "CONNECTED" frame. I would have expected the "CONNECT" frame to be the first, but it never gets printed.

Stomp client no support for virtual hosts

In CloudAMQP you have a host and a virtual host. The host is the name that resolves to the IP address to connect to, and the virtual host is the name to put in the "host" header of the CONNECT frame. There is no way to specify that in the current implementation.
Maybe a "virtualHost" option could help?
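To make the distinction concrete, here is a minimal sketch of the text of a STOMP 1.2 CONNECT frame in which the host header carries the virtual host rather than the address of the TCP connection. It builds the frame by hand and is not the vertx-stomp API; ConnectFrameSketch, the virtual host my-vhost and the credentials are hypothetical.

```java
final class ConnectFrameSketch {

    /** Builds the text of a STOMP 1.2 CONNECT frame; the host header carries the virtual host. */
    static String connectFrame(String virtualHost, String login, String passcode) {
        StringBuilder frame = new StringBuilder("CONNECT\n");
        frame.append("accept-version:1.2\n");
        frame.append("host:").append(virtualHost).append('\n');
        frame.append("login:").append(login).append('\n');
        frame.append("passcode:").append(passcode).append('\n');
        // A blank line ends the headers; a NUL byte ends the frame.
        frame.append('\n').append('\0');
        return frame.toString();
    }

    public static void main(String[] args) {
        // The TCP connection would target the broker's address; only the header below names the virtual host.
        String frame = connectFrame("my-vhost", "user", "secret");
        System.out.print(frame.replace("\0", "^@\n"));
    }
}
```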

Frames.createErrorFrame sets an invalid CONTENT_TYPE

io.vertx.ext.stomp.Frames#createErrorFrame sets content-type for String messages to test/plain which makes de-serialisation fail on the client.

Workaround is to overwrite the header after constructing the error frame.

PR to follow

Cannot run IT tests

currently the following:

mvn clean verify -Pit-with-docker

fails:

[ERROR] DOCKER> Unable to build image [vertx-stomp/rabbitmq-stomp] : The command '/bin/sh -c rabbitmq-plugins enable --offline enable rabbitmq_stomp' returned a non-zero code: 70  [The command '/bin/sh -c rabbitmq-plugins enable --offline enable rabbitmq_stomp' returned a non-zero code: 70 ]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.328 s
[INFO] Finished at: 2018-09-28T17:11:44+02:00
[INFO] Final Memory: 44M/577M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal io.fabric8:docker-maven-plugin:0.21.0:build (start) on project vertx-stomp: Unable to build image [vertx-stomp/rabbitmq-stomp] : The command '/bin/sh -c rabbitmq-plugins enable --offline enable rabbitmq_stomp' returned a non-zero code: 70  -> [Help 1]

this prevents testing the interop with brokers.

Connection timeout errors should result in a normal Exception with a stack trace

The FailedFuture implementation creates a failed future with a NoStackTraceThrowable error when created with a simple string message. This is the constructor that gets called by StompClientImpl:192 when there is a timeout waiting for a CONNECTED frame from the server. I think we need to create an actual failed future here with a logical Exception.

I ran into this because I was using the awaitResult construct in a Kotlin co-routine to make a call to connect to a Stomp server. The implementation of awaitResult looks for a failed future and then throws any error.

As a consequence, my code, which wrapped awaitResult in a try/catch block catching Exception, failed to catch the error.

A connection error like this should result in a normal exception and not a Throwable. I changed things around to avoid using the awaitResult because of this.
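The catch behaviour described above follows from the Java type hierarchy: a catch (Exception e) clause does not match a type that extends Throwable directly. The sketch below demonstrates this with a hypothetical stand-in class, StackFreeFailure, used here in place of the real NoStackTraceThrowable.

```java
final class ThrowableVsException {

    /** Hypothetical stand-in for an error type that extends Throwable directly. */
    static final class StackFreeFailure extends Throwable {
        StackFreeFailure(String message) {
            // Disable suppression and stack trace capture.
            super(message, null, false, false);
        }
    }

    /** Reports whether a catch (Exception) block would have handled the failure. */
    static String classify(Throwable failure) {
        try {
            try {
                throw failure;
            } catch (Exception e) {
                return "caught as Exception";
            }
        } catch (Throwable t) {
            return "escaped catch (Exception)";
        }
    }

    public static void main(String[] args) {
        // prints "escaped catch (Exception)"
        System.out.println(classify(new StackFreeFailure("connection timeout")));
        // prints "caught as Exception"
        System.out.println(classify(new IllegalStateException("connection timeout")));
    }
}
```

Failing the future with any Exception subclass would let ordinary try/catch blocks handle the timeout.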

Need access to the User object during destination matches

The AuthProvider only gets the username/password (no session ID), and DefaultStompHandler ignores the returned User. The 'handleStomp' and 'handleConnect' methods are both private. Also, onAuthenticationRequest only gets the server, not the connection, so overriding the ConnectHandler didn't work either, and the result is just a boolean.

Basically, it's difficult to associate an authenticated 'User' with a session. With that association, per-client authorization based on destination would be possible.

The purpose is to have fine grained authorization based on the authenticated user to determine the subscriptions and the destinations eligible for sending.
