reading-registers

practical

Lesson 3 โ€” Reading Registers with pymodbus

Setup

pymodbus is the standard Python library for Modbus. It supports Modbus TCP, RTU, and ASCII as both client and server, and it is what you will use for all scripted interaction with Modbus devices.

pip install pymodbus

Current stable version at time of writing: pymodbus 3.x. The 3.x API differs significantly from 2.x โ€” specifically in the client import path. This lesson uses 3.x syntax throughout.

Verify the installation:

python3 -c "import pymodbus; print(pymodbus.__version__)"

For these examples, run the challenge container (see course challenges) or install a local Modbus simulator:

docker run -d -p 502:502 oitc/modbus-server

Connecting to a Modbus TCP Server

from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=502)
connected = client.connect()

if not connected:
    print("Connection failed")
    exit(1)

print("Connected")
client.close()

ModbusTcpClient accepts keyword arguments for timeout and retry behavior:

client = ModbusTcpClient(
    "127.0.0.1",
    port=502,
    timeout=3,          # seconds to wait for response
    retries=3,          # retry count on timeout
    reconnect_delay=1   # seconds between reconnect attempts
)

Always call client.close() when done, or use the client as a context manager:

with ModbusTcpClient("127.0.0.1", port=502) as client:
    # all operations here
    pass

Reading Holding Registers (FC03)

Holding registers are the most useful target: they contain setpoints, configuration values, counters, and control parameters.

from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=502)
client.connect()

# Read 10 holding registers starting at address 0, unit ID 1
result = client.read_holding_registers(address=0, count=10, slave=1)

if result.isError():
    print(f"Error: {result}")
else:
    print(result.registers)
    # Output: [0, 1000, 2500, 0, 0, 0, 0, 0, 0, 0]

client.close()

Parameters: - address: PDU address (zero-based). Address 0 = register 40001 in HMI notation. - count: Number of registers to read. Maximum 125 per request per the Modbus specification. - slave: Unit ID (1โ€“247). Use the correct unit ID for the target device.

Checking for errors: Always call result.isError() before accessing result.registers. If the device returns an exception (wrong address, wrong unit ID, wrong count), pymodbus returns a ModbusException object and accessing .registers will throw an AttributeError.


Reading Coils (FC01)

Coils are 1-bit values representing digital output states: relay energized/de-energized, valve open/closed, motor running/stopped.

result = client.read_coils(address=0, count=16, slave=1)

if not result.isError():
    print(result.bits)
    # Output: [True, False, False, True, False, False, False, False, ...]

result.bits is a list of booleans. pymodbus pads the list to the next multiple of 8 โ€” if you request 10 coils, you get 16 booleans. Only the first 10 are meaningful.


Reading Discrete Inputs (FC02)

Discrete inputs are 1-bit read-only values: sensor state, limit switch position, pushbutton pressed/released.

result = client.read_discrete_inputs(address=0, count=8, slave=1)

if not result.isError():
    print(result.bits[:8])

Reading Input Registers (FC04)

Input registers are 16-bit read-only values from analog sensors: temperature in tenths of degrees, pressure in mbar, flow rate in liters/hour.

result = client.read_input_registers(address=0, count=10, slave=1)

if not result.isError():
    print(result.registers)

Scanning All Unit IDs

A device may respond on multiple unit IDs, or a Modbus gateway may multiplex multiple physical devices (each on a different unit ID) behind a single IP. Scanning all valid unit IDs is standard reconnaissance.

from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=502, timeout=1)
client.connect()

responding_units = []

for uid in range(1, 248):
    result = client.read_holding_registers(address=0, count=1, slave=uid)
    if not result.isError():
        print(f"Unit {uid} responds: {result.registers}")
        responding_units.append(uid)

print(f"\nResponding unit IDs: {responding_units}")
client.close()

Set timeout=1 to keep the scan fast โ€” non-responding unit IDs will each take up to 1 second. Scanning all 247 IDs with a 1-second timeout takes under 5 minutes in the worst case.


Scanning the Full Register Space

Most devices do not expose all 65,536 possible register addresses. Scanning for valid ranges is done by reading blocks and checking for error responses.

from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=502)
client.connect()

valid_ranges = []
block_size = 100

for addr in range(0, 65536, block_size):
    result = client.read_holding_registers(address=addr, count=block_size, slave=1)
    if not result.isError():
        print(f"Valid range: {addr}โ€“{addr + block_size - 1}: {result.registers}")
        valid_ranges.append((addr, result.registers))
    # Exception code 0x02 = illegal data address โ€” range not mapped
    # Exception code 0x03 = illegal data value โ€” count too large for this range, try smaller

client.close()

Handle boundary cases: If a block spans a valid/invalid boundary, the device returns an exception for the entire block. Reduce block size or scan individual addresses at boundaries.

Performance consideration: Reading 65,536 registers in blocks of 100 takes ~656 requests. At 100ms round-trip, that is 66 seconds. At 10ms, under 7 seconds. Do not assume the device can handle high-rate requests โ€” some PLCs have limited TCP stack buffers and will drop connections or behave unexpectedly under load.


Interpreting Register Values

Raw 16-bit register values need context to be meaningful. Common encodings:

Unsigned 16-bit integer (most common):

value = result.registers[0]  # 0โ€“65535

Signed 16-bit integer (temperatures below zero, position offsets):

import struct
raw = result.registers[0]
value = struct.unpack('>h', struct.pack('>H', raw))[0]  # -32768โ€“32767

Scaled integer (common for analog values โ€” device applies a scaling factor): - Device datasheet says: "Temperature = register value / 10, unit = ยฐC" - Register value 235 โ†’ 23.5ยฐC - Register value 65523 (signed: -13) โ†’ -1.3ยฐC

32-bit float from two consecutive registers (IEEE 754, most common in modern PLCs):

import struct

raw = result.registers  # need at least 2 registers
# Big-endian word order (most common):
value = struct.unpack('>f', struct.pack('>HH', raw[0], raw[1]))[0]
# Little-endian word order (some Schneider and Wago devices):
value = struct.unpack('<f', struct.pack('<HH', raw[0], raw[1]))[0]
# Mixed (little-endian words, big-endian bytes within word โ€” Modicon legacy):
value = struct.unpack('>f', struct.pack('>HH', raw[1], raw[0]))[0]

If the float value is nonsensical (NaN, Inf, or wildly out of range), try the other byte orders.

32-bit unsigned integer from two registers:

value = (result.registers[0] << 16) | result.registers[1]  # big-endian word order
value = (result.registers[1] << 16) | result.registers[0]  # little-endian word order

ASCII string from registers (each register holds 2 ASCII characters):

raw = result.registers
chars = []
for reg in raw:
    chars.append(chr(reg >> 8))    # high byte
    chars.append(chr(reg & 0xFF))  # low byte
text = ''.join(chars).rstrip('\x00')

Monitoring Changing Values

In a real assessment, reading once is not enough. You want to see which registers change over time (live sensor data), which are static (configuration), and whether your writes had any effect.

import time
from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=502)
client.connect()

prev = None
while True:
    result = client.read_holding_registers(address=0, count=20, slave=1)
    if not result.isError():
        current = result.registers
        if prev is not None:
            changes = [(i, prev[i], current[i]) for i in range(len(current)) if prev[i] != current[i]]
            for addr, old, new in changes:
                print(f"Register {addr} changed: {old} -> {new}")
        prev = current
    time.sleep(1)

Registers that change on every poll at ยฑ1โ€“2% are sensor readings. Registers that never change are likely configuration. Registers that change in large discrete steps are setpoints being adjusted by an operator or SCADA system.


Complete Recon Script

Putting it together: a single script that connects, scans unit IDs, dumps all valid register ranges, and reports:

#!/usr/bin/env python3
"""Modbus TCP reconnaissance script โ€” for authorized testing only."""

import struct
import time
from pymodbus.client import ModbusTcpClient


def scan_unit_ids(client, timeout=1):
    """Return list of responding unit IDs."""
    responding = []
    for uid in range(1, 248):
        result = client.read_holding_registers(address=0, count=1, slave=uid)
        if not result.isError():
            responding.append(uid)
    return responding


def dump_registers(client, uid, block_size=100):
    """Return dict of {address: [values]} for all valid holding register ranges."""
    found = {}
    for addr in range(0, 10000, block_size):  # first 10k addresses is usually enough
        result = client.read_holding_registers(address=addr, count=block_size, slave=uid)
        if not result.isError():
            found[addr] = result.registers
    return found


def try_decode_float(r0, r1):
    """Attempt IEEE 754 float decode from two registers, return best guess."""
    candidates = []
    for fmt, args in [
        ('>f', ('>HH', r0, r1)),
        ('>f', ('>HH', r1, r0)),
    ]:
        try:
            val = struct.unpack(fmt, struct.pack(*args))[0]
            if not (val != val) and abs(val) < 1e6:  # not NaN, not huge
                candidates.append(val)
        except Exception:
            pass
    return candidates[0] if candidates else None


def main():
    target = "127.0.0.1"
    port = 502

    print(f"[*] Connecting to {target}:{port}")
    client = ModbusTcpClient(target, port=port, timeout=2)
    if not client.connect():
        print("[-] Connection failed")
        return

    print("[*] Scanning unit IDs 1-247...")
    units = scan_unit_ids(client)
    print(f"[+] Responding unit IDs: {units}")

    for uid in units:
        print(f"\n[*] Dumping registers for unit {uid}...")
        regs = dump_registers(client, uid)
        for addr, values in regs.items():
            print(f"  Addr {addr:05d}: {values}")
            # Attempt float decode on consecutive pairs
            for i in range(0, len(values) - 1, 2):
                f = try_decode_float(values[i], values[i+1])
                if f is not None:
                    print(f"    -> Possible float at {addr+i}: {f:.4f}")

    client.close()
    print("\n[*] Done")


if __name__ == "__main__":
    main()

Key Takeaways

  • pip install pymodbus installs the 3.x client; import from pymodbus.client
  • Always check result.isError() before accessing .registers or .bits
  • FC03 reads holding registers (RW, most interesting), FC01 reads coils (digital outputs)
  • Scan unit IDs 1โ€“247 to find all devices behind a gateway
  • Scan register space in blocks of 100; exception code 0x02 means unmapped address
  • 32-bit floats use two consecutive registers; byte order varies by vendor โ€” try all four combinations if the value looks wrong
  • Monitor registers over time to distinguish live sensor data from static configuration