Writing Custom Robot Framework Libraries in Python
October 2025 · 8 min read
Robot Framework's built-in keywords cover a lot. But when you're testing MQTT message flows, custom device protocols, or hardware-in-the-loop setups, you need your own libraries. Here's how I build them.
Why Custom Libraries?
Built-in and community libraries handle HTTP requests, browser automation, and SSH. But IoT testing has specific needs:
- Publishing and subscribing to MQTT topics with QoS guarantees
- Validating JSON payloads against device schemas
- Waiting for asynchronous events with configurable timeouts
- Interacting with hardware simulators over serial or TCP
A custom Python library wraps all of this behind clean Robot Framework keywords that test engineers can use without knowing the implementation details.
Library Structure
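Robot Framework imports a Python library by module name or file path. When the file contains a class with the same name as the file, that class provides the keywords. The simplest structure keeps one class per file: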
tests/
├── libraries/
│   ├── MqttTestLibrary.py
│   ├── DeviceSimulator.py
│   └── PayloadValidator.py
├── resources/
│   └── common.resource
└── suites/
    ├── mqtt_integration.robot
    └── device_api.robot
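As a minimal sketch of how one of those files maps to keywords, assume DeviceSimulator.py holds a class of the same name. The method names and the in-memory behaviour below are hypothetical placeholders, not the real simulator:

```python
class DeviceSimulator:
    """Hypothetical skeleton: the class name matches the file name DeviceSimulator.py."""

    ROBOT_LIBRARY_SCOPE = "TEST SUITE"

    def __init__(self):
        self._readings = {}

    def set_simulated_reading(self, sensor, value):
        """Usable in a suite as: Set Simulated Reading    <sensor>    <value>"""
        # Arguments arrive as strings from the test data, so convert explicitly.
        self._readings[sensor] = float(value)

    def simulated_reading_should_be(self, sensor, expected):
        """Usable in a suite as: Simulated Reading Should Be    <sensor>    <expected>"""
        actual = self._readings.get(sensor)
        if actual != float(expected):
            raise AssertionError(
                f"Expected reading {expected} for '{sensor}', got {actual}"
            )

    def _reset(self):
        # Methods starting with an underscore are not exposed as keywords.
        self._readings.clear()
```

Public methods become keywords, and underscore-prefixed helpers stay internal, which is the same split the MQTT library below relies on for its callback.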
Building an MQTT Test Library
Here's a real-world MQTT library I use for integration tests:
import json
import threading
import time

import paho.mqtt.client as mqtt


class MqttTestLibrary:
    """Robot Framework library for MQTT integration testing."""

    ROBOT_LIBRARY_SCOPE = "TEST SUITE"

    def __init__(self, broker="localhost", port=1883):
        self._broker = broker
        self._port = int(port)  # arguments from test data arrive as strings
        self._client = None
        self._messages = {}  # latest payload per topic
        self._lock = threading.Lock()

    def connect_to_broker(self, client_id="rf-test"):
        """Connect to the MQTT broker."""
        # paho-mqtt 1.x constructor. paho-mqtt 2.x also requires a
        # mqtt.CallbackAPIVersion value as the first argument.
        self._client = mqtt.Client(client_id=client_id)
        self._client.on_message = self._on_message
        self._client.connect(self._broker, self._port)
        self._client.loop_start()  # network loop runs on a background thread

    def disconnect_from_broker(self):
        """Disconnect and stop the network loop."""
        if self._client:
            self._client.loop_stop()
            self._client.disconnect()

    def subscribe_to_topic(self, topic, qos=1):
        """Subscribe to an MQTT topic."""
        self._client.subscribe(topic, qos=int(qos))

    def publish_message(self, topic, payload, qos=1):
        """Publish a JSON payload to a topic."""
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        self._client.publish(topic, payload, qos=int(qos))

    def wait_for_message(self, topic, timeout=10):
        """Wait for a message on a topic. Returns the payload."""
        # monotonic() is unaffected by system clock adjustments during the wait
        deadline = time.monotonic() + float(timeout)
        while time.monotonic() < deadline:
            with self._lock:
                if topic in self._messages:
                    return self._messages.pop(topic)
            time.sleep(0.1)
        raise AssertionError(
            f"No message received on '{topic}' within {timeout}s"
        )

    def message_payload_should_contain(self, payload, key, expected):
        """Assert a key in the JSON payload matches expected value."""
        data = json.loads(payload) if isinstance(payload, str) else payload
        actual = data.get(key)
        # Compare as strings because expected values come from test data as text
        if str(actual) != str(expected):
            raise AssertionError(
                f"Expected '{key}' to be '{expected}', got '{actual}'"
            )

    def _on_message(self, client, userdata, msg):
        # Called on paho's network thread, so guard the shared dict
        with self._lock:
            self._messages[msg.topic] = msg.payload.decode("utf-8")
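The library above keeps only the most recent payload per topic and polls every 100 ms, so two messages that arrive before Wait For Message is called collapse into one. One possible variation, sketched here under the same assumption of exact topic names (no wildcards), stores every message in a per-topic queue and blocks instead of polling. MessageBuffer is a hypothetical helper, not part of paho-mqtt or Robot Framework:

```python
import queue
import threading
from collections import defaultdict


class MessageBuffer:
    """Hypothetical helper: keeps every message per topic, in arrival order."""

    def __init__(self):
        self._queues = defaultdict(queue.Queue)
        self._lock = threading.Lock()

    def _queue_for(self, topic):
        # Guard dict access; queue.Queue itself is already thread-safe.
        with self._lock:
            return self._queues[topic]

    def put(self, topic, payload):
        """Intended to be called from the on_message callback."""
        self._queue_for(topic).put(payload)

    def get(self, topic, timeout=10):
        """Block until a message arrives on topic or the timeout expires."""
        try:
            return self._queue_for(topic).get(timeout=float(timeout))
        except queue.Empty:
            raise AssertionError(
                f"No message received on '{topic}' within {timeout}s"
            ) from None
```

In the library, _on_message would call put(msg.topic, ...) and wait_for_message would delegate to get(topic, timeout). The trade-off is that unread messages accumulate until the instance is discarded.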
Using It in a Test Suite
*** Settings ***
Library           Collections
Library           ../libraries/MqttTestLibrary.py    broker=mqtt.staging.local    port=1883
Suite Setup       Connect To Broker    client_id=integration-test
Suite Teardown    Disconnect From Broker

*** Test Cases ***
Device Should Publish Telemetry After Command
    [Documentation]    Send a command and verify the device responds with telemetry.
    Subscribe To Topic    devices/sensor-001/telemetry
    Publish Message    devices/sensor-001/commands    {"action": "read_sensors"}
    ${msg}=    Wait For Message    devices/sensor-001/telemetry    timeout=15
    # Message Payload Should Contain compares values, so check key presence via Collections
    ${data}=    Evaluate    json.loads($msg)    modules=json
    Dictionary Should Contain Key    ${data}    temperature
    Message Payload Should Contain    ${msg}    device_id    sensor-001

Broker Should Publish Last Will Message
    [Documentation]    Verify the broker publishes LWT when a client disconnects unexpectedly.
    Subscribe To Topic    devices/sensor-001/status
    # Simulate ungraceful disconnect in another test fixture
    ${msg}=    Wait For Message    devices/sensor-001/status    timeout=20
    Message Payload Should Contain    ${msg}    status    offline
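Message Payload Should Contain always compares a value, so it cannot express "this key is present, whatever its value". A minimal sketch of a presence-only keyword follows. The name message_payload_should_have_key is hypothetical, and it is written as a plain function so it runs standalone; as a method of MqttTestLibrary it would take self as its first parameter:

```python
import json


def message_payload_should_have_key(payload, key):
    """Assert that a key is present in the JSON payload, whatever its value."""
    data = json.loads(payload) if isinstance(payload, str) else payload
    if key not in data:
        raise AssertionError(
            f"Expected key '{key}' in payload, found keys: {sorted(data)}"
        )
```

A suite would then call it as Message Payload Should Have Key followed by the payload variable and the key name.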
Library Scope Matters
The ROBOT_LIBRARY_SCOPE class attribute controls instance lifecycle. Robot Framework 3.2 and later also accept the shorter names TEST and SUITE; the older spellings below still work:
- TEST CASE: new instance per test, and the default when no scope is set. Completely isolated but expensive for connection-heavy libraries.
- TEST SUITE: one instance per suite file. Good balance for database or MQTT connections that are expensive to create.
- GLOBAL: single instance for the entire run. Use sparingly, because shared state between suites causes flaky tests.
For MQTT testing, TEST SUITE scope is ideal: one connection per suite, clean state between suites.
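To make the difference concrete, here is a rough plain-Python imitation of when instances get created under each scope. It does not use Robot Framework itself; CounterLibrary, its subclasses, and simulate_run are hypothetical names for illustration only:

```python
class CounterLibrary:
    """Hypothetical library that keeps state on the instance."""

    ROBOT_LIBRARY_SCOPE = "TEST CASE"

    def __init__(self):
        self.connections_opened = 0

    def open_connection(self):
        self.connections_opened += 1
        return self.connections_opened


class PerSuiteLibrary(CounterLibrary):
    ROBOT_LIBRARY_SCOPE = "TEST SUITE"


class GlobalLibrary(CounterLibrary):
    ROBOT_LIBRARY_SCOPE = "GLOBAL"


def simulate_run(library_class, suites):
    """Imitate instance creation; suites is a list of per-suite test counts."""
    scope = library_class.ROBOT_LIBRARY_SCOPE
    global_instance = library_class()
    results = []
    for test_count in suites:
        suite_instance = library_class()
        suite_results = []
        for _ in range(test_count):
            if scope == "TEST CASE":
                lib = library_class()  # fresh state for every test
            elif scope == "TEST SUITE":
                lib = suite_instance  # state shared within one suite
            else:
                lib = global_instance  # state shared across the whole run
            suite_results.append(lib.open_connection())
        results.append(suite_results)
    return results


print(simulate_run(CounterLibrary, [2, 2]))   # [[1, 1], [1, 1]]
print(simulate_run(PerSuiteLibrary, [2, 2]))  # [[1, 2], [1, 2]]
print(simulate_run(GlobalLibrary, [2, 2]))    # [[1, 2], [3, 4]]
```

The counter only leaks across suites in the GLOBAL case, which is the kind of carried-over state that tends to produce order-dependent failures.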
Adding a Payload Validator
import json

import jsonschema


class PayloadValidator:
    """Validate MQTT/API payloads against JSON schemas."""

    ROBOT_LIBRARY_SCOPE = "GLOBAL"

    def __init__(self):
        self._schemas = {}  # schema name -> parsed schema

    def load_schema(self, name, schema_path):
        """Load a JSON schema from file."""
        with open(schema_path, encoding="utf-8") as f:
            self._schemas[name] = json.load(f)

    def payload_should_match_schema(self, payload, schema_name):
        """Validate payload against a named schema."""
        data = json.loads(payload) if isinstance(payload, str) else payload
        schema = self._schemas.get(schema_name)
        # Check for None explicitly: an empty schema ({}) is valid but falsy
        if schema is None:
            raise AssertionError(f"Schema '{schema_name}' not loaded")
        try:
            jsonschema.validate(data, schema)
        except jsonschema.ValidationError as e:
            raise AssertionError(
                f"Payload does not match schema '{schema_name}': {e.message}"
            ) from e
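For reference, this is roughly what a schema file passed to Load Schema might contain. The field names are assumptions based on the telemetry keys used in the test suite earlier, not a real device contract, and write_schema is a hypothetical helper that only uses the standard library:

```python
import json
from pathlib import Path

# Hypothetical telemetry schema; field names are illustrative only.
TELEMETRY_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "required": ["device_id", "temperature"],
    "properties": {
        "device_id": {"type": "string"},
        "temperature": {"type": "number"},
    },
}


def write_schema(directory, name="telemetry"):
    """Write the schema to <directory>/<name>.schema.json and return the path."""
    path = Path(directory) / f"{name}.schema.json"
    path.write_text(json.dumps(TELEMETRY_SCHEMA, indent=2), encoding="utf-8")
    return path
```

The returned path can be handed to Load Schema under the name telemetry, after which Payload Should Match Schema can reference that name.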
A Few Things I Wish I Knew Earlier
- Name methods with underscores. Robot Framework exposes wait_for_message as Wait For Message automatically, so keep Python naming conventions.
- Use threading locks for async callbacks. With loop_start(), paho's on_message runs on a background thread. Without locks, you get race conditions in message retrieval.
- Raise AssertionError for failures. Any exception fails the keyword, but for generic types such as AssertionError, Exception, and RuntimeError the report shows only your message. Other exception types get the class name prefixed to the message, which makes failures noisier to read.
- Document keyword arguments. Robot Framework generates keyword documentation from docstrings. Well-documented libraries save your team hours of guesswork.
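As a sketch of the documentation tip, here is a keyword whose docstring spells out each argument. EventWaiter and its methods are hypothetical. The type hints are optional, but Robot Framework 3.1 and later can use them to convert string arguments, which would replace manual float(...) calls:

```python
import time


class EventWaiter:
    """Hypothetical library used only to illustrate keyword documentation."""

    def __init__(self):
        self._events = set()

    def record_event(self, name: str):
        """Record that the event ``name`` has occurred."""
        self._events.add(name)

    def wait_for_event(self, name: str, timeout: float = 10, poll_interval: float = 0.1):
        """Wait until the event ``name`` has been recorded.

        Arguments:
        - ``name``: event identifier passed to `Record Event`.
        - ``timeout``: maximum wait in seconds.
        - ``poll_interval``: seconds between checks.

        Fails if the event is not seen before the timeout expires.

        Example:
        | Wait For Event | device-ready | timeout=5 |
        """
        deadline = time.monotonic() + timeout
        while True:
            if name in self._events:
                return
            if time.monotonic() >= deadline:
                raise AssertionError(f"Event '{name}' not seen within {timeout}s")
            time.sleep(poll_interval)
```

The first docstring line becomes the short summary in generated documentation, so it is worth keeping it to one sentence.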