hil2.can_helper

 1from typing import Optional
 2
 3import os
 4import logging
 5
 6import cantools.database as cantools_db
 7
 8
 9# Helper functions --------------------------------------------------------------------#
def load_can_dbcs(dbc_fpath: str) -> dict[str, cantools_db.Database]:
    """
    Scans a folder for DBC files and loads them as CAN databases.

    Only regular files directly inside the folder whose name ends with ".dbc" are
    loaded; sub-folders are not searched. Entries are visited in sorted name order so
    the resulting dictionary has a reproducible order. A DBC file that loads but
    contains no messages is logged as an error and left out of the result.

    Exceptions raised by ``cantools_db.load_file`` are not caught here and propagate
    to the caller.

    :param dbc_fpath: The path to the CAN DBC folder
    :return: A dictionary of CAN databases, keyed by DBC file name. Empty (with a
             warning logged) if the path is empty or is not a directory
    """
    dbs: dict[str, cantools_db.Database] = {}

    if not dbc_fpath or not os.path.isdir(dbc_fpath):
        logging.warning(f"Invalid DBC folder path: {dbc_fpath}")
        return dbs

    # os.listdir() returns entries in arbitrary order; sort for reproducible results
    for file in sorted(os.listdir(dbc_fpath)):
        if not file.endswith(".dbc"):
            continue

        fpath = os.path.join(dbc_fpath, file)
        # A directory can also be named "*.dbc"; only hand real files to the parser
        if not os.path.isfile(fpath):
            continue

        logging.debug(f"Loading DBC file: {fpath}")
        db = cantools_db.load_file(fpath)
        if db.messages:
            dbs[file] = db
        else:
            logging.error(f"No messages found in DBC file: {file}")

    return dbs
33
34
35# CAN Message struct ------------------------------------------------------------------#
36class CanMessage:
37    """Represents a parsed/decoded CAN message"""
38
39    def __init__(self, signal: str | int, data: dict):
40        """
41        :param signal: The signal name or message ID
42        :param data: The data contained in the CAN message
43        """
44
45        self.signal: str | int = signal
46        self.data: dict = data
47
48
49# CAN Message Manager class -----------------------------------------------------------#
class CanMessageManager:
    """
    Manages a collection of CAN messages

    Messages are kept in the order they were added, so "last" always means the most
    recently added message.
    """

    def __init__(self):
        self._messages: list[CanMessage] = []

    def add_multiple(self, messages: list[CanMessage]) -> None:
        """
        :param messages: The list of CAN messages to add (appended in the given order)
        """
        self._messages.extend(messages)

    def get_last(self, signal: Optional[str | int] = None) -> Optional[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None (the default),
                       the last message will be returned (if any) regardless of the
                       signal/id
        :return: The last CAN message with the specified signal, or None if not found
        """
        if signal is None:
            return self._messages[-1] if self._messages else None
        # Walk newest-to-oldest and stop at the first match
        return next(
            (msg for msg in reversed(self._messages) if msg.signal == signal), None
        )

    def get_all(self, signal: Optional[str | int] = None) -> list[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None, all messages will
                       be returned (if any) regardless of the signal/id
        :return: A new list of all CAN messages with the specified signal (or all), in
                 the order they were added
        """
        if signal is None:
            # Copy so callers cannot modify the internal list through the result
            return list(self._messages)
        return [msg for msg in self._messages if msg.signal == signal]

    def clear(self, signal: Optional[str | int] = None) -> None:
        """
        :param signal: The signal name or message ID to clear. If None, all messages
                       will be cleared (if any) regardless of the signal/id
        """
        if signal is None:
            self._messages.clear()
        else:
            # Slice-assign so the list is updated in place, like the branch above
            self._messages[:] = [msg for msg in self._messages if msg.signal != signal]
def load_can_dbcs(dbc_fpath: str) -> dict[str, cantools.database.can.database.Database]:
def load_can_dbcs(dbc_fpath: str) -> dict[str, cantools_db.Database]:
    """
    Scans a folder for DBC files and loads them as CAN databases.

    Only regular files directly inside the folder whose name ends with ".dbc" are
    loaded; sub-folders are not searched. Entries are visited in sorted name order so
    the resulting dictionary has a reproducible order. A DBC file that loads but
    contains no messages is logged as an error and left out of the result.

    Exceptions raised by ``cantools_db.load_file`` are not caught here and propagate
    to the caller.

    :param dbc_fpath: The path to the CAN DBC folder
    :return: A dictionary of CAN databases, keyed by DBC file name. Empty (with a
             warning logged) if the path is empty or is not a directory
    """
    dbs: dict[str, cantools_db.Database] = {}

    if not dbc_fpath or not os.path.isdir(dbc_fpath):
        logging.warning(f"Invalid DBC folder path: {dbc_fpath}")
        return dbs

    # os.listdir() returns entries in arbitrary order; sort for reproducible results
    for file in sorted(os.listdir(dbc_fpath)):
        if not file.endswith(".dbc"):
            continue

        fpath = os.path.join(dbc_fpath, file)
        # A directory can also be named "*.dbc"; only hand real files to the parser
        if not os.path.isfile(fpath):
            continue

        logging.debug(f"Loading DBC file: {fpath}")
        db = cantools_db.load_file(fpath)
        if db.messages:
            dbs[file] = db
        else:
            logging.error(f"No messages found in DBC file: {file}")

    return dbs

Scans a folder for DBC files and loads them as CAN databases.

Parameters
  • dbc_fpath: The path to the CAN DBC folder
Returns

A dictionary of CAN databases, keyed by DBC file name

class CanMessage:
37class CanMessage:
38    """Represents a parsed/decoded CAN message"""
39
40    def __init__(self, signal: str | int, data: dict):
41        """
42        :param signal: The signal name or message ID
43        :param data: The data contained in the CAN message
44        """
45
46        self.signal: str | int = signal
47        self.data: dict = data

Represents a parsed/decoded CAN message

CanMessage(signal: str | int, data: dict)
    def __init__(self, signal: str | int, data: dict):
        """
        Stores the given signal/ID and data on the new instance unchanged.

        :param signal: The signal name or message ID
        :param data: The data contained in the CAN message
        """

        # Both values are kept by reference; no copy or validation is performed
        self.signal: str | int = signal
        self.data: dict = data
Parameters
  • signal: The signal name or message ID
  • data: The data contained in the CAN message
signal: str | int
data: dict
class CanMessageManager:
class CanMessageManager:
    """
    Manages a collection of CAN messages

    Messages are kept in the order they were added, so "last" always means the most
    recently added message.
    """

    def __init__(self):
        self._messages: list[CanMessage] = []

    def add_multiple(self, messages: list[CanMessage]) -> None:
        """
        :param messages: The list of CAN messages to add (appended in the given order)
        """
        self._messages.extend(messages)

    def get_last(self, signal: Optional[str | int] = None) -> Optional[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None (the default),
                       the last message will be returned (if any) regardless of the
                       signal/id
        :return: The last CAN message with the specified signal, or None if not found
        """
        if signal is None:
            return self._messages[-1] if self._messages else None
        # Walk newest-to-oldest and stop at the first match
        return next(
            (msg for msg in reversed(self._messages) if msg.signal == signal), None
        )

    def get_all(self, signal: Optional[str | int] = None) -> list[CanMessage]:
        """
        :param signal: The signal name or message ID to get. If None, all messages will
                       be returned (if any) regardless of the signal/id
        :return: A new list of all CAN messages with the specified signal (or all), in
                 the order they were added
        """
        if signal is None:
            # Copy so callers cannot modify the internal list through the result
            return list(self._messages)
        return [msg for msg in self._messages if msg.signal == signal]

    def clear(self, signal: Optional[str | int] = None) -> None:
        """
        :param signal: The signal name or message ID to clear. If None, all messages
                       will be cleared (if any) regardless of the signal/id
        """
        if signal is None:
            self._messages.clear()
        else:
            # Slice-assign so the list is updated in place, like the branch above
            self._messages[:] = [msg for msg in self._messages if msg.signal != signal]

Manages a collection of CAN messages

def add_multiple(self, messages: list[CanMessage]) -> None:
57    def add_multiple(self, messages: list[CanMessage]) -> None:
58        """
59        :param messages: The list of CAN messages to add
60        """
61        self._messages.extend(messages)
Parameters
  • messages: The list of CAN messages to add
def get_last( self, signal: Union[str, int, NoneType]) -> Optional[CanMessage]:
63    def get_last(self, signal: Optional[str | int]) -> Optional[CanMessage]:
64        """
65        :param signal: The signal name or message ID to get. If None, the last message
66                       will be returned (if any) regardless of the signal/id
67        :return: The last CAN message with the specified signal, or None if not found
68        """
69        if signal is None:
70            return self._messages[-1] if self._messages else None
71        for msg in reversed(self._messages):
72            if msg.signal == signal:
73                return msg
74        return None
Parameters
  • signal: The signal name or message ID to get. If None, the last message will be returned (if any) regardless of the signal/id
Returns

The last CAN message with the specified signal, or None if not found

def get_all( self, signal: Union[str, int, NoneType] = None) -> list[CanMessage]:
76    def get_all(self, signal: Optional[str | int] = None) -> list[CanMessage]:
77        """
78        :param signal: The signal name or message ID to get. If None, all messages will
79                       be returned (if any) regardless of the signal/id
80        :return: A list of all CAN messages with the specified signal (or all)
81        """
82        return list(
83            filter(lambda msg: signal is None or msg.signal == signal, self._messages)
84        )
Parameters
  • signal: The signal name or message ID to get. If None, all messages will be returned (if any) regardless of the signal/id
Returns

A list of all CAN messages with the specified signal (or all)

def clear(self, signal: Union[str, int, NoneType] = None) -> None:
86    def clear(self, signal: Optional[str | int] = None) -> None:
87        """
88        :param signal: The signal name or message ID to clear. If None, all messages
89                       will be cleared (if any) regardless of the signal/id
90        """
91        if signal is None:
92            self._messages.clear()
93        else:
94            self._messages = list(
95                filter(lambda msg: msg.signal != signal, self._messages)
96            )
Parameters
  • signal: The signal name or message ID to clear. If None, all messages will be cleared (if any) regardless of the signal/id