MQTT ブローカー Mosquitto を立ち上げて paho-mqtt でメッセージを送り QoS ごとのパケットの内容を Wireshark で見る

iotpython

MQTT は IoT 向けの軽量な Pub/Sub メッセージングプロトコル。

OSS の MQTT ブローカー Mosquitto を立ち上げる。

$ mkdir config data log
$ vi config/mosquitto.conf
# port
listener 1883

allow_anonymous true

log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
log_type all

persistence true
persistence_location /mosquitto/data/

$ docker run --rm \
  --name mosquitto \
  -p 0.0.0.0:1883:1883 \
  -v ./config:/mosquitto/config \
  -v ./data:/mosquitto/data \
  -v ./log:/mosquitto/log \
  eclipse-mosquitto
...
1764475792: mosquitto version 2.0.22 running

Wireshark をインストールする。

# Ubuntu (WSL2)
$ sudo apt install wireshark
Should non-superusers be able to capture packets? → No

$ sudo wireshark

様々な言語 のクライアントがある paho-mqtt で Topic を Subscribe して Publish する。

import paho.mqtt.client as mqtt # pip install paho-mqtt
import time

sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "client1")
sub_client.on_connect = lambda c, u, f, rc, props: sub_client.subscribe("test/topic")
sub_client.on_message = lambda c, u, msg: print(f"Received: {msg.payload.decode()}")
sub_client.connect("localhost", 1883)
sub_client.loop_start()

pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "client2")
pub_client.connect("localhost", 1883)
pub_client.loop_start()

counter = 0
try:
    while True:
        message = f"Message #{counter}"
        pub_client.publish("test/topic", message)
        print(f"Published: {message}")
        counter += 1
        time.sleep(10)
except KeyboardInterrupt:
    print("\nStopped")

Capture > Options から Loopback: lo を選び MQTT のパケットをキャプチャする。 先頭 1 バイトで Publish/Subscribe といったメッセージタイプや At most/least once といった QoS レベルを表現している。 その後メッセージ長、Topic名長、Topic名の後にメッセージが続く。 Subscribe したクライアントへの通信はサーバーからクライアントへの Publish メッセージとして行われている。

なお TCP で直接トランスポートするほか WebSocket を用いる仕様もあって AWS IoT でもサポートされている。

WebSocketでの通信内容をWiresharkで見る - sambaiz-net

Publish と Subscribe でそれぞれ QoS レベルを指定できるが、最小のものに合わせられる。

pub_client.publish("test/topic", message, qos=0) # At most once delivery
client.subscribe("test/topic", qos=1) # At least once delivery

Publish/Subscribe している最中に Mosquito を一時的に落としてみると、QoS=0 ではデータが失われる。

# QoS=0
Published: Message #4
Received: Message #4
Published: Message #5 # Blocked
Published: Message #6
Published: Message #7
Published: Message #8
Published: Message #9
Published: Message #10
Received: Message #10 # Unblocked
Published: Message #11
Received: Message #11

QoS=1 では Publish Ack が返ってこない場合再送することで、落ちている間のデータも復旧次第 Subscribe できるようになっている。

# QoS=1
Published: Message #4
Received: Message #4
Published: Message #5 # Blocked
Published: Message #6
Published: Message #7
Published: Message #8
Published: Message #9
Published: Message #10
Received: Message #5 # Unblocked
Received: Message #6
Received: Message #7
Received: Message #8
Received: Message #9
Received: Message #10
Published: Message #11
Received: Message #11

QoS=2 では Publish Received が返ってくると Publish Release を送って再送用のメッセージを破棄し、送信先からの Publish Complete をもって処理を完了する。