hil2.test_device

  1from typing import Any, Optional
  2
  3import json
  4import os
  5import threading
  6
  7import cantools.database.can.database as cantools_db
  8
  9from . import action
 10from . import can_helper
 11from . import commands
 12from . import dut_cons
 13from . import hil_errors
 14from . import serial_helper
 15
 16
 17# Peripheral configuration ------------------------------------------------------------#
 18class AdcConfig:
 19    """Configuration for an ADC (Analog-to-Digital Converter)."""
 20
 21    def __init__(self, adc_config: dict):
 22        """
 23        :param adc_config: The ADC configuration dictionary
 24        """
 25        match adc_config:
 26            # PCB
 27            case {
 28                "bit_resolution": br,
 29                "adc_reference_v": ar,
 30                "5v_reference_v": v5r,
 31                "24v_reference_v": v24r,
 32            }:
 33                self.bit_resolution: int = br
 34                self.adc_reference_v: float = ar
 35                self.five_v_reference_v: Optional[float] = v5r
 36                self.twenty_four_v_reference_v: Optional[float] = v24r
 37            # Breadboard
 38            case {
 39                "bit_resolution": br,
 40                "adc_reference_v": ar,
 41            }:
 42                self.bit_resolution: int = br
 43                self.adc_reference_v: float = ar
 44                self.five_v_reference_v: Optional[float] = None
 45                self.twenty_four_v_reference_v: Optional[float] = None
 46            case _:
 47                raise hil_errors.ConfigurationError("Invalid ADC configuration")
 48
 49    def raw_to_v(self, raw_value: int) -> float:
 50        """
 51        Convert a raw ADC value to a voltage.
 52
 53        :param raw_value: The raw ADC value to convert
 54        :return: The converted voltage value
 55        """
 56        if raw_value < 0 or raw_value > (2**self.bit_resolution - 1):
 57            raise hil_errors.RangeError(f"ADC raw value {raw_value} out of range")
 58        return (raw_value / (2**self.bit_resolution - 1)) * self.adc_reference_v
 59
 60    def raw_to_5v(self, raw_value: int) -> float:
 61        """
 62        Convert a raw ADC value to a voltage using the 5V reference.
 63        If on a PCB, AI5 means it when through a voltage divider.
 64
 65        :param raw_value: The raw ADC value to convert
 66        :return: The converted voltage value
 67        """
 68        match self.five_v_reference_v:
 69            case None:
 70                error_msg = "5V reference voltage not configured"
 71                raise hil_errors.ConfigurationError(error_msg)
 72            case v5r:
 73                return (self.raw_to_v(raw_value) / v5r) * 5.0
 74
 75    def raw_to_24v(self, raw_value: int) -> float:
 76        """
 77        Convert a raw ADC value to a voltage using the 24V reference.
 78        If on a PCB, AI24 means it when through a voltage divider.
 79
 80        :param raw_value: The raw ADC value to convert
 81        :return: The converted voltage value
 82        """
 83        match self.twenty_four_v_reference_v:
 84            case None:
 85                error_msg = "24V reference voltage not configured"
 86                raise hil_errors.ConfigurationError(error_msg)
 87            case v24r:
 88                return (self.raw_to_v(raw_value) / v24r) * 24.0
 89
 90
 91class DacConfig:
 92    """Configuration for a DAC (Digital-to-Analog Converter)."""
 93
 94    def __init__(self, dac_config: dict):
 95        """
 96        :param dac_config: The DAC configuration dictionary
 97        """
 98        match dac_config:
 99            case {"bit_resolution": br, "reference_v": rv}:
100                self.bit_resolution = br
101                self.reference_v = rv
102            case _:
103                raise hil_errors.ConfigurationError("Invalid DAC configuration")
104
105    def v_to_raw(self, value: float) -> int:
106        """
107        Convert a voltage value to a raw DAC value.
108
109        :param value: The voltage value to convert
110        :return: The converted raw DAC value
111        """
112        if value < 0 or value > self.reference_v:
113            raise hil_errors.RangeError(f"DAC value {value} out of range")
114        return int((value / self.reference_v) * (2**self.bit_resolution - 1))
115
116
117class PotConfig:
118    """Configuration for a POT (Potentiometer)."""
119
120    def __init__(self, pot_config: dict):
121        """
122        :param pot_config: The POT configuration dictionary
123        """
124        match pot_config:
125            case {"bit_resolution": br, "reference_ohms": r, "wiper_ohms": w}:
126                self.bit_resolution = br
127                self.reference_ohms = r
128                self.wiper_ohms = w
129            case _:
130                raise hil_errors.ConfigurationError("Invalid POT configuration")
131
132    def ohms_to_raw(self, value: float) -> int:
133        """
134        Convert an ohm value to a raw POT value.
135
136        :param value: The ohm value to convert
137        :return: The converted raw POT value
138        """
139        if value < self.wiper_ohms or value > self.reference_ohms + self.wiper_ohms:
140            raise hil_errors.RangeError(f"POT value {value} out of range")
141        steps = self.bit_resolution**2 - 1
142        return int((steps * (value - self.wiper_ohms)) / self.reference_ohms)
143
144
145# Interface configuration -------------------------------------------------------------#
146class Port:
147    """Configuration for a port."""
148
149    def __init__(self, port: dict):
150        """
151        :param port: The port configuration dictionary
152        """
153        match port:
154            case {"name": name, "port": port, "mode": mode}:
155                self.name: str = name
156                self.port: int = port
157                self.mode: str = mode
158            case _:
159                raise hil_errors.ConfigurationError("Invalid Port configuration")
160
161
162class Mux:
163    """Configuration for a MUX (Multiplexer)."""
164
165    def __init__(self, mux: dict):
166        """
167        :param mux: The MUX configuration dictionary
168        """
169        match mux:
170            case {
171                "name": name,
172                "mode": mode,
173                "select_ports": select_ports,
174                "port": port,
175            }:
176                self.name: str = name
177                self.mode: str = mode
178                self.select_ports: list[int] = select_ports
179                self.port: int = port
180            case _:
181                raise hil_errors.ConfigurationError("Invalid Mux configuration")
182
183    def select_from_name(self, name: str) -> Optional["MuxSelect"]:
184        """
185        Attempt to see if self is the base mux that is being referenced.
186        For example: DMUX_6 means DMUX is the base mux and 6 is the select line.
187
188        :param name: The name of the MUX select line (ex: DMUX_6)
189        :return: The MuxSelect instance if found, None otherwise
190        """
191        name_parts = name.rsplit("_", 1)
192        if len(name_parts) < 2:
193            return None
194        if name_parts[0] != self.name:
195            return None
196        try:
197            return MuxSelect(self, int(name_parts[1]))
198        except ValueError:
199            return None
200
201
202class MuxSelect:
203    """Represents a selected MUX (Multiplexer) line."""
204
205    def __init__(self, mux: Mux, select: int):
206        """
207        :param mux: The MUX instance
208        :param select: The selected line number (0 indexed)
209        """
210        self.mux: Mux = mux
211        self.select: int = select
212
213
214class CanBus:
215    """Configuration for a CAN (Controller Area Network) bus."""
216
217    def __init__(self, can_bus: dict):
218        """
219        :param can_bus: The CAN bus configuration dictionary
220        """
221        match can_bus:
222            case {"name": name, "bus": bus, "dbc_file": dbc_file}:
223                self.name: str = name
224                self.bus: int = bus
225                self.dbc_file: str = dbc_file
226            case _:
227                raise hil_errors.ConfigurationError("Invalid CAN Bus configuration")
228
229    def find_dbc(
230        self, can_dbcs: dict[str, cantools_db.Database]
231    ) -> cantools_db.Database:
232        """
233        Attempt to find the CAN DBC database for this CAN bus. If not found, raise an error.
234
235        :param can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name
236        :return: The CAN database if found
237        """
238        match can_dbcs.get(self.dbc_file, None):
239            case None:
240                error_msg = (
241                    f"DBC file {self.dbc_file} not found for CAN bus {self.name}"
242                )
243                raise hil_errors.ConfigurationError(error_msg)
244            case db:
245                return db
246
247
248# Test device -------------------------------------------------------------------------#
249class TestDevice:
250    # Init ----------------------------------------------------------------------------#
251    def __init__(
252        self,
253        hil_id: int,
254        name: str,
255        ports: dict[str, Port],
256        muxs: dict[str, Mux],
257        can_busses: dict[str, CanBus],
258        adc_config: AdcConfig,
259        dac_config: Optional[DacConfig],
260        pot_config: Optional[PotConfig],
261    ):
262        """
263        :param hil_id: The HIL ID of the device
264        :param name: The name of the device
265        :param ports: The port configurations
266        :param muxs: The MUX configurations
267        :param can_busses: The CAN bus configurations
268        :param adc_config: The ADC configuration
269        :param dac_config: The DAC configuration
270        :param pot_config: The potentiometer configuration
271        """
272        self.hil_id: int = hil_id
273        self._name: str = name
274        self._ports: dict[str, Port] = ports
275        self._muxs: dict[str, Mux] = muxs
276        self._can_busses: dict[str, CanBus] = can_busses
277        self._adc_config: AdcConfig = adc_config
278        self._dac_config: Optional[DacConfig] = dac_config
279        self._pot_config: Optional[PotConfig] = pot_config
280
281        # Please use set_serial() to set the serial!
282        self._ser: Optional[serial_helper.ThreadedSerial] = None
283
284        self.device_can_busses: dict[int, can_helper.CanMessageManager] = dict(
285            map(
286                lambda c: (c.bus, can_helper.CanMessageManager()),
287                self._can_busses.values(),
288            )
289        )
290
291    @classmethod
292    def from_json(cls, hil_id: int, name: str, device_config_path: str):
293        """
294        Create a TestDevice instance from a JSON configuration file.
295
296        :param hil_id: The HIL ID of the device
297        :param name: The name of the device
298        :param device_config_path: The path to the device configuration JSON file
299        """
300        with open(device_config_path, "r") as device_config_path:
301            device_config = json.load(device_config_path)
302
303        ports = dict(
304            map(lambda p: (p.get("name"), Port(p)), device_config.get("ports", []))
305        )
306        muxs = dict(
307            map(lambda m: (m.get("name"), Mux(m)), device_config.get("muxs", []))
308        )
309        can_busses = dict(
310            map(lambda c: (c.get("name"), CanBus(c)), device_config.get("can", []))
311        )
312
313        match device_config:
314            case {"adc_config": adc_config_data}:
315                adc_config = AdcConfig(adc_config_data)
316            case _:
317                error_msg = f"ADC configuration missing for device {name}"
318                raise hil_errors.ConfigurationError(error_msg)
319
320        match device_config:
321            case {"dac_config": dac_config_data}:
322                dac_config = DacConfig(dac_config_data)
323            case _:
324                dac_config = None
325
326        match device_config:
327            case {"pot_config": pot_config_data}:
328                pot_config = PotConfig(pot_config_data)
329            case _:
330                pot_config = None
331
332        return cls(
333            hil_id,
334            name,
335            ports,
336            muxs,
337            can_busses,
338            adc_config,
339            dac_config,
340            pot_config,
341        )
342
343    def set_serial(self, ser: serial_helper.ThreadedSerial) -> None:
344        """
345        Set the serial connection for the TestDevice.
346        The caller is responsible for starting the serial connection's thread.
347
348        :param ser: The serial connection to set
349        """
350        self._ser = ser
351
352    def close(self) -> None:
353        """
354        Close the serial connection for the TestDevice.
355        """
356        match self._ser:
357            case None:
358                error_msg = f"Cannot close TestDevice {self._name}: serial not set"
359                raise hil_errors.EngineError(error_msg)
360            case ser:
361                ser.stop()
362
363    # Command handling ----------------------------------------------------------------#
364    def _select_mux(self, mux_select: MuxSelect) -> None:
365        """
366        Select a MUX (Multiplexer) line.
367
368        :param mux_select: The MUX selection information
369        """
370        for i, p in enumerate(mux_select.mux.select_ports):
371            select_bit = True if (mux_select.select & (1 << i)) else False
372            self._set_do(p, select_bit)
373
374    def _set_do(self, pin: int, value: bool) -> None:
375        """
376        Set a digital output (DO) pin.
377
378        :param pin: The pin number to set
379        :param value: The value to set the pin to (low = False, high = True)
380        """
381        match self._ser:
382            case None:
383                error_msg = f"Cannot set DO on TestDevice {self._name}: serial not set"
384                raise hil_errors.EngineError(error_msg)
385            case ser:
386                commands.write_gpio(ser, pin, value)
387
388    def _hiZ_do(self, pin: int) -> None:
389        """
390        Set a digital output (DO) pin to high impedance (HiZ).
391
392        :param pin: The pin number to set
393        """
394        match self._ser:
395            case None:
396                error_msg = (
397                    f"Cannot set HiZ DO on TestDevice {self._name}: serial not set"
398                )
399                raise hil_errors.EngineError(error_msg)
400            case ser:
401                commands.hiZ_gpio(ser, pin)
402
403    def _get_di(self, pin: int) -> bool:
404        """
405        Get the digital input (DI) state of a pin.
406
407        :param pin: The pin number to read
408        :return: The state of the pin (True for high, False for low)
409        """
410        match self._ser:
411            case None:
412                error_msg = f"Cannot get DI on TestDevice {self._name}: serial not set"
413                raise hil_errors.EngineError(error_msg)
414            case ser:
415                return commands.read_gpio(ser, pin)
416
417    def _set_ao(self, pin: int, value: float) -> None:
418        """
419        Set an analog output (AO) pin after converting the volts value to raw.
420
421        :param pin: The pin/offset number to set
422        :param value: The voltage value to set the pin to
423        """
424        match (self._ser, self._dac_config):
425            case (ser, dac_config) if ser is not None and dac_config is not None:
426                raw_value = dac_config.v_to_raw(value)
427                commands.write_dac(ser, pin, raw_value)
428            case _:
429                error_msg = f"Cannot set AO on TestDevice {self._name}: serial or DAC config not set"
430                raise hil_errors.EngineError(error_msg)
431
432    def _hiZ_ao(self, pin: int) -> None:
433        """
434        Set an analog output (AO) pin to high impedance (HiZ).
435
436        :param pin: The pin/offset number to set
437        """
438        match self._ser:
439            case None:
440                error_msg = (
441                    f"Cannot set HiZ AO on TestDevice {self._name}: serial not set"
442                )
443                raise hil_errors.EngineError(error_msg)
444            case ser:
445                commands.hiZ_dac(ser, pin)
446
447    def _get_ai(self, pin: int, mode: str) -> float:
448        """
449        Get an analog input (AI) reading from a pin and convert the reading to volts.
450
451        :param pin: The pin number to read
452        :param mode: The mode to use for the reading (AI5, AI24, or AI)
453        :return: The voltage value read from the pin
454        """
455        match self._ser:
456            case None:
457                error_msg = f"Cannot get AI on TestDevice {self._name}: serial not set"
458                raise hil_errors.EngineError(error_msg)
459            case ser:
460                raw_value = commands.read_adc(ser, pin)
461
462        if mode == "AI5":
463            return self._adc_config.raw_to_5v(raw_value)
464        elif mode == "AI24":
465            return self._adc_config.raw_to_24v(raw_value)
466        elif mode == "AI":
467            return self._adc_config.raw_to_v(raw_value)
468        else:
469            raise ValueError(f"Unsupported AI mode: {mode}")
470
471    def _set_pot(self, pin: int, value: float) -> None:
472        """
473        Set a potentiometer (POT) pin after converting the ohms value to raw.
474
475        :param pin: The pin/offset to set
476        :param value: The resistance value to set the pin to (in ohms)
477        """
478        match (self._ser, self._pot_config):
479            case (ser, pot_config) if ser is not None and pot_config is not None:
480                raw_value = pot_config.ohms_to_raw(value)
481                commands.write_pot(ser, pin, raw_value)
482            case _:
483                error_msg = f"Cannot set POT on TestDevice {self._name}: serial not set"
484                raise hil_errors.EngineError(error_msg)
485
486    def _update_can_messages(self, bus: int, can_dbc: cantools_db.Database) -> None:
487        """
488        Update the CAN message store by decoding the saved parsed can messages from the Serial.
489
490        :param bus: The CAN bus to update
491        :param can_dbc: The CAN database to use for decoding
492        """
493        match self._ser:
494            case None:
495                error_msg = (
496                    f"Cannot update CAN messages on TestDevice {self._name}: "
497                    "serial not set"
498                )
499                raise hil_errors.EngineError(error_msg)
500            case ser:
501                self.device_can_busses[bus].add_multiple(
502                    commands.parse_can_messages(ser, bus, can_dbc)
503                )
504
505    def _send_can(
506        self, bus: int, signal: str | int, data: dict, can_dbc: cantools_db.Database
507    ) -> None:
508        """
509        Send a CAN message on the specified bus.
510
511        :param bus: The CAN bus to send the message on
512        :param signal: The CAN signal to send
513        :param data: The data to include in the CAN message
514        :param can_dbc: The CAN database to use for encoding
515        """
516        raw_data = list(can_dbc.encode_message(signal, data))
517        if isinstance(signal, int):
518            msg_id = can_dbc.get_message_by_frame_id(signal).frame_id
519        else:
520            msg_id = can_dbc.get_message_by_name(signal).frame_id
521
522        match self._ser:
523            case None:
524                error_msg = (
525                    f"Cannot send CAN message on TestDevice {self._name}: "
526                    "serial not set"
527                )
528                raise hil_errors.EngineError(error_msg)
529            case ser:
530                commands.send_can(ser, bus, msg_id, raw_data)
531
532    # Action --------------------------------------------------------------------------#
533    def do_action(self, action_type: action.ActionType, port: str) -> Any:
534        """
535        Perform a HIL action on a specific port.
536
537        :param action_type: The type of action to perform (+ includes all needed info)
538        :param port: The HIL port to perform the action on
539        :return: depends on the action type
540        """
541
542        maybe_port = self._ports.get(port, None)
543        maybe_mux_select = next(
544            (
545                val
546                for m in self._muxs.values()
547                if (val := m.select_from_name(port)) is not None
548            ),
549            None,
550        )
551        maybe_can_bus = self._can_busses.get(port, None)
552
553        match (action_type, maybe_port, maybe_mux_select, maybe_can_bus):
554            # Set DO + direct port
555            case (action.SetDo(value), mp, _, _) if mp is not None and mp.mode == "DO":
556                self._set_do(mp.port, value)
557            # Set DO + mux select
558            case (action.SetDo(value), _, mms, _) if (
559                mms is not None and mms.mux.mode == "DO"
560            ):
561                self._select_mux(mms)
562                self._set_do(mms.mux.port, value)
563            # HiZ DO + direct port
564            case (action.HiZDo(), mp, _, _) if mp is not None and mp.mode == "DO":
565                self._hiZ_do(mp.port)
566            # HiZ DO + mux select
567            case (action.HiZDo(), _, mms, _) if (
568                mms is not None and mms.mux.mode == "DO"
569            ):
570                self._select_mux(mms)
571                self._hiZ_do(mms.mux.port)
572            # Get DI + direct port
573            case (action.GetDi(), mp, _, _) if mp is not None and mp.mode == "DI":
574                return self._get_di(mp.port)
575            # Get DI + mux select
576            case (action.GetDi(), _, mms, _) if (
577                mms is not None and mms.mux.mode == "DI"
578            ):
579                self._select_mux(mms)
580                return self._get_di(mms.mux.port)
581            # Set AO + direct port
582            case (action.SetAo(value), mp, _, _) if mp is not None and mp.mode == "AO":
583                self._set_ao(mp.port, value)
584            # HiZ AO + direct port
585            case (action.HiZAo(), mp, _, _) if mp is not None and mp.mode == "AO":
586                self._hiZ_ao(mp.port)
587            # Get AI + direct port
588            case (action.GetAi(), mp, _, _) if mp is not None and mp.mode.startswith(
589                "AI"
590            ):
591                return self._get_ai(mp.port, mp.mode)
592            # Get AI + mux select
593            case (
594                action.GetAi(),
595                _,
596                mms,
597                _,
598            ) if mms is not None and mms.mux.mode.startswith("AI"):
599                self._select_mux(mms)
600                return self._get_ai(mms.mux.port, mms.mux.mode)
601            # Set Pot + direct port
602            case (action.SetPot(value), mp, _, _) if (
603                mp is not None and mp.mode == "POT"
604            ):
605                self._set_pot(mp.port, value)
606            # Send CAN msg + can bus name
607            case (action.SendCan(signal, data, can_dbcs), _, _, mcb) if mcb is not None:
608                can_dbc = mcb.find_dbc(can_dbcs)
609                self._update_can_messages(mcb.bus, can_dbc)
610                self._send_can(mcb.bus, signal, data, can_dbc)
611            # Get last CAN msg + can bus name
612            case (action.GetLastCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
613                can_dbc = mcb.find_dbc(can_dbcs)
614                self._update_can_messages(mcb.bus, can_dbc)
615                return self.device_can_busses[mcb.bus].get_last(signal)
616            # Get all CAN msgs + can bus name
617            case (action.GetAllCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
618                can_dbc = mcb.find_dbc(can_dbcs)
619                self._update_can_messages(mcb.bus, can_dbc)
620                return self.device_can_busses[mcb.bus].get_all(signal)
621            # Clear CAN msgs + can bus name
622            case (action.ClearCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
623                can_dbc = mcb.find_dbc(can_dbcs)
624                self._update_can_messages(mcb.bus, can_dbc)
625                self.device_can_busses[mcb.bus].clear(signal)
626            # Unsupported action
627            case _:
628                error_msg = (
629                    f"Action {type(action)} not supported for "
630                    f"port {port} on device {self._name}"
631                )
632                raise hil_errors.EngineError(error_msg)
633
634
635# Test device manager -----------------------------------------------------------------#
636class TestDeviceManager:
637    """
638    Manages test devices for HIL (Hardware-in-the-Loop) simulation.
639    """
640
641    def __init__(self, test_devices: dict[str, TestDevice]):
642        """
643        :param test_devices: A dictionary of test devices managed by this manager.
644                             key = device name, value = TestDevice instance
645        """
646        self._test_devices: dict[str, TestDevice] = test_devices
647
648    @classmethod
649    def from_json(
650        cls, test_config_path: str, device_config_fpath: str
651    ) -> "TestDeviceManager":
652        """
653        Create a TestDeviceManager instance from JSON configuration files.
654        Is responsible for starting all of the ThreadedSerial instances.
655
656        :param test_config_path: The path to the test configuration JSON file.
657        :param device_config_fpath: The file path to the directory containing device
658                                    configuration files.
659        :return: A TestDeviceManager instance.
660        """
661
662        with open(test_config_path, "r") as test_config_file:
663            test_config = json.load(test_config_file)
664
665        hil_ids = []
666        stop_events = {}
667        test_devices = {}
668        match test_config:
669            case {"hil_devices": hil_devices}:
670                for device in hil_devices:
671                    match device:
672                        case {
673                            "id": hil_id,
674                            "name": name,
675                            "config": config_file_name,
676                        } if (not hil_id in hil_ids):
677                            hil_ids.append(hil_id)
678                            stop_events[hil_id] = threading.Event()
679                            test_devices[name] = TestDevice.from_json(
680                                hil_id,
681                                name,
682                                os.path.join(device_config_fpath, config_file_name),
683                            )
684                        case {"id": hil_id}:
685                            error_msg = f"Duplicate HIL device ID found: {hil_id}"
686                            raise hil_errors.ConfigurationError(error_msg)
687                        case _:
688                            error_msg = f"Invalid HIL device configuration: {device}"
689                            raise hil_errors.ConfigurationError(error_msg)
690            case _:
691                error_msg = "Invalid test configuration: missing 'hil_devices' key"
692                raise hil_errors.ConfigurationError(error_msg)
693
694        hil_devices = serial_helper.discover_devices(hil_ids)
695
696        sers = {
697            hil_id: serial_helper.ThreadedSerial(
698                hil_devices[hil_id][0],
699                hil_devices[hil_id][1],
700                stop_events[hil_id],
701            )
702            for hil_id in hil_ids
703        }
704        for test_device in test_devices.values():
705            ser = sers[test_device.hil_id]
706            t = threading.Thread(target=ser.run)
707            t.start()
708            test_device.set_serial(ser)
709
710        return cls(test_devices)
711
712    def maybe_hil_con_from_net(
713        self, board: str, net: str
714    ) -> Optional[dut_cons.HilDutCon]:
715        """
716        Check to see if a board is a HIL device.
717
718        :param board: The name of the board to check.
719        :param net: The network to use for the connection.
720        :return: A HilDutCon instance if the board is a HIL device, None otherwise.
721        """
722        if board in self._test_devices:
723            return dut_cons.HilDutCon(board, net)
724        else:
725            return None
726
727    def do_action(
728        self, action_type: action.ActionType, hil_dut_con: dut_cons.HilDutCon
729    ) -> Any:
730        """
731        Perform an action on a HIL device.
732
733        :param action_type: The type of action to perform.
734        :param hil_dut_con: The HIL DUT connection information.
735        :return: The result of the action (if any).
736        """
737        if hil_dut_con.device in self._test_devices:
738            return self._test_devices[hil_dut_con.device].do_action(
739                action_type, hil_dut_con.port
740            )
741        else:
742            error_msg = f"Device {hil_dut_con.device} not found"
743            raise hil_errors.ConnectionError(error_msg)
744
745    def close(self) -> None:
746        """
747        Close all HIL devices.
748        """
749        for device in self._test_devices.values():
750            device.close()
class AdcConfig:
19class AdcConfig:
20    """Configuration for an ADC (Analog-to-Digital Converter)."""
21
22    def __init__(self, adc_config: dict):
23        """
24        :param adc_config: The ADC configuration dictionary
25        """
26        match adc_config:
27            # PCB
28            case {
29                "bit_resolution": br,
30                "adc_reference_v": ar,
31                "5v_reference_v": v5r,
32                "24v_reference_v": v24r,
33            }:
34                self.bit_resolution: int = br
35                self.adc_reference_v: float = ar
36                self.five_v_reference_v: Optional[float] = v5r
37                self.twenty_four_v_reference_v: Optional[float] = v24r
38            # Breadboard
39            case {
40                "bit_resolution": br,
41                "adc_reference_v": ar,
42            }:
43                self.bit_resolution: int = br
44                self.adc_reference_v: float = ar
45                self.five_v_reference_v: Optional[float] = None
46                self.twenty_four_v_reference_v: Optional[float] = None
47            case _:
48                raise hil_errors.ConfigurationError("Invalid ADC configuration")
49
50    def raw_to_v(self, raw_value: int) -> float:
51        """
52        Convert a raw ADC value to a voltage.
53
54        :param raw_value: The raw ADC value to convert
55        :return: The converted voltage value
56        """
57        if raw_value < 0 or raw_value > (2**self.bit_resolution - 1):
58            raise hil_errors.RangeError(f"ADC raw value {raw_value} out of range")
59        return (raw_value / (2**self.bit_resolution - 1)) * self.adc_reference_v
60
61    def raw_to_5v(self, raw_value: int) -> float:
62        """
63        Convert a raw ADC value to a voltage using the 5V reference.
64        If on a PCB, AI5 means it when through a voltage divider.
65
66        :param raw_value: The raw ADC value to convert
67        :return: The converted voltage value
68        """
69        match self.five_v_reference_v:
70            case None:
71                error_msg = "5V reference voltage not configured"
72                raise hil_errors.ConfigurationError(error_msg)
73            case v5r:
74                return (self.raw_to_v(raw_value) / v5r) * 5.0
75
76    def raw_to_24v(self, raw_value: int) -> float:
77        """
78        Convert a raw ADC value to a voltage using the 24V reference.
79        If on a PCB, AI24 means it when through a voltage divider.
80
81        :param raw_value: The raw ADC value to convert
82        :return: The converted voltage value
83        """
84        match self.twenty_four_v_reference_v:
85            case None:
86                error_msg = "24V reference voltage not configured"
87                raise hil_errors.ConfigurationError(error_msg)
88            case v24r:
89                return (self.raw_to_v(raw_value) / v24r) * 24.0

Configuration for an ADC (Analog-to-Digital Converter).

AdcConfig(adc_config: dict)
22    def __init__(self, adc_config: dict):
23        """
24        :param adc_config: The ADC configuration dictionary
25        """
26        match adc_config:
27            # PCB
28            case {
29                "bit_resolution": br,
30                "adc_reference_v": ar,
31                "5v_reference_v": v5r,
32                "24v_reference_v": v24r,
33            }:
34                self.bit_resolution: int = br
35                self.adc_reference_v: float = ar
36                self.five_v_reference_v: Optional[float] = v5r
37                self.twenty_four_v_reference_v: Optional[float] = v24r
38            # Breadboard
39            case {
40                "bit_resolution": br,
41                "adc_reference_v": ar,
42            }:
43                self.bit_resolution: int = br
44                self.adc_reference_v: float = ar
45                self.five_v_reference_v: Optional[float] = None
46                self.twenty_four_v_reference_v: Optional[float] = None
47            case _:
48                raise hil_errors.ConfigurationError("Invalid ADC configuration")
Parameters
  • adc_config: The ADC configuration dictionary
def raw_to_v(self, raw_value: int) -> float:
50    def raw_to_v(self, raw_value: int) -> float:
51        """
52        Convert a raw ADC value to a voltage.
53
54        :param raw_value: The raw ADC value to convert
55        :return: The converted voltage value
56        """
57        if raw_value < 0 or raw_value > (2**self.bit_resolution - 1):
58            raise hil_errors.RangeError(f"ADC raw value {raw_value} out of range")
59        return (raw_value / (2**self.bit_resolution - 1)) * self.adc_reference_v

Convert a raw ADC value to a voltage.

Parameters
  • raw_value: The raw ADC value to convert
Returns

The converted voltage value

def raw_to_5v(self, raw_value: int) -> float:
61    def raw_to_5v(self, raw_value: int) -> float:
62        """
63        Convert a raw ADC value to a voltage using the 5V reference.
64        If on a PCB, AI5 means it when through a voltage divider.
65
66        :param raw_value: The raw ADC value to convert
67        :return: The converted voltage value
68        """
69        match self.five_v_reference_v:
70            case None:
71                error_msg = "5V reference voltage not configured"
72                raise hil_errors.ConfigurationError(error_msg)
73            case v5r:
74                return (self.raw_to_v(raw_value) / v5r) * 5.0

Convert a raw ADC value to a voltage using the 5V reference. If on a PCB, AI5 means it when through a voltage divider.

Parameters
  • raw_value: The raw ADC value to convert
Returns

The converted voltage value

def raw_to_24v(self, raw_value: int) -> float:
76    def raw_to_24v(self, raw_value: int) -> float:
77        """
78        Convert a raw ADC value to a voltage using the 24V reference.
79        If on a PCB, AI24 means it when through a voltage divider.
80
81        :param raw_value: The raw ADC value to convert
82        :return: The converted voltage value
83        """
84        match self.twenty_four_v_reference_v:
85            case None:
86                error_msg = "24V reference voltage not configured"
87                raise hil_errors.ConfigurationError(error_msg)
88            case v24r:
89                return (self.raw_to_v(raw_value) / v24r) * 24.0

Convert a raw ADC value to a voltage using the 24V reference. If on a PCB, AI24 means it when through a voltage divider.

Parameters
  • raw_value: The raw ADC value to convert
Returns

The converted voltage value

class DacConfig:
 92class DacConfig:
 93    """Configuration for a DAC (Digital-to-Analog Converter)."""
 94
 95    def __init__(self, dac_config: dict):
 96        """
 97        :param dac_config: The DAC configuration dictionary
 98        """
 99        match dac_config:
100            case {"bit_resolution": br, "reference_v": rv}:
101                self.bit_resolution = br
102                self.reference_v = rv
103            case _:
104                raise hil_errors.ConfigurationError("Invalid DAC configuration")
105
106    def v_to_raw(self, value: float) -> int:
107        """
108        Convert a voltage value to a raw DAC value.
109
110        :param value: The voltage value to convert
111        :return: The converted raw DAC value
112        """
113        if value < 0 or value > self.reference_v:
114            raise hil_errors.RangeError(f"DAC value {value} out of range")
115        return int((value / self.reference_v) * (2**self.bit_resolution - 1))

Configuration for a DAC (Digital-to-Analog Converter).

DacConfig(dac_config: dict)
 95    def __init__(self, dac_config: dict):
 96        """
 97        :param dac_config: The DAC configuration dictionary
 98        """
 99        match dac_config:
100            case {"bit_resolution": br, "reference_v": rv}:
101                self.bit_resolution = br
102                self.reference_v = rv
103            case _:
104                raise hil_errors.ConfigurationError("Invalid DAC configuration")
Parameters
  • dac_config: The DAC configuration dictionary
def v_to_raw(self, value: float) -> int:
106    def v_to_raw(self, value: float) -> int:
107        """
108        Convert a voltage value to a raw DAC value.
109
110        :param value: The voltage value to convert
111        :return: The converted raw DAC value
112        """
113        if value < 0 or value > self.reference_v:
114            raise hil_errors.RangeError(f"DAC value {value} out of range")
115        return int((value / self.reference_v) * (2**self.bit_resolution - 1))

Convert a voltage value to a raw DAC value.

Parameters
  • value: The voltage value to convert
Returns

The converted raw DAC value

class PotConfig:
118class PotConfig:
119    """Configuration for a POT (Potentiometer)."""
120
121    def __init__(self, pot_config: dict):
122        """
123        :param pot_config: The POT configuration dictionary
124        """
125        match pot_config:
126            case {"bit_resolution": br, "reference_ohms": r, "wiper_ohms": w}:
127                self.bit_resolution = br
128                self.reference_ohms = r
129                self.wiper_ohms = w
130            case _:
131                raise hil_errors.ConfigurationError("Invalid POT configuration")
132
133    def ohms_to_raw(self, value: float) -> int:
134        """
135        Convert an ohm value to a raw POT value.
136
137        :param value: The ohm value to convert
138        :return: The converted raw POT value
139        """
140        if value < self.wiper_ohms or value > self.reference_ohms + self.wiper_ohms:
141            raise hil_errors.RangeError(f"POT value {value} out of range")
142        steps = self.bit_resolution**2 - 1
143        return int((steps * (value - self.wiper_ohms)) / self.reference_ohms)

Configuration for a POT (Potentiometer).

PotConfig(pot_config: dict)
121    def __init__(self, pot_config: dict):
122        """
123        :param pot_config: The POT configuration dictionary
124        """
125        match pot_config:
126            case {"bit_resolution": br, "reference_ohms": r, "wiper_ohms": w}:
127                self.bit_resolution = br
128                self.reference_ohms = r
129                self.wiper_ohms = w
130            case _:
131                raise hil_errors.ConfigurationError("Invalid POT configuration")
Parameters
  • pot_config: The POT configuration dictionary
def ohms_to_raw(self, value: float) -> int:
133    def ohms_to_raw(self, value: float) -> int:
134        """
135        Convert an ohm value to a raw POT value.
136
137        :param value: The ohm value to convert
138        :return: The converted raw POT value
139        """
140        if value < self.wiper_ohms or value > self.reference_ohms + self.wiper_ohms:
141            raise hil_errors.RangeError(f"POT value {value} out of range")
142        steps = self.bit_resolution**2 - 1
143        return int((steps * (value - self.wiper_ohms)) / self.reference_ohms)

Convert an ohm value to a raw POT value.

Parameters
  • value: The ohm value to convert
Returns

The converted raw POT value

class Port:
147class Port:
148    """Configuration for a port."""
149
150    def __init__(self, port: dict):
151        """
152        :param port: The port configuration dictionary
153        """
154        match port:
155            case {"name": name, "port": port, "mode": mode}:
156                self.name: str = name
157                self.port: int = port
158                self.mode: str = mode
159            case _:
160                raise hil_errors.ConfigurationError("Invalid Port configuration")

Configuration for a port.

Port(port: dict)
150    def __init__(self, port: dict):
151        """
152        :param port: The port configuration dictionary
153        """
154        match port:
155            case {"name": name, "port": port, "mode": mode}:
156                self.name: str = name
157                self.port: int = port
158                self.mode: str = mode
159            case _:
160                raise hil_errors.ConfigurationError("Invalid Port configuration")
Parameters
  • port: The port configuration dictionary
class Mux:
163class Mux:
164    """Configuration for a MUX (Multiplexer)."""
165
166    def __init__(self, mux: dict):
167        """
168        :param mux: The MUX configuration dictionary
169        """
170        match mux:
171            case {
172                "name": name,
173                "mode": mode,
174                "select_ports": select_ports,
175                "port": port,
176            }:
177                self.name: str = name
178                self.mode: str = mode
179                self.select_ports: list[int] = select_ports
180                self.port: int = port
181            case _:
182                raise hil_errors.ConfigurationError("Invalid Mux configuration")
183
184    def select_from_name(self, name: str) -> Optional["MuxSelect"]:
185        """
186        Attempt to see if self is the base mux that is being referenced.
187        For example: DMUX_6 means DMUX is the base mux and 6 is the select line.
188
189        :param name: The name of the MUX select line (ex: DMUX_6)
190        :return: The MuxSelect instance if found, None otherwise
191        """
192        name_parts = name.rsplit("_", 1)
193        if len(name_parts) < 2:
194            return None
195        if name_parts[0] != self.name:
196            return None
197        try:
198            return MuxSelect(self, int(name_parts[1]))
199        except ValueError:
200            return None

Configuration for a MUX (Multiplexer).

Mux(mux: dict)
166    def __init__(self, mux: dict):
167        """
168        :param mux: The MUX configuration dictionary
169        """
170        match mux:
171            case {
172                "name": name,
173                "mode": mode,
174                "select_ports": select_ports,
175                "port": port,
176            }:
177                self.name: str = name
178                self.mode: str = mode
179                self.select_ports: list[int] = select_ports
180                self.port: int = port
181            case _:
182                raise hil_errors.ConfigurationError("Invalid Mux configuration")
Parameters
  • mux: The MUX configuration dictionary
def select_from_name(self, name: str) -> Optional[MuxSelect]:
184    def select_from_name(self, name: str) -> Optional["MuxSelect"]:
185        """
186        Attempt to see if self is the base mux that is being referenced.
187        For example: DMUX_6 means DMUX is the base mux and 6 is the select line.
188
189        :param name: The name of the MUX select line (ex: DMUX_6)
190        :return: The MuxSelect instance if found, None otherwise
191        """
192        name_parts = name.rsplit("_", 1)
193        if len(name_parts) < 2:
194            return None
195        if name_parts[0] != self.name:
196            return None
197        try:
198            return MuxSelect(self, int(name_parts[1]))
199        except ValueError:
200            return None

Attempt to see if self is the base mux that is being referenced. For example: DMUX_6 means DMUX is the base mux and 6 is the select line.

Parameters
  • name: The name of the MUX select line (ex: DMUX_6)
Returns

The MuxSelect instance if found, None otherwise

class MuxSelect:
203class MuxSelect:
204    """Represents a selected MUX (Multiplexer) line."""
205
206    def __init__(self, mux: Mux, select: int):
207        """
208        :param mux: The MUX instance
209        :param select: The selected line number (0 indexed)
210        """
211        self.mux: Mux = mux
212        self.select: int = select

Represents a selected MUX (Multiplexer) line.

MuxSelect(mux: Mux, select: int)
206    def __init__(self, mux: Mux, select: int):
207        """
208        :param mux: The MUX instance
209        :param select: The selected line number (0 indexed)
210        """
211        self.mux: Mux = mux
212        self.select: int = select
Parameters
  • mux: The MUX instance
  • select: The selected line number (0 indexed)
mux: Mux
select: int
class CanBus:
215class CanBus:
216    """Configuration for a CAN (Controller Area Network) bus."""
217
218    def __init__(self, can_bus: dict):
219        """
220        :param can_bus: The CAN bus configuration dictionary
221        """
222        match can_bus:
223            case {"name": name, "bus": bus, "dbc_file": dbc_file}:
224                self.name: str = name
225                self.bus: int = bus
226                self.dbc_file: str = dbc_file
227            case _:
228                raise hil_errors.ConfigurationError("Invalid CAN Bus configuration")
229
230    def find_dbc(
231        self, can_dbcs: dict[str, cantools_db.Database]
232    ) -> cantools_db.Database:
233        """
234        Attempt to find the CAN DBC database for this CAN bus. If not found, raise an error.
235
236        :param can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name
237        :return: The CAN database if found
238        """
239        match can_dbcs.get(self.dbc_file, None):
240            case None:
241                error_msg = (
242                    f"DBC file {self.dbc_file} not found for CAN bus {self.name}"
243                )
244                raise hil_errors.ConfigurationError(error_msg)
245            case db:
246                return db

Configuration for a CAN (Controller Area Network) bus.

CanBus(can_bus: dict)
218    def __init__(self, can_bus: dict):
219        """
220        :param can_bus: The CAN bus configuration dictionary
221        """
222        match can_bus:
223            case {"name": name, "bus": bus, "dbc_file": dbc_file}:
224                self.name: str = name
225                self.bus: int = bus
226                self.dbc_file: str = dbc_file
227            case _:
228                raise hil_errors.ConfigurationError("Invalid CAN Bus configuration")
Parameters
  • can_bus: The CAN bus configuration dictionary
def find_dbc( self, can_dbcs: dict[str, cantools.database.can.database.Database]) -> cantools.database.can.database.Database:
230    def find_dbc(
231        self, can_dbcs: dict[str, cantools_db.Database]
232    ) -> cantools_db.Database:
233        """
234        Attempt to find the CAN DBC database for this CAN bus. If not found, raise an error.
235
236        :param can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name
237        :return: The CAN database if found
238        """
239        match can_dbcs.get(self.dbc_file, None):
240            case None:
241                error_msg = (
242                    f"DBC file {self.dbc_file} not found for CAN bus {self.name}"
243                )
244                raise hil_errors.ConfigurationError(error_msg)
245            case db:
246                return db

Attempt to find the CAN DBC database for this CAN bus. If not found, raise an error.

Parameters
  • can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name
Returns

The CAN database if found

class TestDevice:
250class TestDevice:
251    # Init ----------------------------------------------------------------------------#
252    def __init__(
253        self,
254        hil_id: int,
255        name: str,
256        ports: dict[str, Port],
257        muxs: dict[str, Mux],
258        can_busses: dict[str, CanBus],
259        adc_config: AdcConfig,
260        dac_config: Optional[DacConfig],
261        pot_config: Optional[PotConfig],
262    ):
263        """
264        :param hil_id: The HIL ID of the device
265        :param name: The name of the device
266        :param ports: The port configurations
267        :param muxs: The MUX configurations
268        :param can_busses: The CAN bus configurations
269        :param adc_config: The ADC configuration
270        :param dac_config: The DAC configuration
271        :param pot_config: The potentiometer configuration
272        """
273        self.hil_id: int = hil_id
274        self._name: str = name
275        self._ports: dict[str, Port] = ports
276        self._muxs: dict[str, Mux] = muxs
277        self._can_busses: dict[str, CanBus] = can_busses
278        self._adc_config: AdcConfig = adc_config
279        self._dac_config: Optional[DacConfig] = dac_config
280        self._pot_config: Optional[PotConfig] = pot_config
281
282        # Please use set_serial() to set the serial!
283        self._ser: Optional[serial_helper.ThreadedSerial] = None
284
285        self.device_can_busses: dict[int, can_helper.CanMessageManager] = dict(
286            map(
287                lambda c: (c.bus, can_helper.CanMessageManager()),
288                self._can_busses.values(),
289            )
290        )
291
292    @classmethod
293    def from_json(cls, hil_id: int, name: str, device_config_path: str):
294        """
295        Create a TestDevice instance from a JSON configuration file.
296
297        :param hil_id: The HIL ID of the device
298        :param name: The name of the device
299        :param device_config_path: The path to the device configuration JSON file
300        """
301        with open(device_config_path, "r") as device_config_path:
302            device_config = json.load(device_config_path)
303
304        ports = dict(
305            map(lambda p: (p.get("name"), Port(p)), device_config.get("ports", []))
306        )
307        muxs = dict(
308            map(lambda m: (m.get("name"), Mux(m)), device_config.get("muxs", []))
309        )
310        can_busses = dict(
311            map(lambda c: (c.get("name"), CanBus(c)), device_config.get("can", []))
312        )
313
314        match device_config:
315            case {"adc_config": adc_config_data}:
316                adc_config = AdcConfig(adc_config_data)
317            case _:
318                error_msg = f"ADC configuration missing for device {name}"
319                raise hil_errors.ConfigurationError(error_msg)
320
321        match device_config:
322            case {"dac_config": dac_config_data}:
323                dac_config = DacConfig(dac_config_data)
324            case _:
325                dac_config = None
326
327        match device_config:
328            case {"pot_config": pot_config_data}:
329                pot_config = PotConfig(pot_config_data)
330            case _:
331                pot_config = None
332
333        return cls(
334            hil_id,
335            name,
336            ports,
337            muxs,
338            can_busses,
339            adc_config,
340            dac_config,
341            pot_config,
342        )
343
344    def set_serial(self, ser: serial_helper.ThreadedSerial) -> None:
345        """
346        Set the serial connection for the TestDevice.
347        The caller is responsible for starting the serial connection's thread.
348
349        :param ser: The serial connection to set
350        """
351        self._ser = ser
352
353    def close(self) -> None:
354        """
355        Close the serial connection for the TestDevice.
356        """
357        match self._ser:
358            case None:
359                error_msg = f"Cannot close TestDevice {self._name}: serial not set"
360                raise hil_errors.EngineError(error_msg)
361            case ser:
362                ser.stop()
363
364    # Command handling ----------------------------------------------------------------#
365    def _select_mux(self, mux_select: MuxSelect) -> None:
366        """
367        Select a MUX (Multiplexer) line.
368
369        :param mux_select: The MUX selection information
370        """
371        for i, p in enumerate(mux_select.mux.select_ports):
372            select_bit = True if (mux_select.select & (1 << i)) else False
373            self._set_do(p, select_bit)
374
375    def _set_do(self, pin: int, value: bool) -> None:
376        """
377        Set a digital output (DO) pin.
378
379        :param pin: The pin number to set
380        :param value: The value to set the pin to (low = False, high = True)
381        """
382        match self._ser:
383            case None:
384                error_msg = f"Cannot set DO on TestDevice {self._name}: serial not set"
385                raise hil_errors.EngineError(error_msg)
386            case ser:
387                commands.write_gpio(ser, pin, value)
388
389    def _hiZ_do(self, pin: int) -> None:
390        """
391        Set a digital output (DO) pin to high impedance (HiZ).
392
393        :param pin: The pin number to set
394        """
395        match self._ser:
396            case None:
397                error_msg = (
398                    f"Cannot set HiZ DO on TestDevice {self._name}: serial not set"
399                )
400                raise hil_errors.EngineError(error_msg)
401            case ser:
402                commands.hiZ_gpio(ser, pin)
403
404    def _get_di(self, pin: int) -> bool:
405        """
406        Get the digital input (DI) state of a pin.
407
408        :param pin: The pin number to read
409        :return: The state of the pin (True for high, False for low)
410        """
411        match self._ser:
412            case None:
413                error_msg = f"Cannot get DI on TestDevice {self._name}: serial not set"
414                raise hil_errors.EngineError(error_msg)
415            case ser:
416                return commands.read_gpio(ser, pin)
417
418    def _set_ao(self, pin: int, value: float) -> None:
419        """
420        Set an analog output (AO) pin after converting the volts value to raw.
421
422        :param pin: The pin/offset number to set
423        :param value: The voltage value to set the pin to
424        """
425        match (self._ser, self._dac_config):
426            case (ser, dac_config) if ser is not None and dac_config is not None:
427                raw_value = dac_config.v_to_raw(value)
428                commands.write_dac(ser, pin, raw_value)
429            case _:
430                error_msg = f"Cannot set AO on TestDevice {self._name}: serial or DAC config not set"
431                raise hil_errors.EngineError(error_msg)
432
433    def _hiZ_ao(self, pin: int) -> None:
434        """
435        Set an analog output (AO) pin to high impedance (HiZ).
436
437        :param pin: The pin/offset number to set
438        """
439        match self._ser:
440            case None:
441                error_msg = (
442                    f"Cannot set HiZ AO on TestDevice {self._name}: serial not set"
443                )
444                raise hil_errors.EngineError(error_msg)
445            case ser:
446                commands.hiZ_dac(ser, pin)
447
448    def _get_ai(self, pin: int, mode: str) -> float:
449        """
450        Get an analog input (AI) reading from a pin and convert the reading to volts.
451
452        :param pin: The pin number to read
453        :param mode: The mode to use for the reading (AI5, AI24, or AI)
454        :return: The voltage value read from the pin
455        """
456        match self._ser:
457            case None:
458                error_msg = f"Cannot get AI on TestDevice {self._name}: serial not set"
459                raise hil_errors.EngineError(error_msg)
460            case ser:
461                raw_value = commands.read_adc(ser, pin)
462
463        if mode == "AI5":
464            return self._adc_config.raw_to_5v(raw_value)
465        elif mode == "AI24":
466            return self._adc_config.raw_to_24v(raw_value)
467        elif mode == "AI":
468            return self._adc_config.raw_to_v(raw_value)
469        else:
470            raise ValueError(f"Unsupported AI mode: {mode}")
471
472    def _set_pot(self, pin: int, value: float) -> None:
473        """
474        Set a potentiometer (POT) pin after converting the ohms value to raw.
475
476        :param pin: The pin/offset to set
477        :param value: The resistance value to set the pin to (in ohms)
478        """
479        match (self._ser, self._pot_config):
480            case (ser, pot_config) if ser is not None and pot_config is not None:
481                raw_value = pot_config.ohms_to_raw(value)
482                commands.write_pot(ser, pin, raw_value)
483            case _:
484                error_msg = f"Cannot set POT on TestDevice {self._name}: serial not set"
485                raise hil_errors.EngineError(error_msg)
486
487    def _update_can_messages(self, bus: int, can_dbc: cantools_db.Database) -> None:
488        """
489        Update the CAN message store by decoding the saved parsed can messages from the Serial.
490
491        :param bus: The CAN bus to update
492        :param can_dbc: The CAN database to use for decoding
493        """
494        match self._ser:
495            case None:
496                error_msg = (
497                    f"Cannot update CAN messages on TestDevice {self._name}: "
498                    "serial not set"
499                )
500                raise hil_errors.EngineError(error_msg)
501            case ser:
502                self.device_can_busses[bus].add_multiple(
503                    commands.parse_can_messages(ser, bus, can_dbc)
504                )
505
506    def _send_can(
507        self, bus: int, signal: str | int, data: dict, can_dbc: cantools_db.Database
508    ) -> None:
509        """
510        Send a CAN message on the specified bus.
511
512        :param bus: The CAN bus to send the message on
513        :param signal: The CAN signal to send
514        :param data: The data to include in the CAN message
515        :param can_dbc: The CAN database to use for encoding
516        """
517        raw_data = list(can_dbc.encode_message(signal, data))
518        if isinstance(signal, int):
519            msg_id = can_dbc.get_message_by_frame_id(signal).frame_id
520        else:
521            msg_id = can_dbc.get_message_by_name(signal).frame_id
522
523        match self._ser:
524            case None:
525                error_msg = (
526                    f"Cannot send CAN message on TestDevice {self._name}: "
527                    "serial not set"
528                )
529                raise hil_errors.EngineError(error_msg)
530            case ser:
531                commands.send_can(ser, bus, msg_id, raw_data)
532
533    # Action --------------------------------------------------------------------------#
534    def do_action(self, action_type: action.ActionType, port: str) -> Any:
535        """
536        Perform a HIL action on a specific port.
537
538        :param action_type: The type of action to perform (+ includes all needed info)
539        :param port: The HIL port to perform the action on
540        :return: depends on the action type
541        """
542
543        maybe_port = self._ports.get(port, None)
544        maybe_mux_select = next(
545            (
546                val
547                for m in self._muxs.values()
548                if (val := m.select_from_name(port)) is not None
549            ),
550            None,
551        )
552        maybe_can_bus = self._can_busses.get(port, None)
553
554        match (action_type, maybe_port, maybe_mux_select, maybe_can_bus):
555            # Set DO + direct port
556            case (action.SetDo(value), mp, _, _) if mp is not None and mp.mode == "DO":
557                self._set_do(mp.port, value)
558            # Set DO + mux select
559            case (action.SetDo(value), _, mms, _) if (
560                mms is not None and mms.mux.mode == "DO"
561            ):
562                self._select_mux(mms)
563                self._set_do(mms.mux.port, value)
564            # HiZ DO + direct port
565            case (action.HiZDo(), mp, _, _) if mp is not None and mp.mode == "DO":
566                self._hiZ_do(mp.port)
567            # HiZ DO + mux select
568            case (action.HiZDo(), _, mms, _) if (
569                mms is not None and mms.mux.mode == "DO"
570            ):
571                self._select_mux(mms)
572                self._hiZ_do(mms.mux.port)
573            # Get DI + direct port
574            case (action.GetDi(), mp, _, _) if mp is not None and mp.mode == "DI":
575                return self._get_di(mp.port)
576            # Get DI + mux select
577            case (action.GetDi(), _, mms, _) if (
578                mms is not None and mms.mux.mode == "DI"
579            ):
580                self._select_mux(mms)
581                return self._get_di(mms.mux.port)
582            # Set AO + direct port
583            case (action.SetAo(value), mp, _, _) if mp is not None and mp.mode == "AO":
584                self._set_ao(mp.port, value)
585            # HiZ AO + direct port
586            case (action.HiZAo(), mp, _, _) if mp is not None and mp.mode == "AO":
587                self._hiZ_ao(mp.port)
588            # Get AI + direct port
589            case (action.GetAi(), mp, _, _) if mp is not None and mp.mode.startswith(
590                "AI"
591            ):
592                return self._get_ai(mp.port, mp.mode)
593            # Get AI + mux select
594            case (
595                action.GetAi(),
596                _,
597                mms,
598                _,
599            ) if mms is not None and mms.mux.mode.startswith("AI"):
600                self._select_mux(mms)
601                return self._get_ai(mms.mux.port, mms.mux.mode)
602            # Set Pot + direct port
603            case (action.SetPot(value), mp, _, _) if (
604                mp is not None and mp.mode == "POT"
605            ):
606                self._set_pot(mp.port, value)
607            # Send CAN msg + can bus name
608            case (action.SendCan(signal, data, can_dbcs), _, _, mcb) if mcb is not None:
609                can_dbc = mcb.find_dbc(can_dbcs)
610                self._update_can_messages(mcb.bus, can_dbc)
611                self._send_can(mcb.bus, signal, data, can_dbc)
612            # Get last CAN msg + can bus name
613            case (action.GetLastCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
614                can_dbc = mcb.find_dbc(can_dbcs)
615                self._update_can_messages(mcb.bus, can_dbc)
616                return self.device_can_busses[mcb.bus].get_last(signal)
617            # Get all CAN msgs + can bus name
618            case (action.GetAllCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
619                can_dbc = mcb.find_dbc(can_dbcs)
620                self._update_can_messages(mcb.bus, can_dbc)
621                return self.device_can_busses[mcb.bus].get_all(signal)
622            # Clear CAN msgs + can bus name
623            case (action.ClearCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
624                can_dbc = mcb.find_dbc(can_dbcs)
625                self._update_can_messages(mcb.bus, can_dbc)
626                self.device_can_busses[mcb.bus].clear(signal)
627            # Unsupported action
628            case _:
629                error_msg = (
630                    f"Action {type(action)} not supported for "
631                    f"port {port} on device {self._name}"
632                )
633                raise hil_errors.EngineError(error_msg)
TestDevice( hil_id: int, name: str, ports: dict[str, Port], muxs: dict[str, Mux], can_busses: dict[str, CanBus], adc_config: AdcConfig, dac_config: Optional[DacConfig], pot_config: Optional[PotConfig])
252    def __init__(
253        self,
254        hil_id: int,
255        name: str,
256        ports: dict[str, Port],
257        muxs: dict[str, Mux],
258        can_busses: dict[str, CanBus],
259        adc_config: AdcConfig,
260        dac_config: Optional[DacConfig],
261        pot_config: Optional[PotConfig],
262    ):
263        """
264        :param hil_id: The HIL ID of the device
265        :param name: The name of the device
266        :param ports: The port configurations
267        :param muxs: The MUX configurations
268        :param can_busses: The CAN bus configurations
269        :param adc_config: The ADC configuration
270        :param dac_config: The DAC configuration
271        :param pot_config: The potentiometer configuration
272        """
273        self.hil_id: int = hil_id
274        self._name: str = name
275        self._ports: dict[str, Port] = ports
276        self._muxs: dict[str, Mux] = muxs
277        self._can_busses: dict[str, CanBus] = can_busses
278        self._adc_config: AdcConfig = adc_config
279        self._dac_config: Optional[DacConfig] = dac_config
280        self._pot_config: Optional[PotConfig] = pot_config
281
282        # Please use set_serial() to set the serial!
283        self._ser: Optional[serial_helper.ThreadedSerial] = None
284
285        self.device_can_busses: dict[int, can_helper.CanMessageManager] = dict(
286            map(
287                lambda c: (c.bus, can_helper.CanMessageManager()),
288                self._can_busses.values(),
289            )
290        )
Parameters
  • hil_id: The HIL ID of the device
  • name: The name of the device
  • ports: The port configurations
  • muxs: The MUX configurations
  • can_busses: The CAN bus configurations
  • adc_config: The ADC configuration
  • dac_config: The DAC configuration
  • pot_config: The potentiometer configuration
hil_id: int
device_can_busses: dict[int, hil2.can_helper.CanMessageManager]
@classmethod
def from_json(cls, hil_id: int, name: str, device_config_path: str):
292    @classmethod
293    def from_json(cls, hil_id: int, name: str, device_config_path: str):
294        """
295        Create a TestDevice instance from a JSON configuration file.
296
297        :param hil_id: The HIL ID of the device
298        :param name: The name of the device
299        :param device_config_path: The path to the device configuration JSON file
300        """
301        with open(device_config_path, "r") as device_config_path:
302            device_config = json.load(device_config_path)
303
304        ports = dict(
305            map(lambda p: (p.get("name"), Port(p)), device_config.get("ports", []))
306        )
307        muxs = dict(
308            map(lambda m: (m.get("name"), Mux(m)), device_config.get("muxs", []))
309        )
310        can_busses = dict(
311            map(lambda c: (c.get("name"), CanBus(c)), device_config.get("can", []))
312        )
313
314        match device_config:
315            case {"adc_config": adc_config_data}:
316                adc_config = AdcConfig(adc_config_data)
317            case _:
318                error_msg = f"ADC configuration missing for device {name}"
319                raise hil_errors.ConfigurationError(error_msg)
320
321        match device_config:
322            case {"dac_config": dac_config_data}:
323                dac_config = DacConfig(dac_config_data)
324            case _:
325                dac_config = None
326
327        match device_config:
328            case {"pot_config": pot_config_data}:
329                pot_config = PotConfig(pot_config_data)
330            case _:
331                pot_config = None
332
333        return cls(
334            hil_id,
335            name,
336            ports,
337            muxs,
338            can_busses,
339            adc_config,
340            dac_config,
341            pot_config,
342        )

Create a TestDevice instance from a JSON configuration file.

Parameters
  • hil_id: The HIL ID of the device
  • name: The name of the device
  • device_config_path: The path to the device configuration JSON file
def set_serial(self, ser: hil2.serial_helper.ThreadedSerial) -> None:
344    def set_serial(self, ser: serial_helper.ThreadedSerial) -> None:
345        """
346        Set the serial connection for the TestDevice.
347        The caller is responsible for starting the serial connection's thread.
348
349        :param ser: The serial connection to set
350        """
351        self._ser = ser

Set the serial connection for the TestDevice. The caller is responsible for starting the serial connection's thread.

Parameters
  • ser: The serial connection to set
def close(self) -> None:
353    def close(self) -> None:
354        """
355        Close the serial connection for the TestDevice.
356        """
357        match self._ser:
358            case None:
359                error_msg = f"Cannot close TestDevice {self._name}: serial not set"
360                raise hil_errors.EngineError(error_msg)
361            case ser:
362                ser.stop()

Close the serial connection for the TestDevice.

def do_action( self, action_type: Union[ForwardRef('SetDo'), ForwardRef('HiZDo'), ForwardRef('GetDi'), ForwardRef('SetAo'), ForwardRef('HiZAo'), ForwardRef('GetAi'), ForwardRef('SetPot'), ForwardRef('SendCan'), ForwardRef('GetLastCan'), ForwardRef('GetAllCan'), ForwardRef('ClearCan')], port: str) -> Any:
534    def do_action(self, action_type: action.ActionType, port: str) -> Any:
535        """
536        Perform a HIL action on a specific port.
537
538        :param action_type: The type of action to perform (+ includes all needed info)
539        :param port: The HIL port to perform the action on
540        :return: depends on the action type
541        """
542
543        maybe_port = self._ports.get(port, None)
544        maybe_mux_select = next(
545            (
546                val
547                for m in self._muxs.values()
548                if (val := m.select_from_name(port)) is not None
549            ),
550            None,
551        )
552        maybe_can_bus = self._can_busses.get(port, None)
553
554        match (action_type, maybe_port, maybe_mux_select, maybe_can_bus):
555            # Set DO + direct port
556            case (action.SetDo(value), mp, _, _) if mp is not None and mp.mode == "DO":
557                self._set_do(mp.port, value)
558            # Set DO + mux select
559            case (action.SetDo(value), _, mms, _) if (
560                mms is not None and mms.mux.mode == "DO"
561            ):
562                self._select_mux(mms)
563                self._set_do(mms.mux.port, value)
564            # HiZ DO + direct port
565            case (action.HiZDo(), mp, _, _) if mp is not None and mp.mode == "DO":
566                self._hiZ_do(mp.port)
567            # HiZ DO + mux select
568            case (action.HiZDo(), _, mms, _) if (
569                mms is not None and mms.mux.mode == "DO"
570            ):
571                self._select_mux(mms)
572                self._hiZ_do(mms.mux.port)
573            # Get DI + direct port
574            case (action.GetDi(), mp, _, _) if mp is not None and mp.mode == "DI":
575                return self._get_di(mp.port)
576            # Get DI + mux select
577            case (action.GetDi(), _, mms, _) if (
578                mms is not None and mms.mux.mode == "DI"
579            ):
580                self._select_mux(mms)
581                return self._get_di(mms.mux.port)
582            # Set AO + direct port
583            case (action.SetAo(value), mp, _, _) if mp is not None and mp.mode == "AO":
584                self._set_ao(mp.port, value)
585            # HiZ AO + direct port
586            case (action.HiZAo(), mp, _, _) if mp is not None and mp.mode == "AO":
587                self._hiZ_ao(mp.port)
588            # Get AI + direct port
589            case (action.GetAi(), mp, _, _) if mp is not None and mp.mode.startswith(
590                "AI"
591            ):
592                return self._get_ai(mp.port, mp.mode)
593            # Get AI + mux select
594            case (
595                action.GetAi(),
596                _,
597                mms,
598                _,
599            ) if mms is not None and mms.mux.mode.startswith("AI"):
600                self._select_mux(mms)
601                return self._get_ai(mms.mux.port, mms.mux.mode)
602            # Set Pot + direct port
603            case (action.SetPot(value), mp, _, _) if (
604                mp is not None and mp.mode == "POT"
605            ):
606                self._set_pot(mp.port, value)
607            # Send CAN msg + can bus name
608            case (action.SendCan(signal, data, can_dbcs), _, _, mcb) if mcb is not None:
609                can_dbc = mcb.find_dbc(can_dbcs)
610                self._update_can_messages(mcb.bus, can_dbc)
611                self._send_can(mcb.bus, signal, data, can_dbc)
612            # Get last CAN msg + can bus name
613            case (action.GetLastCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
614                can_dbc = mcb.find_dbc(can_dbcs)
615                self._update_can_messages(mcb.bus, can_dbc)
616                return self.device_can_busses[mcb.bus].get_last(signal)
617            # Get all CAN msgs + can bus name
618            case (action.GetAllCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
619                can_dbc = mcb.find_dbc(can_dbcs)
620                self._update_can_messages(mcb.bus, can_dbc)
621                return self.device_can_busses[mcb.bus].get_all(signal)
622            # Clear CAN msgs + can bus name
623            case (action.ClearCan(signal, can_dbcs), _, _, mcb) if mcb is not None:
624                can_dbc = mcb.find_dbc(can_dbcs)
625                self._update_can_messages(mcb.bus, can_dbc)
626                self.device_can_busses[mcb.bus].clear(signal)
627            # Unsupported action
628            case _:
629                error_msg = (
630                    f"Action {type(action)} not supported for "
631                    f"port {port} on device {self._name}"
632                )
633                raise hil_errors.EngineError(error_msg)

Perform a HIL action on a specific port.

Parameters
  • action_type: The type of action to perform (+ includes all needed info)
  • port: The HIL port to perform the action on
Returns

depends on the action type

class TestDeviceManager:
637class TestDeviceManager:
638    """
639    Manages test devices for HIL (Hardware-in-the-Loop) simulation.
640    """
641
642    def __init__(self, test_devices: dict[str, TestDevice]):
643        """
644        :param test_devices: A dictionary of test devices managed by this manager.
645                             key = device name, value = TestDevice instance
646        """
647        self._test_devices: dict[str, TestDevice] = test_devices
648
649    @classmethod
650    def from_json(
651        cls, test_config_path: str, device_config_fpath: str
652    ) -> "TestDeviceManager":
653        """
654        Create a TestDeviceManager instance from JSON configuration files.
655        Is responsible for starting all of the ThreadedSerial instances.
656
657        :param test_config_path: The path to the test configuration JSON file.
658        :param device_config_fpath: The file path to the directory containing device
659                                    configuration files.
660        :return: A TestDeviceManager instance.
661        """
662
663        with open(test_config_path, "r") as test_config_file:
664            test_config = json.load(test_config_file)
665
666        hil_ids = []
667        stop_events = {}
668        test_devices = {}
669        match test_config:
670            case {"hil_devices": hil_devices}:
671                for device in hil_devices:
672                    match device:
673                        case {
674                            "id": hil_id,
675                            "name": name,
676                            "config": config_file_name,
677                        } if (not hil_id in hil_ids):
678                            hil_ids.append(hil_id)
679                            stop_events[hil_id] = threading.Event()
680                            test_devices[name] = TestDevice.from_json(
681                                hil_id,
682                                name,
683                                os.path.join(device_config_fpath, config_file_name),
684                            )
685                        case {"id": hil_id}:
686                            error_msg = f"Duplicate HIL device ID found: {hil_id}"
687                            raise hil_errors.ConfigurationError(error_msg)
688                        case _:
689                            error_msg = f"Invalid HIL device configuration: {device}"
690                            raise hil_errors.ConfigurationError(error_msg)
691            case _:
692                error_msg = "Invalid test configuration: missing 'hil_devices' key"
693                raise hil_errors.ConfigurationError(error_msg)
694
695        hil_devices = serial_helper.discover_devices(hil_ids)
696
697        sers = {
698            hil_id: serial_helper.ThreadedSerial(
699                hil_devices[hil_id][0],
700                hil_devices[hil_id][1],
701                stop_events[hil_id],
702            )
703            for hil_id in hil_ids
704        }
705        for test_device in test_devices.values():
706            ser = sers[test_device.hil_id]
707            t = threading.Thread(target=ser.run)
708            t.start()
709            test_device.set_serial(ser)
710
711        return cls(test_devices)
712
713    def maybe_hil_con_from_net(
714        self, board: str, net: str
715    ) -> Optional[dut_cons.HilDutCon]:
716        """
717        Check to see if a board is a HIL device.
718
719        :param board: The name of the board to check.
720        :param net: The network to use for the connection.
721        :return: A HilDutCon instance if the board is a HIL device, None otherwise.
722        """
723        if board in self._test_devices:
724            return dut_cons.HilDutCon(board, net)
725        else:
726            return None
727
728    def do_action(
729        self, action_type: action.ActionType, hil_dut_con: dut_cons.HilDutCon
730    ) -> Any:
731        """
732        Perform an action on a HIL device.
733
734        :param action_type: The type of action to perform.
735        :param hil_dut_con: The HIL DUT connection information.
736        :return: The result of the action (if any).
737        """
738        if hil_dut_con.device in self._test_devices:
739            return self._test_devices[hil_dut_con.device].do_action(
740                action_type, hil_dut_con.port
741            )
742        else:
743            error_msg = f"Device {hil_dut_con.device} not found"
744            raise hil_errors.ConnectionError(error_msg)
745
746    def close(self) -> None:
747        """
748        Close all HIL devices.
749        """
750        for device in self._test_devices.values():
751            device.close()

Manages test devices for HIL (Hardware-in-the-Loop) simulation.

TestDeviceManager(test_devices: dict[str, TestDevice])
642    def __init__(self, test_devices: dict[str, TestDevice]):
643        """
644        :param test_devices: A dictionary of test devices managed by this manager.
645                             key = device name, value = TestDevice instance
646        """
647        self._test_devices: dict[str, TestDevice] = test_devices
Parameters
  • test_devices: A dictionary of test devices managed by this manager. key = device name, value = TestDevice instance
@classmethod
def from_json( cls, test_config_path: str, device_config_fpath: str) -> TestDeviceManager:
649    @classmethod
650    def from_json(
651        cls, test_config_path: str, device_config_fpath: str
652    ) -> "TestDeviceManager":
653        """
654        Create a TestDeviceManager instance from JSON configuration files.
655        Is responsible for starting all of the ThreadedSerial instances.
656
657        :param test_config_path: The path to the test configuration JSON file.
658        :param device_config_fpath: The file path to the directory containing device
659                                    configuration files.
660        :return: A TestDeviceManager instance.
661        """
662
663        with open(test_config_path, "r") as test_config_file:
664            test_config = json.load(test_config_file)
665
666        hil_ids = []
667        stop_events = {}
668        test_devices = {}
669        match test_config:
670            case {"hil_devices": hil_devices}:
671                for device in hil_devices:
672                    match device:
673                        case {
674                            "id": hil_id,
675                            "name": name,
676                            "config": config_file_name,
677                        } if (not hil_id in hil_ids):
678                            hil_ids.append(hil_id)
679                            stop_events[hil_id] = threading.Event()
680                            test_devices[name] = TestDevice.from_json(
681                                hil_id,
682                                name,
683                                os.path.join(device_config_fpath, config_file_name),
684                            )
685                        case {"id": hil_id}:
686                            error_msg = f"Duplicate HIL device ID found: {hil_id}"
687                            raise hil_errors.ConfigurationError(error_msg)
688                        case _:
689                            error_msg = f"Invalid HIL device configuration: {device}"
690                            raise hil_errors.ConfigurationError(error_msg)
691            case _:
692                error_msg = "Invalid test configuration: missing 'hil_devices' key"
693                raise hil_errors.ConfigurationError(error_msg)
694
695        hil_devices = serial_helper.discover_devices(hil_ids)
696
697        sers = {
698            hil_id: serial_helper.ThreadedSerial(
699                hil_devices[hil_id][0],
700                hil_devices[hil_id][1],
701                stop_events[hil_id],
702            )
703            for hil_id in hil_ids
704        }
705        for test_device in test_devices.values():
706            ser = sers[test_device.hil_id]
707            t = threading.Thread(target=ser.run)
708            t.start()
709            test_device.set_serial(ser)
710
711        return cls(test_devices)

Create a TestDeviceManager instance from JSON configuration files. Is responsible for starting all of the ThreadedSerial instances.

Parameters
  • test_config_path: The path to the test configuration JSON file.
  • device_config_fpath: The file path to the directory containing device configuration files.
Returns

A TestDeviceManager instance.

def maybe_hil_con_from_net(self, board: str, net: str) -> Optional[hil2.dut_cons.HilDutCon]:
713    def maybe_hil_con_from_net(
714        self, board: str, net: str
715    ) -> Optional[dut_cons.HilDutCon]:
716        """
717        Check to see if a board is a HIL device.
718
719        :param board: The name of the board to check.
720        :param net: The network to use for the connection.
721        :return: A HilDutCon instance if the board is a HIL device, None otherwise.
722        """
723        if board in self._test_devices:
724            return dut_cons.HilDutCon(board, net)
725        else:
726            return None

Check to see if a board is a HIL device.

Parameters
  • board: The name of the board to check.
  • net: The network to use for the connection.
Returns

A HilDutCon instance if the board is a HIL device, None otherwise.

def do_action( self, action_type: Union[ForwardRef('SetDo'), ForwardRef('HiZDo'), ForwardRef('GetDi'), ForwardRef('SetAo'), ForwardRef('HiZAo'), ForwardRef('GetAi'), ForwardRef('SetPot'), ForwardRef('SendCan'), ForwardRef('GetLastCan'), ForwardRef('GetAllCan'), ForwardRef('ClearCan')], hil_dut_con: hil2.dut_cons.HilDutCon) -> Any:
728    def do_action(
729        self, action_type: action.ActionType, hil_dut_con: dut_cons.HilDutCon
730    ) -> Any:
731        """
732        Perform an action on a HIL device.
733
734        :param action_type: The type of action to perform.
735        :param hil_dut_con: The HIL DUT connection information.
736        :return: The result of the action (if any).
737        """
738        if hil_dut_con.device in self._test_devices:
739            return self._test_devices[hil_dut_con.device].do_action(
740                action_type, hil_dut_con.port
741            )
742        else:
743            error_msg = f"Device {hil_dut_con.device} not found"
744            raise hil_errors.ConnectionError(error_msg)

Perform an action on a HIL device.

Parameters
  • action_type: The type of action to perform.
  • hil_dut_con: The HIL DUT connection information.
Returns

The result of the action (if any).

def close(self) -> None:
746    def close(self) -> None:
747        """
748        Close all HIL devices.
749        """
750        for device in self._test_devices.values():
751            device.close()

Close all HIL devices.