|
|
@ -65,13 +65,28 @@ class TLSCredShellInterface: |
|
|
|
for c in range(chunks): |
|
|
|
for c in range(chunks): |
|
|
|
chunk = encoded[c * TLS_CRED_CHUNK_SIZE : (c + 1) * TLS_CRED_CHUNK_SIZE] |
|
|
|
chunk = encoded[c * TLS_CRED_CHUNK_SIZE : (c + 1) * TLS_CRED_CHUNK_SIZE] |
|
|
|
self.write_raw(f"cred buf {chunk}") |
|
|
|
self.write_raw(f"cred buf {chunk}") |
|
|
|
self.serial_wait_for_response("Stored") |
|
|
|
result, output = self.serial_wait_for_response("Stored", "RX ring buffer full") |
|
|
|
|
|
|
|
if not result: |
|
|
|
|
|
|
|
logging.error("Failed to store chunk in the device: unknown error") |
|
|
|
|
|
|
|
if output and b"RX ring buffer full" in output: |
|
|
|
|
|
|
|
logging.error(f"Failed to store chunk in the device: {output}") |
|
|
|
|
|
|
|
return False |
|
|
|
|
|
|
|
if not 0 <= cred_type < len(TLS_CRED_TYPES): |
|
|
|
|
|
|
|
logger.error( |
|
|
|
|
|
|
|
f"Invalid credential type: {cred_type}. Range [0, {len(TLS_CRED_TYPES) - 1}]." |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
return False |
|
|
|
self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint") |
|
|
|
self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint") |
|
|
|
result, _ = self.serial_wait_for_response("Added TLS credential") |
|
|
|
result, _ = self.serial_wait_for_response("Added TLS credential", "already exists") |
|
|
|
time.sleep(1) |
|
|
|
time.sleep(1) |
|
|
|
return result |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
def delete_credential(self, sectag, cred_type): |
|
|
|
def delete_credential(self, sectag, cred_type): |
|
|
|
|
|
|
|
if not 0 <= cred_type < len(TLS_CRED_TYPES): |
|
|
|
|
|
|
|
logger.error( |
|
|
|
|
|
|
|
f"Invalid credential type: {cred_type}. Range [0, {len(TLS_CRED_TYPES) - 1}]." |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
return False |
|
|
|
self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}') |
|
|
|
self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}') |
|
|
|
result, _ = self.serial_wait_for_response( |
|
|
|
result, _ = self.serial_wait_for_response( |
|
|
|
"Deleted TLS credential", "There is no TLS credential" |
|
|
|
"Deleted TLS credential", "There is no TLS credential" |
|
|
@ -94,7 +109,11 @@ class TLSCredShellInterface: |
|
|
|
return True, None |
|
|
|
return True, None |
|
|
|
|
|
|
|
|
|
|
|
data = output.decode().split(",") |
|
|
|
data = output.decode().split(",") |
|
|
|
hash = data[2].strip() |
|
|
|
logger.debug(f"Cred list output: {data}") |
|
|
|
|
|
|
|
if len(data) < 4: |
|
|
|
|
|
|
|
logger.error("Invalid output format from device, skipping hash check.") |
|
|
|
|
|
|
|
return False, None |
|
|
|
|
|
|
|
cred_hash = data[2].strip() |
|
|
|
status_code = data[3].strip() |
|
|
|
status_code = data[3].strip() |
|
|
|
|
|
|
|
|
|
|
|
if status_code != "0": |
|
|
|
if status_code != "0": |
|
|
@ -102,17 +121,23 @@ class TLSCredShellInterface: |
|
|
|
logger.warning("Device might not support credential digests.") |
|
|
|
logger.warning("Device might not support credential digests.") |
|
|
|
return True, None |
|
|
|
return True, None |
|
|
|
|
|
|
|
|
|
|
|
return True, hash |
|
|
|
return True, cred_hash |
|
|
|
|
|
|
|
|
|
|
|
def calculate_expected_hash(self, cred_text): |
|
|
|
def calculate_expected_hash(self, cred_text): |
|
|
|
hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00') |
|
|
|
cred_hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00') |
|
|
|
return base64.b64encode(hash.digest()).decode() |
|
|
|
return base64.b64encode(cred_hash.digest()).decode() |
|
|
|
|
|
|
|
|
|
|
|
def check_cred_command(self): |
|
|
|
def check_cred_command(self): |
|
|
|
logger.info("Checking for 'cred' command existence...") |
|
|
|
logger.info("Checking for 'cred' command existence...") |
|
|
|
self.serial_write_line("cred") |
|
|
|
self.serial_write_line("cred") |
|
|
|
result, output = self.serial_wait_for_response(timeout=5) |
|
|
|
result, output = self.serial_wait_for_response( |
|
|
|
if not result or (output and b"command not found" in output): |
|
|
|
"TLS Credentials Commands", "command not found", store="cred" |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
logger.debug(f"Result: {result}, Output: {output}") |
|
|
|
|
|
|
|
if not result: |
|
|
|
|
|
|
|
logger.error("Device did not respond to 'cred' command.") |
|
|
|
|
|
|
|
return False |
|
|
|
|
|
|
|
if output and b"command not found" in output: |
|
|
|
logger.error("Device does not support 'cred' command.") |
|
|
|
logger.error("Device does not support 'cred' command.") |
|
|
|
logger.error("Hint: Add 'CONFIG_TLS_CREDENTIALS_SHELL=y' to your prj.conf file.") |
|
|
|
logger.error("Hint: Add 'CONFIG_TLS_CREDENTIALS_SHELL=y' to your prj.conf file.") |
|
|
|
return False |
|
|
|
return False |
|
|
@ -294,20 +319,25 @@ def main(in_args): |
|
|
|
logger.info(f'Deleting sectag {args.sectag}...') |
|
|
|
logger.info(f'Deleting sectag {args.sectag}...') |
|
|
|
cred_if.delete_credential(args.sectag, args.cert_type) |
|
|
|
cred_if.delete_credential(args.sectag, args.cert_type) |
|
|
|
|
|
|
|
|
|
|
|
cred_if.write_credential(args.sectag, args.cert_type, dev_bytes) |
|
|
|
result = cred_if.write_credential(args.sectag, args.cert_type, dev_bytes) |
|
|
|
|
|
|
|
if not result: |
|
|
|
|
|
|
|
logger.error(f'Failed to write credential for sectag {args.sectag}, it may already exist') |
|
|
|
|
|
|
|
sys.exit(5) |
|
|
|
logger.info(f'Writing sectag {args.sectag}...') |
|
|
|
logger.info(f'Writing sectag {args.sectag}...') |
|
|
|
result, hash = cred_if.check_credential_exists(args.sectag, args.cert_type, args.check_hash) |
|
|
|
result, cred_hash = cred_if.check_credential_exists( |
|
|
|
|
|
|
|
args.sectag, args.cert_type, args.check_hash |
|
|
|
|
|
|
|
) |
|
|
|
if args.check_hash: |
|
|
|
if args.check_hash: |
|
|
|
logger.debug(f'Checking hash for sectag {args.sectag}...') |
|
|
|
logger.debug(f'Checking hash for sectag {args.sectag}...') |
|
|
|
if not result: |
|
|
|
if not result: |
|
|
|
logger.error(f'Failed to check credential existence for sectag {args.sectag}') |
|
|
|
logger.error(f'Failed to check credential existence for sectag {args.sectag}') |
|
|
|
sys.exit(4) |
|
|
|
sys.exit(4) |
|
|
|
if hash: |
|
|
|
if cred_hash: |
|
|
|
logger.debug(f'Credential hash: {hash}') |
|
|
|
logger.debug(f'Credential hash: {cred_hash}') |
|
|
|
expected_hash = cred_if.calculate_expected_hash(dev_bytes) |
|
|
|
expected_hash = cred_if.calculate_expected_hash(dev_bytes) |
|
|
|
if hash != expected_hash: |
|
|
|
if cred_hash != expected_hash: |
|
|
|
logger.error( |
|
|
|
logger.error( |
|
|
|
f'Hash mismatch for sectag {args.sectag}. Expected: {expected_hash}, got: {hash}' |
|
|
|
f'Hash mismatch for sectag {args.sectag}. Exp: {expected_hash}, got: {cred_hash}' |
|
|
|
) |
|
|
|
) |
|
|
|
sys.exit(6) |
|
|
|
sys.exit(6) |
|
|
|
logger.info(f'Credential for sectag {args.sectag} written successfully') |
|
|
|
logger.info(f'Credential for sectag {args.sectag} written successfully') |
|
|
|