You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
456 lines
16 KiB
456 lines
16 KiB
#!/usr/bin/env python3 |
|
# vim: set syntax=python ts=4 : |
|
# |
|
# Copyright (c) 2022 Intel Corporation |
|
# SPDX-License-Identifier: Apache-2.0 |
|
|
|
import logging |
|
import os |
|
import platform |
|
import re |
|
from multiprocessing import Lock, Value |
|
from pathlib import Path |
|
|
|
import scl |
|
import yaml |
|
from natsort import natsorted |
|
from twisterlib.environment import ZEPHYR_BASE |
|
|
|
try: |
|
# Use the C LibYAML parser if available, rather than the Python parser. |
|
# It's much faster. |
|
from yaml import CDumper as Dumper |
|
from yaml import CSafeLoader as SafeLoader |
|
except ImportError: |
|
from yaml import Dumper, SafeLoader |
|
|
|
try: |
|
from tabulate import tabulate |
|
except ImportError: |
|
print("Install tabulate python module with pip to use --device-testing option.") |
|
|
|
logger = logging.getLogger('twister') |
|
logger.setLevel(logging.DEBUG) |
|
|
|
|
|
class DUT: |
|
def __init__(self, |
|
id=None, |
|
serial=None, |
|
serial_baud=None, |
|
platform=None, |
|
product=None, |
|
serial_pty=None, |
|
connected=False, |
|
runner_params=None, |
|
pre_script=None, |
|
post_script=None, |
|
post_flash_script=None, |
|
script_param=None, |
|
runner=None, |
|
flash_timeout=60, |
|
flash_with_test=False, |
|
flash_before=False): |
|
|
|
self.serial = serial |
|
self.baud = serial_baud or 115200 |
|
self.platform = platform |
|
self.serial_pty = serial_pty |
|
self._counter = Value("i", 0) |
|
self._available = Value("i", 1) |
|
self._failures = Value("i", 0) |
|
self.connected = connected |
|
self.pre_script = pre_script |
|
self.id = id |
|
self.product = product |
|
self.runner = runner |
|
self.runner_params = runner_params |
|
self.flash_before = flash_before |
|
self.fixtures = [] |
|
self.post_flash_script = post_flash_script |
|
self.post_script = post_script |
|
self.pre_script = pre_script |
|
self.script_param = script_param |
|
self.probe_id = None |
|
self.notes = None |
|
self.lock = Lock() |
|
self.match = False |
|
self.flash_timeout = flash_timeout |
|
self.flash_with_test = flash_with_test |
|
|
|
@property |
|
def available(self): |
|
with self._available.get_lock(): |
|
return self._available.value |
|
|
|
@available.setter |
|
def available(self, value): |
|
with self._available.get_lock(): |
|
self._available.value = value |
|
|
|
@property |
|
def counter(self): |
|
with self._counter.get_lock(): |
|
return self._counter.value |
|
|
|
@counter.setter |
|
def counter(self, value): |
|
with self._counter.get_lock(): |
|
self._counter.value = value |
|
|
|
def counter_increment(self, value=1): |
|
with self._counter.get_lock(): |
|
self._counter.value += value |
|
|
|
@property |
|
def failures(self): |
|
with self._failures.get_lock(): |
|
return self._failures.value |
|
|
|
@failures.setter |
|
def failures(self, value): |
|
with self._failures.get_lock(): |
|
self._failures.value = value |
|
|
|
def failures_increment(self, value=1): |
|
with self._failures.get_lock(): |
|
self._failures.value += value |
|
|
|
def to_dict(self): |
|
d = {} |
|
exclude = ['_available', '_counter', '_failures', 'match'] |
|
v = vars(self) |
|
for k in v: |
|
if k not in exclude and v[k]: |
|
d[k] = v[k] |
|
return d |
|
|
|
|
|
def __repr__(self): |
|
return f"<{self.platform} ({self.product}) on {self.serial}>" |
|
|
|
class HardwareMap: |
|
schema_path = os.path.join(ZEPHYR_BASE, "scripts", "schemas", "twister", "hwmap-schema.yaml") |
|
|
|
manufacturer = [ |
|
'ARM', |
|
'SEGGER', |
|
'MBED', |
|
'STMicroelectronics', |
|
'Atmel Corp.', |
|
'Texas Instruments', |
|
'Silicon Labs', |
|
'NXP', |
|
'NXP Semiconductors', |
|
'Microchip Technology Inc.', |
|
'FTDI', |
|
'Digilent', |
|
'Microsoft', |
|
'Nuvoton', |
|
'Espressif', |
|
] |
|
|
|
runner_mapping = { |
|
'pyocd': [ |
|
'DAPLink CMSIS-DAP', |
|
'MBED CMSIS-DAP' |
|
], |
|
'jlink': [ |
|
'J-Link', |
|
'J-Link OB' |
|
], |
|
'openocd': [ |
|
'STM32 STLink', '^XDS110.*', 'STLINK-V3' |
|
], |
|
'dediprog': [ |
|
'TTL232R-3V3', |
|
'MCP2200 USB Serial Port Emulator' |
|
] |
|
} |
|
|
|
def __init__(self, env=None): |
|
self.detected = [] |
|
self.duts = [] |
|
self.options = env.options |
|
|
|
def discover(self): |
|
|
|
if self.options.generate_hardware_map: |
|
self.scan(persistent=self.options.persistent_hardware_map) |
|
self.save(self.options.generate_hardware_map) |
|
return 0 |
|
|
|
if not self.options.device_testing and self.options.hardware_map: |
|
self.load(self.options.hardware_map) |
|
logger.info("Available devices:") |
|
self.dump(connected_only=True) |
|
return 0 |
|
|
|
if self.options.device_testing: |
|
if self.options.hardware_map: |
|
self.load(self.options.hardware_map) |
|
if not self.options.platform: |
|
self.options.platform = [] |
|
for d in self.duts: |
|
if d.connected and d.platform != 'unknown': |
|
self.options.platform.append(d.platform) |
|
|
|
elif self.options.device_serial: |
|
self.add_device(self.options.device_serial, |
|
self.options.platform[0], |
|
self.options.pre_script, |
|
False, |
|
baud=self.options.device_serial_baud, |
|
flash_timeout=self.options.device_flash_timeout, |
|
flash_with_test=self.options.device_flash_with_test, |
|
flash_before=self.options.flash_before, |
|
) |
|
|
|
elif self.options.device_serial_pty: |
|
self.add_device(self.options.device_serial_pty, |
|
self.options.platform[0], |
|
self.options.pre_script, |
|
True, |
|
flash_timeout=self.options.device_flash_timeout, |
|
flash_with_test=self.options.device_flash_with_test, |
|
flash_before=False, |
|
) |
|
|
|
# the fixtures given by twister command explicitly should be assigned to each DUT |
|
if self.options.fixture: |
|
for d in self.duts: |
|
d.fixtures.extend(self.options.fixture) |
|
return 1 |
|
|
|
|
|
def summary(self, selected_platforms): |
|
print("\nHardware distribution summary:\n") |
|
table = [] |
|
header = ['Board', 'ID', 'Counter', 'Failures'] |
|
for d in self.duts: |
|
if d.connected and d.platform in selected_platforms: |
|
row = [d.platform, d.id, d.counter, d.failures] |
|
table.append(row) |
|
print(tabulate(table, headers=header, tablefmt="github")) |
|
|
|
|
|
def add_device(self, serial, platform, pre_script, is_pty, baud=None, flash_timeout=60, flash_with_test=False, flash_before=False): |
|
device = DUT(platform=platform, connected=True, pre_script=pre_script, serial_baud=baud, |
|
flash_timeout=flash_timeout, flash_with_test=flash_with_test, flash_before=flash_before |
|
) |
|
if is_pty: |
|
device.serial_pty = serial |
|
else: |
|
device.serial = serial |
|
|
|
self.duts.append(device) |
|
|
|
def load(self, map_file): |
|
hwm_schema = scl.yaml_load(self.schema_path) |
|
duts = scl.yaml_load_verify(map_file, hwm_schema) |
|
for dut in duts: |
|
pre_script = dut.get('pre_script') |
|
script_param = dut.get('script_param') |
|
post_script = dut.get('post_script') |
|
post_flash_script = dut.get('post_flash_script') |
|
flash_timeout = dut.get('flash_timeout') or self.options.device_flash_timeout |
|
flash_with_test = dut.get('flash_with_test') |
|
if flash_with_test is None: |
|
flash_with_test = self.options.device_flash_with_test |
|
serial_pty = dut.get('serial_pty') |
|
flash_before = dut.get('flash_before') |
|
if flash_before is None: |
|
flash_before = self.options.flash_before and (not (flash_with_test or serial_pty)) |
|
platform = dut.get('platform') |
|
if isinstance(platform, str): |
|
platforms = platform.split() |
|
elif isinstance(platform, list): |
|
platforms = platform |
|
else: |
|
raise ValueError(f"Invalid platform value: {platform}") |
|
id = dut.get('id') |
|
runner = dut.get('runner') |
|
runner_params = dut.get('runner_params') |
|
serial = dut.get('serial') |
|
baud = dut.get('baud', None) |
|
product = dut.get('product') |
|
fixtures = dut.get('fixtures', []) |
|
connected = dut.get('connected') and ((serial or serial_pty) is not None) |
|
if not connected: |
|
continue |
|
for plat in platforms: |
|
new_dut = DUT(platform=plat, |
|
product=product, |
|
runner=runner, |
|
runner_params=runner_params, |
|
id=id, |
|
serial_pty=serial_pty, |
|
serial=serial, |
|
serial_baud=baud, |
|
connected=connected, |
|
pre_script=pre_script, |
|
flash_before=flash_before, |
|
post_script=post_script, |
|
post_flash_script=post_flash_script, |
|
script_param=script_param, |
|
flash_timeout=flash_timeout, |
|
flash_with_test=flash_with_test) |
|
new_dut.fixtures = fixtures |
|
new_dut.counter = 0 |
|
self.duts.append(new_dut) |
|
|
|
def scan(self, persistent=False): |
|
from serial.tools import list_ports |
|
|
|
if persistent and platform.system() == 'Linux': |
|
# On Linux, /dev/serial/by-id provides symlinks to |
|
# '/dev/ttyACMx' nodes using names which are unique as |
|
# long as manufacturers fill out USB metadata nicely. |
|
# |
|
# This creates a map from '/dev/ttyACMx' device nodes |
|
# to '/dev/serial/by-id/usb-...' symlinks. The symlinks |
|
# go into the hardware map because they stay the same |
|
# even when the user unplugs / replugs the device. |
|
# |
|
# Some inexpensive USB/serial adapters don't result |
|
# in unique names here, though, so use of this feature |
|
# requires explicitly setting persistent=True. |
|
by_id = Path('/dev/serial/by-id') |
|
def readlink(link): |
|
return str((by_id / link).resolve()) |
|
|
|
if by_id.exists(): |
|
persistent_map = {readlink(link): str(link) |
|
for link in by_id.iterdir()} |
|
else: |
|
persistent_map = {} |
|
else: |
|
persistent_map = {} |
|
|
|
serial_devices = list_ports.comports() |
|
logger.info("Scanning connected hardware...") |
|
for d in serial_devices: |
|
if d.manufacturer and d.manufacturer.casefold() in [m.casefold() for m in self.manufacturer]: |
|
|
|
# TI XDS110 can have multiple serial devices for a single board |
|
# assume endpoint 0 is the serial, skip all others |
|
if d.manufacturer == 'Texas Instruments' and not d.location.endswith('0'): |
|
continue |
|
|
|
if d.product is None: |
|
d.product = 'unknown' |
|
|
|
s_dev = DUT(platform="unknown", |
|
id=d.serial_number, |
|
serial=persistent_map.get(d.device, d.device), |
|
product=d.product, |
|
runner='unknown', |
|
connected=True) |
|
|
|
for runner, _ in self.runner_mapping.items(): |
|
products = self.runner_mapping.get(runner) |
|
if d.product in products: |
|
s_dev.runner = runner |
|
continue |
|
# Try regex matching |
|
for p in products: |
|
if re.match(p, d.product): |
|
s_dev.runner = runner |
|
|
|
s_dev.connected = True |
|
s_dev.lock = None |
|
self.detected.append(s_dev) |
|
else: |
|
logger.warning("Unsupported device (%s): %s" % (d.manufacturer, d)) |
|
|
|
def save(self, hwm_file): |
|
# use existing map |
|
self.detected = natsorted(self.detected, key=lambda x: x.serial or '') |
|
if os.path.exists(hwm_file): |
|
with open(hwm_file, 'r') as yaml_file: |
|
hwm = yaml.load(yaml_file, Loader=SafeLoader) |
|
if hwm: |
|
hwm.sort(key=lambda x: x.get('id', '')) |
|
|
|
# disconnect everything |
|
for h in hwm: |
|
h['connected'] = False |
|
h['serial'] = None |
|
|
|
for _detected in self.detected: |
|
for h in hwm: |
|
if all([ |
|
_detected.id == h['id'], |
|
_detected.product == h['product'], |
|
_detected.match is False, |
|
h['connected'] is False |
|
]): |
|
h['connected'] = True |
|
h['serial'] = _detected.serial |
|
_detected.match = True |
|
break |
|
|
|
new_duts = list(filter(lambda d: not d.match, self.detected)) |
|
new = [] |
|
for d in new_duts: |
|
new.append(d.to_dict()) |
|
|
|
if hwm: |
|
hwm = hwm + new |
|
else: |
|
hwm = new |
|
|
|
with open(hwm_file, 'w') as yaml_file: |
|
yaml.dump(hwm, yaml_file, Dumper=Dumper, default_flow_style=False) |
|
|
|
self.load(hwm_file) |
|
logger.info("Registered devices:") |
|
self.dump() |
|
|
|
else: |
|
# create new file |
|
dl = [] |
|
for _connected in self.detected: |
|
platform = _connected.platform |
|
id = _connected.id |
|
runner = _connected.runner |
|
serial = _connected.serial |
|
product = _connected.product |
|
d = { |
|
'platform': platform, |
|
'id': id, |
|
'runner': runner, |
|
'serial': serial, |
|
'product': product, |
|
'connected': _connected.connected |
|
} |
|
dl.append(d) |
|
with open(hwm_file, 'w') as yaml_file: |
|
yaml.dump(dl, yaml_file, Dumper=Dumper, default_flow_style=False) |
|
logger.info("Detected devices:") |
|
self.dump(detected=True) |
|
|
|
def dump(self, filtered=None, header=None, connected_only=False, detected=False): |
|
if filtered is None: |
|
filtered = [] |
|
if header is None: |
|
header = [] |
|
print("") |
|
table = [] |
|
if detected: |
|
to_show = self.detected |
|
else: |
|
to_show = self.duts |
|
|
|
if not header: |
|
header = ["Platform", "ID", "Serial device"] |
|
for p in to_show: |
|
platform = p.platform |
|
connected = p.connected |
|
if filtered and platform not in filtered: |
|
continue |
|
|
|
if not connected_only or connected: |
|
table.append([platform, p.id, p.serial]) |
|
|
|
print(tabulate(table, headers=header, tablefmt="github"))
|
|
|