hil2.serial_helper
1from typing import Optional 2 3import logging 4import threading 5import time 6 7import serial 8import serial.tools.list_ports 9 10from . import commands 11from . import hil_errors 12 13SERIAL_BAUDRATE = 115200 14SERIAL_TIMEOUT = 0.1 15SERIAL_RETRIES = 5 16RESPONSE_WAIT = 0.2 17 18GET_TIMEOUT = 0.1 19SLEEP_INTERVAL = 0.01 20 21 22# Discover ----------------------------------------------------------------------------# 23def discover_devices(hil_ids: list[int]) -> dict[int, tuple[serial.Serial, list[int]]]: 24 """ 25 Attempts to find HIL devices by sending an identification command to each serial 26 port. 27 28 :param hil_ids: A list of expected HIL device IDs 29 :return: A dictionary mapping discovered HIL device IDs to their serial connections 30 and any extra readings (that need to be parsed) 31 """ 32 33 devices = {} 34 35 com_ports = [ 36 port.device 37 for port in serial.tools.list_ports.comports() 38 if "USB Serial" in port.description 39 ] 40 41 # For every serial port, try to see if it is a HIL device 42 for cp in com_ports: 43 logging.debug(f"Trying to discover HIL device on port {cp}") 44 serial_con = serial.Serial( 45 cp, 46 SERIAL_BAUDRATE, 47 timeout=SERIAL_TIMEOUT, 48 bytesize=serial.EIGHTBITS, 49 parity=serial.PARITY_NONE, 50 stopbits=serial.STOPBITS_ONE, 51 xonxoff=0, 52 rtscts=0, 53 ) 54 serial_con.dtr = False 55 time.sleep(1) 56 serial_con.reset_input_buffer() 57 serial_con.dtr = True 58 59 # Need to give a little time 60 for _ in range(SERIAL_RETRIES): 61 match commands.read_id(serial_con, RESPONSE_WAIT): 62 case None: 63 pass 64 case (read_hil_id, extra_readings) if read_hil_id in hil_ids: 65 devices[read_hil_id] = (serial_con, extra_readings) 66 logging.info( 67 f"Discovered HIL device with ID {read_hil_id} on port {cp}" 68 ) 69 break 70 case _: 71 logging.debug(f"Found non-matching HIL ID on port {cp}") 72 time.sleep(1) 73 else: 74 # If it is not a HIL device, close it 75 serial_con.close() 76 77 # Check we found all devices 78 for hil_id in 
hil_ids: 79 if hil_id not in devices: 80 error_msg = f"Failed to discover HIL device with ID {hil_id} on any port" 81 raise hil_errors.SerialError(error_msg) 82 83 return devices 84 85 86# Threaded serial ---------------------------------------------------------------------# 87class ThreadedSerial: 88 """ 89 A class that handles serial communication in a separate thread. 90 This is needed because CAN messages are received asynchronously as opposed to 91 command and response. 92 """ 93 94 def __init__( 95 self, 96 serial_con: serial.Serial, 97 readings: list[int], 98 stop_event: threading.Event, 99 ): 100 """ 101 :param serial_con: The serial connection to the HIL device 102 :param stop_event: The event used to signal the thread to stop 103 """ 104 self.serial_con: serial.Serial = serial_con 105 self.stop_event: threading.Event = stop_event 106 107 # Raw readings from the serial port. Some bytes may have been already read when 108 # discovering the device. 109 self.readings: list[int] = readings 110 111 # Parsed readings. The key is the command (ex: READ_GPIO) and the value is the 112 # list of bytes 113 self.parsed_readings: dict[int, list[int]] = {} 114 # Parsed CAN messages. The key is the bus number, the value is a list of the 115 # list of bytes for each message 116 self.parsed_can_messages: dict[int, list[list[int]]] = {} 117 118 # Lock for synchronizing access to shared resources 119 self.lock = threading.Lock() 120 121 if len(self.readings) > 0: 122 # If there are any initial readings, try to process them 123 with self.lock: 124 self._process_readings() 125 126 def write(self, data: bytes) -> None: 127 """ 128 Write data to the serial port. Safe to be called from another thread. 129 130 :param data: The data to write to the serial port 131 """ 132 self.serial_con.write(data) 133 134 def _read(self): 135 """ 136 Attempt to read a single byte from the serial port. 
137 """ 138 read_data = self.serial_con.read(1) 139 if len(read_data) < 1: 140 return 141 value = int.from_bytes(read_data, "big") 142 self.readings.append(value) 143 144 def _process_readings(self): 145 """ 146 Attempt to process read bytes. 147 """ 148 processed = True 149 while processed: 150 # If something was processed, try to process again 151 processed, self.readings = commands.parse_readings( 152 self.readings, self.parsed_readings, self.parsed_can_messages 153 ) 154 155 def _get_readings(self, command: int) -> Optional[list[int]]: 156 """ 157 Get the readings for a specific command. Safe to be called from a different thread. 158 159 :param command: The command to get readings for (used as key) 160 :return: The readings for the command, or None if not found 161 """ 162 with self.lock: 163 val = self.parsed_readings.pop(command, None) 164 return val 165 166 def get_readings_with_timeout( 167 self, 168 command: int, 169 timeout: float = GET_TIMEOUT, 170 sleep_interval: float = SLEEP_INTERVAL, 171 ) -> Optional[list[int]]: 172 """ 173 Get the readings for a command, with a delay. 174 Retries the reading at regular intervals until the timeout is reached. 175 Safe to be called from a different thread. 176 177 :param command: The command to get readings for (used as key) 178 :param timeout: The maximum time to wait for readings (seconds) 179 :param sleep_interval: The time to wait between retries (seconds) 180 :return: The readings for the command, or None if not found 181 """ 182 183 deadline = time.time() + timeout 184 while time.time() < deadline: 185 if (reading := self._get_readings(command)) is not None: 186 return reading 187 time.sleep(sleep_interval) 188 return None 189 190 def get_parsed_can_messages(self, bus: int) -> list[list[int]]: 191 """ 192 Get the parsed CAN messages for a specific bus. 193 Safe to be called from a different thread. 
194 195 :param bus: The bus number to get messages for 196 :return: A list of parsed (but not decoded) CAN messages for the bus 197 """ 198 with self.lock: 199 return self.parsed_can_messages.pop(bus, []) 200 201 def stop(self): 202 """ 203 Stop the serial helper thread. 204 Safe to be called from a different thread. 205 """ 206 self.stop_event.set() 207 208 def _close(self): 209 """ 210 Close the serial connection. 211 """ 212 self.serial_con.close() 213 214 def run(self): 215 """ 216 Run the serial helper thread. 217 Constantly tries to read from the serial port and process the readings. 218 Should be run in a separate thread. 219 """ 220 while not self.stop_event.is_set(): 221 self._read() 222 if len(self.readings) > 0: 223 with self.lock: 224 self._process_readings() 225 226 self._close()
24def discover_devices(hil_ids: list[int]) -> dict[int, tuple[serial.Serial, list[int]]]: 25 """ 26 Attempts to find HIL devices by sending an identification command to each serial 27 port. 28 29 :param hil_ids: A list of expected HIL device IDs 30 :return: A dictionary mapping discovered HIL device IDs to their serial connections 31 and any extra readings (that need to be parsed) 32 """ 33 34 devices = {} 35 36 com_ports = [ 37 port.device 38 for port in serial.tools.list_ports.comports() 39 if "USB Serial" in port.description 40 ] 41 42 # For every serial port, try to see if it is a HIL device 43 for cp in com_ports: 44 logging.debug(f"Trying to discover HIL device on port {cp}") 45 serial_con = serial.Serial( 46 cp, 47 SERIAL_BAUDRATE, 48 timeout=SERIAL_TIMEOUT, 49 bytesize=serial.EIGHTBITS, 50 parity=serial.PARITY_NONE, 51 stopbits=serial.STOPBITS_ONE, 52 xonxoff=0, 53 rtscts=0, 54 ) 55 serial_con.dtr = False 56 time.sleep(1) 57 serial_con.reset_input_buffer() 58 serial_con.dtr = True 59 60 # Need to give a little time 61 for _ in range(SERIAL_RETRIES): 62 match commands.read_id(serial_con, RESPONSE_WAIT): 63 case None: 64 pass 65 case (read_hil_id, extra_readings) if read_hil_id in hil_ids: 66 devices[read_hil_id] = (serial_con, extra_readings) 67 logging.info( 68 f"Discovered HIL device with ID {read_hil_id} on port {cp}" 69 ) 70 break 71 case _: 72 logging.debug(f"Found non-matching HIL ID on port {cp}") 73 time.sleep(1) 74 else: 75 # If it is not a HIL device, close it 76 serial_con.close() 77 78 # Check we found all devices 79 for hil_id in hil_ids: 80 if hil_id not in devices: 81 error_msg = f"Failed to discover HIL device with ID {hil_id} on any port" 82 raise hil_errors.SerialError(error_msg) 83 84 return devices
Attempts to find HIL devices by sending an identification command to each serial port.
Parameters
- hil_ids: A list of expected HIL device IDs
Returns
A dictionary mapping discovered HIL device IDs to their serial connections and any extra readings (that need to be parsed)
class ThreadedSerial:
    """
    A class that handles serial communication in a separate thread.
    This is needed because CAN messages are received asynchronously as opposed to
    command and response.

    `run` is the reader loop. The parsed results are shared with other threads through
    `parsed_readings` and `parsed_can_messages`, both guarded by `lock`.
    """

    def __init__(
        self,
        serial_con: serial.Serial,
        readings: list[int],
        stop_event: threading.Event,
    ):
        """
        :param serial_con: The serial connection to the HIL device
        :param readings: Raw bytes already read from the serial port (for example
                         while discovering the device); they are parsed immediately
        :param stop_event: The event used to signal the thread to stop
        """
        self.serial_con: serial.Serial = serial_con
        self.stop_event: threading.Event = stop_event

        # Raw readings from the serial port. Some bytes may have been already read when
        # discovering the device.
        self.readings: list[int] = readings

        # Parsed readings. The key is the command (ex: READ_GPIO) and the value is the
        # list of bytes
        self.parsed_readings: dict[int, list[int]] = {}
        # Parsed CAN messages. The key is the bus number, the value is a list of the
        # list of bytes for each message
        self.parsed_can_messages: dict[int, list[list[int]]] = {}

        # Lock for synchronizing access to shared resources
        self.lock = threading.Lock()

        if self.readings:
            # If there are any initial readings, try to process them
            with self.lock:
                self._process_readings()

    def write(self, data: bytes) -> None:
        """
        Write data to the serial port.

        Intended to be called from another thread. No lock is taken here.
        NOTE(review): concurrent writers rely on the serial object itself - confirm.

        :param data: The data to write to the serial port
        """
        self.serial_con.write(data)

    def _read(self):
        """
        Attempt to read a single byte from the serial port.

        The byte value is appended to the raw readings; nothing happens when the read
        returns no data.
        """
        read_data = self.serial_con.read(1)
        if len(read_data) < 1:
            return
        value = int.from_bytes(read_data, "big")
        self.readings.append(value)

    def _process_readings(self):
        """
        Attempt to process read bytes.

        Keeps calling commands.parse_readings until it reports that nothing more was
        processed. Callers in this class hold `lock` while calling this.
        """
        processed = True
        while processed:
            # If something was processed, try to process again
            processed, self.readings = commands.parse_readings(
                self.readings, self.parsed_readings, self.parsed_can_messages
            )

    def _get_readings(self, command: int) -> Optional[list[int]]:
        """
        Get (and remove) the readings for a specific command, under the lock.

        :param command: The command to get readings for (used as key)
        :return: The readings for the command, or None if not found
        """
        with self.lock:
            val = self.parsed_readings.pop(command, None)
        return val

    def get_readings_with_timeout(
        self,
        command: int,
        timeout: float = GET_TIMEOUT,
        sleep_interval: float = SLEEP_INTERVAL,
    ) -> Optional[list[int]]:
        """
        Get the readings for a command, polling until they arrive or time runs out.
        The readings are checked at least once, even with a zero timeout.
        Safe to be called from a different thread.

        :param command: The command to get readings for (used as key)
        :param timeout: The maximum time to wait for readings (seconds)
        :param sleep_interval: The time to wait between retries (seconds)
        :return: The readings for the command, or None if not found
        """

        # Monotonic clock: the wait must not change if the system clock is adjusted
        deadline = time.monotonic() + timeout
        while True:
            reading = self._get_readings(command)
            if reading is not None:
                return reading
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            # Never sleep past the deadline
            time.sleep(min(sleep_interval, remaining))

    def get_parsed_can_messages(self, bus: int) -> list[list[int]]:
        """
        Get (and remove) the parsed CAN messages for a specific bus.
        Safe to be called from a different thread.

        :param bus: The bus number to get messages for
        :return: A list of parsed (but not decoded) CAN messages for the bus
        """
        with self.lock:
            return self.parsed_can_messages.pop(bus, [])

    def stop(self):
        """
        Stop the serial helper thread by setting the stop event.
        Safe to be called from a different thread.
        """
        self.stop_event.set()

    def _close(self):
        """
        Close the serial connection.
        """
        self.serial_con.close()

    def run(self):
        """
        Run the serial helper thread.
        Constantly tries to read from the serial port and process the readings.
        Should be run in a separate thread.

        The serial connection is closed when the loop ends, whether the stop event was
        set or an exception escaped from reading/processing.
        """
        try:
            while not self.stop_event.is_set():
                self._read()
                if self.readings:
                    with self.lock:
                        self._process_readings()
        finally:
            self._close()
A class that handles serial communication in a separate thread. This is needed because CAN messages are received asynchronously as opposed to command and response.
def __init__(
    self,
    serial_con: serial.Serial,
    readings: list[int],
    stop_event: threading.Event,
):
    """
    Store the connection and shared state, then parse any bytes supplied up front.

    :param serial_con: The serial connection to the HIL device
    :param readings: Raw bytes already read from the serial port (for example while
                     discovering the device)
    :param stop_event: The event used to signal the thread to stop
    """
    # Guards the parsed containers below against concurrent access
    self.lock = threading.Lock()

    self.stop_event: threading.Event = stop_event
    self.serial_con: serial.Serial = serial_con

    # Per-bus parsed CAN messages: bus number -> list of messages, each a list of
    # bytes
    self.parsed_can_messages: dict[int, list[list[int]]] = {}
    # Parsed command responses: command (ex: READ_GPIO) -> list of bytes
    self.parsed_readings: dict[int, list[int]] = {}

    # Unparsed bytes; may be pre-filled with bytes consumed during discovery
    self.readings: list[int] = readings

    if len(self.readings) == 0:
        return

    # Something was handed over already, so try to parse it right away
    with self.lock:
        self._process_readings()
Parameters
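- serial_con: The serial connection to the HIL device
- readings: Raw bytes already read from the serial port (for example while discovering the device); they are processed first
- stop_event: The event used to signal the thread to stop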
def write(self, data: bytes) -> None:
    """
    Send bytes out over the serial connection.

    Meant to be callable from a thread other than the reader thread; no lock is
    taken here.

    :param data: The data to write to the serial port
    """
    port = self.serial_con
    port.write(data)
Write data to the serial port. Safe to be called from another thread.
Parameters
- data: The data to write to the serial port
def get_readings_with_timeout(
    self,
    command: int,
    timeout: float = GET_TIMEOUT,
    sleep_interval: float = SLEEP_INTERVAL,
) -> Optional[list[int]]:
    """
    Get the readings for a command, polling until they arrive or time runs out.
    The readings are checked at least once, even with a zero timeout.
    Safe to be called from a different thread.

    :param command: The command to get readings for (used as key)
    :param timeout: The maximum time to wait for readings (seconds)
    :param sleep_interval: The time to wait between retries (seconds)
    :return: The readings for the command, or None if not found
    """

    # Monotonic clock: the wait must not change if the system clock is adjusted
    deadline = time.monotonic() + timeout
    while True:
        reading = self._get_readings(command)
        if reading is not None:
            return reading
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline
        time.sleep(min(sleep_interval, remaining))
Get the readings for a command, with a delay. Retries the reading at regular intervals until the timeout is reached. Safe to be called from a different thread.
Parameters
- command: The command to get readings for (used as key)
- timeout: The maximum time to wait for readings (seconds)
- sleep_interval: The time to wait between retries (seconds)
Returns
The readings for the command, or None if not found
def get_parsed_can_messages(self, bus: int) -> list[list[int]]:
    """
    Take the parsed CAN messages collected so far for one bus.
    The entry for the bus is removed, so each message is handed out once.
    Safe to be called from a different thread.

    :param bus: The bus number to get messages for
    :return: A list of parsed (but not decoded) CAN messages for the bus; empty when
             nothing is stored for that bus
    """
    with self.lock:
        messages = self.parsed_can_messages.pop(bus, [])
    return messages
Get the parsed CAN messages for a specific bus. Safe to be called from a different thread.
Parameters
- bus: The bus number to get messages for
Returns
A list of parsed (but not decoded) CAN messages for the bus
def stop(self):
    """
    Ask the serial helper thread to finish by setting its stop event.
    Safe to be called from a different thread.
    """
    event = self.stop_event
    event.set()
Stop the serial helper thread. Safe to be called from a different thread.
def run(self):
    """
    Run the serial helper thread.
    Constantly tries to read from the serial port and process the readings.
    Should be run in a separate thread.

    The serial connection is closed when the loop ends, whether the stop event was
    set or an exception escaped from reading/processing.
    """
    try:
        while not self.stop_event.is_set():
            self._read()
            if self.readings:
                # Parsing updates the containers other threads read from
                with self.lock:
                    self._process_readings()
    finally:
        # Always release the port, even when the loop dies with an exception
        self._close()
Run the serial helper thread. Constantly tries to read from the serial port and process the readings. Should be run in a separate thread.