v1.0.0 - 2025-03-13 EHS-Sentinel has been heavily modified to incorporate the control mechanism The read-in behavior of the modbus registers has been revised from chunks to single byte MessageProcessor now runs asynchronously MessageProducer added which takes over the writing communication with the WP Configuration of HASS entities has moved from hardcoded to NASA Repository NASA Repository has been fundamentally changed All FSV Values, NASA_POWER, VAR_IN_TEMP_WATER_LAW_TARGET_F, NASA_INDOOR_OPMODE are allowed for writing mode NASA_OUTDOOR_DEFROST_STEP DEFROST STEP 10 (b'0xa') added ENUM_IN_SG_READY_MODE_STATE ACTIVE (b'0x2') added New configuration point allowControl to allow control of the Samsung EHS heat pump (deactivated by default). [!CAUTION] This functionality requires that EHS-Sentinel actively communicates with the Samsung EHS, so EHS-Sentinel intervenes here in the Modbus data traffic between the components (it sends its own messages). The activation of this functionality is exclusively at your own risk. I assume no liability for any damage caused. new configuration points in logging controlMessage (default False) to print out the controlled mesagges invalidPacket (default False) prints out invalid messages (length not ok, x34 not at end...) Dashboard template has been split, ressources/dashboard_readonly_template.yaml is for readonly mode and the ressources/dashboard_controlmode_template.yaml for control mode
281 lines
12 KiB
Python
281 lines
12 KiB
Python
import asyncio
|
|
import os
|
|
import signal
|
|
import json
|
|
import time
|
|
|
|
import gmqtt
|
|
|
|
# Get the logger
|
|
from CustomLogger import logger
|
|
from EHSArguments import EHSArguments
|
|
from EHSConfig import EHSConfig
|
|
from MessageProducer import MessageProducer
|
|
|
|
class MQTTClient:
|
|
"""
|
|
MQTTClient is a singleton class that manages the connection and communication with an MQTT broker.
|
|
It handles the initialization, connection, subscription, and message publishing for the MQTT client.
|
|
The class also supports Home Assistant auto-discovery and maintains a list of known devices.
|
|
"""
|
|
_instance = None
|
|
STOP = asyncio.Event()
|
|
|
|
DEVICE_ID = "samsung_ehssentinel"
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
|
|
if not cls._instance:
|
|
cls._instance = super(MQTTClient, cls).__new__(cls)
|
|
cls._instance._initialized = False
|
|
return cls._instance
|
|
|
|
def __init__(self):
|
|
|
|
if self._initialized:
|
|
return
|
|
self.config = EHSConfig()
|
|
self.args = EHSArguments()
|
|
self.message_producer = None
|
|
self._initialized = True
|
|
self.broker = self.config.MQTT['broker-url']
|
|
self.port = self.config.MQTT['broker-port']
|
|
self.client_id = self.config.MQTT['client-id']
|
|
self.client = gmqtt.Client(self.client_id)
|
|
self.client.on_connect = self.on_connect
|
|
self.client.on_disconnect = self.on_disconnect
|
|
self.client.on_message = self.on_message
|
|
self.client.on_subscribe = self.on_subscribe
|
|
if self.config.MQTT['user'] and self.config.MQTT['password']:
|
|
self.client.set_auth_credentials(self.config.MQTT['user'], self.config.MQTT['password'])
|
|
self.topicPrefix = self.config.MQTT['topicPrefix']
|
|
self.homeAssistantAutoDiscoverTopic = self.config.MQTT['homeAssistantAutoDiscoverTopic']
|
|
self.useCamelCaseTopicNames = self.config.MQTT['useCamelCaseTopicNames']
|
|
|
|
self.initialized = True
|
|
self.known_topics: list = list() # Set to keep track of known topics
|
|
self.known_devices_topic = "known/devices" # Dedicated topic for storing known topics
|
|
|
|
async def connect(self):
|
|
logger.info("[MQTT] Connecting to broker...")
|
|
await self.client.connect(self.broker, self.port, keepalive=60, version=gmqtt.constants.MQTTv311)
|
|
|
|
if self.args.CLEAN_KNOWN_DEVICES:
|
|
self._publish(f"{self.topicPrefix.replace('/', '')}/{self.known_devices_topic}", " ", retain=True)
|
|
logger.info("Known Devices Topic have been cleared")
|
|
|
|
def subscribe_known_topics(self):
|
|
logger.info("Subscribe to known devices topic")
|
|
sublist = [
|
|
gmqtt.Subscription(f"{self.topicPrefix.replace('/', '')}/{self.known_devices_topic}", 1),
|
|
gmqtt.Subscription(f"{self.homeAssistantAutoDiscoverTopic}/status", 1)
|
|
]
|
|
if self.config.GENERAL['allowControl']:
|
|
sublist.append(gmqtt.Subscription(f"{self.topicPrefix.replace('/', '')}/entity/+/set", 1))
|
|
|
|
self.client.subscribe(sublist)
|
|
|
|
def on_subscribe(self, client, mid, qos, properties):
|
|
logger.debug('SUBSCRIBED')
|
|
|
|
def on_message(self, client, topic, payload, qos, properties):
|
|
if self.known_devices_topic in topic:
|
|
# Update the known devices set with the retained message
|
|
self.known_topics = list(filter(None, [x.strip() for x in payload.decode().split(",")]))
|
|
if properties['retain'] == True:
|
|
if self.config.LOGGING['deviceAdded']:
|
|
logger.info(f"Loaded devices from known devices Topic:")
|
|
|
|
for idx, devname in enumerate(self.known_topics, start=1):
|
|
logger.info(f"Device no. {idx:<3}: {devname} ")
|
|
else:
|
|
logger.debug(f"Loaded devices from known devices Topic:")
|
|
for idx, devname in enumerate(self.known_topics):
|
|
logger.debug(f"Device added no. {idx:<3}: {devname} ")
|
|
|
|
if f"{self.homeAssistantAutoDiscoverTopic}/status" == topic:
|
|
logger.info(f"HASS Status Messages {topic} received: {payload.decode()}")
|
|
if payload.decode() == "online":
|
|
self._publish(f"{self.topicPrefix.replace('/', '')}/{self.known_devices_topic}", " ", retain=True)
|
|
logger.info("Known Devices Topic have been cleared")
|
|
self.clear_hass()
|
|
logger.info("All configuration from HASS has been resetet")
|
|
|
|
if topic.startswith(f"{self.topicPrefix.replace('/', '')}/entity"):
|
|
logger.info(f"HASS Set Entity Messages {topic} received: {payload.decode()}")
|
|
parts = topic.split("/")
|
|
if self.message_producer is None:
|
|
self.message_producer = MessageProducer(None)
|
|
asyncio.create_task(self.message_producer.write_request(parts[2], payload.decode(), read_request_after=True))
|
|
|
|
def on_connect(self, client, flags, rc, properties):
|
|
if rc == 0:
|
|
logger.info(f"Connected to MQTT with result code {rc}")
|
|
if len(self.homeAssistantAutoDiscoverTopic) > 0:
|
|
self.subscribe_known_topics()
|
|
else:
|
|
logger.error(f"Failed to connect, return code {rc}")
|
|
|
|
def on_disconnect(self, client, packet, exc=None):
|
|
logger.info(f"Disconnected with result code ")
|
|
logger.warning("Unexpected disconnection. Reconnecting...")
|
|
while True:
|
|
try:
|
|
self.client.reconnect()
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Reconnection failed: {e}")
|
|
time.sleep(5)
|
|
|
|
def _publish(self, topic, payload, qos=0, retain=False):
|
|
logger.debug(f"MQTT Publish Topic: {topic} payload: {payload}")
|
|
self.client.publish(f"{topic}", payload, qos, retain)
|
|
#time.sleep(0.1)
|
|
|
|
def refresh_known_devices(self, devname):
|
|
self.known_topics.append(devname)
|
|
if self.config.LOGGING['deviceAdded']:
|
|
logger.info(f"Device added no. {len(self.known_topics):<3}: {devname} ")
|
|
else:
|
|
logger.debug(f"Device added no. {len(self.known_topics):<3}: {devname} ")
|
|
self._publish(f"{self.topicPrefix.replace('/', '')}/{self.known_devices_topic}", ",".join(self.known_topics), retain=True)
|
|
|
|
async def publish_message(self, name, value):
|
|
newname = f"{self._normalize_name(name)}"
|
|
|
|
if len(self.homeAssistantAutoDiscoverTopic) > 0:
|
|
|
|
if name not in self.known_topics:
|
|
self.auto_discover_hass(name)
|
|
self.refresh_known_devices(name)
|
|
|
|
if self.config.NASA_REPO[name]['hass_opts']['writable']:
|
|
sensor_type = self.config.NASA_REPO[name]['hass_opts']['platform']['type']
|
|
else:
|
|
sensor_type = self.config.NASA_REPO[name]['hass_opts']['default_platform']
|
|
topicname = f"{self.config.MQTT['homeAssistantAutoDiscoverTopic']}/{sensor_type}/{self.DEVICE_ID}_{newname.lower()}/state"
|
|
else:
|
|
topicname = f"{self.topicPrefix.replace('/', '')}/{newname}"
|
|
|
|
if isinstance(value, (int, float)) and not isinstance(value, bool):
|
|
value = round(value, 2) if isinstance(value, float) and "." in f"{value}" else value
|
|
|
|
self._publish(topicname, value, qos=2, retain=False)
|
|
|
|
def clear_hass(self):
|
|
entities = {}
|
|
for nasa in self.config.NASA_REPO:
|
|
namenorm = self._normalize_name(nasa)
|
|
if self.config.NASA_REPO[nasa]['hass_opts']['writable']:
|
|
sensor_type = self.config.NASA_REPO[nasa]['hass_opts']['platform']['type']
|
|
else:
|
|
sensor_type = self.config.NASA_REPO[nasa]['hass_opts']['default_platform']
|
|
entities[namenorm] = {"platform": sensor_type}
|
|
|
|
device = {
|
|
"device": self._get_device(),
|
|
"origin": self._get_origin(),
|
|
"components": entities,
|
|
"qos": 2
|
|
}
|
|
|
|
logger.debug(f"Auto Discovery HomeAssistant Clear Message: ")
|
|
logger.debug(f"{device}")
|
|
|
|
self._publish(f"{self.config.MQTT['homeAssistantAutoDiscoverTopic']}/device/{self.DEVICE_ID}/config",
|
|
json.dumps(device, ensure_ascii=False),
|
|
qos=2,
|
|
retain=True)
|
|
|
|
def auto_discover_hass(self, name):
|
|
entity = {}
|
|
namenorm = self._normalize_name(name)
|
|
entity = {
|
|
"name": f"{namenorm}",
|
|
"object_id": f"{self.DEVICE_ID}_{namenorm.lower()}",
|
|
"unique_id": f"{self.DEVICE_ID}_{name.lower()}",
|
|
"force_update": True,
|
|
#"expire_after": 86400, # 1 day (24h * 60m * 60s)
|
|
"value_template": "{{ value }}"
|
|
#"value_template": "{{ value if value | length > 0 else 'unavailable' }}",
|
|
}
|
|
if self.config.NASA_REPO[name]['hass_opts']['writable'] and self.config.GENERAL['allowControl']:
|
|
sensor_type = self.config.NASA_REPO[name]['hass_opts']['platform']['type']
|
|
if sensor_type == 'select':
|
|
entity['options'] = self.config.NASA_REPO[name]['hass_opts']['platform']['options']
|
|
if sensor_type == 'number':
|
|
entity['mode'] = self.config.NASA_REPO[name]['hass_opts']['platform']['mode']
|
|
entity['min'] = self.config.NASA_REPO[name]['hass_opts']['platform']['min']
|
|
entity['max'] = self.config.NASA_REPO[name]['hass_opts']['platform']['max']
|
|
if 'step' in self.config.NASA_REPO[name]['hass_opts']['platform']:
|
|
entity['step'] = self.config.NASA_REPO[name]['hass_opts']['platform']['step']
|
|
|
|
entity['command_topic'] = f"{self.topicPrefix.replace('/', '')}/entity/{name}/set"
|
|
entity['optimistic'] = False
|
|
else:
|
|
sensor_type = self.config.NASA_REPO[name]['hass_opts']['default_platform']
|
|
|
|
if 'unit' in self.config.NASA_REPO[name]['hass_opts']:
|
|
entity['unit_of_measurement'] = self.config.NASA_REPO[name]['hass_opts']['unit']
|
|
|
|
entity['platform'] = sensor_type
|
|
entity['state_topic'] = f"{self.config.MQTT['homeAssistantAutoDiscoverTopic']}/{sensor_type}/{self.DEVICE_ID}_{namenorm.lower()}/state"
|
|
|
|
if 'payload_off' in self.config.NASA_REPO[name]['hass_opts']['platform']:
|
|
entity['payload_off'] = "OFF"
|
|
if 'payload_on' in self.config.NASA_REPO[name]['hass_opts']['platform']:
|
|
entity['payload_on'] = "ON"
|
|
if 'state_class' in self.config.NASA_REPO[name]['hass_opts']:
|
|
entity['state_class'] = self.config.NASA_REPO[name]['hass_opts']['state_class']
|
|
if 'device_class' in self.config.NASA_REPO[name]['hass_opts']:
|
|
entity['device_class'] = self.config.NASA_REPO[name]['hass_opts']['device_class']
|
|
|
|
device = {
|
|
"device": self._get_device(),
|
|
"origin": self._get_origin(),
|
|
"qos": 2
|
|
}
|
|
device.update(entity)
|
|
|
|
logger.debug(f"Auto Discovery HomeAssistant Message: ")
|
|
logger.debug(f"{device}")
|
|
|
|
self._publish(f"{self.config.MQTT['homeAssistantAutoDiscoverTopic']}/{sensor_type}/{self.DEVICE_ID}_{name.lower()}/config",
|
|
json.dumps(device, ensure_ascii=False),
|
|
qos=2,
|
|
retain=True)
|
|
|
|
def _get_device(self):
|
|
return {
|
|
"identifiers": self.DEVICE_ID,
|
|
"name": "Samsung EHS",
|
|
"manufacturer": "Samsung",
|
|
"model": "Mono HQ Quiet",
|
|
"sw_version": "1.0.0"
|
|
}
|
|
|
|
def _get_origin(self):
|
|
return {
|
|
"name": "EHS-Sentinel",
|
|
"support_url": "https://github.com/echoDaveD/EHS-Sentinel"
|
|
}
|
|
|
|
def _normalize_name(self, name):
|
|
if self.useCamelCaseTopicNames:
|
|
prefix_to_remove = ['ENUM_', 'LVAR_', 'NASA_', 'VAR_']
|
|
# remove unnecessary prefixes of name
|
|
for prefix in prefix_to_remove:
|
|
if name.startswith(prefix):
|
|
name = name[len(prefix):]
|
|
break
|
|
|
|
name_parts = name.split("_")
|
|
tmpname = name_parts[0].lower()
|
|
# construct new name in CamelCase
|
|
for i in range(1, len(name_parts)):
|
|
tmpname += name_parts[i].capitalize()
|
|
else:
|
|
tmpname = name
|
|
|
|
return tmpname
|