Skip to content
All Posts
Robot Framework Python Testing IoT

Writing Custom Robot Framework Libraries in Python

October 2025 · 8 min read

Robot Framework's built-in keywords cover a lot. But when you're testing MQTT message flows, custom device protocols, or hardware-in-the-loop setups, you need your own libraries. Here's how I build them.

Why Custom Libraries?

Built-in and community libraries handle HTTP requests, browser automation, and SSH. But IoT testing has specific needs:

A custom Python library wraps all of this behind clean Robot Framework keywords that test engineers can use without knowing the implementation details.

Library Structure

Robot Framework discovers Python libraries by class name. The simplest structure:

tests/
├── libraries/
│   ├── MqttTestLibrary.py
│   ├── DeviceSimulator.py
│   └── PayloadValidator.py
├── resources/
│   └── common.resource
└── suites/
    ├── mqtt_integration.robot
    └── device_api.robot

Building an MQTT Test Library

Here's a real-world MQTT library I use for integration tests:

import paho.mqtt.client as mqtt
import json
import threading
import time


class MqttTestLibrary:
    """Robot Framework library for MQTT integration testing."""

    ROBOT_LIBRARY_SCOPE = "TEST SUITE"

    def __init__(self, broker="localhost", port=1883):
        self._broker = broker
        self._port = int(port)
        self._client = None
        self._messages = {}
        self._lock = threading.Lock()

    def connect_to_broker(self, client_id="rf-test"):
        """Connect to the MQTT broker."""
        self._client = mqtt.Client(client_id=client_id)
        self._client.on_message = self._on_message
        self._client.connect(self._broker, self._port)
        self._client.loop_start()

    def disconnect_from_broker(self):
        """Disconnect and stop the network loop."""
        if self._client:
            self._client.loop_stop()
            self._client.disconnect()

    def subscribe_to_topic(self, topic, qos=1):
        """Subscribe to an MQTT topic."""
        self._client.subscribe(topic, qos=int(qos))

    def publish_message(self, topic, payload, qos=1):
        """Publish a JSON payload to a topic."""
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        self._client.publish(topic, payload, qos=int(qos))

    def wait_for_message(self, topic, timeout=10):
        """Wait for a message on a topic. Returns the payload."""
        deadline = time.time() + float(timeout)
        while time.time() < deadline:
            with self._lock:
                if topic in self._messages:
                    return self._messages.pop(topic)
            time.sleep(0.1)
        raise AssertionError(
            f"No message received on '{topic}' within {timeout}s"
        )

    def message_payload_should_contain(self, payload, key, expected):
        """Assert a key in the JSON payload matches expected value."""
        data = json.loads(payload) if isinstance(payload, str) else payload
        actual = data.get(key)
        if str(actual) != str(expected):
            raise AssertionError(
                f"Expected '{key}' to be '{expected}', got '{actual}'"
            )

    def _on_message(self, client, userdata, msg):
        with self._lock:
            self._messages[msg.topic] = msg.payload.decode("utf-8")

Using It in a Test Suite

*** Settings ***
Library    ../libraries/MqttTestLibrary.py    broker=mqtt.staging.local    port=1883
Suite Setup    Connect To Broker    client_id=integration-test
Suite Teardown    Disconnect From Broker

*** Test Cases ***
Device Should Publish Telemetry After Command
    [Documentation]    Send a command and verify the device responds with telemetry.
    Subscribe To Topic    devices/sensor-001/telemetry
    Publish Message    devices/sensor-001/commands    {"action": "read_sensors"}
    ${msg}=    Wait For Message    devices/sensor-001/telemetry    timeout=15
    Message Payload Should Contain    ${msg}    temperature    key_exists=true
    Message Payload Should Contain    ${msg}    device_id    sensor-001

Broker Should Retain Last Will Message
    [Documentation]    Verify the broker publishes LWT when a client disconnects unexpectedly.
    Subscribe To Topic    devices/sensor-001/status
    # Simulate ungraceful disconnect in another test fixture
    ${msg}=    Wait For Message    devices/sensor-001/status    timeout=20
    Message Payload Should Contain    ${msg}    status    offline

Library Scope Matters

The ROBOT_LIBRARY_SCOPE class attribute controls instance lifecycle:

For MQTT testing, TEST SUITE scope is ideal: one connection per suite, clean state between suites.

Adding a Payload Validator

import json
import jsonschema


class PayloadValidator:
    """Validate MQTT/API payloads against JSON schemas."""

    ROBOT_LIBRARY_SCOPE = "GLOBAL"

    def __init__(self):
        self._schemas = {}

    def load_schema(self, name, schema_path):
        """Load a JSON schema from file."""
        with open(schema_path) as f:
            self._schemas[name] = json.load(f)

    def payload_should_match_schema(self, payload, schema_name):
        """Validate payload against a named schema."""
        data = json.loads(payload) if isinstance(payload, str) else payload
        schema = self._schemas.get(schema_name)
        if not schema:
            raise AssertionError(f"Schema '{schema_name}' not loaded")
        try:
            jsonschema.validate(data, schema)
        except jsonschema.ValidationError as e:
            raise AssertionError(
                f"Payload does not match schema '{schema_name}': {e.message}"
            )

A Few Things I Wish I Knew Earlier