You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
297 lines
11 KiB
297 lines
11 KiB
#!/usr/bin/env python3 |
|
# |
|
# Copyright (c) 2024 Intel Corporation |
|
# |
|
# SPDX-License-Identifier: Apache-2.0 |
|
|
|
""" |
|
This script converts memory footprint data prepared by `./footprint/scripts/track.py` |
|
into a JSON files compatible with Twister report schema making them ready for upload |
|
to the same ElasticSearch data storage together with other Twister reports |
|
for analysis, visualization, etc. |
|
|
|
The memory footprint input data files (rom.json, ram.json) are expected in directories |
|
sturctured as 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' under the input path(s). |
|
The BOARD name itself can be in HWMv2 format as 'BOARD/SOC' or 'BOARD/SOC/VARIANT' |
|
with the corresponding sub-directories. |
|
|
|
For example, an input path `./**/*v3.6.0-rc3-*/footprints/**/frdm_k64f/` will be |
|
expanded by bash to all sub-directories with the 'footprints' data `v3.6.0-rc3` |
|
release commits collected for `frdm_k64f` board. |
|
Note: for the above example to work the bash recursive globbing should be active: |
|
`shopt -s globstar`. |
|
|
|
The output `twister_footprint.json` files will be placed into the same directories |
|
as the corresponding input files. |
|
|
|
In Twister report a test instance has either long or short name, each needs test |
|
suite name from the test configuration yaml file. |
|
This scripts has `--test-name` parameter to customize how to compose test names |
|
from the plan.txt columns including an additional (last) one whth explicit |
|
test suite name ('dot separated' format). |
|
""" |
|
|
|
from __future__ import annotations |
|
|
|
from datetime import datetime, timezone |
|
import argparse |
|
import os |
|
import sys |
|
import re |
|
import csv |
|
import logging |
|
import json |
|
from git import Repo |
|
from git.exc import BadName |
|
|
|
|
|
VERSION_COMMIT_RE = re.compile(r".*-g([a-f0-9]{12})$") |
|
PLAN_HEADERS = ['name', 'feature', 'board', 'application', 'options', 'suite_name'] |
|
TESTSUITE_FILENAME = { 'tests': 'testcase.yaml', 'samples': 'sample.yaml' } |
|
FOOTPRINT_FILES = { 'ROM': 'rom.json', 'RAM': 'ram.json' } |
|
RESULT_FILENAME = 'twister_footprint.json' |
|
HWMv2_LEVELS = 3 |
|
|
|
logger = None |
|
LOG_LEVELS = { |
|
'DEBUG': (logging.DEBUG, 3), |
|
'INFO': (logging.INFO, 2), |
|
'WARNING': (logging.WARNING, 1), |
|
'ERROR': (logging.ERROR, 0) |
|
} |
|
|
|
|
|
def init_logs(logger_name=''): |
|
global logger |
|
|
|
log_level = os.environ.get('LOG_LEVEL', 'ERROR') |
|
log_level = LOG_LEVELS[log_level][0] if log_level in LOG_LEVELS else logging.ERROR |
|
|
|
console = logging.StreamHandler(sys.stdout) |
|
console.setFormatter(logging.Formatter('%(asctime)s - %(levelname)-8s - %(message)s')) |
|
|
|
logger = logging.getLogger(logger_name) |
|
logger.setLevel(log_level) |
|
logger.addHandler(console) |
|
|
|
def set_verbose(verbose: int): |
|
levels = { lvl[1]: lvl[0] for lvl in LOG_LEVELS.values() } |
|
if verbose > len(levels): |
|
verbose = len(levels) |
|
if verbose <= 0: |
|
verbose = 0 |
|
logger.setLevel(levels[verbose]) |
|
|
|
|
|
def parse_args(): |
|
parser = argparse.ArgumentParser(allow_abbrev=False, |
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
description=__doc__) |
|
|
|
parser.add_argument('input_paths', metavar='INPUT_PATHS', nargs='+', |
|
help="Directories with the memory footprint data to convert. " |
|
"Each directory must have 'ZEPHYR_VERSION/APPLICATION/FEATURE/BOARD' path structure.") |
|
|
|
parser.add_argument('-p', '--plan', metavar='PLAN_FILE_CSV', required=True, |
|
help="An execution plan (CSV file) with details of what footprint applications " |
|
"and platforms were chosen to generate the input data. " |
|
"It is also applied to filter input directories and check their names.") |
|
|
|
parser.add_argument('-o', '--output-fname', metavar='OUTPUT_FNAME', required=False, |
|
default=RESULT_FILENAME, |
|
help="Destination JSON file name to create at each of INPUT_PATHS. " |
|
"Default: '%(default)s'") |
|
|
|
parser.add_argument('-z', '--zephyr_base', metavar='ZEPHYR_BASE', required=False, |
|
default = os.environ.get('ZEPHYR_BASE'), |
|
help="Zephyr code base path to use instead of the current ZEPHYR_BASE environment variable. " |
|
"The script needs Zephyr repository there to read SHA and commit time of builds. " |
|
"Current default: '%(default)s'") |
|
|
|
parser.add_argument("--test-name", |
|
choices=['application/suite_name', 'suite_name', 'application', 'name.feature'], |
|
default='name.feature', |
|
help="How to compose Twister test instance names using plan.txt columns. " |
|
"Default: '%(default)s'" ) |
|
|
|
parser.add_argument("--no-testsuite-check", |
|
dest='testsuite_check', action="store_false", |
|
help="Don't check for applications' testsuite configs in ZEPHYR_BASE.") |
|
|
|
parser.add_argument('-v', '--verbose', required=False, action='count', default=0, |
|
help="Increase the logging level for each occurrence. Default level: ERROR") |
|
|
|
return parser.parse_args() |
|
|
|
|
|
def read_plan(fname: str) -> list[dict]: |
|
plan = [] |
|
with open(fname) as plan_file: |
|
plan_rows = csv.reader(plan_file) |
|
plan_vals = [ dict(zip(PLAN_HEADERS, row)) for row in plan_rows ] |
|
plan = { f"{p['name']}/{p['feature']}/{p['board']}" : p for p in plan_vals } |
|
return plan |
|
|
|
|
|
def get_id_from_path(plan, in_path, max_levels=HWMv2_LEVELS): |
|
data_id = {} |
|
(in_path, data_id['board']) = os.path.split(in_path) |
|
if not data_id['board']: |
|
# trailing '/' |
|
(in_path, data_id['board']) = os.path.split(in_path) |
|
|
|
for _ in range(max_levels): |
|
(in_path, data_id['feature']) = os.path.split(in_path) |
|
(c_head, data_id['app']) = os.path.split(in_path) |
|
(c_head, data_id['version']) = os.path.split(c_head) |
|
if not all(data_id.values()): |
|
# incorrect plan id |
|
return None |
|
if f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" in plan: |
|
return data_id |
|
else: |
|
# try with HWMv2 board name one more level deep |
|
data_id['board'] = f"{data_id['feature']}/{data_id['board']}" |
|
|
|
# not found |
|
return {} |
|
|
|
|
|
def main(): |
|
errors = 0 |
|
converted = 0 |
|
skipped = 0 |
|
filtered = 0 |
|
|
|
run_date = datetime.now(timezone.utc).isoformat(timespec='seconds') |
|
|
|
init_logs() |
|
|
|
args = parse_args() |
|
|
|
set_verbose(args.verbose) |
|
|
|
if not args.zephyr_base: |
|
logging.error("ZEPHYR_BASE is not defined.") |
|
sys.exit(1) |
|
|
|
zephyr_base = os.path.abspath(args.zephyr_base) |
|
zephyr_base_repo = Repo(zephyr_base) |
|
|
|
logging.info(f"scanning {len(args.input_paths)} directories ...") |
|
|
|
logging.info(f"use plan '{args.plan}'") |
|
plan = read_plan(args.plan) |
|
|
|
test_name_sep = '/' if '/' in args.test_name else '.' |
|
test_name_parts = args.test_name.split(test_name_sep) |
|
|
|
for report_path in args.input_paths: |
|
logging.info(f"convert {report_path}") |
|
# print(p) |
|
p_head = os.path.normcase(report_path) |
|
p_head = os.path.normpath(p_head) |
|
if not os.path.isdir(p_head): |
|
logging.error(f"not a directory '{p_head}'") |
|
errors += 1 |
|
continue |
|
|
|
data_id = get_id_from_path(plan, p_head) |
|
if data_id is None: |
|
logging.warning(f"skipped '{report_path}' - not a correct report directory") |
|
skipped += 1 |
|
continue |
|
elif not data_id: |
|
logging.info(f"filtered '{report_path}' - not in the plan") |
|
filtered += 1 |
|
continue |
|
|
|
r_plan = f"{data_id['app']}/{data_id['feature']}/{data_id['board']}" |
|
|
|
if 'suite_name' in test_name_parts and 'suite_name' not in plan[r_plan]: |
|
logging.info(f"filtered '{report_path}' - no Twister suite name in the plan.") |
|
filtered += 1 |
|
continue |
|
|
|
suite_name = test_name_sep.join([plan[r_plan][n] if n in plan[r_plan] else '' for n in test_name_parts]) |
|
|
|
# Just some sanity checks of the 'application' in the current ZEPHYR_BASE |
|
if args.testsuite_check: |
|
suite_type = plan[r_plan]['application'].split('/') |
|
if len(suite_type) and suite_type[0] in TESTSUITE_FILENAME: |
|
suite_conf_name = TESTSUITE_FILENAME[suite_type[0]] |
|
else: |
|
logging.error(f"unknown app type to get configuration in '{report_path}'") |
|
errors += 1 |
|
continue |
|
|
|
suite_conf_fname = os.path.join(zephyr_base, plan[r_plan]['application'], suite_conf_name) |
|
if not os.path.isfile(suite_conf_fname): |
|
logging.error(f"test configuration not found for '{report_path}' at '{suite_conf_fname}'") |
|
errors += 1 |
|
continue |
|
|
|
|
|
# Check SHA presence in the current ZEPHYR_BASE |
|
sha_match = VERSION_COMMIT_RE.search(data_id['version']) |
|
version_sha = sha_match.group(1) if sha_match else data_id['version'] |
|
try: |
|
git_commit = zephyr_base_repo.commit(version_sha) |
|
except BadName: |
|
logging.error(f"SHA:'{version_sha}' is not found in ZEPHYR_BASE for '{report_path}'") |
|
errors += 1 |
|
continue |
|
|
|
|
|
# Compose twister_footprint.json record - each application (test suite) will have its own |
|
# simplified header with options, SHA, etc. |
|
|
|
res = {} |
|
|
|
res['environment'] = { |
|
'zephyr_version': data_id['version'], |
|
'commit_date': |
|
git_commit.committed_datetime.astimezone(timezone.utc).isoformat(timespec='seconds'), |
|
'run_date': run_date, |
|
'options': { |
|
'testsuite_root': [ plan[r_plan]['application'] ], |
|
'build_only': True, |
|
'create_rom_ram_report': True, |
|
'footprint_report': 'all', |
|
'platform': [ plan[r_plan]['board'] ] |
|
} |
|
} |
|
|
|
test_suite = { |
|
'name': suite_name, |
|
'arch': None, |
|
'platform': plan[r_plan]['board'], |
|
'status': 'passed', |
|
'footprint': {} |
|
} |
|
|
|
for k,v in FOOTPRINT_FILES.items(): |
|
footprint_fname = os.path.join(report_path, v) |
|
try: |
|
with open(footprint_fname, "rt") as footprint_json: |
|
logger.debug(f"reading {footprint_fname}") |
|
test_suite['footprint'][k] = json.load(footprint_json) |
|
except FileNotFoundError: |
|
logger.warning(f"{report_path} missing {v}") |
|
|
|
res['testsuites'] = [test_suite] |
|
|
|
report_fname = os.path.join(report_path, args.output_fname) |
|
with open(report_fname, "wt") as json_file: |
|
logger.debug(f"writing {report_fname}") |
|
json.dump(res, json_file, indent=4, separators=(',',':')) |
|
|
|
converted += 1 |
|
|
|
logging.info(f'found={len(args.input_paths)}, converted={converted}, ' |
|
f'skipped={skipped}, filtered={filtered}, errors={errors}') |
|
sys.exit(errors != 0) |
|
|
|
|
|
if __name__ == '__main__': |
|
main()
|
|
|