From 83f168a52fc66611c9045b7a403e9d57ca304876 Mon Sep 17 00:00:00 2001 From: Christian Lind Vie Madsen Date: Mon, 15 Jun 2026 11:35:42 +0200 Subject: [PATCH] did we just decode S1,S2,S3??! Wait until i come home to verify decoded data.. --- python_scripts/plot_script.py | 149 ++++++++++++++-------------------- 1 file changed, 62 insertions(+), 87 deletions(-) diff --git a/python_scripts/plot_script.py b/python_scripts/plot_script.py index b3218b5..32497c8 100644 --- a/python_scripts/plot_script.py +++ b/python_scripts/plot_script.py @@ -1,9 +1,5 @@ # -*- coding: utf-8 -*- -""" -Created on Fri Jun 12 21:31:53 2026 -@author: chris -""" import json import time from collections import defaultdict, deque @@ -19,100 +15,89 @@ MQTT_PASS = "testcma" TOPIC = "can/raw" -# store last N samples per CAN ID history = defaultdict(lambda: deque(maxlen=200)) -# candidate sensor signals -candidates = defaultdict(list) +# ---------------------------- +# decoded time series +# ---------------------------- +t_log = [] +s1_log = [] +s2_log = [] +s3_log = [] + +# ---------------------------- +# decode UVR67 frame (0x20D) +# ---------------------------- +def decode_20d(payload): + s1 = payload[0] | (payload[1] << 8) + s2 = payload[2] | (payload[3] << 8) + s3 = payload[4] | (payload[5] << 8) + + def conv(x): + if x == 0x3FFF or x == 0: + return None + return x / 10.0 + + return conv(s1), conv(s2), conv(s3) + # ---------------------------- # MQTT callback # ---------------------------- def on_message(client, userdata, msg): - try: data = json.loads(msg.payload.decode()) can_id = data["id"] - dlc = data["dlc"] payload = data["data"] - timestamp = data.get("ts", time.time()) history[can_id].append((timestamp, payload)) - + + # keep your debug print (but cleaner) if can_id != 0x70D and can_id != 0x1CD: print("ID:", hex(can_id), "Data:", payload) + # ---------------------------- + # ONLY decode UVR67 temps here + # ---------------------------- + if can_id == 0x20D: + s1, s2, s3 = decode_20d(payload) + + t_log.append(timestamp) + s1_log.append(s1) + s2_log.append(s2) + s3_log.append(s3) + + print(f"S1={s1}°C S2={s2}°C S3={s3}°C") + except Exception as e: print("Parse error:", e) - - - - -# ---------------------------- -# Find best "sensor-like" byte index -# ---------------------------- -def detect_sensor_like_signals(): - sensor_map = {} - - for can_id, samples in history.items(): - if len(samples) < 20: - continue - - arr = np.array([s[1][:8] for s in samples]) - - # check each byte position - for i in range(arr.shape[1]): - series = arr[:, i] - - # ignore constant / noisy bytes - if np.std(series) < 0.5: - continue - - # sensor heuristic: - # - slowly varying - # - not binary toggling - # - not full random noise - diff = np.abs(np.diff(series)) - stability_score = np.mean(diff) - - if 0.01 < stability_score < 5.0: - sensor_map.setdefault(can_id, []).append((i, stability_score)) - - return sensor_map # ---------------------------- -# Print likely sensors +# live plot # ---------------------------- -def print_candidates(): - sensors = detect_sensor_like_signals() +def plot_live(): + plt.clf() - print("\n=== SENSOR CANDIDATES ===") - - idx = 1 - for can_id, signals in sensors.items(): - for byte_index, score in signals: - print(f"Sensor{idx}: CAN ID {hex(can_id)} byte[{byte_index}] score={score:.3f}") - idx += 1 - - -# ---------------------------- -# Live plotting (simple) -# ---------------------------- -def plot_sensor(can_id, byte_index): - samples = history[can_id] - - if len(samples) < 10: + if len(t_log) < 2: return - y = [s[1][byte_index] for s in samples] - x = list(range(len(y))) + t0 = t_log[0] + t = [x - t0 for x in t_log] + + plt.title("UVR67 Solar Temperatures") + plt.xlabel("Time (s)") + plt.ylabel("Temperature (°C)") + + plt.plot(t, s1_log, label="S1 Collector") + plt.plot(t, s2_log, label="S2 Bottom tank") + plt.plot(t, s3_log, label="S3 Top tank") + + plt.legend() + plt.grid(True) - plt.clf() - plt.title(f"CAN {hex(can_id)} byte[{byte_index}]") - plt.plot(x, y) plt.pause(0.1) @@ -123,29 +108,19 @@ client = mqtt.Client() client.username_pw_set(MQTT_USER, MQTT_PASS) client.on_message = on_message client.tls_set(ca_certs='./server-ca.crt') -client.connect(MQTT_BROKER, MQTT_PORT, 60) -client.subscribe(TOPIC,qos=1) +client.connect(MQTT_BROKER, MQTT_PORT, 60) +client.subscribe(TOPIC, qos=1) client.loop_start() print("Listening for CAN frames...\n") -# ---------------------------- -# Main loop -# ---------------------------- plt.ion() +# ---------------------------- +# main loop +# ---------------------------- while True: - time.sleep(5) - -# print_candidates() - - # try plotting strongest candidate (if any) -# sensors = detect_sensor_like_signals() - -# for can_id, signals in sensors.items(): -# if signals: -# byte_index = signals[0][0] -# plot_sensor(can_id, byte_index) -# break + time.sleep(1) + plot_live()