Overwritting Issue with telegraf mqtt in influxdb

I am recievig the following input data via telegraf mqtt in influxdb. The issue i am facing is that mqtt is overwriting the Latitude and Longitude values. Instead of displaying all three object values in influxdb. I can only see last object value and rest are overwritten.

Input data:
{“Latitude”:48.8108060, “Longitude”:13.5473266},
{“Latitude”:48.8103460, “Longitude”:13.5481769},
{“Latitude”:48.8070218, “Longitude”:13.5567778},

To address the above issue, what i want to do is that process the incomming data and convert the input data to the processed data below:
{“wp1”: {“Latitude”:48.8108060, “Longitude”:13.5473266}},
{“wp2”:{“Latitude”:48.8103460, “Longitude”:13.5481769}},
{“wp3”:{“Latitude”:48.8070218, “Longitude”:13.5567778}},

With the above processed data i can see all three json object in the database as there _field is different.

How can i achieve this task ?

Hello @asad_khan,
You could use the execd processor plugin

I’m not sure if there is a more convenient way to do this though.
@Jay_Clifford do you know?

Thanks for your reply. It means alot to me. I am trying to fix this issue for last 7 days. I am new to influxdb so i am unable to debug the error.

I have used the same approch you suggested. But i always get some error that i am unable to debug

  servers = ["abc:8883"]
  username = "abc"
  password = "xyz"
  topics = ["abc/+/mission/waypoints"]
  data_format = "json"
  name_override = "mqtt_json1"

  command = ["C:\Users\abc\AppData\Local\Programs\Python\Python312\python.exe", "D:\Docker-influxdb-mqtt\transform_waypoints.py"]
  data_format = "json"

and here is my transform_waypoints.py that i am using to transform my input data

import json
import sys

def transform_waypoints(data):
    processed_data = []
    for i, waypoint in enumerate(data, start=1):
        processed_data.append({f"wp{i}": waypoint})
    return processed_data

def main():
    for line in sys.stdin:
            data = json.loads(line)
            if isinstance(data, list):
                processed_data = transform_waypoints(data)
                # Pass through non-list data as is
                print(line, end='')
        except json.JSONDecodeError as e:
            sys.stderr.write(f"Error decoding JSON: {e}\n")

if __name__ == "__main__":

I have added the path in docker compose file too

    image: telegraf:latest
    container_name: telegraf
    restart: always
      - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
      - ./transform_waypoints.py:/etc/telegraf/transform_waypoints.py:ro

But i always get this error when i run the docker container

Error 1:

telegraf exited with code 0
telegraf  | 2024-01-30T01:10:01Z I! Loading config: /etc/telegraf/telegraf.conf
telegraf  | 2024-01-30T01:10:01Z E! error loading config file /etc/telegraf/telegraf.conf: error parsing data: line 52: invalid TOML syntax
telegraf exited with code 1

Current Error 2:

telegraf  | 2024-01-30T02:40:56Z I! [processors.execd] Starting process: C:/Users/akhan/AppData/Local/Programs/Python/Python312/python.exe [D:/Docker-influxdb-mqtt/transform_waypoints.py]
telegraf  | 2024-01-30T02:40:56Z E! [telegraf] Error running agent: starting processor processors.execd: failed to start process [C:/Users/akhan/AppData/Local/Programs/Python/Python312/python.exe D:/Docker-influxdb-mqtt/transform_waypoints.py]: error starting process: fork/exec C:/Users/akhan/AppData/Local/Programs/Python/Python312/python.exe: no such file or directory
telegraf exited with code 1

FYI this is my line 52 in which i am having error

 command = ["C:/Users/akhan/AppData/Local/Programs/Python/Python312/python.exe", "D:/Docker-influxdb-mqtt/transform_waypoints.py"]