Feature/v0.2.0 (#7)

* heateroutput limit 0 - 15000 w

* heatoutput always positive

* ENUM_IN_FSV_2041 value 0c00 unknwon added

* recalculate crc6 checksum and check it

* skip exception to reduce logging

* NASAPacket and NASAMessage prepared for write mode
MQTT Auto Discovery changed single entitity to all entities
NASA Packet crc16 Checksum verificated

* - Changed MQTT Auto Discovery Config Message from single Entitiy to all Entities at ones, known devices are fully configured, not known empty (markt to delete)
- NASAPacket and NASAMessage are now bidirectional, can decode and encode Packets
- Added crc16 Checksum check for any Packet to reduce incorrect value changes
- Folling warnings moved to SkipInvalidPacketException and from warning to debug log level to reduce Logentries
  - Source Adress Class out of enum
  - Destination Adress Class out of enum
  - Checksum for package could not be validatet calculated
  - Message with structure type must have capacity of 1.

* NASA_OUTDOOR_HP as kw unit

* NASA Repository, measurements enums completed

* filter wifikit heartbeat

* process only packets from indoor or outdoor

* correct readme

* remove expire

* device discovery status

* new mqtt hass configuration approach

* added new measurements

* added new logging features from config


* NASA_EHSSENTINEL_TOTAL_COP added

* removed silentMode, added logging proccessedMessage

* loaded devices

* loaded devices counter fix

* only if retain true

* final 0.2.0 commit
This commit is contained in:
echoDaveD
2025-02-22 22:45:18 +01:00
committed by GitHub
parent cce625dabb
commit 48ef003f22
13 changed files with 1069 additions and 386 deletions

View File

@@ -1,6 +1,8 @@
from enum import Enum
from NASAMessage import NASAMessage
from EHSExceptions import MessageCapacityStructureWarning
from EHSExceptions import SkipInvalidPacketException
import binascii
import struct
class AddressClassEnum(Enum):
"""
@@ -210,13 +212,21 @@ class NASAPacket:
self._packet_raw = packet
if len(packet) < 14:
raise ValueError("Data too short to be a valid NASAPacket")
crc_checkusm=binascii.crc_hqx(bytearray(packet[3:-3]), 0)
self.packet_start = packet[0]
self.packet_size = ((packet[1] << 8) | packet[2])
self.packet_source_address_class = AddressClassEnum(packet[3])
try:
self.packet_source_address_class = AddressClassEnum(packet[3])
except ValueError as e:
raise SkipInvalidPacketException(f"Source Adress Class out of enum {packet[3]}")
self.packet_source_channel = packet[4]
self.packet_source_address = packet[5]
self.packet_dest_address_class = AddressClassEnum(packet[6])
try:
self.packet_dest_address_class = AddressClassEnum(packet[6])
except ValueError as e:
raise SkipInvalidPacketException(f"Destination Adress Class out of enum {packet[6]}")
self.packet_dest_channel = packet[7]
self.packet_dest_address = packet[8]
self.packet_information = (int(packet[9]) & 128) >> 7 == 1
@@ -226,10 +236,13 @@ class NASAPacket:
self.packet_data_type = DataType(int(packet[10]) & 15)
self.packet_number = packet[11]
self.packet_capacity = packet[12]
self.packet_crc16 = ((packet[-3] << 8) | packet[-2]) + 2
self.packet_crc16 = ((packet[-3] << 8) | packet[-2]) # + 2
self.packet_end = packet[-1]
self.packet_messages = self._extract_messages(0, self.packet_capacity, packet[13:-3], [])
if crc_checkusm != self.packet_crc16:
raise SkipInvalidPacketException(f"Checksum for package could not be validated. Calculated: {crc_checkusm} in packet: {self.packet_crc16}: packet:{self}")
def _extract_messages(self, depth: int, capacity: int, msg_rest: bytearray, return_list: list):
"""
Recursively extracts messages from a bytearray and appends them to a list.
@@ -260,7 +273,7 @@ class NASAPacket:
elif message_type == 3:
payload_size = len(msg_rest)
if capacity != 1:
raise MessageCapacityStructureWarning("Message with structure type must have capacity of 1.")
raise SkipInvalidPacketException("Message with structure type must have capacity of 1.")
else:
raise ValueError(f"Mssage type unknown: {message_type}")
@@ -303,6 +316,84 @@ class NASAPacket:
def __repr__(self):
return self.__str__()
# Setter methods
def set_packet_source_address_class(self, value: AddressClassEnum):
self.packet_source_address_class = value
def set_packet_source_channel(self, value: int):
self.packet_source_channel = value
def set_packet_source_address(self, value: int):
self.packet_source_address = value
def set_packet_dest_address_class(self, value: AddressClassEnum):
self.packet_dest_address_class = value
def set_packet_dest_channel(self, value: int):
self.packet_dest_channel = value
def set_packet_dest_address(self, value: int):
self.packet_dest_address = value
def set_packet_information(self, value: bool):
self.packet_information = value
def set_packet_version(self, value: int):
self.packet_version = value
def set_packet_retry_count(self, value: int):
self.packet_retry_count = value
def set_packet_type(self, value: PacketType):
self.packet_type = value
def set_packet_data_type(self, value: DataType):
self.packet_data_type = value
def set_packet_number(self, value: int):
self.packet_number = value
def set_packet_messages(self, value: list[NASAMessage]):
self.packet_messages = value
def to_raw(self) -> bytearray:
"""
Converts the NASAPacket object back to its raw byte representation.
Returns:
bytearray: The raw byte representation of the packet.
"""
self.packet_start = 50
self.packet_end = 52
packet = bytearray()
packet.append(int(self.packet_start))
packet.append(0)
packet.append(0)
packet.append(self.packet_source_address_class.value)
packet.append(self.packet_source_channel)
packet.append(self.packet_source_address)
packet.append(self.packet_dest_address_class.value)
packet.append(self.packet_dest_channel)
packet.append(self.packet_dest_address)
packet.append((self.packet_information << 7) | (self.packet_version << 5) | (self.packet_retry_count << 3))
packet.append((self.packet_type.value << 4) | self.packet_data_type.value)
packet.append(self.packet_number)
packet.append(len(self.packet_messages))
# Add messages to the packet
for msg in self.packet_messages:
for msg_pack in msg.to_raw():
packet.append(msg_pack)
self.packet_capacity = len(self.packet_messages)
self.packet_size = len(packet)+2+2
packet[1] = (self.packet_size >> 8) & 0xFF
packet[2] = self.packet_size & 0xFF
self.packet_crc16=binascii.crc_hqx(packet[3:], 0)
final_packet = struct.pack(">BH", packet[0], len(packet[1:])+2) + packet[3:] + struct.pack(">HB", self.packet_crc16, 0x34)
print([x for x in final_packet])
return final_packet
# Example usage:
# packet = NASAPacket()
# packet.parse(bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06]))