# -*- coding: utf-8 -*- """ Created on Fri Jun 12 21:31:53 2026 @author: chris """ import json import time from collections import defaultdict, deque import numpy as np import paho.mqtt.client as mqtt import matplotlib.pyplot as plt MQTT_BROKER = "s960411f.ala.eu-central-1.emqxsl.com" MQTT_PORT = 8883 MQTT_USER = "cma" MQTT_PASS = "testcma" TOPIC = "can/raw" # store last N samples per CAN ID history = defaultdict(lambda: deque(maxlen=200)) # candidate sensor signals candidates = defaultdict(list) # ---------------------------- # MQTT callback # ---------------------------- def on_message(client, userdata, msg): try: data = json.loads(msg.payload.decode()) can_id = data["id"] dlc = data["dlc"] payload = data["data"] timestamp = data.get("ts", time.time()) history[can_id].append((timestamp, payload)) if can_id != 0x70D and can_id != 0x1CD: print("ID:", hex(can_id), "Data:", payload) except Exception as e: print("Parse error:", e) # ---------------------------- # Find best "sensor-like" byte index # ---------------------------- def detect_sensor_like_signals(): sensor_map = {} for can_id, samples in history.items(): if len(samples) < 20: continue arr = np.array([s[1][:8] for s in samples]) # check each byte position for i in range(arr.shape[1]): series = arr[:, i] # ignore constant / noisy bytes if np.std(series) < 0.5: continue # sensor heuristic: # - slowly varying # - not binary toggling # - not full random noise diff = np.abs(np.diff(series)) stability_score = np.mean(diff) if 0.01 < stability_score < 5.0: sensor_map.setdefault(can_id, []).append((i, stability_score)) return sensor_map # ---------------------------- # Print likely sensors # ---------------------------- def print_candidates(): sensors = detect_sensor_like_signals() print("\n=== SENSOR CANDIDATES ===") idx = 1 for can_id, signals in sensors.items(): for byte_index, score in signals: print(f"Sensor{idx}: CAN ID {hex(can_id)} byte[{byte_index}] score={score:.3f}") idx += 1 # ---------------------------- # Live plotting (simple) # ---------------------------- def plot_sensor(can_id, byte_index): samples = history[can_id] if len(samples) < 10: return y = [s[1][byte_index] for s in samples] x = list(range(len(y))) plt.clf() plt.title(f"CAN {hex(can_id)} byte[{byte_index}]") plt.plot(x, y) plt.pause(0.1) # ---------------------------- # MQTT setup # ---------------------------- client = mqtt.Client() client.username_pw_set(MQTT_USER, MQTT_PASS) client.on_message = on_message client.tls_set(ca_certs='./server-ca.crt') client.connect(MQTT_BROKER, MQTT_PORT, 60) client.subscribe(TOPIC,qos=1) client.loop_start() print("Listening for CAN frames...\n") # ---------------------------- # Main loop # ---------------------------- plt.ion() while True: time.sleep(5) # print_candidates() # try plotting strongest candidate (if any) # sensors = detect_sensor_like_signals() # for can_id, signals in sensors.items(): # if signals: # byte_index = signals[0][0] # plot_sensor(can_id, byte_index) # break