moquette-io / moquette
Java MQTT lightweight broker

Home Page: http://moquette-io.github.io/moquette/

License: Apache License 2.0


moquette's Introduction


Moquette Project


Moquette aims to be an MQTT-compliant broker. The broker supports QoS 0, QoS 1, and QoS 2.

It is designed to be event-driven and uses Netty for the protocol encoding and decoding.

Embeddable

Freedomotic is a home automation framework that embeds Moquette to interface with MQTT through a dedicated plugin.

Moquette is also used in Atomize Spin, a software solution for the logistics field.

Parts of Moquette are used in the Vert.x MQTT module, in MQTT Spy, and in the WSO2 Message Broker.

1-minute setup

To start playing with it, download the self-contained distribution tar from BinTray, untar it, then start the broker listening on port 1883 and enjoy!

tar xvf moquette-distribution-0.15.tar.gz
cd bin
./moquette.sh

Or, if you are on a Windows shell:

 cd bin
 .\moquette.bat

Embedding in other projects

Include the dependency in your project:

<dependency>
      <groupId>io.moquette</groupId>
      <artifactId>moquette-broker</artifactId>
      <version>0.15</version>
</dependency>
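Once the dependency is on the classpath, starting an embedded broker takes only a few lines. The sketch below is based on the io.moquette.broker API of recent releases; class and method names have moved between versions, so treat it as a starting point rather than the definitive API:

```java
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;

import java.io.IOException;
import java.util.Properties;

public class EmbeddedBrokerExample {
    public static void main(String[] args) throws IOException {
        // "port" is the property key for the plain TCP listener; 1883 is the
        // conventional MQTT port used throughout this README.
        Properties props = new Properties();
        props.setProperty("port", "1883");
        IConfig config = new MemoryConfig(props);

        final Server broker = new Server();
        broker.startServer(config);
        System.out.println("Moquette broker started on port 1883");

        // Shut the broker down cleanly when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(broker::stopServer));
    }
}
```

With this in place, any MQTT client (e.g. a Paho client) can connect to localhost:1883.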

Build from sources

After a git clone of the repository, cd into the cloned sources and run ./gradlew package. At the end, the distribution package is available at distribution/target/distribution-0.17-SNAPSHOT-bundle.tar.gz

The distribution/target directory will contain the self-contained archive for the broker, with all dependencies and a run script.

moquette's People

Contributors

aemaem, andreasmager, andsel, bobrowskim, dependabot[bot], dserfe, fjcyue, hylkevds, kevto, komamitsu, konradmichael, mackristof, maggu2810, mikedombo, mpaatz, mscheibler, muka, ningg, phameete, ricardojlrufino, runafter, salmanebah, sbaranov-iqmessenger, sbouchexiv, source-c, stannislin, swarwick, teranisi, wagnerluis1982, windbender


moquette's Issues

Memory Leak

Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:83)
org.dna.mqtt.moquette.parser.netty.PublishEncoder.encode(PublishEncoder.java:53)
org.dna.mqtt.moquette.parser.netty.PublishEncoder.encode(PublishEncoder.java:27)
org.dna.mqtt.moquette.parser.netty.MQTTEncoder.encode(MQTTEncoder.java:57)
org.dna.mqtt.moquette.parser.netty.MQTTEncoder.encode(MQTTEncoder.java:30)
io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:111)
io.netty.channel.DefaultChannelHandlerContext.invokeWrite(DefaultChannelHandlerContext.java:666)
io.netty.channel.DefaultChannelHandlerContext.write(DefaultChannelHandlerContext.java:724)
io.netty.channel.DefaultChannelHandlerContext.write(DefaultChannelHandlerContext.java:659)
org.dna.mqtt.moquette.server.netty.metrics.MessageMetricsHandler.write(MessageMetricsHandler.java:53)
io.netty.channel.DefaultChannelHandlerContext.invokeWrite(DefaultChannelHandlerContext.java:666)
io.netty.channel.DefaultChannelHandlerContext.access$2000(DefaultChannelHandlerContext.java:30)
io.netty.channel.DefaultChannelHandlerContext$AbstractWriteTask.write(DefaultChannelHandlerContext.java:945)
io.netty.channel.DefaultChannelHandlerContext$WriteAndFlushTask.write(DefaultChannelHandlerContext.java:999)
io.netty.channel.DefaultChannelHandlerContext$AbstractWriteTask.run(DefaultChannelHandlerContext.java:934)
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
java.lang.Thread.run(Thread.java:745)

When I use Moquette 0.6 as an embedded MQTT server, the exception above is caught. Is this normal?
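For context (general Netty guidance, not a confirmed diagnosis of this report): a leak-detector trace like the one above means a direct ByteBuf allocated in PublishEncoder.encode was never release()d on some code path. The defensive ownership pattern inside a Netty encode method looks like the following sketch; the names are illustrative, not Moquette's actual code:

```java
// Illustrative Netty ownership pattern inside an encode/write method:
// whoever last holds a ByteBuf must either pass it on (transferring
// ownership) or release() it.
ByteBuf out = ctx.alloc().buffer();
try {
    // ... encode the MQTT PUBLISH fixed header, variable header and payload ...
    ctx.writeAndFlush(out); // Netty now owns the buffer and releases it after the write
    out = null;             // ownership transferred; avoid a double release
} finally {
    if (out != null) {
        out.release();      // encoding failed before the write: free the buffer
    }
}
```

Running with -Dio.netty.leakDetectionLevel=paranoid makes such reports appear sooner and with fuller traces.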

allow_anonymous

Hi,

currently a client can connect without supplying any username and password, even if the configuration is set to use a password file (password_file parameter in moquette.conf). However, if the client specifies a username and password, they will be compared with the ones in password_file.
Mosquitto solves this with the allow_anonymous configuration parameter, and moquette could have a similar parameter.
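Hypothetically, with such a parameter the relevant moquette.conf section could look like this (the parameter name is borrowed from Mosquitto and the password file path is illustrative; this is a sketch of the proposal, not an existing Moquette option at the time of this issue):

```
# Reject CONNECT packets that carry no username/password
# (name borrowed from Mosquitto's allow_anonymous option).
password_file config/password_file.conf
allow_anonymous false
```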

Thank you

WebSocket java.lang.NullPointerException using Paho mqttws31.js with Moquette 0.6

Reporter: bppause
Original: https://code.google.com/p/moquette-mqtt/issues/detail?id=53
I'm using the 0.6 version with the index.html you provided, and mqttws31.js downloaded from http://www.eclipse.org/paho/clients/js/. When trying to hit the connect button, I get the following error message on the server:

18:35:27,746 DEBUG io.netty.util.ResourceLeakDetector:76 - -Dio.netty.noResourceLeakDetection: false
18:35:27,783 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - [id: 0xc41b4658, /127.0.0.1:2410 => /127.0.0.1:8080] WS Version V13 server handshake
18:35:27,786 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - WS Version 13 Server Handshake key: 2WP6OSP2ldJdlDY5NPZxBw==. Response: pzANkZagf3dP0kQ33Jq8676lnQI=.
18:35:27,789 WARN io.netty.channel.DefaultChannelPipeline:151 - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.NullPointerException
at org.dna.mqtt.moquette.server.netty.NettyMQTTHandler.channelInactive(NettyMQTTHandler.java:81)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:214)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.handler.codec.http.HttpObjectAggregator.channelInactive(HttpObjectAggregator.java:219)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.handler.codec.ReplayingDecoder.channelInactive(ReplayingDecoder.java:347)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:767)
at io.netty.channel.AbstractChannel$AbstractUnsafe$5.run(AbstractChannel.java:558)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:354)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:348)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:662)
18:35:28,098 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - [id: 0x03e036d5, /127.0.0.1:2412 => /127.0.0.1:8080] WS Version V13 server handshake
18:35:28,099 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - WS Version 13 Server Handshake key: SCvw+oSgc9B2l6x3dMc0SA==. Response: lF1UZN3/15mn05D1WxND16bjzsw=.
18:35:28,105 DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder:76 - Decoding WebSocket Frame opCode=2
18:35:28,106 DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder:76 - Decoding WebSocket Frame length=24
18:35:28,108 WARN io.netty.channel.DefaultChannelPipeline:151 - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:263)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:131)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler$1.channelRead(WebSocketServerProtocolHandler.java:133)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:173)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:100)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:478)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:447)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:341)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:662)
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1171)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1117)
at io.netty.buffer.AbstractByteBuf.getByte(AbstractByteBuf.java:330)
at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:563)
at org.dna.mqtt.moquette.parser.netty.Utils.decodeRemainingLenght(Utils.java:75)
at org.dna.mqtt.moquette.parser.netty.Utils.checkHeaderAvailability(Utils.java:47)
at org.dna.mqtt.moquette.parser.netty.MQTTDecoder.decode(MQTTDecoder.java:55)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:232)
... 22 more

unsubscribe does not clean m_sessionsStore

processUnsubscribe does not clean m_sessionsStore, even though subscribing to a topic runs "m_sessionsStore.addNewSubscription(newSubscription, clientID);" in the subscribeSingleTopic function.

 @MQTTMessage(message = UnsubscribeMessage.class)
    void processUnsubscribe(ServerChannel session, UnsubscribeMessage msg) {
        List<String> topics = msg.topicFilters();
        int messageID = msg.getMessageID();
        String clientID = (String) session.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
        LOG.debug("UNSUBSCRIBE subscription on topics {} for clientID <{}>", topics, clientID);

        for (String topic : topics) {
            subscriptions.removeSubscription(topic, clientID);
        }
        //ack the client
        UnsubAckMessage ackMessage = new UnsubAckMessage();
        ackMessage.setMessageID(messageID);

        LOG.info("replying with UnsubAck to MSG ID {}", messageID);
        session.write(ackMessage);
    }
private void subscribeSingleTopic(Subscription newSubscription, final String topic) {
        LOG.info("<{}> subscribed to topic <{}> with QoS {}", 
                newSubscription.getClientId(), topic, 
                AbstractMessage.QOSType.formatQoS(newSubscription.getRequestedQos()));
        String clientID = newSubscription.getClientId();
        m_sessionsStore.addNewSubscription(newSubscription, clientID);
        subscriptions.add(newSubscription);

        //scans retained messages to be published to the new subscription
        Collection<IMessagesStore.StoredMessage> messages = m_messagesStore.searchMatching(new IMatchingCondition() {
            public boolean match(String key) {
                return  SubscriptionsStore.matchTopics(key, topic);
            }
        });

        for (IMessagesStore.StoredMessage storedMsg : messages) {
            //fire the message as retained
            LOG.debug("send publish message for topic {}", topic);
            //forwardPublishQoS0(newSubscription.getClientId(), storedMsg.getTopic(), storedMsg.getQos(), storedMsg.getPayload(), true);
            Integer packetID = storedMsg.getQos() == QOSType.MOST_ONE ? null :
                    m_messagesStore.nextPacketID(newSubscription.getClientId());
            sendPublish(newSubscription.getClientId(), storedMsg.getTopic(), storedMsg.getQos(), storedMsg.getPayload(), true, packetID);
        }
    }
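A symmetric fix would purge the persisted subscription in the unsubscribe loop, mirroring the addNewSubscription call made on subscribe. Sketch only: removeSubscription on the session store is a hypothetical method name, not a confirmed API.

```java
for (String topic : topics) {
    subscriptions.removeSubscription(topic, clientID);
    // Mirror of m_sessionsStore.addNewSubscription(...) in subscribeSingleTopic:
    // also drop the persisted subscription (hypothetical method name).
    m_sessionsStore.removeSubscription(topic, clientID);
}
```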

Error building from Github 0.7

Looks like it's failing when building the broker; see below (note: it seems to be getting past the prior error with DB files).

Results:

Failed tests:
  testCleanSession_maintainClientSubscriptions_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testCleanSession_maintainClientSubscriptions_withServerRestart(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  avoidMultipleNotificationsAfterMultipleReconnection_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testCleanSession_maintainClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testCleanSession_correctlyClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testSubscribe(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishReceiveWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishWithQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest): expected:<2> but was:<1>
  testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishWithQoS1_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testRetain_maintainMessage_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testUnsubscribe_do_not_notify_anymore_same_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testUnsubscribe_do_not_notify_anymore_new_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  checkSubscriberQoS1ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS0]> but was:<[Test my payload]>
  checkSubscriberQoS0ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS0]> but was:<[Test my payload]>
  checkSubscriberQoS0ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my payload]>
  checkSubscriberQoS2ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my payload]>
  checkSubscriberQoS2ReceiveQoS2publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my payload]>
  checkSupportSSL(org.eclipse.moquette.server.ServerIntegrationSSLTest)
  checkRestartCleanSubscriptionTree(org.eclipse.moquette.server.ServerRestartIntegrationTest): expected:<[Hello world MQTT!!]> but was:<[Test my payload]>
  checkDontPublishInactiveClientsAfterServerRestart(org.eclipse.moquette.server.ServerRestartIntegrationTest): expected:<[Hello world MQTT!!]> but was:<[Test my payload]>
  overridingSubscriptions(org.eclipse.moquette.spi.impl.MapDBPersistentStoreTest): The DB storagefile C:\Users\Brian\moquette_store.mapdb already exists
  overridingSubscriptions(org.eclipse.moquette.spi.impl.MapDBPersistentStoreTest): Error deleting the moquette db file C:\Users\Brian\moquette_store.mapdb

Tests run: 88, Failures: 25, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Moquette MQTT parent ............................... SUCCESS [ 0.781 s]
[INFO] Moquette - Parser Commons .......................... SUCCESS [ 12.247 s]
[INFO] Moquette - Netty Parser ............................ SUCCESS [ 9.518 s]
[INFO] Moquette - broker .................................. FAILURE [01:11 min]
[INFO] Moquette - distribution ............................ SKIPPED
[INFO] Moquette - OSGi bundle ............................. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:34 min
[INFO] Finished at: 2015-04-07T07:07:29-04:00
[INFO] Final Memory: 16M/44M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.12.4:test (default-test) on project moquette-broker: There are test failures.
[ERROR]
[ERROR] Please refer to C:\Users\Brian\Documents\GitHub\moquette\broker\target\surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :moquette-broker

'cmd' is not recognized as an internal or external command, operable program or batch file.

Failed to build the project

Hi,

I cloned the latest source code and ran "mvn clean package". It gives me the following build failure error. Any ideas?

Results :

Failed tests:   testCleanSession_maintainClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testCleanSession_maintainClientSubscriptions_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testCleanSession_correctlyClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testCleanSession_maintainClientSubscriptions_withServerRestart(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testRetain_maintainMessage_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testUnsubscribe_do_not_notify_anymore_same_session(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testUnsubscribe_do_not_notify_anymore_new_session(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testPublishWithQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testPublishWithQoS1_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  testPublishReceiveWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  avoidMultipleNotificationsAfterMultipleReconnection_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
  checkSupportSSL(org.eclipse.moquette.server.ServerIntegrationSSLTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists

Tests in error: 
  checkPlainConnect(org.eclipse.moquette.server.ServerIntegrationWebSocketTest): Address already in use
  checkPlainConnect(org.eclipse.moquette.server.ServerIntegrationWebSocketTest)
  checkDontPublishInactiveClientsAfterServerRestart(org.eclipse.moquette.server.ServerRestartIntegrationTest): Address already in use
  checkRestartCleanSubscriptionTree(org.eclipse.moquette.server.ServerRestartIntegrationTest): Address already in use
  testSubscribe(org.eclipse.moquette.server.ServerIntegrationPahoTest): Address already in use
  testSubscribe(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testCleanSession_maintainClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testCleanSession_maintainClientSubscriptions_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testCleanSession_correctlyClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testCleanSession_maintainClientSubscriptions_withServerRestart(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testRetain_maintainMessage_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testUnsubscribe_do_not_notify_anymore_same_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testUnsubscribe_do_not_notify_anymore_new_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishWithQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishWithQoS1_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  testPublishReceiveWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  avoidMultipleNotificationsAfterMultipleReconnection_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
  connectWithCredentials(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
  checkReplayofStoredPublishResumeAfter_a_disconnect_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
  checkReplayStoredPublish_forNoCleanSession_qos1(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
  checkWillTestmaentIsPublishedOnConnectionKill_noRetain(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
  checkQoS2SuscriberDisconnectReceivePersistedPublishes(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
  checkSubscriberQoS0ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS0ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS0ReceiveQoS1publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS0ReceiveQoS1publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS0ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS0ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS1ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS1ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS1ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS1ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS1ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS1ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS2ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS2ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS2ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS2ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
  checkSubscriberQoS2ReceiveQoS2publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
  checkSubscriberQoS2ReceiveQoS2publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)

Tests run: 133, Failures: 14, Errors: 42, Skipped: 0
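Many of these failures share one cause: a stale moquette_store.mapdb left behind by a previous crashed or interrupted test run (the "Address already in use" errors are a separate symptom of a broker from that run still holding the port). A common workaround, inferred from the error text rather than stated in this thread, is deleting the stale store before re-running the build:

```shell
# Remove the stale MapDB store file left behind by a previous test run.
# The path comes from the failure messages above; adjust it to where your
# build actually created the file.
rm -f "$HOME/moquette_store.mapdb"
```

Also make sure no leftover broker process is still bound to port 1883 before re-running the tests.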

Maven build is stuck

I'm running: mvn clean install

Maven version: 3.3
JDK: OpenJDK 7

And the build process is always stuck at:

13:40:03,788 [pool-57-thread-1] ERROR SimpleMessaging onEvent 173  - Serious error processing the message org.eclipse.moquette.proto.messages.DisconnectMessage@5dd3c8ce for session [clientID: Publisher]org.eclipse.moquette.server.netty.NettyChannel@64953d6b
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:167)
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
    ... 6 more
Caused by: java.lang.RuntimeException: Could not instantiate class
    at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:526)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1416)
    at org.mapdb.SerializerBase.deserializeHashSet(SerializerBase.java:1678)
    at org.mapdb.SerializerBase.access$600(SerializerBase.java:30)
    at org.mapdb.SerializerBase$49.deserialize(SerializerBase.java:779)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1414)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1402)
    at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:156)
    at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:129)
    at org.mapdb.Store.deserialize(Store.java:296)
    at org.mapdb.StoreWAL.get2(StoreWAL.java:600)
    at org.mapdb.Store.get(Store.java:146)
    at org.mapdb.HTreeMap.removeInternal(HTreeMap.java:1055)
    at org.mapdb.HTreeMap.remove(HTreeMap.java:1010)
    at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.wipeSubscriptions(MapDBPersistentStore.java:249)
    at org.eclipse.moquette.spi.impl.ProtocolProcessor.cleanSession(ProtocolProcessor.java:280)
    at org.eclipse.moquette.spi.impl.ProtocolProcessor.processDisconnect(ProtocolProcessor.java:530)
    ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:492)
    ... 26 more
May 14, 2015 1:40:48 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
SEVERE: Exception processing: 10 org.eclipse.moquette.spi.impl.ValueEvent@5454d285
java.lang.RuntimeException: Could not instantiate class
    at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:526)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1416)
    at org.mapdb.SerializerBase.deserializeHashSet(SerializerBase.java:1678)
    at org.mapdb.SerializerBase.access$600(SerializerBase.java:30)
    at org.mapdb.SerializerBase$49.deserialize(SerializerBase.java:779)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1414)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1402)
    at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:156)
    at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:129)
    at org.mapdb.Store.deserialize(Store.java:296)
    at org.mapdb.StoreWAL.get2(StoreWAL.java:600)
    at org.mapdb.Store.get(Store.java:146)
    at org.mapdb.HTreeMap.putInner(HTreeMap.java:893)
    at org.mapdb.HTreeMap.put(HTreeMap.java:849)
    at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.updateSubscriptions(MapDBPersistentStore.java:255)
    at org.eclipse.moquette.spi.impl.subscriptions.SubscriptionsStore.deactivate(SubscriptionsStore.java:180)
    at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:557)
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:158)
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:492)
    ... 22 more

Exception in thread "pool-57-thread-1" java.lang.RuntimeException: java.lang.RuntimeException: Could not instantiate class
    at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:147)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not instantiate class
    at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:526)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1416)
    at org.mapdb.SerializerBase.deserializeHashSet(SerializerBase.java:1678)
    at org.mapdb.SerializerBase.access$600(SerializerBase.java:30)
    at org.mapdb.SerializerBase$49.deserialize(SerializerBase.java:779)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1414)
    at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1402)
    at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:156)
    at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:129)
    at org.mapdb.Store.deserialize(Store.java:296)
    at org.mapdb.StoreWAL.get2(StoreWAL.java:600)
    at org.mapdb.Store.get(Store.java:146)
    at org.mapdb.HTreeMap.putInner(HTreeMap.java:893)
    at org.mapdb.HTreeMap.put(HTreeMap.java:849)
    at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.updateSubscriptions(MapDBPersistentStore.java:255)
    at org.eclipse.moquette.spi.impl.subscriptions.SubscriptionsStore.deactivate(SubscriptionsStore.java:180)
    at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:557)
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:158)
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
    ... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
    at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:492)
    ... 22 more

Error from org.kaazing.robot during mvn clean install

T E S T S

Running org.eclipse.moquette.connection.ConnectionIT
[ERROR] Internal Error. Sending error to client
java.lang.RuntimeException: Script not found: org/eclipse/moquette/connection/connect.then.close.rpt
at org.kaazing.robot.driver.control.handler.ControlServerHandler.prepareReceived(ControlServerHandler.java:148)
at org.kaazing.robot.driver.control.handler.ControlUpstreamHandler.messageReceived(ControlUpstreamHandler.java:40)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
[ERROR] Internal Error. Sending error to client
java.lang.RuntimeException: Script not found: org/eclipse/moquette/connection/connect.with.invalid.WillQoS.rpt
at org.kaazing.robot.driver.control.handler.ControlServerHandler.prepareReceived(ControlServerHandler.java:148)
at org.kaazing.robot.driver.control.handler.ControlUpstreamHandler.messageReceived(ControlUpstreamHandler.java:40)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Tests run: 3, Failures: 0, Errors: 2, Skipped: 1, Time elapsed: 0.212 sec <<< FAILURE! - in org.eclipse.moquette.connection.ConnectionIT
shouldConnectThenClose(org.eclipse.moquette.connection.ConnectionIT) Time elapsed: 0.127 sec <<< ERROR!
org.kaazing.robot.junit.RoboticException: Internal Error:Script not found: org/eclipse/moquette/connection/connect.then.close.rpt
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:126)
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:42)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.lang.Thread.run(Thread.java:722)

connectWithInvalidWillQoS(org.eclipse.moquette.connection.ConnectionIT) Time elapsed: 0.008 sec <<< ERROR!
org.kaazing.robot.junit.RoboticException: Internal Error:Script not found: org/eclipse/moquette/connection/connect.with.invalid.WillQoS.rpt
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:126)
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:42)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.lang.Thread.run(Thread.java:722)

Results :

Tests in error:
ConnectionIT.shouldConnectThenClose » Robotic Internal Error:Script not found:...
ConnectionIT.connectWithInvalidWillQoS » Robotic Internal Error:Script not fou...

Tests run: 3, Failures: 0, Errors: 2, Skipped: 1

[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO]
[INFO] --- robot-maven-plugin:0.0.0.12:stop (default) @ moquette-broker ---
[INFO]
[INFO] --- maven-failsafe-plugin:2.17:verify (default) @ moquette-broker ---
[INFO] Failsafe report directory: /home/andrea/workspace/moquette_github/broker/target/failsafe-reports
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Moquette MQTT parent .............................. SUCCESS [0.287s]
[INFO] Moquette - Parser Commons ......................... SUCCESS [1.669s]
[INFO] Moquette - Netty Parser ........................... SUCCESS [1.749s]
[INFO] Moquette - broker ................................. FAILURE [1:15.691s]
[INFO] Moquette - distribution ........................... SKIPPED
[INFO] Moquette - OSGi bundle ............................ SKIPPED

Fail to handle invalid subscription.

If an MQTT client sends a SUBSCRIBE request with an invalid topic string (like this one: #MQTTClient), the Moquette server just throws an exception on the console/log, without closing the client's connection or rolling back its internal state.

Note: we cannot assume the client will validate the topic string before sending it to the server. If the client doesn't, the server's internal state will be incorrect.
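The validation the issue asks for follows directly from the MQTT 3.1.1 wildcard rules: `#` may only occupy an entire topic level and only as the last level, and `+` may only occupy an entire level. A minimal sketch of such a check (the class and method names are hypothetical, not part of Moquette's API):

```java
// Hypothetical helper illustrating MQTT 3.1.1 topic-filter validation.
// Not part of Moquette's actual API.
final class TopicFilterValidator {

    static boolean isValidTopicFilter(String filter) {
        if (filter == null || filter.isEmpty()) {
            return false; // a filter must be at least one character long
        }
        String[] levels = filter.split("/", -1); // keep trailing empty levels
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            // '#' must stand alone in its level, and only in the last level
            if (level.contains("#") && (!level.equals("#") || i != levels.length - 1)) {
                return false;
            }
            // '+' must stand alone in its level
            if (level.contains("+") && !level.equals("+")) {
                return false;
            }
        }
        return true;
    }
}
```

With this check, the `#MQTTClient` filter from the report is rejected because `#` is not alone in its level, and the broker could answer with an error (or close the connection) instead of corrupting its subscription tree.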

publish2Subscribers problem

Steps that reproduce the problem:

  1. Start the broker with a clean system (no db).
  2. Connect a subscriber.
  3. Connect a publisher and publish some messages; the broker works well.
  4. Kill the broker.
  5. Kill the subscribers.
  6. Restart the broker.
  7. Connect and publish a message.
  8. The broker goes wrong because it tries to deliver to the old subscribers
     (see attached logs).

MOQUETTE.LOG
Maybe I didn't understand the logic behind the code, but I think that when the broker is started the old subscriptions need to be deleted from the db.

I'm using Moquette v0.7 on Windows 8.1 (my dev environment)

0 [main] DEBUG SubscriptionsStore - init invoked
6 [main] DEBUG MapDBPersistentStore - retrieveAllSubscriptions returning subs []
7 [main] DEBUG SubscriptionsStore - Reloading all stored subscriptions...subscription tree before

7 [main] DEBUG SubscriptionsStore - Finished loading. Subscription tree after

8 [main] INFO FileAuthenticator - Loading password file: C:\bin\eclipse43.workspaces\spazio1\moquette-mqtt-broker\src\config\password_file.conf
8 [main] DEBUG ProtocolProcessor - subscription tree on init

531 [main] INFO NettyAcceptor - Server binded
535 [main] INFO NettyAcceptor - Server binded
536 [main] INFO NettyAcceptor - SSL is disabled
10187 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type CONNECT
10187 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
10188 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
10188 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect for client mosqsub/24676-DVL
10189 [pool-1-thread-1] DEBUG ProtocolProcessor - Connect with keepAlive 60 s
10191 [pool-1-thread-1] DEBUG SubscriptionsStore - Activating subscriptions for clientID <mosqsub/24676-DVL>
10283 [pool-1-thread-1] INFO ProtocolProcessor - cleaning old saved subscriptions for client <mosqsub/24676-DVL>
10317 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect sent OK ConnAck
10319 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqsub/24676-DVL> with clean session true
10320 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID mosqsub/24676-DVL
10321 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true] for client mosqsub/24676-DVL
10321 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL is a newcome, creating it's subscriptions set
10322 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqsub/24676-DVL subscriptions set with new subscription
10331 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type SUBSCRIBE
10331 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping SUBSCRIBE
10344 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL subscriptions set now is [[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
10384 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping SUBSCRIBE
10384 [pool-1-thread-1] DEBUG ProtocolProcessor - processSubscribe invoked from client mosqsub/24676-DVL with msgID 1
10385 [pool-1-thread-1] INFO ProtocolProcessor - <mosqsub/24676-DVL> subscribed to topic 78:25:44:7E:E9:A0/D with QoS 0 - MOST_ONE
10385 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true] for client mosqsub/24676-DVL
10385 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqsub/24676-DVL subscriptions set with new subscription
10386 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL subscriptions set now is [[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true], [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
10422 [pool-1-thread-1] DEBUG MapDBPersistentStore - searchMatching scanning all retained messages, presents are 0
10422 [pool-1-thread-1] DEBUG ProtocolProcessor - replying with SubAck to MSG ID 1
0 [main] DEBUG SubscriptionsStore - init invoked
13223 [main] DEBUG MapDBPersistentStore - retrieveAllSubscriptions returning subs [[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true], [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
15744 [main] DEBUG SubscriptionsStore - Reloading all stored subscriptions...subscription tree before

33059 [main] DEBUG SubscriptionsStore - Re-subscribing mosqsub/24676-DVL to topic 78:25:44:7E:E9:A0/D
58676 [main] DEBUG SubscriptionsStore - Re-subscribing mosqsub/24676-DVL to topic
66856 [main] DEBUG SubscriptionsStore - Finished loading. Subscription tree after
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]

87979 [main] INFO FileAuthenticator - Loading password file: C:\bin\eclipse43.workspaces\spazio1\moquette-mqtt-broker\src\config\password_file.conf
87980 [main] DEBUG ProtocolProcessor - subscription tree on init
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]

88685 [main] INFO NettyAcceptor - Server binded
88687 [main] INFO NettyAcceptor - Server binded
88689 [main] INFO NettyAcceptor - SSL is disabled
95949 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type CONNECT
95950 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
95951 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
95951 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect for client mosqpub/27720-DVL
95952 [pool-1-thread-1] DEBUG ProtocolProcessor - Connect with keepAlive 60 s
95955 [pool-1-thread-1] DEBUG SubscriptionsStore - Activating subscriptions for clientID <mosqpub/27720-DVL>
125014 [pool-1-thread-1] INFO ProtocolProcessor - cleaning old saved subscriptions for client <mosqpub/27720-DVL>
125072 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect sent OK ConnAck
125098 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqpub/27720-DVL> with clean session true
125098 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID mosqpub/27720-DVL
125098 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:, cliID: mosqpub/27720-DVL, qos: MOST_ONE, active: true] for client mosqpub/27720-DVL
125098 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqpub/27720-DVL is a newcome, creating it's subscriptions set
125099 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqpub/27720-DVL subscriptions set with new subscription
125100 [nioEventLoopGroup-3-1] INFO PublishDecoder - decode invoked with buffer UnpooledUnsafeDirectByteBuf(ridx: 1, widx: 833, cap: 1024)
125100 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type PUBLISH
125100 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping PUBLISH
125100 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type DISCONNECT
125100 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping DISCONNECT
125103 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event org.dna.mqtt.moquette.messaging.spi.impl.events.LostConnectionEvent@ff6c19
125115 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqpub/27720-DVL subscriptions set now is [[filter:, cliID: mosqpub/27720-DVL, qos: MOST_ONE, active: true]]
125153 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping PUBLISH
125153 [pool-1-thread-1] TRACE ProtocolProcessor - PUB --PUBLISH--> SRV processPublish invoked with org.dna.mqtt.moquette.proto.messages.PublishMessage@15dcd1d
125153 [pool-1-thread-1] INFO ProtocolProcessor - Publish recieved from clientID <mosqpub/27720-DVL> on topic 78:25:44:7E:E9:A0/D with QoS MOST_ONE
125153 [pool-1-thread-1] DEBUG ProtocolProcessor - publish2Subscribers republishing to existing subscribers that matches the topic 78:25:44:7E:E9:A0/D
125155 [pool-1-thread-1] DEBUG ProtocolProcessor - content <{XYZ}>
125155 [pool-1-thread-1] DEBUG ProtocolProcessor - subscription tree
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]

133913 [pool-1-thread-1] DEBUG ProtocolProcessor - Broker republishing to client <mosqsub/24676-DVL> topic 78:25:44:7E:E9:A0/D qos <MOST_ONE>, active true
145081 [pool-1-thread-1] DEBUG ProtocolProcessor - sendPublish invoked clientId <mosqsub/24676-DVL> on topic 78:25:44:7E:E9:A0/D QoS MOST_ONE retained false messageID 1
145081 [pool-1-thread-1] INFO ProtocolProcessor - send publish message to <mosqsub/24676-DVL> on topic 78:25:44:7E:E9:A0/D
145081 [pool-1-thread-1] DEBUG ProtocolProcessor - content <{XYZ}>
145082 [pool-1-thread-1] DEBUG ProtocolProcessor - clientIDs are {mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}

Server started, version 0.7-SNAPSHOT
125098 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqpub/27720-DVL> with clean session true
dic 02, 2014 8:17:45 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
SEVERE: Exception processing: 1 org.dna.mqtt.moquette.messaging.spi.impl.ValueEvent@c625e1
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:156)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:1)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client <mosqsub/24676-DVL> in cache <{mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}>
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:410)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:386)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:359)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:306)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more

Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:156)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:1)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
... 3 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client <mosqsub/24676-DVL> in cache <{mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}>
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:410)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:386)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:359)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:306)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more

Reporter: diegovis
Original: https://code.google.com/p/moquette-mqtt/issues/detail?id=54

Unable to replace an existing subscription with a different cleanSession

The subscription tree uses the following code to check whether a client is subscribed to the same topic with a different QoS:

 // org.eclipse.moquette.spi.impl.subscriptions.TreeNode.java

    void addSubscription(Subscription s) {
        //avoid double registering for same clientID, topic and QoS
        if (m_subscriptions.contains(s)) {
            return;
        }
        //remove existing subscription for same client and topic but different QoS
        ...
    }

The problem here: if a client sends SUBSCRIBE with the same clientID, topic and QoS but a different cleanSession flag (Subscription.m_cleanSession), the code will ignore the changed cleanSession flag.
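A minimal, self-contained sketch of the behavior described above, assuming (as the issue implies) that Subscription equality covers clientID, topic and QoS but not the cleanSession flag; the class below is a simplified stand-in, not Moquette's actual Subscription:

```java
import java.util.Objects;

// Simplified stand-in for Moquette's Subscription: equality deliberately
// ignores the cleanSession flag, mirroring the reported behavior.
class Subscription {
    final String clientId;
    final String topicFilter;
    final int qos;
    final boolean cleanSession;

    Subscription(String clientId, String topicFilter, int qos, boolean cleanSession) {
        this.clientId = clientId;
        this.topicFilter = topicFilter;
        this.qos = qos;
        this.cleanSession = cleanSession;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Subscription)) return false;
        Subscription s = (Subscription) o;
        return clientId.equals(s.clientId)
            && topicFilter.equals(s.topicFilter)
            && qos == s.qos; // cleanSession is not compared
    }

    @Override
    public int hashCode() {
        return Objects.hash(clientId, topicFilter, qos);
    }
}
```

Because `m_subscriptions.contains(s)` matches on this equality, a re-subscription that differs only in cleanSession is treated as a duplicate and returned early, so the new flag value never reaches the stored subscription.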

ClassNotFoundException when connecting

Using the Moquette bundle with the Equinox console, when a publisher/subscriber attempts to connect there is a ClassNotFoundException on the class org.eclipse.moquette.proto.messages.AbstractMessage$QOSType.

I'm using the 0.7 version on Ubuntu 14.10.

I'm trying to use the Moquette bundle with the Equinox console. I didn't have any problems during the compile phase. These are the bundles I use:

id State Bundle
0 ACTIVE org.eclipse.osgi_3.10.1.v20140909-1633
1 ACTIVE org.eclipse.equinox.console_1.1.0.v20140131-1639
2 ACTIVE org.apache.felix.gogo.command_0.10.0.v201209301215
3 ACTIVE org.apache.felix.gogo.runtime_0.10.0.v201209301036
4 ACTIVE org.apache.felix.gogo.shell_0.10.0.v201212101605
5 ACTIVE io.netty.buffer_4.0.24.Final
6 ACTIVE io.netty.codec_4.0.24.Final
7 ACTIVE io.netty.codec-http_4.0.24.Final
8 ACTIVE io.netty.common_4.0.24.Final
9 ACTIVE io.netty.handler_4.0.24.Final
10 ACTIVE io.netty.transport_4.0.24.Final
11 ACTIVE org.apache.felix.javax.servlet_1.0.0
12 ACTIVE slf4j.api_1.7.10
Fragments=13
13 RESOLVED slf4j.log4j12_1.7.10
Master=12
14 ACTIVE log4j_1.2.17
15 ACTIVE org.mapdb.mapdb_1.1.0.SNAPSHOT
16 ACTIVE org.eclipse.moquette.bundle_0.7.0.SNAPSHOT

I attach the startup script and the moquette.log.

PS: I don't have this problem with the main version.

Publishing a file

Is it possible to publish a file? If yes, could you tell me how the file should be encoded?
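For what it's worth, an MQTT payload is an opaque byte array, so a file needs no special encoding: read its bytes and publish them as the payload (keeping the broker's maximum message size in mind). A sketch; the Paho client call shown in the comment is an assumption about the client library, not part of Moquette:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class FilePayload {

    // MQTT payloads are opaque byte arrays, so a file needs no special
    // encoding: read its bytes and publish them as-is. The receiving
    // subscriber gets back exactly the same bytes.
    static byte[] readFileAsPayload(Path file) throws IOException {
        return Files.readAllBytes(file);
    }

    // With the (assumed) Eclipse Paho Java client, the publish would look like:
    //   MqttClient client = new MqttClient("tcp://localhost:1883", "filePublisher");
    //   client.connect();
    //   client.publish("files/example", readFileAsPayload(path), 1, false);
}
```

If the file is larger than the broker or client allows in one message, it would have to be split into chunks and reassembled by the subscriber, which is an application-level concern rather than something the broker provides.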

Dependency injection

Hi,

my goal is to be able to use Moquette with my own IAuthenticator.

What I think is necessary to achieve this:

  • SimpleMessaging should no longer be a singleton (pretty easy, since it is only used by the Server class)
  • All of its dependencies should be injected instead of being created inside the init method (e.g. IAuthenticator)
  • group all the authentication code into one package

If you agree, I'd like to do that in my fork and then create a PR.

Are you OK with that?
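A sketch of the kind of pluggable authenticator this would enable. The one-method interface below only mirrors the spirit of Moquette's IAuthenticator (the exact signature differs between versions), and the constructor-injection line in the comment is the proposal being discussed, not current API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Assumed shape of the authenticator contract; Moquette's real
// IAuthenticator signature varies between versions.
interface IAuthenticator {
    boolean checkValid(String username, String password);
}

// Example custom implementation a user could supply instead of the
// built-in file/resource based authenticators.
class InMemoryAuthenticator implements IAuthenticator {
    private final Map<String, String> users = new ConcurrentHashMap<>();

    void addUser(String username, String password) {
        users.put(username, password);
    }

    @Override
    public boolean checkValid(String username, String password) {
        return password != null && password.equals(users.get(username));
    }
}

// Under the proposed change, SimpleMessaging would receive the
// authenticator through its constructor instead of building one in init():
//   SimpleMessaging messaging = new SimpleMessaging(new InMemoryAuthenticator());
```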

config settings: allow system properties

Do you think it would be OK to use system properties for the values of the properties in the configuration file?
We could extend the parse method of the ConfigurationParser to replace system properties in the value (using: https://commons.apache.org/proper/commons-lang/javadocs/api-2.6/org/apache/commons/lang/text/StrSubstitutor.html).

If someone configures the Moquette database path in the configuration file and does not want to use an absolute path, they could use e.g. the value "${moquette.path}/storage/moquette_store.mapdb"
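A hand-rolled sketch of the substitution step. In the actual change, StrSubstitutor.replaceSystemProperties(...) from commons-lang would do this; the regex version below just illustrates the intended behavior without adding the dependency:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Expands ${name} tokens in a config value from Java system properties,
// as commons-lang's StrSubstitutor.replaceSystemProperties(...) would.
final class ConfigSubstitutor {
    private static final Pattern TOKEN = Pattern.compile("\\$\\{([^}]+)\\}");

    static String replaceSystemProperties(String value) {
        Matcher m = TOKEN.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String prop = System.getProperty(m.group(1));
            // leave the token untouched when the property is not set
            m.appendReplacement(out,
                Matcher.quoteReplacement(prop != null ? prop : m.group(0)));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With `-Dmoquette.path=/var/moquette` set, the value `${moquette.path}/storage/moquette_store.mapdb` would expand to `/var/moquette/storage/moquette_store.mapdb`.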

Error when building 0.7

I am getting this error when running mvn clean package and mvn clean install.

Note: I deleted the moquette_store.mapdb and .wal files; is there something else I was supposed to drop?
You mentioned in defect 19: "Hi, these types of errors are flapping, due to the fact (I'll fix it) that the files ~/moquette.mapdb* are not removed. It should be solved by dropping that file."

  • subscribed to topic with QoS 0 - MOST_ONE
    4:41:22,034 [pool-29-thread-1] DEBUG ProtocolProcessor subscribeSingleTopic 619
  • send publish message for topic /topic
    4:41:22,036 [pool-29-thread-1] ERROR SimpleMessaging onEvent 167 - Grave error processing the message org.eclipse.moquette.proto.messages.SubscribeMessage@1f6dc5 for session [clientID: TestClient]org.eclipse.moquette.server.netty.NettyChannel@1c4390a
    java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
        at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
        at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:161)
        at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:51)
        at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
        ... 6 more
    Caused by: java.util.NoSuchElementException
        at java.util.HashMap$HashIterator.nextEntry(HashMap.java:925)
        at java.util.HashMap$KeyIterator.next(HashMap.java:956)
        at java.util.Collections.max(Collections.java:698)
        at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.nextPacketID(MapDBPersistentStore.java:214)
        at org.eclipse.moquette.spi.impl.ProtocolProcessor.subscribeSingleTopic(ProtocolProcessor.java:621)
        at org.eclipse.moquette.spi.impl.ProtocolProcessor.processSubscribe(ProtocolProcessor.java:585)
        ... 11 more

Hello andsel, the latest code has a publish message error.

Test case:

Client A connects to the server.
Client A publishes a message (retain: false).

QoS 0, 1 or 2 will all produce the error.

Error log:

ERROR SimpleMessaging - Serious error processing the message org.eclipse.moquette.proto.messages.PublishMessage@18f93d85 for session [clientID: A]org.eclipse.moquette.server.netty.NettyChannel@1335f392
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:166)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:56)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client in cache <{A=ConnectionDescriptor{m_clientID=A, m_cleanSession=true}}>
at org.eclipse.moquette.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:411)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.forward2Subscribers(ProtocolProcessor.java:360)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:288)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more

Error when building 0.7 bundle

I am getting this error when running mvn install pax:provision

[ERROR] Failed to execute goal on project moquette-bundle: Could not resolve dependencies for project org.eclipse.moquette:moquette-bundle:bundle:0.7-SNAPSHOT: Could not find artifact org.eclipse.moquette:moquette-broker:jar:0.7-SNAPSHOT in Paho Releases (https://repo.eclipse.org/content/repositories/paho-releases/) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
'cmd' is not recognized as an internal or external command,
operable program or batch file.

Feature request - plugins

Dear Andrea,

We need to extend Moquette to customize the authentication and authorization functionality. The code includes two authentication schemes, but it does not allow plugging in our own via configuration. Additionally, we'd like to intercept 'subscribe' events so that an authorization check decides whether the subscription request is accepted or not.

Thank you
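A rough sketch of what such a plug-in point could look like. The checkValid(username, password) call mirrors the one visible in processConnect, but IAuthorizator, PluginLoader, and the authenticator_class property are hypothetical names for illustration, not Moquette's actual API.

```java
import java.util.Properties;

// mirrors the checkValid(username, password) call seen in processConnect
interface IAuthenticator {
    boolean checkValid(String username, char[] password);
}

// hypothetical hook for SUBSCRIBE: returning false rejects the subscription
interface IAuthorizator {
    boolean canSubscribe(String clientId, String topic);
}

public class PluginLoader {
    // load a user-supplied implementation named in the broker config file
    public static IAuthenticator loadAuthenticator(Properties config) {
        String className = config.getProperty("authenticator_class");
        if (className == null || className.isEmpty()) {
            return (user, pass) -> true; // accept-all fallback when no plug-in is configured
        }
        try {
            return (IAuthenticator) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load authenticator " + className, e);
        }
    }
}
```

With such a loader, dropping an implementation class name into moquette.conf would be enough to swap in custom credential checks without touching broker code.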

Cannot customize the message store

import java.nio.ByteBuffer;
import java.util.Collection;

import org.dna.mqtt.moquette.messaging.spi.IMatchingCondition;
import org.dna.mqtt.moquette.messaging.spi.impl.HawtDBPersistentStore.StoredMessage;
import org.dna.mqtt.moquette.messaging.spi.impl.events.PublishEvent;
import org.dna.mqtt.moquette.proto.messages.AbstractMessage.QOSType;

public interface IMessagesStore {
    void initStore();

    void storeRetained(String topic, ByteBuffer message, QOSType qos);

    Collection<StoredMessage> searchMatching(IMatchingCondition condition);
}

StoredMessage is protected, so it cannot be accessed from outside the package when implementing a custom store.
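A minimal, self-contained sketch (illustrative names, not Moquette's real classes) of the fix being asked for: promoting StoredMessage to a public type in the SPI so that a custom store outside the broker package can compile against it.

```java
public class VisibilitySketch {
    // SPI-level DTO, public so third-party stores can create and return it
    public static class StoredMessage {
        public final String topic;
        public final byte[] payload;
        public StoredMessage(String topic, byte[] payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    public interface IMessagesStore {
        StoredMessage retrieve(String topic);
    }

    // a custom store living outside the broker package can now implement the SPI
    public static IMessagesStore inMemoryStore() {
        final java.util.Map<String, StoredMessage> map = new java.util.HashMap<>();
        return topic -> map.computeIfAbsent(topic, t -> new StoredMessage(t, new byte[0]));
    }
}
```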

Hello andsel, I'm using Moquette 0.7 and hit a strange bug

Test case, Moquette 0.7 (updated 2015-01-26), client: Eclipse Paho Java client 1.0.1:

Client A (clean session false)
Client B (clean session false)

A connects to the server.
B connects to the server.
B subscribes to "abc/topic" (QoS 0).
B disconnects (or loses the connection).

A publishes ("abc/topic", "test1", QoS 2, retain false);
...
A publishes ("abc/topic", "test10", QoS 2, retain false);

B reconnects to the server and receives:
test1
...
test10

B disconnects again (or loses the connection).
B reconnects and again receives:
test1
...
test10

Why are the messages delivered repeatedly?

io.netty.handler.codec.CorruptedFrameException:

io.netty.handler.codec.CorruptedFrameException: Received a message with fixed header flags (a) != expected (2)
at org.dna.mqtt.moquette.parser.netty.DemuxDecoder.genericDecodeCommonHeader(DemuxDecoder.java:62)
at org.dna.mqtt.moquette.parser.netty.DemuxDecoder.decodeCommonHeader(DemuxDecoder.java:44)
at org.dna.mqtt.moquette.parser.netty.PubRelDecoder.decode(PubRelDecoder.java:36)
at org.dna.mqtt.moquette.parser.netty.MQTTDecoder.decode(MQTTDecoder.java:71)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:232)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:131)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:253)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:100)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:478)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:447)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:341)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:744)

Metrics collection causes a memory leak (MessageMetricsCollector)

The org.eclipse.moquette.server.netty.metrics.MessageMetricsCollector class contains an in-memory queue called allMetrics that counts messages read and written on every channel.
However, a MessageMetrics object is only added to the queue when a channel is closed, and there is no way to ever remove one, so the collector will run out of memory someday.
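One possible shape of a fix, as a hedged sketch: fold each closed channel's counters into aggregate totals instead of queueing one MessageMetrics object per channel, so nothing accumulates. The class and method names below are illustrative, not Moquette's actual collector API.

```java
import java.util.concurrent.atomic.LongAdder;

public class AggregatingMetricsCollector {
    private final LongAdder messagesRead = new LongAdder();
    private final LongAdder messagesWritten = new LongAdder();

    // called when a channel closes: fold its counters in, keep no reference to the channel
    public void sumUpChannel(long read, long written) {
        messagesRead.add(read);
        messagesWritten.add(written);
    }

    public long totalRead()    { return messagesRead.sum(); }
    public long totalWritten() { return messagesWritten.sum(); }
}
```

LongAdder keeps the per-close update cheap under contention, and memory use stays constant regardless of how many channels have come and gone.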

Check whether the client is valid before closing an old channel with the same clientID.

The CONNECT message processing code (ProtocolProcessor.processConnect) first tries to close an existing connection if it has the same clientID as the current requester:

void processConnect(ServerChannel session, ConnectMessage msg) {

    // it should check requester first before close old connection
    if( m_clientIDs.contains(requester.clientID)){
      //  close old connection here
    }

    if (authenticator != null && !authenticator.checkValid(msg.isUserFlag() ? msg.getUsername() : null, msg.isPasswordFlag() ? msg.getPassword().toCharArray() : null))
    { 
            // ...
    }
}

It is probably more reasonable to first check whether the current requester is valid, because this prevents a malicious (unauthenticated) client from breaking an existing connection.
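The suggested ordering can be sketched with a toy model (illustrative names, not the real ProtocolProcessor): authenticate first, and only then evict an existing session with the same clientID, so a failed CONNECT cannot disturb a live connection.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

public class ConnectOrdering {
    private final Map<String, String> connectedClients = new HashMap<>(); // clientID -> descriptor
    private final BiPredicate<String, char[]> authenticator;

    public ConnectOrdering(BiPredicate<String, char[]> authenticator) {
        this.authenticator = authenticator;
    }

    /** Returns true when the CONNECT is accepted. */
    public boolean processConnect(String clientId, String username, char[] password) {
        // 1. validate credentials BEFORE touching any existing session,
        //    so an unauthenticated client cannot evict a live connection
        if (!authenticator.test(username, password)) {
            return false; // would answer with CONNACK "bad user name or password"
        }
        // 2. only now close/replace a previous session with the same clientID
        if (connectedClients.containsKey(clientId)) {
            connectedClients.remove(clientId); // stand-in for closing the old channel
        }
        connectedClients.put(clientId, "descriptor-for-" + clientId);
        return true;
    }

    public boolean isConnected(String clientId) {
        return connectedClients.containsKey(clientId);
    }
}
```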

Potential memory leak in moquette's usage of LMAX ringbuffer.

Moquette uses a ring buffer to publish/batch events to the ProtocolProcessor, and the ProtocolProcessor uses another ring buffer to dispatch output events to Netty.

By default, the ring buffer pre-allocates an array of event objects (ValueEvent), and every event object stays referenced by this array until the Disruptor is closed.

The problem is that Moquette (SimpleMessaging.disruptorPublish and ProtocolProcessor.disruptorPublish) associates Netty's channel (ServerChannel) with the event object, which means the ring buffer keeps references to Netty channels until the slots are overwritten.

Every established Netty channel holds quite a lot of data (such as pooled byte buffers and channel pipelines), so if the JVM heap is smaller than Netty's channel size * buffer size, an OutOfMemoryError will be thrown.

It may be a good idea to clear the event object (which holds a reference to the Netty channel) after it has been processed. For example, in ProtocolProcessor we may use:

  /**
     * Called when a publisher has published an event to the {@link RingBuffer}
     *
     * @param event published to the {@link RingBuffer}
     * @param sequence of the event being processed
     * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
     * @throws Exception if the EventHandler would like the exception handled further up the chain.
     */
    @Override
    public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception {
        //It's always of type OutputMessagingEvent
        OutputMessagingEvent outEvent = (OutputMessagingEvent) event.getEvent();
        try {
            final ServerChannel serverChannel = outEvent.getChannel();
            final AbstractMessage outputEvent = outEvent.getMessage();
            if(LOG.isDebugEnabled()){
                LOG.debug("Output event, sending {}", outputEvent);
            }
            serverChannel.write(outputEvent);
        }
        finally {
            // free event resource
            event.setEvent(null);
        }
    }

same code should apply to SimpleMessaging as well.
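The retention effect described above can be demonstrated with a tiny stand-alone model of a pre-allocated ring (illustrative, not the LMAX API): slots keep the last payload reachable until overwritten, unless the consumer nulls them out after processing.

```java
public class RingRetention {
    static final class Slot { Object payload; }

    private final Slot[] ring;
    private long seq;

    public RingRetention(int size) {
        ring = new Slot[size];
        // pre-allocation, as the Disruptor does with ValueEvent
        for (int i = 0; i < size; i++) ring[i] = new Slot();
    }

    public void publishAndConsume(Object payload, boolean clearAfterUse) {
        Slot slot = ring[(int) (seq++ % ring.length)];
        slot.payload = payload;       // producer fills the slot
        // ... consumer handles slot.payload here ...
        if (clearAfterUse) {
            slot.payload = null;      // the suggested fix: release the reference
        }
    }

    /** How many slots still pin a payload (think: a Netty channel). */
    public int retainedCount() {
        int n = 0;
        for (Slot s : ring) if (s.payload != null) n++;
        return n;
    }
}
```

Without clearing, every processed payload stays reachable until its slot wraps around; with clearing, nothing lingers after the event is handled.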

Error when publishing large message

Hello, we are having the following issue after publishing a large message to the broker. The message in question is an application log and can be up to 1 MB.

Note that the message is published and received by the subscriber clients, but after the following error we are unable to publish any more messages until the broker is restarted.

Feel free to ask for any further details.

Serious error processing the message org.eclipse.moquette.proto.messages.PublishMessage@1731782d for session [clientID: spin-device-16]org.eclipse.moquette.server.netty.NettyChannel@6c2bd990
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:167) [moquette-broker-0.7-SNAPSHOT.jar:na]
    at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57) [moquette-broker-0.7-SNAPSHOT.jar:na]
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128) [disruptor-3.3.2.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.reflect.InvocationTargetException: null
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
    at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
    ... 6 common frames omitted
Caused by: org.mapdb.DBException$VolumeIOError: IO failed
    at org.mapdb.Volume$RandomAccessFileVol.getLong(Volume.java:1896) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    at org.mapdb.Volume$ReadOnly.getLong(Volume.java:1672) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    at org.mapdb.StoreDirect.offsetsGet(StoreDirect.java:357) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    at org.mapdb.StoreDirect.delete2(StoreDirect.java:413) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    at org.mapdb.StoreCached.flushWriteCacheSegment(StoreCached.java:349) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    at org.mapdb.StoreWAL.commit(StoreWAL.java:760) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    at org.mapdb.DB.commit(DB.java:2227) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.cleanInFlight(MapDBPersistentStore.java:190) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
    at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:309) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
    at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:295) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
    ... 11 common frames omitted
Caused by: java.io.EOFException: null
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:827) ~[na:1.7.0_71]
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:860) ~[na:1.7.0_71]
    at org.mapdb.Volume$RandomAccessFileVol.getLong(Volume.java:1894) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
    ... 20 common frames omitted

Error building from Github 0.7

Results :

Failed tests: testCleanSession_maintainClientSubscriptions_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_maintainClientSubscriptions_withServerRestart(org.eclipse.moquette.server.ServerIntegrationPahoTest)
checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
avoidMultipleNotificationsAfterMultipleReconnection_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_maintainClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_correctlyClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testSubscribe(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishReceiveWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest): expected:<2> but was:<1>
testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS1_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testRetain_maintainMessage_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testUnsubscribe_do_not_notify_anymore_same_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testUnsubscribe_do_not_notify_anymore_new_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
checkSubscriberQoS1ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS0]> but was:<[Test my payload]>
checkSubscriberQoS0ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS0]> but was:<[Test my payload]>
checkSubscriberQoS0ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my payload]>
checkSubscriberQoS2ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my payload]>
checkSubscriberQoS2ReceiveQoS2publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my payload]>
checkSupportSSL(org.eclipse.moquette.server.ServerIntegrationSSLTest)
checkRestartCleanSubscriptionTree(org.eclipse.moquette.server.ServerRestartIntegrationTest): expected:<[Hello world MQTT!!]> but was:<[Test my payload]>
checkDontPublishInactiveClientsAfterServerRestart(org.eclipse.moquette.server.ServerRestartIntegrationTest): expected:<[Hello world MQTT!!]> but was:<[Test my payload]>
overridingSubscriptions(org.eclipse.moquette.spi.impl.MapDBPersistentStoreTest): The DB storagefile C:\Users\Brian\moquette_store.mapdb already exists
overridingSubscriptions(org.eclipse.moquette.spi.impl.MapDBPersistentStoreTest): Error deleting the moquette db file C:\Users\Brian\moquette_store.mapdb

Tests run: 88, Failures: 25, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Moquette MQTT parent ............................... SUCCESS [ 0.781 s]
[INFO] Moquette - Parser Commons .......................... SUCCESS [ 12.247 s]
[INFO] Moquette - Netty Parser ............................ SUCCESS [ 9.518 s]
[INFO] Moquette - broker .................................. FAILURE [01:11 min]
[INFO] Moquette - distribution ............................ SKIPPED
[INFO] Moquette - OSGi bundle ............................. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:34 min
[INFO] Finished at: 2015-04-07T07:07:29-04:00
[INFO] Final Memory: 16M/44M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.12.4:test (default-test) on project moquette-broker: There are test failures.
[ERROR]
[ERROR] Please refer to C:\Users\Brian\Documents\GitHub\moquette\broker\target\surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn -rf :moquette-broker
'cmd' is not recognized as an internal or external command,
operable program or batch file.

Error building from Github 0.7

Error below -

Note: also, is there a way to provide the jars for 0.7 in the meantime, as we have been unable to build via Maven?

Using Maven version 3.2.5
Referencing https://github.com/andsel/moquette

20:39:10,312 [pool-1-thread-1] ERROR SimpleMessaging onEvent 167 - Grave error processing the message org.eclipse.moquette.proto.messages.PublishMessage@1bf0f5b for session [clientID: Publisher]org.eclipse.moquette.server.netty.NettyChannel@1d44d0d
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:161)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:51)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client in cache <{Publisher=ConnectionDescriptor{m_clientID=Publisher, m_cleanSession=true}, Subscriber=ConnectionDescriptor{m_clientID=Subscriber, m_cleanSession=false}}>
at org.eclipse.moquette.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:411)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.forward2Subscribers(ProtocolProcessor.java:360)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:294)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more

Duplicated subscription in session if client subscribe to same topic with different QOS

The subscription tree uses the following code to check whether a client is subscribing to the same topic with a different QoS:

   // org.eclipse.moquette.spi.impl.subscriptions.TreeNode.java

    void addSubscription(Subscription s) {
        //avoid double registering for same clientID, topic and QoS
        if (m_subscriptions.contains(s)) {
            return;
        }
        //remove existing subscription for same client and topic but different QoS
        int existingSubIdx = Collections.binarySearch(m_subscriptions, s, new ClientIDComparator());
        if (existingSubIdx >= 0) {
            m_subscriptions.remove(existingSubIdx);
        }

        m_subscriptions.add(s);
    }

The code tries to locate a duplicate element in m_subscriptions using binary search. However, I don't understand how this can work without sorting the list first.

With the following simple test code, m_subscriptions ends up containing duplicated subscriptions with different QoS:

       SubscriptionsStore subscriptionsStore = // init;

        Subscription s1 = new Subscription("client1","client/test/a", AbstractMessage.QOSType.MOST_ONCE,true);
        Subscription s2 = new Subscription("client8","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
        Subscription s3 = new Subscription("client3","client/test/c", AbstractMessage.QOSType.MOST_ONCE,true);

        subscriptionsStore.add(s1);
        subscriptionsStore.add(s3);
        subscriptionsStore.add(s2);
        subscriptionsStore.add(s2);

        Subscription s4 = new Subscription("client2","client/test/b", AbstractMessage.QOSType.LEAST_ONCE,true);
        subscriptionsStore.add(s4);
        Subscription s5 = new Subscription("client5","client/test/b", AbstractMessage.QOSType.LEAST_ONCE,true);
        subscriptionsStore.add(s5);

        Subscription s6 = new Subscription("client6","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
        subscriptionsStore.add(s6);
        Subscription s7 = new Subscription("client7","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
        subscriptionsStore.add(s7);
        Subscription s8 = new Subscription("client1","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
        subscriptionsStore.add(s8);

        Subscription s9 = new Subscription("client1","client/test/b", AbstractMessage.QOSType.EXACTLY_ONCE,true);
        subscriptionsStore.add(s9);

        System.out.println(subscriptionsStore.dumpTree());

       // the dump contains two subscriptions for client1 with different QoS: MOST_ONCE and EXACTLY_ONCE
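The binary-search concern is easy to demonstrate in isolation: Collections.binarySearch is only defined for sorted lists, and on an insertion-ordered list it can report a present element as missing, which is exactly how the duplicate check above can silently fail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BinarySearchPitfall {
    public static void main(String[] args) {
        // insertion order, NOT sorted -- mirrors m_subscriptions in TreeNode
        List<String> clients = new ArrayList<>(Arrays.asList("client8", "client3", "client1"));

        // on an unsorted list the result is meaningless:
        // here it reports "client1" as absent even though it is in the list
        int idx = Collections.binarySearch(clients, "client1");
        System.out.println("unsorted lookup: " + idx);   // negative "not found" value

        Collections.sort(clients);
        int sortedIdx = Collections.binarySearch(clients, "client1");
        System.out.println("sorted lookup: " + sortedIdx);   // 0
    }
}
```

So unless m_subscriptions is kept sorted by ClientIDComparator before every binarySearch call, the "remove existing subscription" branch is unreliable.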

I think there are some defects

First of all, my English level is not very good, please forgive me! While using Moquette, I think I found several problems.
In the org.eclipse.moquette.spi.impl.ProtocolProcessor class, there is this code:
if (m_clientIDs.get(clientId) == null) {
    throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client <%s> in cache <%s>", clientId, m_clientIDs));
}
This will block the publisher.

Also, when the broker delivers a message to a client with a persistent session, if the client suddenly closes the connection or throws an exception, the broker never receives the acknowledgement that delivery completed, and it will never deliver further messages to that client.

session memory leak on NettyMQTTHandler

The org.eclipse.moquette.server.netty.NettyMQTTHandler class contains a HashMap field (m_channelMapper):

public class NettyMQTTHandler extends ChannelInboundHandlerAdapter{
    private final Map<ChannelHandlerContext, NettyChannel> m_channelMapper = new HashMap<ChannelHandlerContext, NettyChannel>();
}

which seems to be used to maintain the mapping between Netty's channel and Moquette's ServerChannel.
However, on a heavily loaded server this HashMap only accumulates entries and never removes any, and it will eventually run out of memory.
The problems that I found:

  • m_channelMapper uses ChannelHandlerContext as key, but Netty's ChannelHandlerContext does not implement hashCode & equals:
if (!m_channelMapper.containsKey(ctx)) {
    m_channelMapper.put(ctx, new NettyChannel(ctx));
}
  • When a client sends a DISCONNECT event, only ProtocolProcessor processes it; the associated value (NettyChannel) is never removed from m_channelMapper.
  • When a client terminates the TCP connection (for example, connection reset by peer), the ChannelHandler.exceptionCaught method is called, but NettyMQTTHandler does not implement that method, so the associated value (NettyChannel) is never cleaned up.
  • Because of the above problems, stale Netty channels remain in m_channelMapper, and every channel holds quite a lot of data (such as pooled byte buffers and channel pipelines).

Some fix suggestions:

public class NettyMQTTHandler extends ChannelInboundHandlerAdapter
{
    private final ChannelFutureListener remover = future -> removeSession(future.channel());
    // keyed by ChannelId (a stable identity) instead of ChannelHandlerContext,
    // and concurrent, since Netty fires events for different channels on different threads
    private final Map<ChannelId, NettyChannel> sessions = new ConcurrentHashMap<>();

    public void channelRead(ChannelHandlerContext ctx, Object message) {
           // ....
           Channel channel = ctx.channel();
           NettyChannel protocolChannel = sessions.computeIfAbsent(channel.id(), key -> {
                 // register a close hook: when ProtocolProcessor closes the channel, we remove the associated value
                  channel.closeFuture().addListener(remover);
                  return new NettyChannel(ctx);
           });
           messageDispatcher.handleProtocolMessage(protocolChannel, message);
    }

   private void removeSession(Channel channel){
        if(sessions.remove(channel.id()) != null){
            channel.closeFuture().removeListener(remover);
       }
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);

        // always try to notify connection lost, because the physical connection may already be closed.
        // scenario:
        //      1.client send CONNECT message and terminate immediately(CONNECT message didn't get processed).
        //      2.ProtocolProcessor process CONNECT message with a dead nettyChannel(already removed from sessions).
        //      3.The timer trigger this method, and we try to notify this event if possible.
        //
        if(ctx.hasAttr(NettyChannel.ATTR_KEY_CLIENTID)){
            String clientID = (String) ctx.attr(NettyChannel.ATTR_KEY_CLIENTID).get();
            if(clientID != null && ! clientID.isEmpty()){
                // NOTE: this requires comparing NettyChannel instances with equals rather than by reference (A == B).
                messageDispatcher.lostConnection(new NettyChannel(ctx), clientID);
            }

            // otherwise, the channel was presumably already cleaned up properly.
        }

        Channel channel = ctx.channel();
        NettyChannel protocolChannel = sessions.get(channel.id());
        if(protocolChannel == null){
            ctx.close();
            return;
        }

        removeSession(channel);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        NettyChannel protocolChannel = sessions.get(channel.id());
        // protocolChannel == null? this may happen if the client never wrote anything (channelRead was never called).
        if(protocolChannel != null){
            String clientID = (String) protocolChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
            // clientID may be null if the client sent CONNECT and terminated the connection immediately (the CONNECT message wasn't processed).
            // in that case we can only hope the CONNECT message gets processed properly and the timer (channelInactive) cleans up the resources.
            // otherwise, we propagate the event and clean up immediately.
            if(clientID != null && ! clientID.isEmpty()){
                messageDispatcher.lostConnection(protocolChannel, clientID);
            }
            removeSession(channel);
        }

        // Close the connection explicitly just in case the transport
        // did not close the connection automatically.
        if (channel.isActive()) {
          ctx.close();
        }
        // we are last handler, no need to propagate exception.
        // super.exceptionCaught(ctx, cause);
    }
}

public class NettyChannel {
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof NettyChannel)) return false;

        NettyChannel that = (NettyChannel) o;
        return !(m_channel != null ? !m_channel.channel().id().equals(that.m_channel.channel().id()) :   that.m_channel != null);
    }
}

Clustering Possibility?

Hi
I'm not sure if it is okay to leave a question on this issue page, but here it goes.

I am wondering whether Moquette can be clustered.

Thank you. Apologies if questions aren't appropriate on this board.

Client-Id length

Please raise the limit on clientId length from 23 characters to a larger value.
I know the 23-character limit is part of the MQTT standard, aimed at constrained devices, but it is a real limitation when the clientId is used for device identification.
I think 1024 would be better.

Problem with Will message

Hi,
my problem is about sending the will message. When I simulate a lost connection, I get the following error:

2015-02-19 20:15:15,476 INFO ProtocolProcessor - Lost connection with client <0.0.1>
2015-02-19 20:15:15,477 INFO ProtocolProcessor - Publish received from clientID <0.0.1> on topic <topic/diagnosticServer> with QoS EXACTLY_ONCE
feb 19, 2015 8:15:15 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
GRAVE: Exception processing: 5 org.eclipse.moquette.spi.impl.ValueEvent@149e8424
java.lang.NullPointerException
at org.eclipse.moquette.spi.impl.events.PublishEvent.(PublishEvent.java:44)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:301)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:339)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:551)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:146)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:50)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.NullPointerException
at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.eclipse.moquette.spi.impl.events.PublishEvent.(PublishEvent.java:44)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:301)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:339)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:551)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:146)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:50)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
... 3 more

This is the client side code:

client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setWill(dgnServerTopic, my_message.getBytes(), 2, true);
client.setCallback(this);
client.connect(connOpts);

Hello, error when using the latest code

Steps to reproduce:

Client A connects to the server (clean session false), subscribes to /sys/abc, then disconnects.
Client B connects to the server (clean session false) and publishes a message (topic /sys/abc, QoS 2).
Client A reconnects to the server (clean session false) ... server error.

Error log:
13223 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID
13232 [pool-1-thread-1] INFO ProtocolProcessor - Connected client ID with clean session false
13232 [pool-1-thread-1] INFO ProtocolProcessor - No stored messages for client
34268 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID
34274 [pool-1-thread-1] INFO ProtocolProcessor - Connected client ID with clean session false
34274 [pool-1-thread-1] INFO ProtocolProcessor - No stored messages for client
44705 [pool-1-thread-1] INFO ProtocolProcessor - subscribed to topic </sys/abc> with QoS 0 - MOST_ONE
47234 [pool-1-thread-1] INFO ProtocolProcessor - DISCONNECT client with clean session false
60281 [pool-1-thread-1] INFO ProtocolProcessor - PUBLISH from clientID on topic </sys/abc> with QoS EXACTLY_ONCE
64265 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID
64269 [pool-1-thread-1] INFO ProtocolProcessor - Connected client ID with clean session false
64270 [pool-1-thread-1] INFO ProtocolProcessor - republishing stored messages to client
64270 [pool-1-thread-1] INFO ProtocolProcessor - send publish message to on topic </sys/abc>
64272 [pool-1-thread-1] ERROR SimpleMessaging - Serious error processing the message Connect [clientID: AAAAA, prot: MQTT, ver: 04, clean: false] for session [clientID: AAAAA]org.eclipse.moquette.server.netty.NettyChannel@78c36c6f
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:167)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Internal bad error, trying to forwardPublish a QoS 0 message with PacketIdentifier: 0
at org.eclipse.moquette.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:416)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.republishStoredInSession(ProtocolProcessor.java:263)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnect(ProtocolProcessor.java:240)
... 11 more

EADDRNOTAVAIL (Cannot assign requested address)

Hi,

I tried to set up an MQTT broker on an Android device, but I always get this error:
java.net.BindException: bind failed: EADDRNOTAVAIL (Cannot assign requested address)

Should I change HOST "0.0.0.0" to a local IP such as "192.168.178.111"?
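EADDRNOTAVAIL normally means the configured host address is not assigned to any interface on the device. A quick, self-contained sketch (plain JDK, no Moquette APIs) that lists the addresses the device can actually bind, and verifies that the wildcard 0.0.0.0 is bindable:

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.util.Collections;

public class BindCheck {
    public static void main(String[] args) throws Exception {
        // Enumerate every address assigned to a local interface;
        // only these addresses (or the wildcard 0.0.0.0) can be bound.
        for (NetworkInterface nif : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress addr : Collections.list(nif.getInetAddresses())) {
                System.out.println(nif.getName() + " -> " + addr.getHostAddress());
            }
        }
        // The wildcard address is always bindable; port 0 picks a free port.
        try (ServerSocket s = new ServerSocket(0, 50, InetAddress.getByName("0.0.0.0"))) {
            System.out.println("wildcard bind OK on port " + s.getLocalPort());
        }
    }
}
```

If 0.0.0.0 itself fails, something else (another broker instance, an Android permission restriction) holds the port; otherwise the problem is a specific host value in the config that does not match any address printed above.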

Duplication of subscription with different cleanSession

The subscription tree uses the following code to check whether a client is already subscribed to the same topic with a different QoS:

 // org.eclipse.moquette.spi.impl.subscriptions.TreeNode.java

    void addSubscription(Subscription s) {
        //avoid double registering for same clientID, topic and QoS
        if (m_subscriptions.contains(s)) {
            return;
        }
        //remove existing subscription for same client and topic but different QoS
        ...
    }

The problem: if a client sends SUBSCRIBE with the same clientID, topic and QoS but a different cleanSession flag (Subscription.m_cleanSession), this code silently ignores the changed cleanSession flag.

As a result, when a client publishes a message, the server will never store a QoS 1 message for a subscriber that changed its cleanSession flag from true to false (via a reconnect):

// This check is never true if the client first connects with cleanSession = true,
// subscribes to some topics, crashes, and then reconnects with cleanSession = false
// and re-subscribes to the same topics: subscriber.isCleanSession() still returns true.
if (!subscriber.isCleanSession() && !subscriber.isActive()) {
    //clone the event with matching clientID and store it for future publish.
    messagesStore.storePublishForFuture(new PublishEvent(topic, publishQos, message, retain, subscriberClientId, publishEventMsgID));
    // ...
}
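A possible fix is to treat a re-subscription with the same clientID, topic and QoS but a different cleanSession flag as a replacement rather than a no-op. A minimal, self-contained sketch using simplified, hypothetical Subscription and TreeNode classes (the real Moquette classes carry more state):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class Subscription {
    final String clientId;
    final String topic;
    final int qos;
    final boolean cleanSession;

    Subscription(String clientId, String topic, int qos, boolean cleanSession) {
        this.clientId = clientId;
        this.topic = topic;
        this.qos = qos;
        this.cleanSession = cleanSession;
    }

    // equals/hashCode deliberately ignore cleanSession, mirroring the
    // behavior described in the issue: two subscriptions that differ
    // only in cleanSession compare as equal.
    @Override public boolean equals(Object o) {
        if (!(o instanceof Subscription)) return false;
        Subscription s = (Subscription) o;
        return clientId.equals(s.clientId) && topic.equals(s.topic) && qos == s.qos;
    }
    @Override public int hashCode() { return Objects.hash(clientId, topic, qos); }
}

class TreeNode {
    final List<Subscription> subscriptions = new ArrayList<>();

    void addSubscription(Subscription s) {
        // Instead of returning early when an "equal" subscription already
        // exists, first remove any subscription for the same client and
        // topic, so a changed QoS *or* cleanSession flag always takes effect.
        subscriptions.removeIf(old ->
            old.clientId.equals(s.clientId) && old.topic.equals(s.topic));
        subscriptions.add(s);
    }
}
```

With this change, a client that reconnects with cleanSession = false and re-subscribes replaces its old cleanSession = true subscription, so the `!subscriber.isCleanSession()` check can become true and QoS 1 messages get stored for the offline client.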
