MQTT-on-Pulsar (aka MoP) is developed to support MQTT protocol natively on Apache Pulsar.
-
Clone the MoP project from GitHub to your local.
git clone https://github.com/streamnative/mop.git cd mop
-
Build the project.
mvn clean install -DskipTests
-
The NAR file can be found at this location.
./mqtt-impl/target/pulsar-protocol-handler-mqtt-${version}.nar
Configure the Pulsar broker to run the MoP protocol handler as a plugin by adding configurations to the Pulsar configuration file, such as broker.conf
or standalone.conf
.
-
Set the configuration of the MoP protocol handler.
Add the following properties and set their values in the Pulsar configuration file, such as
conf/broker.conf
orconf/standalone.conf
.Property Suggested value Default value messagingProtocols
mqtt null protocolHandlerDirectory
Location of MoP NAR file ./protocols Example
messagingProtocols=mqtt protocolHandlerDirectory=./protocols
-
Set the MQTT server listeners.
The hostname in listeners should be the same as Pulsar broker's
advertisedAddress
.Example
mqttListeners=mqtt://127.0.0.1:1883 advertisedAddress=127.0.0.1
After you install the MoP protocol handler to Pulsar broker, you can restart the Pulsar brokers to load the MoP protocol handler.
To use the proxy, follow the following steps. For detailed steps, refer to Deploy a cluster on bare metal.
-
Prepare a ZooKeeper cluster.
-
Initialize the cluster metadata.
-
Prepare a BookKeeper cluster.
-
Copy the
pulsar-protocol-handler-mqtt-${version}.nar
to the$PULSAR_HOME/protocols
directory. -
Start the Pulsar broker.
Here is an example of the Pulsar broker configuration.
messagingProtocols=mqtt protocolHandlerDirectory=./protocols brokerServicePort=6651 mqttListeners=mqtt://127.0.0.1:1883 advertisedAddress=127.0.0.1 mqttProxyEnable=true mqttProxyPort=5682
There are many MQTT client that can be used to verify the MoP protocol handler, such as MQTTBox, MQTT Toolbox. You can choose a CLI tool or interface tool to verify the MoP protocol handler.
The following example shows how to verify the MoP protocol handler with FuseSource MqttClient.
-
Add the dependency.
<dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.16</version> </dependency>
-
Publish messages and consume messages.
MQTT mqtt = new MQTT(); mqtt.setHost("127.0.0.1", 1883); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) }; connection.subscribe(topics); // publish message connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false); // receive message Message received = connection.receive();