hil2.commands
1from typing import Optional 2 3import time 4import logging 5 6import cantools.database.can.database as cantools_db 7import serial 8 9from . import can_helper 10from . import hil_errors 11from . import serial_helper 12 13 14# Command constants -------------------------------------------------------------------# 15# fmt: off 16READ_ID = 0 # command -> SYNC_BYTES (4), READ_ID, id 17WRITE_GPIO = 1 # command, pin, value -> [] 18HIZ_GPIO = 2 # command, pin -> [] 19READ_GPIO = 3 # command, pin -> READ_GPIO, value 20WRITE_DAC = 4 # command, pin/offset, value -> [] 21HIZ_DAC = 5 # command, pin/offset -> [] 22READ_ADC = 6 # command, pin -> READ_ADC, value high, value low 23WRITE_POT = 7 # command, pin/offset, value -> [] 24SEND_CAN = 8 # command, bus, signal bytes: 3-0, length, data (8 bytes) -> [] 25RECV_CAN = 9 # <async> -> RECV_CAN, bus, signal bytes: 3-0, 26 # length, data (length bytes) 27ERROR = 10 # <async/any> -> ERROR, command 28# fmt: on 29 30READ_ID_RESPONSE_LENGTH = 6 # SYNC_BYTES (4) + READ_ID (1) + ID (1) 31SYNC_BYTES = [0xDE, 0xAD, 0xBE, 0xEF] 32 33SERIAL_RESPONSES = [READ_ID, READ_GPIO, READ_ADC, RECV_CAN, ERROR] 34 35 36# Simple commands ---------------------------------------------------------------------# 37def read_id( 38 ser_raw: serial.Serial, response_wait: float 39) -> Optional[tuple[int, list[int]]]: 40 """ 41 Attempts to connect and read the HIL ID from a device. 42 Sends a READ_ID command and waits for a response. 43 44 :param ser_raw: The raw serial connection to use (raw Serial object). 45 :return: The HIL ID and any bytes received after the ID, or None if not a HIL device. 46 """ 47 command = [READ_ID] 48 logging.debug(f"Sending - READ_ID: {command}") 49 ser_raw.reset_input_buffer() 50 ser_raw.write(bytearray(command)) 51 read_buffer = [] 52 53 try: 54 start_time = time.time() 55 56 while time.time() - start_time < response_wait: 57 to_read = max(ser_raw.in_waiting, 1) 58 chunk = ser_raw.read(to_read) 59 if not chunk: 60 continue 61 read_buffer.extend([int(b) for b in chunk]) 62 logging.debug(f"Read buffer: {read_buffer}") 63 64 if len(read_buffer) >= READ_ID_RESPONSE_LENGTH: 65 # Look for SYNC_BYTES in the buffer 66 for i in range(len(read_buffer) - 5): 67 if read_buffer[i : i + 4] == SYNC_BYTES: 68 logging.debug(f"SYNC_BYTES found at position {i}") 69 # Check if the next byte is READ_ID 70 if read_buffer[i + 4] == READ_ID: 71 read_hil_id = read_buffer[i + 5] 72 remaining_bytes = read_buffer[i + 6 :] 73 logging.debug( 74 f"Received - READ_ID: {read_hil_id}," 75 f"remaining bytes: {len(remaining_bytes)}" 76 ) 77 return read_hil_id, remaining_bytes 78 # If SYNC_BYTES not found, remove bytes before the last 3 bytes 79 read_buffer = read_buffer[-3:] 80 except serial.SerialException as e: 81 logging.error(f"Serial exception occurred: {e}") 82 return None 83 84 return None 85 86 87def write_gpio(ser: serial_helper.ThreadedSerial, pin: int, value: bool) -> None: 88 """ 89 Writes a GPIO value to a device. 90 Sends a WRITE_GPIO command with the specified pin and value. 91 92 :param ser: The serial connection to use. 93 :param pin: The GPIO pin number. 94 :param value: The value to write (low = false, high = true). 95 """ 96 command = [WRITE_GPIO, pin, int(value)] 97 logging.debug(f"Sending - WRITE_GPIO: {command}") 98 ser.write(bytearray(command)) 99 100 101def hiZ_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> None: 102 """ 103 Set a GPIO pin to high impedance (HiZ). 104 (This is equivalent to setting the pin as an input.) 105 Sends a HIZ_GPIO command with the specified pin. 106 107 :param ser: The serial connection to use. 108 :param pin: The GPIO pin number. 109 """ 110 command = [HIZ_GPIO, pin] 111 logging.debug(f"Sending - HIZ_GPIO: {command}") 112 ser.write(bytearray(command)) 113 114 115def read_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> bool: 116 """ 117 Reads a GPIO value from a device. 118 Sends a READ_GPIO command with the specified pin and waits for a response. 119 120 :param ser: The serial connection to use. 121 :param pin: The GPIO pin number. 122 :return: The value read from the GPIO pin (low = false, high = true). 123 """ 124 125 command = [READ_GPIO, pin] 126 logging.debug(f"Sending - READ_GPIO: {command}") 127 ser.write(bytearray(command)) 128 match ser.get_readings_with_timeout(READ_GPIO): 129 case None: 130 raise hil_errors.SerialError("Failed to read GPIO value, no response") 131 case [read_value]: 132 logging.debug(f"Received - READ_GPIO: {read_value}") 133 return bool(read_value) 134 case x: 135 error_msg = f"Failed to read GPIO value, expected 1 byte: {x}" 136 raise hil_errors.EngineError(error_msg) 137 138 139def write_dac(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None: 140 """ 141 Writes a DAC value to a device. 142 Sends a WRITE_DAC command with the specified pin and raw value. 143 144 :param ser: The serial connection to use. 145 :param pin: The DAC pin number. 146 :param raw_value: The raw value to write (0-255). 147 """ 148 command = [WRITE_DAC, pin, raw_value] 149 logging.debug(f"Sending - WRITE_DAC: {command}") 150 ser.write(bytearray(command)) 151 152 153def hiZ_dac(ser: serial_helper.ThreadedSerial, pin: int) -> None: 154 """ 155 Sets a DAC pin to high impedance mode. 156 Sends a HIZ_DAC command with the specified pin. 157 158 :param ser: The serial connection to use. 159 :param pin: The DAC pin number. 160 """ 161 command = [HIZ_DAC, pin] 162 logging.debug(f"Sending - HIZ_DAC: {command}") 163 ser.write(bytearray(command)) 164 165 166def read_adc(ser: serial_helper.ThreadedSerial, pin: int) -> int: 167 """ 168 Reads an ADC value from a device. 169 Sends a READ_ADC command with the specified pin and waits for a response. 170 171 :param ser: The serial connection to use. 172 :param pin: The ADC pin number. 173 :return: The raw ADC value read from the specified pin. 174 """ 175 176 command = [READ_ADC, pin] 177 logging.debug(f"Sending - READ_ADC: {command}") 178 ser.write(bytearray(command)) 179 match ser.get_readings_with_timeout(READ_ADC): 180 case None: 181 raise hil_errors.SerialError("Failed to read ADC value, no response") 182 case [read_value_high, read_value_low]: 183 logging.debug(f"Received - READ_ADC: {read_value_high}, {read_value_low}") 184 return (read_value_high << 8) | read_value_low 185 case x: 186 error_msg = f"Failed to read ADC value, expected 2 bytes: {x}" 187 raise hil_errors.EngineError(error_msg) 188 189 190def write_pot(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None: 191 """ 192 Writes a potentiometer value to a device. 193 Sends a WRITE_POT command with the specified pin and raw value. 194 195 :param ser: The serial connection to use. 196 :param pin: The potentiometer pin number. 197 :param raw_value: The raw value to write (0-255). 198 """ 199 command = [WRITE_POT, pin, raw_value] 200 logging.debug(f"Sending - WRITE_POT: {command}") 201 ser.write(bytearray(command)) 202 203 204# CAN commands/parsing ----------------------------------------------------------------# 205def send_can( 206 ser: serial_helper.ThreadedSerial, 207 bus: int, 208 signal: int, 209 data: list[int], 210) -> None: 211 """ 212 Sends a CAN message over the specified bus. 213 214 :param ser: The serial connection to use. 215 :param bus: The CAN bus number. 216 :param signal: The CAN signal ID. 217 :param data: The data to send (up to 8 bytes). When sent, will be padded with zeros. 218 """ 219 signal_3 = (signal >> 24) & 0xFF 220 signal_2 = (signal >> 16) & 0xFF 221 signal_1 = (signal >> 8) & 0xFF 222 signal_0 = signal & 0xFF 223 length = len(data) 224 padding = [0] * (8 - len(data)) 225 command = [ 226 SEND_CAN, 227 bus, 228 signal_3, 229 signal_2, 230 signal_1, 231 signal_0, 232 length, 233 *data, 234 *padding, 235 ] 236 logging.debug(f"Sending - SEND_CAN: {command}") 237 print(f"Sending - SEND_CAN: {command}") 238 ser.write(bytearray(command)) 239 240 241def parse_can_messages( 242 ser: serial_helper.ThreadedSerial, bus: int, can_dbc: cantools_db.Database 243) -> list[can_helper.CanMessage]: 244 """ 245 Parses received CAN messages from the serial connection for the specified bus. 246 247 :param ser: The serial connection to use. 248 :param bus: The CAN bus number. 249 :param can_dbc: The DBC database to use for decoding messages. 250 :return: A list of parsed CAN messages. 251 """ 252 253 def decode(values): 254 signal = ( 255 (values[1] << 24) | (values[2] << 16) | (values[3] << 8) | values[4] 256 ) & 0x1FFFFFFF 257 data = bytes(values[6 : 6 + values[5]]) 258 try: 259 decoded = can_dbc.decode_message(signal, data) 260 return can_helper.CanMessage(signal, decoded) 261 except Exception as e: 262 logging.error(f"Failed to decode CAN message with ID {signal} ({e})") 263 return None 264 265 return [ 266 msg 267 for values in ser.get_parsed_can_messages(bus) 268 if (msg := decode(values)) is not None 269 ] 270 271 272# Serial parsing/spliting -------------------------------------------------------------# 273def parse_readings( 274 readings: list[int], 275 parsed_readings: dict[int, list[int]], 276 parsed_can_messages: dict[int, list[list[int]]], 277) -> tuple[bool, list[int]]: 278 """ 279 Parse the first serial reading if possible. 280 Does not do conversion, just separates and saves the raw bytes for the corresponding 281 reading. 282 283 :param readings: The entire list of readings (bytes received from serial) to parse 284 :param parsed_readings: The dictionary to store parsed readings. 285 :param parsed_can_messages: The dictionary to store parsed CAN messages. 286 :return: A tuple: 287 - A boolean indicating if anything was parsed (and maybe this function 288 should be called again) 289 - The remaining unparsed readings. 290 """ 291 292 logging.debug(f"Current readings to parse: {readings}") 293 match readings: 294 case []: 295 return False, [] 296 case [cmd, value, *rest] if cmd == READ_ID: 297 logging.debug(f"Parsed - READ_ID: {value}") 298 parsed_readings[READ_ID] = [value] 299 return True, rest 300 case [cmd, value, *rest] if cmd == READ_GPIO: 301 logging.debug(f"Parsed - READ_GPIO: {value}") 302 parsed_readings[READ_GPIO] = [value] 303 return True, rest 304 case [cmd, value_high, value_low, *rest] if cmd == READ_ADC: 305 logging.debug(f"Parsed - READ_ADC: {value_high}, {value_low}") 306 parsed_readings[READ_ADC] = [value_high, value_low] 307 return True, rest 308 case [cmd, bus, signal_3, signal_2, signal_1, signal_0, length, *rest] if ( 309 cmd == RECV_CAN and len(rest) >= length 310 ): 311 logging.debug( 312 f"Parsed - RECV_CAN: {bus}, {signal_3}, {signal_2}, {signal_1}, " 313 + f"{signal_0}, {length}, {rest[:length]}" 314 ) 315 data, remaining = rest[:length], rest[length:] 316 if bus not in parsed_can_messages: 317 parsed_can_messages[bus] = [] 318 parsed_can_messages[bus].append( 319 [bus, signal_3, signal_2, signal_1, signal_0, length, *data] 320 ) 321 return True, remaining 322 case [cmd, command, *rest] if cmd == ERROR: 323 logging.critical(f"Parsed - ERROR for command: {command}. Rest={rest}") 324 raise hil_errors.SerialError(f"HIL reported error for command {command}") 325 case [first, *rest] if first not in SERIAL_RESPONSES: 326 logging.critical(f"Unexpected response: {first}. Rest={rest}") 327 raise hil_errors.SerialError(f"Unexpected response. Command error: {first}") 328 case _: 329 return False, readings
38def read_id( 39 ser_raw: serial.Serial, response_wait: float 40) -> Optional[tuple[int, list[int]]]: 41 """ 42 Attempts to connect and read the HIL ID from a device. 43 Sends a READ_ID command and waits for a response. 44 45 :param ser_raw: The raw serial connection to use (raw Serial object). 46 :return: The HIL ID and any bytes received after the ID, or None if not a HIL device. 47 """ 48 command = [READ_ID] 49 logging.debug(f"Sending - READ_ID: {command}") 50 ser_raw.reset_input_buffer() 51 ser_raw.write(bytearray(command)) 52 read_buffer = [] 53 54 try: 55 start_time = time.time() 56 57 while time.time() - start_time < response_wait: 58 to_read = max(ser_raw.in_waiting, 1) 59 chunk = ser_raw.read(to_read) 60 if not chunk: 61 continue 62 read_buffer.extend([int(b) for b in chunk]) 63 logging.debug(f"Read buffer: {read_buffer}") 64 65 if len(read_buffer) >= READ_ID_RESPONSE_LENGTH: 66 # Look for SYNC_BYTES in the buffer 67 for i in range(len(read_buffer) - 5): 68 if read_buffer[i : i + 4] == SYNC_BYTES: 69 logging.debug(f"SYNC_BYTES found at position {i}") 70 # Check if the next byte is READ_ID 71 if read_buffer[i + 4] == READ_ID: 72 read_hil_id = read_buffer[i + 5] 73 remaining_bytes = read_buffer[i + 6 :] 74 logging.debug( 75 f"Received - READ_ID: {read_hil_id}," 76 f"remaining bytes: {len(remaining_bytes)}" 77 ) 78 return read_hil_id, remaining_bytes 79 # If SYNC_BYTES not found, remove bytes before the last 3 bytes 80 read_buffer = read_buffer[-3:] 81 except serial.SerialException as e: 82 logging.error(f"Serial exception occurred: {e}") 83 return None 84 85 return None
Attempts to connect and read the HIL ID from a device. Sends a READ_ID command and waits for a response.
Parameters
- ser_raw: The raw serial connection to use (raw Serial object).
Returns
The HIL ID and any bytes received after the ID, or None if not a HIL device.
88def write_gpio(ser: serial_helper.ThreadedSerial, pin: int, value: bool) -> None: 89 """ 90 Writes a GPIO value to a device. 91 Sends a WRITE_GPIO command with the specified pin and value. 92 93 :param ser: The serial connection to use. 94 :param pin: The GPIO pin number. 95 :param value: The value to write (low = false, high = true). 96 """ 97 command = [WRITE_GPIO, pin, int(value)] 98 logging.debug(f"Sending - WRITE_GPIO: {command}") 99 ser.write(bytearray(command))
Writes a GPIO value to a device. Sends a WRITE_GPIO command with the specified pin and value.
Parameters
- ser: The serial connection to use.
- pin: The GPIO pin number.
- value: The value to write (low = false, high = true).
102def hiZ_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> None: 103 """ 104 Set a GPIO pin to high impedance (HiZ). 105 (This is equivalent to setting the pin as an input.) 106 Sends a HIZ_GPIO command with the specified pin. 107 108 :param ser: The serial connection to use. 109 :param pin: The GPIO pin number. 110 """ 111 command = [HIZ_GPIO, pin] 112 logging.debug(f"Sending - HIZ_GPIO: {command}") 113 ser.write(bytearray(command))
Set a GPIO pin to high impedance (HiZ). (This is equivalent to setting the pin as an input.) Sends a HIZ_GPIO command with the specified pin.
Parameters
- ser: The serial connection to use.
- pin: The GPIO pin number.
116def read_gpio(ser: serial_helper.ThreadedSerial, pin: int) -> bool: 117 """ 118 Reads a GPIO value from a device. 119 Sends a READ_GPIO command with the specified pin and waits for a response. 120 121 :param ser: The serial connection to use. 122 :param pin: The GPIO pin number. 123 :return: The value read from the GPIO pin (low = false, high = true). 124 """ 125 126 command = [READ_GPIO, pin] 127 logging.debug(f"Sending - READ_GPIO: {command}") 128 ser.write(bytearray(command)) 129 match ser.get_readings_with_timeout(READ_GPIO): 130 case None: 131 raise hil_errors.SerialError("Failed to read GPIO value, no response") 132 case [read_value]: 133 logging.debug(f"Received - READ_GPIO: {read_value}") 134 return bool(read_value) 135 case x: 136 error_msg = f"Failed to read GPIO value, expected 1 byte: {x}" 137 raise hil_errors.EngineError(error_msg)
Reads a GPIO value from a device. Sends a READ_GPIO command with the specified pin and waits for a response.
Parameters
- ser: The serial connection to use.
- pin: The GPIO pin number.
Returns
The value read from the GPIO pin (low = false, high = true).
140def write_dac(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None: 141 """ 142 Writes a DAC value to a device. 143 Sends a WRITE_DAC command with the specified pin and raw value. 144 145 :param ser: The serial connection to use. 146 :param pin: The DAC pin number. 147 :param raw_value: The raw value to write (0-255). 148 """ 149 command = [WRITE_DAC, pin, raw_value] 150 logging.debug(f"Sending - WRITE_DAC: {command}") 151 ser.write(bytearray(command))
Writes a DAC value to a device. Sends a WRITE_DAC command with the specified pin and raw value.
Parameters
- ser: The serial connection to use.
- pin: The DAC pin number.
- raw_value: The raw value to write (0-255).
154def hiZ_dac(ser: serial_helper.ThreadedSerial, pin: int) -> None: 155 """ 156 Sets a DAC pin to high impedance mode. 157 Sends a HIZ_DAC command with the specified pin. 158 159 :param ser: The serial connection to use. 160 :param pin: The DAC pin number. 161 """ 162 command = [HIZ_DAC, pin] 163 logging.debug(f"Sending - HIZ_DAC: {command}") 164 ser.write(bytearray(command))
Sets a DAC pin to high impedance mode. Sends a HIZ_DAC command with the specified pin.
Parameters
- ser: The serial connection to use.
- pin: The DAC pin number.
167def read_adc(ser: serial_helper.ThreadedSerial, pin: int) -> int: 168 """ 169 Reads an ADC value from a device. 170 Sends a READ_ADC command with the specified pin and waits for a response. 171 172 :param ser: The serial connection to use. 173 :param pin: The ADC pin number. 174 :return: The raw ADC value read from the specified pin. 175 """ 176 177 command = [READ_ADC, pin] 178 logging.debug(f"Sending - READ_ADC: {command}") 179 ser.write(bytearray(command)) 180 match ser.get_readings_with_timeout(READ_ADC): 181 case None: 182 raise hil_errors.SerialError("Failed to read ADC value, no response") 183 case [read_value_high, read_value_low]: 184 logging.debug(f"Received - READ_ADC: {read_value_high}, {read_value_low}") 185 return (read_value_high << 8) | read_value_low 186 case x: 187 error_msg = f"Failed to read ADC value, expected 2 bytes: {x}" 188 raise hil_errors.EngineError(error_msg)
Reads an ADC value from a device. Sends a READ_ADC command with the specified pin and waits for a response.
Parameters
- ser: The serial connection to use.
- pin: The ADC pin number.
Returns
The raw ADC value read from the specified pin.
191def write_pot(ser: serial_helper.ThreadedSerial, pin: int, raw_value: int) -> None: 192 """ 193 Writes a potentiometer value to a device. 194 Sends a WRITE_POT command with the specified pin and raw value. 195 196 :param ser: The serial connection to use. 197 :param pin: The potentiometer pin number. 198 :param raw_value: The raw value to write (0-255). 199 """ 200 command = [WRITE_POT, pin, raw_value] 201 logging.debug(f"Sending - WRITE_POT: {command}") 202 ser.write(bytearray(command))
Writes a potentiometer value to a device. Sends a WRITE_POT command with the specified pin and raw value.
Parameters
- ser: The serial connection to use.
- pin: The potentiometer pin number.
- raw_value: The raw value to write (0-255).
206def send_can( 207 ser: serial_helper.ThreadedSerial, 208 bus: int, 209 signal: int, 210 data: list[int], 211) -> None: 212 """ 213 Sends a CAN message over the specified bus. 214 215 :param ser: The serial connection to use. 216 :param bus: The CAN bus number. 217 :param signal: The CAN signal ID. 218 :param data: The data to send (up to 8 bytes). When sent, will be padded with zeros. 219 """ 220 signal_3 = (signal >> 24) & 0xFF 221 signal_2 = (signal >> 16) & 0xFF 222 signal_1 = (signal >> 8) & 0xFF 223 signal_0 = signal & 0xFF 224 length = len(data) 225 padding = [0] * (8 - len(data)) 226 command = [ 227 SEND_CAN, 228 bus, 229 signal_3, 230 signal_2, 231 signal_1, 232 signal_0, 233 length, 234 *data, 235 *padding, 236 ] 237 logging.debug(f"Sending - SEND_CAN: {command}") 238 print(f"Sending - SEND_CAN: {command}") 239 ser.write(bytearray(command))
Sends a CAN message over the specified bus.
Parameters
- ser: The serial connection to use.
- bus: The CAN bus number.
- signal: The CAN signal ID.
- data: The data to send (up to 8 bytes). When sent, will be padded with zeros.
242def parse_can_messages( 243 ser: serial_helper.ThreadedSerial, bus: int, can_dbc: cantools_db.Database 244) -> list[can_helper.CanMessage]: 245 """ 246 Parses received CAN messages from the serial connection for the specified bus. 247 248 :param ser: The serial connection to use. 249 :param bus: The CAN bus number. 250 :param can_dbc: The DBC database to use for decoding messages. 251 :return: A list of parsed CAN messages. 252 """ 253 254 def decode(values): 255 signal = ( 256 (values[1] << 24) | (values[2] << 16) | (values[3] << 8) | values[4] 257 ) & 0x1FFFFFFF 258 data = bytes(values[6 : 6 + values[5]]) 259 try: 260 decoded = can_dbc.decode_message(signal, data) 261 return can_helper.CanMessage(signal, decoded) 262 except Exception as e: 263 logging.error(f"Failed to decode CAN message with ID {signal} ({e})") 264 return None 265 266 return [ 267 msg 268 for values in ser.get_parsed_can_messages(bus) 269 if (msg := decode(values)) is not None 270 ]
Parses received CAN messages from the serial connection for the specified bus.
Parameters
- ser: The serial connection to use.
- bus: The CAN bus number.
- can_dbc: The DBC database to use for decoding messages.
Returns
A list of parsed CAN messages.
274def parse_readings( 275 readings: list[int], 276 parsed_readings: dict[int, list[int]], 277 parsed_can_messages: dict[int, list[list[int]]], 278) -> tuple[bool, list[int]]: 279 """ 280 Parse the first serial reading if possible. 281 Does not do conversion, just separates and saves the raw bytes for the corresponding 282 reading. 283 284 :param readings: The entire list of readings (bytes received from serial) to parse 285 :param parsed_readings: The dictionary to store parsed readings. 286 :param parsed_can_messages: The dictionary to store parsed CAN messages. 287 :return: A tuple: 288 - A boolean indicating if anything was parsed (and maybe this function 289 should be called again) 290 - The remaining unparsed readings. 291 """ 292 293 logging.debug(f"Current readings to parse: {readings}") 294 match readings: 295 case []: 296 return False, [] 297 case [cmd, value, *rest] if cmd == READ_ID: 298 logging.debug(f"Parsed - READ_ID: {value}") 299 parsed_readings[READ_ID] = [value] 300 return True, rest 301 case [cmd, value, *rest] if cmd == READ_GPIO: 302 logging.debug(f"Parsed - READ_GPIO: {value}") 303 parsed_readings[READ_GPIO] = [value] 304 return True, rest 305 case [cmd, value_high, value_low, *rest] if cmd == READ_ADC: 306 logging.debug(f"Parsed - READ_ADC: {value_high}, {value_low}") 307 parsed_readings[READ_ADC] = [value_high, value_low] 308 return True, rest 309 case [cmd, bus, signal_3, signal_2, signal_1, signal_0, length, *rest] if ( 310 cmd == RECV_CAN and len(rest) >= length 311 ): 312 logging.debug( 313 f"Parsed - RECV_CAN: {bus}, {signal_3}, {signal_2}, {signal_1}, " 314 + f"{signal_0}, {length}, {rest[:length]}" 315 ) 316 data, remaining = rest[:length], rest[length:] 317 if bus not in parsed_can_messages: 318 parsed_can_messages[bus] = [] 319 parsed_can_messages[bus].append( 320 [bus, signal_3, signal_2, signal_1, signal_0, length, *data] 321 ) 322 return True, remaining 323 case [cmd, command, *rest] if cmd == ERROR: 324 logging.critical(f"Parsed - ERROR for command: {command}. Rest={rest}") 325 raise hil_errors.SerialError(f"HIL reported error for command {command}") 326 case [first, *rest] if first not in SERIAL_RESPONSES: 327 logging.critical(f"Unexpected response: {first}. Rest={rest}") 328 raise hil_errors.SerialError(f"Unexpected response. Command error: {first}") 329 case _: 330 return False, readings
Parse the first serial reading if possible. Does not do conversion, just separates and saves the raw bytes for the corresponding reading.
Parameters
- readings: The entire list of readings (bytes received from serial) to parse
- parsed_readings: The dictionary to store parsed readings.
- parsed_can_messages: The dictionary to store parsed CAN messages.
Returns
A tuple: - A boolean indicating if anything was parsed (and maybe this function should be called again) - The remaining unparsed readings.