c# mqtt 3.1.1 client
官方交流群: 241445317
PM> Install-Package MqttFx
class Program
{
static async Task Main(string[] args)
{
var services = new ServiceCollection();
services.AddMqttClient(options =>
{
options.Host = "118.126.96.166";
});
var container = services.BuildServiceProvider();
var client = container.GetService<MqttClient>();
client.Connected += Client_Connected;
client.Disconnected += Client_Disconnected;
client.MessageReceived += Client_MessageReceived;
if (await client.ConnectAsync() == ConnectReturnCode.ConnectionAccepted)
{
var top = "$SYS/brokers/+/clients/#";
Console.WriteLine("Subscribe:" + top);
var rcs = (await client.SubscribeAsync(top, MqttQos.AtMostOnce)).ReturnCodes;
foreach (var rc in rcs)
{
Console.WriteLine(rc);
}
for (int i = 1; i < int.MaxValue; i++)
{
await client.PublishAsync("/World", Encoding.UTF8.GetBytes($"Hello World!: {i}"), MqttQos.AtLeastOnce);
await Task.Delay(1000);
//Console.ReadKey();
}
}
Console.ReadKey();
}
private static void Client_Connected(object sender, MqttClientConnectedEventArgs e)
{
Console.WriteLine("Connected Ssuccessful!");
}
private static void Client_Disconnected(object sender, MqttClientDisconnectedEventArgs e)
{
Console.WriteLine("Disconnected");
}
private static void Client_MessageReceived(object sender, MqttMessageReceivedEventArgs e)
{
//$SYS/brokers/+/clients/+/connected
//$SYS/brokers/+/clients/+/disconnected
//$SYS/brokers/+/clients/#
var message = e.Message;
var payload = Encoding.UTF8.GetString(message.Payload);
if (new Regex(@"\$SYS/brokers/.+?/connected").Match(message.Topic).Success)
{
//{ "clientid":"mqtt.fx","username":"mqtt.fx","ipaddress":"127.0.0.1","clean_sess":true,"protocol":4,"connack":0,"ts":1540949660}
var obj = JObject.Parse(payload);
Console.WriteLine($"【Client Connected】 client_id:{obj.Value<string>("clientid")}, ipaddress:{obj.Value<string>("ipaddress")}");
}
else if (new Regex(@"\$SYS/brokers/.+?/disconnected").Match(message.Topic).Success)
{
//{"clientid":"mqtt.fx","username":"mqtt.fx","reason":"normal","ts":1540949658}
var obj = JObject.Parse(payload);
Console.WriteLine($"【Client Disconnected】 client_id:{obj.Value<string>("clientid")}");
}
else
{
Console.WriteLine(payload);
}
}
}
MQTT 官网: http://mqtt.org
MQTT V3.1.1协议规范: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
MQTT 协议英文版: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
MQTT 协议中文版: https://mcxiaoke.gitbooks.io/mqtt-cn/content/
1.开放消息协议,简单易实现
2.发布订阅模式,一对多消息发布
3.基于TCP/IP网络连接
4.1字节固定报头,2字节心跳报文,报文结构紧凑
5.消息QoS支持,可靠传输保证
1.物联网M2M通信,物联网大数据采集
2.Android消息推送,WEB消息推送
3.移动即时消息,例如Facebook Messenger
4.智能硬件、智能家具、智能电器
5.车联网通信,电动车站桩采集
6.智慧城市、远程医疗、远程教育
7.电力、石油与能源等行业市场
chat/room/1
sensor/10/temperature
sensor/+/temperature
$SYS/broker/metrics/packets/received
$SYS/broker/metrics/#
'+': 表示通配一个层级,例如a/+,匹配a/x, a/y
'#': 表示通配多个层级,例如a/#,匹配a/x, a/b/c/d
mosquitto_sub -t a/b/+ -q 1
mosquitto_pub -t a/b/c -m hello -q 1
订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。
固定报头(Fixed header)
可变报头(Variable header)
报文有效载荷(Payload)
| Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
| ----- |----|----|----|----|----|----|----|----|
|byte1 | MQTT Packet type | Flags
|byte2… | Remaining Length
类型名称 | 类型值 | 报文说明 |
---|---|---|
CONNECT | 1 | 发起连接 |
CONNACK | 2 | 连接回执 |
PUBLISH | 3 | 发布消息 |
PUBACK | 4 | 发布回执 |
PUBREC | 5 | QoS2消息回执 |
PUBREL | 6 | QoS2消息释放 |
PUBCOMP | 7 | QoS2消息完成 |
SUBSCRIBE | 8 | 订阅主题 |
SUBACK | 9 | 订阅回执 |
UNSUBSCRIBE | 10 | 取消订阅 |
UNSUBACK | 11 | 取消订阅回执 |
PINGREQ | 12 | PING请求 |
PINGRESP | 13 | PING响应 |
DISCONNECT | 14 | 断开连接 |
PUBLISH报文承载客户端与服务器间双向的发布消息。 PUBACK报文用于接收端确认QoS1报文,PUBREC/PUBREL/PUBCOMP报文用于QoS2消息流程。
客户端在无报文发送时,按保活周期(KeepAlive)定时向服务端发送PINGREQ心跳报文,服务端响应PINGRESP报文。PINGREQ/PINGRESP报文均2个字节。
MQTT发布消息QoS保证不是端到端的,是客户端与服务器之间的。订阅者收到MQTT消息的QoS级别,最终取决于发布消息的QoS和主题订阅的QoS。
MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。
‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
MQTT客户端向服务器发起CONNECT请求时,通过KeepAlive参数设置保活周期。
客户端在无报文发送时,按KeepAlive周期定时发送2字节的PINGREQ心跳报文,服务端收到PINGREQ报文后,回复2字节的PINGRESP报文。
服务端在1.5个心跳周期内,既没有收到客户端发布订阅报文,也没有收到PINGREQ心跳报文时,主动心跳超时断开客户端TCP连接。
MQTT客户端向服务器端CONNECT请求时,可以设置是否发送遗愿消息(Will Message)标志,和遗愿消息主题(Topic)与内容(Payload)。
MQTT客户端异常下线时(客户端断开前未向服务器发送DISCONNECT消息),MQTT消息服务器会发布遗愿消息。
MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。