Merge branch 'feature/qemu_mqtt_test' into 'master'

ci: QEMU tests in tiny-test-fw

See merge request espressif/esp-idf!6959
This commit is contained in:
Ivan Grokhotkov 2019-12-18 07:10:30 +08:00
commit de43b8406b
9 changed files with 152 additions and 61 deletions

View File

@ -1,27 +1,13 @@
import re
import os
import sys
import socket
from threading import Thread, Event
import subprocess
import time
from shutil import copyfile
try:
import IDF
from IDF.IDFDUT import ESP32DUT
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
import Utility
from tiny_test_fw import Utility, DUT
import ttfw_idf
stop_sock_listener = Event()
stop_io_listener = Event()
@ -73,7 +59,7 @@ def sock_listener(dut1):
sock = None
@IDF.idf_example_test(env_tag="Example_WIFI")
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
def lwip_test_suite(env, extra_data):
global stop_io_listener
global stop_sock_listener
@ -84,12 +70,12 @@ def lwip_test_suite(env, extra_data):
3. Execute ttcn3 test suite
4. Collect result from ttcn3
"""
dut1 = env.get_dut("net_suite", "examples/system/network_tests", dut_class=ESP32DUT)
dut1 = env.get_dut("net_suite", "examples/system/network_tests", dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "net_suite.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("net_suite", "{}KB".format(bin_size // 1024))
IDF.check_performance("net_suite", bin_size // 1024)
ttfw_idf.log_performance("net_suite", "{}KB".format(bin_size // 1024))
ttfw_idf.check_performance("net_suite", bin_size // 1024)
dut1.start_app()
thread1 = Thread(target=sock_listener, args=(dut1, ))
thread2 = Thread(target=io_listener, args=(dut1, ))

View File

@ -11,20 +11,8 @@ import time
import string
import random
try:
import IDF
from IDF.IDFDUT import ESP32DUT
except ImportError:
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw,
# we need to set environment variable `TEST_FW_PATH`,
# then get and insert `TEST_FW_PATH` to sys path before import FW module
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import IDF
import DUT
from tiny_test_fw import DUT
import ttfw_idf
event_client_connected = Event()
@ -53,6 +41,8 @@ def mqtt_client_task(client):
def get_host_port_from_dut(dut1, config_option):
value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()[config_option])
if value is None:
return None, None
return value.group(1), int(value.group(2))
@ -124,7 +114,7 @@ def test_single_config(dut, transport, qos, repeat, published):
event_stop_client.clear()
@IDF.idf_example_test(env_tag="Example_WIFI")
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
def test_weekend_mqtt_publish(env, extra_data):
# Using broker url dictionary for different transport
global broker_host
@ -138,13 +128,12 @@ def test_weekend_mqtt_publish(env, extra_data):
3. Test evaluates python client received correct qos0 message
4. Test ESP32 client received correct qos0 message
"""
dut1 = env.get_dut("mqtt_publish", "examples/protocols/mqtt/publish_test", dut_class=ESP32DUT)
dut1 = env.get_dut("mqtt_publish", "examples/protocols/mqtt/publish_test")
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, "mqtt_publish.bin")
bin_size = os.path.getsize(binary_file)
IDF.log_performance("mqtt_publish_bin_size", "{}KB"
.format(bin_size // 1024))
IDF.check_performance("mqtt_publish_size", bin_size // 1024)
ttfw_idf.log_performance("mqtt_publish_bin_size", "{}KB".format(bin_size // 1024))
ttfw_idf.check_performance("mqtt_publish_size", bin_size // 1024)
# Look for host:port in sdkconfig
try:
# python client subscribes to the topic to which esp client publishes and vice versa
@ -159,13 +148,16 @@ def test_weekend_mqtt_publish(env, extra_data):
raise
dut1.start_app()
try:
ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
ip_address = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
print("Connected to AP with IP: {}".format(ip_address))
except DUT.ExpectTimeout:
print('ENV_TEST_FAILURE: Cannot connect to AP')
raise
for qos in [0, 1, 2]:
for transport in ["tcp", "ssl", "ws", "wss"]:
if broker_host[transport] is None:
print('Skipping transport: {}...'.format(transport))
continue
# simple test with empty message
test_single_config(dut1, transport, qos, 0, 5)
# decide on broker what level of test will pass (local broker works the best)
@ -189,4 +181,4 @@ def test_weekend_mqtt_publish(env, extra_data):
if __name__ == '__main__':
test_weekend_mqtt_publish()
test_weekend_mqtt_publish(dut=ttfw_idf.ESP32QEMUDUT if sys.argv[1:] == ['qemu'] else ttfw_idf.ESP32DUT)

View File

@ -0,0 +1,7 @@
CaseConfig:
- name: test_weekend_mqtt_publish
overwrite:
dut:
class: ESP32QEMUDUT
package: ttfw_idf

View File

@ -0,0 +1,19 @@
CONFIG_IDF_TARGET_ESP32=y
CONFIG_EXAMPLE_USE_OPENETH=y
CONFIG_ETH_USE_OPENETH=y
CONFIG_ETH_OPENETH_DMA_RX_BUFFER_NUM=4
CONFIG_ETH_OPENETH_DMA_TX_BUFFER_NUM=1
CONFIG_EXAMPLE_CONNECT_ETHERNET=y
CONFIG_LOG_DEFAULT_LEVEL_DEBUG=y
CONFIG_ESPTOOLPY_FLASHMODE_DOUT=y
CONFIG_MBEDTLS_ASYMMETRIC_CONTENT_LEN=y
CONFIG_MBEDTLS_SSL_IN_CONTENT_LEN=16384
CONFIG_MBEDTLS_SSL_OUT_CONTENT_LEN=16384
CONFIG_EXAMPLE_BROKER_SSL_URI="mqtts://${EXAMPLE_MQTT_BROKER_SSL}"
CONFIG_EXAMPLE_BROKER_TCP_URI="mqtt://${EXAMPLE_MQTT_BROKER_TCP}"
CONFIG_EXAMPLE_BROKER_WS_URI="ws://${EXAMPLE_MQTT_BROKER_WS}/ws"
CONFIG_EXAMPLE_BROKER_WSS_URI="wss://${EXAMPLE_MQTT_BROKER_WSS}/ws"
CONFIG_EXAMPLE_BROKER_CERTIFICATE_OVERRIDE="${EXAMPLE_MQTT_BROKER_CERTIFICATE}"
CONFIG_MBEDTLS_HARDWARE_AES=n
CONFIG_MBEDTLS_HARDWARE_MPI=n
CONFIG_MBEDTLS_HARDWARE_SHA=n

View File

@ -766,7 +766,7 @@ class SerialDUT(BaseDUT):
return formatted_data
def _port_open(self):
self.port_inst = serial.Serial(self.port, **self.serial_configs)
self.port_inst = serial.serial_for_url(self.port, **self.serial_configs)
def _port_close(self):
self.port_inst.close()

View File

@ -14,24 +14,12 @@
""" example of writing test with TinyTestFW """
import re
import os
import sys
try:
import TinyFW
except ImportError:
# if we want to run test case outside `tiny-test-fw` folder,
# we need to insert tiny-test-fw path into sys path
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
from IDF.IDFDUT import ESP32DUT
import ttfw_idf
from tiny_test_fw import TinyFW
@IDF.idf_example_test(env_tag="Example_WIFI")
@ttfw_idf.idf_example_test(env_tag="Example_WIFI")
def test_examples_protocol_https_request(env, extra_data):
"""
steps: |
@ -39,7 +27,7 @@ def test_examples_protocol_https_request(env, extra_data):
2. connect to www.howsmyssl.com:443
3. send http request
"""
dut1 = env.get_dut("https_request", "examples/protocols/https_request", dut_class=ESP32DUT)
dut1 = env.get_dut("https_request", "examples/protocols/https_request", dut_class=ttfw_idf.ESP32DUT)
dut1.start_app()
dut1.expect(re.compile(r"Connecting to www.howsmyssl.com:443"), timeout=30)
dut1.expect("Performing the SSL/TLS handshake")
@ -51,5 +39,5 @@ def test_examples_protocol_https_request(env, extra_data):
if __name__ == '__main__':
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=ttfw_idf.IDFDUT)
test_examples_protocol_https_request()

View File

@ -20,6 +20,8 @@ import re
import functools
import tempfile
import subprocess
import time
import pexpect
# python2 and python3 queue package name is different
try:
@ -418,6 +420,7 @@ class IDFDUT(DUT.SerialDUT):
class ESP32DUT(IDFDUT):
TARGET = "esp32"
TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"
@classmethod
def _get_rom(cls):
return esptool.ESP32ROM
@ -426,6 +429,7 @@ class ESP32DUT(IDFDUT):
class ESP32S2DUT(IDFDUT):
TARGET = "esp32s2beta"
TOOLCHAIN_PREFIX = "xtensa-esp32s2-elf-"
@classmethod
def _get_rom(cls):
return esptool.ESP32S2ROM
@ -434,13 +438,105 @@ class ESP32S2DUT(IDFDUT):
class ESP8266DUT(IDFDUT):
TARGET = "esp8266"
TOOLCHAIN_PREFIX = "xtensa-lx106-elf-"
@classmethod
def _get_rom(cls):
return esptool.ESP8266ROM
def get_target_by_rom_class(cls):
for c in [ESP32DUT, ESP32S2DUT, ESP8266DUT]:
for c in [ESP32DUT, ESP32S2DUT, ESP8266DUT, IDFQEMUDUT]:
if c._get_rom() == cls:
return c.TARGET
return None
class IDFQEMUDUT(IDFDUT):
TARGET = None
TOOLCHAIN_PREFIX = None
ERASE_NVS = True
DEFAULT_EXPECT_TIMEOUT = 30 # longer timeout, since app startup takes more time in QEMU (due to slow SHA emulation)
QEMU_SERIAL_PORT = 3334
def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix=".bin", prefix="qemu_flash_img")
self.app = app
self.flash_size = 4 * 1024 * 1024
self._write_flash_img()
args = [
"qemu-system-xtensa",
"-nographic",
"-machine", self.TARGET,
"-drive", "file={},if=mtd,format=raw".format(self.flash_image.name),
"-nic", "user,model=open_eth",
"-serial", "tcp::{},server,nowait".format(self.QEMU_SERIAL_PORT),
"-S",
"-global driver=timer.esp32.timg,property=wdt_disable,value=true"]
# TODO(IDF-1242): generate a temporary efuse binary, pass it to QEMU
if "QEMU_BIOS_PATH" in os.environ:
args += ["-L", os.environ["QEMU_BIOS_PATH"]]
self.qemu = pexpect.spawn(" ".join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT)
self.qemu.expect_exact(b"(qemu)")
super(IDFQEMUDUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
def _write_flash_img(self):
self.flash_image.seek(0)
self.flash_image.write(b'\x00' * self.flash_size)
for offs, path in self.app.flash_files:
with open(path, "rb") as flash_file:
contents = flash_file.read()
self.flash_image.seek(offs)
self.flash_image.write(contents)
self.flash_image.flush()
@classmethod
def _get_rom(cls):
return esptool.ESP32ROM
@classmethod
def get_mac(cls, app, port):
# TODO(IDF-1242): get this from QEMU/efuse binary
return "11:22:33:44:55:66"
@classmethod
def confirm_dut(cls, port, **kwargs):
return True, cls.TARGET
def start_app(self, erase_nvs=ERASE_NVS):
# TODO: implement erase_nvs
# since the flash image is generated every time in the constructor, maybe this isn't needed...
self.qemu.sendline(b"cont\n")
self.qemu.expect_exact(b"(qemu)")
def reset(self):
self.qemu.sendline(b"system_reset\n")
self.qemu.expect_exact(b"(qemu)")
def erase_partition(self, partition):
raise NotImplementedError("method not erase_partition not implemented")
def dump_flush(self, output_file, **kwargs):
raise NotImplementedError("method not dump_flush not implemented")
@classmethod
def list_available_ports(cls):
return ["socket://localhost:{}".format(cls.QEMU_SERIAL_PORT)]
def close(self):
super(IDFQEMUDUT, self).close()
self.qemu.sendline(b"q\n")
self.qemu.expect_exact(b"(qemu)")
for _ in range(self.DEFAULT_EXPECT_TIMEOUT):
if not self.qemu.isalive():
break
time.sleep(1)
else:
self.qemu.terminate(force=True)
class ESP32QEMUDUT(IDFQEMUDUT):
TARGET = "esp32"
TOOLCHAIN_PREFIX = "xtensa-esp32-elf-"

View File

@ -16,7 +16,7 @@ import re
from tiny_test_fw import TinyFW, Utility
from IDFApp import IDFApp, Example, LoadableElfExample, UT # noqa: export all Apps for users
from IDFDUT import IDFDUT, ESP32DUT, ESP32S2DUT, ESP8266DUT # noqa: export DUTs for users
from IDFDUT import IDFDUT, ESP32DUT, ESP32S2DUT, ESP8266DUT, ESP32QEMUDUT # noqa: export DUTs for users
def format_case_id(chip, case_name):

View File

@ -0,0 +1,3 @@
-r ../tiny_test_fw/requirements.txt
pexpect
python-gitlab