The MQTT broker URL used as default in the examples has ceased operation. All examples and documention have been updated to point to the new domain mqtt.eclipse.org. This also required an update of the python example test scripts to use TLS 1.2
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
import re
import os
import sys
import ssl
import paho.mqtt.client as mqtt
from threading import Thread, Event
import time
import string
import random
import IDF
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
event_client_connected = Event()
event_stop_client = Event()
event_client_received_correct = Event()
message_log = ""
broker_host = {}
broker_port = {}
expected_data = ""
subscribe_topic = ""
publish_topic = ""
expected_count = 0
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
def mqtt_client_task(client):
while not event_stop_client.is_set():
def get_host_port_from_dut(dut1, config_option):
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()[config_option])
return value.group(1), int(value.group(2))
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
global message_log
global expected_count
payload = msg.payload.decode()
if payload == expected_data:
expected_count += 1
print("[{}] Received...".format(msg.mid))
message_log += "Received data:" + msg.topic + " " + payload + "\n"
def test_single_config(dut, transport, qos, repeat, published):
global expected_count
global expected_data
global message_log
sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(16))
expected_count = 0
message_log = ""
expected_data = sample_string * repeat
print("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, sample msg:'{}'".format(transport, qos, published, expected_data))
client = None
if transport in ["ws", "wss"]:
client = mqtt.Client(transport="websockets")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
if transport in ["ssl", "wss"]:
client.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.connect(broker_host[transport], broker_port[transport], 60)
except Exception:
print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_host[transport], sys.exc_info()[0]))
# Starting a py-client in a separate thread
thread1 = Thread(target=mqtt_client_task, args=(client,))
print("Connecting py-client to broker {}:{}...".format(broker_host[transport], broker_port[transport]))
if not event_client_connected.wait(timeout=30):
raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_host[transport]))
client.subscribe(subscribe_topic, qos)
dut.write("{} {} {} {} {}".format(transport, sample_string, repeat, published, qos), eol="\n")
# waiting till subscribed to defined topic
dut.expect(re.compile(r"MQTT_EVENT_SUBSCRIBED"), timeout=30)
for i in range(published):
client.publish(publish_topic, sample_string * repeat, qos)
print("Checking esp-client received msg published from py-client...")
dut.expect(re.compile(r"Correct pattern received exactly x times"), timeout=60)
start = time.time()
while expected_count < published and time.time() - start <= 60:
# Note: tolerate that messages qos=1 to be received more than once
if expected_count == published or (expected_count > published and qos == 1):
print("All data received from ESP32...")
raise ValueError("Not all data received from ESP32: Expected:{}x{}, Received:{}x{}".format(expected_data, published, message_log, expected_count))
def test_weekend_mqtt_publish(env, extra_data):
# Using broker url dictionary for different transport
global broker_host
global broker_port
global publish_topic
global subscribe_topic
steps: |
1. join AP and connects to ssl broker
2. Test connects a client to the same broker
3. Test evaluates python client received correct qos0 message
4. Test ESP32 client received correct qos0 message
dut1 = env.get_dut("mqtt_publish", "examples/protocols/mqtt/publish_test")
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_publish.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_publish_bin_size", "{}KB"
.format(bin_size // 1024))
IDF.check_performance("mqtt_publish_size", bin_size // 1024)
# Look for host:port in sdkconfig
# python client subscribes to the topic to which esp client publishes and vice versa
publish_topic = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_SUBSCIBE_TOPIC"].replace('"','')
subscribe_topic = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_PUBLISH_TOPIC"].replace('"','')
broker_host["ssl"], broker_port["ssl"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_SSL_URI")
broker_host["tcp"], broker_port["tcp"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_TCP_URI")
broker_host["ws"], broker_port["ws"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WS_URI")
broker_host["wss"], broker_port["wss"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WSS_URI")
except Exception:
print('ENV_TEST_FAILURE: Cannot find broker url in sdkconfig')
ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
print("Connected to AP with IP: {}".format(ip_address))
except DUT.ExpectTimeout:
print('ENV_TEST_FAILURE: Cannot connect to AP')
for qos in [0, 1, 2]:
for transport in ["tcp", "ssl", "ws", "wss"]:
# simple test with empty message
test_single_config(dut1, transport, qos, 0, 5)
# decide on broker what level of test will pass (local broker works the best)
if broker_host[transport].startswith("192.168") and qos < 1:
# medium size, medium repeated
test_single_config(dut1, transport, qos, 5, 50)
# long data
test_single_config(dut1, transport, qos, 1000, 10)
# short data, many repeats
test_single_config(dut1, transport, qos, 2, 200)
elif transport in ["ws", "wss"]:
# more relaxed criteria for websockets!
test_single_config(dut1, transport, qos, 2, 5)
test_single_config(dut1, transport, qos, 50, 1)
test_single_config(dut1, transport, qos, 10, 20)
# common configuration should be good for most public mosquittos
test_single_config(dut1, transport, qos, 5, 10)
test_single_config(dut1, transport, qos, 500, 3)
test_single_config(dut1, transport, qos, 1, 50)
if __name__ == '__main__':