MQTT は IoT 向けの軽量な Pub/Sub メッセージングプロトコル。
OSS の MQTT ブローカー Mosquitto を立ち上げる。
$ mkdir config data log
$ vi config/mosquitto.conf
# port
listener 1883
allow_anonymous true
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
log_type all
persistence true
persistence_location /mosquitto/data/
$ docker run --rm \
--name mosquitto \
-p 0.0.0.0:1883:1883 \
-v ./config:/mosquitto/config \
-v ./data:/mosquitto/data \
-v ./log:/mosquitto/log \
eclipse-mosquitto
...
1764475792: mosquitto version 2.0.22 running
Wireshark をインストールする。
# Ubuntu (WSL2)
$ sudo apt install wireshark
Should non-superusers be able to capture packets? → No
$ sudo wireshark
様々な言語 のクライアントがある paho-mqtt で Topic を Subscribe して Publish する。
import paho.mqtt.client as mqtt # pip install paho-mqtt
import time
sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "client1")
sub_client.on_connect = lambda c, u, f, rc, props: sub_client.subscribe("test/topic")
sub_client.on_message = lambda c, u, msg: print(f"Received: {msg.payload.decode()}")
sub_client.connect("localhost", 1883)
sub_client.loop_start()
pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "client2")
pub_client.connect("localhost", 1883)
pub_client.loop_start()
counter = 0
try:
while True:
message = f"Message #{counter}"
pub_client.publish("test/topic", message)
print(f"Published: {message}")
counter += 1
time.sleep(10)
except KeyboardInterrupt:
print("\nStopped")
Capture > Options から Loopback: lo を選び MQTT のパケットをキャプチャする。 先頭 1 バイトで Publish/Subscribe といったメッセージタイプや At most/least once といった QoS レベルを表現している。 その後メッセージ長、Topic名長、Topic名の後にメッセージが続く。 Subscribe したクライアントへの通信はサーバーからクライアントへの Publish メッセージとして行われている。
なお TCP で直接トランスポートするほか WebSocket を用いる仕様もあって AWS IoT でもサポートされている。
WebSocketでの通信内容をWiresharkで見る - sambaiz-net
Publish と Subscribe でそれぞれ QoS レベルを指定できるが、最小のものに合わせられる。
pub_client.publish("test/topic", message, qos=0) # At most once delivery
client.subscribe("test/topic", qos=1) # At least once delivery
Publish/Subscribe している最中に Mosquito を一時的に落としてみると、QoS=0 ではデータが失われる。
# QoS=0
Published: Message #4
Received: Message #4
Published: Message #5 # Blocked
Published: Message #6
Published: Message #7
Published: Message #8
Published: Message #9
Published: Message #10
Received: Message #10 # Unblocked
Published: Message #11
Received: Message #11
QoS=1 では Publish Ack が返ってこない場合再送することで、落ちている間のデータも復旧次第 Subscribe できるようになっている。
# QoS=1
Published: Message #4
Received: Message #4
Published: Message #5 # Blocked
Published: Message #6
Published: Message #7
Published: Message #8
Published: Message #9
Published: Message #10
Received: Message #5 # Unblocked
Received: Message #6
Received: Message #7
Received: Message #8
Received: Message #9
Received: Message #10
Published: Message #11
Received: Message #11
QoS=2 では Publish Received が返ってくると Publish Release を送って再送用のメッセージを破棄し、送信先からの Publish Complete をもって処理を完了する。