You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2883 lines
104 KiB
2883 lines
104 KiB
#!/usr/bin/env python3 |
|
# vim: set syntax=python ts=4 : |
|
"""Zephyr Sanity Tests |
|
|
|
This script scans for the set of unit test applications in the git |
|
repository and attempts to execute them. By default, it tries to |
|
build each test case on one platform per architecture, using a precedence |
|
list defined in an architecture configuration file, and if possible |
|
run the tests in the QEMU emulator. |
|
|
|
Test cases are detected by the presence of a 'testcase.yaml' or a sample.yaml |
|
files in the application's project directory. This file may contain one or more |
|
blocks, each identifying a test scenario. The title of the block is a name for |
|
the test case, which only needs to be unique for the test cases specified in |
|
that testcase meta-data. The full canonical name for each test case is <path to |
|
test case>/<block>. |
|
|
|
Each test block in the testcase meta data can define the following key/value |
|
pairs: |
|
|
|
tags: <list of tags> (required) |
|
A set of string tags for the testcase. Usually pertains to |
|
functional domains but can be anything. Command line invocations |
|
of this script can filter the set of tests to run based on tag. |
|
|
|
skip: <True|False> (default False) |
|
skip testcase unconditionally. This can be used for broken tests. |
|
|
|
slow: <True|False> (default False) |
|
Don't run this test case unless --enable-slow was passed in on the |
|
command line. Intended for time-consuming test cases that are only |
|
run under certain circumstances, like daily builds. These test cases |
|
are still compiled. |
|
|
|
extra_args: <list of extra arguments> |
|
Extra cache entries to pass to CMake when building or running the |
|
test case. |
|
|
|
extra_configs: <list of extra configurations> |
|
Extra configuration options to be merged with a master prj.conf |
|
when building or running the test case. |
|
|
|
build_only: <True|False> (default False) |
|
If true, don't try to run the test under QEMU even if the |
|
selected platform supports it. |
|
|
|
build_on_all: <True|False> (default False) |
|
If true, attempt to build test on all available platforms. |
|
|
|
depends_on: <list of features> |
|
A board or platform can announce what features it supports, this option |
|
will enable the test only those platforms that provide this feature. |
|
|
|
min_ram: <integer> |
|
minimum amount of RAM needed for this test to build and run. This is |
|
compared with information provided by the board metadata. |
|
|
|
min_flash: <integer> |
|
minimum amount of ROM needed for this test to build and run. This is |
|
compared with information provided by the board metadata. |
|
|
|
timeout: <number of seconds> |
|
Length of time to run test in QEMU before automatically killing it. |
|
Default to 60 seconds. |
|
|
|
arch_whitelist: <list of arches, such as x86, arm, arc> |
|
Set of architectures that this test case should only be run for. |
|
|
|
arch_exclude: <list of arches, such as x86, arm, arc> |
|
Set of architectures that this test case should not run on. |
|
|
|
platform_whitelist: <list of platforms> |
|
Set of platforms that this test case should only be run for. |
|
|
|
platform_exclude: <list of platforms> |
|
Set of platforms that this test case should not run on. |
|
|
|
extra_sections: <list of extra binary sections> |
|
When computing sizes, sanitycheck will report errors if it finds |
|
extra, unexpected sections in the Zephyr binary unless they are named |
|
here. They will not be included in the size calculation. |
|
|
|
filter: <expression> |
|
Filter whether the testcase should be run by evaluating an expression |
|
against an environment containing the following values: |
|
|
|
{ ARCH : <architecture>, |
|
PLATFORM : <platform>, |
|
<all CONFIG_* key/value pairs in the test's generated defconfig>, |
|
*<env>: any environment variable available |
|
} |
|
|
|
The grammar for the expression language is as follows: |
|
|
|
expression ::= expression "and" expression |
|
| expression "or" expression |
|
| "not" expression |
|
| "(" expression ")" |
|
| symbol "==" constant |
|
| symbol "!=" constant |
|
| symbol "<" number |
|
| symbol ">" number |
|
| symbol ">=" number |
|
| symbol "<=" number |
|
| symbol "in" list |
|
| symbol ":" string |
|
| symbol |
|
|
|
list ::= "[" list_contents "]" |
|
|
|
list_contents ::= constant |
|
| list_contents "," constant |
|
|
|
constant ::= number |
|
| string |
|
|
|
|
|
For the case where expression ::= symbol, it evaluates to true |
|
if the symbol is defined to a non-empty string. |
|
|
|
Operator precedence, starting from lowest to highest: |
|
|
|
or (left associative) |
|
and (left associative) |
|
not (right associative) |
|
all comparison operators (non-associative) |
|
|
|
arch_whitelist, arch_exclude, platform_whitelist, platform_exclude |
|
are all syntactic sugar for these expressions. For instance |
|
|
|
arch_exclude = x86 arc |
|
|
|
Is the same as: |
|
|
|
filter = not ARCH in ["x86", "arc"] |
|
|
|
The ':' operator compiles the string argument as a regular expression, |
|
and then returns a true value only if the symbol's value in the environment |
|
matches. For example, if CONFIG_SOC="quark_se" then |
|
|
|
filter = CONFIG_SOC : "quark.*" |
|
|
|
Would match it. |
|
|
|
The set of test cases that actually run depends on directives in the testcase |
|
filed and options passed in on the command line. If there is any confusion, |
|
running with -v or --discard-report can help show why particular test cases |
|
were skipped. |
|
|
|
Metrics (such as pass/fail state and binary size) for the last code |
|
release are stored in scripts/sanity_chk/sanity_last_release.csv. |
|
To update this, pass the --all --release options. |
|
|
|
To load arguments from a file, write '+' before the file name, e.g., |
|
+file_name. File content must be one or more valid arguments separated by |
|
line break instead of white spaces. |
|
|
|
Most everyday users will run with no arguments. |
|
""" |
|
|
|
import contextlib |
|
import mmap |
|
import argparse |
|
import os |
|
import sys |
|
import re |
|
import subprocess |
|
import multiprocessing |
|
import select |
|
import shutil |
|
import signal |
|
import threading |
|
import time |
|
import csv |
|
import glob |
|
import serial |
|
import concurrent |
|
import concurrent.futures |
|
import xml.etree.ElementTree as ET |
|
from xml.sax.saxutils import escape |
|
from collections import OrderedDict |
|
from itertools import islice |
|
from functools import cmp_to_key |
|
|
|
import logging |
|
from sanity_chk import scl |
|
from sanity_chk import expr_parser |
|
|
|
log_format = "%(levelname)s %(name)s::%(module)s.%(funcName)s():%(lineno)d: %(message)s" |
|
logging.basicConfig(format=log_format, level=30) |
|
|
|
if "ZEPHYR_BASE" not in os.environ: |
|
sys.stderr.write("$ZEPHYR_BASE environment variable undefined.\n") |
|
exit(1) |
|
ZEPHYR_BASE = os.environ["ZEPHYR_BASE"] |
|
|
|
|
|
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/")) |
|
|
|
|
|
VERBOSE = 0 |
|
LAST_SANITY = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", |
|
"last_sanity.csv") |
|
LAST_SANITY_XUNIT = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", |
|
"last_sanity.xml") |
|
RELEASE_DATA = os.path.join(ZEPHYR_BASE, "scripts", "sanity_chk", |
|
"sanity_last_release.csv") |
|
CPU_COUNTS = multiprocessing.cpu_count() |
|
|
|
if os.isatty(sys.stdout.fileno()): |
|
TERMINAL = True |
|
COLOR_NORMAL = '\033[0m' |
|
COLOR_RED = '\033[91m' |
|
COLOR_GREEN = '\033[92m' |
|
COLOR_YELLOW = '\033[93m' |
|
else: |
|
TERMINAL = False |
|
COLOR_NORMAL = "" |
|
COLOR_RED = "" |
|
COLOR_GREEN = "" |
|
COLOR_YELLOW = "" |
|
|
|
class SanityCheckException(Exception): |
|
pass |
|
|
|
|
|
class SanityRuntimeError(SanityCheckException): |
|
pass |
|
|
|
|
|
class ConfigurationError(SanityCheckException): |
|
def __init__(self, cfile, message): |
|
self.cfile = cfile |
|
self.message = message |
|
|
|
def __str__(self): |
|
return repr(self.cfile + ": " + self.message) |
|
|
|
|
|
class MakeError(SanityCheckException): |
|
pass |
|
|
|
|
|
class BuildError(MakeError): |
|
pass |
|
|
|
|
|
class ExecutionError(MakeError): |
|
pass |
|
|
|
|
|
log_file = None |
|
|
|
|
|
# Debug Functions |
|
def info(what): |
|
sys.stdout.write(what + "\n") |
|
sys.stdout.flush() |
|
if log_file: |
|
log_file.write(what + "\n") |
|
log_file.flush() |
|
|
|
|
|
def error(what): |
|
sys.stderr.write(COLOR_RED + what + COLOR_NORMAL + "\n") |
|
if log_file: |
|
log_file(what + "\n") |
|
log_file.flush() |
|
|
|
|
|
def debug(what): |
|
if VERBOSE >= 1: |
|
info(what) |
|
|
|
|
|
def verbose(what): |
|
if VERBOSE >= 2: |
|
info(what) |
|
|
|
class HarnessImporter: |
|
|
|
def __init__(self, name): |
|
sys.path.insert(0, os.path.join(ZEPHYR_BASE, "scripts/sanity_chk")) |
|
module = __import__("harness") |
|
if name: |
|
my_class = getattr(module, name) |
|
else: |
|
my_class = getattr(module, "Test") |
|
|
|
self.instance = my_class() |
|
|
|
class Handler: |
|
def __init__(self, instance): |
|
"""Constructor |
|
|
|
@param name Arbitrary name of the created thread |
|
@param outdir Working directory, should be where handler pid file (qemu.pid for example) |
|
gets created by the build system |
|
@param log_fn Absolute path to write out handler's log data |
|
@param timeout Kill the handler process if it doesn't finish up within |
|
the given number of seconds |
|
""" |
|
self.lock = threading.Lock() |
|
self.state = "waiting" |
|
self.metrics = {} |
|
self.metrics["handler_time"] = 0 |
|
self.metrics["ram_size"] = 0 |
|
self.metrics["rom_size"] = 0 |
|
|
|
self.name = instance.name |
|
self.instance = instance |
|
self.timeout = instance.test.timeout |
|
self.sourcedir = instance.test.code_location |
|
self.outdir = instance.outdir |
|
self.handler_log = os.path.join(self.outdir, "handler.log") |
|
self.returncode = 0 |
|
self.set_state("running", {}) |
|
|
|
def set_state(self, state, metrics): |
|
self.lock.acquire() |
|
self.state = state |
|
self.metrics.update(metrics) |
|
self.lock.release() |
|
|
|
def get_state(self): |
|
self.lock.acquire() |
|
ret = (self.state, self.metrics) |
|
self.lock.release() |
|
return ret |
|
|
|
|
|
class DeviceHandler(Handler): |
|
|
|
def __init__(self, instance): |
|
"""Constructor |
|
|
|
@param instance Test Instance |
|
""" |
|
super().__init__(instance) |
|
|
|
def monitor_serial(self, ser, harness): |
|
log_out_fp = open(self.handler_log, "wt") |
|
|
|
while ser.isOpen(): |
|
try: |
|
serial_line = ser.readline() |
|
except TypeError: |
|
pass |
|
|
|
if serial_line: |
|
sl = serial_line.decode('utf-8', 'ignore') |
|
verbose("DEVICE: {0}".format(sl.rstrip())) |
|
|
|
log_out_fp.write(sl) |
|
log_out_fp.flush() |
|
harness.handle(sl.rstrip()) |
|
if harness.state: |
|
ser.close() |
|
break |
|
|
|
log_out_fp.close() |
|
|
|
def handle(self): |
|
out_state = "failed" |
|
|
|
if options.ninja: |
|
generator_cmd = "ninja" |
|
else: |
|
generator_cmd = "make" |
|
|
|
command = [generator_cmd, "-C", self.outdir, "flash"] |
|
|
|
device = options.device_serial |
|
ser = serial.Serial( |
|
device, |
|
baudrate=115200, |
|
parity=serial.PARITY_NONE, |
|
stopbits=serial.STOPBITS_ONE, |
|
bytesize=serial.EIGHTBITS, |
|
timeout=self.timeout |
|
) |
|
|
|
ser.flush() |
|
|
|
harness_name = self.instance.test.harness.capitalize() |
|
harness_import = HarnessImporter(harness_name) |
|
harness = harness_import.instance |
|
harness.configure(self.instance) |
|
|
|
t = threading.Thread(target=self.monitor_serial, args=(ser, harness)) |
|
t.start() |
|
|
|
try: |
|
subprocess.check_output(command, stderr=subprocess.PIPE) |
|
except subprocess.CalledProcessError: |
|
pass |
|
|
|
t.join(self.timeout) |
|
if t.is_alive(): |
|
out_state = "timeout" |
|
ser.close() |
|
|
|
if ser.isOpen(): |
|
ser.close() |
|
|
|
if out_state == "timeout": |
|
for c in self.instance.test.cases: |
|
if c not in harness.tests: |
|
harness.tests[c] = "BLOCK" |
|
|
|
self.instance.results = harness.tests |
|
|
|
if harness.state: |
|
self.set_state(harness.state, {}) |
|
else: |
|
self.set_state(out_state, {}) |
|
|
|
class NativeHandler(Handler): |
|
def __init__(self, instance): |
|
"""Constructor |
|
|
|
@param instance Test Instance |
|
""" |
|
super().__init__(instance) |
|
|
|
self.valgrind = False |
|
|
|
def _output_reader(self, proc, harness): |
|
log_out_fp = open(self.handler_log, "wt") |
|
for line in iter(proc.stdout.readline, b''): |
|
verbose("NATIVE: {0}".format(line.decode('utf-8').rstrip())) |
|
log_out_fp.write(line.decode('utf-8')) |
|
log_out_fp.flush() |
|
harness.handle(line.decode('utf-8').rstrip()) |
|
if harness.state: |
|
proc.terminate() |
|
break |
|
|
|
log_out_fp.close() |
|
|
|
def handle(self): |
|
out_state = "failed" |
|
|
|
harness_name = self.instance.test.harness.capitalize() |
|
harness_import = HarnessImporter(harness_name) |
|
harness = harness_import.instance |
|
harness.configure(self.instance) |
|
|
|
binary = os.path.join(self.outdir, "zephyr", "zephyr.exe") |
|
command = [binary] |
|
if shutil.which("valgrind") and self.valgrind: |
|
command = ["valgrind", "--error-exitcode=2", |
|
"--leak-check=full"] + command |
|
|
|
with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc: |
|
t = threading.Thread(target=self._output_reader, args=(proc, harness, )) |
|
t.start() |
|
t.join(self.timeout) |
|
if t.is_alive(): |
|
proc.terminate() |
|
out_state = "timeout" |
|
t.join() |
|
|
|
proc.wait() |
|
self.returncode = proc.returncode |
|
if proc.returncode != 0: |
|
out_state = "failed" |
|
|
|
returncode = subprocess.call(["GCOV_PREFIX=" + self.outdir, "gcov", self.sourcedir, "-b", "-s", self.outdir], shell=True) |
|
|
|
|
|
self.instance.results = harness.tests |
|
if harness.state: |
|
self.set_state(harness.state, {}) |
|
else: |
|
self.set_state(out_state, {}) |
|
|
|
|
|
class UnitHandler(Handler): |
|
def __init__(self, instance): |
|
"""Constructor |
|
|
|
@param instance Test instance |
|
""" |
|
super().__init__(instance) |
|
|
|
|
|
def handle(self): |
|
out_state = "failed" |
|
|
|
with open(self.handler_log, "wt") as hl: |
|
try: |
|
binary = os.path.join(self.outdir, "testbinary") |
|
command = [binary] |
|
if shutil.which("valgrind"): |
|
command = ["valgrind", "--error-exitcode=2", |
|
"--leak-check=full"] + command |
|
returncode = subprocess.call(command, timeout=self.timeout, |
|
stdout=hl, stderr=hl) |
|
self.returncode = returncode |
|
if returncode != 0: |
|
if self.returncode == 1: |
|
out_state = "failed" |
|
else: |
|
out_state = "failed valgrind" |
|
else: |
|
out_state = "passed" |
|
except subprocess.TimeoutExpired: |
|
out_state = "timeout" |
|
self.returncode = 1 |
|
|
|
returncode = subprocess.call( |
|
["GCOV_PREFIX=" + self.outdir, "gcov", self.sourcedir, "-s", self.outdir], shell=True) |
|
|
|
self.set_state(out_state, {}) |
|
|
|
|
|
class QEMUHandler(Handler): |
|
"""Spawns a thread to monitor QEMU output from pipes |
|
|
|
We pass QEMU_PIPE to 'make run' and monitor the pipes for output. |
|
We need to do this as once qemu starts, it runs forever until killed. |
|
Test cases emit special messages to the console as they run, we check |
|
for these to collect whether the test passed or failed. |
|
""" |
|
|
|
@staticmethod |
|
def _thread(handler, timeout, outdir, logfile, fifo_fn, pid_fn, results, harness): |
|
fifo_in = fifo_fn + ".in" |
|
fifo_out = fifo_fn + ".out" |
|
|
|
# These in/out nodes are named from QEMU's perspective, not ours |
|
if os.path.exists(fifo_in): |
|
os.unlink(fifo_in) |
|
os.mkfifo(fifo_in) |
|
if os.path.exists(fifo_out): |
|
os.unlink(fifo_out) |
|
os.mkfifo(fifo_out) |
|
|
|
# We don't do anything with out_fp but we need to open it for |
|
# writing so that QEMU doesn't block, due to the way pipes work |
|
out_fp = open(fifo_in, "wb") |
|
# Disable internal buffering, we don't |
|
# want read() or poll() to ever block if there is data in there |
|
in_fp = open(fifo_out, "rb", buffering=0) |
|
log_out_fp = open(logfile, "wt") |
|
|
|
start_time = time.time() |
|
timeout_time = start_time + timeout |
|
p = select.poll() |
|
p.register(in_fp, select.POLLIN) |
|
|
|
metrics = {} |
|
line = "" |
|
while True: |
|
this_timeout = int((timeout_time - time.time()) * 1000) |
|
if this_timeout < 0 or not p.poll(this_timeout): |
|
out_state = "timeout" |
|
break |
|
|
|
try: |
|
c = in_fp.read(1).decode("utf-8") |
|
except UnicodeDecodeError: |
|
# Test is writing something weird, fail |
|
out_state = "unexpected byte" |
|
break |
|
|
|
if c == "": |
|
# EOF, this shouldn't happen unless QEMU crashes |
|
out_state = "unexpected eof" |
|
break |
|
line = line + c |
|
if c != "\n": |
|
continue |
|
|
|
# line contains a full line of data output from QEMU |
|
log_out_fp.write(line) |
|
log_out_fp.flush() |
|
line = line.strip() |
|
verbose("QEMU: %s" % line) |
|
|
|
harness.handle(line) |
|
if harness.state: |
|
out_state = harness.state |
|
break |
|
|
|
# TODO: Add support for getting numerical performance data |
|
# from test cases. Will involve extending test case reporting |
|
# APIs. Add whatever gets reported to the metrics dictionary |
|
line = "" |
|
|
|
metrics["handler_time"] = time.time() - start_time |
|
verbose("QEMU complete (%s) after %f seconds" % |
|
(out_state, metrics["handler_time"])) |
|
handler.set_state(out_state, metrics) |
|
|
|
log_out_fp.close() |
|
out_fp.close() |
|
in_fp.close() |
|
|
|
pid = int(open(pid_fn).read()) |
|
os.unlink(pid_fn) |
|
try: |
|
os.kill(pid, signal.SIGTERM) |
|
except ProcessLookupError: |
|
# Oh well, as long as it's dead! User probably sent Ctrl-C |
|
pass |
|
|
|
os.unlink(fifo_in) |
|
os.unlink(fifo_out) |
|
|
|
def __init__(self, instance): |
|
"""Constructor |
|
|
|
@param instance Test instance |
|
""" |
|
|
|
super().__init__(instance) |
|
|
|
self.results = {} |
|
|
|
# We pass this to QEMU which looks for fifos with .in and .out |
|
# suffixes. |
|
self.fifo_fn = os.path.join(instance.outdir, "qemu-fifo") |
|
|
|
self.pid_fn = os.path.join(instance.outdir, "qemu.pid") |
|
if os.path.exists(self.pid_fn): |
|
os.unlink(self.pid_fn) |
|
|
|
self.log_fn = self.handler_log |
|
|
|
harness_import = HarnessImporter(instance.test.harness.capitalize()) |
|
harness = harness_import.instance |
|
harness.configure(self.instance) |
|
self.thread = threading.Thread(name=self.name, target=QEMUHandler._thread, |
|
args=(self, self.timeout, self.outdir, |
|
self.log_fn, self.fifo_fn, |
|
self.pid_fn, self.results, harness)) |
|
|
|
self.instance.results = harness.tests |
|
self.thread.daemon = True |
|
verbose("Spawning QEMU process for %s" % self.name) |
|
self.thread.start() |
|
|
|
def get_fifo(self): |
|
return self.fifo_fn |
|
|
|
|
|
class SizeCalculator: |
|
|
|
alloc_sections = ["bss", "noinit", "app_bss", "app_noinit", "ccm_bss", |
|
"ccm_noinit"] |
|
rw_sections = ["datas", "initlevel", "_k_task_list", "_k_event_list", |
|
"_k_memory_pool", "exceptions", "initshell", |
|
"_static_thread_area", "_k_timer_area", "_k_work_area", |
|
"_k_mem_slab_area", "_k_mem_pool_area", |
|
"_k_sem_area", "_k_mutex_area", "_k_alert_area", |
|
"_k_fifo_area", "_k_lifo_area", "_k_stack_area", |
|
"_k_msgq_area", "_k_mbox_area", "_k_pipe_area", |
|
"net_if", "net_if_dev", "net_stack", "net_l2_data", |
|
"_k_queue_area", "_net_buf_pool_area", "app_datas", |
|
"kobject_data", "mmu_tables", "app_pad", "priv_stacks", |
|
"ccm_data"] |
|
# These get copied into RAM only on non-XIP |
|
ro_sections = ["text", "ctors", "init_array", "reset", "object_access", |
|
"rodata", "devconfig", "net_l2", "vector"] |
|
|
|
def __init__(self, filename, extra_sections): |
|
"""Constructor |
|
|
|
@param filename Path to the output binary |
|
The <filename> is parsed by objdump to determine section sizes |
|
""" |
|
# Make sure this is an ELF binary |
|
with open(filename, "rb") as f: |
|
magic = f.read(4) |
|
|
|
if (magic != b'\x7fELF'): |
|
raise SanityRuntimeError("%s is not an ELF binary" % filename) |
|
|
|
# Search for CONFIG_XIP in the ELF's list of symbols using NM and AWK. |
|
# GREP can not be used as it returns an error if the symbol is not |
|
# found. |
|
is_xip_command = "nm " + filename + \ |
|
" | awk '/CONFIG_XIP/ { print $3 }'" |
|
is_xip_output = subprocess.check_output( |
|
is_xip_command, shell=True, stderr=subprocess.STDOUT).decode( |
|
"utf-8").strip() |
|
if is_xip_output.endswith("no symbols"): |
|
raise SanityRuntimeError("%s has no symbol information" % filename) |
|
self.is_xip = (len(is_xip_output) != 0) |
|
|
|
self.filename = filename |
|
self.sections = [] |
|
self.rom_size = 0 |
|
self.ram_size = 0 |
|
self.extra_sections = extra_sections |
|
|
|
self._calculate_sizes() |
|
|
|
def get_ram_size(self): |
|
"""Get the amount of RAM the application will use up on the device |
|
|
|
@return amount of RAM, in bytes |
|
""" |
|
return self.ram_size |
|
|
|
def get_rom_size(self): |
|
"""Get the size of the data that this application uses on device's flash |
|
|
|
@return amount of ROM, in bytes |
|
""" |
|
return self.rom_size |
|
|
|
def unrecognized_sections(self): |
|
"""Get a list of sections inside the binary that weren't recognized |
|
|
|
@return list of unrecognized section names |
|
""" |
|
slist = [] |
|
for v in self.sections: |
|
if not v["recognized"]: |
|
slist.append(v["name"]) |
|
return slist |
|
|
|
def _calculate_sizes(self): |
|
""" Calculate RAM and ROM usage by section """ |
|
objdump_command = "objdump -h " + self.filename |
|
objdump_output = subprocess.check_output( |
|
objdump_command, shell=True).decode("utf-8").splitlines() |
|
|
|
for line in objdump_output: |
|
words = line.split() |
|
|
|
if (len(words) == 0): # Skip lines that are too short |
|
continue |
|
|
|
index = words[0] |
|
if (not index[0].isdigit()): # Skip lines that do not start |
|
continue # with a digit |
|
|
|
name = words[1] # Skip lines with section names |
|
if (name[0] == '.'): # starting with '.' |
|
continue |
|
|
|
# TODO this doesn't actually reflect the size in flash or RAM as |
|
# it doesn't include linker-imposed padding between sections. |
|
# It is close though. |
|
size = int(words[2], 16) |
|
if size == 0: |
|
continue |
|
|
|
load_addr = int(words[4], 16) |
|
virt_addr = int(words[3], 16) |
|
|
|
# Add section to memory use totals (for both non-XIP and XIP scenarios) |
|
# Unrecognized section names are not included in the calculations. |
|
recognized = True |
|
if name in SizeCalculator.alloc_sections: |
|
self.ram_size += size |
|
stype = "alloc" |
|
elif name in SizeCalculator.rw_sections: |
|
self.ram_size += size |
|
self.rom_size += size |
|
stype = "rw" |
|
elif name in SizeCalculator.ro_sections: |
|
self.rom_size += size |
|
if not self.is_xip: |
|
self.ram_size += size |
|
stype = "ro" |
|
else: |
|
stype = "unknown" |
|
if name not in self.extra_sections: |
|
recognized = False |
|
|
|
self.sections.append({"name": name, "load_addr": load_addr, |
|
"size": size, "virt_addr": virt_addr, |
|
"type": stype, "recognized": recognized}) |
|
|
|
|
|
class MakeGoal: |
|
"""Metadata class representing one of the sub-makes called by MakeGenerator |
|
|
|
MakeGenerator returns a dictionary of these which can then be associated |
|
with TestInstances to get a complete picture of what happened during a test. |
|
MakeGenerator is used for tasks outside of building tests (such as |
|
defconfigs) which is why MakeGoal is a separate class from TestInstance. |
|
""" |
|
|
|
def __init__(self, name, text, handler, make_log, build_log, run_log, handler_log): |
|
self.name = name |
|
self.text = text |
|
self.handler = handler |
|
self.make_log = make_log |
|
self.build_log = build_log |
|
self.run_log = run_log |
|
self.handler_log = handler_log |
|
self.make_state = "waiting" |
|
self.failed = False |
|
self.finished = False |
|
self.reason = None |
|
self.metrics = {} |
|
|
|
def get_error_log(self): |
|
if self.make_state == "waiting": |
|
# Shouldn't ever see this; breakage in the main Makefile itself. |
|
return self.make_log |
|
elif self.make_state == "building": |
|
# Failure when calling the sub-make to build the code |
|
return self.build_log |
|
elif self.make_state == "running": |
|
# Failure in sub-make for "make run", qemu probably failed to start |
|
return self.run_log |
|
elif self.make_state == "finished": |
|
# Execution handler finished, but timed out or otherwise wasn't successful |
|
return self.handler_log |
|
|
|
def fail(self, reason): |
|
self.failed = True |
|
self.finished = True |
|
self.reason = reason |
|
|
|
def success(self): |
|
self.finished = True |
|
|
|
def __str__(self): |
|
if self.finished: |
|
if self.failed: |
|
return "[%s] failed (%s: see %s)" % (self.name, self.reason, |
|
self.get_error_log()) |
|
else: |
|
return "[%s] passed" % self.name |
|
else: |
|
return "[%s] in progress (%s)" % (self.name, self.make_state) |
|
|
|
|
|
class MakeGenerator: |
|
"""Generates a Makefile which just calls a bunch of sub-make sessions |
|
|
|
In any given test suite we may need to build dozens if not hundreds of |
|
test cases. The cleanest way to parallelize this is to just let Make |
|
do the parallelization, sharing the jobserver among all the different |
|
sub-make targets. |
|
""" |
|
|
|
GOAL_HEADER_TMPL = """.PHONY: {goal} |
|
{goal}: |
|
""" |
|
|
|
MAKE_RULE_TMPL = """\t@echo sanity_test_{phase} {goal} >&2 |
|
\tcmake \\ |
|
\t\t-G"{generator}"\\ |
|
\t\t-H{directory}\\ |
|
\t\t-B{outdir}\\ |
|
\t\t-DEXTRA_CFLAGS="-Werror {cflags}"\\ |
|
\t\t-DEXTRA_AFLAGS=-Wa,--fatal-warnings\\ |
|
\t\t-DEXTRA_LDFLAGS="{ldflags}"\\ |
|
\t\t{args}\\ |
|
\t\t>{logfile} 2>&1 |
|
\t{generator_cmd} -C {outdir}\\ |
|
\t\t{verb} {make_args}\\ |
|
\t\t>>{logfile} 2>&1 |
|
""" |
|
MAKE_RULE_TMPL_RUN = """\t@echo sanity_test_{phase} {goal} >&2 |
|
\t{generator_cmd} -C {outdir}\\ |
|
\t\t{verb} {make_args}\\ |
|
\t\t>>{logfile} 2>&1 |
|
""" |
|
|
|
GOAL_FOOTER_TMPL = "\t@echo sanity_test_finished {goal} >&2\n\n" |
|
|
|
re_make = re.compile( |
|
"sanity_test_([A-Za-z0-9]+) (.+)|$|make[:] \*\*\* \[(.+:.+: )?(.+)\] Error.+$") |
|
|
|
def __init__(self, base_outdir): |
|
"""MakeGenerator constructor |
|
|
|
@param base_outdir Intended to be the base out directory. A make.log |
|
file will be created here which contains the output of the |
|
top-level Make session, as well as the dynamic control Makefile |
|
@param verbose If true, pass V=1 to all the sub-makes which greatly |
|
increases their verbosity |
|
""" |
|
self.goals = {} |
|
if not os.path.exists(base_outdir): |
|
os.makedirs(base_outdir) |
|
self.logfile = os.path.join(base_outdir, "make.log") |
|
self.makefile = os.path.join(base_outdir, "Makefile") |
|
self.asserts = options.enable_asserts |
|
self.deprecations = options.error_on_deprecations |
|
|
|
def _get_rule_header(self, name): |
|
return MakeGenerator.GOAL_HEADER_TMPL.format(goal=name) |
|
|
|
def _get_sub_make(self, name, phase, workdir, outdir, |
|
logfile, args, make_args=""): |
|
""" |
|
@param args Arguments given to CMake |
|
@param make_args Arguments given to the Makefile generated by CMake |
|
""" |
|
args = " ".join(["-D{}".format(a) for a in args]) |
|
ldflags = "" |
|
|
|
if self.asserts: |
|
cflags = "-DCONFIG_ASSERT=1 -D__ASSERT_ON=2" |
|
else: |
|
cflags = "" |
|
|
|
if self.deprecations: |
|
cflags = cflags + " -Wno-deprecated-declarations" |
|
|
|
ldflags="-Wl,--fatal-warnings" |
|
|
|
if options.ninja: |
|
generator = "Ninja" |
|
generator_cmd = "ninja -j1" |
|
verb = "-v" if VERBOSE else "" |
|
else: |
|
generator = "Unix Makefiles" |
|
generator_cmd = "$(MAKE)" |
|
verb = "VERBOSE=1" if VERBOSE else "VERBOSE=0" |
|
|
|
if phase == 'running': |
|
return MakeGenerator.MAKE_RULE_TMPL_RUN.format( |
|
generator_cmd=generator_cmd, |
|
phase=phase, |
|
goal=name, |
|
outdir=outdir, |
|
verb=verb, |
|
logfile=logfile, |
|
make_args=make_args |
|
) |
|
else: |
|
return MakeGenerator.MAKE_RULE_TMPL.format( |
|
generator=generator, |
|
generator_cmd=generator_cmd, |
|
phase=phase, |
|
goal=name, |
|
outdir=outdir, |
|
cflags=cflags, |
|
ldflags=ldflags, |
|
directory=workdir, |
|
verb=verb, |
|
args=args, |
|
logfile=logfile, |
|
make_args=make_args |
|
) |
|
|
|
def _get_rule_footer(self, name): |
|
return MakeGenerator.GOAL_FOOTER_TMPL.format(goal=name) |
|
|
|
def _add_goal(self, outdir): |
|
if not os.path.exists(outdir): |
|
os.makedirs(outdir) |
|
|
|
def add_instance_build_goal(self, instance, args, buildlog, make_args=""): |
|
|
|
self.add_build_goal(instance.name, instance.test.code_location, |
|
instance.outdir, args, buildlog, make_args) |
|
|
|
def add_build_goal(self, name, directory, outdir, |
|
args, buildlog, make_args=""): |
|
"""Add a goal to invoke a Kbuild session |
|
|
|
@param name A unique string name for this build goal. The results |
|
dictionary returned by execute() will be keyed by this name. |
|
@param directory Absolute path to working directory, will be passed |
|
to make -C |
|
@param outdir Absolute path to output directory, will be passed to |
|
cmake via -B=<path> |
|
@param args Extra command line arguments to pass to 'cmake', typically |
|
environment variables or specific Make goals |
|
""" |
|
self._add_goal(outdir) |
|
build_logfile = os.path.join(outdir, buildlog) |
|
text = ( |
|
self._get_rule_header(name) + |
|
self._get_sub_make( |
|
name, |
|
"building", |
|
directory, |
|
outdir, |
|
build_logfile, |
|
args, |
|
make_args=make_args) + |
|
self._get_rule_footer(name)) |
|
self.goals[name] = MakeGoal( |
|
name, |
|
text, |
|
None, |
|
self.logfile, |
|
build_logfile, |
|
None, |
|
None) |
|
|
|
def add_qemu_goal(self, instance, args): |
|
"""Add a goal to build a Zephyr project and then run it under QEMU |
|
|
|
The generated make goal invokes Make twice, the first time it will |
|
build the default goal, and the second will invoke the 'qemu' goal. |
|
The output of the QEMU session will be monitored, and terminated |
|
either upon pass/fail result of the test program, or the timeout |
|
is reached. |
|
|
|
@param name A unique string name for this build goal. The results |
|
dictionary returned by execute() will be keyed by this name. |
|
@param directory Absolute path to working directory, will be passed |
|
to make -C |
|
@param outdir Absolute path to output directory, will be passed to |
|
Kbuild via -O=<path> |
|
@param args Extra cache entries to define in CMake. |
|
@param timeout Maximum length of time QEMU session should be allowed |
|
to run before automatically killing it. Default is 30 seconds. |
|
""" |
|
|
|
name = instance.name |
|
directory = instance.test.code_location |
|
outdir = instance.outdir |
|
|
|
build_logfile = os.path.join(outdir, "build.log") |
|
run_logfile = os.path.join(outdir, "run.log") |
|
handler_logfile = os.path.join(outdir, "handler.log") |
|
self._add_goal(outdir) |
|
|
|
qemu_handler = QEMUHandler(instance) |
|
args.append("QEMU_PIPE=%s" % qemu_handler.get_fifo()) |
|
text = (self._get_rule_header(name) + |
|
self._get_sub_make(name, "building", directory, |
|
outdir, build_logfile, args) + |
|
self._get_sub_make(name, "running", directory, |
|
outdir, run_logfile, |
|
args, make_args="run") + |
|
self._get_rule_footer(name)) |
|
self.goals[name] = MakeGoal(name, text, qemu_handler, self.logfile, build_logfile, |
|
run_logfile, handler_logfile) |
|
|
|
def add_unit_goal(self, instance, args, timeout=30): |
|
outdir = instance.outdir |
|
timeout = instance.test.timeout |
|
name = instance.name |
|
directory = instance.test.code_location |
|
|
|
self._add_goal(outdir) |
|
build_logfile = os.path.join(outdir, "build.log") |
|
run_logfile = os.path.join(outdir, "run.log") |
|
handler_logfile = os.path.join(outdir, "handler.log") |
|
|
|
args += ["COVERAGE=1", "EXTRA_LDFLAGS=--coverage"] |
|
|
|
# we handle running in the UnitHandler class |
|
text = (self._get_rule_header(name) + |
|
self._get_sub_make(name, "building", directory, |
|
outdir, build_logfile, args) + |
|
self._get_rule_footer(name)) |
|
unit_handler = UnitHandler(instance) |
|
self.goals[name] = MakeGoal(name, text, unit_handler, self.logfile, build_logfile, |
|
run_logfile, handler_logfile) |
|
|
|
def add_native_goal(self, instance, args): |
|
|
|
outdir = instance.outdir |
|
timeout = instance.test.timeout |
|
name = instance.name |
|
directory = instance.test.code_location |
|
|
|
self._add_goal(outdir) |
|
build_logfile = os.path.join(outdir, "build.log") |
|
run_logfile = os.path.join(outdir, "run.log") |
|
handler_logfile = os.path.join(outdir, "handler.log") |
|
|
|
# we handle running in the NativeHandler class |
|
text = (self._get_rule_header(name) + |
|
self._get_sub_make(name, "building", directory, |
|
outdir, build_logfile, args) + |
|
self._get_rule_footer(name)) |
|
native_handler = NativeHandler(instance) |
|
self.goals[name] = MakeGoal(name, text, native_handler, self.logfile, build_logfile, |
|
run_logfile, handler_logfile) |
|
|
|
def add_device_goal(self, instance, args): |
|
|
|
outdir = instance.outdir |
|
timeout = instance.test.timeout |
|
name = instance.name |
|
directory = instance.test.code_location |
|
|
|
self._add_goal(outdir) |
|
build_logfile = os.path.join(outdir, "build.log") |
|
run_logfile = os.path.join(outdir, "run.log") |
|
handler_logfile = os.path.join(outdir, "handler.log") |
|
|
|
# we handle running in the NativeHandler class |
|
text = (self._get_rule_header(name) + |
|
self._get_sub_make(name, "building", directory, |
|
outdir, build_logfile, args) + |
|
self._get_rule_footer(name)) |
|
handler = DeviceHandler(instance) |
|
self.goals[name] = MakeGoal(name, text, handler, self.logfile, build_logfile, |
|
run_logfile, handler_logfile) |
|
|
|
def add_test_instance(self, ti, extra_args=[]): |
|
"""Add a goal to build/test a TestInstance object |
|
|
|
@param ti TestInstance object to build. The status dictionary returned |
|
by execute() will be keyed by its .name field. |
|
""" |
|
args = ti.test.extra_args[:] |
|
if len(ti.test.extra_configs) > 0: |
|
args.append("OVERLAY_CONFIG=%s" % |
|
os.path.join(ti.outdir, "overlay.conf")) |
|
|
|
args.append("BOARD={}".format(ti.platform.name)) |
|
args.extend(extra_args) |
|
|
|
do_run_slow = options.enable_slow or not ti.test.slow |
|
do_build_only = ti.build_only or options.build_only |
|
do_run = (not do_build_only) and do_run_slow |
|
|
|
if ti.platform.qemu_support and do_run: |
|
self.add_qemu_goal(ti, args) |
|
|
|
elif ti.test.type == "unit": |
|
self.add_unit_goal(ti, args) |
|
|
|
elif ti.platform.type == "native" and do_run: |
|
self.add_native_goal(ti, args) |
|
|
|
elif options.device_testing and (not ti.build_only) and (not options.build_only): |
|
self.add_device_goal(ti, args) |
|
|
|
else: |
|
self.add_instance_build_goal(ti, args, "build.log") |
|
|
|
def execute(self, callback_fn=None, context=None): |
|
"""Execute all the registered build goals |
|
|
|
@param callback_fn If not None, a callback function will be called |
|
as individual goals transition between states. This function |
|
should accept two parameters: a string state and an arbitrary |
|
context object, supplied here |
|
@param context Context object to pass to the callback function. |
|
Type and semantics are specific to that callback function. |
|
@return A dictionary mapping goal names to final status. |
|
""" |
|
|
|
with open(self.makefile, "wt") as tf, \ |
|
open(os.devnull, "wb") as devnull, \ |
|
open(self.logfile, "wt") as make_log: |
|
# Create our dynamic Makefile and execute it. |
|
# Watch stderr output which is where we will keep |
|
# track of build state |
|
for name, goal in self.goals.items(): |
|
tf.write(goal.text) |
|
tf.write("all: %s\n" % (" ".join(self.goals.keys()))) |
|
tf.flush() |
|
|
|
# Normally spawn 2x as many processes as CPUs. Ninja, |
|
# though, seems to have significant parallelism internally |
|
# and results in rather high loads, so use 1.5x there to |
|
# match what we see with GNU make. |
|
loadmul = 2 |
|
if options.ninja: |
|
loadmul = 1.5 |
|
|
|
cmd = ["make", "-k", "-j", |
|
str(int(CPU_COUNTS * loadmul)), "-f", tf.name, "all"] |
|
p = subprocess.Popen(cmd, stderr=subprocess.PIPE, |
|
stdout=devnull) |
|
|
|
for line in iter(p.stderr.readline, b''): |
|
line = line.decode("utf-8") |
|
make_log.write(line) |
|
verbose("MAKE: " + repr(line.strip())) |
|
m = MakeGenerator.re_make.match(line) |
|
if not m: |
|
continue |
|
|
|
state, name, _, error = m.groups() |
|
if error: |
|
goal = self.goals[error] |
|
# Sometimes QEMU will run an image and then crash out, which |
|
# will cause the 'make run' invocation to exit with |
|
# nonzero status. |
|
# Need to distinguish this case from a compilation failure. |
|
if goal.handler: |
|
goal.fail("handler_crash") |
|
else: |
|
goal.fail("build_error") |
|
else: |
|
goal = self.goals[name] |
|
goal.make_state = state |
|
|
|
if state == "finished": |
|
if goal.handler: |
|
if hasattr(goal.handler, "handle"): |
|
goal.handler.handle() |
|
goal.handler_log = goal.handler.handler_log |
|
|
|
thread_status, metrics = goal.handler.get_state() |
|
goal.metrics.update(metrics) |
|
if thread_status == "passed": |
|
goal.success() |
|
else: |
|
goal.fail(thread_status) |
|
else: |
|
goal.success() |
|
|
|
if callback_fn: |
|
callback_fn(context, self.goals, goal) |
|
|
|
p.wait() |
|
return self.goals |
|
|
|
|
|
# "list" - List of strings |
|
# "list:<type>" - List of <type> |
|
# "set" - Set of unordered, unique strings |
|
# "set:<type>" - Set of <type> |
|
# "float" - Floating point |
|
# "int" - Integer |
|
# "bool" - Boolean |
|
# "str" - String |
|
|
|
# XXX Be sure to update __doc__ if you change any of this!! |
|
|
|
platform_valid_keys = {"qemu_support": {"type": "bool", "default": False}, |
|
"supported_toolchains": {"type": "list", "default": []}} |
|
|
|
testcase_valid_keys = {"tags": {"type": "set", "required": False}, |
|
"type": {"type": "str", "default": "integration"}, |
|
"extra_args": {"type": "list"}, |
|
"extra_configs": {"type": "list"}, |
|
"build_only": {"type": "bool", "default": False}, |
|
"build_on_all": {"type": "bool", "default": False}, |
|
"skip": {"type": "bool", "default": False}, |
|
"slow": {"type": "bool", "default": False}, |
|
"timeout": {"type": "int", "default": 60}, |
|
"min_ram": {"type": "int", "default": 8}, |
|
"depends_on": {"type": "set"}, |
|
"min_flash": {"type": "int", "default": 32}, |
|
"arch_whitelist": {"type": "set"}, |
|
"arch_exclude": {"type": "set"}, |
|
"extra_sections": {"type": "list", "default": []}, |
|
"platform_exclude": {"type": "set"}, |
|
"platform_whitelist": {"type": "set"}, |
|
"toolchain_exclude": {"type": "set"}, |
|
"toolchain_whitelist": {"type": "set"}, |
|
"filter": {"type": "str"}, |
|
"harness": {"type": "str"}, |
|
"harness_config": {"type": "map"} |
|
} |
|
|
|
|
|
class SanityConfigParser: |
|
"""Class to read test case files with semantic checking |
|
""" |
|
|
|
def __init__(self, filename, schema): |
|
"""Instantiate a new SanityConfigParser object |
|
|
|
@param filename Source .yaml file to read |
|
""" |
|
self.data = scl.yaml_load_verify(filename, schema) |
|
self.filename = filename |
|
self.tests = {} |
|
self.common = {} |
|
if 'tests' in self.data: |
|
self.tests = self.data['tests'] |
|
if 'common' in self.data: |
|
self.common = self.data['common'] |
|
|
|
def _cast_value(self, value, typestr): |
|
if isinstance(value, str): |
|
v = value.strip() |
|
if typestr == "str": |
|
return v |
|
|
|
elif typestr == "float": |
|
return float(value) |
|
|
|
elif typestr == "int": |
|
return int(value) |
|
|
|
elif typestr == "bool": |
|
return value |
|
|
|
elif typestr.startswith("list") and isinstance(value, list): |
|
return value |
|
elif typestr.startswith("list") and isinstance(value, str): |
|
vs = v.split() |
|
if len(typestr) > 4 and typestr[4] == ":": |
|
return [self._cast_value(vsi, typestr[5:]) for vsi in vs] |
|
else: |
|
return vs |
|
|
|
elif typestr.startswith("set"): |
|
vs = v.split() |
|
if len(typestr) > 3 and typestr[3] == ":": |
|
return set([self._cast_value(vsi, typestr[4:]) for vsi in vs]) |
|
else: |
|
return set(vs) |
|
|
|
elif typestr.startswith("map"): |
|
return value |
|
else: |
|
raise ConfigurationError( |
|
self.filename, "unknown type '%s'" % value) |
|
|
|
def get_test(self, name, valid_keys): |
|
"""Get a dictionary representing the keys/values within a test |
|
|
|
@param name The test in the .yaml file to retrieve data from |
|
@param valid_keys A dictionary representing the intended semantics |
|
for this test. Each key in this dictionary is a key that could |
|
be specified, if a key is given in the .yaml file which isn't in |
|
here, it will generate an error. Each value in this dictionary |
|
is another dictionary containing metadata: |
|
|
|
"default" - Default value if not given |
|
"type" - Data type to convert the text value to. Simple types |
|
supported are "str", "float", "int", "bool" which will get |
|
converted to respective Python data types. "set" and "list" |
|
may also be specified which will split the value by |
|
whitespace (but keep the elements as strings). finally, |
|
"list:<type>" and "set:<type>" may be given which will |
|
perform a type conversion after splitting the value up. |
|
"required" - If true, raise an error if not defined. If false |
|
and "default" isn't specified, a type conversion will be |
|
done on an empty string |
|
@return A dictionary containing the test key-value pairs with |
|
type conversion and default values filled in per valid_keys |
|
""" |
|
|
|
d = {} |
|
for k, v in self.common.items(): |
|
d[k] = v |
|
|
|
for k, v in self.tests[name].items(): |
|
if k not in valid_keys: |
|
raise ConfigurationError( |
|
self.filename, |
|
"Unknown config key '%s' in definition for '%s'" % |
|
(k, name)) |
|
|
|
if k in d: |
|
if isinstance(d[k], str): |
|
d[k] += " " + v |
|
else: |
|
d[k] = v |
|
|
|
for k, kinfo in valid_keys.items(): |
|
if k not in d: |
|
if "required" in kinfo: |
|
required = kinfo["required"] |
|
else: |
|
required = False |
|
|
|
if required: |
|
raise ConfigurationError( |
|
self.filename, |
|
"missing required value for '%s' in test '%s'" % |
|
(k, name)) |
|
else: |
|
if "default" in kinfo: |
|
default = kinfo["default"] |
|
else: |
|
default = self._cast_value("", kinfo["type"]) |
|
d[k] = default |
|
else: |
|
try: |
|
d[k] = self._cast_value(d[k], kinfo["type"]) |
|
except ValueError as ve: |
|
raise ConfigurationError( |
|
self.filename, "bad %s value '%s' for key '%s' in name '%s'" % |
|
(kinfo["type"], d[k], k, name)) |
|
|
|
return d |
|
|
|
|
|
class Platform: |
|
"""Class representing metadata for a particular platform |
|
|
|
Maps directly to BOARD when building""" |
|
|
|
yaml_platform_schema = scl.yaml_load( |
|
os.path.join( |
|
os.environ['ZEPHYR_BASE'], |
|
"scripts", |
|
"sanity_chk", |
|
"sanitycheck-platform-schema.yaml")) |
|
|
|
def __init__(self, cfile): |
|
"""Constructor. |
|
|
|
@param cfile Path to platform configuration file, which gives |
|
info about the platform to be added. |
|
""" |
|
scp = SanityConfigParser(cfile, self.yaml_platform_schema) |
|
data = scp.data |
|
|
|
self.name = data['identifier'] |
|
# if no RAM size is specified by the board, take a default of 128K |
|
self.ram = data.get("ram", 128) |
|
testing = data.get("testing", {}) |
|
self.ignore_tags = testing.get("ignore_tags", []) |
|
self.default = testing.get("default", False) |
|
# if no flash size is specified by the board, take a default of 512K |
|
self.flash = data.get("flash", 512) |
|
self.supported = set() |
|
for supp_feature in data.get("supported", []): |
|
for item in supp_feature.split(":"): |
|
self.supported.add(item) |
|
|
|
self.qemu_support = True if data.get('simulation', "na") == 'qemu' else False |
|
self.arch = data['arch'] |
|
self.type = data.get('type', "na") |
|
self.simulation = data.get('simulation', "na") |
|
self.supported_toolchains = data.get("toolchain", []) |
|
self.defconfig = None |
|
pass |
|
|
|
def __repr__(self): |
|
return "<%s on %s>" % (self.name, self.arch) |
|
|
|
|
|
class Architecture: |
|
"""Class representing metadata for a particular architecture |
|
""" |
|
|
|
def __init__(self, name, platforms): |
|
"""Architecture constructor |
|
|
|
@param name String name for this architecture |
|
@param platforms list of platforms belonging to this architecture |
|
""" |
|
self.platforms = platforms |
|
|
|
self.name = name |
|
|
|
def __repr__(self): |
|
return "<arch %s>" % self.name |
|
|
|
|
|
class TestCase: |
|
"""Class representing a test application |
|
""" |
|
|
|
def __init__(self, testcase_root, workdir, name, tc_dict, yamlfile): |
|
"""TestCase constructor. |
|
|
|
This gets called by TestSuite as it finds and reads test yaml files. |
|
Multiple TestCase instances may be generated from a single testcase.yaml, |
|
each one corresponds to an entry within that file. |
|
|
|
We need to have a unique name for every single test case. Since |
|
a testcase.yaml can define multiple tests, the canonical name for |
|
the test case is <workdir>/<name>. |
|
|
|
@param testcase_root Absolute path to the root directory where |
|
all the test cases live |
|
@param workdir Relative path to the project directory for this |
|
test application from the test_case root. |
|
@param name Name of this test case, corresponding to the entry name |
|
in the test case configuration file. For many test cases that just |
|
define one test, can be anything and is usually "test". This is |
|
really only used to distinguish between different cases when |
|
the testcase.yaml defines multiple tests |
|
@param tc_dict Dictionary with test values for this test case |
|
from the testcase.yaml file |
|
""" |
|
self.code_location = os.path.join(testcase_root, workdir) |
|
self.id = name |
|
self.cases = [] |
|
self.type = tc_dict["type"] |
|
self.tags = tc_dict["tags"] |
|
self.extra_args = tc_dict["extra_args"] |
|
self.extra_configs = tc_dict["extra_configs"] |
|
self.arch_whitelist = tc_dict["arch_whitelist"] |
|
self.arch_exclude = tc_dict["arch_exclude"] |
|
self.skip = tc_dict["skip"] |
|
self.platform_exclude = tc_dict["platform_exclude"] |
|
self.platform_whitelist = tc_dict["platform_whitelist"] |
|
self.toolchain_exclude = tc_dict["toolchain_exclude"] |
|
self.toolchain_whitelist = tc_dict["toolchain_whitelist"] |
|
self.tc_filter = tc_dict["filter"] |
|
self.timeout = tc_dict["timeout"] |
|
self.harness = tc_dict["harness"] |
|
self.harness_config = tc_dict["harness_config"] |
|
self.build_only = tc_dict["build_only"] |
|
self.build_on_all = tc_dict["build_on_all"] |
|
self.slow = tc_dict["slow"] |
|
self.min_ram = tc_dict["min_ram"] |
|
self.depends_on = tc_dict["depends_on"] |
|
self.min_flash = tc_dict["min_flash"] |
|
self.extra_sections = tc_dict["extra_sections"] |
|
|
|
self.path = os.path.normpath(os.path.join(os.path.realpath( |
|
testcase_root).replace(os.path.realpath(ZEPHYR_BASE) + "/", ''), |
|
workdir, name)) |
|
|
|
|
|
self.name = os.path.join(self.path) |
|
self.defconfig = {} |
|
self.yamlfile = yamlfile |
|
|
|
def scan_file(self, inf_name): |
|
suite_regex = re.compile( |
|
# do not match until end-of-line, otherwise we won't allow |
|
# stc_regex below to catch the ones that are declared in the same |
|
# line--as we only search starting the end of this match |
|
br"^\s*ztest_test_suite\(\s*(?P<suite_name>[a-zA-Z0-9_]+)\s*,", |
|
re.MULTILINE) |
|
stc_regex = re.compile( |
|
br"^\s*" # empy space at the beginning is ok |
|
# catch the case where it is declared in the same sentence, e.g: |
|
# |
|
# ztest_test_suite(mutex_complex, ztest_user_unit_test(TESTNAME)); |
|
br"(?:ztest_test_suite\([a-zA-Z0-9_]+,\s*)?" |
|
# Catch ztest[_user]_unit_test-[_setup_teardown](TESTNAME) |
|
br"ztest_(?:user_)?unit_test(?:_setup_teardown)?" |
|
# Consume the argument that becomes the extra testcse |
|
br"\(\s*" |
|
br"(?P<stc_name>[a-zA-Z0-9_]+)" |
|
# _setup_teardown() variant has two extra arguments that we ignore |
|
br"(?:\s*,\s*[a-zA-Z0-9_]+\s*,\s*[a-zA-Z0-9_]+)?" |
|
br"\s*\)", |
|
# We don't check how it finishes; we don't care |
|
re.MULTILINE) |
|
suite_run_regex = re.compile( |
|
br"^\s*ztest_run_test_suite\((?P<suite_name>[a-zA-Z0-9_]+)\)", |
|
re.MULTILINE) |
|
achtung_regex = re.compile( |
|
br"(#ifdef|#endif)", |
|
re.MULTILINE) |
|
warnings = None |
|
|
|
with open(inf_name) as inf: |
|
with contextlib.closing(mmap.mmap(inf.fileno(), 0, mmap.MAP_PRIVATE, |
|
mmap.PROT_READ, 0)) as main_c: |
|
suite_regex_match = suite_regex.search(main_c) |
|
if not suite_regex_match: |
|
# can't find ztest_test_suite, maybe a client, because |
|
# it includes ztest.h |
|
return None, None |
|
|
|
suite_run_match = suite_run_regex.search(main_c) |
|
if not suite_run_match: |
|
raise ValueError("can't find ztest_run_test_suite") |
|
|
|
achtung_matches = re.findall( |
|
achtung_regex, |
|
main_c[suite_regex_match.end():suite_run_match.start()]) |
|
if achtung_matches: |
|
warnings = "found invalid %s in ztest_test_suite()" \ |
|
% ", ".join(set([ |
|
match.decode() for match in achtung_matches |
|
])) |
|
_matches = re.findall( |
|
stc_regex, |
|
main_c[suite_regex_match.end():suite_run_match.start()]) |
|
matches = [ match.decode().replace("test_", "") for match in _matches ] |
|
return matches, warnings |
|
|
|
def scan_path(self, path): |
|
subcases = [] |
|
for filename in glob.glob(os.path.join(path, "src", "*.c")): |
|
try: |
|
_subcases, warnings = self.scan_file(filename) |
|
if warnings: |
|
error("%s: %s" % (filename, warnings)) |
|
if _subcases: |
|
subcases += _subcases |
|
except ValueError as e: |
|
error("%s: can't find: %s", filename, e) |
|
return subcases |
|
|
|
|
|
def parse_subcases(self): |
|
results = self.scan_path(self.code_location) |
|
for sub in results: |
|
name = "{}.{}".format(self.id, sub) |
|
self.cases.append(name) |
|
|
|
|
|
def __str__(self): |
|
return self.name |
|
|
|
|
|
class TestInstance: |
|
"""Class representing the execution of a particular TestCase on a platform |
|
|
|
@param test The TestCase object we want to build/execute |
|
@param platform Platform object that we want to build and run against |
|
@param base_outdir Base directory for all test results. The actual |
|
out directory used is <outdir>/<platform>/<test case name> |
|
""" |
|
|
|
def __init__(self, test, platform, base_outdir): |
|
self.test = test |
|
self.platform = platform |
|
self.name = os.path.join(platform.name, test.name) |
|
self.outdir = os.path.join(base_outdir, platform.name, test.path) |
|
self.build_only = options.build_only or test.build_only or (test.harness and test.harness != 'console') |
|
self.results = {} |
|
|
|
def create_overlay(self): |
|
if len(self.test.extra_configs) > 0: |
|
file = os.path.join(self.outdir, "overlay.conf") |
|
os.makedirs(self.outdir, exist_ok=True) |
|
f = open(file, "w") |
|
content = "" |
|
content = "\n".join(self.test.extra_configs) |
|
f.write(content) |
|
f.close() |
|
|
|
def calculate_sizes(self): |
|
"""Get the RAM/ROM sizes of a test case. |
|
|
|
This can only be run after the instance has been executed by |
|
MakeGenerator, otherwise there won't be any binaries to measure. |
|
|
|
@return A SizeCalculator object |
|
""" |
|
fns = glob.glob(os.path.join(self.outdir, "zephyr", "*.elf")) |
|
fns.extend(glob.glob(os.path.join(self.outdir, "zephyr", "*.exe"))) |
|
fns = [x for x in fns if not x.endswith('_prebuilt.elf')] |
|
if (len(fns) != 1): |
|
raise BuildError("Missing/multiple output ELF binary") |
|
return SizeCalculator(fns[0], self.test.extra_sections) |
|
|
|
def __repr__(self): |
|
return "<TestCase %s on %s>" % (self.test.name, self.platform.name) |
|
|
|
|
|
def defconfig_cb(context, goals, goal): |
|
if not goal.failed: |
|
return |
|
|
|
info("%sCould not build defconfig for %s%s" % |
|
(COLOR_RED, goal.name, COLOR_NORMAL)) |
|
if INLINE_LOGS: |
|
with open(goal.get_error_log()) as fp: |
|
data = fp.read() |
|
sys.stdout.write(data) |
|
if log_file: |
|
log_file.write(data) |
|
else: |
|
info("\tsee: " + COLOR_YELLOW + goal.get_error_log() + COLOR_NORMAL) |
|
|
|
|
|
class TestSuite: |
|
config_re = re.compile('(CONFIG_[A-Za-z0-9_]+)[=]\"?([^\"]*)\"?$') |
|
|
|
yaml_tc_schema = scl.yaml_load( |
|
os.path.join(os.environ['ZEPHYR_BASE'], |
|
"scripts", "sanity_chk", "sanitycheck-tc-schema.yaml")) |
|
|
|
def __init__(self, board_root_list, testcase_roots, outdir): |
|
# Keep track of which test cases we've filtered out and why |
|
self.arches = {} |
|
self.testcases = {} |
|
self.platforms = [] |
|
self.outdir = os.path.abspath(outdir) |
|
self.instances = {} |
|
self.goals = None |
|
self.discards = None |
|
self.load_errors = 0 |
|
|
|
for testcase_root in testcase_roots: |
|
testcase_root = os.path.abspath(testcase_root) |
|
|
|
debug("Reading test case configuration files under %s..." % |
|
testcase_root) |
|
for dirpath, dirnames, filenames in os.walk(testcase_root, |
|
topdown=True): |
|
verbose("scanning %s" % dirpath) |
|
if 'sample.yaml' in filenames: |
|
filename = 'sample.yaml' |
|
elif 'testcase.yaml' in filenames: |
|
filename = 'testcase.yaml' |
|
else: |
|
continue |
|
|
|
verbose("Found possible test case in " + dirpath) |
|
dirnames[:] = [] |
|
yaml_path = os.path.join(dirpath, filename) |
|
try: |
|
parsed_data = SanityConfigParser( |
|
yaml_path, self.yaml_tc_schema) |
|
|
|
workdir = os.path.relpath(dirpath, testcase_root) |
|
|
|
for name in parsed_data.tests.keys(): |
|
tc_dict = parsed_data.get_test(name, testcase_valid_keys) |
|
tc = TestCase(testcase_root, workdir, name, tc_dict, |
|
yaml_path) |
|
tc.parse_subcases() |
|
|
|
self.testcases[tc.name] = tc |
|
|
|
except Exception as e: |
|
error("E: %s: can't load (skipping): %s" % (yaml_path, e)) |
|
self.load_errors += 1 |
|
|
|
|
|
for board_root in board_root_list: |
|
board_root = os.path.abspath(board_root) |
|
|
|
debug( |
|
"Reading platform configuration files under %s..." % |
|
board_root) |
|
for fn in glob.glob(os.path.join(board_root, "*", "*", "*.yaml")): |
|
verbose("Found plaform configuration " + fn) |
|
try: |
|
platform = Platform(fn) |
|
self.platforms.append(platform) |
|
except RuntimeError as e: |
|
error("E: %s: can't load: %s" % (fn, e)) |
|
self.load_errors += 1 |
|
|
|
arches = [] |
|
for p in self.platforms: |
|
arches.append(p.arch) |
|
for a in list(set(arches)): |
|
aplatforms = [p for p in self.platforms if p.arch == a] |
|
arch = Architecture(a, aplatforms) |
|
self.arches[a] = arch |
|
|
|
self.instances = {} |
|
|
|
def get_last_failed(self): |
|
if not os.path.exists(LAST_SANITY): |
|
raise SanityRuntimeError("Couldn't find last sanity run.") |
|
result = [] |
|
with open(LAST_SANITY, "r") as fp: |
|
cr = csv.DictReader(fp) |
|
for row in cr: |
|
if row["passed"] == "True": |
|
continue |
|
test = row["test"] |
|
platform = row["platform"] |
|
result.append((test, platform)) |
|
return result |
|
|
|
def load_from_file(self, file): |
|
if not os.path.exists(file): |
|
raise SanityRuntimeError( |
|
"Couldn't find input file with list of tests.") |
|
with open(file, "r") as fp: |
|
cr = csv.reader(fp) |
|
instance_list = [] |
|
for row in cr: |
|
name = os.path.join(row[0], row[1]) |
|
platforms = self.arches[row[3]].platforms |
|
myp = None |
|
for p in platforms: |
|
if p.name == row[2]: |
|
myp = p |
|
break |
|
instance = TestInstance(self.testcases[name], myp, self.outdir) |
|
instance.create_overlay() |
|
instance_list.append(instance) |
|
self.add_instances(instance_list) |
|
|
|
def apply_filters(self): |
|
|
|
toolchain = os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT", None) or \ |
|
os.environ.get("ZEPHYR_GCC_VARIANT", None) |
|
if not toolchain: |
|
raise SanityRuntimeError("E: Variable ZEPHYR_TOOLCHAIN_VARIANT is not defined") |
|
|
|
|
|
instances = [] |
|
discards = {} |
|
platform_filter = options.platform |
|
last_failed = options.only_failed |
|
testcase_filter = options.test |
|
arch_filter = options.arch |
|
tag_filter = options.tag |
|
exclude_tag = options.exclude_tag |
|
config_filter = options.config |
|
extra_args = options.extra_args |
|
all_plats = options.all |
|
|
|
verbose("platform filter: " + str(platform_filter)) |
|
verbose(" arch_filter: " + str(arch_filter)) |
|
verbose(" tag_filter: " + str(tag_filter)) |
|
verbose(" exclude_tag: " + str(exclude_tag)) |
|
verbose(" config_filter: " + str(config_filter)) |
|
|
|
if last_failed: |
|
failed_tests = self.get_last_failed() |
|
|
|
default_platforms = False |
|
|
|
if all_plats: |
|
info("Selecting all possible platforms per test case") |
|
# When --all used, any --platform arguments ignored |
|
platform_filter = [] |
|
elif not platform_filter: |
|
info("Selecting default platforms per test case") |
|
default_platforms = True |
|
|
|
mg = MakeGenerator(self.outdir) |
|
dlist = {} |
|
for tc_name, tc in self.testcases.items(): |
|
for arch_name, arch in self.arches.items(): |
|
for plat in arch.platforms: |
|
instance = TestInstance(tc, plat, self.outdir) |
|
|
|
if (arch_name == "unit") != (tc.type == "unit"): |
|
continue |
|
|
|
if tc.build_on_all and not platform_filter: |
|
platform_filter = [] |
|
|
|
if tc.skip: |
|
continue |
|
|
|
if tag_filter and not tc.tags.intersection(tag_filter): |
|
continue |
|
|
|
if exclude_tag and tc.tags.intersection(exclude_tag): |
|
continue |
|
|
|
if testcase_filter and tc_name not in testcase_filter: |
|
continue |
|
|
|
if last_failed and ( |
|
tc.name, plat.name) not in failed_tests: |
|
continue |
|
|
|
if arch_filter and arch_name not in arch_filter: |
|
continue |
|
|
|
if tc.arch_whitelist and arch.name not in tc.arch_whitelist: |
|
continue |
|
|
|
if tc.arch_exclude and arch.name in tc.arch_exclude: |
|
continue |
|
|
|
if tc.platform_exclude and plat.name in tc.platform_exclude: |
|
continue |
|
|
|
if tc.toolchain_exclude and toolchain in tc.toolchain_exclude: |
|
continue |
|
|
|
if platform_filter and plat.name not in platform_filter: |
|
continue |
|
|
|
if plat.ram < tc.min_ram: |
|
continue |
|
|
|
if set(plat.ignore_tags) & tc.tags: |
|
continue |
|
|
|
if tc.depends_on: |
|
dep_intersection = tc.depends_on.intersection( |
|
set(plat.supported)) |
|
if dep_intersection != set(tc.depends_on): |
|
continue |
|
|
|
if plat.flash < tc.min_flash: |
|
continue |
|
|
|
if tc.platform_whitelist and plat.name not in tc.platform_whitelist: |
|
continue |
|
|
|
if tc.toolchain_whitelist and toolchain not in tc.toolchain_whitelist: |
|
continue |
|
|
|
if (tc.tc_filter and (plat.default or all_plats or platform_filter) |
|
and toolchain in plat.supported_toolchains): |
|
args = tc.extra_args[:] |
|
args.append("BOARD={}".format(plat.name)) |
|
args.extend(extra_args) |
|
# FIXME would be nice to use a common outdir for this so that |
|
# conf, gen_idt, etc aren't rebuilt for every combination, |
|
# need a way to avoid different Make processes from clobbering |
|
# each other since they all try to build them |
|
# simultaneously |
|
|
|
o = os.path.join(self.outdir, plat.name, tc.path) |
|
dlist[tc, plat, tc.name.split( |
|
"/")[-1]] = os.path.join(o, "zephyr", ".config") |
|
goal = "_".join([plat.name, "_".join( |
|
tc.name.split("/")), "config-sanitycheck"]) |
|
mg.add_build_goal(goal, |
|
os.path.join(ZEPHYR_BASE, tc.code_location), |
|
o, args, |
|
"config-sanitycheck.log", make_args="config-sanitycheck") |
|
|
|
info("Building testcase defconfigs...") |
|
results = mg.execute(defconfig_cb) |
|
|
|
for name, goal in results.items(): |
|
if goal.failed: |
|
raise SanityRuntimeError("Couldn't build some defconfigs") |
|
|
|
for k, out_config in dlist.items(): |
|
test, plat, name = k |
|
defconfig = {} |
|
with open(out_config, "r") as fp: |
|
for line in fp.readlines(): |
|
m = TestSuite.config_re.match(line) |
|
if not m: |
|
if line.strip() and not line.startswith("#"): |
|
sys.stderr.write("Unrecognized line %s\n" % line) |
|
continue |
|
defconfig[m.group(1)] = m.group(2).strip() |
|
test.defconfig[plat] = defconfig |
|
|
|
for tc_name, tc in self.testcases.items(): |
|
for arch_name, arch in self.arches.items(): |
|
instance_list = [] |
|
for plat in arch.platforms: |
|
instance = TestInstance(tc, plat, self.outdir) |
|
|
|
if (arch_name == "unit") != (tc.type == "unit"): |
|
# Discard silently |
|
continue |
|
|
|
if tc.skip: |
|
discards[instance] = "Skip filter" |
|
continue |
|
|
|
if tc.build_on_all and not platform_filter: |
|
platform_filter = [] |
|
|
|
if tag_filter and not tc.tags.intersection(tag_filter): |
|
discards[instance] = "Command line testcase tag filter" |
|
continue |
|
|
|
if exclude_tag and tc.tags.intersection(exclude_tag): |
|
discards[instance] = "Command line testcase exclude filter" |
|
continue |
|
|
|
if testcase_filter and tc_name not in testcase_filter: |
|
discards[instance] = "Testcase name filter" |
|
continue |
|
|
|
if last_failed and ( |
|
tc.name, plat.name) not in failed_tests: |
|
discards[instance] = "Passed or skipped during last run" |
|
continue |
|
|
|
if arch_filter and arch_name not in arch_filter: |
|
discards[instance] = "Command line testcase arch filter" |
|
continue |
|
|
|
if tc.arch_whitelist and arch.name not in tc.arch_whitelist: |
|
discards[instance] = "Not in test case arch whitelist" |
|
continue |
|
|
|
if tc.arch_exclude and arch.name in tc.arch_exclude: |
|
discards[instance] = "In test case arch exclude" |
|
continue |
|
|
|
if tc.platform_exclude and plat.name in tc.platform_exclude: |
|
discards[instance] = "In test case platform exclude" |
|
continue |
|
|
|
if tc.toolchain_exclude and toolchain in tc.toolchain_exclude: |
|
discards[instance] = "In test case toolchain exclude" |
|
continue |
|
|
|
if platform_filter and plat.name not in platform_filter: |
|
discards[instance] = "Command line platform filter" |
|
continue |
|
|
|
if tc.platform_whitelist and plat.name not in tc.platform_whitelist: |
|
discards[instance] = "Not in testcase platform whitelist" |
|
continue |
|
|
|
if tc.toolchain_whitelist and toolchain not in tc.toolchain_whitelist: |
|
discards[instance] = "Not in testcase toolchain whitelist" |
|
continue |
|
|
|
if toolchain and toolchain not in plat.supported_toolchains and tc.type != 'unit': |
|
discards[instance] = "Not supported by the toolchain" |
|
continue |
|
|
|
if plat.ram < tc.min_ram: |
|
discards[instance] = "Not enough RAM" |
|
continue |
|
|
|
if tc.depends_on: |
|
dep_intersection = tc.depends_on.intersection( |
|
set(plat.supported)) |
|
if dep_intersection != set(tc.depends_on): |
|
discards[instance] = "No hardware support" |
|
continue |
|
|
|
if plat.flash < tc.min_flash: |
|
discards[instance] = "Not enough FLASH" |
|
continue |
|
|
|
if set(plat.ignore_tags) & tc.tags: |
|
discards[instance] = "Excluded tags per platform" |
|
continue |
|
|
|
defconfig = { |
|
"ASSERT": 1 if options.enable_asserts else 0, |
|
"ARCH": arch.name, |
|
"PLATFORM": plat.name |
|
} |
|
defconfig.update(os.environ) |
|
for p, tdefconfig in tc.defconfig.items(): |
|
if p == plat: |
|
defconfig.update(tdefconfig) |
|
break |
|
|
|
if tc.tc_filter: |
|
try: |
|
res = expr_parser.parse(tc.tc_filter, defconfig) |
|
except (ValueError, SyntaxError) as se: |
|
sys.stderr.write( |
|
"Failed processing %s\n" % tc.yamlfile) |
|
raise se |
|
if not res: |
|
discards[instance] = ( |
|
"defconfig doesn't satisfy expression '%s'" % |
|
tc.tc_filter) |
|
continue |
|
|
|
instance_list.append(instance) |
|
|
|
if not instance_list: |
|
# Every platform in this arch was rejected already |
|
continue |
|
|
|
if default_platforms and not tc.build_on_all: |
|
if not tc.platform_whitelist: |
|
instances = list( |
|
filter( |
|
lambda tc: tc.platform.default, |
|
instance_list)) |
|
self.add_instances(instances) |
|
else: |
|
self.add_instances(instance_list[:1]) |
|
|
|
for instance in list( |
|
filter(lambda tc: not tc.platform.default, instance_list)): |
|
discards[instance] = "Not a default test platform" |
|
else: |
|
self.add_instances(instance_list) |
|
|
|
for name, case in self.instances.items(): |
|
case.create_overlay() |
|
|
|
self.discards = discards |
|
return discards |
|
|
|
def add_instances(self, ti_list): |
|
for ti in ti_list: |
|
self.instances[ti.name] = ti |
|
|
|
def execute(self, cb, cb_context): |
|
|
|
def calc_one_elf_size(name, goal): |
|
if not goal.failed: |
|
i = self.instances[name] |
|
sc = i.calculate_sizes() |
|
goal.metrics["ram_size"] = sc.get_ram_size() |
|
goal.metrics["rom_size"] = sc.get_rom_size() |
|
goal.metrics["unrecognized"] = sc.unrecognized_sections() |
|
|
|
mg = MakeGenerator(self.outdir) |
|
for i in self.instances.values(): |
|
mg.add_test_instance(i, options.extra_args) |
|
self.goals = mg.execute(cb, cb_context) |
|
|
|
# Parallelize size calculation |
|
executor = concurrent.futures.ThreadPoolExecutor(CPU_COUNTS) |
|
futures = [executor.submit(calc_one_elf_size, name, goal) |
|
for name, goal in self.goals.items()] |
|
concurrent.futures.wait(futures) |
|
|
|
return self.goals |
|
|
|
def run_report(self, filename): |
|
with open(filename, "at") as csvfile: |
|
fieldnames = ['path', 'test', 'platform', 'arch'] |
|
cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) |
|
for instance in self.instances.values(): |
|
rowdict = { |
|
"path": os.path.dirname(instance.test.name), |
|
"test": os.path.basename(instance.test.name), |
|
"platform": instance.platform.name, |
|
"arch": instance.platform.arch |
|
} |
|
cw.writerow(rowdict) |
|
|
|
def discard_report(self, filename): |
|
if self.discards is None: |
|
raise SanityRuntimeError("apply_filters() hasn't been run!") |
|
|
|
with open(filename, "wt") as csvfile: |
|
fieldnames = ["test", "arch", "platform", "reason"] |
|
cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) |
|
cw.writeheader() |
|
for instance, reason in self.discards.items(): |
|
rowdict = {"test": instance.test.name, |
|
"arch": instance.platform.arch, |
|
"platform": instance.platform.name, |
|
"reason": reason} |
|
cw.writerow(rowdict) |
|
|
|
def compare_metrics(self, filename): |
|
# name, datatype, lower results better |
|
interesting_metrics = [("ram_size", int, True), |
|
("rom_size", int, True)] |
|
|
|
if self.goals is None: |
|
raise SanityRuntimeError("execute() hasn't been run!") |
|
|
|
if not os.path.exists(filename): |
|
info("Cannot compare metrics, %s not found" % filename) |
|
return [] |
|
|
|
results = [] |
|
saved_metrics = {} |
|
with open(filename) as fp: |
|
cr = csv.DictReader(fp) |
|
for row in cr: |
|
d = {} |
|
for m, _, _ in interesting_metrics: |
|
d[m] = row[m] |
|
saved_metrics[(row["test"], row["platform"])] = d |
|
|
|
for name, goal in self.goals.items(): |
|
i = self.instances[name] |
|
mkey = (i.test.name, i.platform.name) |
|
if mkey not in saved_metrics: |
|
continue |
|
sm = saved_metrics[mkey] |
|
for metric, mtype, lower_better in interesting_metrics: |
|
if metric not in goal.metrics: |
|
continue |
|
if sm[metric] == "": |
|
continue |
|
delta = goal.metrics[metric] - mtype(sm[metric]) |
|
if delta == 0: |
|
continue |
|
results.append((i, metric, goal.metrics[metric], delta, |
|
lower_better)) |
|
return results |
|
|
|
|
|
|
|
def encode_for_xml(self, unicode_data, encoding='ascii'): |
|
unicode_data = unicode_data.replace('\x00', '') |
|
return unicode_data |
|
|
|
def testcase_target_report(self, report_file): |
|
|
|
run = "Sanitycheck" |
|
eleTestsuite = None |
|
append = options.only_failed |
|
|
|
errors = 0 |
|
passes = 0 |
|
fails = 0 |
|
duration = 0 |
|
skips = 0 |
|
|
|
for identifier, ti in self.instances.items(): |
|
for k in ti.results.keys(): |
|
if ti.results[k] == 'PASS': |
|
passes += 1 |
|
elif ti.results[k] == 'BLOCK': |
|
errors += 1 |
|
elif ti.results[k] == 'SKIP': |
|
skips += 1 |
|
else: |
|
fails += 1 |
|
|
|
eleTestsuites = ET.Element('testsuites') |
|
eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', |
|
name=run, time="%d" % duration, |
|
tests="%d" % (errors + passes + fails), |
|
failures="%d" % fails, |
|
errors="%d" % errors, skipped="%d" %skips) |
|
|
|
handler_time = "0" |
|
|
|
# print out test results |
|
for identifier, ti in self.instances.items(): |
|
for k in ti.results.keys(): |
|
|
|
eleTestcase = ET.SubElement( |
|
eleTestsuite, 'testcase', classname="%s:%s" %(ti.platform.name, os.path.basename(ti.test.name)), |
|
name="%s" % (k), time=handler_time) |
|
if ti.results[k] in ['FAIL', 'BLOCK']: |
|
el = None |
|
|
|
if ti.results[k] == 'FAIL': |
|
el = ET.SubElement( |
|
eleTestcase, |
|
'failure', |
|
type="failure", |
|
message="failed") |
|
elif ti.results[k] == 'BLOCK': |
|
el = ET.SubElement( |
|
eleTestcase, |
|
'error', |
|
type="failure", |
|
message="failed") |
|
p = os.path.join(options.outdir, ti.platform.name, ti.test.name) |
|
bl = os.path.join(p, "handler.log") |
|
|
|
if os.path.exists(bl): |
|
with open(bl, "rb") as f: |
|
log = f.read().decode("utf-8") |
|
el.text = self.encode_for_xml(log) |
|
|
|
elif ti.results[k] == 'SKIP': |
|
el = ET.SubElement( |
|
eleTestcase, |
|
'skipped') |
|
|
|
result = ET.tostring(eleTestsuites) |
|
f = open(report_file, 'wb') |
|
f.write(result) |
|
f.close() |
|
|
|
|
|
def testcase_xunit_report(self, filename, duration): |
|
if self.goals is None: |
|
raise SanityRuntimeError("execute() hasn't been run!") |
|
|
|
fails = 0 |
|
passes = 0 |
|
errors = 0 |
|
|
|
for name, goal in self.goals.items(): |
|
if goal.failed: |
|
if goal.reason in ['build_error', 'handler_crash']: |
|
errors += 1 |
|
else: |
|
fails += 1 |
|
else: |
|
passes += 1 |
|
|
|
run = "Sanitycheck" |
|
eleTestsuite = None |
|
append = options.only_failed |
|
|
|
if os.path.exists(filename) and append: |
|
tree = ET.parse(filename) |
|
eleTestsuites = tree.getroot() |
|
eleTestsuite = tree.findall('testsuite')[0] |
|
else: |
|
eleTestsuites = ET.Element('testsuites') |
|
eleTestsuite = ET.SubElement(eleTestsuites, 'testsuite', |
|
name=run, time="%d" % duration, |
|
tests="%d" % (errors + passes + fails), |
|
failures="%d" % fails, |
|
errors="%d" % errors, skip="0") |
|
|
|
handler_time = "0" |
|
for name, goal in self.goals.items(): |
|
|
|
i = self.instances[name] |
|
if append: |
|
for tc in eleTestsuite.findall('testcase'): |
|
if tc.get('classname') == "%s:%s" % ( |
|
i.platform.name, i.test.name): |
|
eleTestsuite.remove(tc) |
|
|
|
if not goal.failed and goal.handler: |
|
handler_time = "%s" %(goal.metrics["handler_time"]) |
|
|
|
eleTestcase = ET.SubElement( |
|
eleTestsuite, 'testcase', classname="%s:%s" % |
|
(i.platform.name, i.test.name), name="%s" % |
|
(name), time=handler_time) |
|
if goal.failed: |
|
failure = ET.SubElement( |
|
eleTestcase, |
|
'failure', |
|
type="failure", |
|
message=goal.reason) |
|
p = ("%s/%s/%s" % (options.outdir, i.platform.name, i.test.name)) |
|
bl = os.path.join(p, "build.log") |
|
if goal.reason != 'build_error': |
|
bl = os.path.join(p, "handler.log") |
|
|
|
if os.path.exists(bl): |
|
with open(bl, "rb") as f: |
|
log = f.read().decode("utf-8") |
|
failure.text = log |
|
|
|
result = ET.tostring(eleTestsuites) |
|
f = open(filename, 'wb') |
|
f.write(result) |
|
f.close() |
|
|
|
def testcase_report(self, filename): |
|
if self.goals is None: |
|
raise SanityRuntimeError("execute() hasn't been run!") |
|
|
|
with open(filename, "wt") as csvfile: |
|
fieldnames = ["test", "arch", "platform", "passed", "status", |
|
"extra_args", "qemu", "handler_time", "ram_size", |
|
"rom_size"] |
|
cw = csv.DictWriter(csvfile, fieldnames, lineterminator=os.linesep) |
|
cw.writeheader() |
|
for name, goal in self.goals.items(): |
|
i = self.instances[name] |
|
rowdict = {"test": i.test.name, |
|
"arch": i.platform.arch, |
|
"platform": i.platform.name, |
|
"extra_args": " ".join(i.test.extra_args), |
|
"qemu": i.platform.qemu_support} |
|
if goal.failed: |
|
rowdict["passed"] = False |
|
rowdict["status"] = goal.reason |
|
else: |
|
rowdict["passed"] = True |
|
if goal.handler: |
|
rowdict["handler_time"] = goal.metrics["handler_time"] |
|
rowdict["ram_size"] = goal.metrics["ram_size"] |
|
rowdict["rom_size"] = goal.metrics["rom_size"] |
|
cw.writerow(rowdict) |
|
|
|
|
|
def parse_arguments(): |
|
|
|
parser = argparse.ArgumentParser( |
|
description=__doc__, |
|
formatter_class=argparse.RawDescriptionHelpFormatter) |
|
parser.fromfile_prefix_chars = "+" |
|
|
|
parser.add_argument( |
|
"-p", "--platform", action="append", |
|
help="Platform filter for testing. This option may be used multiple " |
|
"times. Testcases will only be built/run on the platforms " |
|
"specified. If this option is not used, then platforms marked " |
|
"as default in the platform metadata file will be chosen " |
|
"to build and test. ") |
|
parser.add_argument( |
|
"-a", "--arch", action="append", |
|
help="Arch filter for testing. Takes precedence over --platform. " |
|
"If unspecified, test all arches. Multiple invocations " |
|
"are treated as a logical 'or' relationship") |
|
parser.add_argument( |
|
"-t", "--tag", action="append", |
|
help="Specify tags to restrict which tests to run by tag value. " |
|
"Default is to not do any tag filtering. Multiple invocations " |
|
"are treated as a logical 'or' relationship") |
|
parser.add_argument("-e", "--exclude-tag", action="append", |
|
help="Specify tags of tests that should not run. " |
|
"Default is to run all tests with all tags.") |
|
parser.add_argument( |
|
"-f", |
|
"--only-failed", |
|
action="store_true", |
|
help="Run only those tests that failed the previous sanity check " |
|
"invocation.") |
|
parser.add_argument( |
|
"-c", "--config", action="append", |
|
help="Specify platform configuration values filtering. This can be " |
|
"specified two ways: <config>=<value> or just <config>. The " |
|
"defconfig for all platforms will be " |
|
"checked. For the <config>=<value> case, only match defconfig " |
|
"that have that value defined. For the <config> case, match " |
|
"defconfig that have that value assigned to any value. " |
|
"Prepend a '!' to invert the match.") |
|
parser.add_argument( |
|
"-s", "--test", action="append", |
|
help="Run only the specified test cases. These are named by " |
|
"<path to test project relative to " |
|
"--testcase-root>/<testcase.yaml section name>") |
|
parser.add_argument( |
|
"-l", "--all", action="store_true", |
|
help="Build/test on all platforms. Any --platform arguments " |
|
"ignored.") |
|
|
|
parser.add_argument( |
|
"-o", "--testcase-report", |
|
help="Output a CSV spreadsheet containing results of the test run") |
|
parser.add_argument( |
|
"-d", "--discard-report", |
|
help="Output a CSV spreadsheet showing tests that were skipped " |
|
"and why") |
|
parser.add_argument("--compare-report", |
|
help="Use this report file for size comparison") |
|
|
|
parser.add_argument( |
|
"-B", "--subset", |
|
help="Only run a subset of the tests, 1/4 for running the first 25%%, " |
|
"3/5 means run the 3rd fifth of the total. " |
|
"This option is useful when running a large number of tests on " |
|
"different hosts to speed up execution time.") |
|
|
|
parser.add_argument( |
|
"-N", "--ninja", action="store_true", |
|
help="Use the Ninja generator with CMake") |
|
|
|
parser.add_argument( |
|
"-y", "--dry-run", action="store_true", |
|
help="Create the filtered list of test cases, but don't actually " |
|
"run them. Useful if you're just interested in " |
|
"--discard-report") |
|
|
|
parser.add_argument("--list-tags", action="store_true", |
|
help="list all tags in selected tests") |
|
|
|
parser.add_argument("--list-tests", action="store_true", |
|
help="list all tests.") |
|
|
|
parser.add_argument("--detailed-report", |
|
action="store", |
|
metavar="FILENAME", |
|
help="Generate a junit report with detailed testcase results.") |
|
|
|
parser.add_argument( |
|
"-r", "--release", action="store_true", |
|
help="Update the benchmark database with the results of this test " |
|
"run. Intended to be run by CI when tagging an official " |
|
"release. This database is used as a basis for comparison " |
|
"when looking for deltas in metrics such as footprint") |
|
parser.add_argument("-w", "--warnings-as-errors", action="store_true", |
|
help="Treat warning conditions as errors") |
|
parser.add_argument( |
|
"-v", |
|
"--verbose", |
|
action="count", |
|
default=0, |
|
help="Emit debugging information, call multiple times to increase " |
|
"verbosity") |
|
parser.add_argument( |
|
"-i", "--inline-logs", action="store_true", |
|
help="Upon test failure, print relevant log data to stdout " |
|
"instead of just a path to it") |
|
parser.add_argument("--log-file", metavar="FILENAME", action="store", |
|
help="log also to file") |
|
parser.add_argument( |
|
"-m", "--last-metrics", action="store_true", |
|
help="Instead of comparing metrics from the last --release, " |
|
"compare with the results of the previous sanity check " |
|
"invocation") |
|
parser.add_argument( |
|
"-u", |
|
"--no-update", |
|
action="store_true", |
|
help="do not update the results of the last run of the sanity " |
|
"checks") |
|
parser.add_argument( |
|
"-F", |
|
"--load-tests", |
|
metavar="FILENAME", |
|
action="store", |
|
help="Load list of tests to be run from file.") |
|
|
|
parser.add_argument( |
|
"-E", |
|
"--save-tests", |
|
metavar="FILENAME", |
|
action="store", |
|
help="Save list of tests to be run to file.") |
|
|
|
parser.add_argument( |
|
"-b", "--build-only", action="store_true", |
|
help="Only build the code, do not execute any of it in QEMU") |
|
parser.add_argument( |
|
"-j", "--jobs", type=int, |
|
help="Number of cores to use when building, defaults to " |
|
"number of CPUs * 2") |
|
|
|
parser.add_argument( |
|
"--device-testing", action="store_true", |
|
help="Test on device directly. You need to specify the serial" |
|
"using --device-serial option") |
|
parser.add_argument( |
|
"--device-serial", |
|
help="Serial device the board can be accessed through") |
|
parser.add_argument( |
|
"--show-footprint", action="store_true", |
|
help="Show footprint statistics and deltas since last release." |
|
) |
|
parser.add_argument( |
|
"-H", "--footprint-threshold", type=float, default=5, |
|
help="When checking test case footprint sizes, warn the user if " |
|
"the new app size is greater then the specified percentage " |
|
"from the last release. Default is 5. 0 to warn on any " |
|
"increase on app size") |
|
parser.add_argument( |
|
"-D", "--all-deltas", action="store_true", |
|
help="Show all footprint deltas, positive or negative. Implies " |
|
"--footprint-threshold=0") |
|
parser.add_argument( |
|
"-O", "--outdir", |
|
default="%s/sanity-out" % ZEPHYR_BASE, |
|
help="Output directory for logs and binaries. " |
|
"This directory will be deleted unless '--no-clean' is set.") |
|
parser.add_argument( |
|
"-n", "--no-clean", action="store_true", |
|
help="Do not delete the outdir before building. Will result in " |
|
"faster compilation since builds will be incremental") |
|
parser.add_argument( |
|
"-T", "--testcase-root", action="append", default=[], |
|
help="Base directory to recursively search for test cases. All " |
|
"testcase.yaml files under here will be processed. May be " |
|
"called multiple times. Defaults to the 'samples' and " |
|
"'tests' directories in the Zephyr tree.") |
|
board_root_list = ["%s/boards" % ZEPHYR_BASE, |
|
"%s/scripts/sanity_chk/boards" % ZEPHYR_BASE] |
|
parser.add_argument( |
|
"-A", "--board-root", action="append", default=board_root_list, |
|
help="Directory to search for board configuration files. All .yaml " |
|
"files in the directory will be processed.") |
|
parser.add_argument( |
|
"-z", "--size", action="append", |
|
help="Don't run sanity checks. Instead, produce a report to " |
|
"stdout detailing RAM/ROM sizes on the specified filenames. " |
|
"All other command line arguments ignored.") |
|
parser.add_argument( |
|
"-S", "--enable-slow", action="store_true", |
|
help="Execute time-consuming test cases that have been marked " |
|
"as 'slow' in testcase.yaml. Normally these are only built.") |
|
parser.add_argument("-R", "--enable-asserts", action="store_true", |
|
default=True, |
|
help="Build all test cases with assertions enabled. " |
|
"Default to assertions being enabled.") |
|
parser.add_argument("--disable-asserts", action="store_false", |
|
dest="enable_asserts", |
|
help="Build all test cases with assertions disabled.") |
|
parser.add_argument("-Q", "--error-on-deprecations", action="store_false", |
|
help="Error on deprecation warnings.") |
|
|
|
parser.add_argument( |
|
"-x", "--extra-args", action="append", default=[], |
|
help="""Extra CMake cache entries to define when building test cases. |
|
May be called multiple times. The key-value entries will be |
|
prefixed with -D before being passed to CMake. |
|
|
|
E.g |
|
"sanitycheck -x=USE_CCACHE=0" |
|
will translate to |
|
"cmake -DUSE_CCACHE=0" |
|
|
|
which will ultimately disable ccache. |
|
""" |
|
) |
|
|
|
parser.add_argument("-C", "--coverage", action="store_true", |
|
help="Generate coverage report for unit tests, and" |
|
" tests and samples run in native_posix.") |
|
|
|
return parser.parse_args() |
|
|
|
|
|
def log_info(filename): |
|
filename = os.path.relpath(os.path.realpath(filename)) |
|
if INLINE_LOGS: |
|
info("{:-^100}".format(filename)) |
|
|
|
try: |
|
with open(filename) as fp: |
|
data = fp.read() |
|
except Exception as e: |
|
data = "Unable to read log data (%s)\n" % (str(e)) |
|
|
|
sys.stdout.write(data) |
|
if log_file: |
|
log_file.write(data) |
|
info("{:-^100}".format(filename)) |
|
else: |
|
info("\tsee: " + COLOR_YELLOW + filename + COLOR_NORMAL) |
|
|
|
|
|
def terse_test_cb(instances, goals, goal): |
|
total_tests = len(goals) |
|
total_done = 0 |
|
total_failed = 0 |
|
|
|
for k, g in goals.items(): |
|
if g.finished: |
|
total_done += 1 |
|
if g.failed: |
|
total_failed += 1 |
|
|
|
if goal.failed: |
|
i = instances[goal.name] |
|
info( |
|
"\n\n{:<25} {:<50} {}FAILED{}: {}".format( |
|
i.platform.name, |
|
i.test.name, |
|
COLOR_RED, |
|
COLOR_NORMAL, |
|
goal.reason)) |
|
log_info(goal.get_error_log()) |
|
info("") |
|
|
|
sys.stdout.write( |
|
"\rtotal complete: %s%4d/%4d%s %2d%% failed: %s%4d%s" % |
|
(COLOR_GREEN, total_done, total_tests, COLOR_NORMAL, |
|
int((float(total_done) / total_tests) * 100), |
|
COLOR_RED if total_failed > 0 else COLOR_NORMAL, total_failed, |
|
COLOR_NORMAL)) |
|
sys.stdout.flush() |
|
|
|
|
|
def chatty_test_cb(instances, goals, goal): |
|
i = instances[goal.name] |
|
|
|
if VERBOSE < 2 and not goal.finished: |
|
return |
|
|
|
if goal.failed: |
|
status = COLOR_RED + "FAILED" + COLOR_NORMAL + ": " + goal.reason |
|
elif goal.finished: |
|
status = COLOR_GREEN + "PASSED" + COLOR_NORMAL |
|
else: |
|
status = goal.make_state |
|
|
|
info("{:<25} {:<50} {}".format(i.platform.name, i.test.name, status)) |
|
if goal.failed: |
|
log_info(goal.get_error_log()) |
|
|
|
|
|
def size_report(sc): |
|
info(sc.filename) |
|
info("SECTION NAME VMA LMA SIZE HEX SZ TYPE") |
|
for i in range(len(sc.sections)): |
|
v = sc.sections[i] |
|
|
|
info("%-17s 0x%08x 0x%08x %8d 0x%05x %-7s" % |
|
(v["name"], v["virt_addr"], v["load_addr"], v["size"], v["size"], |
|
v["type"])) |
|
|
|
info("Totals: %d bytes (ROM), %d bytes (RAM)" % |
|
(sc.rom_size, sc.ram_size)) |
|
info("") |
|
|
|
|
|
def generate_coverage(outdir, ignores): |
|
with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog: |
|
coveragefile = os.path.join(outdir, "coverage.info") |
|
ztestfile = os.path.join(outdir, "ztest.info") |
|
subprocess.call(["lcov", "--capture", "--directory", outdir, |
|
"--rc", "lcov_branch_coverage=1", |
|
"--output-file", coveragefile], stdout=coveragelog) |
|
# We want to remove tests/* and tests/ztest/test/* but save tests/ztest |
|
subprocess.call(["lcov", "--extract", coveragefile, |
|
os.path.join(ZEPHYR_BASE, "tests", "ztest", "*"), |
|
"--output-file", ztestfile, |
|
"--rc", "lcov_branch_coverage=1"], stdout=coveragelog) |
|
|
|
if os.path.getsize(ztestfile) > 0: |
|
subprocess.call(["lcov", "--remove", ztestfile, |
|
os.path.join(ZEPHYR_BASE, "tests/ztest/test/*"), |
|
"--output-file", ztestfile, |
|
"--rc", "lcov_branch_coverage=1"], |
|
stdout=coveragelog) |
|
files = [coveragefile, ztestfile]; |
|
else: |
|
files = [coveragefile]; |
|
|
|
for i in ignores: |
|
subprocess.call( |
|
["lcov", "--remove", coveragefile, i, "--output-file", |
|
coveragefile, "--rc", "lcov_branch_coverage=1"], |
|
stdout=coveragelog) |
|
|
|
ret = subprocess.call(["genhtml", "--legend", "--branch-coverage", |
|
"-output-directory", |
|
os.path.join(outdir, "coverage")] + files, |
|
stdout=coveragelog) |
|
if ret==0: |
|
info("HTML report generated: %s"% |
|
os.path.join(outdir, "coverage","index.html")); |
|
|
|
|
|
def main(): |
|
start_time = time.time() |
|
global VERBOSE, INLINE_LOGS, CPU_COUNTS, log_file |
|
global options |
|
options = parse_arguments() |
|
|
|
if options.size: |
|
for fn in options.size: |
|
size_report(SizeCalculator(fn, [])) |
|
sys.exit(0) |
|
|
|
|
|
if options.device_testing: |
|
if options.device_serial is None or len(options.platform) != 1: |
|
sys.exit(1) |
|
|
|
VERBOSE += options.verbose |
|
INLINE_LOGS = options.inline_logs |
|
if options.log_file: |
|
log_file = open(options.log_file, "w") |
|
if options.jobs: |
|
CPU_COUNTS = options.jobs |
|
|
|
if options.subset: |
|
subset, sets = options.subset.split("/") |
|
if int(subset) > 0 and int(sets) >= int(subset): |
|
info("Running only a subset: %s/%s" % (subset, sets)) |
|
else: |
|
error("You have provided a wrong subset value: %s." % options.subset) |
|
return |
|
|
|
if os.path.exists(options.outdir) and not options.no_clean: |
|
info("Cleaning output directory " + options.outdir) |
|
shutil.rmtree(options.outdir) |
|
|
|
if not options.testcase_root: |
|
options.testcase_root = [os.path.join(ZEPHYR_BASE, "tests"), |
|
os.path.join(ZEPHYR_BASE, "samples")] |
|
|
|
ts = TestSuite(options.board_root, options.testcase_root, options.outdir) |
|
|
|
if ts.load_errors: |
|
sys.exit(1) |
|
|
|
if options.list_tags: |
|
tags = set() |
|
for n,tc in ts.testcases.items(): |
|
tags = tags.union(tc.tags) |
|
|
|
for t in tags: |
|
print("- {}".format(t)) |
|
|
|
return |
|
|
|
if options.list_tests: |
|
cnt = 0 |
|
unq = [] |
|
for n,tc in ts.testcases.items(): |
|
for c in tc.cases: |
|
unq.append(c) |
|
|
|
for u in sorted(set(unq)): |
|
cnt = cnt + 1 |
|
print(" - {}".format(u)) |
|
print("{} total.".format(cnt)) |
|
return |
|
|
|
discards = [] |
|
if options.load_tests: |
|
ts.load_from_file(options.load_tests) |
|
else: |
|
discards = ts.apply_filters() |
|
|
|
if options.discard_report: |
|
ts.discard_report(options.discard_report) |
|
|
|
if VERBOSE > 1 and discards: |
|
# if we are using command line platform filter, no need to list every |
|
# other platform as excluded, we know that already. |
|
# Show only the discards that apply to the selected platforms on the |
|
# command line |
|
|
|
for i, reason in discards.items(): |
|
if options.platform and i.platform.name not in options.platform: |
|
continue |
|
debug( |
|
"{:<25} {:<50} {}SKIPPED{}: {}".format( |
|
i.platform.name, |
|
i.test.name, |
|
COLOR_YELLOW, |
|
COLOR_NORMAL, |
|
reason)) |
|
|
|
|
|
def native_posix_and_unit_first(a, b): |
|
if a[0].startswith('native_posix') or a[0].startswith('unit_testing'): |
|
return -1 |
|
if b[0].startswith('native_posix') or b[0].startswith('unit_testing'): |
|
return 1 |
|
return (a > b) - (a < b) |
|
|
|
ts.instances = OrderedDict(sorted(ts.instances.items(), |
|
key=cmp_to_key(native_posix_and_unit_first))) |
|
|
|
if options.save_tests: |
|
ts.run_report(options.save_tests) |
|
return |
|
|
|
if options.subset: |
|
|
|
subset, sets = options.subset.split("/") |
|
total = len(ts.instances) |
|
per_set = round(total / int(sets)) |
|
start = (int(subset) - 1) * per_set |
|
if subset == sets: |
|
end = total |
|
else: |
|
end = start + per_set |
|
|
|
sliced_instances = islice(ts.instances.items(), start, end) |
|
ts.instances = OrderedDict(sliced_instances) |
|
|
|
info("%d tests selected, %d tests discarded due to filters" % |
|
(len(ts.instances), len(discards))) |
|
|
|
if options.dry_run: |
|
return |
|
|
|
if VERBOSE or not TERMINAL: |
|
goals = ts.execute( |
|
chatty_test_cb, |
|
ts.instances) |
|
else: |
|
goals = ts.execute( |
|
terse_test_cb, |
|
ts.instances) |
|
info("") |
|
|
|
if options.detailed_report: |
|
ts.testcase_target_report(options.detailed_report) |
|
|
|
# figure out which report to use for size comparison |
|
if options.compare_report: |
|
report_to_use = options.compare_report |
|
elif options.last_metrics: |
|
report_to_use = LAST_SANITY |
|
else: |
|
report_to_use = RELEASE_DATA |
|
|
|
deltas = ts.compare_metrics(report_to_use) |
|
warnings = 0 |
|
if deltas and options.show_footprint: |
|
for i, metric, value, delta, lower_better in deltas: |
|
if not options.all_deltas and ((delta < 0 and lower_better) or |
|
(delta > 0 and not lower_better)): |
|
continue |
|
|
|
percentage = (float(delta) / float(value - delta)) |
|
if not options.all_deltas and (percentage < |
|
(options.footprint_threshold / 100.0)): |
|
continue |
|
|
|
info("{:<25} {:<60} {}{}{}: {} {:<+4}, is now {:6} {:+.2%}".format( |
|
i.platform.name, i.test.name, COLOR_YELLOW, |
|
"INFO" if options.all_deltas else "WARNING", COLOR_NORMAL, |
|
metric, delta, value, percentage)) |
|
warnings += 1 |
|
|
|
if warnings: |
|
info("Deltas based on metrics from last %s" % |
|
("release" if not options.last_metrics else "run")) |
|
|
|
failed = 0 |
|
for name, goal in goals.items(): |
|
if goal.failed: |
|
failed += 1 |
|
elif goal.metrics.get("unrecognized"): |
|
info("%sFAILED%s: %s has unrecognized binary sections: %s" % |
|
(COLOR_RED, COLOR_NORMAL, goal.name, |
|
str(goal.metrics["unrecognized"]))) |
|
failed += 1 |
|
|
|
if options.coverage: |
|
info("Generating coverage files...") |
|
generate_coverage(options.outdir, ["tests/*", "samples/*"]) |
|
|
|
duration = time.time() - start_time |
|
info("%s%d of %d%s tests passed with %s%d%s warnings in %d seconds" % |
|
(COLOR_RED if failed else COLOR_GREEN, len(goals) - failed, |
|
len(goals), COLOR_NORMAL, COLOR_YELLOW if warnings else COLOR_NORMAL, |
|
warnings, COLOR_NORMAL, duration)) |
|
|
|
if options.testcase_report: |
|
ts.testcase_report(options.testcase_report) |
|
if not options.no_update: |
|
ts.testcase_xunit_report(LAST_SANITY_XUNIT, duration) |
|
ts.testcase_report(LAST_SANITY) |
|
if options.release: |
|
ts.testcase_report(RELEASE_DATA) |
|
if log_file: |
|
log_file.close() |
|
if failed or (warnings and options.warnings_as_errors): |
|
sys.exit(1) |
|
|
|
|
|
if __name__ == "__main__": |
|
main()
|
|
|