diff --git a/scripts/utils/tls_creds_installer.py b/scripts/utils/tls_creds_installer.py index 245520ceac4..82788523031 100755 --- a/scripts/utils/tls_creds_installer.py +++ b/scripts/utils/tls_creds_installer.py @@ -65,13 +65,28 @@ class TLSCredShellInterface: for c in range(chunks): chunk = encoded[c * TLS_CRED_CHUNK_SIZE : (c + 1) * TLS_CRED_CHUNK_SIZE] self.write_raw(f"cred buf {chunk}") - self.serial_wait_for_response("Stored") + result, output = self.serial_wait_for_response("Stored", "RX ring buffer full") + if not result: + logging.error("Failed to store chunk in the device: unknown error") + if output and b"RX ring buffer full" in output: + logging.error(f"Failed to store chunk in the device: {output}") + return False + if not 0 <= cred_type < len(TLS_CRED_TYPES): + logger.error( + f"Invalid credential type: {cred_type}. Range [0, {len(TLS_CRED_TYPES) - 1}]." + ) + return False self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint") - result, _ = self.serial_wait_for_response("Added TLS credential") + result, _ = self.serial_wait_for_response("Added TLS credential", "already exists") time.sleep(1) return result def delete_credential(self, sectag, cred_type): + if not 0 <= cred_type < len(TLS_CRED_TYPES): + logger.error( + f"Invalid credential type: {cred_type}. Range [0, {len(TLS_CRED_TYPES) - 1}]." + ) + return False self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}') result, _ = self.serial_wait_for_response( "Deleted TLS credential", "There is no TLS credential" @@ -94,7 +109,11 @@ class TLSCredShellInterface: return True, None data = output.decode().split(",") - hash = data[2].strip() + logger.debug(f"Cred list output: {data}") + if len(data) < 4: + logger.error("Invalid output format from device, skipping hash check.") + return False, None + cred_hash = data[2].strip() status_code = data[3].strip() if status_code != "0": @@ -102,17 +121,23 @@ class TLSCredShellInterface: logger.warning("Device might not support credential digests.") return True, None - return True, hash + return True, cred_hash def calculate_expected_hash(self, cred_text): - hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00') - return base64.b64encode(hash.digest()).decode() + cred_hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00') + return base64.b64encode(cred_hash.digest()).decode() def check_cred_command(self): logger.info("Checking for 'cred' command existence...") self.serial_write_line("cred") - result, output = self.serial_wait_for_response(timeout=5) - if not result or (output and b"command not found" in output): + result, output = self.serial_wait_for_response( + "TLS Credentials Commands", "command not found", store="cred" + ) + logger.debug(f"Result: {result}, Output: {output}") + if not result: + logger.error("Device did not respond to 'cred' command.") + return False + if output and b"command not found" in output: logger.error("Device does not support 'cred' command.") logger.error("Hint: Add 'CONFIG_TLS_CREDENTIALS_SHELL=y' to your prj.conf file.") return False @@ -294,20 +319,25 @@ def main(in_args): logger.info(f'Deleting sectag {args.sectag}...') cred_if.delete_credential(args.sectag, args.cert_type) - cred_if.write_credential(args.sectag, args.cert_type, dev_bytes) + result = cred_if.write_credential(args.sectag, args.cert_type, dev_bytes) + if not result: + logger.error(f'Failed to write credential for sectag {args.sectag}, it may already exist') + sys.exit(5) logger.info(f'Writing sectag {args.sectag}...') - result, hash = cred_if.check_credential_exists(args.sectag, args.cert_type, args.check_hash) + result, cred_hash = cred_if.check_credential_exists( + args.sectag, args.cert_type, args.check_hash + ) if args.check_hash: logger.debug(f'Checking hash for sectag {args.sectag}...') if not result: logger.error(f'Failed to check credential existence for sectag {args.sectag}') sys.exit(4) - if hash: - logger.debug(f'Credential hash: {hash}') + if cred_hash: + logger.debug(f'Credential hash: {cred_hash}') expected_hash = cred_if.calculate_expected_hash(dev_bytes) - if hash != expected_hash: + if cred_hash != expected_hash: logger.error( - f'Hash mismatch for sectag {args.sectag}. Expected: {expected_hash}, got: {hash}' + f'Hash mismatch for sectag {args.sectag}. Exp: {expected_hash}, got: {cred_hash}' ) sys.exit(6) logger.info(f'Credential for sectag {args.sectag} written successfully')