
scripts: twister: Isolate statuses into a separate class

Statuses are no longer bare str values that can easily be mistyped
or assigned incorrectly. They are now Enum members.

Signed-off-by: Lukasz Mrugala <lukaszx.mrugala@intel.com>
pull/76983/head
Lukasz Mrugala authored 1 year ago, committed by Anas Nashif
parent · commit 5c6c44a247
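The sketch below is an illustration only (not code from the commit): it shows the failure mode the commit message describes, using a trimmed copy of the TestInstanceStatus enum added in scripts/pylib/twister/twisterlib/statuses.py. A typo in a bare string is stored silently, while a mistyped Enum member name raises at the assignment site.

# Illustration only: trimmed copy of TestInstanceStatus from twisterlib/statuses.py.
from enum import Enum

class TestInstanceStatus(str, Enum):
    ERROR = 'error'
    FAIL = 'failed'
    PASS = 'passed'

# Old style: the typo produces a perfectly valid str and is only noticed,
# if ever, when some later comparison fails.
status = "filed"

# New style: statuses come from a fixed set of members,
status = TestInstanceStatus.FAIL

# and a mistyped member name fails immediately with an AttributeError.
try:
    status = TestInstanceStatus.FILED
except AttributeError:
    print("TestInstanceStatus has no member 'FILED'")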
Changed files (lines changed per file):
  1. scripts/pylib/twister/twisterlib/handlers.py (105)
  2. scripts/pylib/twister/twisterlib/harness.py (154)
  3. scripts/pylib/twister/twisterlib/package.py (4)
  4. scripts/pylib/twister/twisterlib/reports.py (72)
  5. scripts/pylib/twister/twisterlib/runner.py (75)
  6. scripts/pylib/twister/twisterlib/statuses.py (86)
  7. scripts/pylib/twister/twisterlib/testinstance.py (11)
  8. scripts/pylib/twister/twisterlib/testplan.py (37)
  9. scripts/pylib/twister/twisterlib/testsuite.py (4)
  10. scripts/tests/twister/test_handlers.py (121)
  11. scripts/pylib/twister/twisterlib/twister_main.py (3)
  12. scripts/tests/twister/test_harness.py (132)
  13. scripts/tests/twister/test_runner.py (123)
  14. scripts/tests/twister/test_testinstance.py (13)
  15. scripts/tests/twister/test_testplan.py (51)
  16. scripts/tests/twister_blackbox/test_tooling.py (4)

scripts/pylib/twister/twisterlib/handlers.py (105 lines changed)

@@ -23,6 +23,7 @@ from queue import Queue, Empty
from twisterlib.environment import ZEPHYR_BASE, strip_ansi_sequences
from twisterlib.error import TwisterException
from twisterlib.platform import Platform
from twisterlib.statuses import HarnessStatus, OutputStatus, TestCaseStatus, TestInstanceStatus
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/build_helpers"))
from domains import Domains
@@ -111,7 +112,7 @@ class Handler:
logger.debug(f"Expected suite names:{expected_suite_names}")
logger.debug(f"Detected suite names:{detected_suite_names}")
if not expected_suite_names or \
not harness_state == "passed":
not harness_state == HarnessStatus.PASS:
return
if not detected_suite_names:
self._missing_suite_name(expected_suite_names, handler_time)
@@ -127,10 +128,10 @@ class Handler:
Change result of performed test if problem with missing or unpropper
suite name was occurred.
"""
self.instance.status = "failed"
self.instance.status = TestInstanceStatus.FAIL
self.instance.execution_time = handler_time
for tc in self.instance.testcases:
tc.status = "failed"
tc.status = TestCaseStatus.FAIL
self.instance.reason = f"Testsuite mismatch"
logger.debug("Test suite names were not printed or some of them in " \
"output do not correspond with expected: %s",
@@ -142,14 +143,14 @@ class Handler:
harness_class_name = type(harness).__name__
if self.suite_name_check and harness_class_name == "Test":
self._verify_ztest_suite_name(harness.state, harness.detected_suite_names, handler_time)
if self.instance.status == 'failed':
if self.instance.status == TestInstanceStatus.FAIL:
return
if not harness.matched_run_id and harness.run_id_exists:
self.instance.status = "failed"
self.instance.status = TestInstanceStatus.FAIL
self.instance.execution_time = handler_time
self.instance.reason = "RunID mismatch"
for tc in self.instance.testcases:
tc.status = "failed"
tc.status = TestCaseStatus.FAIL
self.instance.record(harness.recording)
@@ -215,7 +216,7 @@ class BinaryHandler(Handler):
log_out_fp.write(strip_ansi_sequences(line_decoded))
log_out_fp.flush()
harness.handle(stripped_line)
if harness.state:
if harness.state != HarnessStatus.NONE:
if not timeout_extended or harness.capture_coverage:
timeout_extended = True
if harness.capture_coverage:
@@ -294,21 +295,21 @@ class BinaryHandler(Handler):
def _update_instance_info(self, harness_state, handler_time):
self.instance.execution_time = handler_time
if not self.terminated and self.returncode != 0:
self.instance.status = "failed"
self.instance.status = TestInstanceStatus.FAIL
if self.options.enable_valgrind and self.returncode == 2:
self.instance.reason = "Valgrind error"
else:
# When a process is killed, the default handler returns 128 + SIGTERM
# so in that case the return code itself is not meaningful
self.instance.reason = "Failed"
elif harness_state:
elif harness_state != HarnessStatus.NONE:
self.instance.status = harness_state
if harness_state == "failed":
if harness_state == HarnessStatus.FAIL:
self.instance.reason = "Failed"
else:
self.instance.status = "failed"
self.instance.status = TestInstanceStatus.FAIL
self.instance.reason = "Timeout"
self.instance.add_missing_case_status("blocked", "Timeout")
self.instance.add_missing_case_status(TestCaseStatus.BLOCK, "Timeout")
def handle(self, harness):
robot_test = getattr(harness, "is_robot_test", False)
@@ -448,7 +449,7 @@ class DeviceHandler(Handler):
log_out_fp.flush()
harness.handle(sl.rstrip())
if harness.state:
if harness.state != HarnessStatus.NONE:
if not harness.capture_coverage:
ser.close()
break
@@ -562,17 +563,17 @@ class DeviceHandler(Handler):
def _update_instance_info(self, harness_state, handler_time, flash_error):
self.instance.execution_time = handler_time
if harness_state:
if harness_state != HarnessStatus.NONE:
self.instance.status = harness_state
if harness_state == "failed":
if harness_state == HarnessStatus.FAIL:
self.instance.reason = "Failed"
self.instance.add_missing_case_status("blocked", harness_state)
self.instance.add_missing_case_status(TestCaseStatus.BLOCK, harness_state)
elif not flash_error:
self.instance.status = "failed"
self.instance.status = TestInstanceStatus.FAIL
self.instance.reason = "Timeout"
if self.instance.status in ["error", "failed"]:
self.instance.add_missing_case_status("blocked", self.instance.reason)
if self.instance.status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL]:
self.instance.add_missing_case_status(TestCaseStatus.BLOCK, self.instance.reason)
def _terminate_pty(self, ser_pty, ser_pty_process):
logger.debug(f"Terminating serial-pty:'{ser_pty}'")
@@ -597,11 +598,11 @@ class DeviceHandler(Handler):
timeout=max(flash_timeout, self.get_test_timeout())
)
except serial.SerialException as e:
self.instance.status = "failed"
self.instance.status = TestInstanceStatus.FAIL
self.instance.reason = "Serial Device Error"
logger.error("Serial device error: %s" % (str(e)))
self.instance.add_missing_case_status("blocked", "Serial Device Error")
self.instance.add_missing_case_status(TestCaseStatus.BLOCK, "Serial Device Error")
if serial_pty and ser_pty_process:
self._terminate_pty(serial_pty, ser_pty_process)
@@ -621,7 +622,7 @@ class DeviceHandler(Handler):
time.sleep(1)
hardware = self.device_is_available(self.instance)
except TwisterException as error:
self.instance.status = "failed"
self.instance.status = TestInstanceStatus.FAIL
self.instance.reason = str(error)
logger.error(self.instance.reason)
return hardware
@@ -714,7 +715,7 @@ class DeviceHandler(Handler):
logger.debug(stdout.decode(errors="ignore"))
if proc.returncode != 0:
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = "Device issue (Flash error?)"
flash_error = True
with open(d_log, "w") as dlog_fp:
@@ -724,7 +725,7 @@ class DeviceHandler(Handler):
logger.warning("Flash operation timed out.")
self.terminate(proc)
(stdout, stderr) = proc.communicate()
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = "Device issue (Timeout)"
flash_error = True
@@ -733,7 +734,7 @@ class DeviceHandler(Handler):
except subprocess.CalledProcessError:
halt_monitor_evt.set()
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = "Device issue (Flash error)"
flash_error = True
@@ -894,7 +895,7 @@ class QEMUHandler(Handler):
timeout_time = start_time + timeout
p = select.poll()
p.register(in_fp, select.POLLIN)
_status = None
_status = OutputStatus.NONE
_reason = None
line = ""
@@ -916,18 +917,18 @@ class QEMUHandler(Handler):
# of not enough CPU time scheduled by host for
# QEMU process during p.poll(this_timeout)
cpu_time = QEMUHandler._get_cpu_time(pid)
if cpu_time < timeout and not _status:
if cpu_time < timeout and _status == OutputStatus.NONE:
timeout_time = time.time() + (timeout - cpu_time)
continue
except psutil.NoSuchProcess:
pass
except ProcessLookupError:
_status = "failed"
_status = OutputStatus.FAIL
_reason = "Execution error"
break
if not _status:
_status = "failed"
if _status == OutputStatus.NONE:
_status = OutputStatus.FAIL
_reason = "timeout"
break
@@ -938,14 +939,14 @@ class QEMUHandler(Handler):
c = in_fp.read(1).decode("utf-8")
except UnicodeDecodeError:
# Test is writing something weird, fail
_status = "failed"
_status = OutputStatus.FAIL
_reason = "unexpected byte"
break
if c == "":
# EOF, this shouldn't happen unless QEMU crashes
if not ignore_unexpected_eof:
_status = "failed"
_status = OutputStatus.FAIL
_reason = "unexpected eof"
break
line = line + c
@@ -959,11 +960,11 @@ class QEMUHandler(Handler):
logger.debug(f"QEMU ({pid}): {line}")
harness.handle(line)
if harness.state:
if harness.state != HarnessStatus.NONE:
# if we have registered a fail make sure the status is not
# overridden by a false success message coming from the
# testsuite
if _status != 'failed':
if _status != OutputStatus.FAIL:
_status = harness.state
_reason = harness.reason
@@ -1006,14 +1007,15 @@ class QEMUHandler(Handler):
return command
def _update_instance_info(self, harness_state, is_timeout):
if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
self.instance.status = "failed"
if (self.returncode != 0 and not self.ignore_qemu_crash) or \
harness_state == HarnessStatus.NONE:
self.instance.status = TestInstanceStatus.FAIL
if is_timeout:
self.instance.reason = "Timeout"
else:
if not self.instance.reason:
self.instance.reason = "Exited with {}".format(self.returncode)
self.instance.add_missing_case_status("blocked")
self.instance.add_missing_case_status(TestCaseStatus.BLOCK)
def handle(self, harness):
self.run = True
@@ -1054,7 +1056,7 @@ class QEMUHandler(Handler):
is_timeout = True
self.terminate(proc)
if harness.state == "passed":
if harness.state == HarnessStatus.PASS:
self.returncode = 0
else:
self.returncode = proc.returncode
@@ -1171,14 +1173,15 @@ class QEMUWinHandler(Handler):
return command
def _update_instance_info(self, harness_state, is_timeout):
if (self.returncode != 0 and not self.ignore_qemu_crash) or not harness_state:
self.instance.status = "failed"
if (self.returncode != 0 and not self.ignore_qemu_crash) or \
harness_state == HarnessStatus.NONE:
self.instance.status = TestInstanceStatus.FAIL
if is_timeout:
self.instance.reason = "Timeout"
else:
if not self.instance.reason:
self.instance.reason = "Exited with {}".format(self.returncode)
self.instance.add_missing_case_status("blocked")
self.instance.add_missing_case_status(TestInstanceStatus.BLOCK)
def _enqueue_char(self, queue):
while not self.stop_thread:
@@ -1200,7 +1203,7 @@ class QEMUWinHandler(Handler):
def _monitor_output(self, queue, timeout, logfile, pid_fn, harness, ignore_unexpected_eof=False):
start_time = time.time()
timeout_time = start_time + timeout
_status = None
_status = OutputStatus.NONE
_reason = None
line = ""
timeout_extended = False
@@ -1217,18 +1220,18 @@ class QEMUWinHandler(Handler):
# of not enough CPU time scheduled by host for
# QEMU process during p.poll(this_timeout)
cpu_time = self._get_cpu_time(self.pid)
if cpu_time < timeout and not _status:
if cpu_time < timeout and _status == OutputStatus.NONE:
timeout_time = time.time() + (timeout - cpu_time)
continue
except psutil.NoSuchProcess:
pass
except ProcessLookupError:
_status = "failed"
_status = OutputStatus.FAIL
_reason = "Execution error"
break
if not _status:
_status = "failed"
if _status == OutputStatus.NONE:
_status = OutputStatus.FAIL
_reason = "timeout"
break
@@ -1247,14 +1250,14 @@ class QEMUWinHandler(Handler):
c = c.decode("utf-8")
except UnicodeDecodeError:
# Test is writing something weird, fail
_status = "failed"
_status = OutputStatus.FAIL
_reason = "unexpected byte"
break
if c == "":
# EOF, this shouldn't happen unless QEMU crashes
if not ignore_unexpected_eof:
_status = "failed"
_status = OutputStatus.FAIL
_reason = "unexpected eof"
break
line = line + c
@@ -1268,11 +1271,11 @@ class QEMUWinHandler(Handler):
logger.debug(f"QEMU ({self.pid}): {line}")
harness.handle(line)
if harness.state:
if harness.state != HarnessStatus.NONE:
# if we have registered a fail make sure the state is not
# overridden by a false success message coming from the
# testsuite
if _status != 'failed':
if _status != OutputStatus.FAIL:
_status = harness.state
_reason = harness.reason
@@ -1329,7 +1332,7 @@ class QEMUWinHandler(Handler):
time.sleep(0.5)
proc.kill()
if harness.state == "passed":
if harness.state == HarnessStatus.PASS:
self.returncode = 0
else:
self.returncode = proc.returncode

scripts/pylib/twister/twisterlib/harness.py (154 lines changed)

@@ -18,6 +18,7 @@ import json
from twisterlib.error import ConfigurationError
from twisterlib.environment import ZEPHYR_BASE, PYTEST_PLUGIN_INSTALLED
from twisterlib.handlers import Handler, terminate_process, SUPPORTED_SIMS_IN_PYTEST
from twisterlib.statuses import HarnessStatus, ReportStatus, TestCaseStatus, TestInstanceStatus
from twisterlib.testinstance import TestInstance
@@ -38,14 +39,14 @@ class Harness:
ztest_to_status = {
'PASS': 'passed',
'SKIP': 'skipped',
'BLOCK': 'blocked',
'FAIL': 'failed'
'PASS': TestCaseStatus.PASS,
'SKIP': TestCaseStatus.SKIP,
'BLOCK': TestCaseStatus.BLOCK,
'FAIL': TestCaseStatus.FAIL
}
def __init__(self):
self.state = None
self.state = HarnessStatus.NONE
self.reason = None
self.type = None
self.regex = []
@@ -132,13 +133,13 @@ class Harness:
if self.RUN_PASSED in line:
if self.fault:
self.state = "failed"
self.state = HarnessStatus.FAIL
self.reason = "Fault detected while running test"
else:
self.state = "passed"
self.state = HarnessStatus.PASS
if self.RUN_FAILED in line:
self.state = "failed"
self.state = HarnessStatus.FAIL
self.reason = "Testsuite failed"
if self.fail_on_fault:
@@ -169,9 +170,9 @@ class Robot(Harness):
handle is trying to give a PASS or FAIL to avoid timeout, nothing
is writen into handler.log
'''
self.instance.state = "passed"
self.instance.state = TestInstanceStatus.PASS
tc = self.instance.get_case_or_create(self.id)
tc.status = "passed"
tc.status = TestCaseStatus.PASS
def run_robot_test(self, command, handler):
start_time = time.time()
@@ -202,16 +203,16 @@ class Robot(Harness):
self.instance.execution_time = time.time() - start_time
if renode_test_proc.returncode == 0:
self.instance.status = "passed"
self.instance.status = TestInstanceStatus.PASS
# all tests in one Robot file are treated as a single test case,
# so its status should be set accordingly to the instance status
# please note that there should be only one testcase in testcases list
self.instance.testcases[0].status = "passed"
self.instance.testcases[0].status = TestCaseStatus.PASS
else:
logger.error("Robot test failure: %s for %s" %
(handler.sourcedir, self.instance.platform.name))
self.instance.status = "failed"
self.instance.testcases[0].status = "failed"
self.instance.status = TestInstanceStatus.FAIL
self.instance.testcases[0].status = TestCaseStatus.FAIL
if out:
with open(os.path.join(self.instance.build_dir, handler.log), "wt") as log:
@@ -236,10 +237,10 @@ class Console(Harness):
def configure(self, instance):
super(Console, self).configure(instance)
if self.regex is None or len(self.regex) == 0:
self.state = "failed"
self.state = HarnessStatus.FAIL
tc = self.instance.set_case_status_by_name(
self.get_testcase_name(),
"failed",
TestCaseStatus.FAIL,
f"HARNESS:{self.__class__.__name__}:no regex patterns configured."
)
raise ConfigurationError(self.instance.name, tc.reason)
@@ -252,10 +253,10 @@ class Console(Harness):
self.patterns.append(re.compile(r))
self.patterns_expected = len(self.patterns)
else:
self.state = "failed"
self.state = HarnessStatus.FAIL
tc = self.instance.set_case_status_by_name(
self.get_testcase_name(),
"failed",
TestCaseStatus.FAIL,
f"HARNESS:{self.__class__.__name__}:incorrect type={self.type}"
)
raise ConfigurationError(self.instance.name, tc.reason)
@@ -267,7 +268,7 @@ class Console(Harness):
logger.debug(f"HARNESS:{self.__class__.__name__}:EXPECTED:"
f"'{self.pattern.pattern}'")
self.next_pattern += 1
self.state = "passed"
self.state = HarnessStatus.PASS
elif self.type == "multi_line" and self.ordered:
if (self.next_pattern < len(self.patterns) and
self.patterns[self.next_pattern].search(line)):
@@ -276,7 +277,7 @@ class Console(Harness):
f"'{self.patterns[self.next_pattern].pattern}'")
self.next_pattern += 1
if self.next_pattern >= len(self.patterns):
self.state = "passed"
self.state = HarnessStatus.PASS
elif self.type == "multi_line" and not self.ordered:
for i, pattern in enumerate(self.patterns):
r = self.regex[i]
@@ -286,7 +287,7 @@ class Console(Harness):
f"{len(self.matches)}/{self.patterns_expected}):"
f"'{pattern.pattern}'")
if len(self.matches) == len(self.regex):
self.state = "passed"
self.state = HarnessStatus.PASS
else:
logger.error("Unknown harness_config type")
@@ -300,31 +301,35 @@ class Console(Harness):
self.capture_coverage = False
self.process_test(line)
# Reset the resulting test state to 'failed' when not all of the patterns were
# Reset the resulting test state to FAIL when not all of the patterns were
# found in the output, but just ztest's 'PROJECT EXECUTION SUCCESSFUL'.
# It might happen because of the pattern sequence diverged from the
# test code, the test platform has console issues, or even some other
# test image was executed.
# TODO: Introduce explicit match policy type to reject
# unexpected console output, allow missing patterns, deny duplicates.
if self.state == "passed" and self.ordered and self.next_pattern < self.patterns_expected:
if self.state == HarnessStatus.PASS and \
self.ordered and \
self.next_pattern < self.patterns_expected:
logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
f" {self.next_pattern} of {self.patterns_expected}"
f" expected ordered patterns.")
self.state = "failed"
self.state = HarnessStatus.FAIL
self.reason = "patterns did not match (ordered)"
if self.state == "passed" and not self.ordered and len(self.matches) < self.patterns_expected:
if self.state == HarnessStatus.PASS and \
not self.ordered and \
len(self.matches) < self.patterns_expected:
logger.error(f"HARNESS:{self.__class__.__name__}: failed with"
f" {len(self.matches)} of {self.patterns_expected}"
f" expected unordered patterns.")
self.state = "failed"
self.state = HarnessStatus.FAIL
self.reason = "patterns did not match (unordered)"
tc = self.instance.get_case_or_create(self.get_testcase_name())
if self.state == "passed":
tc.status = "passed"
if self.state == HarnessStatus.PASS:
tc.status = TestCaseStatus.PASS
else:
tc.status = "failed"
tc.status = TestCaseStatus.FAIL
class PytestHarnessException(Exception):
@@ -347,7 +352,7 @@ class Pytest(Harness):
self.run_command(cmd, timeout)
except PytestHarnessException as pytest_exception:
logger.error(str(pytest_exception))
self.state = 'failed'
self.state = HarnessStatus.FAIL
self.instance.reason = str(pytest_exception)
finally:
if self.reserved_serial:
@@ -481,10 +486,10 @@ class Pytest(Harness):
logger.warning('Timeout has occurred. Can be extended in testspec file. '
f'Currently set to {timeout} seconds.')
self.instance.reason = 'Pytest timeout'
self.state = 'failed'
self.state = HarnessStatus.FAIL
proc.wait(timeout)
except subprocess.TimeoutExpired:
self.state = 'failed'
self.state = HarnessStatus.FAIL
proc.kill()
@staticmethod
@@ -520,36 +525,37 @@ class Pytest(Harness):
proc.communicate()
def _update_test_status(self):
if not self.state:
if self.state == HarnessStatus.NONE:
self.instance.testcases = []
try:
self._parse_report_file(self.report_file)
except Exception as e:
logger.error(f'Error when parsing file {self.report_file}: {e}')
self.state = 'failed'
self.state = HarnessStatus.FAIL
finally:
if not self.instance.testcases:
self.instance.init_cases()
self.instance.status = self.state or 'failed'
if self.instance.status in ['error', 'failed']:
self.instance.status = self.state if self.state != HarnessStatus.NONE else \
TestInstanceStatus.FAIL
if self.instance.status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL]:
self.instance.reason = self.instance.reason or 'Pytest failed'
self.instance.add_missing_case_status('blocked', self.instance.reason)
self.instance.add_missing_case_status(TestCaseStatus.BLOCK, self.instance.reason)
def _parse_report_file(self, report):
tree = ET.parse(report)
root = tree.getroot()
if elem_ts := root.find('testsuite'):
if elem_ts.get('failures') != '0':
self.state = 'failed'
self.state = HarnessStatus.FAIL
self.instance.reason = f"{elem_ts.get('failures')}/{elem_ts.get('tests')} pytest scenario(s) failed"
elif elem_ts.get('errors') != '0':
self.state = 'error'
self.state = HarnessStatus.ERROR
self.instance.reason = 'Error during pytest execution'
elif elem_ts.get('skipped') == elem_ts.get('tests'):
self.state = 'skipped'
self.state = HarnessStatus.SKIP
else:
self.state = 'passed'
self.state = HarnessStatus.PASS
self.instance.execution_time = float(elem_ts.get('time'))
for elem_tc in elem_ts.findall('testcase'):
@@ -557,18 +563,18 @@ class Pytest(Harness):
tc.duration = float(elem_tc.get('time'))
elem = elem_tc.find('*')
if elem is None:
tc.status = 'passed'
tc.status = TestCaseStatus.PASS
else:
if elem.tag == 'skipped':
tc.status = 'skipped'
elif elem.tag == 'failure':
tc.status = 'failed'
if elem.tag == ReportStatus.SKIP:
tc.status = TestCaseStatus.SKIP
elif elem.tag == ReportStatus.FAIL:
tc.status = TestCaseStatus.FAIL
else:
tc.status = 'error'
tc.status = TestCaseStatus.ERROR
tc.reason = elem.get('message')
tc.output = elem.text
else:
self.state = 'skipped'
self.state = HarnessStatus.SKIP
self.instance.reason = 'No tests collected'
@@ -589,7 +595,7 @@ class Gtest(Harness):
# Strip the ANSI characters, they mess up the patterns
non_ansi_line = self.ANSI_ESCAPE.sub('', line)
if self.state:
if self.state != HarnessStatus.NONE:
return
# Check if we started running a new test
@@ -615,7 +621,7 @@ class Gtest(Harness):
# Create the test instance and set the context
tc = self.instance.get_case_or_create(name)
self.tc = tc
self.tc.status = "started"
self.tc.status = TestCaseStatus.STARTED
self.testcase_output += line + "\n"
self._match = True
@@ -624,16 +630,16 @@ class Gtest(Harness):
if finished_match:
tc = self.instance.get_case_or_create(self.id)
if self.has_failures or self.tc is not None:
self.state = "failed"
tc.status = "failed"
self.state = HarnessStatus.FAIL
tc.status = TestCaseStatus.FAIL
else:
self.state = "passed"
tc.status = "passed"
self.state = HarnessStatus.PASS
tc.status = TestCaseStatus.PASS
return
# Check if the individual test finished
state, name = self._check_result(non_ansi_line)
if state is None or name is None:
if state == TestCaseStatus.NONE or name is None:
# Nothing finished, keep processing lines
return
@@ -648,7 +654,7 @@ class Gtest(Harness):
# Update the status of the test
tc.status = state
if tc.status == "failed":
if tc.status == TestCaseStatus.FAIL:
self.has_failures = True
tc.output = self.testcase_output
self.testcase_output = ""
@@ -657,13 +663,25 @@ class Gtest(Harness):
def _check_result(self, line):
test_pass_match = re.search(self.TEST_PASS_PATTERN, line)
if test_pass_match:
return "passed", "{}.{}.{}".format(self.id, test_pass_match.group("suite_name"), test_pass_match.group("test_name"))
return TestCaseStatus.PASS, \
"{}.{}.{}".format(
self.id, test_pass_match.group("suite_name"),
test_pass_match.group("test_name")
)
test_skip_match = re.search(self.TEST_SKIP_PATTERN, line)
if test_skip_match:
return "skipped", "{}.{}.{}".format(self.id, test_skip_match.group("suite_name"), test_skip_match.group("test_name"))
return TestCaseStatus.SKIP, \
"{}.{}.{}".format(
self.id, test_skip_match.group("suite_name"),
test_skip_match.group("test_name")
)
test_fail_match = re.search(self.TEST_FAIL_PATTERN, line)
if test_fail_match:
return "failed", "{}.{}.{}".format(self.id, test_fail_match.group("suite_name"), test_fail_match.group("test_name"))
return TestCaseStatus.FAIL, \
"{}.{}.{}".format(
self.id, test_fail_match.group("suite_name"),
test_fail_match.group("test_name")
)
return None, None
@@ -687,7 +705,7 @@ class Test(Harness):
# Mark the test as started, if something happens here, it is mostly
# due to this tests, for example timeout. This should in this case
# be marked as failed and not blocked (not run).
tc.status = "started"
tc.status = TestCaseStatus.STARTED
if testcase_match or self._match:
self.testcase_output += line + "\n"
@@ -706,10 +724,10 @@ class Test(Harness):
name = "{}.{}".format(self.id, result_match.group(3))
tc = self.instance.get_case_or_create(name)
tc.status = self.ztest_to_status[matched_status]
if tc.status == "skipped":
if tc.status == TestCaseStatus.SKIP:
tc.reason = "ztest skip"
tc.duration = float(result_match.group(4))
if tc.status == "failed":
if tc.status == TestCaseStatus.FAIL:
tc.output = self.testcase_output
self.testcase_output = ""
self._match = False
@@ -720,10 +738,10 @@ class Test(Harness):
name = "{}.{}".format(self.id, summary_match.group(4))
tc = self.instance.get_case_or_create(name)
tc.status = self.ztest_to_status[matched_status]
if tc.status == "skipped":
if tc.status == TestCaseStatus.SKIP:
tc.reason = "ztest skip"
tc.duration = float(summary_match.group(5))
if tc.status == "failed":
if tc.status == TestCaseStatus.FAIL:
tc.output = self.testcase_output
self.testcase_output = ""
self._match = False
@@ -731,13 +749,13 @@ class Test(Harness):
self.process_test(line)
if not self.ztest and self.state:
if not self.ztest and self.state != HarnessStatus.NONE:
logger.debug(f"not a ztest and no state for {self.id}")
tc = self.instance.get_case_or_create(self.id)
if self.state == "passed":
tc.status = "passed"
if self.state == HarnessStatus.PASS:
tc.status = TestCaseStatus.PASS
else:
tc.status = "failed"
tc.status = TestCaseStatus.FAIL
tc.reason = "Test failure"

scripts/pylib/twister/twisterlib/package.py (4 lines changed)

@@ -7,6 +7,8 @@ import tarfile
import json
import os
from twisterlib.statuses import TestSuiteStatus
class Artifacts:
def __init__(self, env):
@@ -25,7 +27,7 @@ class Artifacts:
with open(os.path.join(self.options.outdir, "twister.json"), "r") as json_test_plan:
jtp = json.load(json_test_plan)
for t in jtp['testsuites']:
if t['status'] != "filtered":
if t['status'] != TestSuiteStatus.FILTER:
p = t['platform']
normalized = p.replace("/", "_")
dirs.append(os.path.join(self.options.outdir, normalized, t['name']))

scripts/pylib/twister/twisterlib/reports.py (72 lines changed)

@@ -13,6 +13,8 @@ import string
from datetime import datetime
from pathlib import PosixPath
from twisterlib.statuses import ReportStatus, TestCaseStatus, TestInstanceStatus, TestSuiteStatus
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
@@ -55,7 +57,7 @@ class Reporting:
def xunit_testcase(eleTestsuite, name, classname, status, ts_status, reason, duration, runnable, stats, log, build_only_as_skip):
fails, passes, errors, skips = stats
if status in ['skipped', 'filtered']:
if status in [TestCaseStatus.SKIP, TestCaseStatus.FILTER]:
duration = 0
eleTestcase = ET.SubElement(
@@ -64,34 +66,34 @@ class Reporting:
name=f"{name}",
time=f"{duration}")
if status in ['skipped', 'filtered']:
if status in [TestCaseStatus.SKIP, TestCaseStatus.FILTER]:
skips += 1
# temporarily add build_only_as_skip to restore existing CI report behaviour
if ts_status == "passed" and not runnable:
if ts_status == TestSuiteStatus.PASS and not runnable:
tc_type = "build"
else:
tc_type = status
ET.SubElement(eleTestcase, 'skipped', type=f"{tc_type}", message=f"{reason}")
elif status in ["failed", "blocked"]:
ET.SubElement(eleTestcase, ReportStatus.SKIP, type=f"{tc_type}", message=f"{reason}")
elif status in [TestCaseStatus.FAIL, TestCaseStatus.BLOCK]:
fails += 1
el = ET.SubElement(eleTestcase, 'failure', type="failure", message=f"{reason}")
el = ET.SubElement(eleTestcase, ReportStatus.FAIL, type="failure", message=f"{reason}")
if log:
el.text = log
elif status == "error":
elif status == TestCaseStatus.ERROR:
errors += 1
el = ET.SubElement(eleTestcase, 'error', type="failure", message=f"{reason}")
el = ET.SubElement(eleTestcase, ReportStatus.ERROR, type="failure", message=f"{reason}")
if log:
el.text = log
elif status == 'passed':
elif status == TestCaseStatus.PASS:
if not runnable and build_only_as_skip:
ET.SubElement(eleTestcase, 'skipped', type="build", message="built only")
ET.SubElement(eleTestcase, ReportStatus.SKIP, type="build", message="built only")
skips += 1
else:
passes += 1
else:
if not status:
if status == TestCaseStatus.NONE:
logger.debug(f"{name}: No status")
ET.SubElement(eleTestcase, 'skipped', type=f"untested", message="No results captured, testsuite misconfiguration?")
ET.SubElement(eleTestcase, ReportStatus.SKIP, type=f"untested", message="No results captured, testsuite misconfiguration?")
else:
logger.error(f"{name}: Unknown status '{status}'")
@@ -114,7 +116,7 @@ class Reporting:
suites_to_report = all_suites
# do not create entry if everything is filtered out
if not self.env.options.detailed_skipped_report:
suites_to_report = list(filter(lambda d: d.get('status') != "filtered", all_suites))
suites_to_report = list(filter(lambda d: d.get('status') != TestSuiteStatus.FILTER, all_suites))
for suite in suites_to_report:
duration = 0
@@ -184,7 +186,7 @@ class Reporting:
suites = list(filter(lambda d: d['platform'] == platform, all_suites))
# do not create entry if everything is filtered out
if not self.env.options.detailed_skipped_report:
non_filtered = list(filter(lambda d: d.get('status') != "filtered", suites))
non_filtered = list(filter(lambda d: d.get('status') != TestSuiteStatus.FILTER, suites))
if not non_filtered:
continue
@@ -210,7 +212,7 @@ class Reporting:
ts_status = ts.get('status')
# Do not report filtered testcases
if ts_status == 'filtered' and not self.env.options.detailed_skipped_report:
if ts_status == TestSuiteStatus.FILTER and not self.env.options.detailed_skipped_report:
continue
if full_report:
for tc in ts.get("testcases", []):
@@ -301,7 +303,7 @@ class Reporting:
suite['run_id'] = instance.run_id
suite["runnable"] = False
if instance.status != 'filtered':
if instance.status != TestInstanceStatus.FILTER:
suite["runnable"] = instance.run
if used_ram:
@@ -317,7 +319,7 @@ class Reporting:
suite["available_ram"] = available_ram
if available_rom:
suite["available_rom"] = available_rom
if instance.status in ["error", "failed"]:
if instance.status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL]:
suite['status'] = instance.status
suite["reason"] = instance.reason
# FIXME
@@ -329,16 +331,16 @@ class Reporting:
suite["log"] = self.process_log(device_log)
else:
suite["log"] = self.process_log(build_log)
elif instance.status == 'filtered':
suite["status"] = "filtered"
elif instance.status == TestInstanceStatus.FILTER:
suite["status"] = TestSuiteStatus.FILTER
suite["reason"] = instance.reason
elif instance.status == 'passed':
suite["status"] = "passed"
elif instance.status == 'skipped':
suite["status"] = "skipped"
elif instance.status == TestInstanceStatus.PASS:
suite["status"] = TestSuiteStatus.PASS
elif instance.status == TestInstanceStatus.SKIP:
suite["status"] = TestSuiteStatus.SKIP
suite["reason"] = instance.reason
if instance.status is not None:
if instance.status != TestInstanceStatus.NONE:
suite["execution_time"] = f"{float(handler_time):.2f}"
suite["build_time"] = f"{float(instance.build_time):.2f}"
@@ -354,11 +356,11 @@ class Reporting:
# if we discover those at runtime, the fallback testcase wont be
# needed anymore and can be removed from the output, it does
# not have a status and would otherwise be reported as skipped.
if case.freeform and case.status is None and len(instance.testcases) > 1:
if case.freeform and case.status == TestCaseStatus.NONE and len(instance.testcases) > 1:
continue
testcase = {}
testcase['identifier'] = case.name
if instance.status:
if instance.status != TestInstanceStatus.NONE:
if single_case_duration:
testcase['execution_time'] = single_case_duration
else:
@@ -367,11 +369,11 @@ class Reporting:
if case.output != "":
testcase['log'] = case.output
if case.status == "skipped":
if instance.status == "filtered":
testcase["status"] = "filtered"
if case.status == TestCaseStatus.SKIP:
if instance.status == TestInstanceStatus.FILTER:
testcase["status"] = TestCaseStatus.FILTER
else:
testcase["status"] = "skipped"
testcase["status"] = TestCaseStatus.SKIP
testcase["reason"] = case.reason or instance.reason
else:
testcase["status"] = case.status
@@ -513,13 +515,17 @@ class Reporting:
example_instance = None
detailed_test_id = self.env.options.detailed_test_id
for instance in self.instances.values():
if instance.status not in ["passed", "filtered", "skipped"]:
if instance.status not in [TestInstanceStatus.PASS, TestInstanceStatus.FILTER, TestInstanceStatus.SKIP]:
cnt += 1
if cnt == 1:
logger.info("-+" * 40)
logger.info(log_txt)
logger.info(f"{cnt}) {instance.testsuite.name} on {instance.platform.name} {instance.status} ({instance.reason})")
status = instance.status
if self.env.options.report_summary is not None and \
status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL]:
status = Fore.RED + status.upper() + Fore.RESET
logger.info(f"{cnt}) {instance.testsuite.name} on {instance.platform.name} {status} ({instance.reason})")
example_instance = instance
if cnt == count:
break
@@ -545,7 +551,7 @@ class Reporting:
failed = 0
run = 0
for instance in self.instances.values():
if instance.status == "failed":
if instance.status == TestInstanceStatus.FAIL:
failed += 1
elif not ignore_unrecognized_sections and instance.metrics.get("unrecognized"):
logger.error("%sFAILED%s: %s has unrecognized binary sections: %s" %

scripts/pylib/twister/twisterlib/runner.py (75 lines changed)

@@ -26,6 +26,7 @@ from domains import Domains
from twisterlib.cmakecache import CMakeCache
from twisterlib.environment import canonical_zephyr_base
from twisterlib.error import BuildError, ConfigurationError
from twisterlib.statuses import TestCaseStatus, TestInstanceStatus
import elftools
from elftools.elf.elffile import ELFFile
@@ -284,9 +285,9 @@ class CMake:
msg = f"Finished building {self.source_dir} for {self.platform.name} in {duration:.2f} seconds"
logger.debug(msg)
self.instance.status = "passed"
self.instance.status = TestInstanceStatus.PASS
if not self.instance.run:
self.instance.add_missing_case_status("skipped", "Test was built only")
self.instance.add_missing_case_status(TestCaseStatus.SKIP, "Test was built only")
ret = {"returncode": p.returncode}
if out:
@@ -308,15 +309,15 @@ class CMake:
imgtool_overflow_found = re.findall(r"Error: Image size \(.*\) \+ trailer \(.*\) exceeds requested size", log_msg)
if overflow_found and not self.options.overflow_as_errors:
logger.debug("Test skipped due to {} Overflow".format(overflow_found[0]))
self.instance.status = "skipped"
self.instance.status = TestInstanceStatus.SKIP
self.instance.reason = "{} overflow".format(overflow_found[0])
change_skip_to_error_if_integration(self.options, self.instance)
elif imgtool_overflow_found and not self.options.overflow_as_errors:
self.instance.status = "skipped"
self.instance.status = TestInstanceStatus.SKIP
self.instance.reason = "imgtool overflow"
change_skip_to_error_if_integration(self.options, self.instance)
else:
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = "Build failure"
ret = {
@@ -415,7 +416,7 @@ class CMake:
'filter': filter_results
}
else:
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = "Cmake build failure"
for tc in self.instance.testcases:
@@ -607,16 +608,16 @@ class ProjectBuilder(FilterBuilder):
if op == "filter":
ret = self.cmake(filter_stages=self.instance.filter_stages)
if self.instance.status in ["failed", "error"]:
if self.instance.status in [TestInstanceStatus.FAIL, TestInstanceStatus.ERROR]:
pipeline.put({"op": "report", "test": self.instance})
else:
# Here we check the dt/kconfig filter results coming from running cmake
if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
logger.debug("filtering %s" % self.instance.name)
self.instance.status = "filtered"
self.instance.status = TestInstanceStatus.FILTER
self.instance.reason = "runtime filter"
results.skipped_runtime += 1
self.instance.add_missing_case_status("skipped")
self.instance.add_missing_case_status(TestCaseStatus.SKIP)
pipeline.put({"op": "report", "test": self.instance})
else:
pipeline.put({"op": "cmake", "test": self.instance})
@@ -624,20 +625,20 @@ class ProjectBuilder(FilterBuilder):
# The build process, call cmake and build with configured generator
elif op == "cmake":
ret = self.cmake()
if self.instance.status in ["failed", "error"]:
if self.instance.status in [TestInstanceStatus.FAIL, TestInstanceStatus.ERROR]:
pipeline.put({"op": "report", "test": self.instance})
elif self.options.cmake_only:
if self.instance.status is None:
self.instance.status = "passed"
if self.instance.status == TestInstanceStatus.NONE:
self.instance.status = TestInstanceStatus.PASS
pipeline.put({"op": "report", "test": self.instance})
else:
# Here we check the runtime filter results coming from running cmake
if self.instance.name in ret['filter'] and ret['filter'][self.instance.name]:
logger.debug("filtering %s" % self.instance.name)
self.instance.status = "filtered"
self.instance.status = TestInstanceStatus.FILTER
self.instance.reason = "runtime filter"
results.skipped_runtime += 1
self.instance.add_missing_case_status("skipped")
self.instance.add_missing_case_status(TestCaseStatus.SKIP)
pipeline.put({"op": "report", "test": self.instance})
else:
pipeline.put({"op": "build", "test": self.instance})
@@ -646,18 +647,18 @@ class ProjectBuilder(FilterBuilder):
logger.debug("build test: %s" % self.instance.name)
ret = self.build()
if not ret:
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = "Build Failure"
pipeline.put({"op": "report", "test": self.instance})
else:
# Count skipped cases during build, for example
# due to ram/rom overflow.
if self.instance.status == "skipped":
if self.instance.status == TestInstanceStatus.SKIP:
results.skipped_runtime += 1
self.instance.add_missing_case_status("skipped", self.instance.reason)
self.instance.add_missing_case_status(TestCaseStatus.SKIP, self.instance.reason)
if ret.get('returncode', 1) > 0:
self.instance.add_missing_case_status("blocked", self.instance.reason)
self.instance.add_missing_case_status(TestCaseStatus.BLOCK, self.instance.reason)
pipeline.put({"op": "report", "test": self.instance})
else:
if self.instance.testsuite.harness in ['ztest', 'test']:
@@ -667,7 +668,7 @@ class ProjectBuilder(FilterBuilder):
pipeline.put({"op": "gather_metrics", "test": self.instance})
except BuildError as e:
logger.error(str(e))
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = str(e)
pipeline.put({"op": "report", "test": self.instance})
else:
@@ -713,7 +714,7 @@ class ProjectBuilder(FilterBuilder):
if not self.options.coverage:
if self.options.prep_artifacts_for_testing:
pipeline.put({"op": "cleanup", "mode": "device", "test": self.instance})
elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == "passed":
elif self.options.runtime_artifact_cleanup == "pass" and self.instance.status == TestInstanceStatus.PASS:
pipeline.put({"op": "cleanup", "mode": "passed", "test": self.instance})
elif self.options.runtime_artifact_cleanup == "all":
pipeline.put({"op": "cleanup", "mode": "all", "test": self.instance})
@@ -966,8 +967,8 @@ class ProjectBuilder(FilterBuilder):
if results.iteration == 1:
results.cases += len(instance.testcases)
if instance.status in ["error", "failed"]:
if instance.status == "error":
if instance.status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL]:
if instance.status == TestInstanceStatus.ERROR:
results.error += 1
txt = " ERROR "
else:
@@ -986,17 +987,17 @@ class ProjectBuilder(FilterBuilder):
instance.reason))
if not self.options.verbose:
self.log_info_file(self.options.inline_logs)
elif instance.status in ["skipped", "filtered"]:
elif instance.status in [TestInstanceStatus.SKIP, TestInstanceStatus.FILTER]:
status = Fore.YELLOW + "SKIPPED" + Fore.RESET
results.skipped_configs += 1
# test cases skipped at the test instance level
results.skipped_cases += len(instance.testsuite.testcases)
elif instance.status == "passed":
elif instance.status == TestInstanceStatus.PASS:
status = Fore.GREEN + "PASSED" + Fore.RESET
results.passed += 1
for case in instance.testcases:
# test cases skipped at the test case level
if case.status == 'skipped':
if case.status == TestCaseStatus.SKIP:
results.skipped_cases += 1
else:
logger.debug(f"Unknown status = {instance.status}")
@@ -1005,7 +1006,7 @@ class ProjectBuilder(FilterBuilder):
if self.options.verbose:
if self.options.cmake_only:
more_info = "cmake"
elif instance.status in ["skipped", "filtered"]:
elif instance.status in [TestInstanceStatus.SKIP, TestInstanceStatus.FILTER]:
more_info = instance.reason
else:
if instance.handler.ready and instance.run:
@@ -1018,7 +1019,7 @@ class ProjectBuilder(FilterBuilder):
else:
more_info = "build"
if ( instance.status in ["error", "failed", "timeout", "flash_error"]
if ( instance.status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL, TestInstanceStatus.TIMEOUT, TestInstanceStatus.FLASH]
and hasattr(self.instance.handler, 'seed')
and self.instance.handler.seed is not None ):
more_info += "/seed: " + str(self.options.seed)
@@ -1026,7 +1027,7 @@ class ProjectBuilder(FilterBuilder):
results.done, total_tests_width, total_to_do , instance.platform.name,
instance.testsuite.name, status, more_info))
if instance.status in ["error", "failed", "timeout"]:
if instance.status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL, TestInstanceStatus.TIMEOUT]:
self.log_info_file(self.options.inline_logs)
else:
completed_perc = 0
@@ -1109,7 +1110,7 @@ class ProjectBuilder(FilterBuilder):
harness.instance = self.instance
harness.build()
except ConfigurationError as error:
self.instance.status = "error"
self.instance.status = TestInstanceStatus.ERROR
self.instance.reason = str(error)
logger.error(self.instance.reason)
return
@@ -1121,7 +1122,7 @@ class ProjectBuilder(FilterBuilder):
if instance.handler.ready:
logger.debug(f"Reset instance status from '{instance.status}' to None before run.")
instance.status = None
instance.status = TestInstanceStatus.NONE
if instance.handler.type_str == "device":
instance.handler.duts = self.duts
@@ -1139,7 +1140,7 @@ class ProjectBuilder(FilterBuilder):
try:
harness.configure(instance)
except ConfigurationError as error:
instance.status = "error"
instance.status = TestInstanceStatus.ERROR
instance.reason = str(error)
logger.error(instance.reason)
return
@@ -1167,7 +1168,7 @@ class ProjectBuilder(FilterBuilder):
@staticmethod
def calc_size(instance: TestInstance, from_buildlog: bool):
if instance.status not in ["error", "failed", "skipped"]:
if instance.status not in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL, TestInstanceStatus.SKIP]:
if not instance.platform.type in ["native", "qemu", "unit"]:
generate_warning = bool(instance.platform.type == "mcu")
size_calc = instance.calculate_sizes(from_buildlog=from_buildlog, generate_warning=generate_warning)
@@ -1278,12 +1279,12 @@ class TwisterRunner:
the static filter stats. So need to prepare them before pipline starts.
'''
for instance in self.instances.values():
if instance.status == 'filtered' and not instance.reason == 'runtime filter':
if instance.status == TestInstanceStatus.FILTER and not instance.reason == 'runtime filter':
self.results.skipped_filter += 1
self.results.skipped_configs += 1
self.results.skipped_cases += len(instance.testsuite.testcases)
self.results.cases += len(instance.testsuite.testcases)
elif instance.status == 'error':
elif instance.status == TestInstanceStatus.ERROR:
self.results.error += 1
def show_brief(self):
@@ -1299,15 +1300,15 @@ class TwisterRunner:
if build_only:
instance.run = False
no_retry_statuses = ['passed', 'skipped', 'filtered']
no_retry_statuses = [TestInstanceStatus.PASS, TestInstanceStatus.SKIP, TestInstanceStatus.FILTER]
if not retry_build_errors:
no_retry_statuses.append("error")
if instance.status not in no_retry_statuses:
logger.debug(f"adding {instance.name}")
if instance.status:
if instance.status != TestInstanceStatus.NONE:
instance.retries += 1
instance.status = None
instance.status = TestInstanceStatus.NONE
# Check if cmake package_helper script can be run in advance.
instance.filter_stages = []

scripts/pylib/twister/twisterlib/statuses.py (86 lines changed, new file)

@@ -0,0 +1,86 @@
#!/usr/bin/env python3
# Copyright (c) 2024 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
"""
Status classes to be used instead of str statuses.
"""
from enum import Enum
class TestInstanceStatus(str, Enum):
    def __str__(self):
        return str(self.value)

    NONE = None  # to preserve old functionality
    ERROR = 'error'
    FAIL = 'failed'
    FILTER = 'filtered'
    FLASH = 'flash_error'
    PASS = 'passed'
    SKIP = 'skipped'
    TIMEOUT = 'timeout'


# Possible direct assignments:
# * TestSuiteStatus <- TestInstanceStatus
class TestSuiteStatus(str, Enum):
    def __str__(self):
        return str(self.value)

    NONE = None  # to preserve old functionality
    FILTER = 'filtered'
    PASS = 'passed'
    SKIP = 'skipped'


# Possible direct assignments:
# * TestCaseStatus <- TestInstanceStatus
class TestCaseStatus(str, Enum):
    def __str__(self):
        return str(self.value)

    NONE = None  # to preserve old functionality
    BLOCK = 'blocked'
    ERROR = 'error'
    FAIL = 'failed'
    FILTER = 'filtered'
    PASS = 'passed'
    SKIP = 'skipped'
    STARTED = 'started'


# Possible direct assignments:
# * OutputStatus <- HarnessStatus
class OutputStatus(str, Enum):
    def __str__(self):
        return str(self.value)

    NONE = None  # to preserve old functionality
    BYTE = 'unexpected byte'
    EOF = 'unexpected eof'
    FAIL = 'failed'
    TIMEOUT = 'timeout'


# Possible direct assignments:
# * TestInstanceStatus <- HarnessStatus
class HarnessStatus(str, Enum):
    def __str__(self):
        return str(self.value)

    NONE = None  # to preserve old functionality
    ERROR = 'error'
    FAIL = 'failed'
    PASS = 'passed'
    SKIP = 'skipped'


class ReportStatus(str, Enum):
    def __str__(self):
        return str(self.value)

    ERROR = 'error'
    FAIL = 'failure'  # Note the difference!
    SKIP = 'skipped'
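A short usage sketch (illustration only, not from the commit; it assumes twisterlib.statuses is importable) of why these classes mix in str: members still compare equal to the legacy string literals and serialize as plain strings, so existing comparisons, twister.json output and log text keep working, while the __str__ override makes formatted output show the bare value.

import json

from twisterlib.statuses import ReportStatus, TestInstanceStatus

assert TestInstanceStatus.FAIL == 'failed'                 # legacy string comparisons still hold
assert isinstance(TestInstanceStatus.FAIL, str)            # accepted wherever a str is expected
assert json.dumps(TestInstanceStatus.FAIL) == '"failed"'   # JSON reports stay unchanged
assert str(TestInstanceStatus.FAIL) == 'failed'            # __str__ gives the value, not 'TestInstanceStatus.FAIL'
assert str(ReportStatus.FAIL) == 'failure'                 # report element name differs from the status value

This is also why reports.py above can pass members such as ReportStatus.SKIP directly where an XML element tag string is expected.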

scripts/pylib/twister/twisterlib/testinstance.py (11 lines changed)

@@ -16,6 +16,7 @@ from twisterlib.testsuite import TestCase, TestSuite
from twisterlib.platform import Platform
from twisterlib.error import BuildError
from twisterlib.size_calc import SizeCalculator
from twisterlib.statuses import TestCaseStatus, TestInstanceStatus
from twisterlib.handlers import (
Handler,
SimulationHandler,
@@ -46,7 +47,7 @@ class TestInstance:
self.testsuite: TestSuite = testsuite
self.platform: Platform = platform
self.status = None
self.status = TestInstanceStatus.NONE
self.reason = "Unknown"
self.metrics = dict()
self.handler = None
@@ -94,7 +95,7 @@ class TestInstance:
def add_filter(self, reason, filter_type):
self.filters.append({'type': filter_type, 'reason': reason })
self.status = "filtered"
self.status = TestInstanceStatus.FILTER
self.reason = reason
self.filter_type = filter_type
@@ -124,9 +125,9 @@ class TestInstance:
def add_missing_case_status(self, status, reason=None):
for case in self.testcases:
if case.status == 'started':
case.status = "failed"
elif not case.status:
if case.status == TestCaseStatus.STARTED:
case.status = TestCaseStatus.FAIL
elif case.status == TestCaseStatus.NONE:
case.status = status
if reason:
case.reason = reason

scripts/pylib/twister/twisterlib/testplan.py (37 lines changed)

@@ -33,6 +33,7 @@ from twisterlib.testsuite import TestSuite, scan_testsuite_path
from twisterlib.error import TwisterRuntimeError
from twisterlib.platform import Platform
from twisterlib.config_parser import TwisterConfigParser
from twisterlib.statuses import TestCaseStatus, TestInstanceStatus, TestSuiteStatus
from twisterlib.testinstance import TestInstance
from twisterlib.quarantine import Quarantine
@@ -286,7 +287,7 @@ class TestPlan:
# at runtime, ignore the cases we already know going to be skipped.
# This fixes an issue where some sets would get majority of skips and
# basically run nothing beside filtering.
to_run = {k : v for k,v in self.instances.items() if v.status is None}
to_run = {k : v for k,v in self.instances.items() if v.status == TestInstanceStatus.NONE}
total = len(to_run)
per_set = int(total / sets)
num_extra_sets = total - (per_set * sets)
@@ -303,8 +304,8 @@ class TestPlan:
end = start + per_set
sliced_instances = islice(to_run.items(), start, end)
skipped = {k : v for k,v in self.instances.items() if v.status == 'skipped'}
errors = {k : v for k,v in self.instances.items() if v.status == 'error'}
skipped = {k : v for k,v in self.instances.items() if v.status == TestInstanceStatus.SKIP}
errors = {k : v for k,v in self.instances.items() if v.status == TestInstanceStatus.ERROR}
self.instances = OrderedDict(sliced_instances)
if subset == 1:
# add all pre-filtered tests that are skipped or got error status
@@ -624,23 +625,21 @@ class TestPlan:
instance.metrics['available_ram'] = ts.get('available_ram', 0)
instance.metrics['available_rom'] = ts.get('available_rom', 0)
status = ts.get('status', None)
status = ts.get('status', TestSuiteStatus.NONE)
reason = ts.get("reason", "Unknown")
if status in ["error", "failed"]:
if status in [TestInstanceStatus.ERROR, TestInstanceStatus.FAIL]:
if self.options.report_summary is not None:
if status == "error": status = "ERROR"
elif status == "failed": status = "FAILED"
instance.status = Fore.RED + status + Fore.RESET
instance.status = status
instance.reason = reason
self.instance_fail_count += 1
else:
instance.status = None
instance.status = TestInstanceStatus.NONE
instance.reason = None
instance.retries += 1
# test marked as passed (built only) but can run when
# --test-only is used. Reset status to capture new results.
elif status == 'passed' and instance.run and self.options.test_only:
instance.status = None
elif status == TestInstanceStatus.PASS and instance.run and self.options.test_only:
instance.status = TestInstanceStatus.NONE
instance.reason = None
else:
instance.status = status
@@ -650,13 +649,13 @@ class TestPlan:
for tc in ts.get('testcases', []):
identifier = tc['identifier']
tc_status = tc.get('status', None)
tc_status = tc.get('status', TestCaseStatus.NONE)
tc_reason = None
# we set reason only if status is valid, it might have been
# reset above...
if instance.status:
if instance.status != TestInstanceStatus.NONE:
tc_reason = tc.get('reason')
if tc_status:
if tc_status != TestCaseStatus.NONE:
case = instance.set_case_status_by_name(identifier, tc_status, tc_reason)
case.duration = tc.get('execution_time', 0)
if tc.get('log'):
@@ -904,7 +903,7 @@ class TestPlan:
for this_snippet in snippet_args['snippets']:
if this_snippet not in found_snippets:
logger.error(f"Can't find snippet '%s' for test '%s'", this_snippet, ts.name)
instance.status = "error"
instance.status = TestInstanceStatus.ERROR
instance.reason = f"Snippet {this_snippet} not found"
missing_snippet = True
break
@@ -1015,14 +1014,14 @@ class TestPlan:
self.selected_platforms = set(p.platform.name for p in self.instances.values())
filtered_instances = list(filter(lambda item: item.status == "filtered", self.instances.values()))
filtered_instances = list(filter(lambda item: item.status == TestInstanceStatus.FILTER, self.instances.values()))
for filtered_instance in filtered_instances:
change_skip_to_error_if_integration(self.options, filtered_instance)
filtered_instance.add_missing_case_status(filtered_instance.status)
self.filtered_platforms = set(p.platform.name for p in self.instances.values()
if p.status != "skipped" )
if p.status != TestInstanceStatus.SKIP )
def add_instances(self, instance_list):
for instance in instance_list:
@@ -1063,7 +1062,7 @@ class TestPlan:
os.mkdir(links_dir_path)
for instance in self.instances.values():
if instance.status != "skipped":
if instance.status != TestInstanceStatus.SKIP:
self._create_build_dir_link(links_dir_path, instance)
def _create_build_dir_link(self, links_dir_path, instance):
@@ -1103,5 +1102,5 @@ def change_skip_to_error_if_integration(options, instance):
Filters.QUARANTINE}
if filters.intersection(ignore_filters):
return
instance.status = "error"
instance.status = TestInstanceStatus.ERROR
instance.reason += " but is one of the integration platforms"

4
scripts/pylib/twister/twisterlib/testsuite.py

@@ -11,9 +11,11 @@ import contextlib
import mmap
import glob
from typing import List
from twisterlib.mixins import DisablePyTestCollectionMixin
from twisterlib.environment import canonical_zephyr_base
from twisterlib.error import TwisterException, TwisterRuntimeError
from twisterlib.statuses import TestCaseStatus
logger = logging.getLogger('twister')
logger.setLevel(logging.DEBUG)
@@ -357,7 +359,7 @@ class TestCase(DisablePyTestCollectionMixin):
def __init__(self, name=None, testsuite=None):
self.duration = 0
self.name = name
self.status = None
self.status = TestCaseStatus.NONE
self.reason = None
self.testsuite = testsuite
self.output = ""
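Note on the TestCase change above: with status defaulting to TestCaseStatus.NONE instead of None, truthiness checks such as "if tc.status:" no longer mean "has a result", because every Enum member is truthy by default. That is why the hunks in this commit switch to explicit comparisons against the NONE member. A minimal sketch with a hypothetical stand-in enum, not the real statuses.py definitions:

from enum import Enum

class DemoCaseStatus(Enum):
    # Illustrative members only; the real set lives in twisterlib/statuses.py.
    NONE = 'none'
    STARTED = 'started'
    PASS = 'passed'

status = DemoCaseStatus.NONE

# Enum members are truthy, so the old "if status:" idiom would misfire here:
assert bool(status) is True

# The explicit sentinel comparison used throughout this commit is unambiguous:
assert status == DemoCaseStatus.NONE
assert status != DemoCaseStatus.PASS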

3
scripts/pylib/twister/twisterlib/twister_main.py

@@ -12,6 +12,7 @@ import time
from colorama import Fore
from twisterlib.statuses import TestInstanceStatus
from twisterlib.testplan import TestPlan
from twisterlib.reports import Reporting
from twisterlib.hardwaremap import HardwareMap
@@ -141,7 +142,7 @@ def main(options, default_options):
# command line
for i in tplan.instances.values():
if i.status == "filtered":
if i.status == TestInstanceStatus.FILTER:
if options.platform and i.platform.name not in options.platform:
continue
logger.debug(

121
scripts/tests/twister/test_handlers.py

@@ -25,6 +25,7 @@ import twisterlib.harness
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
from twisterlib.error import TwisterException
from twisterlib.statuses import HarnessStatus, OutputStatus, TestCaseStatus, TestInstanceStatus
from twisterlib.handlers import (
Handler,
BinaryHandler,
@@ -55,7 +56,7 @@ def mocked_instance(tmp_path):
return_value=2
)
instance.status = None
instance.status = TestInstanceStatus.NONE
instance.reason = 'Unknown'
return instance
@@ -137,10 +138,10 @@ def test_handler_final_handle_actions(mocked_instance):
handler._final_handle_actions(harness, handler_time)
assert handler.instance.status == 'failed'
assert handler.instance.status == TestInstanceStatus.FAIL
assert handler.instance.execution_time == handler_time
assert handler.instance.reason == 'RunID mismatch'
assert all(testcase.status == 'failed' for \
assert all(testcase.status == TestCaseStatus.FAIL for \
testcase in handler.instance.testcases)
handler.instance.reason = 'This reason shan\'t be changed.'
@@ -170,7 +171,7 @@ def test_handler_verify_ztest_suite_name(
instance = mocked_instance
type(instance.testsuite).ztest_suite_names = ['dummy_testsuite_name']
harness_state = 'passed'
harness_state = HarnessStatus.PASS
handler_time = mock.Mock()
@@ -201,11 +202,11 @@ def test_handler_missing_suite_name(mocked_instance):
handler._missing_suite_name(expected_suite_names, handler_time)
assert handler.instance.status == 'failed'
assert handler.instance.status == TestInstanceStatus.FAIL
assert handler.instance.execution_time == handler_time
assert handler.instance.reason == 'Testsuite mismatch'
assert all(
testcase.status == 'failed' for testcase in handler.instance.testcases
testcase.status == TestCaseStatus.FAIL for testcase in handler.instance.testcases
)
@@ -294,7 +295,7 @@ def test_binaryhandler_try_kill_process_by_pid(mocked_instance):
TESTDATA_3 = [
(
[b'This\\r\\n', b'is\r', b'a short', b'file.'],
mock.Mock(state=False, capture_coverage=False),
mock.Mock(state=HarnessStatus.NONE, capture_coverage=False),
[
mock.call('This\\r\\n'),
mock.call('is\r'),
@@ -312,7 +313,7 @@ TESTDATA_3 = [
),
(
[b'Too much.'] * 120, # Should be more than the timeout
mock.Mock(state=False, capture_coverage=False),
mock.Mock(state=HarnessStatus.PASS, capture_coverage=False),
None,
None,
True,
@@ -320,7 +321,7 @@ TESTDATA_3 = [
),
(
[b'Too much.'] * 120, # Should be more than the timeout
mock.Mock(state=True, capture_coverage=False),
mock.Mock(state=HarnessStatus.PASS, capture_coverage=False),
None,
None,
True,
@@ -328,7 +329,7 @@ TESTDATA_3 = [
),
(
[b'Too much.'] * 120, # Should be more than the timeout
mock.Mock(state=True, capture_coverage=True),
mock.Mock(state=HarnessStatus.PASS, capture_coverage=True),
None,
None,
False,
@@ -505,18 +506,18 @@ def test_binaryhandler_create_env(
TESTDATA_6 = [
(None, False, 2, True, 'failed', 'Valgrind error', False),
(None, False, 1, False, 'failed', 'Failed', False),
('failed', False, 0, False, 'failed', 'Failed', False),
(HarnessStatus.NONE, False, 2, True, TestInstanceStatus.FAIL, 'Valgrind error', False),
(HarnessStatus.NONE, False, 1, False, TestInstanceStatus.FAIL, 'Failed', False),
(HarnessStatus.FAIL, False, 0, False, TestInstanceStatus.FAIL, 'Failed', False),
('success', False, 0, False, 'success', 'Unknown', False),
(None, True, 1, True, 'failed', 'Timeout', True),
(HarnessStatus.NONE, True, 1, True, TestInstanceStatus.FAIL, 'Timeout', True),
]
@pytest.mark.parametrize(
'harness_state, terminated, returncode, enable_valgrind,' \
' expected_status, expected_reason, do_add_missing',
TESTDATA_6,
ids=['valgrind error', 'failed', 'harness failed', 'success', 'no state']
ids=['valgrind error', 'failed', 'harness failed', 'custom success', 'no state']
)
def test_binaryhandler_update_instance_info(
mocked_instance,
@@ -544,7 +545,7 @@ def test_binaryhandler_update_instance_info(
assert handler.instance.reason == expected_reason
if do_add_missing:
missing_mock.assert_called_once_with('blocked', expected_reason)
missing_mock.assert_called_once_with(TestCaseStatus.BLOCK, expected_reason)
TESTDATA_7 = [
@@ -695,8 +696,8 @@ def test_devicehandler_monitor_serial(
is_set_iter = [False] * haltless_count + [True] \
if end_by_halt else iter(lambda: False, True)
state_iter = [False] * stateless_count + [True] \
if end_by_state else iter(lambda: False, True)
state_iter = [HarnessStatus.NONE] * stateless_count + [HarnessStatus.PASS] \
if end_by_state else iter(lambda: HarnessStatus.NONE, HarnessStatus.PASS)
halt_event = mock.Mock(is_set=mock.Mock(side_effect=is_set_iter))
ser = mock.Mock(
@@ -943,7 +944,7 @@ def test_devicehandler_get_hardware(
if raise_exception:
assert 'dummy message' in caplog.text.lower()
assert mocked_instance.status == 'failed'
assert mocked_instance.status == TestInstanceStatus.FAIL
assert mocked_instance.reason == 'dummy message'
else:
assert hardware == expected_hardware
@@ -1083,17 +1084,17 @@ def test_devicehandler_create_command(
TESTDATA_14 = [
('success', False, 'success', 'Unknown', False),
('failed', False, 'failed', 'Failed', True),
('error', False, 'error', 'Unknown', True),
(None, True, None, 'Unknown', False),
(None, False, 'failed', 'Timeout', True),
(HarnessStatus.FAIL, False, TestInstanceStatus.FAIL, 'Failed', True),
(HarnessStatus.ERROR, False, TestInstanceStatus.ERROR, 'Unknown', True),
(HarnessStatus.NONE, True, TestInstanceStatus.NONE, 'Unknown', False),
(HarnessStatus.NONE, False, TestInstanceStatus.FAIL, 'Timeout', True),
]
@pytest.mark.parametrize(
'harness_state, flash_error,' \
' expected_status, expected_reason, do_add_missing',
TESTDATA_14,
ids=['success', 'failed', 'error', 'flash error', 'no status']
ids=['custom success', 'failed', 'error', 'flash error', 'no status']
)
def test_devicehandler_update_instance_info(
mocked_instance,
@@ -1186,7 +1187,7 @@ def test_devicehandler_create_serial_connection(
assert result is not None
if expected_exception:
assert handler.instance.status == 'failed'
assert handler.instance.status == TestInstanceStatus.FAIL
assert handler.instance.reason == 'Serial Device Error'
missing_mock.assert_called_once_with('blocked', 'Serial Device Error')
@@ -1241,20 +1242,20 @@ def test_devicehandler_get_serial_device(
TESTDATA_17 = [
(False, False, False, False, None, False, False,
None, None, []),
TestInstanceStatus.NONE, None, []),
(True, True, False, False, None, False, False,
None, None, []),
TestInstanceStatus.NONE, None, []),
(True, False, True, False, None, False, False,
'error', 'Device issue (Flash error)', []),
TestInstanceStatus.ERROR, 'Device issue (Flash error)', []),
(True, False, False, True, None, False, False,
'error', 'Device issue (Timeout)', ['Flash operation timed out.']),
TestInstanceStatus.ERROR, 'Device issue (Timeout)', ['Flash operation timed out.']),
(True, False, False, False, 1, False, False,
'error', 'Device issue (Flash error?)', []),
TestInstanceStatus.ERROR, 'Device issue (Flash error?)', []),
(True, False, False, False, 0, True, False,
None, None, ['Timed out while monitoring serial output on IPName']),
TestInstanceStatus.NONE, None, ['Timed out while monitoring serial output on IPName']),
(True, False, False, False, 0, False, True,
None, None, ["Terminating serial-pty:'Serial PTY'",
"Terminated serial-pty:'Serial PTY', stdout:'', stderr:''"]),
TestInstanceStatus.NONE, None, ["Terminating serial-pty:'Serial PTY'",
"Terminated serial-pty:'Serial PTY', stdout:'', stderr:''"]),
]
@pytest.mark.parametrize(
@@ -1547,7 +1548,7 @@ TESTDATA_21 = [
None,
'good dummy state',
False,
None,
TestInstanceStatus.NONE,
None,
False
),
@@ -1557,7 +1558,7 @@ TESTDATA_21 = [
None,
'good dummy state',
False,
None,
TestInstanceStatus.NONE,
None,
False
),
@@ -1565,9 +1566,9 @@ TESTDATA_21 = [
0,
False,
None,
None,
HarnessStatus.NONE,
True,
'failed',
TestInstanceStatus.FAIL,
'Timeout',
True
),
@@ -1575,9 +1576,9 @@ TESTDATA_21 = [
1,
False,
None,
None,
HarnessStatus.NONE,
False,
'failed',
TestInstanceStatus.FAIL,
'Exited with 1',
True
),
@@ -1587,7 +1588,7 @@ TESTDATA_21 = [
'preexisting reason',
'good dummy state',
False,
'failed',
TestInstanceStatus.FAIL,
'preexisting reason',
True
),
@@ -1625,7 +1626,7 @@ def test_qemuhandler_update_instance_info(
if expected_called_missing_case:
mocked_instance.add_missing_case_status.assert_called_once_with(
'blocked'
TestCaseStatus.BLOCK
)
@@ -1731,11 +1732,11 @@ def test_qemuhandler_thread_close_files(is_pid, is_lookup_error):
TESTDATA_24 = [
('failed', 'timeout', 'failed', 'timeout'),
('failed', 'Execution error', 'failed', 'Execution error'),
('failed', 'unexpected eof', 'failed', 'unexpected eof'),
('failed', 'unexpected byte', 'failed', 'unexpected byte'),
(None, None, None, 'Unknown'),
(OutputStatus.FAIL, 'timeout', TestInstanceStatus.FAIL, 'timeout'),
(OutputStatus.FAIL, 'Execution error', TestInstanceStatus.FAIL, 'Execution error'),
(OutputStatus.FAIL, 'unexpected eof', TestInstanceStatus.FAIL, 'unexpected eof'),
(OutputStatus.FAIL, 'unexpected byte', TestInstanceStatus.FAIL, 'unexpected byte'),
(OutputStatus.NONE, None, TestInstanceStatus.NONE, 'Unknown'),
]
@pytest.mark.parametrize(
@@ -1766,10 +1767,10 @@ TESTDATA_25 = [
('1\n' * 60).encode('utf-8'),
60,
1,
[None] * 60 + ['success'] * 6,
[HarnessStatus.NONE] * 60 + ['success'] * 6,
1000,
False,
'failed',
OutputStatus.FAIL,
'timeout',
[mock.call('1\n'), mock.call('1\n')]
),
@@ -1777,10 +1778,10 @@ TESTDATA_25 = [
('1\n' * 60).encode('utf-8'),
60,
-1,
[None] * 60 + ['success'] * 30,
[HarnessStatus.NONE] * 60 + ['success'] * 30,
100,
False,
'failed',
OutputStatus.FAIL,
None,
[mock.call('1\n'), mock.call('1\n')]
),
@@ -1791,7 +1792,7 @@ TESTDATA_25 = [
['success'] * 3,
100,
False,
'failed',
OutputStatus.FAIL,
'unexpected eof',
[]
),
@@ -1802,7 +1803,7 @@ TESTDATA_25 = [
['success'] * 3,
100,
False,
'failed',
OutputStatus.FAIL,
'unexpected byte',
[]
),
@@ -1810,7 +1811,7 @@ TESTDATA_25 = [
'1\n2\n3\n4\n5\n'.encode('utf-8'),
600,
1,
[None] * 3 + ['success'] * 7,
[HarnessStatus.NONE] * 3 + ['success'] * 7,
100,
False,
'success',
@@ -1821,10 +1822,10 @@ TESTDATA_25 = [
'1\n2\n3\n4\n5\n'.encode('utf-8'),
600,
0,
[None] * 3 + ['success'] * 7,
[HarnessStatus.NONE] * 3 + ['success'] * 7,
100,
False,
'failed',
OutputStatus.FAIL,
'timeout',
[mock.call('1\n'), mock.call('2\n')]
),
@@ -1832,7 +1833,7 @@ TESTDATA_25 = [
'1\n2\n3\n4\n5\n'.encode('utf-8'),
60,
1,
[None] * 3 + ['success'] * 7,
[HarnessStatus.NONE] * 3 + ['success'] * 7,
(n for n in [100, 100, 10000]),
True,
'success',
@@ -1939,6 +1940,8 @@ def test_qemuhandler_thread(
handler.ignore_unexpected_eof
)
print(mock_thread_update_instance_info.call_args_list)
mock_thread_update_instance_info.assert_called_once_with(
handler,
mock.ANY,
@@ -1950,11 +1953,11 @@ def test_qemuhandler_thread(
TESTDATA_26 = [
(True, False, None, True,
(True, False, HarnessStatus.NONE, True,
['No timeout, return code from QEMU (1): 1',
'return code from QEMU (1): 1']),
(False, True, 'passed', True, ['return code from QEMU (1): 0']),
(False, True, 'failed', False, ['return code from QEMU (None): 1']),
(False, True, HarnessStatus.PASS, True, ['return code from QEMU (1): 0']),
(False, True, HarnessStatus.FAIL, False, ['return code from QEMU (None): 1']),
]
@pytest.mark.parametrize(

132
scripts/tests/twister/test_harness.py

@@ -13,18 +13,24 @@ import pytest
import re
import logging as logger
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
#ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
from conftest import ZEPHYR_BASE
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
from twisterlib.harness import Gtest, Bsim
from twisterlib.harness import Harness
from twisterlib.harness import Robot
from twisterlib.harness import Test
#from scripts.pylib.twister.twisterlib.statuses import HarnessStatus, TestCaseStatus, TestInstanceStatus
from twisterlib.harness import (
Bsim,
Console,
Gtest,
Harness,
HarnessImporter,
Pytest,
PytestHarnessException,
Robot,
Test
)
from twisterlib.statuses import HarnessStatus, TestCaseStatus, TestInstanceStatus
from twisterlib.testinstance import TestInstance
from twisterlib.harness import Console
from twisterlib.harness import Pytest
from twisterlib.harness import PytestHarnessException
from twisterlib.harness import HarnessImporter
GTEST_START_STATE = " RUN "
GTEST_PASS_STATE = " OK "
@@ -87,13 +93,14 @@ def test_harness_parse_record(lines, pattern, expected_records, as_json):
assert harness.recording == expected_records
TEST_DATA_1 = [('RunID: 12345', False, False, False, None, True),
('PROJECT EXECUTION SUCCESSFUL', False, False, False, 'passed', False),
('PROJECT EXECUTION SUCCESSFUL', True, False, False, 'failed', False),
('PROJECT EXECUTION FAILED', False, False, False, 'failed', False),
('ZEPHYR FATAL ERROR', False, True, False, None, False),
('GCOV_COVERAGE_DUMP_START', None, None, True, None, False),
('GCOV_COVERAGE_DUMP_END', None, None, False, None, False),]
TEST_DATA_1 = [('RunID: 12345', False, False, False, HarnessStatus.NONE, True),
('PROJECT EXECUTION SUCCESSFUL', False, False, False, HarnessStatus.PASS, False),
('PROJECT EXECUTION SUCCESSFUL', True, False, False, HarnessStatus.FAIL, False),
('PROJECT EXECUTION FAILED', False, False, False, HarnessStatus.FAIL, False),
('ZEPHYR FATAL ERROR', False, True, False, HarnessStatus.NONE, False),
('GCOV_COVERAGE_DUMP_START', None, None, True, HarnessStatus.NONE, False),
('GCOV_COVERAGE_DUMP_END', None, None, False, HarnessStatus.NONE, False),]
@pytest.mark.parametrize(
"line, fault, fail_on_fault, cap_cov, exp_stat, exp_id",
TEST_DATA_1,
@@ -103,7 +110,7 @@ def test_harness_process_test(line, fault, fail_on_fault, cap_cov, exp_stat, exp
#Arrange
harness = Harness()
harness.run_id = 12345
harness.state = None
harness.state = HarnessStatus.NONE
harness.fault = fault
harness.fail_on_fault = fail_on_fault
mock.patch.object(Harness, 'parse_record', return_value=None)
@@ -173,11 +180,14 @@ def test_robot_handle(tmp_path):
tc = instance.get_case_or_create('test_case_1')
#Assert
assert instance.state == "passed"
assert tc.status == "passed"
assert instance.state == TestInstanceStatus.PASS
assert tc.status == TestCaseStatus.PASS
TEST_DATA_2 = [("", 0, "passed"), ("Robot test failure: sourcedir for mock_platform", 1, "failed"),]
TEST_DATA_2 = [
("", 0, TestInstanceStatus.PASS),
("Robot test failure: sourcedir for mock_platform", 1, TestInstanceStatus.FAIL),
]
@pytest.mark.parametrize(
"exp_out, returncode, expected_status",
TEST_DATA_2,
@@ -272,13 +282,13 @@ def test_console_configure(tmp_path, type, num_patterns):
assert console.pattern.pattern == 'pattern1'
TEST_DATA_4 = [("one_line", True, "passed", "line", False, False),
("multi_line", True, "passed", "line", False, False),
("multi_line", False, "passed", "line", False, False),
("invalid_type", False, None, "line", False, False),
("invalid_type", False, None, "ERROR", True, False),
("invalid_type", False, None, "COVERAGE_START", False, True),
("invalid_type", False, None, "COVERAGE_END", False, False)]
TEST_DATA_4 = [("one_line", True, HarnessStatus.PASS, "line", False, False),
("multi_line", True, HarnessStatus.PASS, "line", False, False),
("multi_line", False, HarnessStatus.PASS, "line", False, False),
("invalid_type", False, HarnessStatus.NONE, "line", False, False),
("invalid_type", False, HarnessStatus.NONE, "ERROR", True, False),
("invalid_type", False, HarnessStatus.NONE, "COVERAGE_START", False, True),
("invalid_type", False, HarnessStatus.NONE, "COVERAGE_END", False, False)]
@pytest.mark.parametrize(
"line_type, ordered_val, exp_state, line, exp_fault, exp_capture",
TEST_DATA_4,
@@ -304,7 +314,7 @@ def test_console_handle(tmp_path, line_type, ordered_val, exp_state, line, exp_f
console.patterns = [re.compile("pattern1"), re.compile("pattern2")]
console.pattern = re.compile("pattern")
console.patterns_expected = 0
console.state = None
console.state = HarnessStatus.NONE
console.fail_on_fault = True
console.FAULT = "ERROR"
console.GCOV_START = "COVERAGE_START"
@@ -461,7 +471,7 @@ def test_pytest_run(tmp_path, caplog):
# Act
test_obj.pytest_run(timeout)
# Assert
assert test_obj.state == 'failed'
assert test_obj.state == HarnessStatus.FAIL
assert exp_out in caplog.text
@@ -483,13 +493,13 @@ def test_get_harness(name):
assert isinstance(harness_class, Test)
TEST_DATA_7 = [("", "Running TESTSUITE suite_name", ['suite_name'], None, True, None),
("", "START - test_testcase", [], "started", True, None),
("", "PASS - test_example in 0 seconds", [], "passed", True, None),
("", "SKIP - test_example in 0 seconds", [], "skipped", True, None),
("", "FAIL - test_example in 0 seconds", [], "failed", True, None),
("not a ztest and no state for test_id", "START - test_testcase", [], "passed", False, "passed"),
("not a ztest and no state for test_id", "START - test_testcase", [], "failed", False, "failed")]
TEST_DATA_7 = [("", "Running TESTSUITE suite_name", ['suite_name'], TestCaseStatus.NONE, True, HarnessStatus.NONE),
("", "START - test_testcase", [], TestCaseStatus.STARTED, True, HarnessStatus.NONE),
("", "PASS - test_example in 0 seconds", [], TestCaseStatus.PASS, True, HarnessStatus.NONE),
("", "SKIP - test_example in 0 seconds", [], TestCaseStatus.SKIP, True, HarnessStatus.NONE),
("", "FAIL - test_example in 0 seconds", [], TestCaseStatus.FAIL, True, HarnessStatus.NONE),
("not a ztest and no state for test_id", "START - test_testcase", [], TestCaseStatus.PASS, False, HarnessStatus.PASS),
("not a ztest and no state for test_id", "START - test_testcase", [], TestCaseStatus.FAIL, False, HarnessStatus.FAIL)]
@pytest.mark.parametrize(
"exp_out, line, exp_suite_name, exp_status, ztest, state",
TEST_DATA_7,
@@ -553,7 +563,7 @@ def gtest(tmp_path):
def test_gtest_start_test_no_suites_detected(gtest):
process_logs(gtest, [SAMPLE_GTEST_START])
assert len(gtest.detected_suite_names) == 0
assert gtest.state is None
assert gtest.state == HarnessStatus.NONE
def test_gtest_start_test(gtest):
@@ -566,12 +576,12 @@ def test_gtest_start_test(gtest):
),
],
)
assert gtest.state is None
assert gtest.state == HarnessStatus.NONE
assert len(gtest.detected_suite_names) == 1
assert gtest.detected_suite_names[0] == "suite_name"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") is not None
assert (
gtest.instance.get_case_by_name("id.suite_name.test_name").status == "started"
gtest.instance.get_case_by_name("id.suite_name.test_name").status == TestCaseStatus.STARTED
)
@@ -588,11 +598,11 @@ def test_gtest_pass(gtest):
),
],
)
assert gtest.state is None
assert gtest.state == HarnessStatus.NONE
assert len(gtest.detected_suite_names) == 1
assert gtest.detected_suite_names[0] == "suite_name"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == "passed"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == TestCaseStatus.PASS
def test_gtest_failed(gtest):
@@ -608,11 +618,11 @@ def test_gtest_failed(gtest):
),
],
)
assert gtest.state is None
assert gtest.state == HarnessStatus.NONE
assert len(gtest.detected_suite_names) == 1
assert gtest.detected_suite_names[0] == "suite_name"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == "failed"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == TestCaseStatus.FAIL
def test_gtest_skipped(gtest):
@@ -628,11 +638,11 @@ def test_gtest_skipped(gtest):
),
],
)
assert gtest.state is None
assert gtest.state == HarnessStatus.NONE
assert len(gtest.detected_suite_names) == 1
assert gtest.detected_suite_names[0] == "suite_name"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == "skipped"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == TestCaseStatus.SKIP
def test_gtest_all_pass(gtest):
@@ -649,11 +659,11 @@ def test_gtest_all_pass(gtest):
SAMPLE_GTEST_END,
],
)
assert gtest.state == "passed"
assert gtest.state == HarnessStatus.PASS
assert len(gtest.detected_suite_names) == 1
assert gtest.detected_suite_names[0] == "suite_name"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == "passed"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == TestCaseStatus.PASS
def test_gtest_one_skipped(gtest):
@@ -676,13 +686,13 @@ def test_gtest_one_skipped(gtest):
SAMPLE_GTEST_END,
],
)
assert gtest.state == "passed"
assert gtest.state == HarnessStatus.PASS
assert len(gtest.detected_suite_names) == 1
assert gtest.detected_suite_names[0] == "suite_name"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == "passed"
assert gtest.instance.get_case_by_name("id.suite_name.test_name1") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test_name1").status == "skipped"
assert gtest.instance.get_case_by_name("id.suite_name.test_name") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test_name").status == TestCaseStatus.PASS
assert gtest.instance.get_case_by_name("id.suite_name.test_name1") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test_name1").status == TestCaseStatus.SKIP
def test_gtest_one_fail(gtest):
@@ -705,13 +715,13 @@ def test_gtest_one_fail(gtest):
SAMPLE_GTEST_END,
],
)
assert gtest.state == "failed"
assert gtest.state == HarnessStatus.FAIL
assert len(gtest.detected_suite_names) == 1
assert gtest.detected_suite_names[0] == "suite_name"
assert gtest.instance.get_case_by_name("id.suite_name.test0") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test0").status == "passed"
assert gtest.instance.get_case_by_name("id.suite_name.test1") is not None
assert gtest.instance.get_case_by_name("id.suite_name.test1").status == "failed"
assert gtest.instance.get_case_by_name("id.suite_name.test0") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test0").status == TestCaseStatus.PASS
assert gtest.instance.get_case_by_name("id.suite_name.test1") != TestCaseStatus.NONE
assert gtest.instance.get_case_by_name("id.suite_name.test1").status == TestCaseStatus.FAIL
def test_gtest_missing_result(gtest):

123
scripts/tests/twister/test_runner.py

@@ -24,6 +24,7 @@ from typing import List
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
from twisterlib.statuses import TestCaseStatus, TestInstanceStatus
from twisterlib.error import BuildError
from twisterlib.harness import Pytest
@@ -253,15 +254,15 @@ TESTDATA_1_1 = [
]
TESTDATA_1_2 = [
(0, False, 'dummy out',
True, True, 'passed', None, False, True),
True, True, TestInstanceStatus.PASS, None, False, True),
(0, True, '',
False, False, 'passed', None, False, False),
False, False, TestInstanceStatus.PASS, None, False, False),
(1, True, 'ERROR: region `FLASH\' overflowed by 123 MB',
True, True, 'skipped', 'FLASH overflow', True, False),
True, True, TestInstanceStatus.SKIP, 'FLASH overflow', True, False),
(1, True, 'Error: Image size (99 B) + trailer (1 B) exceeds requested size',
True, True, 'skipped', 'imgtool overflow', True, False),
True, True, TestInstanceStatus.SKIP, 'imgtool overflow', True, False),
(1, True, 'mock.ANY',
True, True, 'error', 'Build failure', False, False)
True, True, TestInstanceStatus.ERROR, 'Build failure', False, False)
]
@pytest.mark.parametrize(
@@ -306,7 +307,7 @@ def test_cmake_run_build(
instance_mock = mock.Mock(add_missing_case_status=mock.Mock())
instance_mock.build_time = 0
instance_mock.run = is_instance_run
instance_mock.status = None
instance_mock.status = TestInstanceStatus.NONE
instance_mock.reason = None
cmake = CMake(testsuite_mock, platform_mock, source_dir, build_dir,
@@ -354,7 +355,7 @@ def test_cmake_run_build(
if expected_add_missing:
cmake.instance.add_missing_case_status.assert_called_once_with(
'skipped', 'Test was built only'
TestInstanceStatus.SKIP, 'Test was built only'
)
@@ -366,7 +367,7 @@ TESTDATA_2_2 = [
(True, ['dummy_stage_1', 'ds2'],
0, False, '',
True, True, False,
None, None,
TestInstanceStatus.NONE, None,
[os.path.join('dummy', 'cmake'),
'-B' + os.path.join('build', 'dir'), '-DTC_RUNID=1',
'-DSB_CONFIG_COMPILER_WARNINGS_AS_ERRORS=y',
@@ -380,7 +381,7 @@ TESTDATA_2_2 = [
(False, [],
1, True, 'ERROR: region `FLASH\' overflowed by 123 MB',
True, False, True,
'error', 'Cmake build failure',
TestInstanceStatus.ERROR, 'Cmake build failure',
[os.path.join('dummy', 'cmake'),
'-B' + os.path.join('build', 'dir'), '-DTC_RUNID=1',
'-DSB_CONFIG_COMPILER_WARNINGS_AS_ERRORS=n',
@@ -438,13 +439,13 @@ def test_cmake_run_cmake(
instance_mock.run = is_instance_run
instance_mock.run_id = 1
instance_mock.build_time = 0
instance_mock.status = None
instance_mock.status = TestInstanceStatus.NONE
instance_mock.reason = None
instance_mock.testsuite = mock.Mock()
instance_mock.testsuite.required_snippets = ['dummy snippet 1', 'ds2']
instance_mock.testcases = [mock.Mock(), mock.Mock()]
instance_mock.testcases[0].status = None
instance_mock.testcases[1].status = None
instance_mock.testcases[0].status = TestCaseStatus.NONE
instance_mock.testcases[1].status = TestCaseStatus.NONE
cmake = CMake(testsuite_mock, platform_mock, source_dir, build_dir,
jobserver_mock)
@@ -859,7 +860,7 @@ def test_projectbuilder_log_info_file(
TESTDATA_6 = [
(
{'op': 'filter'},
'failed',
TestInstanceStatus.FAIL,
'Failed',
mock.ANY,
mock.ANY,
@@ -874,14 +875,14 @@ TESTDATA_6 = [
mock.ANY,
[],
{'op': 'report', 'test': mock.ANY},
'failed',
TestInstanceStatus.FAIL,
'Failed',
0,
None
),
(
{'op': 'filter'},
'passed',
TestInstanceStatus.PASS,
mock.ANY,
mock.ANY,
mock.ANY,
@@ -896,14 +897,14 @@ TESTDATA_6 = [
mock.ANY,
['filtering dummy instance name'],
{'op': 'report', 'test': mock.ANY},
'filtered',
TestInstanceStatus.FILTER,
'runtime filter',
1,
('skipped',)
(TestCaseStatus.SKIP,)
),
(
{'op': 'filter'},
'passed',
TestInstanceStatus.PASS,
mock.ANY,
mock.ANY,
mock.ANY,
@@ -918,14 +919,14 @@ TESTDATA_6 = [
mock.ANY,
[],
{'op': 'cmake', 'test': mock.ANY},
'passed',
TestInstanceStatus.PASS,
mock.ANY,
0,
None
),
(
{'op': 'cmake'},
'error',
TestInstanceStatus.ERROR,
'dummy error',
mock.ANY,
mock.ANY,
@@ -940,14 +941,14 @@ TESTDATA_6 = [
mock.ANY,
[],
{'op': 'report', 'test': mock.ANY},
'error',
TestInstanceStatus.ERROR,
'dummy error',
0,
None
),
(
{'op': 'cmake'},
None,
TestInstanceStatus.NONE,
mock.ANY,
mock.ANY,
mock.ANY,
@@ -962,7 +963,7 @@ TESTDATA_6 = [
mock.ANY,
[],
{'op': 'report', 'test': mock.ANY},
'passed',
TestInstanceStatus.PASS,
mock.ANY,
0,
None
@@ -1006,10 +1007,10 @@ TESTDATA_6 = [
mock.ANY,
['filtering dummy instance name'],
{'op': 'report', 'test': mock.ANY},
'filtered',
TestInstanceStatus.FILTER,
'runtime filter',
1,
('skipped',)
(TestCaseStatus.SKIP,)
),
(
{'op': 'cmake'},
@@ -1050,14 +1051,14 @@ TESTDATA_6 = [
mock.ANY,
['build test: dummy instance name'],
{'op': 'report', 'test': mock.ANY},
'error',
TestInstanceStatus.ERROR,
'Build Failure',
0,
None
),
(
{'op': 'build'},
'skipped',
TestInstanceStatus.SKIP,
mock.ANY,
mock.ANY,
mock.ANY,
@@ -1076,11 +1077,11 @@ TESTDATA_6 = [
mock.ANY,
mock.ANY,
1,
('skipped', mock.ANY)
(TestCaseStatus.SKIP, mock.ANY)
),
(
{'op': 'build'},
'passed',
TestInstanceStatus.PASS,
mock.ANY,
mock.ANY,
mock.ANY,
@@ -1095,10 +1096,10 @@ TESTDATA_6 = [
mock.ANY,
['build test: dummy instance name'],
{'op': 'report', 'test': mock.ANY},
'passed',
TestInstanceStatus.PASS,
mock.ANY,
0,
('blocked', mock.ANY)
(TestCaseStatus.BLOCK, mock.ANY)
),
(
{'op': 'build'},
@@ -1141,7 +1142,7 @@ TESTDATA_6 = [
['build test: dummy instance name',
'Determine test cases for test instance: dummy instance name'],
{'op': 'report', 'test': mock.ANY},
'error',
TestInstanceStatus.ERROR,
'Determine Testcases Error!',
0,
None
@@ -1237,7 +1238,7 @@ TESTDATA_6 = [
),
(
{'op': 'run'},
'failed',
TestInstanceStatus.FAIL,
mock.ANY,
mock.ANY,
mock.ANY,
@@ -1254,7 +1255,7 @@ TESTDATA_6 = [
'run status: dummy instance name failed',
'RuntimeError: Pipeline Error!'],
None,
'failed',
TestInstanceStatus.FAIL,
mock.ANY,
0,
None
@@ -1283,7 +1284,7 @@ TESTDATA_6 = [
),
(
{'op': 'report'},
'passed',
TestInstanceStatus.PASS,
mock.ANY,
mock.ANY,
mock.ANY,
@@ -1928,14 +1929,14 @@ def test_projectbuilder_sanitize_zephyr_base_from_files(
TESTDATA_13 = [
(
'error', True, True, False,
TestInstanceStatus.ERROR, True, True, False,
['INFO 20/25 dummy platform' \
' dummy.testsuite.name' \
' ERROR dummy reason (cmake)'],
None
),
(
'failed', False, False, False,
TestInstanceStatus.FAIL, False, False, False,
['ERROR dummy platform' \
' dummy.testsuite.name' \
' FAILED : dummy reason'],
@@ -1943,20 +1944,20 @@ TESTDATA_13 = [
' failed: 3, error: 1'
),
(
'skipped', True, False, False,
TestInstanceStatus.SKIP, True, False, False,
['INFO 20/25 dummy platform' \
' dummy.testsuite.name' \
' SKIPPED (dummy reason)'],
None
),
(
'filtered', False, False, False,
TestInstanceStatus.FILTER, False, False, False,
[],
'INFO - Total complete: 20/ 25 80% skipped: 4,' \
' failed: 2, error: 1'
),
(
'passed', True, False, True,
TestInstanceStatus.PASS, True, False, True,
['INFO 20/25 dummy platform' \
' dummy.testsuite.name' \
' PASSED' \
@@ -1964,7 +1965,7 @@ TESTDATA_13 = [
None
),
(
'passed', True, False, False,
TestInstanceStatus.PASS, True, False, False,
['INFO 20/25 dummy platform' \
' dummy.testsuite.name' \
' PASSED (build)'],
@@ -1977,7 +1978,7 @@ TESTDATA_13 = [
' failed: 2, error: 1\r'
),
(
'timeout', True, False, True,
TestInstanceStatus.TIMEOUT, True, False, True,
['INFO 20/25 dummy platform' \
' dummy.testsuite.name' \
' UNKNOWN' \
@@ -2017,7 +2018,7 @@ def test_projectbuilder_report_out(
instance_mock.testsuite.name = 'dummy.testsuite.name'
instance_mock.testsuite.testcases = [mock.Mock() for _ in range(25)]
instance_mock.testcases = [mock.Mock() for _ in range(24)] + \
[mock.Mock(status='skipped')]
[mock.Mock(status=TestCaseStatus.SKIP)]
env_mock = mock.Mock()
pb = ProjectBuilder(instance_mock, env_mock, mocked_jobserver)
@@ -2309,14 +2310,14 @@ def test_projectbuilder_gather_metrics(
TESTDATA_16 = [
('error', mock.ANY, False, False, False),
('failed', mock.ANY, False, False, False),
('skipped', mock.ANY, False, False, False),
('filtered', 'native', False, False, True),
('passed', 'qemu', False, False, True),
('filtered', 'unit', False, False, True),
('filtered', 'mcu', True, True, False),
('passed', 'frdm_k64f', False, True, False),
(TestInstanceStatus.ERROR, mock.ANY, False, False, False),
(TestInstanceStatus.FAIL, mock.ANY, False, False, False),
(TestInstanceStatus.SKIP, mock.ANY, False, False, False),
(TestInstanceStatus.FILTER, 'native', False, False, True),
(TestInstanceStatus.PASS, 'qemu', False, False, True),
(TestInstanceStatus.FILTER, 'unit', False, False, True),
(TestInstanceStatus.FILTER, 'mcu', True, True, False),
(TestInstanceStatus.PASS, 'frdm_k64f', False, True, False),
]
@pytest.mark.parametrize(
@@ -2474,35 +2475,35 @@ def test_twisterrunner_run(
def test_twisterrunner_update_counting_before_pipeline():
instances = {
'dummy1': mock.Mock(
status='filtered',
status=TestInstanceStatus.FILTER,
reason='runtime filter',
testsuite=mock.Mock(
testcases=[mock.Mock()]
)
),
'dummy2': mock.Mock(
status='filtered',
status=TestInstanceStatus.FILTER,
reason='static filter',
testsuite=mock.Mock(
testcases=[mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()]
)
),
'dummy3': mock.Mock(
status='error',
status=TestInstanceStatus.ERROR,
reason='error',
testsuite=mock.Mock(
testcases=[mock.Mock()]
)
),
'dummy4': mock.Mock(
status='passed',
status=TestInstanceStatus.PASS,
reason='OK',
testsuite=mock.Mock(
testcases=[mock.Mock()]
)
),
'dummy5': mock.Mock(
status='skipped',
status=TestInstanceStatus.SKIP,
reason=None,
testsuite=mock.Mock(
testcases=[mock.Mock()]
@@ -2589,11 +2590,11 @@ def test_twisterrunner_add_tasks_to_queue(
return [filter]
instances = {
'dummy1': mock.Mock(run=True, retries=0, status='passed', build_dir="/tmp"),
'dummy2': mock.Mock(run=True, retries=0, status='skipped', build_dir="/tmp"),
'dummy3': mock.Mock(run=True, retries=0, status='filtered', build_dir="/tmp"),
'dummy4': mock.Mock(run=True, retries=0, status='error', build_dir="/tmp"),
'dummy5': mock.Mock(run=True, retries=0, status='failed', build_dir="/tmp")
'dummy1': mock.Mock(run=True, retries=0, status=TestInstanceStatus.PASS, build_dir="/tmp"),
'dummy2': mock.Mock(run=True, retries=0, status=TestInstanceStatus.SKIP, build_dir="/tmp"),
'dummy3': mock.Mock(run=True, retries=0, status=TestInstanceStatus.FILTER, build_dir="/tmp"),
'dummy4': mock.Mock(run=True, retries=0, status=TestInstanceStatus.ERROR, build_dir="/tmp"),
'dummy5': mock.Mock(run=True, retries=0, status=TestInstanceStatus.FAIL, build_dir="/tmp")
}
instances['dummy4'].testsuite.filter = 'some'
instances['dummy5'].testsuite.filter = 'full'

13
scripts/tests/twister/test_testinstance.py

@@ -16,6 +16,7 @@ import mock
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
from twisterlib.statuses import TestCaseStatus, TestInstanceStatus
from twisterlib.testinstance import TestInstance
from twisterlib.error import BuildError
from twisterlib.runner import TwisterRunner
@@ -265,7 +266,7 @@ def test_testinstance_add_filter(testinstance):
testinstance.add_filter(reason, filter_type)
assert {'type': filter_type, 'reason': reason} in testinstance.filters
assert testinstance.status == 'filtered'
assert testinstance.status == TestInstanceStatus.FILTER
assert testinstance.reason == reason
assert testinstance.filter_type == filter_type
@@ -310,17 +311,17 @@ TESTDATA_2 = [
def test_testinstance_add_missing_case_status(testinstance, reason, expected_reason):
testinstance.reason = 'dummy reason'
status = 'passed'
status = TestCaseStatus.PASS
assert len(testinstance.testcases) > 1, 'Selected testsuite does not have enough testcases.'
testinstance.testcases[0].status = 'started'
testinstance.testcases[-1].status = None
testinstance.testcases[0].status = TestCaseStatus.STARTED
testinstance.testcases[-1].status = TestCaseStatus.NONE
testinstance.add_missing_case_status(status, reason)
assert testinstance.testcases[0].status == 'failed'
assert testinstance.testcases[-1].status == 'passed'
assert testinstance.testcases[0].status == TestCaseStatus.FAIL
assert testinstance.testcases[-1].status == TestCaseStatus.PASS
assert testinstance.testcases[-1].reason == expected_reason

51
scripts/tests/twister/test_testplan.py

@@ -16,6 +16,7 @@ from contextlib import nullcontext
ZEPHYR_BASE = os.getenv("ZEPHYR_BASE")
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/pylib/twister"))
from twisterlib.statuses import TestInstanceStatus
from twisterlib.testplan import TestPlan, change_skip_to_error_if_integration
from twisterlib.testinstance import TestInstance
from twisterlib.testsuite import TestSuite
@@ -180,7 +181,7 @@ def test_apply_filters_part1(class_testplan, all_testsuites_dict, platforms_list
plan.apply_filters(exclude_platform=['demo_board_1'],
platform=['demo_board_2'])
filtered_instances = list(filter(lambda item: item.status == "filtered", plan.instances.values()))
filtered_instances = list(filter(lambda item: item.status == TestInstanceStatus.FILTER, plan.instances.values()))
for d in filtered_instances:
assert d.reason == expected_discards
@@ -214,7 +215,7 @@ def test_apply_filters_part2(class_testplan, all_testsuites_dict,
]
}
class_testplan.apply_filters(**kwargs)
filtered_instances = list(filter(lambda item: item.status == "filtered", class_testplan.instances.values()))
filtered_instances = list(filter(lambda item: item.status == TestInstanceStatus.FILTER, class_testplan.instances.values()))
for d in filtered_instances:
assert d.reason == expected_discards
@@ -245,7 +246,7 @@ def test_apply_filters_part3(class_testplan, all_testsuites_dict, platforms_list
class_testplan.apply_filters(exclude_platform=['demo_board_1'],
platform=['demo_board_2'])
filtered_instances = list(filter(lambda item: item.status == "filtered", class_testplan.instances.values()))
filtered_instances = list(filter(lambda item: item.status == TestInstanceStatus.FILTER, class_testplan.instances.values()))
assert not filtered_instances
def test_add_instances_short(tmp_path, class_env, all_testsuites_dict, platforms_list):
@@ -338,16 +339,16 @@ def test_quarantine_short(class_testplan, platforms_list, test_data,
for testname, instance in class_testplan.instances.items():
if quarantine_verify:
if testname in expected_val:
assert not instance.status
assert instance.status == TestInstanceStatus.NONE
else:
assert instance.status == 'filtered'
assert instance.status == TestInstanceStatus.FILTER
assert instance.reason == "Not under quarantine"
else:
if testname in expected_val:
assert instance.status == 'filtered'
assert instance.status == TestInstanceStatus.FILTER
assert instance.reason == "Quarantine: " + expected_val[testname]
else:
assert not instance.status
assert instance.status == TestInstanceStatus.NONE
TESTDATA_PART4 = [
@@ -392,7 +393,7 @@ def test_required_snippets_short(
plan.apply_filters()
filtered_instances = list(
filter(lambda item: item.status == "filtered", plan.instances.values())
filter(lambda item: item.status == TestInstanceStatus.FILTER, plan.instances.values())
)
if expected_filtered_len is not None:
assert len(filtered_instances) == expected_filtered_len
@@ -808,14 +809,14 @@ def test_testplan_generate_subset(
shuffle_tests_seed=seed
)
testplan.instances = {
'plat1/testA': mock.Mock(status=None),
'plat1/testB': mock.Mock(status=None),
'plat1/testC': mock.Mock(status=None),
'plat2/testA': mock.Mock(status=None),
'plat2/testB': mock.Mock(status=None),
'plat3/testA': mock.Mock(status='skipped'),
'plat3/testB': mock.Mock(status='skipped'),
'plat3/testC': mock.Mock(status='error'),
'plat1/testA': mock.Mock(status=TestInstanceStatus.NONE),
'plat1/testB': mock.Mock(status=TestInstanceStatus.NONE),
'plat1/testC': mock.Mock(status=TestInstanceStatus.NONE),
'plat2/testA': mock.Mock(status=TestInstanceStatus.NONE),
'plat2/testB': mock.Mock(status=TestInstanceStatus.NONE),
'plat3/testA': mock.Mock(status=TestInstanceStatus.SKIP),
'plat3/testB': mock.Mock(status=TestInstanceStatus.SKIP),
'plat3/testC': mock.Mock(status=TestInstanceStatus.ERROR),
}
testplan.generate_subset(subset, sets)
@@ -1567,7 +1568,7 @@ def test_testplan_load_from_file(caplog, device_testing, expected_tfilter):
'retries': 0,
'testcases': {
'TS1.tc1': {
'status': 'passed',
'status': TestInstanceStatus.PASS,
'reason': None,
'duration': 60.0,
'output': ''
@@ -1596,13 +1597,13 @@ def test_testplan_load_from_file(caplog, device_testing, expected_tfilter):
'retries': 1,
'testcases': {
'TS3.tc1': {
'status': 'error',
'status': TestInstanceStatus.ERROR,
'reason': None,
'duration': 360.0,
'output': '[ERROR]: File \'dummy.yaml\' not found!\nClosing...'
},
'TS3.tc2': {
'status': None,
'status': TestInstanceStatus.NONE,
'reason': None,
'duration': 0,
'output': ''
@@ -1620,7 +1621,7 @@ def test_testplan_load_from_file(caplog, device_testing, expected_tfilter):
'retries': 0,
'testcases': {
'TS4.tc1': {
'status': 'skipped',
'status': TestInstanceStatus.SKIP,
'reason': 'Not in requested test list.',
'duration': 360.0,
'output': '[INFO] Parsing...'
@@ -1721,9 +1722,9 @@ def test_testplan_create_build_dir_links(exists):
instances_linked.append(instance)
instances = {
'inst0': mock.Mock(status='passed'),
'inst1': mock.Mock(status='skipped'),
'inst2': mock.Mock(status='error'),
'inst0': mock.Mock(status=TestInstanceStatus.PASS),
'inst1': mock.Mock(status=TestInstanceStatus.SKIP),
'inst2': mock.Mock(status=TestInstanceStatus.ERROR),
}
expected_instances = [instances['inst0'], instances['inst2']]
@@ -1788,7 +1789,7 @@ TESTDATA_14 = [
('bad platform', 'dummy reason', [],
'dummy status', 'dummy reason'),
('good platform', 'quarantined', [],
'error', 'quarantined but is one of the integration platforms'),
TestInstanceStatus.ERROR, 'quarantined but is one of the integration platforms'),
('good platform', 'dummy reason', [{'type': 'command line filter'}],
'dummy status', 'dummy reason'),
('good platform', 'dummy reason', [{'type': 'Skip filter'}],
@@ -1800,7 +1801,7 @@ TESTDATA_14 = [
('good platform', 'dummy reason', [{'type': 'Module filter'}],
'dummy status', 'dummy reason'),
('good platform', 'dummy reason', [{'type': 'testsuite filter'}],
'error', 'dummy reason but is one of the integration platforms'),
TestInstanceStatus.ERROR, 'dummy reason but is one of the integration platforms'),
]
@pytest.mark.parametrize(

4
scripts/tests/twister_blackbox/test_tooling.py

@@ -14,7 +14,9 @@ import pytest
import sys
import json
# pylint: disable=no-name-in-module
from conftest import ZEPHYR_BASE, TEST_DATA, sample_filename_mock, testsuite_filename_mock
from twisterlib.statuses import TestCaseStatus
from twisterlib.testplan import TestPlan
@@ -83,7 +85,7 @@ class TestTooling:
# Normally, board not supporting our toolchain would be filtered, so we check against that
assert len(filtered_j) == 1
assert filtered_j[0][3] != 'filtered'
assert filtered_j[0][3] != TestCaseStatus.FILTER
@pytest.mark.parametrize(
'test_path, test_platforms',
