Mismatch between data from Telegraf and Influx

Greetings!

Unfortunately I have a small yet annoying issue with Telegraf and Influx:

I am getting MQTT messages in json_v2 format containing the energy consumption of the last 30 days.
Since Telegraf can't handle arrays, the data is parsed via a Python script. I can see in the Telegraf log that the data is parsed correctly, and there are no issues so far. But when I look into my InfluxDB Data Explorer, the numbers change. They change randomly, and there is no specific pattern that I can make out.
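For reference, the ingest side is wired up roughly like this (a sketch, not my exact config; the broker address and script path are placeholders):

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]                # placeholder broker address
  topics = ["MK117NB-136C/TF00006/device_to_app"]
  topic_tag = "plugs"                               # topic ends up as the "plugs" tag
  data_format = "json_v2"
  # json_v2 sub-tables omitted; they expand the "energy" array into one
  # metric per element, all sharing the message timestamp

[[processors.execd]]
  command = ["python3", "/etc/telegraf/parser.py"]  # placeholder script path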

This is how the message comes in (the message has 30 values; for a better overview I focused on the last 4):

{
  "msg_id" : 3070,
  "device_info" : {
    "device_id" : "TF00006",
    "mac" : "XXXXXXXXXXXX"
  },
  "data" : {
    "timestamp" : "2025-08-05T15:04:19+00:00",
    "num" : 30,
    "energy" : [ ... 165, 164, 168, 175 ]
  }
} 

Here's how my script parses them (messages labelled with “QMSG” are being sent out to Influx):

2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 165 at date 2025-07-10 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=165,time_plug=\"2025-08-05T15:04:19+00:00\" 1752098400"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "TS:"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 164 at date 2025-07-09 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=164,time_plug=\"2025-08-05T15:04:19+00:00\" 1752012000"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "TS:"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 168 at date 2025-07-08 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=168,time_plug=\"2025-08-05T15:04:19+00:00\" 1751925600"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "TS:"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "setting value 175 at date 2025-07-07 00:00:00"
2025-08-05T15:04:19Z E! [processors.execd] stderr: "QMSG: mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=175,time_plug=\"2025-08-05T15:04:19+00:00\" 1751839200"

And this is how they end up in Influx:

Does anybody have an idea what is going on?

Here's the parser script (can't upload a text file, sorry for the headache):

import sys
from datetime import datetime as dt
import datetime


def filter_by_msgid(data: str):
    # sort out all messages with msg_id != 3070
    if "msg_id=3070" not in data:
        print(data.rstrip(), flush=True, file=sys.stdout)
        return {}
    # ignore all msg_id=3070 messages without usable data
    if "num" not in data or "energy_list_30d" not in data:
        print(f"Filter:\n{data.rstrip()}", flush=True, file=sys.stderr)  # rem
        return {}
    if "energy_list_30d=" in data:
        print(f"Filter:\n{data.rstrip()}", flush=True, file=sys.stderr)  # rem
        return {}
    # mqtt_consumer,device_id=TF00002,host=Fred,mac=C8BF1FF9FC29,msg_id=3070,plugs=MK117NB-FC29/TF00002/device_to_app num=30,energy_list_30d=33,time_plug="2025-07-29T15:48:35+00:00" 1753804117594667885
    print(f"pre_parsing:\n{data}", flush=True, file=sys.stderr)  # rem
    data_dict = {
        "device_id": None,
        "host": None,
        "mac": None,
        "msg_id": None,
        "plugs": None,
        "num": None,
        "energy_list_30d": None,
        "time_plug": None,
    }
    cleaned_data = data.replace(" ", ",").split(",")
    cleaned_data.pop()
    # ["mqtt_consumer","device_id=TF00002","host=Fred","mac=C8BF1FF9FC29","msg_id=3070","plugs=MK117NB-FC29/TF00002/device_to_app","num=30","energy_list_30d=33","time_plug=\"2025-07-29T15:48:35+00:00\""]
    for x in cleaned_data:
        if "device_id" in x:
            data_dict["device_id"] = x.split("device_id=")[1]
        elif "host" in x:
            data_dict["host"] = x.split("host=")[1]
        elif "mac" in x:
            data_dict["mac"] = x.split("mac=")[1]
        elif "msg_id" in x:
            data_dict["msg_id"] = x.split("msg_id=")[1]
        elif "plugs" in x:
            data_dict["plugs"] = x.split("plugs=")[1]
        elif "num" in x:
            data_dict["num"] = x.split("num=")[1]
        elif "energy_list_30d" in x:
            data_dict["energy_list_30d"] = x.split("energy_list_30d=")[1]
        elif "time_plug" in x:
            data_dict["time_plug"] = x.split('time_plug="')[1].rstrip('"')
    print(f"parsed:\n{data_dict}", flush=True, file=sys.stderr)  # rem
    return data_dict


def merge_messages(jsondata: dict, message_data: dict):
    print(f"Merger:\n{message_data}", flush=True, file=sys.stderr)  # rem
    identifier = message_data["plugs"]
    if identifier not in jsondata.keys():
        message_data["energy_list_30d"] = [message_data["energy_list_30d"]]
        jsondata[identifier] = message_data
        print(f"Merger_2:\n{jsondata}", flush=True, file=sys.stderr)  # rem
        return
    jsondata[identifier]["energy_list_30d"].append(message_data["energy_list_30d"])


def parse_and_send_new_datapoint(data: dict):
    parsed_time = data["time_plug"].split("+")[0].replace("T", " ")
    dt0 = dt.strptime(parsed_time, "%Y-%m-%d %H:%M:%S").replace(
        hour=0, minute=0, second=0
    )
    for it, datapoint in enumerate(
        data["energy_list_30d"]
    ):  # TODO: zero to begin of day
        # 2025-07-29 15:48:35
        timestamp = int((dt0 - datetime.timedelta(days=it)).timestamp())
        print(
            f"TS:\nsetting value {datapoint} at date {dt.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')}",
            flush=True,
            file=sys.stderr,
        )  # rem
        print(
            f'mqtt_consumer,device_id={data["device_id"]},host={data["host"]},mac={data["mac"]},msg_id={data["msg_id"]},plugs={data["plugs"]} num={data["num"]},energy_list_30d={datapoint},time_plug="{data["time_plug"]}" {timestamp}',
            flush=True,
            file=sys.stdout,
        )
        print(
            f'QMSG: mqtt_consumer,device_id={data["device_id"]},host={data["host"]},mac={data["mac"]},msg_id={data["msg_id"]},plugs={data["plugs"]} num={data["num"]},energy_list_30d={datapoint},time_plug="{data["time_plug"]}" {timestamp}',
            flush=True,
            file=sys.stderr,
        )
    return True


def main():
    message = ""  # rem
    try:
        key_list = []
        jsondata = {}  # {MAC: data_dict}
        # with open("telegraf.data", "a", encoding="utf-8") as fifo:
        with open("/etc/telegraf/telegraf.data", mode="a", encoding="utf-8") as fifo:
            for line in sys.stdin:
                message = line.rstrip()
                datapoint = filter_by_msgid(line)
                if datapoint:
                    merge_messages(jsondata, datapoint)
                    fifo.write(f"{datapoint}\n")  # rem
                    fifo.write(f"{'*' * 50}\n")  # rem
                for key, value in jsondata.items():
                    if int(value["num"]) == len(value["energy_list_30d"]):
                        if parse_and_send_new_datapoint(value):
                            key_list.append(key)
                fifo.write(f"{'-' * 50}\n")  # rem
                fifo.write(f"{jsondata}\n")  # rem
                fifo.write(f"{'-' * 50}\n")  # rem
                for key in key_list:
                    print(f"Key_Clean:\n{key}", flush=True, file=sys.stderr)  # rem
                    del jsondata[key]
                if not jsondata and key_list:
                    print("Keylist_Clean", flush=True, file=sys.stderr)  # rem
                    key_list.clear()
    except Exception as excp:
        print(
            f"Exception:\n{excp}\n{message}",
            flush=True,
            file=sys.stderr,
        )


if __name__ == "__main__":
    main()

@Nils_Franke How exactly are you hoping for this to be parsed? The structure of the line protocol that's being generated matches the structure of the data that I'm seeing in your screenshot (obviously different values, though). These are the lines of line protocol being written:

mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=165,time_plug=\"2025-08-05T15:04:19+00:00\" 1752098400
mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=164,time_plug=\"2025-08-05T15:04:19+00:00\" 1752012000
mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=168,time_plug=\"2025-08-05T15:04:19+00:00\" 1751925600
mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MK117NB-136C/TF00006/device_to_app num=30,energy_list_30d=175,time_plug=\"2025-08-05T15:04:19+00:00\" 1751839200

The schema, when stored in InfluxDB, would be:

  • measurement: mqtt_consumer
  • tags:
    • device_id
    • host
    • mac
    • msg_id
    • plugs
  • fields:
    • num
    • energy_list_30d
    • time_plug

The last number on each line is parsed as the timestamp for the data point. I assume this is a second-based Unix timestamp in your example. By default, query results are sorted by _time, which they are in your screenshot (it’s the 6th column from the left).
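As a sanity check, you can decode the epoch from your first line to see exactly what will be stored as _time (plain Python):

from datetime import datetime, timezone

print(datetime.fromtimestamp(1752098400, tz=timezone.utc))
# 2025-07-09 22:00:00+00:00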

Hi @scott,
thank you for your quick reply, I appreciate it. I am sorry for not making my problem clear.

The problem is not the parsing itself; fields and tags are handled correctly (although there is a mismatch between “energy_list” and “energy_list_30d”, which does not have priority at the moment). The problem is indeed the values themselves, as soon as Influx stores them: they change in a random pattern.

First position in the example message, with its correct value and timestamp: 165, 2025-07-10
Same position handled by the script, with correct value and timestamp: 165, 2025-07-10
Position stored in Influx, with wrong value but correct timestamp: 152, 2025-07-10

In effect, the value 165 changes to the value 152 for no reason. The value 152 appears nowhere in the Telegraf feed, nor in the sent message. The difference of 13 (165 − 152 = 13) is not consistent with the other values, and there is no explanation of where the 152 comes from.

There are 30 positions (values) in each message and none of them gets stored correctly; all other information is good, it is just the value that changes. None of the four values you can see in the Influx screenshot appears anywhere else in the array, so it is not a sorting issue.
I have direct access to the sensor, so I know that the numbers arrive correctly in Telegraf and that the script does not change them.

Let me know if you need more information.

But how are you generating the second-based epoch timestamp for the line protocol? If you use the same timestamp for all values in the energy array, these values are just going to overwrite each other when sent to InfluxDB. If multiple points are written to InfluxDB with the same timestamp, tag set, and field, it sees those as a duplicate point and overwrites the old field value with the new field value. If you don’t include the timestamp in the line protocol, InfluxDB uses the time it receives the batch as the timestamp, so you could potentially have multiple values from different arrays inheriting the same timestamp.
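For example, these two lines differ only in the field value; written in this order, only energy_list_30d=152 survives for that series and timestamp (tag set shortened here for readability):

mqtt_consumer,device_id=TF00006 energy_list_30d=165 1752098400
mqtt_consumer,device_id=TF00006 energy_list_30d=152 1752098400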

def parse_and_send_new_datapoint(data: dict):
    parsed_time = data["time_plug"].split("+")[0].replace("T", " ")
    dt0 = dt.strptime(parsed_time, "%Y-%m-%d %H:%M:%S").replace(
        hour=0, minute=0, second=0
    )
    for it, datapoint in enumerate(
        data["energy_list_30d"]
    ):  # TODO: zero to begin of day
        # 2025-07-29 15:48:35
        timestamp = int((dt0 - datetime.timedelta(days=it)).timestamp())

We take the timestamp value from the initial message, create a Unix timestamp from the start of that day, subtract one day per position “passed”, assign the result to the corresponding value in the array, and create a new message for that datapoint.

When the next message comes in the following day, position 1 becomes position 2 in the array, gets the same timestamp, and the value is overwritten, which is what we intended: yesterday's power consumption is the only value that is going to change in the future and thus needs to be overwritten. No other energy value in the array will change, so it can safely be overwritten.
Yet the energy values are stored incorrectly in Influx.

So, as you mentioned before, “_time” should be our newly created timestamp, which it is not. Does Influx ignore or replace the timestamp if it doesn't match the arrival time of the message?

Can I set the logging from Influx in a way that lets me see what is happening with the data?
Right now it only seems to log the startup process when restarted, even though I set trace_logging=true.

No, if the line protocol includes a Unix timestamp, it uses that as the timestamp. It only uses an implicit timestamp if the line protocol doesn’t include one.

You can define a custom log level when you start InfluxDB. This may or may not produce helpful logs on write. The writes themselves aren’t producing any errors, so I don’t know if anything additional is going to be logged.
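If you're on InfluxDB 2.x (which your Flux queries suggest), the level can be set in the influxd config file, for example:

# /etc/influxdb/config.toml (assumed path)
log-level = "debug"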

Question: looking back at the data, should the time_plug field match the Unix timestamp?

No, we only keep time_plug as a reference for the arrival time of the messages.
We calculate the timestamp for the specific values in the array based on time_plug (but we could also do that based on the timestamp field). The only information relevant to the entire calculation is which day the message arrived.
We don't really need time_plug at all, except for later validation, but the timestamp needs to match the day for which the value was recorded.

This definitely has me scratching my head :thinking:. What query are you using to query the data?

I feel you^^

The query looks like this:

from(bucket: "untouchedBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "mqtt_consumer")
  |> filter(fn: (r) => r["_field"] == "energy_list")
  |> filter(fn: (r) => r["device_id"] == "TF00006")
  |> filter(fn: (r) => r["host"] == "Fred")
  |> filter(fn: (r) => r["msg_id"] == "3070")

Or am I misunderstanding?

Ok, so it’s just a from() |> range() |> filter() query, so there isn’t anything that would cause any weirdness in producing values. Sometimes, if you’re using the Query Builder, it will automatically add downsampling and aggregation to the query, but it’s not doing it here. I’m curious to see if the line protocol that’s being output to InfluxDB matches the line protocol that is output in your Telegraf logs. Try adding an output to a file just to confirm.

# ...

[[outputs.file]]
  files = ["/path/to/output_file.txt"]
  data_format = "influx"

Good to know that this function is built in. As you can see in this topic, I had a Python script to do exactly this: Understanding Telegraf

Anyway, this is the output from “output_file.txt”; we adjusted the parser script to only return the values for today and yesterday:

mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MyTopic num=30,energy_list_30d=90,time_plug="2025-08-14T12:49:22+00:00" 1755122400
mqtt_consumer,device_id=TF00006,host=Fred,mac=XXXXXXXXXXXX,msg_id=3070,plugs=MyTopic num=30,energy_list_30d=171,time_plug="2025-08-14T12:49:22+00:00" 1755036000

This is the original message sent out from the Broker:

{
  "msg_id" : 3070,
  "device_info" : {
    "device_id" : "TF00006",
    "mac" : "XXXXXXXXXXXX"
  },
  "data" : {
    "timestamp" : "2025-08-14T12:49:22+00:00",
    "num" : 30,
    "energy" : [ 90, 171, 170, 178, 172, 171, 158, 146, 154, 147, 145, 139, 146, 147, 146, 161, 159, 184, 164, 164, 172, 159, 146, 163, 183, 181, 159, 152, 158, 144 ]
  }
}

And the numbers are correct, except inside Influx.

Is the data written to InfluxDB by these two specific lines of line protocol wrong after you write it?

Yes it is:


The 14th doesn't show up.
And “_time” is not the timestamp we set (which should be 2025-08-13T00:00:00Z).

Here's the query for this screenshot:

from(bucket: "energybucket1")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "mqtt_consumer")
  |> filter(fn: (r) => r["_field"] == "energy_list_30d")
  |> filter(fn: (r) => r["device_id"] == "TF00006")
  |> filter(fn: (r) => r["msg_id"] == "3070")

I chose “Past 2d” as the time range option.

The timestamps in the line protocol are actually:

2025-08-13T22:00:00.000000000Z
2025-08-12T22:00:00.000000000Z

When you write data to InfluxDB, InfluxDB parses the timestamps as UTC. It looks like these may have been generated from a local timestamp without the correct offset applied.
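A minimal sketch of a fix, assuming that's the cause: keep time_plug timezone-aware instead of stripping the offset, so .timestamp() can't apply the host's local timezone:

from datetime import datetime

# Keep the explicit +00:00 offset instead of splitting it off. Calling
# .timestamp() on a naive datetime interprets it in the host's local
# timezone (apparently UTC+2 here), which shifts every point two hours
# into the previous UTC day.
dt0 = datetime.fromisoformat("2025-08-14T12:49:22+00:00").replace(
    hour=0, minute=0, second=0
)
print(int(dt0.timestamp()))  # 1755129600 -> 2025-08-14T00:00:00Z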