Steps to reproduce the problem:
1. Start the broker on a clean system (no db).
2a. Connect a subscriber.
2b. Connect a publisher and publish some messages.
3. The broker works well.
4. Kill the broker.
5. Kill the subscribers.
6. Restart the broker.
7. Connect a publisher and publish a message.
8. The broker fails because it tries to deliver the message to the old subscribers, which are no longer connected (see attached logs).
MOQUETTE.LOG
Maybe I didn't understand the logic behind the code, but I think that when the broker is started, the old subscriptions need to be deleted from the db.
I'm using Moquette v0.7 on Windows 8.1 (my dev environment)
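To make the suggestion concrete, here is a minimal sketch of the idea of dropping stale subscriptions at startup. It is not Moquette code: `StoredSubscription`, `StartupCleanup` and `subscriptionsToReload` are hypothetical names, and the set of clients with a persistent (non-clean) session is assumed to be available from the store. It only shows the filtering step: subscriptions of clients that connected with clean session true are not reloaded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a persisted subscription; NOT Moquette's actual class.
final class StoredSubscription {
    final String clientId;
    final String topicFilter;

    StoredSubscription(String clientId, String topicFilter) {
        this.clientId = clientId;
        this.topicFilter = topicFilter;
    }
}

// Hypothetical startup step; the real broker may organize this differently.
final class StartupCleanup {

    /**
     * Returns only the stored subscriptions that belong to clients with a
     * persistent (non-clean) session. Everything else is treated as stale.
     */
    static List<StoredSubscription> subscriptionsToReload(
            List<StoredSubscription> stored, Set<String> persistentSessionClients) {
        List<StoredSubscription> kept = new ArrayList<>();
        for (StoredSubscription s : stored) {
            if (persistentSessionClients.contains(s.clientId)) {
                kept.add(s);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // The subscriber from the log connected with clean session true,
        // so it is not in the persistent-session set (assumed empty here).
        List<StoredSubscription> stored = Arrays.asList(
                new StoredSubscription("mosqsub/24676-DVL", "78:25:44:7E:E9:A0/D"));
        Set<String> persistent = new HashSet<>();
        System.out.println(subscriptionsToReload(stored, persistent).size());
    }
}
```

With a filter like this, the subscription of `mosqsub/24676-DVL` would not be put back into the subscription tree after the restart, because that client had connected with clean session true.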
0 [main] DEBUG SubscriptionsStore - init invoked
6 [main] DEBUG MapDBPersistentStore - retrieveAllSubscriptions returning subs []
7 [main] DEBUG SubscriptionsStore - Reloading all stored subscriptions...subscription tree before
7 [main] DEBUG SubscriptionsStore - Finished loading. Subscription tree after
8 [main] INFO FileAuthenticator - Loading password file: C:\bin\eclipse43.workspaces\spazio1\moquette-mqtt-broker\src\config\password_file.conf
8 [main] DEBUG ProtocolProcessor - subscription tree on init
531 [main] INFO NettyAcceptor - Server binded
535 [main] INFO NettyAcceptor - Server binded
536 [main] INFO NettyAcceptor - SSL is disabled
10187 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type CONNECT
10187 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
10188 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
10188 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect for client mosqsub/24676-DVL
10189 [pool-1-thread-1] DEBUG ProtocolProcessor - Connect with keepAlive 60 s
10191 [pool-1-thread-1] DEBUG SubscriptionsStore - Activating subscriptions for clientID <mosqsub/24676-DVL>
10283 [pool-1-thread-1] INFO ProtocolProcessor - cleaning old saved subscriptions for client <mosqsub/24676-DVL>
10317 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect sent OK ConnAck
10319 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqsub/24676-DVL> with clean session true
10320 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID mosqsub/24676-DVL
10321 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true] for client mosqsub/24676-DVL
10321 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL is a newcome, creating it's subscriptions set
10322 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqsub/24676-DVL subscriptions set with new subscription
10331 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type SUBSCRIBE
10331 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping SUBSCRIBE
10344 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL subscriptions set now is [[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
10384 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping SUBSCRIBE
10384 [pool-1-thread-1] DEBUG ProtocolProcessor - processSubscribe invoked from client mosqsub/24676-DVL with msgID 1
10385 [pool-1-thread-1] INFO ProtocolProcessor - <mosqsub/24676-DVL> subscribed to topic 78:25:44:7E:E9:A0/D with QoS 0 - MOST_ONE
10385 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true] for client mosqsub/24676-DVL
10385 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqsub/24676-DVL subscriptions set with new subscription
10386 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL subscriptions set now is [[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true], [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
10422 [pool-1-thread-1] DEBUG MapDBPersistentStore - searchMatching scanning all retained messages, presents are 0
10422 [pool-1-thread-1] DEBUG ProtocolProcessor - replying with SubAck to MSG ID 1
0 [main] DEBUG SubscriptionsStore - init invoked
13223 [main] DEBUG MapDBPersistentStore - retrieveAllSubscriptions returning subs [[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true], [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
15744 [main] DEBUG SubscriptionsStore - Reloading all stored subscriptions...subscription tree before
33059 [main] DEBUG SubscriptionsStore - Re-subscribing mosqsub/24676-DVL to topic 78:25:44:7E:E9:A0/D
58676 [main] DEBUG SubscriptionsStore - Re-subscribing mosqsub/24676-DVL to topic
66856 [main] DEBUG SubscriptionsStore - Finished loading. Subscription tree after
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
87979 [main] INFO FileAuthenticator - Loading password file: C:\bin\eclipse43.workspaces\spazio1\moquette-mqtt-broker\src\config\password_file.conf
87980 [main] DEBUG ProtocolProcessor - subscription tree on init
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
88685 [main] INFO NettyAcceptor - Server binded
88687 [main] INFO NettyAcceptor - Server binded
88689 [main] INFO NettyAcceptor - SSL is disabled
95949 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type CONNECT
95950 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
95951 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
95951 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect for client mosqpub/27720-DVL
95952 [pool-1-thread-1] DEBUG ProtocolProcessor - Connect with keepAlive 60 s
95955 [pool-1-thread-1] DEBUG SubscriptionsStore - Activating subscriptions for clientID <mosqpub/27720-DVL>
125014 [pool-1-thread-1] INFO ProtocolProcessor - cleaning old saved subscriptions for client <mosqpub/27720-DVL>
125072 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect sent OK ConnAck
125098 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqpub/27720-DVL> with clean session true
125098 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID mosqpub/27720-DVL
125098 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:, cliID: mosqpub/27720-DVL, qos: MOST_ONE, active: true] for client mosqpub/27720-DVL
125098 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqpub/27720-DVL is a newcome, creating it's subscriptions set
125099 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqpub/27720-DVL subscriptions set with new subscription
125100 [nioEventLoopGroup-3-1] INFO PublishDecoder - decode invoked with buffer UnpooledUnsafeDirectByteBuf(ridx: 1, widx: 833, cap: 1024)
125100 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type PUBLISH
125100 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping PUBLISH
125100 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type DISCONNECT
125100 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping DISCONNECT
125103 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event org.dna.mqtt.moquette.messaging.spi.impl.events.LostConnectionEvent@ff6c19
125115 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqpub/27720-DVL subscriptions set now is [[filter:, cliID: mosqpub/27720-DVL, qos: MOST_ONE, active: true]]
125153 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping PUBLISH
125153 [pool-1-thread-1] TRACE ProtocolProcessor - PUB --PUBLISH--> SRV processPublish invoked with org.dna.mqtt.moquette.proto.messages.PublishMessage@15dcd1d
125153 [pool-1-thread-1] INFO ProtocolProcessor - Publish recieved from clientID <mosqpub/27720-DVL> on topic 78:25:44:7E:E9:A0/D with QoS MOST_ONE
125153 [pool-1-thread-1] DEBUG ProtocolProcessor - publish2Subscribers republishing to existing subscribers that matches the topic 78:25:44:7E:E9:A0/D
125155 [pool-1-thread-1] DEBUG ProtocolProcessor - content <{XYZ}>
125155 [pool-1-thread-1] DEBUG ProtocolProcessor - subscription tree
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
133913 [pool-1-thread-1] DEBUG ProtocolProcessor - Broker republishing to client <mosqsub/24676-DVL> topic 78:25:44:7E:E9:A0/D qos <MOST_ONE>, active true
145081 [pool-1-thread-1] DEBUG ProtocolProcessor - sendPublish invoked clientId <mosqsub/24676-DVL> on topic 78:25:44:7E:E9:A0/D QoS MOST_ONE retained false messageID 1
145081 [pool-1-thread-1] INFO ProtocolProcessor - send publish message to <mosqsub/24676-DVL> on topic 78:25:44:7E:E9:A0/D
145081 [pool-1-thread-1] DEBUG ProtocolProcessor - content <{XYZ}>
145082 [pool-1-thread-1] DEBUG ProtocolProcessor - clientIDs are {mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}
Server started, version 0.7-SNAPSHOT
125098 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqpub/27720-DVL> with clean session true
dic 02, 2014 8:17:45 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
SEVERE: Exception processing: 1 org.dna.mqtt.moquette.messaging.spi.impl.ValueEvent@c625e1
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:156)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:1)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client <mosqsub/24676-DVL> in cache <{mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}>
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:410)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:386)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:359)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:306)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:156)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:1)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
... 3 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client <mosqsub/24676-DVL> in cache <{mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}>
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:410)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:386)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:359)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:306)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more
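The innermost cause in the trace is that `sendPublish` throws when the subscriber has no entry in the connection cache. An alternative to cleaning the db at startup would be to skip such subscribers while publishing. The sketch below illustrates that guard under stated assumptions: `PublishGuard` and `connectedSubscribers` are hypothetical names, not Moquette's API, and clients are modelled as plain ID strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical helper; NOT part of Moquette. Names are illustrative only.
final class PublishGuard {

    /**
     * Filters the client IDs whose subscriptions match the topic, keeping only
     * those that currently have a connection. Subscribers without a connection
     * are skipped instead of causing an exception.
     */
    static List<String> connectedSubscribers(
            List<String> matchingClientIds, Set<String> connectedClientIds) {
        List<String> deliverable = new ArrayList<>();
        for (String clientId : matchingClientIds) {
            if (connectedClientIds.contains(clientId)) {
                deliverable.add(clientId);
            }
            // Otherwise there is no connection for this subscriber: skip it.
        }
        return deliverable;
    }

    public static void main(String[] args) {
        // Same situation as in the log: the stored subscriber is gone,
        // and only the publisher is connected.
        List<String> matching = Collections.singletonList("mosqsub/24676-DVL");
        Set<String> connected = Collections.singleton("mosqpub/27720-DVL");
        System.out.println(connectedSubscribers(matching, connected)); // prints []
    }
}
```

This guard only avoids the crash. The stale subscription would still sit in the db, so it complements the startup cleanup rather than replacing it.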
Reporter: diegovis
Original: https://code.google.com/p/moquette-mqtt/issues/detail?id=54