Running MQTT Broker Mosquitto, Sending Messages with paho-mqtt, and Inspecting Packets with Wireshark
iotpythonMQTT is a lightweight Pub/Sub messaging protocol designed for IoT applications.
Let’s set up the OSS MQTT broker Mosquitto.
$ mkdir config data log
$ vi config/mosquitto.conf
# port
listener 1883
allow_anonymous true
log_dest file /mosquitto/log/mosquitto.log
log_dest stdout
log_type all
persistence true
persistence_location /mosquitto/data/
$ docker run --rm \
--name mosquitto \
-p 0.0.0.0:1883:1883 \
-v ./config:/mosquitto/config \
-v ./data:/mosquitto/data \
-v ./log:/mosquitto/log \
eclipse-mosquitto
...
1764475792: mosquitto version 2.0.22 running
Install Wireshark.
# Ubuntu (WSL2)
$ sudo apt install wireshark
Should non-superusers be able to capture packets? → No
$ sudo wireshark
Subscribe to a topic and publish messages using paho-mqtt, which has clients for various languages.
import paho.mqtt.client as mqtt # pip install paho-mqtt
import time
sub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "client1")
sub_client.on_connect = lambda c, u, f, rc, props: sub_client.subscribe("test/topic")
sub_client.on_message = lambda c, u, msg: print(f"Received: {msg.payload.decode()}")
sub_client.connect("localhost", 1883)
sub_client.loop_start()
pub_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "client2")
pub_client.connect("localhost", 1883)
pub_client.loop_start()
counter = 0
try:
while True:
message = f"Message #{counter}"
pub_client.publish("test/topic", message)
print(f"Published: {message}")
counter += 1
time.sleep(10)
except KeyboardInterrupt:
print("\nStopped")
From Capture > Options, select Loopback: lo to capture MQTT packets. The first byte represents the message type (Publish/Subscribe) and QoS level (At most/least once). This is followed by the message length, topic name length, topic name, and then the message payload. Messages to subscribed clients are transmitted as Publish messages from the server to the client.
In addition to direct TCP transport, there is also a WebSocket-based specification, which is also supported by AWS IoT.
Viewing WebSocket Communication with Wireshark - sambaiz-net
You can specify QoS levels for both Publish and Subscribe, but they are aligned to the minimum level.
pub_client.publish("test/topic", message, qos=0) # At most once delivery
client.subscribe("test/topic", qos=1) # At least once delivery
If you temporarily stop Mosquitto while publishing/subscribing, with QoS=0, data will be lost.
# QoS=0
Published: Message #4
Received: Message #4
Published: Message #5 # Blocked
Published: Message #6
Published: Message #7
Published: Message #8
Published: Message #9
Published: Message #10
Received: Message #10 # Unblocked
Published: Message #11
Received: Message #11
With QoS=1, if a Publish Ack is not returned, the message is resent, ensuring that data sent during downtime can be subscribed to once the server recovers.
# QoS=1
Published: Message #4
Received: Message #4
Published: Message #5 # Blocked
Published: Message #6
Published: Message #7
Published: Message #8
Published: Message #9
Published: Message #10
Received: Message #5 # Unblocked
Received: Message #6
Received: Message #7
Received: Message #8
Received: Message #9
Received: Message #10
Published: Message #11
Received: Message #11
With QoS=2, when a Publish Received is returned, a Publish Release is sent to discard the message for resending, and the process completes with a Publish Complete from the recipient.