hil2.serial_helper

  1from typing import Optional
  2
  3import logging
  4import threading
  5import time
  6
  7import serial
  8import serial.tools.list_ports
  9
 10from . import commands
 11from . import hil_errors
 12
 13SERIAL_BAUDRATE = 115200
 14SERIAL_TIMEOUT = 0.1
 15SERIAL_RETRIES = 5
 16RESPONSE_WAIT = 0.2
 17
 18GET_TIMEOUT = 0.1
 19SLEEP_INTERVAL = 0.01
 20
 21
 22# Discover ----------------------------------------------------------------------------#
 23def discover_devices(hil_ids: list[int]) -> dict[int, tuple[serial.Serial, list[int]]]:
 24    """
 25    Attempts to find HIL devices by sending an identification command to each serial
 26    port.
 27
 28    :param hil_ids: A list of expected HIL device IDs
 29    :return: A dictionary mapping discovered HIL device IDs to their serial connections
 30             and any extra readings (that need to be parsed)
 31    """
 32
 33    devices = {}
 34
 35    com_ports = [
 36        port.device
 37        for port in serial.tools.list_ports.comports()
 38        if "USB Serial" in port.description
 39    ]
 40
 41    # For every serial port, try to see if it is a HIL device
 42    for cp in com_ports:
 43        logging.debug(f"Trying to discover HIL device on port {cp}")
 44        serial_con = serial.Serial(
 45            cp,
 46            SERIAL_BAUDRATE,
 47            timeout=SERIAL_TIMEOUT,
 48            bytesize=serial.EIGHTBITS,
 49            parity=serial.PARITY_NONE,
 50            stopbits=serial.STOPBITS_ONE,
 51            xonxoff=0,
 52            rtscts=0,
 53        )
 54        serial_con.dtr = False
 55        time.sleep(1)
 56        serial_con.reset_input_buffer()
 57        serial_con.dtr = True
 58
 59        # Need to give a little time
 60        for _ in range(SERIAL_RETRIES):
 61            match commands.read_id(serial_con, RESPONSE_WAIT):
 62                case None:
 63                    pass
 64                case (read_hil_id, extra_readings) if read_hil_id in hil_ids:
 65                    devices[read_hil_id] = (serial_con, extra_readings)
 66                    logging.info(
 67                        f"Discovered HIL device with ID {read_hil_id} on port {cp}"
 68                    )
 69                    break
 70                case _:
 71                    logging.debug(f"Found non-matching HIL ID on port {cp}")
 72            time.sleep(1)
 73        else:
 74            # If it is not a HIL device, close it
 75            serial_con.close()
 76
 77    # Check we found all devices
 78    for hil_id in hil_ids:
 79        if hil_id not in devices:
 80            error_msg = f"Failed to discover HIL device with ID {hil_id} on any port"
 81            raise hil_errors.SerialError(error_msg)
 82
 83    return devices
 84
 85
 86# Threaded serial ---------------------------------------------------------------------#
 87class ThreadedSerial:
 88    """
 89    A class that handles serial communication in a separate thread.
 90    This is needed because CAN messages are received asynchronously as opposed to
 91    command and response.
 92    """
 93
 94    def __init__(
 95        self,
 96        serial_con: serial.Serial,
 97        readings: list[int],
 98        stop_event: threading.Event,
 99    ):
100        """
101        :param serial_con: The serial connection to the HIL device
102        :param stop_event: The event used to signal the thread to stop
103        """
104        self.serial_con: serial.Serial = serial_con
105        self.stop_event: threading.Event = stop_event
106
107        # Raw readings from the serial port. Some bytes may have been already read when
108        # discovering the device.
109        self.readings: list[int] = readings
110
111        # Parsed readings. The key is the command (ex: READ_GPIO) and the value is the
112        # list of bytes
113        self.parsed_readings: dict[int, list[int]] = {}
114        # Parsed CAN messages. The key is the bus number, the value is a list of the
115        # list of bytes for each message
116        self.parsed_can_messages: dict[int, list[list[int]]] = {}
117
118        # Lock for synchronizing access to shared resources
119        self.lock = threading.Lock()
120
121        if len(self.readings) > 0:
122            # If there are any initial readings, try to process them
123            with self.lock:
124                self._process_readings()
125
126    def write(self, data: bytes) -> None:
127        """
128        Write data to the serial port. Safe to be called from another thread.
129
130        :param data: The data to write to the serial port
131        """
132        self.serial_con.write(data)
133
134    def _read(self):
135        """
136        Attempt to read a single byte from the serial port.
137        """
138        read_data = self.serial_con.read(1)
139        if len(read_data) < 1:
140            return
141        value = int.from_bytes(read_data, "big")
142        self.readings.append(value)
143
144    def _process_readings(self):
145        """
146        Attempt to process read bytes.
147        """
148        processed = True
149        while processed:
150            # If something was processed, try to process again
151            processed, self.readings = commands.parse_readings(
152                self.readings, self.parsed_readings, self.parsed_can_messages
153            )
154
155    def _get_readings(self, command: int) -> Optional[list[int]]:
156        """
157        Get the readings for a specific command. Safe to be called from a different thread.
158
159        :param command: The command to get readings for (used as key)
160        :return: The readings for the command, or None if not found
161        """
162        with self.lock:
163            val = self.parsed_readings.pop(command, None)
164            return val
165
166    def get_readings_with_timeout(
167        self,
168        command: int,
169        timeout: float = GET_TIMEOUT,
170        sleep_interval: float = SLEEP_INTERVAL,
171    ) -> Optional[list[int]]:
172        """
173        Get the readings for a command, with a delay.
174        Retries the reading at regular intervals until the timeout is reached.
175        Safe to be called from a different thread.
176
177        :param command: The command to get readings for (used as key)
178        :param timeout: The maximum time to wait for readings (seconds)
179        :param sleep_interval: The time to wait between retries (seconds)
180        :return: The readings for the command, or None if not found
181        """
182
183        deadline = time.time() + timeout
184        while time.time() < deadline:
185            if (reading := self._get_readings(command)) is not None:
186                return reading
187            time.sleep(sleep_interval)
188        return None
189
190    def get_parsed_can_messages(self, bus: int) -> list[list[int]]:
191        """
192        Get the parsed CAN messages for a specific bus.
193        Safe to be called from a different thread.
194
195        :param bus: The bus number to get messages for
196        :return: A list of parsed (but not decoded) CAN messages for the bus
197        """
198        with self.lock:
199            return self.parsed_can_messages.pop(bus, [])
200
201    def stop(self):
202        """
203        Stop the serial helper thread.
204        Safe to be called from a different thread.
205        """
206        self.stop_event.set()
207
208    def _close(self):
209        """
210        Close the serial connection.
211        """
212        self.serial_con.close()
213
214    def run(self):
215        """
216        Run the serial helper thread.
217        Constantly tries to read from the serial port and process the readings.
218        Should be run in a separate thread.
219        """
220        while not self.stop_event.is_set():
221            self._read()
222            if len(self.readings) > 0:
223                with self.lock:
224                    self._process_readings()
225
226        self._close()
SERIAL_BAUDRATE = 115200
SERIAL_TIMEOUT = 0.1
SERIAL_RETRIES = 5
RESPONSE_WAIT = 0.2
GET_TIMEOUT = 0.1
SLEEP_INTERVAL = 0.01
def discover_devices( hil_ids: list[int]) -> dict[int, tuple[serial.serialposix.Serial, list[int]]]:
24def discover_devices(hil_ids: list[int]) -> dict[int, tuple[serial.Serial, list[int]]]:
25    """
26    Attempts to find HIL devices by sending an identification command to each serial
27    port.
28
29    :param hil_ids: A list of expected HIL device IDs
30    :return: A dictionary mapping discovered HIL device IDs to their serial connections
31             and any extra readings (that need to be parsed)
32    """
33
34    devices = {}
35
36    com_ports = [
37        port.device
38        for port in serial.tools.list_ports.comports()
39        if "USB Serial" in port.description
40    ]
41
42    # For every serial port, try to see if it is a HIL device
43    for cp in com_ports:
44        logging.debug(f"Trying to discover HIL device on port {cp}")
45        serial_con = serial.Serial(
46            cp,
47            SERIAL_BAUDRATE,
48            timeout=SERIAL_TIMEOUT,
49            bytesize=serial.EIGHTBITS,
50            parity=serial.PARITY_NONE,
51            stopbits=serial.STOPBITS_ONE,
52            xonxoff=0,
53            rtscts=0,
54        )
55        serial_con.dtr = False
56        time.sleep(1)
57        serial_con.reset_input_buffer()
58        serial_con.dtr = True
59
60        # Need to give a little time
61        for _ in range(SERIAL_RETRIES):
62            match commands.read_id(serial_con, RESPONSE_WAIT):
63                case None:
64                    pass
65                case (read_hil_id, extra_readings) if read_hil_id in hil_ids:
66                    devices[read_hil_id] = (serial_con, extra_readings)
67                    logging.info(
68                        f"Discovered HIL device with ID {read_hil_id} on port {cp}"
69                    )
70                    break
71                case _:
72                    logging.debug(f"Found non-matching HIL ID on port {cp}")
73            time.sleep(1)
74        else:
75            # If it is not a HIL device, close it
76            serial_con.close()
77
78    # Check we found all devices
79    for hil_id in hil_ids:
80        if hil_id not in devices:
81            error_msg = f"Failed to discover HIL device with ID {hil_id} on any port"
82            raise hil_errors.SerialError(error_msg)
83
84    return devices

Attempts to find HIL devices by sending an identification command to each serial port.

Parameters
  • hil_ids: A list of expected HIL device IDs
Returns

A dictionary mapping discovered HIL device IDs to their serial connections and any extra readings (that need to be parsed)

class ThreadedSerial:
 88class ThreadedSerial:
 89    """
 90    A class that handles serial communication in a separate thread.
 91    This is needed because CAN messages are received asynchronously as opposed to
 92    command and response.
 93    """
 94
 95    def __init__(
 96        self,
 97        serial_con: serial.Serial,
 98        readings: list[int],
 99        stop_event: threading.Event,
100    ):
101        """
102        :param serial_con: The serial connection to the HIL device
103        :param stop_event: The event used to signal the thread to stop
104        """
105        self.serial_con: serial.Serial = serial_con
106        self.stop_event: threading.Event = stop_event
107
108        # Raw readings from the serial port. Some bytes may have been already read when
109        # discovering the device.
110        self.readings: list[int] = readings
111
112        # Parsed readings. The key is the command (ex: READ_GPIO) and the value is the
113        # list of bytes
114        self.parsed_readings: dict[int, list[int]] = {}
115        # Parsed CAN messages. The key is the bus number, the value is a list of the
116        # list of bytes for each message
117        self.parsed_can_messages: dict[int, list[list[int]]] = {}
118
119        # Lock for synchronizing access to shared resources
120        self.lock = threading.Lock()
121
122        if len(self.readings) > 0:
123            # If there are any initial readings, try to process them
124            with self.lock:
125                self._process_readings()
126
127    def write(self, data: bytes) -> None:
128        """
129        Write data to the serial port. Safe to be called from another thread.
130
131        :param data: The data to write to the serial port
132        """
133        self.serial_con.write(data)
134
135    def _read(self):
136        """
137        Attempt to read a single byte from the serial port.
138        """
139        read_data = self.serial_con.read(1)
140        if len(read_data) < 1:
141            return
142        value = int.from_bytes(read_data, "big")
143        self.readings.append(value)
144
145    def _process_readings(self):
146        """
147        Attempt to process read bytes.
148        """
149        processed = True
150        while processed:
151            # If something was processed, try to process again
152            processed, self.readings = commands.parse_readings(
153                self.readings, self.parsed_readings, self.parsed_can_messages
154            )
155
156    def _get_readings(self, command: int) -> Optional[list[int]]:
157        """
158        Get the readings for a specific command. Safe to be called from a different thread.
159
160        :param command: The command to get readings for (used as key)
161        :return: The readings for the command, or None if not found
162        """
163        with self.lock:
164            val = self.parsed_readings.pop(command, None)
165            return val
166
167    def get_readings_with_timeout(
168        self,
169        command: int,
170        timeout: float = GET_TIMEOUT,
171        sleep_interval: float = SLEEP_INTERVAL,
172    ) -> Optional[list[int]]:
173        """
174        Get the readings for a command, with a delay.
175        Retries the reading at regular intervals until the timeout is reached.
176        Safe to be called from a different thread.
177
178        :param command: The command to get readings for (used as key)
179        :param timeout: The maximum time to wait for readings (seconds)
180        :param sleep_interval: The time to wait between retries (seconds)
181        :return: The readings for the command, or None if not found
182        """
183
184        deadline = time.time() + timeout
185        while time.time() < deadline:
186            if (reading := self._get_readings(command)) is not None:
187                return reading
188            time.sleep(sleep_interval)
189        return None
190
191    def get_parsed_can_messages(self, bus: int) -> list[list[int]]:
192        """
193        Get the parsed CAN messages for a specific bus.
194        Safe to be called from a different thread.
195
196        :param bus: The bus number to get messages for
197        :return: A list of parsed (but not decoded) CAN messages for the bus
198        """
199        with self.lock:
200            return self.parsed_can_messages.pop(bus, [])
201
202    def stop(self):
203        """
204        Stop the serial helper thread.
205        Safe to be called from a different thread.
206        """
207        self.stop_event.set()
208
209    def _close(self):
210        """
211        Close the serial connection.
212        """
213        self.serial_con.close()
214
215    def run(self):
216        """
217        Run the serial helper thread.
218        Constantly tries to read from the serial port and process the readings.
219        Should be run in a separate thread.
220        """
221        while not self.stop_event.is_set():
222            self._read()
223            if len(self.readings) > 0:
224                with self.lock:
225                    self._process_readings()
226
227        self._close()

A class that handles serial communication in a separate thread. This is needed because CAN messages are received asynchronously as opposed to command and response.

ThreadedSerial( serial_con: serial.serialposix.Serial, readings: list[int], stop_event: threading.Event)
 95    def __init__(
 96        self,
 97        serial_con: serial.Serial,
 98        readings: list[int],
 99        stop_event: threading.Event,
100    ):
101        """
102        :param serial_con: The serial connection to the HIL device
103        :param stop_event: The event used to signal the thread to stop
104        """
105        self.serial_con: serial.Serial = serial_con
106        self.stop_event: threading.Event = stop_event
107
108        # Raw readings from the serial port. Some bytes may have been already read when
109        # discovering the device.
110        self.readings: list[int] = readings
111
112        # Parsed readings. The key is the command (ex: READ_GPIO) and the value is the
113        # list of bytes
114        self.parsed_readings: dict[int, list[int]] = {}
115        # Parsed CAN messages. The key is the bus number, the value is a list of the
116        # list of bytes for each message
117        self.parsed_can_messages: dict[int, list[list[int]]] = {}
118
119        # Lock for synchronizing access to shared resources
120        self.lock = threading.Lock()
121
122        if len(self.readings) > 0:
123            # If there are any initial readings, try to process them
124            with self.lock:
125                self._process_readings()
Parameters
  • serial_con: The serial connection to the HIL device
  • stop_event: The event used to signal the thread to stop
serial_con: serial.serialposix.Serial
stop_event: threading.Event
readings: list[int]
parsed_readings: dict[int, list[int]]
parsed_can_messages: dict[int, list[list[int]]]
lock
def write(self, data: bytes) -> None:
127    def write(self, data: bytes) -> None:
128        """
129        Write data to the serial port. Safe to be called from another thread.
130
131        :param data: The data to write to the serial port
132        """
133        self.serial_con.write(data)

Write data to the serial port. Safe to be called from another thread.

Parameters
  • data: The data to write to the serial port
def get_readings_with_timeout( self, command: int, timeout: float = 0.1, sleep_interval: float = 0.01) -> Optional[list[int]]:
167    def get_readings_with_timeout(
168        self,
169        command: int,
170        timeout: float = GET_TIMEOUT,
171        sleep_interval: float = SLEEP_INTERVAL,
172    ) -> Optional[list[int]]:
173        """
174        Get the readings for a command, with a delay.
175        Retries the reading at regular intervals until the timeout is reached.
176        Safe to be called from a different thread.
177
178        :param command: The command to get readings for (used as key)
179        :param timeout: The maximum time to wait for readings (seconds)
180        :param sleep_interval: The time to wait between retries (seconds)
181        :return: The readings for the command, or None if not found
182        """
183
184        deadline = time.time() + timeout
185        while time.time() < deadline:
186            if (reading := self._get_readings(command)) is not None:
187                return reading
188            time.sleep(sleep_interval)
189        return None

Get the readings for a command, with a delay. Retries the reading at regular intervals until the timeout is reached. Safe to be called from a different thread.

Parameters
  • command: The command to get readings for (used as key)
  • timeout: The maximum time to wait for readings (seconds)
  • sleep_interval: The time to wait between retries (seconds)
Returns

The readings for the command, or None if not found

def get_parsed_can_messages(self, bus: int) -> list[list[int]]:
191    def get_parsed_can_messages(self, bus: int) -> list[list[int]]:
192        """
193        Get the parsed CAN messages for a specific bus.
194        Safe to be called from a different thread.
195
196        :param bus: The bus number to get messages for
197        :return: A list of parsed (but not decoded) CAN messages for the bus
198        """
199        with self.lock:
200            return self.parsed_can_messages.pop(bus, [])

Get the parsed CAN messages for a specific bus. Safe to be called from a different thread.

Parameters
  • bus: The bus number to get messages for
Returns

A list of parsed (but not decoded) CAN messages for the bus

def stop(self):
202    def stop(self):
203        """
204        Stop the serial helper thread.
205        Safe to be called from a different thread.
206        """
207        self.stop_event.set()

Stop the serial helper thread. Safe to be called from a different thread.

def run(self):
215    def run(self):
216        """
217        Run the serial helper thread.
218        Constantly tries to read from the serial port and process the readings.
219        Should be run in a separate thread.
220        """
221        while not self.stop_event.is_set():
222            self._read()
223            if len(self.readings) > 0:
224                with self.lock:
225                    self._process_readings()
226
227        self._close()

Run the serial helper thread. Constantly tries to read from the serial port and process the readings. Should be run in a separate thread.