Files
EHS-Sentinel-Addon_for_Home…/helpertils/messageFinder.py
echoDaveD f0222d750f Feature/v1.0.0 release (#12)
v1.0.0 - 2025-03-13
EHS-Sentinel has been heavily modified to incorporate the control mechanism
The read-in behavior of the modbus registers has been revised from chunks to single byte
MessageProcessor now runs asynchronously
MessageProducer added which takes over the writing communication with the WP
Configuration of HASS entities has moved from hardcoded to NASA Repository
NASA Repository has been fundamentally changed
All FSV Values, NASA_POWER, VAR_IN_TEMP_WATER_LAW_TARGET_F, NASA_INDOOR_OPMODE are allowed for writing mode
NASA_OUTDOOR_DEFROST_STEP DEFROST STEP 10 (b'0xa') added
ENUM_IN_SG_READY_MODE_STATE ACTIVE (b'0x2') added
New configuration point allowControl to allow control of the Samsung EHS heat pump (deactivated by default).
[!CAUTION]
This functionality requires that EHS-Sentinel actively communicates with the Samsung EHS, so EHS-Sentinel intervenes here in the Modbus data traffic between the components (it sends its own messages). The activation of this functionality is exclusively at your own risk. I assume no liability for any damage caused.

new configuration points in logging
controlMessage (default False) to print out the controlled mesagges
invalidPacket (default False) prints out invalid messages (length not ok, x34 not at end...)
Dashboard template has been split, ressources/dashboard_readonly_template.yaml is for readonly mode and the ressources/dashboard_controlmode_template.yaml for control mode
2025-03-13 19:57:33 +01:00

159 lines
6.0 KiB
Python

import os
import sys
import inspect
import asyncio
import yaml
import traceback
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0, parentdir)
from NASAPacket import NASAPacket, AddressClassEnum, DataType, PacketType
from NASAMessage import NASAMessage
# Generate a list of all possible 2-byte hex values, always padded to 4 characters
two_byte_hex_values = [f"0x{i:04X}" for i in range(0x0000, 0xFFFF)]
send_message_list = []
seen_message_list = []
with open('data/NasaRepository.yml', mode='r') as file:
NASA_REPO = yaml.safe_load(file)
async def main():
# load config
with open('config.yml', mode='r') as file:
config = yaml.safe_load(file)
# Print the total count to confirm all values are included
print(f"Total values: {len(two_byte_hex_values)}")
reader, writer = await asyncio.open_connection('172.19.2.240', 4196)
print(" serial_connection fertig")
await asyncio.gather(
serial_read(reader, config),
serial_write(writer, config),
)
async def serial_write(writer: asyncio.StreamWriter, config):
_CHUNKSIZE=10
chunks = [two_byte_hex_values[i:i + _CHUNKSIZE] for i in range(0, len(two_byte_hex_values), _CHUNKSIZE)]
for chunk in chunks:
nasa_msg = NASAPacket()
nasa_msg.set_packet_source_address_class(AddressClassEnum.JIGTester)
nasa_msg.set_packet_source_channel(240)
nasa_msg.set_packet_source_address(0)
nasa_msg.set_packet_dest_address_class(AddressClassEnum.BroadcastSetLayer)
nasa_msg.set_packet_dest_channel(0)
nasa_msg.set_packet_dest_address(32)
nasa_msg.set_packet_information(True)
nasa_msg.set_packet_version(2)
nasa_msg.set_packet_retry_count(0)
nasa_msg.set_packet_type(PacketType.Normal)
nasa_msg.set_packet_data_type(DataType.Read)
nasa_msg.set_packet_number(166)
msglist=[]
for msg in chunk:
if msg not in send_message_list and msg not in seen_message_list:
tmpmsg = NASAMessage()
tmpmsg.set_packet_message(int(msg, 16))
value = 0
if tmpmsg.packet_message_type == 0:
value_raw = value.to_bytes(1, byteorder='big')
elif tmpmsg.packet_message_type == 1:
value_raw = value.to_bytes(2, byteorder='big')
elif tmpmsg.packet_message_type == 2:
value_raw = value.to_bytes(4, byteorder='big')
else:
value_raw = value.to_bytes(1, byteorder='big')
tmpmsg.set_packet_payload_raw(value_raw)
msglist.append(tmpmsg)
nasa_msg.set_packet_messages(msglist)
raw = nasa_msg.to_raw()
writer.write(raw)
await writer.drain()
send_message_list.extend(chunk)
if len(send_message_list) % 100 == 0:
print(f"Sended count: {len(send_message_list)}")
await asyncio.sleep(1)
async def serial_read(reader: asyncio.StreamReader, config):
prev_byte = 0x00
packet_started = False
data = bytearray()
packet_size = 0
while True:
current_byte = await reader.read(1) # read bitewise
#data = await reader.read(1024)
#data = await reader.readuntil(b'\x34fd')
if current_byte:
if packet_started:
data.extend(current_byte)
if len(data) == 3:
packet_size = ((data[1] << 8) | data[2]) + 2
if packet_size <= len(data):
asyncio.create_task(process_packet(data, config))
data = bytearray()
packet_started = False
# identify packet start
if current_byte == b'\x00' and prev_byte == b'\x32':
packet_started = True
data.extend(prev_byte)
data.extend(current_byte)
prev_byte = current_byte
def search_nasa_table(address):
for key in NASA_REPO:
if NASA_REPO[key]['address'].lower() == address.lower():
return key
def is_valid_rawvalue(rawvalue: bytes) -> bool:
return all(0x20 <= b <= 0x7E or b in (0x00, 0xFF) for b in rawvalue)
async def process_packet(buffer, config):
try:
nasa_packet = NASAPacket()
nasa_packet.parse(buffer)
for msg in nasa_packet.packet_messages:
if msg.packet_message not in seen_message_list:
seen_message_list.append(msg.packet_message)
msgkey = search_nasa_table(f"0x{msg.packet_message:04X}")
if msgkey is None:
msgkey = ""
msgvalue = None
if msg.packet_message_type == 3:
msgvalue = ""
if is_valid_rawvalue(msg.packet_payload[1:-1]):
for byte in msg.packet_payload[1:-1]:
if byte != 0x00 and byte != 0xFF:
char = chr(byte) if 32 <= byte <= 126 else f"{byte}"
msgvalue += char
else:
msgvalue += " "
msgvalue = msgvalue.strip()
else:
msgvalue = "".join([f"{int(x)}" for x in msg.packet_payload])
else:
msgvalue = int.from_bytes(msg.packet_payload, byteorder='big', signed=True)
line = f"| {len(seen_message_list):<6} | {hex(msg.packet_message):<6} | {msgkey:<50} | {msg.packet_message_type} | {msgvalue:<20} | {msg.packet_payload} |"
with open('helpertils/messagesFound.txt', "a") as dumpWriter:
dumpWriter.write(f"{line}\n")
except Exception as e:
pass
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
except RuntimeError as e:
print(f"Runtime error: {e}")