Files
uvr67_multical603_wifi_logger/python_scripts/plot_script.py
2026-06-12 21:45:31 +02:00

145 lines
3.2 KiB
Python

# -*- coding: utf-8 -*-
"""
Created on Fri Jun 12 21:31:53 2026
@author: chris
"""
import json
import time
from collections import defaultdict, deque
import numpy as np
import paho.mqtt.client as mqtt
import matplotlib.pyplot as plt
MQTT_BROKER = "s960411f.ala.eu-central-1.emqxsl.com"
MQTT_PORT = 8883
MQTT_USER = "cma"
MQTT_PASS = "cmatest"
TOPIC = "can/raw"
# store last N samples per CAN ID
history = defaultdict(lambda: deque(maxlen=200))
# candidate sensor signals
candidates = defaultdict(list)
# ----------------------------
# MQTT callback
# ----------------------------
def on_message(client, userdata, msg):
try:
data = json.loads(msg.payload.decode())
can_id = data["id"]
dlc = data["dlc"]
payload = data["data"]
timestamp = data.get("ts", time.time())
history[can_id].append((timestamp, payload))
except Exception as e:
print("Parse error:", e)
# ----------------------------
# Find best "sensor-like" byte index
# ----------------------------
def detect_sensor_like_signals():
sensor_map = {}
for can_id, samples in history.items():
if len(samples) < 20:
continue
arr = np.array([s[1][:8] for s in samples])
# check each byte position
for i in range(arr.shape[1]):
series = arr[:, i]
# ignore constant / noisy bytes
if np.std(series) < 0.5:
continue
# sensor heuristic:
# - slowly varying
# - not binary toggling
# - not full random noise
diff = np.abs(np.diff(series))
stability_score = np.mean(diff)
if 0.01 < stability_score < 5.0:
sensor_map.setdefault(can_id, []).append((i, stability_score))
return sensor_map
# ----------------------------
# Print likely sensors
# ----------------------------
def print_candidates():
sensors = detect_sensor_like_signals()
print("\n=== SENSOR CANDIDATES ===")
idx = 1
for can_id, signals in sensors.items():
for byte_index, score in signals:
print(f"Sensor{idx}: CAN ID {hex(can_id)} byte[{byte_index}] score={score:.3f}")
idx += 1
# ----------------------------
# Live plotting (simple)
# ----------------------------
def plot_sensor(can_id, byte_index):
samples = history[can_id]
if len(samples) < 10:
return
y = [s[1][byte_index] for s in samples]
x = list(range(len(y)))
plt.clf()
plt.title(f"CAN {hex(can_id)} byte[{byte_index}]")
plt.plot(x, y)
plt.pause(0.1)
# ----------------------------
# MQTT setup
# ----------------------------
client = mqtt.Client()
client.username_pw_set(MQTT_USER, MQTT_PASS)
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.subscribe(TOPIC)
client.loop_start()
print("Listening for CAN frames...\n")
# ----------------------------
# Main loop
# ----------------------------
plt.ion()
while True:
time.sleep(5)
print_candidates()
# try plotting strongest candidate (if any)
sensors = detect_sensor_like_signals()
for can_id, signals in sensors.items():
if signals:
byte_index = signals[0][0]
plot_sensor(can_id, byte_index)
break