You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
53 lines
1.6 KiB
53 lines
1.6 KiB
#!/usr/bin/env python3 |
|
# Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com> |
|
# SPDX-License-Identifier: Apache-2.0 |
|
import argparse |
|
import json |
|
from time import sleep |
|
|
|
import serial |
|
|
|
j = """ |
|
[ |
|
{"name":"version","on_changed": false, "read_only": true, "message_size": 4}, |
|
{"name":"sensor_data","on_changed": true, "read_only": false, "message_size": 4}, |
|
{"name":"start_measurement","on_changed": false, "read_only": false, "message_size": 1} |
|
]""" |
|
|
|
channels = json.loads(j) |
|
parser = argparse.ArgumentParser(description='Read zbus events via serial.', allow_abbrev=False) |
|
parser.add_argument("port", type=str, help='The tty or COM port to be used') |
|
|
|
args = parser.parse_args() |
|
|
|
|
|
def fetch_sentence(ser): |
|
channel_id = int.from_bytes(ser.read(), "little") |
|
channel_name = channels[channel_id]['name'] |
|
msg_size = channels[channel_id]['message_size'] |
|
msg = ser.read(msg_size) |
|
ser.read(1) # skip '*' |
|
return (channel_name, msg_size, msg) |
|
|
|
|
|
def pub_start_measurement(ser, action: bool): |
|
print( |
|
f"Proxy PUB [{channels[2]['name']}] -> start measurement") |
|
ser.write(b'$') |
|
ser.write(b'\x02') # idx |
|
ser.write(b'\x01') |
|
ser.write(b'*') |
|
ser.flush() |
|
|
|
|
|
ser = serial.Serial(args.port) |
|
pub_start_measurement(ser, True) |
|
while True: |
|
d = ser.read() |
|
if d == b'$': |
|
channel_name, msg_size, msg = fetch_sentence(ser) |
|
if channel_name == "sensor_data": |
|
print( |
|
f"Proxy NOTIFY: [{channel_name}] -> sensor value {int.from_bytes(msg, 'little')}") |
|
sleep(1) |
|
pub_start_measurement(ser, True)
|
|
|