hil2.can_helper
from typing import Optional

import os
import logging

import cantools.database as cantools_db


# Helper functions --------------------------------------------------------------------#
def load_can_dbcs(dbc_fpath: str) -> dict[str, cantools_db.Database]:
    """
    Loads every DBC file found directly inside a folder.

    Only entries whose name ends in ".dbc" are loaded. A database that contains no
    messages is reported with an error log and left out of the result. An empty or
    non-directory path is reported with a warning and yields an empty dictionary.

    :param dbc_fpath: The path to the CAN DBC folder
    :return: A dictionary of CAN databases, keyed by DBC file name (not the full path)
    """
    loaded: dict[str, cantools_db.Database] = {}

    # Guard clause: nothing to scan without a real directory
    if not (dbc_fpath and os.path.isdir(dbc_fpath)):
        logging.warning(f"Invalid DBC folder path: {dbc_fpath}")
        return loaded

    for entry in os.listdir(dbc_fpath):
        if not entry.endswith(".dbc"):
            continue

        full_path = os.path.join(dbc_fpath, entry)
        logging.debug(f"Loading DBC file: {full_path}")
        database = cantools_db.load_file(full_path)

        if not database.messages:
            logging.error(f"No messages found in DBC file: {entry}")
            continue
        loaded[entry] = database

    return loaded


# CAN Message struct ------------------------------------------------------------------#
class CanMessage:
    """Represents a parsed/decoded CAN message"""

    def __init__(self, signal: str | int, data: dict):
        """
        :param signal: The signal name or message ID
        :param data: The data contained in the CAN message (stored by reference)
        """
        self.signal: str | int = signal
        self.data: dict = data

    def __repr__(self) -> str:
        """
        :return: A readable form such as ``CanMessage(signal='x', data={...})``
        """
        return f"{type(self).__name__}(signal={self.signal!r}, data={self.data!r})"


# CAN Message Manager class -----------------------------------------------------------#
class CanMessageManager:
    """
    Manages a collection of CAN messages

    Messages are kept in the order they were added, so "last" always means the most
    recently added message.
    """

    def __init__(self):
        """Creates an empty message collection"""
        self._messages: list[CanMessage] = []

    def add_multiple(self, messages: list[CanMessage]) -> None:
        """
        Appends the given messages, in order, after the ones already stored.

        :param messages: The list of CAN messages to add
        """
        self._messages.extend(messages)

    def get_last(self, signal: Optional[str | int] = None) -> Optional[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None (the default),
                       the last message will be returned (if any) regardless of the
                       signal/id
        :return: The last CAN message with the specified signal, or None if not found
        """
        if signal is None:
            return self._messages[-1] if self._messages else None

        # Walk from newest to oldest so the first hit is the most recent match
        for msg in reversed(self._messages):
            if msg.signal == signal:
                return msg
        return None

    def get_all(self, signal: Optional[str | int] = None) -> list[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None, all messages will
                       be returned (if any) regardless of the signal/id
        :return: A new list of all CAN messages with the specified signal (or all)
        """
        if signal is None:
            return list(self._messages)
        return [msg for msg in self._messages if msg.signal == signal]

    def clear(self, signal: Optional[str | int] = None) -> None:
        """
        :param signal: The signal name or message ID to clear. If None, all messages
                       will be cleared (if any) regardless of the signal/id
        """
        if signal is None:
            self._messages.clear()
            return
        # Keep only the messages that belong to other signals/ids
        self._messages = [msg for msg in self._messages if msg.signal != signal]
def
load_can_dbcs(dbc_fpath: str) -> dict[str, cantools.database.can.database.Database]:
def load_can_dbcs(dbc_fpath: str) -> dict[str, cantools_db.Database]:
    """
    Loads every DBC file found directly inside a folder.

    Only entries whose name ends in ".dbc" are loaded. A database that contains no
    messages is reported with an error log and left out of the result. An empty or
    non-directory path is reported with a warning and yields an empty dictionary.

    :param dbc_fpath: The path to the CAN DBC folder
    :return: A dictionary of CAN databases, keyed by DBC file name (not the full path)
    """
    loaded: dict[str, cantools_db.Database] = {}

    # Guard clause: nothing to scan without a real directory
    if not (dbc_fpath and os.path.isdir(dbc_fpath)):
        logging.warning(f"Invalid DBC folder path: {dbc_fpath}")
        return loaded

    for entry in os.listdir(dbc_fpath):
        if not entry.endswith(".dbc"):
            continue

        full_path = os.path.join(dbc_fpath, entry)
        logging.debug(f"Loading DBC file: {full_path}")
        database = cantools_db.load_file(full_path)

        if not database.messages:
            logging.error(f"No messages found in DBC file: {entry}")
            continue
        loaded[entry] = database

    return loaded
Scans a folder for DBC files and loads them as CAN databases.
Parameters
- dbc_fpath: The path to the CAN DBC folder
Returns
A dictionary of CAN databases, keyed by DBC file name
class
CanMessage:
class CanMessage:
    """Represents a parsed/decoded CAN message"""

    def __init__(self, signal: str | int, data: dict):
        """
        :param signal: The signal name or message ID
        :param data: The data contained in the CAN message (stored by reference)
        """
        self.signal: str | int = signal
        self.data: dict = data

    def __repr__(self) -> str:
        """
        :return: A readable form such as ``CanMessage(signal='x', data={...})``
        """
        return f"{type(self).__name__}(signal={self.signal!r}, data={self.data!r})"
Represents a parsed/decoded CAN message
CanMessage(signal: str | int, data: dict)
40 def __init__(self, signal: str | int, data: dict): 41 """ 42 :param signal: The signal name or message ID 43 :param data: The data contained in the CAN message 44 """ 45 46 self.signal: str | int = signal 47 self.data: dict = data
Parameters
- signal: The signal name or message ID
- data: The data contained in the CAN message
class
CanMessageManager:
class CanMessageManager:
    """
    Manages a collection of CAN messages

    Messages are kept in the order they were added, so "last" always means the most
    recently added message.
    """

    def __init__(self):
        """Creates an empty message collection"""
        self._messages: list[CanMessage] = []

    def add_multiple(self, messages: list[CanMessage]) -> None:
        """
        Appends the given messages, in order, after the ones already stored.

        :param messages: The list of CAN messages to add
        """
        self._messages.extend(messages)

    def get_last(self, signal: Optional[str | int] = None) -> Optional[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None (the default),
                       the last message will be returned (if any) regardless of the
                       signal/id
        :return: The last CAN message with the specified signal, or None if not found
        """
        if signal is None:
            return self._messages[-1] if self._messages else None

        # Walk from newest to oldest so the first hit is the most recent match
        for msg in reversed(self._messages):
            if msg.signal == signal:
                return msg
        return None

    def get_all(self, signal: Optional[str | int] = None) -> list[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None, all messages will
                       be returned (if any) regardless of the signal/id
        :return: A new list of all CAN messages with the specified signal (or all)
        """
        if signal is None:
            return list(self._messages)
        return [msg for msg in self._messages if msg.signal == signal]

    def clear(self, signal: Optional[str | int] = None) -> None:
        """
        :param signal: The signal name or message ID to clear. If None, all messages
                       will be cleared (if any) regardless of the signal/id
        """
        if signal is None:
            self._messages.clear()
            return
        # Keep only the messages that belong to other signals/ids
        self._messages = [msg for msg in self._messages if msg.signal != signal]
Manages a collection of CAN messages
def add_multiple(self, messages: list[CanMessage]) -> None:
    """
    Appends the given messages, in order, after the ones already stored.

    :param messages: The list of CAN messages to add
    """
    self._messages.extend(messages)
Parameters
- messages: The list of CAN messages to add
def get_last(self, signal: Optional[str | int] = None) -> Optional[CanMessage]:
    """
    :param signal: The signal name or message ID to get. If None (the default),
                   the last message will be returned (if any) regardless of the
                   signal/id
    :return: The last CAN message with the specified signal, or None if not found
    """
    if signal is None:
        return self._messages[-1] if self._messages else None

    # Walk from newest to oldest so the first hit is the most recent match
    for msg in reversed(self._messages):
        if msg.signal == signal:
            return msg
    return None
Parameters
- signal: The signal name or message ID to get. If None, the last message will be returned (if any) regardless of the signal/id
Returns
The last CAN message with the specified signal, or None if not found
def get_all(self, signal: Optional[str | int] = None) -> list[CanMessage]:
    """
    :param signal: The signal name or message ID to get. If None, all messages will
                   be returned (if any) regardless of the signal/id
    :return: A new list of all CAN messages with the specified signal (or all)
    """
    if signal is None:
        return list(self._messages)
    return [msg for msg in self._messages if msg.signal == signal]
Parameters
- signal: The signal name or message ID to get. If None, all messages will be returned (if any) regardless of the signal/id
Returns
A list of all CAN messages with the specified signal (or all)
def
clear(self, signal: Union[str, int, NoneType] = None) -> None:
def clear(self, signal: Optional[str | int] = None) -> None:
    """
    :param signal: The signal name or message ID to clear. If None, all messages
                   will be cleared (if any) regardless of the signal/id
    """
    if signal is None:
        self._messages.clear()
        return
    # Keep only the messages that belong to other signals/ids
    self._messages = [msg for msg in self._messages if msg.signal != signal]
Parameters
- signal: The signal name or message ID to clear. If None, all messages will be cleared (if any) regardless of the signal/id