hil2.test_device
1from typing import Any, Optional 2 3import json 4import os 5import threading 6 7import cantools.database.can.database as cantools_db 8 9from . import action 10from . import can_helper 11from . import commands 12from . import dut_cons 13from . import hil_errors 14from . import serial_helper 15 16 17# Peripheral configuration ------------------------------------------------------------# 18class AdcConfig: 19 """Configuration for an ADC (Analog-to-Digital Converter).""" 20 21 def __init__(self, adc_config: dict): 22 """ 23 :param adc_config: The ADC configuration dictionary 24 """ 25 match adc_config: 26 # PCB 27 case { 28 "bit_resolution": br, 29 "adc_reference_v": ar, 30 "5v_reference_v": v5r, 31 "24v_reference_v": v24r, 32 }: 33 self.bit_resolution: int = br 34 self.adc_reference_v: float = ar 35 self.five_v_reference_v: Optional[float] = v5r 36 self.twenty_four_v_reference_v: Optional[float] = v24r 37 # Breadboard 38 case { 39 "bit_resolution": br, 40 "adc_reference_v": ar, 41 }: 42 self.bit_resolution: int = br 43 self.adc_reference_v: float = ar 44 self.five_v_reference_v: Optional[float] = None 45 self.twenty_four_v_reference_v: Optional[float] = None 46 case _: 47 raise hil_errors.ConfigurationError("Invalid ADC configuration") 48 49 def raw_to_v(self, raw_value: int) -> float: 50 """ 51 Convert a raw ADC value to a voltage. 52 53 :param raw_value: The raw ADC value to convert 54 :return: The converted voltage value 55 """ 56 if raw_value < 0 or raw_value > (2**self.bit_resolution - 1): 57 raise hil_errors.RangeError(f"ADC raw value {raw_value} out of range") 58 return (raw_value / (2**self.bit_resolution - 1)) * self.adc_reference_v 59 60 def raw_to_5v(self, raw_value: int) -> float: 61 """ 62 Convert a raw ADC value to a voltage using the 5V reference. 63 If on a PCB, AI5 means it when through a voltage divider. 
64 65 :param raw_value: The raw ADC value to convert 66 :return: The converted voltage value 67 """ 68 match self.five_v_reference_v: 69 case None: 70 error_msg = "5V reference voltage not configured" 71 raise hil_errors.ConfigurationError(error_msg) 72 case v5r: 73 return (self.raw_to_v(raw_value) / v5r) * 5.0 74 75 def raw_to_24v(self, raw_value: int) -> float: 76 """ 77 Convert a raw ADC value to a voltage using the 24V reference. 78 If on a PCB, AI24 means it when through a voltage divider. 79 80 :param raw_value: The raw ADC value to convert 81 :return: The converted voltage value 82 """ 83 match self.twenty_four_v_reference_v: 84 case None: 85 error_msg = "24V reference voltage not configured" 86 raise hil_errors.ConfigurationError(error_msg) 87 case v24r: 88 return (self.raw_to_v(raw_value) / v24r) * 24.0 89 90 91class DacConfig: 92 """Configuration for a DAC (Digital-to-Analog Converter).""" 93 94 def __init__(self, dac_config: dict): 95 """ 96 :param dac_config: The DAC configuration dictionary 97 """ 98 match dac_config: 99 case {"bit_resolution": br, "reference_v": rv}: 100 self.bit_resolution = br 101 self.reference_v = rv 102 case _: 103 raise hil_errors.ConfigurationError("Invalid DAC configuration") 104 105 def v_to_raw(self, value: float) -> int: 106 """ 107 Convert a voltage value to a raw DAC value. 
108 109 :param value: The voltage value to convert 110 :return: The converted raw DAC value 111 """ 112 if value < 0 or value > self.reference_v: 113 raise hil_errors.RangeError(f"DAC value {value} out of range") 114 return int((value / self.reference_v) * (2**self.bit_resolution - 1)) 115 116 117class PotConfig: 118 """Configuration for a POT (Potentiometer).""" 119 120 def __init__(self, pot_config: dict): 121 """ 122 :param pot_config: The POT configuration dictionary 123 """ 124 match pot_config: 125 case {"bit_resolution": br, "reference_ohms": r, "wiper_ohms": w}: 126 self.bit_resolution = br 127 self.reference_ohms = r 128 self.wiper_ohms = w 129 case _: 130 raise hil_errors.ConfigurationError("Invalid POT configuration") 131 132 def ohms_to_raw(self, value: float) -> int: 133 """ 134 Convert an ohm value to a raw POT value. 135 136 :param value: The ohm value to convert 137 :return: The converted raw POT value 138 """ 139 if value < self.wiper_ohms or value > self.reference_ohms + self.wiper_ohms: 140 raise hil_errors.RangeError(f"POT value {value} out of range") 141 steps = self.bit_resolution**2 - 1 142 return int((steps * (value - self.wiper_ohms)) / self.reference_ohms) 143 144 145# Interface configuration -------------------------------------------------------------# 146class Port: 147 """Configuration for a port.""" 148 149 def __init__(self, port: dict): 150 """ 151 :param port: The port configuration dictionary 152 """ 153 match port: 154 case {"name": name, "port": port, "mode": mode}: 155 self.name: str = name 156 self.port: int = port 157 self.mode: str = mode 158 case _: 159 raise hil_errors.ConfigurationError("Invalid Port configuration") 160 161 162class Mux: 163 """Configuration for a MUX (Multiplexer).""" 164 165 def __init__(self, mux: dict): 166 """ 167 :param mux: The MUX configuration dictionary 168 """ 169 match mux: 170 case { 171 "name": name, 172 "mode": mode, 173 "select_ports": select_ports, 174 "port": port, 175 }: 176 self.name: 
str = name 177 self.mode: str = mode 178 self.select_ports: list[int] = select_ports 179 self.port: int = port 180 case _: 181 raise hil_errors.ConfigurationError("Invalid Mux configuration") 182 183 def select_from_name(self, name: str) -> Optional["MuxSelect"]: 184 """ 185 Attempt to see if self is the base mux that is being referenced. 186 For example: DMUX_6 means DMUX is the base mux and 6 is the select line. 187 188 :param name: The name of the MUX select line (ex: DMUX_6) 189 :return: The MuxSelect instance if found, None otherwise 190 """ 191 name_parts = name.rsplit("_", 1) 192 if len(name_parts) < 2: 193 return None 194 if name_parts[0] != self.name: 195 return None 196 try: 197 return MuxSelect(self, int(name_parts[1])) 198 except ValueError: 199 return None 200 201 202class MuxSelect: 203 """Represents a selected MUX (Multiplexer) line.""" 204 205 def __init__(self, mux: Mux, select: int): 206 """ 207 :param mux: The MUX instance 208 :param select: The selected line number (0 indexed) 209 """ 210 self.mux: Mux = mux 211 self.select: int = select 212 213 214class CanBus: 215 """Configuration for a CAN (Controller Area Network) bus.""" 216 217 def __init__(self, can_bus: dict): 218 """ 219 :param can_bus: The CAN bus configuration dictionary 220 """ 221 match can_bus: 222 case {"name": name, "bus": bus, "dbc_file": dbc_file}: 223 self.name: str = name 224 self.bus: int = bus 225 self.dbc_file: str = dbc_file 226 case _: 227 raise hil_errors.ConfigurationError("Invalid CAN Bus configuration") 228 229 def find_dbc( 230 self, can_dbcs: dict[str, cantools_db.Database] 231 ) -> cantools_db.Database: 232 """ 233 Attempt to find the CAN DBC database for this CAN bus. If not found, raise an error. 
234 235 :param can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name 236 :return: The CAN database if found 237 """ 238 match can_dbcs.get(self.dbc_file, None): 239 case None: 240 error_msg = ( 241 f"DBC file {self.dbc_file} not found for CAN bus {self.name}" 242 ) 243 raise hil_errors.ConfigurationError(error_msg) 244 case db: 245 return db 246 247 248# Test device -------------------------------------------------------------------------# 249class TestDevice: 250 # Init ----------------------------------------------------------------------------# 251 def __init__( 252 self, 253 hil_id: int, 254 name: str, 255 ports: dict[str, Port], 256 muxs: dict[str, Mux], 257 can_busses: dict[str, CanBus], 258 adc_config: AdcConfig, 259 dac_config: Optional[DacConfig], 260 pot_config: Optional[PotConfig], 261 ): 262 """ 263 :param hil_id: The HIL ID of the device 264 :param name: The name of the device 265 :param ports: The port configurations 266 :param muxs: The MUX configurations 267 :param can_busses: The CAN bus configurations 268 :param adc_config: The ADC configuration 269 :param dac_config: The DAC configuration 270 :param pot_config: The potentiometer configuration 271 """ 272 self.hil_id: int = hil_id 273 self._name: str = name 274 self._ports: dict[str, Port] = ports 275 self._muxs: dict[str, Mux] = muxs 276 self._can_busses: dict[str, CanBus] = can_busses 277 self._adc_config: AdcConfig = adc_config 278 self._dac_config: Optional[DacConfig] = dac_config 279 self._pot_config: Optional[PotConfig] = pot_config 280 281 # Please use set_serial() to set the serial! 
282 self._ser: Optional[serial_helper.ThreadedSerial] = None 283 284 self.device_can_busses: dict[int, can_helper.CanMessageManager] = dict( 285 map( 286 lambda c: (c.bus, can_helper.CanMessageManager()), 287 self._can_busses.values(), 288 ) 289 ) 290 291 @classmethod 292 def from_json(cls, hil_id: int, name: str, device_config_path: str): 293 """ 294 Create a TestDevice instance from a JSON configuration file. 295 296 :param hil_id: The HIL ID of the device 297 :param name: The name of the device 298 :param device_config_path: The path to the device configuration JSON file 299 """ 300 with open(device_config_path, "r") as device_config_path: 301 device_config = json.load(device_config_path) 302 303 ports = dict( 304 map(lambda p: (p.get("name"), Port(p)), device_config.get("ports", [])) 305 ) 306 muxs = dict( 307 map(lambda m: (m.get("name"), Mux(m)), device_config.get("muxs", [])) 308 ) 309 can_busses = dict( 310 map(lambda c: (c.get("name"), CanBus(c)), device_config.get("can", [])) 311 ) 312 313 match device_config: 314 case {"adc_config": adc_config_data}: 315 adc_config = AdcConfig(adc_config_data) 316 case _: 317 error_msg = f"ADC configuration missing for device {name}" 318 raise hil_errors.ConfigurationError(error_msg) 319 320 match device_config: 321 case {"dac_config": dac_config_data}: 322 dac_config = DacConfig(dac_config_data) 323 case _: 324 dac_config = None 325 326 match device_config: 327 case {"pot_config": pot_config_data}: 328 pot_config = PotConfig(pot_config_data) 329 case _: 330 pot_config = None 331 332 return cls( 333 hil_id, 334 name, 335 ports, 336 muxs, 337 can_busses, 338 adc_config, 339 dac_config, 340 pot_config, 341 ) 342 343 def set_serial(self, ser: serial_helper.ThreadedSerial) -> None: 344 """ 345 Set the serial connection for the TestDevice. 346 The caller is responsible for starting the serial connection's thread. 
347 348 :param ser: The serial connection to set 349 """ 350 self._ser = ser 351 352 def close(self) -> None: 353 """ 354 Close the serial connection for the TestDevice. 355 """ 356 match self._ser: 357 case None: 358 error_msg = f"Cannot close TestDevice {self._name}: serial not set" 359 raise hil_errors.EngineError(error_msg) 360 case ser: 361 ser.stop() 362 363 # Command handling ----------------------------------------------------------------# 364 def _select_mux(self, mux_select: MuxSelect) -> None: 365 """ 366 Select a MUX (Multiplexer) line. 367 368 :param mux_select: The MUX selection information 369 """ 370 for i, p in enumerate(mux_select.mux.select_ports): 371 select_bit = True if (mux_select.select & (1 << i)) else False 372 self._set_do(p, select_bit) 373 374 def _set_do(self, pin: int, value: bool) -> None: 375 """ 376 Set a digital output (DO) pin. 377 378 :param pin: The pin number to set 379 :param value: The value to set the pin to (low = False, high = True) 380 """ 381 match self._ser: 382 case None: 383 error_msg = f"Cannot set DO on TestDevice {self._name}: serial not set" 384 raise hil_errors.EngineError(error_msg) 385 case ser: 386 commands.write_gpio(ser, pin, value) 387 388 def _hiZ_do(self, pin: int) -> None: 389 """ 390 Set a digital output (DO) pin to high impedance (HiZ). 391 392 :param pin: The pin number to set 393 """ 394 match self._ser: 395 case None: 396 error_msg = ( 397 f"Cannot set HiZ DO on TestDevice {self._name}: serial not set" 398 ) 399 raise hil_errors.EngineError(error_msg) 400 case ser: 401 commands.hiZ_gpio(ser, pin) 402 403 def _get_di(self, pin: int) -> bool: 404 """ 405 Get the digital input (DI) state of a pin. 
406 407 :param pin: The pin number to read 408 :return: The state of the pin (True for high, False for low) 409 """ 410 match self._ser: 411 case None: 412 error_msg = f"Cannot get DI on TestDevice {self._name}: serial not set" 413 raise hil_errors.EngineError(error_msg) 414 case ser: 415 return commands.read_gpio(ser, pin) 416 417 def _set_ao(self, pin: int, value: float) -> None: 418 """ 419 Set an analog output (AO) pin after converting the volts value to raw. 420 421 :param pin: The pin/offset number to set 422 :param value: The voltage value to set the pin to 423 """ 424 match (self._ser, self._dac_config): 425 case (ser, dac_config) if ser is not None and dac_config is not None: 426 raw_value = dac_config.v_to_raw(value) 427 commands.write_dac(ser, pin, raw_value) 428 case _: 429 error_msg = f"Cannot set AO on TestDevice {self._name}: serial or DAC config not set" 430 raise hil_errors.EngineError(error_msg) 431 432 def _hiZ_ao(self, pin: int) -> None: 433 """ 434 Set an analog output (AO) pin to high impedance (HiZ). 435 436 :param pin: The pin/offset number to set 437 """ 438 match self._ser: 439 case None: 440 error_msg = ( 441 f"Cannot set HiZ AO on TestDevice {self._name}: serial not set" 442 ) 443 raise hil_errors.EngineError(error_msg) 444 case ser: 445 commands.hiZ_dac(ser, pin) 446 447 def _get_ai(self, pin: int, mode: str) -> float: 448 """ 449 Get an analog input (AI) reading from a pin and convert the reading to volts. 
450 451 :param pin: The pin number to read 452 :param mode: The mode to use for the reading (AI5, AI24, or AI) 453 :return: The voltage value read from the pin 454 """ 455 match self._ser: 456 case None: 457 error_msg = f"Cannot get AI on TestDevice {self._name}: serial not set" 458 raise hil_errors.EngineError(error_msg) 459 case ser: 460 raw_value = commands.read_adc(ser, pin) 461 462 if mode == "AI5": 463 return self._adc_config.raw_to_5v(raw_value) 464 elif mode == "AI24": 465 return self._adc_config.raw_to_24v(raw_value) 466 elif mode == "AI": 467 return self._adc_config.raw_to_v(raw_value) 468 else: 469 raise ValueError(f"Unsupported AI mode: {mode}") 470 471 def _set_pot(self, pin: int, value: float) -> None: 472 """ 473 Set a potentiometer (POT) pin after converting the ohms value to raw. 474 475 :param pin: The pin/offset to set 476 :param value: The resistance value to set the pin to (in ohms) 477 """ 478 match (self._ser, self._pot_config): 479 case (ser, pot_config) if ser is not None and pot_config is not None: 480 raw_value = pot_config.ohms_to_raw(value) 481 commands.write_pot(ser, pin, raw_value) 482 case _: 483 error_msg = f"Cannot set POT on TestDevice {self._name}: serial not set" 484 raise hil_errors.EngineError(error_msg) 485 486 def _update_can_messages(self, bus: int, can_dbc: cantools_db.Database) -> None: 487 """ 488 Update the CAN message store by decoding the saved parsed can messages from the Serial. 
489 490 :param bus: The CAN bus to update 491 :param can_dbc: The CAN database to use for decoding 492 """ 493 match self._ser: 494 case None: 495 error_msg = ( 496 f"Cannot update CAN messages on TestDevice {self._name}: " 497 "serial not set" 498 ) 499 raise hil_errors.EngineError(error_msg) 500 case ser: 501 self.device_can_busses[bus].add_multiple( 502 commands.parse_can_messages(ser, bus, can_dbc) 503 ) 504 505 def _send_can( 506 self, bus: int, signal: str | int, data: dict, can_dbc: cantools_db.Database 507 ) -> None: 508 """ 509 Send a CAN message on the specified bus. 510 511 :param bus: The CAN bus to send the message on 512 :param signal: The CAN signal to send 513 :param data: The data to include in the CAN message 514 :param can_dbc: The CAN database to use for encoding 515 """ 516 raw_data = list(can_dbc.encode_message(signal, data)) 517 if isinstance(signal, int): 518 msg_id = can_dbc.get_message_by_frame_id(signal).frame_id 519 else: 520 msg_id = can_dbc.get_message_by_name(signal).frame_id 521 522 match self._ser: 523 case None: 524 error_msg = ( 525 f"Cannot send CAN message on TestDevice {self._name}: " 526 "serial not set" 527 ) 528 raise hil_errors.EngineError(error_msg) 529 case ser: 530 commands.send_can(ser, bus, msg_id, raw_data) 531 532 # Action --------------------------------------------------------------------------# 533 def do_action(self, action_type: action.ActionType, port: str) -> Any: 534 """ 535 Perform a HIL action on a specific port. 
536 537 :param action_type: The type of action to perform (+ includes all needed info) 538 :param port: The HIL port to perform the action on 539 :return: depends on the action type 540 """ 541 542 maybe_port = self._ports.get(port, None) 543 maybe_mux_select = next( 544 ( 545 val 546 for m in self._muxs.values() 547 if (val := m.select_from_name(port)) is not None 548 ), 549 None, 550 ) 551 maybe_can_bus = self._can_busses.get(port, None) 552 553 match (action_type, maybe_port, maybe_mux_select, maybe_can_bus): 554 # Set DO + direct port 555 case (action.SetDo(value), mp, _, _) if mp is not None and mp.mode == "DO": 556 self._set_do(mp.port, value) 557 # Set DO + mux select 558 case (action.SetDo(value), _, mms, _) if ( 559 mms is not None and mms.mux.mode == "DO" 560 ): 561 self._select_mux(mms) 562 self._set_do(mms.mux.port, value) 563 # HiZ DO + direct port 564 case (action.HiZDo(), mp, _, _) if mp is not None and mp.mode == "DO": 565 self._hiZ_do(mp.port) 566 # HiZ DO + mux select 567 case (action.HiZDo(), _, mms, _) if ( 568 mms is not None and mms.mux.mode == "DO" 569 ): 570 self._select_mux(mms) 571 self._hiZ_do(mms.mux.port) 572 # Get DI + direct port 573 case (action.GetDi(), mp, _, _) if mp is not None and mp.mode == "DI": 574 return self._get_di(mp.port) 575 # Get DI + mux select 576 case (action.GetDi(), _, mms, _) if ( 577 mms is not None and mms.mux.mode == "DI" 578 ): 579 self._select_mux(mms) 580 return self._get_di(mms.mux.port) 581 # Set AO + direct port 582 case (action.SetAo(value), mp, _, _) if mp is not None and mp.mode == "AO": 583 self._set_ao(mp.port, value) 584 # HiZ AO + direct port 585 case (action.HiZAo(), mp, _, _) if mp is not None and mp.mode == "AO": 586 self._hiZ_ao(mp.port) 587 # Get AI + direct port 588 case (action.GetAi(), mp, _, _) if mp is not None and mp.mode.startswith( 589 "AI" 590 ): 591 return self._get_ai(mp.port, mp.mode) 592 # Get AI + mux select 593 case ( 594 action.GetAi(), 595 _, 596 mms, 597 _, 598 ) if mms is 
not None and mms.mux.mode.startswith("AI"): 599 self._select_mux(mms) 600 return self._get_ai(mms.mux.port, mms.mux.mode) 601 # Set Pot + direct port 602 case (action.SetPot(value), mp, _, _) if ( 603 mp is not None and mp.mode == "POT" 604 ): 605 self._set_pot(mp.port, value) 606 # Send CAN msg + can bus name 607 case (action.SendCan(signal, data, can_dbcs), _, _, mcb) if mcb is not None: 608 can_dbc = mcb.find_dbc(can_dbcs) 609 self._update_can_messages(mcb.bus, can_dbc) 610 self._send_can(mcb.bus, signal, data, can_dbc) 611 # Get last CAN msg + can bus name 612 case (action.GetLastCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 613 can_dbc = mcb.find_dbc(can_dbcs) 614 self._update_can_messages(mcb.bus, can_dbc) 615 return self.device_can_busses[mcb.bus].get_last(signal) 616 # Get all CAN msgs + can bus name 617 case (action.GetAllCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 618 can_dbc = mcb.find_dbc(can_dbcs) 619 self._update_can_messages(mcb.bus, can_dbc) 620 return self.device_can_busses[mcb.bus].get_all(signal) 621 # Clear CAN msgs + can bus name 622 case (action.ClearCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 623 can_dbc = mcb.find_dbc(can_dbcs) 624 self._update_can_messages(mcb.bus, can_dbc) 625 self.device_can_busses[mcb.bus].clear(signal) 626 # Unsupported action 627 case _: 628 error_msg = ( 629 f"Action {type(action)} not supported for " 630 f"port {port} on device {self._name}" 631 ) 632 raise hil_errors.EngineError(error_msg) 633 634 635# Test device manager -----------------------------------------------------------------# 636class TestDeviceManager: 637 """ 638 Manages test devices for HIL (Hardware-in-the-Loop) simulation. 639 """ 640 641 def __init__(self, test_devices: dict[str, TestDevice]): 642 """ 643 :param test_devices: A dictionary of test devices managed by this manager. 
644 key = device name, value = TestDevice instance 645 """ 646 self._test_devices: dict[str, TestDevice] = test_devices 647 648 @classmethod 649 def from_json( 650 cls, test_config_path: str, device_config_fpath: str 651 ) -> "TestDeviceManager": 652 """ 653 Create a TestDeviceManager instance from JSON configuration files. 654 Is responsible for starting all of the ThreadedSerial instances. 655 656 :param test_config_path: The path to the test configuration JSON file. 657 :param device_config_fpath: The file path to the directory containing device 658 configuration files. 659 :return: A TestDeviceManager instance. 660 """ 661 662 with open(test_config_path, "r") as test_config_file: 663 test_config = json.load(test_config_file) 664 665 hil_ids = [] 666 stop_events = {} 667 test_devices = {} 668 match test_config: 669 case {"hil_devices": hil_devices}: 670 for device in hil_devices: 671 match device: 672 case { 673 "id": hil_id, 674 "name": name, 675 "config": config_file_name, 676 } if (not hil_id in hil_ids): 677 hil_ids.append(hil_id) 678 stop_events[hil_id] = threading.Event() 679 test_devices[name] = TestDevice.from_json( 680 hil_id, 681 name, 682 os.path.join(device_config_fpath, config_file_name), 683 ) 684 case {"id": hil_id}: 685 error_msg = f"Duplicate HIL device ID found: {hil_id}" 686 raise hil_errors.ConfigurationError(error_msg) 687 case _: 688 error_msg = f"Invalid HIL device configuration: {device}" 689 raise hil_errors.ConfigurationError(error_msg) 690 case _: 691 error_msg = "Invalid test configuration: missing 'hil_devices' key" 692 raise hil_errors.ConfigurationError(error_msg) 693 694 hil_devices = serial_helper.discover_devices(hil_ids) 695 696 sers = { 697 hil_id: serial_helper.ThreadedSerial( 698 hil_devices[hil_id][0], 699 hil_devices[hil_id][1], 700 stop_events[hil_id], 701 ) 702 for hil_id in hil_ids 703 } 704 for test_device in test_devices.values(): 705 ser = sers[test_device.hil_id] 706 t = threading.Thread(target=ser.run) 707 
t.start() 708 test_device.set_serial(ser) 709 710 return cls(test_devices) 711 712 def maybe_hil_con_from_net( 713 self, board: str, net: str 714 ) -> Optional[dut_cons.HilDutCon]: 715 """ 716 Check to see if a board is a HIL device. 717 718 :param board: The name of the board to check. 719 :param net: The network to use for the connection. 720 :return: A HilDutCon instance if the board is a HIL device, None otherwise. 721 """ 722 if board in self._test_devices: 723 return dut_cons.HilDutCon(board, net) 724 else: 725 return None 726 727 def do_action( 728 self, action_type: action.ActionType, hil_dut_con: dut_cons.HilDutCon 729 ) -> Any: 730 """ 731 Perform an action on a HIL device. 732 733 :param action_type: The type of action to perform. 734 :param hil_dut_con: The HIL DUT connection information. 735 :return: The result of the action (if any). 736 """ 737 if hil_dut_con.device in self._test_devices: 738 return self._test_devices[hil_dut_con.device].do_action( 739 action_type, hil_dut_con.port 740 ) 741 else: 742 error_msg = f"Device {hil_dut_con.device} not found" 743 raise hil_errors.ConnectionError(error_msg) 744 745 def close(self) -> None: 746 """ 747 Close all HIL devices. 748 """ 749 for device in self._test_devices.values(): 750 device.close()
19class AdcConfig: 20 """Configuration for an ADC (Analog-to-Digital Converter).""" 21 22 def __init__(self, adc_config: dict): 23 """ 24 :param adc_config: The ADC configuration dictionary 25 """ 26 match adc_config: 27 # PCB 28 case { 29 "bit_resolution": br, 30 "adc_reference_v": ar, 31 "5v_reference_v": v5r, 32 "24v_reference_v": v24r, 33 }: 34 self.bit_resolution: int = br 35 self.adc_reference_v: float = ar 36 self.five_v_reference_v: Optional[float] = v5r 37 self.twenty_four_v_reference_v: Optional[float] = v24r 38 # Breadboard 39 case { 40 "bit_resolution": br, 41 "adc_reference_v": ar, 42 }: 43 self.bit_resolution: int = br 44 self.adc_reference_v: float = ar 45 self.five_v_reference_v: Optional[float] = None 46 self.twenty_four_v_reference_v: Optional[float] = None 47 case _: 48 raise hil_errors.ConfigurationError("Invalid ADC configuration") 49 50 def raw_to_v(self, raw_value: int) -> float: 51 """ 52 Convert a raw ADC value to a voltage. 53 54 :param raw_value: The raw ADC value to convert 55 :return: The converted voltage value 56 """ 57 if raw_value < 0 or raw_value > (2**self.bit_resolution - 1): 58 raise hil_errors.RangeError(f"ADC raw value {raw_value} out of range") 59 return (raw_value / (2**self.bit_resolution - 1)) * self.adc_reference_v 60 61 def raw_to_5v(self, raw_value: int) -> float: 62 """ 63 Convert a raw ADC value to a voltage using the 5V reference. 64 If on a PCB, AI5 means it when through a voltage divider. 65 66 :param raw_value: The raw ADC value to convert 67 :return: The converted voltage value 68 """ 69 match self.five_v_reference_v: 70 case None: 71 error_msg = "5V reference voltage not configured" 72 raise hil_errors.ConfigurationError(error_msg) 73 case v5r: 74 return (self.raw_to_v(raw_value) / v5r) * 5.0 75 76 def raw_to_24v(self, raw_value: int) -> float: 77 """ 78 Convert a raw ADC value to a voltage using the 24V reference. 79 If on a PCB, AI24 means it when through a voltage divider. 
80 81 :param raw_value: The raw ADC value to convert 82 :return: The converted voltage value 83 """ 84 match self.twenty_four_v_reference_v: 85 case None: 86 error_msg = "24V reference voltage not configured" 87 raise hil_errors.ConfigurationError(error_msg) 88 case v24r: 89 return (self.raw_to_v(raw_value) / v24r) * 24.0
Configuration for an ADC (Analog-to-Digital Converter).
22 def __init__(self, adc_config: dict): 23 """ 24 :param adc_config: The ADC configuration dictionary 25 """ 26 match adc_config: 27 # PCB 28 case { 29 "bit_resolution": br, 30 "adc_reference_v": ar, 31 "5v_reference_v": v5r, 32 "24v_reference_v": v24r, 33 }: 34 self.bit_resolution: int = br 35 self.adc_reference_v: float = ar 36 self.five_v_reference_v: Optional[float] = v5r 37 self.twenty_four_v_reference_v: Optional[float] = v24r 38 # Breadboard 39 case { 40 "bit_resolution": br, 41 "adc_reference_v": ar, 42 }: 43 self.bit_resolution: int = br 44 self.adc_reference_v: float = ar 45 self.five_v_reference_v: Optional[float] = None 46 self.twenty_four_v_reference_v: Optional[float] = None 47 case _: 48 raise hil_errors.ConfigurationError("Invalid ADC configuration")
Parameters
- adc_config: The ADC configuration dictionary
50 def raw_to_v(self, raw_value: int) -> float: 51 """ 52 Convert a raw ADC value to a voltage. 53 54 :param raw_value: The raw ADC value to convert 55 :return: The converted voltage value 56 """ 57 if raw_value < 0 or raw_value > (2**self.bit_resolution - 1): 58 raise hil_errors.RangeError(f"ADC raw value {raw_value} out of range") 59 return (raw_value / (2**self.bit_resolution - 1)) * self.adc_reference_v
Convert a raw ADC value to a voltage.
Parameters
- raw_value: The raw ADC value to convert
Returns
The converted voltage value
61 def raw_to_5v(self, raw_value: int) -> float: 62 """ 63 Convert a raw ADC value to a voltage using the 5V reference. 64 If on a PCB, AI5 means it when through a voltage divider. 65 66 :param raw_value: The raw ADC value to convert 67 :return: The converted voltage value 68 """ 69 match self.five_v_reference_v: 70 case None: 71 error_msg = "5V reference voltage not configured" 72 raise hil_errors.ConfigurationError(error_msg) 73 case v5r: 74 return (self.raw_to_v(raw_value) / v5r) * 5.0
Convert a raw ADC value to a voltage using the 5V reference. If on a PCB, AI5 means the signal went through a voltage divider.
Parameters
- raw_value: The raw ADC value to convert
Returns
The converted voltage value
76 def raw_to_24v(self, raw_value: int) -> float: 77 """ 78 Convert a raw ADC value to a voltage using the 24V reference. 79 If on a PCB, AI24 means it when through a voltage divider. 80 81 :param raw_value: The raw ADC value to convert 82 :return: The converted voltage value 83 """ 84 match self.twenty_four_v_reference_v: 85 case None: 86 error_msg = "24V reference voltage not configured" 87 raise hil_errors.ConfigurationError(error_msg) 88 case v24r: 89 return (self.raw_to_v(raw_value) / v24r) * 24.0
Convert a raw ADC value to a voltage using the 24V reference. If on a PCB, AI24 means the signal went through a voltage divider.
Parameters
- raw_value: The raw ADC value to convert
Returns
The converted voltage value
92class DacConfig: 93 """Configuration for a DAC (Digital-to-Analog Converter).""" 94 95 def __init__(self, dac_config: dict): 96 """ 97 :param dac_config: The DAC configuration dictionary 98 """ 99 match dac_config: 100 case {"bit_resolution": br, "reference_v": rv}: 101 self.bit_resolution = br 102 self.reference_v = rv 103 case _: 104 raise hil_errors.ConfigurationError("Invalid DAC configuration") 105 106 def v_to_raw(self, value: float) -> int: 107 """ 108 Convert a voltage value to a raw DAC value. 109 110 :param value: The voltage value to convert 111 :return: The converted raw DAC value 112 """ 113 if value < 0 or value > self.reference_v: 114 raise hil_errors.RangeError(f"DAC value {value} out of range") 115 return int((value / self.reference_v) * (2**self.bit_resolution - 1))
Configuration for a DAC (Digital-to-Analog Converter).
95 def __init__(self, dac_config: dict): 96 """ 97 :param dac_config: The DAC configuration dictionary 98 """ 99 match dac_config: 100 case {"bit_resolution": br, "reference_v": rv}: 101 self.bit_resolution = br 102 self.reference_v = rv 103 case _: 104 raise hil_errors.ConfigurationError("Invalid DAC configuration")
Parameters
- dac_config: The DAC configuration dictionary
106 def v_to_raw(self, value: float) -> int: 107 """ 108 Convert a voltage value to a raw DAC value. 109 110 :param value: The voltage value to convert 111 :return: The converted raw DAC value 112 """ 113 if value < 0 or value > self.reference_v: 114 raise hil_errors.RangeError(f"DAC value {value} out of range") 115 return int((value / self.reference_v) * (2**self.bit_resolution - 1))
Convert a voltage value to a raw DAC value.
Parameters
- value: The voltage value to convert
Returns
The converted raw DAC value
118class PotConfig: 119 """Configuration for a POT (Potentiometer).""" 120 121 def __init__(self, pot_config: dict): 122 """ 123 :param pot_config: The POT configuration dictionary 124 """ 125 match pot_config: 126 case {"bit_resolution": br, "reference_ohms": r, "wiper_ohms": w}: 127 self.bit_resolution = br 128 self.reference_ohms = r 129 self.wiper_ohms = w 130 case _: 131 raise hil_errors.ConfigurationError("Invalid POT configuration") 132 133 def ohms_to_raw(self, value: float) -> int: 134 """ 135 Convert an ohm value to a raw POT value. 136 137 :param value: The ohm value to convert 138 :return: The converted raw POT value 139 """ 140 if value < self.wiper_ohms or value > self.reference_ohms + self.wiper_ohms: 141 raise hil_errors.RangeError(f"POT value {value} out of range") 142 steps = self.bit_resolution**2 - 1 143 return int((steps * (value - self.wiper_ohms)) / self.reference_ohms)
Configuration for a POT (Potentiometer).
121 def __init__(self, pot_config: dict): 122 """ 123 :param pot_config: The POT configuration dictionary 124 """ 125 match pot_config: 126 case {"bit_resolution": br, "reference_ohms": r, "wiper_ohms": w}: 127 self.bit_resolution = br 128 self.reference_ohms = r 129 self.wiper_ohms = w 130 case _: 131 raise hil_errors.ConfigurationError("Invalid POT configuration")
Parameters
- pot_config: The POT configuration dictionary
133 def ohms_to_raw(self, value: float) -> int: 134 """ 135 Convert an ohm value to a raw POT value. 136 137 :param value: The ohm value to convert 138 :return: The converted raw POT value 139 """ 140 if value < self.wiper_ohms or value > self.reference_ohms + self.wiper_ohms: 141 raise hil_errors.RangeError(f"POT value {value} out of range") 142 steps = self.bit_resolution**2 - 1 143 return int((steps * (value - self.wiper_ohms)) / self.reference_ohms)
Convert an ohm value to a raw POT value.
Parameters
- value: The ohm value to convert
Returns
The converted raw POT value
147class Port: 148 """Configuration for a port.""" 149 150 def __init__(self, port: dict): 151 """ 152 :param port: The port configuration dictionary 153 """ 154 match port: 155 case {"name": name, "port": port, "mode": mode}: 156 self.name: str = name 157 self.port: int = port 158 self.mode: str = mode 159 case _: 160 raise hil_errors.ConfigurationError("Invalid Port configuration")
Configuration for a port.
150 def __init__(self, port: dict): 151 """ 152 :param port: The port configuration dictionary 153 """ 154 match port: 155 case {"name": name, "port": port, "mode": mode}: 156 self.name: str = name 157 self.port: int = port 158 self.mode: str = mode 159 case _: 160 raise hil_errors.ConfigurationError("Invalid Port configuration")
Parameters
- port: The port configuration dictionary
163class Mux: 164 """Configuration for a MUX (Multiplexer).""" 165 166 def __init__(self, mux: dict): 167 """ 168 :param mux: The MUX configuration dictionary 169 """ 170 match mux: 171 case { 172 "name": name, 173 "mode": mode, 174 "select_ports": select_ports, 175 "port": port, 176 }: 177 self.name: str = name 178 self.mode: str = mode 179 self.select_ports: list[int] = select_ports 180 self.port: int = port 181 case _: 182 raise hil_errors.ConfigurationError("Invalid Mux configuration") 183 184 def select_from_name(self, name: str) -> Optional["MuxSelect"]: 185 """ 186 Attempt to see if self is the base mux that is being referenced. 187 For example: DMUX_6 means DMUX is the base mux and 6 is the select line. 188 189 :param name: The name of the MUX select line (ex: DMUX_6) 190 :return: The MuxSelect instance if found, None otherwise 191 """ 192 name_parts = name.rsplit("_", 1) 193 if len(name_parts) < 2: 194 return None 195 if name_parts[0] != self.name: 196 return None 197 try: 198 return MuxSelect(self, int(name_parts[1])) 199 except ValueError: 200 return None
Configuration for a MUX (Multiplexer).
166 def __init__(self, mux: dict): 167 """ 168 :param mux: The MUX configuration dictionary 169 """ 170 match mux: 171 case { 172 "name": name, 173 "mode": mode, 174 "select_ports": select_ports, 175 "port": port, 176 }: 177 self.name: str = name 178 self.mode: str = mode 179 self.select_ports: list[int] = select_ports 180 self.port: int = port 181 case _: 182 raise hil_errors.ConfigurationError("Invalid Mux configuration")
Parameters
- mux: The MUX configuration dictionary
def select_from_name(self, name: str) -> Optional["MuxSelect"]:
    """
    Resolve a select-line name such as ``DMUX_6`` against this MUX.

    The text before the last underscore must equal this MUX's name and the
    text after it must parse as an integer select line.

    :param name: The name of the MUX select line (ex: DMUX_6)
    :return: The MuxSelect instance if the name refers to this MUX, None otherwise
    """
    base, separator, line = name.rpartition("_")
    if not separator:
        # No underscore at all: this cannot be a "<mux>_<line>" name.
        return None
    if base != self.name:
        return None
    try:
        return MuxSelect(self, int(line))
    except ValueError:
        return None
Attempt to see if self is the base mux that is being referenced. For example: DMUX_6 means DMUX is the base mux and 6 is the select line.
Parameters
- name: The name of the MUX select line (ex: DMUX_6)
Returns
The MuxSelect instance if found, None otherwise
class MuxSelect:
    """Pairs a MUX (Multiplexer) with the one line that should be selected on it."""

    def __init__(self, mux: Mux, select: int):
        """
        Remember which MUX and which of its lines are being addressed.

        :param mux: The MUX instance the line belongs to
        :param select: The selected line number (0 indexed)
        """
        self.select: int = select
        self.mux: Mux = mux
Represents a selected MUX (Multiplexer) line.
def __init__(self, mux: Mux, select: int):
    """
    Remember which MUX and which of its lines are being addressed.

    :param mux: The MUX instance the line belongs to
    :param select: The selected line number (0 indexed)
    """
    self.select: int = select
    self.mux: Mux = mux
Parameters
- mux: The MUX instance
- select: The selected line number (0 indexed)
215class CanBus: 216 """Configuration for a CAN (Controller Area Network) bus.""" 217 218 def __init__(self, can_bus: dict): 219 """ 220 :param can_bus: The CAN bus configuration dictionary 221 """ 222 match can_bus: 223 case {"name": name, "bus": bus, "dbc_file": dbc_file}: 224 self.name: str = name 225 self.bus: int = bus 226 self.dbc_file: str = dbc_file 227 case _: 228 raise hil_errors.ConfigurationError("Invalid CAN Bus configuration") 229 230 def find_dbc( 231 self, can_dbcs: dict[str, cantools_db.Database] 232 ) -> cantools_db.Database: 233 """ 234 Attempt to find the CAN DBC database for this CAN bus. If not found, raise an error. 235 236 :param can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name 237 :return: The CAN database if found 238 """ 239 match can_dbcs.get(self.dbc_file, None): 240 case None: 241 error_msg = ( 242 f"DBC file {self.dbc_file} not found for CAN bus {self.name}" 243 ) 244 raise hil_errors.ConfigurationError(error_msg) 245 case db: 246 return db
Configuration for a CAN (Controller Area Network) bus.
218 def __init__(self, can_bus: dict): 219 """ 220 :param can_bus: The CAN bus configuration dictionary 221 """ 222 match can_bus: 223 case {"name": name, "bus": bus, "dbc_file": dbc_file}: 224 self.name: str = name 225 self.bus: int = bus 226 self.dbc_file: str = dbc_file 227 case _: 228 raise hil_errors.ConfigurationError("Invalid CAN Bus configuration")
Parameters
- can_bus: The CAN bus configuration dictionary
def find_dbc(
    self, can_dbcs: dict[str, cantools_db.Database]
) -> cantools_db.Database:
    """
    Look up the CAN database that belongs to this bus.

    :param can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name
    :return: The CAN database registered under this bus's DBC file name
    :raises hil_errors.ConfigurationError: If no database is registered for it
    """
    database = can_dbcs.get(self.dbc_file, None)
    if database is None:
        error_msg = (
            f"DBC file {self.dbc_file} not found for CAN bus {self.name}"
        )
        raise hil_errors.ConfigurationError(error_msg)
    return database
Attempt to find the CAN DBC database for this CAN bus. If not found, raise an error.
Parameters
- can_dbcs: The dictionary of loaded CAN databases, keyed by DBC file name
Returns
The CAN database if found
250class TestDevice: 251 # Init ----------------------------------------------------------------------------# 252 def __init__( 253 self, 254 hil_id: int, 255 name: str, 256 ports: dict[str, Port], 257 muxs: dict[str, Mux], 258 can_busses: dict[str, CanBus], 259 adc_config: AdcConfig, 260 dac_config: Optional[DacConfig], 261 pot_config: Optional[PotConfig], 262 ): 263 """ 264 :param hil_id: The HIL ID of the device 265 :param name: The name of the device 266 :param ports: The port configurations 267 :param muxs: The MUX configurations 268 :param can_busses: The CAN bus configurations 269 :param adc_config: The ADC configuration 270 :param dac_config: The DAC configuration 271 :param pot_config: The potentiometer configuration 272 """ 273 self.hil_id: int = hil_id 274 self._name: str = name 275 self._ports: dict[str, Port] = ports 276 self._muxs: dict[str, Mux] = muxs 277 self._can_busses: dict[str, CanBus] = can_busses 278 self._adc_config: AdcConfig = adc_config 279 self._dac_config: Optional[DacConfig] = dac_config 280 self._pot_config: Optional[PotConfig] = pot_config 281 282 # Please use set_serial() to set the serial! 283 self._ser: Optional[serial_helper.ThreadedSerial] = None 284 285 self.device_can_busses: dict[int, can_helper.CanMessageManager] = dict( 286 map( 287 lambda c: (c.bus, can_helper.CanMessageManager()), 288 self._can_busses.values(), 289 ) 290 ) 291 292 @classmethod 293 def from_json(cls, hil_id: int, name: str, device_config_path: str): 294 """ 295 Create a TestDevice instance from a JSON configuration file. 
296 297 :param hil_id: The HIL ID of the device 298 :param name: The name of the device 299 :param device_config_path: The path to the device configuration JSON file 300 """ 301 with open(device_config_path, "r") as device_config_path: 302 device_config = json.load(device_config_path) 303 304 ports = dict( 305 map(lambda p: (p.get("name"), Port(p)), device_config.get("ports", [])) 306 ) 307 muxs = dict( 308 map(lambda m: (m.get("name"), Mux(m)), device_config.get("muxs", [])) 309 ) 310 can_busses = dict( 311 map(lambda c: (c.get("name"), CanBus(c)), device_config.get("can", [])) 312 ) 313 314 match device_config: 315 case {"adc_config": adc_config_data}: 316 adc_config = AdcConfig(adc_config_data) 317 case _: 318 error_msg = f"ADC configuration missing for device {name}" 319 raise hil_errors.ConfigurationError(error_msg) 320 321 match device_config: 322 case {"dac_config": dac_config_data}: 323 dac_config = DacConfig(dac_config_data) 324 case _: 325 dac_config = None 326 327 match device_config: 328 case {"pot_config": pot_config_data}: 329 pot_config = PotConfig(pot_config_data) 330 case _: 331 pot_config = None 332 333 return cls( 334 hil_id, 335 name, 336 ports, 337 muxs, 338 can_busses, 339 adc_config, 340 dac_config, 341 pot_config, 342 ) 343 344 def set_serial(self, ser: serial_helper.ThreadedSerial) -> None: 345 """ 346 Set the serial connection for the TestDevice. 347 The caller is responsible for starting the serial connection's thread. 348 349 :param ser: The serial connection to set 350 """ 351 self._ser = ser 352 353 def close(self) -> None: 354 """ 355 Close the serial connection for the TestDevice. 
356 """ 357 match self._ser: 358 case None: 359 error_msg = f"Cannot close TestDevice {self._name}: serial not set" 360 raise hil_errors.EngineError(error_msg) 361 case ser: 362 ser.stop() 363 364 # Command handling ----------------------------------------------------------------# 365 def _select_mux(self, mux_select: MuxSelect) -> None: 366 """ 367 Select a MUX (Multiplexer) line. 368 369 :param mux_select: The MUX selection information 370 """ 371 for i, p in enumerate(mux_select.mux.select_ports): 372 select_bit = True if (mux_select.select & (1 << i)) else False 373 self._set_do(p, select_bit) 374 375 def _set_do(self, pin: int, value: bool) -> None: 376 """ 377 Set a digital output (DO) pin. 378 379 :param pin: The pin number to set 380 :param value: The value to set the pin to (low = False, high = True) 381 """ 382 match self._ser: 383 case None: 384 error_msg = f"Cannot set DO on TestDevice {self._name}: serial not set" 385 raise hil_errors.EngineError(error_msg) 386 case ser: 387 commands.write_gpio(ser, pin, value) 388 389 def _hiZ_do(self, pin: int) -> None: 390 """ 391 Set a digital output (DO) pin to high impedance (HiZ). 392 393 :param pin: The pin number to set 394 """ 395 match self._ser: 396 case None: 397 error_msg = ( 398 f"Cannot set HiZ DO on TestDevice {self._name}: serial not set" 399 ) 400 raise hil_errors.EngineError(error_msg) 401 case ser: 402 commands.hiZ_gpio(ser, pin) 403 404 def _get_di(self, pin: int) -> bool: 405 """ 406 Get the digital input (DI) state of a pin. 407 408 :param pin: The pin number to read 409 :return: The state of the pin (True for high, False for low) 410 """ 411 match self._ser: 412 case None: 413 error_msg = f"Cannot get DI on TestDevice {self._name}: serial not set" 414 raise hil_errors.EngineError(error_msg) 415 case ser: 416 return commands.read_gpio(ser, pin) 417 418 def _set_ao(self, pin: int, value: float) -> None: 419 """ 420 Set an analog output (AO) pin after converting the volts value to raw. 
421 422 :param pin: The pin/offset number to set 423 :param value: The voltage value to set the pin to 424 """ 425 match (self._ser, self._dac_config): 426 case (ser, dac_config) if ser is not None and dac_config is not None: 427 raw_value = dac_config.v_to_raw(value) 428 commands.write_dac(ser, pin, raw_value) 429 case _: 430 error_msg = f"Cannot set AO on TestDevice {self._name}: serial or DAC config not set" 431 raise hil_errors.EngineError(error_msg) 432 433 def _hiZ_ao(self, pin: int) -> None: 434 """ 435 Set an analog output (AO) pin to high impedance (HiZ). 436 437 :param pin: The pin/offset number to set 438 """ 439 match self._ser: 440 case None: 441 error_msg = ( 442 f"Cannot set HiZ AO on TestDevice {self._name}: serial not set" 443 ) 444 raise hil_errors.EngineError(error_msg) 445 case ser: 446 commands.hiZ_dac(ser, pin) 447 448 def _get_ai(self, pin: int, mode: str) -> float: 449 """ 450 Get an analog input (AI) reading from a pin and convert the reading to volts. 451 452 :param pin: The pin number to read 453 :param mode: The mode to use for the reading (AI5, AI24, or AI) 454 :return: The voltage value read from the pin 455 """ 456 match self._ser: 457 case None: 458 error_msg = f"Cannot get AI on TestDevice {self._name}: serial not set" 459 raise hil_errors.EngineError(error_msg) 460 case ser: 461 raw_value = commands.read_adc(ser, pin) 462 463 if mode == "AI5": 464 return self._adc_config.raw_to_5v(raw_value) 465 elif mode == "AI24": 466 return self._adc_config.raw_to_24v(raw_value) 467 elif mode == "AI": 468 return self._adc_config.raw_to_v(raw_value) 469 else: 470 raise ValueError(f"Unsupported AI mode: {mode}") 471 472 def _set_pot(self, pin: int, value: float) -> None: 473 """ 474 Set a potentiometer (POT) pin after converting the ohms value to raw. 
475 476 :param pin: The pin/offset to set 477 :param value: The resistance value to set the pin to (in ohms) 478 """ 479 match (self._ser, self._pot_config): 480 case (ser, pot_config) if ser is not None and pot_config is not None: 481 raw_value = pot_config.ohms_to_raw(value) 482 commands.write_pot(ser, pin, raw_value) 483 case _: 484 error_msg = f"Cannot set POT on TestDevice {self._name}: serial not set" 485 raise hil_errors.EngineError(error_msg) 486 487 def _update_can_messages(self, bus: int, can_dbc: cantools_db.Database) -> None: 488 """ 489 Update the CAN message store by decoding the saved parsed can messages from the Serial. 490 491 :param bus: The CAN bus to update 492 :param can_dbc: The CAN database to use for decoding 493 """ 494 match self._ser: 495 case None: 496 error_msg = ( 497 f"Cannot update CAN messages on TestDevice {self._name}: " 498 "serial not set" 499 ) 500 raise hil_errors.EngineError(error_msg) 501 case ser: 502 self.device_can_busses[bus].add_multiple( 503 commands.parse_can_messages(ser, bus, can_dbc) 504 ) 505 506 def _send_can( 507 self, bus: int, signal: str | int, data: dict, can_dbc: cantools_db.Database 508 ) -> None: 509 """ 510 Send a CAN message on the specified bus. 
511 512 :param bus: The CAN bus to send the message on 513 :param signal: The CAN signal to send 514 :param data: The data to include in the CAN message 515 :param can_dbc: The CAN database to use for encoding 516 """ 517 raw_data = list(can_dbc.encode_message(signal, data)) 518 if isinstance(signal, int): 519 msg_id = can_dbc.get_message_by_frame_id(signal).frame_id 520 else: 521 msg_id = can_dbc.get_message_by_name(signal).frame_id 522 523 match self._ser: 524 case None: 525 error_msg = ( 526 f"Cannot send CAN message on TestDevice {self._name}: " 527 "serial not set" 528 ) 529 raise hil_errors.EngineError(error_msg) 530 case ser: 531 commands.send_can(ser, bus, msg_id, raw_data) 532 533 # Action --------------------------------------------------------------------------# 534 def do_action(self, action_type: action.ActionType, port: str) -> Any: 535 """ 536 Perform a HIL action on a specific port. 537 538 :param action_type: The type of action to perform (+ includes all needed info) 539 :param port: The HIL port to perform the action on 540 :return: depends on the action type 541 """ 542 543 maybe_port = self._ports.get(port, None) 544 maybe_mux_select = next( 545 ( 546 val 547 for m in self._muxs.values() 548 if (val := m.select_from_name(port)) is not None 549 ), 550 None, 551 ) 552 maybe_can_bus = self._can_busses.get(port, None) 553 554 match (action_type, maybe_port, maybe_mux_select, maybe_can_bus): 555 # Set DO + direct port 556 case (action.SetDo(value), mp, _, _) if mp is not None and mp.mode == "DO": 557 self._set_do(mp.port, value) 558 # Set DO + mux select 559 case (action.SetDo(value), _, mms, _) if ( 560 mms is not None and mms.mux.mode == "DO" 561 ): 562 self._select_mux(mms) 563 self._set_do(mms.mux.port, value) 564 # HiZ DO + direct port 565 case (action.HiZDo(), mp, _, _) if mp is not None and mp.mode == "DO": 566 self._hiZ_do(mp.port) 567 # HiZ DO + mux select 568 case (action.HiZDo(), _, mms, _) if ( 569 mms is not None and mms.mux.mode == "DO" 
570 ): 571 self._select_mux(mms) 572 self._hiZ_do(mms.mux.port) 573 # Get DI + direct port 574 case (action.GetDi(), mp, _, _) if mp is not None and mp.mode == "DI": 575 return self._get_di(mp.port) 576 # Get DI + mux select 577 case (action.GetDi(), _, mms, _) if ( 578 mms is not None and mms.mux.mode == "DI" 579 ): 580 self._select_mux(mms) 581 return self._get_di(mms.mux.port) 582 # Set AO + direct port 583 case (action.SetAo(value), mp, _, _) if mp is not None and mp.mode == "AO": 584 self._set_ao(mp.port, value) 585 # HiZ AO + direct port 586 case (action.HiZAo(), mp, _, _) if mp is not None and mp.mode == "AO": 587 self._hiZ_ao(mp.port) 588 # Get AI + direct port 589 case (action.GetAi(), mp, _, _) if mp is not None and mp.mode.startswith( 590 "AI" 591 ): 592 return self._get_ai(mp.port, mp.mode) 593 # Get AI + mux select 594 case ( 595 action.GetAi(), 596 _, 597 mms, 598 _, 599 ) if mms is not None and mms.mux.mode.startswith("AI"): 600 self._select_mux(mms) 601 return self._get_ai(mms.mux.port, mms.mux.mode) 602 # Set Pot + direct port 603 case (action.SetPot(value), mp, _, _) if ( 604 mp is not None and mp.mode == "POT" 605 ): 606 self._set_pot(mp.port, value) 607 # Send CAN msg + can bus name 608 case (action.SendCan(signal, data, can_dbcs), _, _, mcb) if mcb is not None: 609 can_dbc = mcb.find_dbc(can_dbcs) 610 self._update_can_messages(mcb.bus, can_dbc) 611 self._send_can(mcb.bus, signal, data, can_dbc) 612 # Get last CAN msg + can bus name 613 case (action.GetLastCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 614 can_dbc = mcb.find_dbc(can_dbcs) 615 self._update_can_messages(mcb.bus, can_dbc) 616 return self.device_can_busses[mcb.bus].get_last(signal) 617 # Get all CAN msgs + can bus name 618 case (action.GetAllCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 619 can_dbc = mcb.find_dbc(can_dbcs) 620 self._update_can_messages(mcb.bus, can_dbc) 621 return self.device_can_busses[mcb.bus].get_all(signal) 622 # Clear CAN msgs + can bus name 623 
case (action.ClearCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 624 can_dbc = mcb.find_dbc(can_dbcs) 625 self._update_can_messages(mcb.bus, can_dbc) 626 self.device_can_busses[mcb.bus].clear(signal) 627 # Unsupported action 628 case _: 629 error_msg = ( 630 f"Action {type(action)} not supported for " 631 f"port {port} on device {self._name}" 632 ) 633 raise hil_errors.EngineError(error_msg)
def __init__(
    self,
    hil_id: int,
    name: str,
    ports: dict[str, Port],
    muxs: dict[str, Mux],
    can_busses: dict[str, CanBus],
    adc_config: AdcConfig,
    dac_config: Optional[DacConfig],
    pot_config: Optional[PotConfig],
):
    """
    Store the device layout and create one CAN message store per bus.

    :param hil_id: The HIL ID of the device
    :param name: The name of the device
    :param ports: The port configurations
    :param muxs: The MUX configurations
    :param can_busses: The CAN bus configurations
    :param adc_config: The ADC configuration
    :param dac_config: The DAC configuration
    :param pot_config: The potentiometer configuration
    """
    # Identity
    self.hil_id: int = hil_id
    self._name: str = name

    # Pin / bus layout
    self._ports: dict[str, Port] = ports
    self._muxs: dict[str, Mux] = muxs
    self._can_busses: dict[str, CanBus] = can_busses

    # Converter configurations (DAC and POT are optional)
    self._adc_config: AdcConfig = adc_config
    self._dac_config: Optional[DacConfig] = dac_config
    self._pot_config: Optional[PotConfig] = pot_config

    # Please use set_serial() to set the serial!
    self._ser: Optional[serial_helper.ThreadedSerial] = None

    # A fresh message manager for every configured bus, keyed by bus number
    self.device_can_busses: dict[int, can_helper.CanMessageManager] = {
        bus_config.bus: can_helper.CanMessageManager()
        for bus_config in self._can_busses.values()
    }
Parameters
- hil_id: The HIL ID of the device
- name: The name of the device
- ports: The port configurations
- muxs: The MUX configurations
- can_busses: The CAN bus configurations
- adc_config: The ADC configuration
- dac_config: The DAC configuration
- pot_config: The potentiometer configuration
@classmethod
def from_json(cls, hil_id: int, name: str, device_config_path: str):
    """
    Create a TestDevice instance from a JSON configuration file.

    :param hil_id: The HIL ID of the device
    :param name: The name of the device
    :param device_config_path: The path to the device configuration JSON file
    :return: The TestDevice described by the configuration file
    :raises hil_errors.ConfigurationError: If the ADC configuration is missing or
        any port/MUX/CAN/converter entry is invalid
    """
    # Keep the path parameter intact; bind the file object to its own name.
    with open(device_config_path, "r", encoding="utf-8") as device_config_file:
        device_config = json.load(device_config_file)

    # Validate each entry first (the constructors raise ConfigurationError),
    # then key it by the validated name.
    ports = {p.name: p for p in map(Port, device_config.get("ports", []))}
    muxs = {m.name: m for m in map(Mux, device_config.get("muxs", []))}
    can_busses = {c.name: c for c in map(CanBus, device_config.get("can", []))}

    # The ADC configuration is mandatory; DAC and POT are optional.
    if "adc_config" not in device_config:
        error_msg = f"ADC configuration missing for device {name}"
        raise hil_errors.ConfigurationError(error_msg)
    adc_config = AdcConfig(device_config["adc_config"])

    dac_config = (
        DacConfig(device_config["dac_config"])
        if "dac_config" in device_config
        else None
    )
    pot_config = (
        PotConfig(device_config["pot_config"])
        if "pot_config" in device_config
        else None
    )

    return cls(
        hil_id,
        name,
        ports,
        muxs,
        can_busses,
        adc_config,
        dac_config,
        pot_config,
    )
Create a TestDevice instance from a JSON configuration file.
Parameters
- hil_id: The HIL ID of the device
- name: The name of the device
- device_config_path: The path to the device configuration JSON file
def set_serial(self, ser: serial_helper.ThreadedSerial) -> None:
    """
    Attach a serial connection to this TestDevice.

    Only the reference is stored here; starting the serial connection's
    thread remains the caller's job.

    :param ser: The serial connection to set
    """
    self._ser = ser
Set the serial connection for the TestDevice. The caller is responsible for starting the serial connection's thread.
Parameters
- ser: The serial connection to set
def close(self) -> None:
    """
    Stop the serial connection attached to this TestDevice.

    :raises hil_errors.EngineError: If no serial connection has been set
    """
    serial_link = self._ser
    if serial_link is None:
        error_msg = f"Cannot close TestDevice {self._name}: serial not set"
        raise hil_errors.EngineError(error_msg)
    serial_link.stop()
Close the serial connection for the TestDevice.
534 def do_action(self, action_type: action.ActionType, port: str) -> Any: 535 """ 536 Perform a HIL action on a specific port. 537 538 :param action_type: The type of action to perform (+ includes all needed info) 539 :param port: The HIL port to perform the action on 540 :return: depends on the action type 541 """ 542 543 maybe_port = self._ports.get(port, None) 544 maybe_mux_select = next( 545 ( 546 val 547 for m in self._muxs.values() 548 if (val := m.select_from_name(port)) is not None 549 ), 550 None, 551 ) 552 maybe_can_bus = self._can_busses.get(port, None) 553 554 match (action_type, maybe_port, maybe_mux_select, maybe_can_bus): 555 # Set DO + direct port 556 case (action.SetDo(value), mp, _, _) if mp is not None and mp.mode == "DO": 557 self._set_do(mp.port, value) 558 # Set DO + mux select 559 case (action.SetDo(value), _, mms, _) if ( 560 mms is not None and mms.mux.mode == "DO" 561 ): 562 self._select_mux(mms) 563 self._set_do(mms.mux.port, value) 564 # HiZ DO + direct port 565 case (action.HiZDo(), mp, _, _) if mp is not None and mp.mode == "DO": 566 self._hiZ_do(mp.port) 567 # HiZ DO + mux select 568 case (action.HiZDo(), _, mms, _) if ( 569 mms is not None and mms.mux.mode == "DO" 570 ): 571 self._select_mux(mms) 572 self._hiZ_do(mms.mux.port) 573 # Get DI + direct port 574 case (action.GetDi(), mp, _, _) if mp is not None and mp.mode == "DI": 575 return self._get_di(mp.port) 576 # Get DI + mux select 577 case (action.GetDi(), _, mms, _) if ( 578 mms is not None and mms.mux.mode == "DI" 579 ): 580 self._select_mux(mms) 581 return self._get_di(mms.mux.port) 582 # Set AO + direct port 583 case (action.SetAo(value), mp, _, _) if mp is not None and mp.mode == "AO": 584 self._set_ao(mp.port, value) 585 # HiZ AO + direct port 586 case (action.HiZAo(), mp, _, _) if mp is not None and mp.mode == "AO": 587 self._hiZ_ao(mp.port) 588 # Get AI + direct port 589 case (action.GetAi(), mp, _, _) if mp is not None and mp.mode.startswith( 590 "AI" 591 ): 592 return 
self._get_ai(mp.port, mp.mode) 593 # Get AI + mux select 594 case ( 595 action.GetAi(), 596 _, 597 mms, 598 _, 599 ) if mms is not None and mms.mux.mode.startswith("AI"): 600 self._select_mux(mms) 601 return self._get_ai(mms.mux.port, mms.mux.mode) 602 # Set Pot + direct port 603 case (action.SetPot(value), mp, _, _) if ( 604 mp is not None and mp.mode == "POT" 605 ): 606 self._set_pot(mp.port, value) 607 # Send CAN msg + can bus name 608 case (action.SendCan(signal, data, can_dbcs), _, _, mcb) if mcb is not None: 609 can_dbc = mcb.find_dbc(can_dbcs) 610 self._update_can_messages(mcb.bus, can_dbc) 611 self._send_can(mcb.bus, signal, data, can_dbc) 612 # Get last CAN msg + can bus name 613 case (action.GetLastCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 614 can_dbc = mcb.find_dbc(can_dbcs) 615 self._update_can_messages(mcb.bus, can_dbc) 616 return self.device_can_busses[mcb.bus].get_last(signal) 617 # Get all CAN msgs + can bus name 618 case (action.GetAllCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 619 can_dbc = mcb.find_dbc(can_dbcs) 620 self._update_can_messages(mcb.bus, can_dbc) 621 return self.device_can_busses[mcb.bus].get_all(signal) 622 # Clear CAN msgs + can bus name 623 case (action.ClearCan(signal, can_dbcs), _, _, mcb) if mcb is not None: 624 can_dbc = mcb.find_dbc(can_dbcs) 625 self._update_can_messages(mcb.bus, can_dbc) 626 self.device_can_busses[mcb.bus].clear(signal) 627 # Unsupported action 628 case _: 629 error_msg = ( 630 f"Action {type(action)} not supported for " 631 f"port {port} on device {self._name}" 632 ) 633 raise hil_errors.EngineError(error_msg)
Perform a HIL action on a specific port.
Parameters
- action_type: The type of action to perform (+ includes all needed info)
- port: The HIL port to perform the action on
Returns
depends on the action type
637class TestDeviceManager: 638 """ 639 Manages test devices for HIL (Hardware-in-the-Loop) simulation. 640 """ 641 642 def __init__(self, test_devices: dict[str, TestDevice]): 643 """ 644 :param test_devices: A dictionary of test devices managed by this manager. 645 key = device name, value = TestDevice instance 646 """ 647 self._test_devices: dict[str, TestDevice] = test_devices 648 649 @classmethod 650 def from_json( 651 cls, test_config_path: str, device_config_fpath: str 652 ) -> "TestDeviceManager": 653 """ 654 Create a TestDeviceManager instance from JSON configuration files. 655 Is responsible for starting all of the ThreadedSerial instances. 656 657 :param test_config_path: The path to the test configuration JSON file. 658 :param device_config_fpath: The file path to the directory containing device 659 configuration files. 660 :return: A TestDeviceManager instance. 661 """ 662 663 with open(test_config_path, "r") as test_config_file: 664 test_config = json.load(test_config_file) 665 666 hil_ids = [] 667 stop_events = {} 668 test_devices = {} 669 match test_config: 670 case {"hil_devices": hil_devices}: 671 for device in hil_devices: 672 match device: 673 case { 674 "id": hil_id, 675 "name": name, 676 "config": config_file_name, 677 } if (not hil_id in hil_ids): 678 hil_ids.append(hil_id) 679 stop_events[hil_id] = threading.Event() 680 test_devices[name] = TestDevice.from_json( 681 hil_id, 682 name, 683 os.path.join(device_config_fpath, config_file_name), 684 ) 685 case {"id": hil_id}: 686 error_msg = f"Duplicate HIL device ID found: {hil_id}" 687 raise hil_errors.ConfigurationError(error_msg) 688 case _: 689 error_msg = f"Invalid HIL device configuration: {device}" 690 raise hil_errors.ConfigurationError(error_msg) 691 case _: 692 error_msg = "Invalid test configuration: missing 'hil_devices' key" 693 raise hil_errors.ConfigurationError(error_msg) 694 695 hil_devices = serial_helper.discover_devices(hil_ids) 696 697 sers = { 698 hil_id: 
serial_helper.ThreadedSerial( 699 hil_devices[hil_id][0], 700 hil_devices[hil_id][1], 701 stop_events[hil_id], 702 ) 703 for hil_id in hil_ids 704 } 705 for test_device in test_devices.values(): 706 ser = sers[test_device.hil_id] 707 t = threading.Thread(target=ser.run) 708 t.start() 709 test_device.set_serial(ser) 710 711 return cls(test_devices) 712 713 def maybe_hil_con_from_net( 714 self, board: str, net: str 715 ) -> Optional[dut_cons.HilDutCon]: 716 """ 717 Check to see if a board is a HIL device. 718 719 :param board: The name of the board to check. 720 :param net: The network to use for the connection. 721 :return: A HilDutCon instance if the board is a HIL device, None otherwise. 722 """ 723 if board in self._test_devices: 724 return dut_cons.HilDutCon(board, net) 725 else: 726 return None 727 728 def do_action( 729 self, action_type: action.ActionType, hil_dut_con: dut_cons.HilDutCon 730 ) -> Any: 731 """ 732 Perform an action on a HIL device. 733 734 :param action_type: The type of action to perform. 735 :param hil_dut_con: The HIL DUT connection information. 736 :return: The result of the action (if any). 737 """ 738 if hil_dut_con.device in self._test_devices: 739 return self._test_devices[hil_dut_con.device].do_action( 740 action_type, hil_dut_con.port 741 ) 742 else: 743 error_msg = f"Device {hil_dut_con.device} not found" 744 raise hil_errors.ConnectionError(error_msg) 745 746 def close(self) -> None: 747 """ 748 Close all HIL devices. 749 """ 750 for device in self._test_devices.values(): 751 device.close()
Manages test devices for HIL (Hardware-in-the-Loop) simulation.
def __init__(self, test_devices: dict[str, TestDevice]):
    """
    Keep hold of the devices this manager is in charge of.

    :param test_devices: A dictionary of test devices managed by this manager.
        key = device name, value = TestDevice instance
    """
    self._test_devices: dict[str, TestDevice] = test_devices
Parameters
- test_devices: A dictionary of test devices managed by this manager. key = device name, value = TestDevice instance
649 @classmethod 650 def from_json( 651 cls, test_config_path: str, device_config_fpath: str 652 ) -> "TestDeviceManager": 653 """ 654 Create a TestDeviceManager instance from JSON configuration files. 655 Is responsible for starting all of the ThreadedSerial instances. 656 657 :param test_config_path: The path to the test configuration JSON file. 658 :param device_config_fpath: The file path to the directory containing device 659 configuration files. 660 :return: A TestDeviceManager instance. 661 """ 662 663 with open(test_config_path, "r") as test_config_file: 664 test_config = json.load(test_config_file) 665 666 hil_ids = [] 667 stop_events = {} 668 test_devices = {} 669 match test_config: 670 case {"hil_devices": hil_devices}: 671 for device in hil_devices: 672 match device: 673 case { 674 "id": hil_id, 675 "name": name, 676 "config": config_file_name, 677 } if (not hil_id in hil_ids): 678 hil_ids.append(hil_id) 679 stop_events[hil_id] = threading.Event() 680 test_devices[name] = TestDevice.from_json( 681 hil_id, 682 name, 683 os.path.join(device_config_fpath, config_file_name), 684 ) 685 case {"id": hil_id}: 686 error_msg = f"Duplicate HIL device ID found: {hil_id}" 687 raise hil_errors.ConfigurationError(error_msg) 688 case _: 689 error_msg = f"Invalid HIL device configuration: {device}" 690 raise hil_errors.ConfigurationError(error_msg) 691 case _: 692 error_msg = "Invalid test configuration: missing 'hil_devices' key" 693 raise hil_errors.ConfigurationError(error_msg) 694 695 hil_devices = serial_helper.discover_devices(hil_ids) 696 697 sers = { 698 hil_id: serial_helper.ThreadedSerial( 699 hil_devices[hil_id][0], 700 hil_devices[hil_id][1], 701 stop_events[hil_id], 702 ) 703 for hil_id in hil_ids 704 } 705 for test_device in test_devices.values(): 706 ser = sers[test_device.hil_id] 707 t = threading.Thread(target=ser.run) 708 t.start() 709 test_device.set_serial(ser) 710 711 return cls(test_devices)
Create a TestDeviceManager instance from JSON configuration files. Is responsible for starting all of the ThreadedSerial instances.
Parameters
- test_config_path: The path to the test configuration JSON file.
- device_config_fpath: The file path to the directory containing device configuration files.
Returns
A TestDeviceManager instance.
def maybe_hil_con_from_net(
    self, board: str, net: str
) -> Optional[dut_cons.HilDutCon]:
    """
    Build a HIL connection for a board, provided the board is a managed HIL device.

    :param board: The name of the board to check.
    :param net: The network to use for the connection.
    :return: A HilDutCon instance if the board is a HIL device, None otherwise.
    """
    is_hil_device = board in self._test_devices
    if not is_hil_device:
        return None
    return dut_cons.HilDutCon(board, net)
Check to see if a board is a HIL device.
Parameters
- board: The name of the board to check.
- net: The network to use for the connection.
Returns
A HilDutCon instance if the board is a HIL device, None otherwise.
def do_action(
    self, action_type: action.ActionType, hil_dut_con: dut_cons.HilDutCon
) -> Any:
    """
    Forward an action to the HIL device named by the connection.

    :param action_type: The type of action to perform.
    :param hil_dut_con: The HIL DUT connection information.
    :return: The result of the action (if any).
    :raises hil_errors.ConnectionError: If the connection's device is not managed
        by this manager.
    """
    # Guard clause: unknown devices are rejected before anything is dispatched.
    if hil_dut_con.device not in self._test_devices:
        error_msg = f"Device {hil_dut_con.device} not found"
        raise hil_errors.ConnectionError(error_msg)

    target_device = self._test_devices[hil_dut_con.device]
    return target_device.do_action(action_type, hil_dut_con.port)
Perform an action on a HIL device.
Parameters
- action_type: The type of action to perform.
- hil_dut_con: The HIL DUT connection information.
Returns
The result of the action (if any).