Greetings!
Unfortunately I have a small yet annoying issue with Telegraf and Influx:
I am receiving MQTT messages in json_v2 format that contain the energy consumption of the last 30 days.
Since Telegraf can't handle arrays, the data is parsed via a Python script. I can see in the Telegraf log that the data is parsed correctly, and there are no issues up to that point. However, when I look at the data in my InfluxDB Data Explorer, the numbers have changed. They change randomly, and there is no specific pattern that I can make out.
This is how the message comes in (the message has 30 values; for a better overview I focused on the last 4):
{
"msg_id" : 3070,
"device_info" : {
"device_id" : "TF00006",
"mac" : "XXXXXXXXXXXX"
},
"data" : {
"timestamp" : "2025-08-05T15:04:19+00:00",
"num" : 30,
"energy" : [ ... 165, 164, 168, 175 ]
}
}
Here's how my script parses them (messages labelled "QMSG" are the ones being sent out to Influx):
2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 165 at date 2025-07-10 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=165,time_plug=\"2025-08-05T15:04:19+00:00\" 1752098400"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "TS:"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 164 at date 2025-07-09 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=164,time_plug=\"2025-08-05T15:04:19+00:00\" 1752012000"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "TS:"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 168 at date 2025-07-08 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=168,time_plug=\"2025-08-05T15:04:19+00:00\" 1751925600"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "TS:"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 175 at date 2025-07-07 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=175,time_plug=\"2025-08-05T15:04:19+00:00\" 1751839200"
And this is how they end up in Influx:
Does anybody have an idea what is going on?
Here's the parser script (I can't upload a text file, sorry for the headache):
import datetime
import sys
from datetime import datetime as dt
def filter_by_msgid(data: str):
    """Parse one ``msg_id=3070`` line-protocol metric into a flat dict.

    Lines that do not contain ``msg_id=3070`` are echoed unchanged to stdout
    and an empty dict is returned.  Lines that do contain it but carry no
    usable ``num`` / ``energy_list_30d`` value are logged to stderr and
    dropped (empty dict).

    Args:
        data: One metric in InfluxDB line protocol, e.g.
            ``mqtt_consumer,device_id=TF00002,host=Fred,mac=C8BF1FF9FC29,``
            ``msg_id=3070,plugs=MK117NB-FC29/TF00002/device_to_app``
            `` num=30,energy_list_30d=33,``
            ``time_plug="2025-07-29T15:48:35+00:00" 1753804117594667885``

    Returns:
        A dict with the keys ``device_id``, ``host``, ``mac``, ``msg_id``,
        ``plugs``, ``num``, ``energy_list_30d`` and ``time_plug`` (all values
        are strings, or ``None`` if the key was absent), or ``{}`` when the
        line was passed through or filtered out.
    """
    line = data.rstrip()

    # Sort out all messages with msg_id != 3070: pass them through untouched.
    if "msg_id=3070" not in data:
        print(line, flush=True, file=sys.stdout)
        return {}

    # Ignore all msg_id=3070 messages without usable data.
    if "num" not in data or "energy_list_30d" not in data:
        print(f"Filter:\n{line}", flush=True, file=sys.stderr)  # rem
        return {}

    # NOTE: the pasted script also returned {} whenever "energy_list_30d="
    # occurred in the line, which rejects exactly the lines parsed below and
    # made the parser unreachable.  That guard is replaced by the empty-value
    # check after parsing.
    print(f"pre_parsing:\n{data}", flush=True, file=sys.stderr)  # rem

    data_dict = {
        "device_id": None,
        "host": None,
        "mac": None,
        "msg_id": None,
        "plugs": None,
        "num": None,
        "energy_list_30d": None,
        "time_plug": None,
    }

    # Tags and fields are separated by "," and " "; treat both alike.  Tokens
    # without "=" (the measurement name and the trailing timestamp) and
    # unknown keys are skipped, so no positional pop() is required.
    for token in line.replace(" ", ",").split(","):
        key, sep, value = token.partition("=")
        if not sep or key not in data_dict:
            continue
        # Exact key comparison instead of substring tests, so e.g. a value
        # that happens to contain "mac" or "num" cannot be mis-assigned.
        if key == "time_plug":
            value = value.strip('"')  # string field: drop the quotes
        data_dict[key] = value

    if not data_dict["num"] or not data_dict["energy_list_30d"]:
        print(f"Filter:\n{line}", flush=True, file=sys.stderr)  # rem
        return {}

    print(f"parsed:\n{data_dict}", flush=True, file=sys.stderr)  # rem
    return data_dict
def merge_messages(jsondata: dict, message_data: dict):
    """Collect the per-element metrics of one device message into a batch.

    Each array element of the original MQTT message arrives as its own
    metric.  They are gathered in ``jsondata`` under the ``plugs`` value
    until the caller finds the batch complete.

    Args:
        jsondata: Pending batches, keyed by the ``plugs`` value.  Modified in
            place.
        message_data: Parsed metric as returned by ``filter_by_msgid``.  When
            it starts a new batch, its ``energy_list_30d`` entry is replaced
            by a one-element list and the dict itself is stored in
            ``jsondata``.

    Returns:
        None.
    """
    print(f"Merger:\n{message_data}", flush=True, file=sys.stderr)  # rem
    identifier = message_data["plugs"]
    pending = jsondata.get(identifier)

    # All elements of one device message share the same time_plug.  If the
    # pending batch has a different one it is an incomplete leftover (e.g.
    # the script was restarted mid-message).  Topping it up with values of a
    # newer message would shift every value onto the wrong day, so discard it.
    if pending is not None and pending.get("time_plug") != message_data.get("time_plug"):
        print(f"Merger_stale:\n{pending}", flush=True, file=sys.stderr)  # rem
        pending = None

    if pending is None:
        message_data["energy_list_30d"] = [message_data["energy_list_30d"]]
        jsondata[identifier] = message_data
        print(f"Merger_2:\n{jsondata}", flush=True, file=sys.stderr)  # rem
        return

    pending["energy_list_30d"].append(message_data["energy_list_30d"])
def parse_and_send_new_datapoint(data: dict):
    """Emit one line-protocol point per collected energy value.

    Value ``i`` of ``data["energy_list_30d"]`` is written with the timestamp
    "start of the day of ``time_plug``, minus ``i`` days".  Every point is
    printed to stdout and echoed to stderr with a ``QMSG: `` prefix for
    debugging.

    The timestamp is emitted in whole **seconds** since the epoch, as in the
    original script.  NOTE(review): whatever parses this output must be
    configured for second precision, otherwise the points are interpreted in
    a different unit -- confirm against the Telegraf configuration.

    Args:
        data: A completed batch with the keys ``device_id``, ``host``,
            ``mac``, ``msg_id``, ``plugs``, ``num``, ``time_plug`` (ISO 8601
            string) and ``energy_list_30d`` (list of values).

    Returns:
        True once all points have been written.

    Raises:
        ValueError: If ``time_plug`` is not a valid ISO 8601 timestamp.
    """
    raw_time = data["time_plug"]
    # fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if raw_time.endswith(("Z", "z")):
        raw_time = raw_time[:-1] + "+00:00"

    # Keep the UTC offset instead of cutting it off: previously
    # "...T15:04:19+00:00" was read as host-local time, so the points landed
    # at local midnight rather than midnight of the reported offset.  A
    # timestamp without any offset is still interpreted as local time.
    moment = dt.fromisoformat(raw_time)
    day_start = moment.replace(hour=0, minute=0, second=0, microsecond=0)

    for it, datapoint in enumerate(data["energy_list_30d"]):
        point_day = day_start - datetime.timedelta(days=it)
        timestamp = int(point_day.timestamp())
        print(
            f"TS:\nsetting value {datapoint} at date "
            f"{point_day.strftime('%Y-%m-%d %H:%M:%S')}",
            flush=True,
            file=sys.stderr,
        )  # rem
        point = (
            f'mqtt_consumer,device_id={data["device_id"]},host={data["host"]},'
            f'mac={data["mac"]},msg_id={data["msg_id"]},plugs={data["plugs"]} '
            f'num={data["num"]},energy_list_30d={datapoint},'
            f'time_plug="{data["time_plug"]}" {timestamp}'
        )
        print(point, flush=True, file=sys.stdout)
        print(f"QMSG: {point}", flush=True, file=sys.stderr)
    return True
def main():
    """Read metrics from stdin, regroup energy arrays and re-emit them.

    Every stdin line is handed to ``filter_by_msgid``; parsed metrics are
    collected with ``merge_messages``.  As soon as a batch holds ``num``
    values it is written out via ``parse_and_send_new_datapoint`` and removed
    from the pending batches.  Debug traces are appended to
    ``/etc/telegraf/telegraf.data``.

    Errors are reported on stderr together with the offending line; an error
    on one line no longer stops the processing of the following lines.
    """
    message = ""  # rem
    jsondata = {}  # {plugs identifier: merged data_dict}
    try:
        # with open("telegraf.data", "a", encoding="utf-8") as fifo:
        with open("/etc/telegraf/telegraf.data", mode="a", encoding="utf-8") as fifo:
            for line in sys.stdin:
                message = line.rstrip()
                try:
                    datapoint = filter_by_msgid(line)
                    if datapoint:
                        merge_messages(jsondata, datapoint)
                        fifo.write(f"{datapoint}\n")  # rem
                        fifo.write(f"{'*' * 50}\n")  # rem

                    # Keys finished on THIS line only.  The original kept the
                    # list across lines and cleared it only when no batch was
                    # pending, so with a second device still collecting, the
                    # next line tried to delete an already deleted key
                    # (KeyError).
                    completed = []
                    for key, value in jsondata.items():
                        if int(value["num"]) != len(value["energy_list_30d"]):
                            continue
                        if parse_and_send_new_datapoint(value):
                            completed.append(key)
                            fifo.write(f"{'-' * 50}\n")  # rem
                            fifo.write(f"{jsondata}\n")  # rem
                            fifo.write(f"{'-' * 50}\n")  # rem

                    # Delete after the loop: the dict must not change size
                    # while it is being iterated.
                    for key in completed:
                        print(f"Key_Clean:\n{key}", flush=True, file=sys.stderr)  # rem
                        del jsondata[key]
                except Exception as excp:
                    # Top-level boundary per line: report and keep reading.
                    print(
                        f"Exception:\n{excp}\n{message}",
                        flush=True,
                        file=sys.stderr,
                    )
    except Exception as excp:
        # Failures outside line processing, e.g. the debug file cannot be
        # opened.
        print(
            f"Exception:\n{excp}\n{message}",
            flush=True,
            file=sys.stderr,
        )
# Script entry point (the dunder underscores were lost in the pasted version).
if __name__ == "__main__":
    main()


