hil2.commands

  1from typing import Optional
  2
  3import time
  4import logging
  5
  6import cantools.database.can.database as cantools_db
  7import serial
  8
  9from . import can_helper
 10from . import hil_errors
 11from . import serial_helper
 12
 13
 14# Command constants -------------------------------------------------------------------#
 15# fmt: off
 16READ_ID    = 0  # command                    -> SYNC_BYTES (4), READ_ID, id
 17WRITE_GPIO = 1  # command, pin, value        -> []
 18HIZ_GPIO   = 2  # command, pin               -> []
 19READ_GPIO  = 3  # command, pin               -> READ_GPIO, value
 20WRITE_DAC  = 4  # command, pin/offset, value -> []
 21HIZ_DAC    = 5  # command, pin/offset        -> []
 22READ_ADC   = 6  # command, pin               -> READ_ADC, value high, value low
 23WRITE_POT  = 7  # command, pin/offset, value -> []
 24SEND_CAN   = 8  # command, bus, signal bytes: 3-0, length, data (8 bytes) -> []
 25RECV_CAN   = 9  # <async>                    -> RECV_CAN, bus, signal bytes: 3-0,
 26                #                               length, data (length bytes)
 27ERROR      = 10 # <async/any>                -> ERROR, command
 28# fmt: on
 29
 30READ_ID_RESPONSE_LENGTH = 6  # SYNC_BYTES (4) + READ_ID (1) + ID (1)
 31SYNC_BYTES = [0xDE, 0xAD, 0xBE, 0xEF]
 32
 33SERIAL_RESPONSES = [READ_ID, READ_GPIO, READ_ADC, RECV_CAN, ERROR]
 34
 35
 36# Simple commands ---------------------------------------------------------------------#
 37def read_id(
 38    ser_raw: serial.Serial, response_wait: float
 39) -> Optional[tuple[int, list[int]]]:
 40    """
 41    Attempts to connect and read the HIL ID from a device.
 42    Sends a READ_ID command and waits for a response.
 43
 44    :param ser_raw: The raw serial connection to use (raw Serial object).
 45    :return: The HIL ID and any bytes received after the ID, or None if not a HIL device.
 46    """
 47    command = [READ_ID]
 48    logging.debug(f"Sending - READ_ID: {command}")
 49    ser_raw.reset_input_buffer()
 50    ser_raw.write(bytearray(command))
 51    read_buffer = []
 52
 53    try:
 54        start_time = time.time()
 55
 56        while time.time() - start_time < response_wait:
 57            to_read = max(ser_raw.in_waiting, 1)
 58            chunk = ser_raw.read(to_read)
 59            if not chunk:
 60                continue
 61            read_buffer.extend([int(b) for b in chunk])
 62            logging.debug(f"Read buffer: {read_buffer}")
 63
 64            if len(read_buffer) >= READ_ID_RESPONSE_LENGTH:
 65                # Look for SYNC_BYTES in the buffer
 66                for i in range(len(read_buffer) - 5):
 67                    if read_buffer[i : i + 4] == SYNC_BYTES:
 68                        logging.debug(f"SYNC_BYTES found at position {i}")
 69                        # Check if the next byte is READ_ID
 70                        if read_buffer[i + 4] == READ_ID:
 71                            read_hil_id = read_buffer[i + 5]
 72                            remaining_bytes = read_buffer[i + 6 :]
 73                            logging.debug(
 74                                f"Received - READ_ID: {read_hil_id},"
 75                                f"remaining bytes: {len(remaining_bytes)}"
 76                            )
 77                            return read_hil_id, remaining_bytes
 78                # If SYNC_BYTES not found, remove bytes before the last 3 bytes
 79                read_buffer = read_buffer[-3:]
 80    except serial.SerialException as e:
 81        logging.error(f"Serial exception occurred: {e}")
 82        return None
 83    
 84    return None
 85
 86
 87def write_gpio(ser: serial_helper.ThreadedSerial, pin: int, value: bool) -> None:
 88    """
 89    Writes a GPIO value to a device.
 90    Sends a WRITE_GPIO command with the specified pin and value.
 91
 92    :param ser: The serial connection to use.
 93    :param pin: The GPIO pin number.
 94    :param value: The value to write (low = false, high = true).
 95    """
 96    command = [WRITE_GPIO, pin, int(value)]
 97    logging.debug(f"Sending - WRITE_GPIO: {command}")
 98    ser.write(bytearray(command))
 99
100
101def hiZ_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> None:
102    """
103    Set a GPIO pin to high impedance (HiZ).
104    (This is equivalent to setting the pin as an input.)
105    Sends a HIZ_GPIO command with the specified pin.
106
107    :param ser: The serial connection to use.
108    :param pin: The GPIO pin number.
109    """
110    command = [HIZ_GPIO, pin]
111    logging.debug(f"Sending - HIZ_GPIO: {command}")
112    ser.write(bytearray(command))
113
114
115def read_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> bool:
116    """
117    Reads a GPIO value from a device.
118    Sends a READ_GPIO command with the specified pin and waits for a response.
119
120    :param ser: The serial connection to use.
121    :param pin: The GPIO pin number.
122    :return: The value read from the GPIO pin (low = false, high = true).
123    """
124
125    command = [READ_GPIO, pin]
126    logging.debug(f"Sending - READ_GPIO: {command}")
127    ser.write(bytearray(command))
128    match ser.get_readings_with_timeout(READ_GPIO):
129        case None:
130            raise hil_errors.SerialError("Failed to read GPIO value, no response")
131        case [read_value]:
132            logging.debug(f"Received - READ_GPIO: {read_value}")
133            return bool(read_value)
134        case x:
135            error_msg = f"Failed to read GPIO value, expected 1 byte: {x}"
136            raise hil_errors.EngineError(error_msg)
137
138
139def write_dac(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None:
140    """
141    Writes a DAC value to a device.
142    Sends a WRITE_DAC command with the specified pin and raw value.
143
144    :param ser: The serial connection to use.
145    :param pin: The DAC pin number.
146    :param raw_value: The raw value to write (0-255).
147    """
148    command = [WRITE_DAC, pin, raw_value]
149    logging.debug(f"Sending - WRITE_DAC: {command}")
150    ser.write(bytearray(command))
151
152
153def hiZ_dac(ser: serial_helper.ThreadedSerial, pin: int) -> None:
154    """
155    Sets a DAC pin to high impedance mode.
156    Sends a HIZ_DAC command with the specified pin.
157
158    :param ser: The serial connection to use.
159    :param pin: The DAC pin number.
160    """
161    command = [HIZ_DAC, pin]
162    logging.debug(f"Sending - HIZ_DAC: {command}")
163    ser.write(bytearray(command))
164
165
166def read_adc(ser: serial_helper.ThreadedSerial, pin: int) -> int:
167    """
168    Reads an ADC value from a device.
169    Sends a READ_ADC command with the specified pin and waits for a response.
170
171    :param ser: The serial connection to use.
172    :param pin: The ADC pin number.
173    :return: The raw ADC value read from the specified pin.
174    """
175
176    command = [READ_ADC, pin]
177    logging.debug(f"Sending - READ_ADC: {command}")
178    ser.write(bytearray(command))
179    match ser.get_readings_with_timeout(READ_ADC):
180        case None:
181            raise hil_errors.SerialError("Failed to read ADC value, no response")
182        case [read_value_high, read_value_low]:
183            logging.debug(f"Received - READ_ADC: {read_value_high}, {read_value_low}")
184            return (read_value_high << 8) | read_value_low
185        case x:
186            error_msg = f"Failed to read ADC value, expected 2 bytes: {x}"
187            raise hil_errors.EngineError(error_msg)
188
189
190def write_pot(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None:
191    """
192    Writes a potentiometer value to a device.
193    Sends a WRITE_POT command with the specified pin and raw value.
194
195    :param ser: The serial connection to use.
196    :param pin: The potentiometer pin number.
197    :param raw_value: The raw value to write (0-255).
198    """
199    command = [WRITE_POT, pin, raw_value]
200    logging.debug(f"Sending - WRITE_POT: {command}")
201    ser.write(bytearray(command))
202
203
204# CAN commands/parsing ----------------------------------------------------------------#
205def send_can(
206    ser: serial_helper.ThreadedSerial,
207    bus: int,
208    signal: int,
209    data: list[int],
210) -> None:
211    """
212    Sends a CAN message over the specified bus.
213
214    :param ser: The serial connection to use.
215    :param bus: The CAN bus number.
216    :param signal: The CAN signal ID.
217    :param data: The data to send (up to 8 bytes). When sent, will be padded with zeros.
218    """
219    signal_3 = (signal >> 24) & 0xFF
220    signal_2 = (signal >> 16) & 0xFF
221    signal_1 = (signal >> 8) & 0xFF
222    signal_0 = signal & 0xFF
223    length = len(data)
224    padding = [0] * (8 - len(data))
225    command = [
226        SEND_CAN,
227        bus,
228        signal_3,
229        signal_2,
230        signal_1,
231        signal_0,
232        length,
233        *data,
234        *padding,
235    ]
236    logging.debug(f"Sending - SEND_CAN: {command}")
237    print(f"Sending - SEND_CAN: {command}")
238    ser.write(bytearray(command))
239
240
241def parse_can_messages(
242    ser: serial_helper.ThreadedSerial, bus: int, can_dbc: cantools_db.Database
243) -> list[can_helper.CanMessage]:
244    """
245    Parses received CAN messages from the serial connection for the specified bus.
246
247    :param ser: The serial connection to use.
248    :param bus: The CAN bus number.
249    :param can_dbc: The DBC database to use for decoding messages.
250    :return: A list of parsed CAN messages.
251    """
252
253    def decode(values):
254        signal = (
255            (values[1] << 24) | (values[2] << 16) | (values[3] << 8) | values[4]
256        ) & 0x1FFFFFFF
257        data = bytes(values[6 : 6 + values[5]])
258        try:
259            decoded = can_dbc.decode_message(signal, data)
260            return can_helper.CanMessage(signal, decoded)
261        except Exception as e:
262            logging.error(f"Failed to decode CAN message with ID {signal} ({e})")
263            return None
264
265    return [
266        msg
267        for values in ser.get_parsed_can_messages(bus)
268        if (msg := decode(values)) is not None
269    ]
270
271
272# Serial parsing/spliting -------------------------------------------------------------#
273def parse_readings(
274    readings: list[int],
275    parsed_readings: dict[int, list[int]],
276    parsed_can_messages: dict[int, list[list[int]]],
277) -> tuple[bool, list[int]]:
278    """
279    Parse the first serial reading if possible.
280    Does not do conversion, just separates and saves the raw bytes for the corresponding
281    reading.
282
283    :param readings: The entire list of readings (bytes received from serial) to parse
284    :param parsed_readings: The dictionary to store parsed readings.
285    :param parsed_can_messages: The dictionary to store parsed CAN messages.
286    :return: A tuple:
287             - A boolean indicating if anything was parsed (and maybe this function
288               should be called again)
289             - The remaining unparsed readings.
290    """
291
292    logging.debug(f"Current readings to parse: {readings}")
293    match readings:
294        case []:
295            return False, []
296        case [cmd, value, *rest] if cmd == READ_ID:
297            logging.debug(f"Parsed - READ_ID: {value}")
298            parsed_readings[READ_ID] = [value]
299            return True, rest
300        case [cmd, value, *rest] if cmd == READ_GPIO:
301            logging.debug(f"Parsed - READ_GPIO: {value}")
302            parsed_readings[READ_GPIO] = [value]
303            return True, rest
304        case [cmd, value_high, value_low, *rest] if cmd == READ_ADC:
305            logging.debug(f"Parsed - READ_ADC: {value_high}, {value_low}")
306            parsed_readings[READ_ADC] = [value_high, value_low]
307            return True, rest
308        case [cmd, bus, signal_3, signal_2, signal_1, signal_0, length, *rest] if (
309            cmd == RECV_CAN and len(rest) >= length
310        ):
311            logging.debug(
312                f"Parsed - RECV_CAN: {bus}, {signal_3}, {signal_2}, {signal_1}, "
313                + f"{signal_0}, {length}, {rest[:length]}"
314            )
315            data, remaining = rest[:length], rest[length:]
316            if bus not in parsed_can_messages:
317                parsed_can_messages[bus] = []
318            parsed_can_messages[bus].append(
319                [bus, signal_3, signal_2, signal_1, signal_0, length, *data]
320            )
321            return True, remaining
322        case [cmd, command, *rest] if cmd == ERROR:
323            logging.critical(f"Parsed - ERROR for command: {command}. Rest={rest}")
324            raise hil_errors.SerialError(f"HIL reported error for command {command}")
325        case [first, *rest] if first not in SERIAL_RESPONSES:
326            logging.critical(f"Unexpected response: {first}. Rest={rest}")
327            raise hil_errors.SerialError(f"Unexpected response. Command error: {first}")
328        case _:
329            return False, readings
READ_ID = 0
WRITE_GPIO = 1
HIZ_GPIO = 2
READ_GPIO = 3
WRITE_DAC = 4
HIZ_DAC = 5
READ_ADC = 6
WRITE_POT = 7
SEND_CAN = 8
RECV_CAN = 9
ERROR = 10
READ_ID_RESPONSE_LENGTH = 6
SYNC_BYTES = [222, 173, 190, 239]
SERIAL_RESPONSES = [0, 3, 6, 9, 10]
def read_id( ser_raw: serial.serialposix.Serial, response_wait: float) -> Optional[tuple[int, list[int]]]:
38def read_id(
39    ser_raw: serial.Serial, response_wait: float
40) -> Optional[tuple[int, list[int]]]:
41    """
42    Attempts to connect and read the HIL ID from a device.
43    Sends a READ_ID command and waits for a response.
44
45    :param ser_raw: The raw serial connection to use (raw Serial object).
46    :return: The HIL ID and any bytes received after the ID, or None if not a HIL device.
47    """
48    command = [READ_ID]
49    logging.debug(f"Sending - READ_ID: {command}")
50    ser_raw.reset_input_buffer()
51    ser_raw.write(bytearray(command))
52    read_buffer = []
53
54    try:
55        start_time = time.time()
56
57        while time.time() - start_time < response_wait:
58            to_read = max(ser_raw.in_waiting, 1)
59            chunk = ser_raw.read(to_read)
60            if not chunk:
61                continue
62            read_buffer.extend([int(b) for b in chunk])
63            logging.debug(f"Read buffer: {read_buffer}")
64
65            if len(read_buffer) >= READ_ID_RESPONSE_LENGTH:
66                # Look for SYNC_BYTES in the buffer
67                for i in range(len(read_buffer) - 5):
68                    if read_buffer[i : i + 4] == SYNC_BYTES:
69                        logging.debug(f"SYNC_BYTES found at position {i}")
70                        # Check if the next byte is READ_ID
71                        if read_buffer[i + 4] == READ_ID:
72                            read_hil_id = read_buffer[i + 5]
73                            remaining_bytes = read_buffer[i + 6 :]
74                            logging.debug(
75                                f"Received - READ_ID: {read_hil_id},"
76                                f"remaining bytes: {len(remaining_bytes)}"
77                            )
78                            return read_hil_id, remaining_bytes
79                # If SYNC_BYTES not found, remove bytes before the last 3 bytes
80                read_buffer = read_buffer[-3:]
81    except serial.SerialException as e:
82        logging.error(f"Serial exception occurred: {e}")
83        return None
84    
85    return None

Attempts to connect and read the HIL ID from a device. Sends a READ_ID command and waits for a response.

Parameters
  • ser_raw: The raw serial connection to use (raw Serial object).
Returns

The HIL ID and any bytes received after the ID, or None if not a HIL device.

def write_gpio(ser: hil2.serial_helper.ThreadedSerial, pin: int, value: bool) -> None:
88def write_gpio(ser: serial_helper.ThreadedSerial, pin: int, value: bool) -> None:
89    """
90    Writes a GPIO value to a device.
91    Sends a WRITE_GPIO command with the specified pin and value.
92
93    :param ser: The serial connection to use.
94    :param pin: The GPIO pin number.
95    :param value: The value to write (low = false, high = true).
96    """
97    command = [WRITE_GPIO, pin, int(value)]
98    logging.debug(f"Sending - WRITE_GPIO: {command}")
99    ser.write(bytearray(command))

Writes a GPIO value to a device. Sends a WRITE_GPIO command with the specified pin and value.

Parameters
  • ser: The serial connection to use.
  • pin: The GPIO pin number.
  • value: The value to write (low = false, high = true).
def hiZ_gpio(ser: hil2.serial_helper.ThreadedSerial, pin: int) -> None:
102def hiZ_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> None:
103    """
104    Set a GPIO pin to high impedance (HiZ).
105    (This is equivalent to setting the pin as an input.)
106    Sends a HIZ_GPIO command with the specified pin.
107
108    :param ser: The serial connection to use.
109    :param pin: The GPIO pin number.
110    """
111    command = [HIZ_GPIO, pin]
112    logging.debug(f"Sending - HIZ_GPIO: {command}")
113    ser.write(bytearray(command))

Set a GPIO pin to high impedance (HiZ). (This is equivalent to setting the pin as an input.) Sends a HIZ_GPIO command with the specified pin.

Parameters
  • ser: The serial connection to use.
  • pin: The GPIO pin number.
def read_gpio(ser: hil2.serial_helper.ThreadedSerial, pin: int) -> bool:
116def read_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> bool:
117    """
118    Reads a GPIO value from a device.
119    Sends a READ_GPIO command with the specified pin and waits for a response.
120
121    :param ser: The serial connection to use.
122    :param pin: The GPIO pin number.
123    :return: The value read from the GPIO pin (low = false, high = true).
124    """
125
126    command = [READ_GPIO, pin]
127    logging.debug(f"Sending - READ_GPIO: {command}")
128    ser.write(bytearray(command))
129    match ser.get_readings_with_timeout(READ_GPIO):
130        case None:
131            raise hil_errors.SerialError("Failed to read GPIO value, no response")
132        case [read_value]:
133            logging.debug(f"Received - READ_GPIO: {read_value}")
134            return bool(read_value)
135        case x:
136            error_msg = f"Failed to read GPIO value, expected 1 byte: {x}"
137            raise hil_errors.EngineError(error_msg)

Reads a GPIO value from a device. Sends a READ_GPIO command with the specified pin and waits for a response.

Parameters
  • ser: The serial connection to use.
  • pin: The GPIO pin number.
Returns

The value read from the GPIO pin (low = false, high = true).

def write_dac(ser: hil2.serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None:
140def write_dac(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None:
141    """
142    Writes a DAC value to a device.
143    Sends a WRITE_DAC command with the specified pin and raw value.
144
145    :param ser: The serial connection to use.
146    :param pin: The DAC pin number.
147    :param raw_value: The raw value to write (0-255).
148    """
149    command = [WRITE_DAC, pin, raw_value]
150    logging.debug(f"Sending - WRITE_DAC: {command}")
151    ser.write(bytearray(command))

Writes a DAC value to a device. Sends a WRITE_DAC command with the specified pin and raw value.

Parameters
  • ser: The serial connection to use.
  • pin: The DAC pin number.
  • raw_value: The raw value to write (0-255).
def hiZ_dac(ser: hil2.serial_helper.ThreadedSerial, pin: int) -> None:
154def hiZ_dac(ser: serial_helper.ThreadedSerial, pin: int) -> None:
155    """
156    Sets a DAC pin to high impedance mode.
157    Sends a HIZ_DAC command with the specified pin.
158
159    :param ser: The serial connection to use.
160    :param pin: The DAC pin number.
161    """
162    command = [HIZ_DAC, pin]
163    logging.debug(f"Sending - HIZ_DAC: {command}")
164    ser.write(bytearray(command))

Sets a DAC pin to high impedance mode. Sends a HIZ_DAC command with the specified pin.

Parameters
  • ser: The serial connection to use.
  • pin: The DAC pin number.
def read_adc(ser: hil2.serial_helper.ThreadedSerial, pin: int) -> int:
167def read_adc(ser: serial_helper.ThreadedSerial, pin: int) -> int:
168    """
169    Reads an ADC value from a device.
170    Sends a READ_ADC command with the specified pin and waits for a response.
171
172    :param ser: The serial connection to use.
173    :param pin: The ADC pin number.
174    :return: The raw ADC value read from the specified pin.
175    """
176
177    command = [READ_ADC, pin]
178    logging.debug(f"Sending - READ_ADC: {command}")
179    ser.write(bytearray(command))
180    match ser.get_readings_with_timeout(READ_ADC):
181        case None:
182            raise hil_errors.SerialError("Failed to read ADC value, no response")
183        case [read_value_high, read_value_low]:
184            logging.debug(f"Received - READ_ADC: {read_value_high}, {read_value_low}")
185            return (read_value_high << 8) | read_value_low
186        case x:
187            error_msg = f"Failed to read ADC value, expected 2 bytes: {x}"
188            raise hil_errors.EngineError(error_msg)

Reads an ADC value from a device. Sends a READ_ADC command with the specified pin and waits for a response.

Parameters
  • ser: The serial connection to use.
  • pin: The ADC pin number.
Returns

The raw ADC value read from the specified pin.

def write_pot(ser: hil2.serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None:
191def write_pot(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None:
192    """
193    Writes a potentiometer value to a device.
194    Sends a WRITE_POT command with the specified pin and raw value.
195
196    :param ser: The serial connection to use.
197    :param pin: The potentiometer pin number.
198    :param raw_value: The raw value to write (0-255).
199    """
200    command = [WRITE_POT, pin, raw_value]
201    logging.debug(f"Sending - WRITE_POT: {command}")
202    ser.write(bytearray(command))

Writes a potentiometer value to a device. Sends a WRITE_POT command with the specified pin and raw value.

Parameters
  • ser: The serial connection to use.
  • pin: The potentiometer pin number.
  • raw_value: The raw value to write (0-255).
def send_can( ser: hil2.serial_helper.ThreadedSerial, bus: int, signal: int, data: list[int]) -> None:
206def send_can(
207    ser: serial_helper.ThreadedSerial,
208    bus: int,
209    signal: int,
210    data: list[int],
211) -> None:
212    """
213    Sends a CAN message over the specified bus.
214
215    :param ser: The serial connection to use.
216    :param bus: The CAN bus number.
217    :param signal: The CAN signal ID.
218    :param data: The data to send (up to 8 bytes). When sent, will be padded with zeros.
219    """
220    signal_3 = (signal >> 24) & 0xFF
221    signal_2 = (signal >> 16) & 0xFF
222    signal_1 = (signal >> 8) & 0xFF
223    signal_0 = signal & 0xFF
224    length = len(data)
225    padding = [0] * (8 - len(data))
226    command = [
227        SEND_CAN,
228        bus,
229        signal_3,
230        signal_2,
231        signal_1,
232        signal_0,
233        length,
234        *data,
235        *padding,
236    ]
237    logging.debug(f"Sending - SEND_CAN: {command}")
238    print(f"Sending - SEND_CAN: {command}")
239    ser.write(bytearray(command))

Sends a CAN message over the specified bus.

Parameters
  • ser: The serial connection to use.
  • bus: The CAN bus number.
  • signal: The CAN signal ID.
  • data: The data to send (up to 8 bytes). When sent, will be padded with zeros.
def parse_can_messages( ser: hil2.serial_helper.ThreadedSerial, bus: int, can_dbc: cantools.database.can.database.Database) -> list[hil2.can_helper.CanMessage]:
242def parse_can_messages(
243    ser: serial_helper.ThreadedSerial, bus: int, can_dbc: cantools_db.Database
244) -> list[can_helper.CanMessage]:
245    """
246    Parses received CAN messages from the serial connection for the specified bus.
247
248    :param ser: The serial connection to use.
249    :param bus: The CAN bus number.
250    :param can_dbc: The DBC database to use for decoding messages.
251    :return: A list of parsed CAN messages.
252    """
253
254    def decode(values):
255        signal = (
256            (values[1] << 24) | (values[2] << 16) | (values[3] << 8) | values[4]
257        ) & 0x1FFFFFFF
258        data = bytes(values[6 : 6 + values[5]])
259        try:
260            decoded = can_dbc.decode_message(signal, data)
261            return can_helper.CanMessage(signal, decoded)
262        except Exception as e:
263            logging.error(f"Failed to decode CAN message with ID {signal} ({e})")
264            return None
265
266    return [
267        msg
268        for values in ser.get_parsed_can_messages(bus)
269        if (msg := decode(values)) is not None
270    ]

Parses received CAN messages from the serial connection for the specified bus.

Parameters
  • ser: The serial connection to use.
  • bus: The CAN bus number.
  • can_dbc: The DBC database to use for decoding messages.
Returns

A list of parsed CAN messages.

def parse_readings( readings: list[int], parsed_readings: dict[int, list[int]], parsed_can_messages: dict[int, list[list[int]]]) -> tuple[bool, list[int]]:
274def parse_readings(
275    readings: list[int],
276    parsed_readings: dict[int, list[int]],
277    parsed_can_messages: dict[int, list[list[int]]],
278) -> tuple[bool, list[int]]:
279    """
280    Parse the first serial reading if possible.
281    Does not do conversion, just separates and saves the raw bytes for the corresponding
282    reading.
283
284    :param readings: The entire list of readings (bytes received from serial) to parse
285    :param parsed_readings: The dictionary to store parsed readings.
286    :param parsed_can_messages: The dictionary to store parsed CAN messages.
287    :return: A tuple:
288             - A boolean indicating if anything was parsed (and maybe this function
289               should be called again)
290             - The remaining unparsed readings.
291    """
292
293    logging.debug(f"Current readings to parse: {readings}")
294    match readings:
295        case []:
296            return False, []
297        case [cmd, value, *rest] if cmd == READ_ID:
298            logging.debug(f"Parsed - READ_ID: {value}")
299            parsed_readings[READ_ID] = [value]
300            return True, rest
301        case [cmd, value, *rest] if cmd == READ_GPIO:
302            logging.debug(f"Parsed - READ_GPIO: {value}")
303            parsed_readings[READ_GPIO] = [value]
304            return True, rest
305        case [cmd, value_high, value_low, *rest] if cmd == READ_ADC:
306            logging.debug(f"Parsed - READ_ADC: {value_high}, {value_low}")
307            parsed_readings[READ_ADC] = [value_high, value_low]
308            return True, rest
309        case [cmd, bus, signal_3, signal_2, signal_1, signal_0, length, *rest] if (
310            cmd == RECV_CAN and len(rest) >= length
311        ):
312            logging.debug(
313                f"Parsed - RECV_CAN: {bus}, {signal_3}, {signal_2}, {signal_1}, "
314                + f"{signal_0}, {length}, {rest[:length]}"
315            )
316            data, remaining = rest[:length], rest[length:]
317            if bus not in parsed_can_messages:
318                parsed_can_messages[bus] = []
319            parsed_can_messages[bus].append(
320                [bus, signal_3, signal_2, signal_1, signal_0, length, *data]
321            )
322            return True, remaining
323        case [cmd, command, *rest] if cmd == ERROR:
324            logging.critical(f"Parsed - ERROR for command: {command}. Rest={rest}")
325            raise hil_errors.SerialError(f"HIL reported error for command {command}")
326        case [first, *rest] if first not in SERIAL_RESPONSES:
327            logging.critical(f"Unexpected response: {first}. Rest={rest}")
328            raise hil_errors.SerialError(f"Unexpected response. Command error: {first}")
329        case _:
330            return False, readings

Parse the first serial reading if possible. Does not do conversion, just separates and saves the raw bytes for the corresponding reading.

Parameters
  • readings: The entire list of readings (bytes received from serial) to parse
  • parsed_readings: The dictionary to store parsed readings.
  • parsed_can_messages: The dictionary to store parsed CAN messages.
Returns

A tuple: - A boolean indicating if anything was parsed (and maybe this function should be called again) - The remaining unparsed readings.