Files
echoDaveD f0222d750f Feature/v1.0.0 release (#12)
v1.0.0 - 2025-03-13
EHS-Sentinel has been heavily modified to incorporate the control mechanism
The read-in behavior of the modbus registers has been revised from chunks to single byte
MessageProcessor now runs asynchronously
MessageProducer added which takes over the writing communication with the WP
Configuration of HASS entities has moved from hardcoded to NASA Repository
NASA Repository has been fundamentally changed
All FSV Values, NASA_POWER, VAR_IN_TEMP_WATER_LAW_TARGET_F, NASA_INDOOR_OPMODE are allowed for writing mode
NASA_OUTDOOR_DEFROST_STEP DEFROST STEP 10 (b'0xa') added
ENUM_IN_SG_READY_MODE_STATE ACTIVE (b'0x2') added
New configuration point allowControl to allow control of the Samsung EHS heat pump (deactivated by default).
[!CAUTION]
This functionality requires that EHS-Sentinel actively communicates with the Samsung EHS, so EHS-Sentinel intervenes here in the Modbus data traffic between the components (it sends its own messages). The activation of this functionality is exclusively at your own risk. I assume no liability for any damage caused.

new configuration points in logging
controlMessage (default False) prints out the control messages
invalidPacket (default False) prints out invalid messages (length not ok, x34 not at end...)
Dashboard template has been split, ressources/dashboard_readonly_template.yaml is for readonly mode and the ressources/dashboard_controlmode_template.yaml for control mode
2025-03-13 19:57:33 +01:00

267 lines
10 KiB
Python

from enum import Enum
from NASAMessage import NASAMessage
from EHSExceptions import SkipInvalidPacketException
import binascii
import struct
from CustomLogger import logger
class AddressClassEnum(Enum):
    """
    Address class byte of a NASA packet.

    NASAPacket.parse() maps the source (byte 3) and destination (byte 6)
    address class bytes onto this enum; a byte that has no member here
    makes the packet invalid.

    BroadcastControlAndSetLayer shares the value 0xB3 with BroadcastCS and
    is therefore an alias: a lookup by value yields BroadcastCS.
    """

    # Members 0x10 - 0x80
    Outdoor = 0x10
    HTU = 0x11
    Indoor = 0x20
    ERV = 0x30
    Diffuser = 0x35
    MCU = 0x38
    RMC = 0x40
    WiredRemote = 0x50
    PIM = 0x58
    SIM = 0x59
    Peak = 0x5A
    PowerDivider = 0x5B
    OnOffController = 0x60
    WiFiKit = 0x62
    CentralController = 0x65
    DMS = 0x6A
    JIGTester = 0x80

    # Members named Broadcast* (0xB0 - 0xBF)
    BroadcastSelfLayer = 0xB0
    BroadcastControlLayer = 0xB1
    BroadcastSetLayer = 0xB2
    BroadcastCS = 0xB3
    BroadcastControlAndSetLayer = 0xB3  # alias of BroadcastCS
    BroadcastModuleLayer = 0xB4
    BroadcastCSM = 0xB7
    BroadcastLocalLayer = 0xB8
    BroadcastCSML = 0xBF

    Undefined = 0xFF
class PacketType(Enum):
    """
    Packet type of a NASA packet in the EHS-Sentinel system.

    NASAPacket stores this value in the high nibble of header byte 10.
    """

    StandBy = 0
    Normal = 1
    Gathering = 2
    Install = 3
    Download = 4
class DataType(Enum):
    """
    Data operation type of a NASA packet.

    NASAPacket stores this value in the low nibble of header byte 10.

    The historical member name ``Resposne`` is misspelled. It stays the
    canonical member so that existing references and ``.name`` output do
    not change; ``Response`` is provided as a correctly spelled alias for
    the same value.
    """

    Undefined = 0
    Read = 1
    Write = 2
    Request = 3
    Notification = 4
    Resposne = 5
    Response = 5  # alias of Resposne (same value, so Enum treats it as an alias)
    Ack = 6
    Nack = 7
class NASAPacket:
    """
    A class to represent a NASA Packet.

    Byte layout as read by parse() and written by to_raw():

        0        start byte
        1-2      size, big-endian (total packet length minus 2)
        3-5      source address class, channel, address
        6-8      destination address class, channel, address
        9        bit 7 information flag, bits 6-5 version, bits 4-3 retry count
        10       high nibble packet type, low nibble data type
        11       packet number
        12       capacity (number of messages)
        13..-4   messages
        -3..-2   CRC16, big-endian (binascii.crc_hqx with initial value 0,
                 computed over bytes 3..-4)
        -1       end byte
    """

    # Payload length in bytes for the fixed-size message types. Message
    # type 3 (structure) is not listed: it consumes the rest of the packet.
    _PAYLOAD_SIZE_BY_TYPE = {0: 1, 1: 2, 2: 4}

    def __init__(self):
        """Create an empty packet; every field is None until parsed or set."""
        self._packet_raw: bytearray = None
        self.packet_start: int = None
        self.packet_size: int = None
        self.packet_source_address_class: AddressClassEnum = None
        self.packet_source_channel: int = None
        self.packet_source_address: int = None
        self.packet_dest_address_class: AddressClassEnum = None
        self.packet_dest_channel: int = None
        self.packet_dest_address: int = None
        self.packet_information: bool = None
        self.packet_version: int = None
        self.packet_retry_count: int = None
        self.packet_type: PacketType = None
        self.packet_data_type: DataType = None
        self.packet_number: int = None
        self.packet_capacity: int = None
        self.packet_messages: list[NASAMessage] = None
        self.packet_crc16: int = None
        self.packet_end: int = None

    def parse(self, packet: bytearray):
        """
        Fill this object from the raw bytes of one packet.

        Args:
            packet: The complete packet, start byte through end byte.

        Raises:
            ValueError: If the packet is shorter than 14 bytes, if the
                packet type or data type nibble has no enum member, or if a
                message payload is longer than 255 bytes.
            SkipInvalidPacketException: If an address class is not in
                AddressClassEnum, if a structure message appears in a
                packet whose capacity is not 1, or if the CRC16 stored in
                the packet differs from the calculated one.
        """
        self._packet_raw = packet
        if len(packet) < 14:
            raise ValueError("Data too short to be a valid NASAPacket")

        calculated_crc = binascii.crc_hqx(bytearray(packet[3:-3]), 0)

        self.packet_start = packet[0]
        self.packet_size = (packet[1] << 8) | packet[2]
        if self.packet_size + 2 != len(packet):
            # A size mismatch is only reported; parsing continues.
            logger.info(f"length not correct {self.packet_size+2} -> {len(packet)}")
            logger.info(f"{packet.hex()}")
            # Only dump the byte at the declared end position when it exists;
            # a declared size larger than the buffer would otherwise raise
            # IndexError from inside this diagnostic.
            if self.packet_size + 1 < len(packet):
                logger.info(f"{hex(packet[self.packet_size+1])}")

        try:
            self.packet_source_address_class = AddressClassEnum(packet[3])
        except ValueError as err:
            raise SkipInvalidPacketException(f"Source Adress Class out of enum {packet[3]}") from err
        self.packet_source_channel = packet[4]
        self.packet_source_address = packet[5]

        try:
            self.packet_dest_address_class = AddressClassEnum(packet[6])
        except ValueError as err:
            raise SkipInvalidPacketException(f"Destination Adress Class out of enum {packet[6]}") from err
        self.packet_dest_channel = packet[7]
        self.packet_dest_address = packet[8]

        flags = int(packet[9])
        self.packet_information = (flags & 0x80) >> 7 == 1
        self.packet_version = (flags & 0x60) >> 5
        self.packet_retry_count = (flags & 0x18) >> 3

        type_byte = int(packet[10])
        self.packet_type = PacketType((type_byte & 0xF0) >> 4)
        self.packet_data_type = DataType(type_byte & 0x0F)

        self.packet_number = packet[11]
        self.packet_capacity = packet[12]
        self.packet_crc16 = (packet[-3] << 8) | packet[-2]
        self.packet_end = packet[-1]
        self.packet_messages = self._extract_messages(0, self.packet_capacity, packet[13:-3], [])

        # Checked last because the error text embeds str(self), which needs
        # every field above (including the messages) to be populated.
        if calculated_crc != self.packet_crc16:
            raise SkipInvalidPacketException(f"Checksum for package could not be validated. Calculated: {calculated_crc} in packet: {self.packet_crc16}: packet:{self}")

    def _extract_messages(self, depth: int, capacity: int, msg_rest: bytearray, return_list: list):
        """
        Split the message area of a packet into NASAMessage objects.

        Each message starts with a 2-byte big-endian message number. Bits
        9-10 of that number select the message type, which fixes the
        payload length: type 0 -> 1 byte, 1 -> 2 bytes, 2 -> 4 bytes,
        3 (structure) -> all remaining bytes.

        Args:
            depth: Index of the first message to extract (callers pass 0).
            capacity: Message count taken from the packet header.
            msg_rest: Bytes of the message area not yet consumed.
            return_list: List the extracted messages are appended to.

        Returns:
            return_list, with the extracted messages appended.

        Raises:
            SkipInvalidPacketException: Structure message while capacity
                is not 1.
            ValueError: A payload is longer than 255 bytes.
        """
        # NOTE(review): the bound is `depth <= capacity`, so up to
        # capacity + 1 messages are accepted when trailing bytes exist.
        # Kept as in the original; confirm whether that is intended.
        while depth <= capacity and len(msg_rest) > 2:
            message_number = (msg_rest[0] << 8) | msg_rest[1]
            message_type = (message_number & 0x0600) >> 9  # 2-bit field: 0..3

            payload_size = self._PAYLOAD_SIZE_BY_TYPE.get(message_type)
            if payload_size is None:
                # Type 3: structure message, takes everything that is left.
                payload_size = len(msg_rest)
                if capacity != 1:
                    raise SkipInvalidPacketException("Message with structure type must have capacity of 1.")

            payload = msg_rest[2:2 + payload_size]
            if len(payload) > 255:
                raise ValueError(f"Payload for Submessage {hex(message_number)} too large at index {depth}: {len(payload)} bytes.")

            return_list.append(NASAMessage(packet_message=message_number, packet_message_type=message_type, packet_payload=payload))
            msg_rest = msg_rest[2 + payload_size:]
            depth += 1
        return return_list

    @staticmethod
    def _hex(value):
        """Return hex() of an int or of an enum member's value; "None" if unset."""
        if value is None:
            return "None"
        return hex(getattr(value, "value", value))

    def __str__(self):
        """
        Return a multi-line dump of all packet fields and messages.

        Fields that are still None (packet not parsed or not fully set)
        are shown as "None" instead of raising.
        """
        to_hex = self._hex
        parts = [
            f"NASAPacket(\n",
            f" start={self.packet_start} ({to_hex(self.packet_start)}),\n",
            f" size={self.packet_size} ({to_hex(self.packet_size)}),\n",
            f" source_address_class={self.packet_source_address_class} ({to_hex(self.packet_source_address_class)}),\n",
            f" source_channel={self.packet_source_channel} ({to_hex(self.packet_source_channel)}),\n",
            f" source_address={self.packet_source_address} ({to_hex(self.packet_source_address)}),\n",
            f" dest_address_class={self.packet_dest_address_class} ({to_hex(self.packet_dest_address_class)}),\n",
            f" dest_channel={self.packet_dest_channel} ({to_hex(self.packet_dest_channel)}),\n",
            f" dest_address={self.packet_dest_address} ({to_hex(self.packet_dest_address)}),\n",
            f" information={self.packet_information},\n",
            f" version={self.packet_version} ({to_hex(self.packet_version)}),\n",
            f" retry_count={self.packet_retry_count} ({to_hex(self.packet_retry_count)}),\n",
            f" type={self.packet_type} ({to_hex(self.packet_type)}),\n",
            f" data_type={self.packet_data_type} ({to_hex(self.packet_data_type)}),\n",
            f" number={self.packet_number} ({to_hex(self.packet_number)}),\n",
            f" capacity={self.packet_capacity} ({to_hex(self.packet_capacity)}),\n",
            f" messages=[\n",
        ]
        for msg in self.packet_messages or []:
            lines = f"{msg}".splitlines()
            parts.append(f" {lines[0]}\n")
            parts.extend(f" {line}\n" for line in lines[1:-1])
            parts.append(f" {lines[-1]}\n")
        parts.append(" ],\n")
        parts.append(f" crc16={self.packet_crc16} ({to_hex(self.packet_crc16)}),\n")
        parts.append(f" end={self.packet_end} ({to_hex(self.packet_end)})\n")
        parts.append(f")")
        return "".join(parts)

    def __repr__(self):
        """Same text as __str__()."""
        return self.__str__()

    # Setter methods
    def set_packet_source_address_class(self, value: AddressClassEnum):
        """Set the source address class."""
        self.packet_source_address_class = value

    def set_packet_source_channel(self, value: int):
        """Set the source channel byte."""
        self.packet_source_channel = value

    def set_packet_source_address(self, value: int):
        """Set the source address byte."""
        self.packet_source_address = value

    def set_packet_dest_address_class(self, value: AddressClassEnum):
        """Set the destination address class."""
        self.packet_dest_address_class = value

    def set_packet_dest_channel(self, value: int):
        """Set the destination channel byte."""
        self.packet_dest_channel = value

    def set_packet_dest_address(self, value: int):
        """Set the destination address byte."""
        self.packet_dest_address = value

    def set_packet_information(self, value: bool):
        """Set the information flag (bit 7 of header byte 9)."""
        self.packet_information = value

    def set_packet_version(self, value: int):
        """Set the version (bits 6-5 of header byte 9)."""
        self.packet_version = value

    def set_packet_retry_count(self, value: int):
        """Set the retry count (bits 4-3 of header byte 9)."""
        self.packet_retry_count = value

    def set_packet_type(self, value: PacketType):
        """Set the packet type (high nibble of header byte 10)."""
        self.packet_type = value

    def set_packet_data_type(self, value: DataType):
        """Set the data type (low nibble of header byte 10)."""
        self.packet_data_type = value

    def set_packet_number(self, value: int):
        """Set the packet number byte."""
        self.packet_number = value

    def set_packet_messages(self, value: list[NASAMessage]):
        """Set the list of messages to be serialized by to_raw()."""
        self.packet_messages = value

    def to_raw(self) -> bytes:
        """
        Serialize this packet into the byte layout that parse() reads.

        The address, flag, type, number and message fields must have been
        set beforehand. As side effects the start byte (0x32), end byte
        (0x34), capacity, size and CRC16 attributes are set to the values
        that are written into the result.

        Returns:
            The complete packet as an immutable bytes object.
        """
        self.packet_start = 0x32
        self.packet_end = 0x34
        self.packet_capacity = len(self.packet_messages)

        # Everything between the size field and the CRC; this is exactly
        # the range the CRC is calculated over.
        body = bytearray((
            self.packet_source_address_class.value,
            self.packet_source_channel,
            self.packet_source_address,
            self.packet_dest_address_class.value,
            self.packet_dest_channel,
            self.packet_dest_address,
            (self.packet_information << 7) | (self.packet_version << 5) | (self.packet_retry_count << 3),
            (self.packet_type.value << 4) | self.packet_data_type.value,
            self.packet_number,
            self.packet_capacity,
        ))
        for msg in self.packet_messages:
            body.extend(msg.to_raw())

        # The size field counts its own 2 bytes, the body and the 2 CRC
        # bytes, i.e. the total length minus start and end byte. Storing the
        # same number on the object keeps it consistent with what parse()
        # would read back from the result.
        self.packet_size = len(body) + 4
        self.packet_crc16 = binascii.crc_hqx(body, 0)

        header = struct.pack(">BH", self.packet_start, self.packet_size)
        trailer = struct.pack(">HB", self.packet_crc16, self.packet_end)
        return header + bytes(body) + trailer
# Example usage:
# packet = NASAPacket()
# packet.parse(raw_frame)  # raw_frame: one complete packet as a bytearray; parse() rejects anything shorter than 14 bytes
# print(packet)