v1.0.0 - 2025-03-13 EHS-Sentinel has been heavily modified to incorporate the control mechanism The read-in behavior of the modbus registers has been revised from chunks to single byte MessageProcessor now runs asynchronously MessageProducer added which takes over the writing communication with the WP Configuration of HASS entities has moved from hardcoded to NASA Repository NASA Repository has been fundamentally changed All FSV Values, NASA_POWER, VAR_IN_TEMP_WATER_LAW_TARGET_F, NASA_INDOOR_OPMODE are allowed for writing mode NASA_OUTDOOR_DEFROST_STEP DEFROST STEP 10 (b'0xa') added ENUM_IN_SG_READY_MODE_STATE ACTIVE (b'0x2') added New configuration point allowControl to allow control of the Samsung EHS heat pump (deactivated by default). [!CAUTION] This functionality requires that EHS-Sentinel actively communicates with the Samsung EHS, so EHS-Sentinel intervenes here in the Modbus data traffic between the components (it sends its own messages). The activation of this functionality is exclusively at your own risk. I assume no liability for any damage caused. new configuration points in logging controlMessage (default False) to print out the controlled mesagges invalidPacket (default False) prints out invalid messages (length not ok, x34 not at end...) Dashboard template has been split, ressources/dashboard_readonly_template.yaml is for readonly mode and the ressources/dashboard_controlmode_template.yaml for control mode
267 lines
10 KiB
Python
267 lines
10 KiB
Python
from enum import Enum
|
|
from NASAMessage import NASAMessage
|
|
from EHSExceptions import SkipInvalidPacketException
|
|
import binascii
|
|
import struct
|
|
from CustomLogger import logger
|
|
|
|
class AddressClassEnum(Enum):
|
|
"""
|
|
Enum class representing various address classes for NASA packets.
|
|
"""
|
|
|
|
Outdoor = 0x10
|
|
HTU = 0x11
|
|
Indoor = 0x20
|
|
ERV = 0x30
|
|
Diffuser = 0x35
|
|
MCU = 0x38
|
|
RMC = 0x40
|
|
WiredRemote = 0x50
|
|
PIM = 0x58
|
|
SIM = 0x59
|
|
Peak = 0x5A
|
|
PowerDivider = 0x5B
|
|
OnOffController = 0x60
|
|
WiFiKit = 0x62
|
|
CentralController = 0x65
|
|
DMS = 0x6A
|
|
JIGTester = 0x80
|
|
BroadcastSelfLayer = 0xB0
|
|
BroadcastControlLayer = 0xB1
|
|
BroadcastSetLayer = 0xB2
|
|
BroadcastCS = 0xB3
|
|
BroadcastControlAndSetLayer = 0xB3
|
|
BroadcastModuleLayer = 0xB4
|
|
BroadcastCSM = 0xB7
|
|
BroadcastLocalLayer = 0xB8
|
|
BroadcastCSML = 0xBF
|
|
Undefined = 0xFF
|
|
|
|
class PacketType(Enum):
|
|
"""
|
|
Enum class representing different types of packets in the EHS-Sentinel system.
|
|
"""
|
|
|
|
StandBy = 0
|
|
Normal = 1
|
|
Gathering = 2
|
|
Install = 3
|
|
Download = 4
|
|
|
|
class DataType(Enum):
|
|
"""
|
|
Enum representing different types of data operations.
|
|
"""
|
|
|
|
Undefined = 0
|
|
Read = 1
|
|
Write = 2
|
|
Request = 3
|
|
Notification = 4
|
|
Resposne = 5
|
|
Ack = 6
|
|
Nack = 7
|
|
|
|
class NASAPacket:
|
|
"""
|
|
A class to represent a NASA Packet.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._packet_raw: bytearray = None
|
|
self.packet_start: int = None
|
|
self.packet_size: int = None
|
|
self.packet_source_address_class: AddressClassEnum = None
|
|
self.packet_source_channel: int = None
|
|
self.packet_source_address: int = None
|
|
self.packet_dest_address_class: AddressClassEnum = None
|
|
self.packet_dest_channel: int = None
|
|
self.packet_dest_address: int = None
|
|
self.packet_information: int = None
|
|
self.packet_version: int = None
|
|
self.packet_retry_count: int = None
|
|
self.packet_type: PacketType = None
|
|
self.packet_data_type: DataType = None
|
|
self.packet_number: int = None
|
|
self.packet_capacity: int = None
|
|
self.packet_messages: list[NASAMessage] = None
|
|
self.packet_crc16: int = None
|
|
self.packet_end: int = None
|
|
|
|
def parse(self, packet: bytearray):
|
|
self._packet_raw = packet
|
|
if len(packet) < 14:
|
|
raise ValueError("Data too short to be a valid NASAPacket")
|
|
|
|
crc_checkusm=binascii.crc_hqx(bytearray(packet[3:-3]), 0)
|
|
|
|
self.packet_start = packet[0]
|
|
self.packet_size = ((packet[1] << 8) | packet[2])
|
|
|
|
if self.packet_size+2 != len(packet):
|
|
logger.info(f"length not correct {self.packet_size+2} -> {len(packet)}")
|
|
logger.info(f"{packet.hex()}")
|
|
logger.info(f"{hex(packet[self.packet_size+1])}")
|
|
|
|
try:
|
|
self.packet_source_address_class = AddressClassEnum(packet[3])
|
|
except ValueError as e:
|
|
raise SkipInvalidPacketException(f"Source Adress Class out of enum {packet[3]}")
|
|
self.packet_source_channel = packet[4]
|
|
self.packet_source_address = packet[5]
|
|
try:
|
|
self.packet_dest_address_class = AddressClassEnum(packet[6])
|
|
except ValueError as e:
|
|
raise SkipInvalidPacketException(f"Destination Adress Class out of enum {packet[6]}")
|
|
self.packet_dest_channel = packet[7]
|
|
self.packet_dest_address = packet[8]
|
|
self.packet_information = (int(packet[9]) & 128) >> 7 == 1
|
|
self.packet_version = (int(packet[9]) & 96) >> 5
|
|
self.packet_retry_count = (int(packet[9]) & 24) >> 3
|
|
self.packet_type = PacketType((int(packet[10]) & 240) >> 4)
|
|
self.packet_data_type = DataType(int(packet[10]) & 15)
|
|
self.packet_number = packet[11]
|
|
self.packet_capacity = packet[12]
|
|
self.packet_crc16 = ((packet[-3] << 8) | packet[-2]) # + 2
|
|
self.packet_end = packet[-1]
|
|
self.packet_messages = self._extract_messages(0, self.packet_capacity, packet[13:-3], [])
|
|
|
|
if crc_checkusm != self.packet_crc16:
|
|
raise SkipInvalidPacketException(f"Checksum for package could not be validated. Calculated: {crc_checkusm} in packet: {self.packet_crc16}: packet:{self}")
|
|
|
|
def _extract_messages(self, depth: int, capacity: int, msg_rest: bytearray, return_list: list):
|
|
if depth > capacity or len(msg_rest) <= 2:
|
|
return return_list
|
|
|
|
message_number = (msg_rest[0] << 8) | msg_rest[1]
|
|
message_type = (message_number & 1536) >> 9
|
|
|
|
if message_type == 0:
|
|
payload_size = 1
|
|
elif message_type == 1:
|
|
payload_size = 2
|
|
elif message_type == 2:
|
|
payload_size = 4
|
|
elif message_type == 3:
|
|
payload_size = len(msg_rest)
|
|
if capacity != 1:
|
|
raise SkipInvalidPacketException("Message with structure type must have capacity of 1.")
|
|
else:
|
|
raise ValueError(f"Mssage type unknown: {message_type}")
|
|
|
|
payload = msg_rest[2:2 + payload_size]
|
|
if len(payload) > 255:
|
|
raise ValueError(f"Payload for Submessage {hex(message_number)} too large at index {depth}: {len(payload)} bytes.")
|
|
|
|
return_list.append(NASAMessage(packet_message=message_number, packet_message_type=message_type, packet_payload=payload))
|
|
return self._extract_messages(depth+1, capacity, msg_rest[2 + payload_size:], return_list)
|
|
|
|
def __str__(self):
|
|
text = f"NASAPacket(\n"
|
|
text += f" start={self.packet_start} ({hex(self.packet_start)}),\n"
|
|
text += f" size={self.packet_size} ({hex(self.packet_size)}),\n"
|
|
text += f" source_address_class={self.packet_source_address_class} ({hex(self.packet_source_address_class.value)}),\n"
|
|
text += f" source_channel={self.packet_source_channel} ({hex(self.packet_source_channel)}),\n"
|
|
text += f" source_address={self.packet_source_address} ({hex(self.packet_source_address)}),\n"
|
|
text += f" dest_address_class={self.packet_dest_address_class} ({hex(self.packet_dest_address_class.value)}),\n"
|
|
text += f" dest_channel={self.packet_dest_channel} ({hex(self.packet_dest_channel)}),\n"
|
|
text += f" dest_address={self.packet_dest_address} ({hex(self.packet_dest_address)}),\n"
|
|
text += f" information={self.packet_information},\n"
|
|
text += f" version={self.packet_version} ({hex(self.packet_version)}),\n"
|
|
text += f" retry_count={self.packet_retry_count} ({hex(self.packet_retry_count)}),\n"
|
|
text += f" type={self.packet_type} ({hex(self.packet_type.value)}),\n"
|
|
text += f" data_type={self.packet_data_type} ({hex(self.packet_data_type.value)}),\n"
|
|
text += f" number={self.packet_number} ({hex(self.packet_number)}),\n"
|
|
text += f" capacity={self.packet_capacity} ({hex(self.packet_capacity)}),\n"
|
|
text += f" messages=[\n"
|
|
for msg in self.packet_messages:
|
|
lines = f"{msg}".splitlines()
|
|
text += f" {lines[0]}\n"
|
|
for line in lines[1:-1]:
|
|
text += f" {line}\n"
|
|
text += f" {lines[-1]}\n"
|
|
text += " ],\n"
|
|
text += f" crc16={self.packet_crc16} ({hex(self.packet_crc16)}),\n"
|
|
text += f" end={self.packet_end} ({hex(self.packet_end)})\n"
|
|
text += f")"
|
|
return text
|
|
|
|
def __repr__(self):
|
|
return self.__str__()
|
|
|
|
# Setter methods
|
|
def set_packet_source_address_class(self, value: AddressClassEnum):
|
|
self.packet_source_address_class = value
|
|
|
|
def set_packet_source_channel(self, value: int):
|
|
self.packet_source_channel = value
|
|
|
|
def set_packet_source_address(self, value: int):
|
|
self.packet_source_address = value
|
|
|
|
def set_packet_dest_address_class(self, value: AddressClassEnum):
|
|
self.packet_dest_address_class = value
|
|
|
|
def set_packet_dest_channel(self, value: int):
|
|
self.packet_dest_channel = value
|
|
|
|
def set_packet_dest_address(self, value: int):
|
|
self.packet_dest_address = value
|
|
|
|
def set_packet_information(self, value: bool):
|
|
self.packet_information = value
|
|
|
|
def set_packet_version(self, value: int):
|
|
self.packet_version = value
|
|
|
|
def set_packet_retry_count(self, value: int):
|
|
self.packet_retry_count = value
|
|
|
|
def set_packet_type(self, value: PacketType):
|
|
self.packet_type = value
|
|
|
|
def set_packet_data_type(self, value: DataType):
|
|
self.packet_data_type = value
|
|
|
|
def set_packet_number(self, value: int):
|
|
self.packet_number = value
|
|
|
|
def set_packet_messages(self, value: list[NASAMessage]):
|
|
self.packet_messages = value
|
|
|
|
def to_raw(self) -> bytearray:
|
|
self.packet_start = 50
|
|
self.packet_end = 52
|
|
|
|
|
|
packet = bytearray()
|
|
packet.append(int(self.packet_start))
|
|
packet.append(0)
|
|
packet.append(0)
|
|
packet.append(self.packet_source_address_class.value)
|
|
packet.append(self.packet_source_channel)
|
|
packet.append(self.packet_source_address)
|
|
packet.append(self.packet_dest_address_class.value)
|
|
packet.append(self.packet_dest_channel)
|
|
packet.append(self.packet_dest_address)
|
|
packet.append((self.packet_information << 7) | (self.packet_version << 5) | (self.packet_retry_count << 3))
|
|
packet.append((self.packet_type.value << 4) | self.packet_data_type.value)
|
|
packet.append(self.packet_number)
|
|
packet.append(len(self.packet_messages))
|
|
# Add messages to the packet
|
|
for msg in self.packet_messages:
|
|
for msg_pack in msg.to_raw():
|
|
packet.append(msg_pack)
|
|
self.packet_capacity = len(self.packet_messages)
|
|
self.packet_size = len(packet)+2+2
|
|
packet[1] = (self.packet_size >> 8) & 0xFF
|
|
packet[2] = self.packet_size & 0xFF
|
|
self.packet_crc16=binascii.crc_hqx(packet[3:], 0)
|
|
final_packet = struct.pack(">BH", packet[0], len(packet[1:])+2) + packet[3:] + struct.pack(">HB", self.packet_crc16, 0x34)
|
|
return final_packet
|
|
|
|
# Example usage:
|
|
# packet = NASAPacket()
|
|
# packet.parse(bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06]))
|
|
# print(packet) |