MQTT over Telegraf into InfluxDB

Hey there, I installed InfluxDB 2.7.5 on my server. I also installed Telegraf to connect my MQTT devices to InfluxDB. It works so far except for one device, and I don't know what to do. Here is my config for MQTT so far:

[[inputs.mqtt_consumer]]
servers = ["tcp://192.168.73.238:1883"]
topics = ["#"]
qos = 0
connection_timeout = "30s"
data_format = "json"
json_query = "."

The data I am getting is formatted like this:
{"counter":147238,"temp1":24,"temp2":35.5}
With this format it works fine, but I also have data formatted like this:
{"value": "662.75822","raw": "00662.75822","pre": "662.75822"}
This data is not getting captured/stored. Why? Is it because the values are strings instead of floats?

Yes - take a look at the json parser readme, in particular the json_string_fields config option. You can then use a processor to convert those fields to numeric types if you want as well.
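To make the difference between the two payloads concrete, here is a minimal Python sketch (not part of Telegraf, just an illustration) that sorts the keys of each payload from the question into numeric and string values. The helper name split_fields is made up for this example.

```python
import json

# The two payloads quoted in the question.
working = '{"counter":147238,"temp1":24,"temp2":35.5}'
dropped = '{"value": "662.75822","raw": "00662.75822","pre": "662.75822"}'


def split_fields(payload):
    """Return (numeric_keys, string_keys) for a flat JSON object."""
    data = json.loads(payload)
    numeric = [
        key for key, value in data.items()
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    ]
    strings = [key for key, value in data.items() if isinstance(value, str)]
    return numeric, strings


print(split_fields(working))  # every key holds a JSON number
print(split_fields(dropped))  # every key holds a JSON string
```

In the second payload every value is a quoted string, which is why the json parser skips those keys unless they are listed in json_string_fields.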

Thank you for your answer,
How do I use it?

From the readme:

Array of glob pattern strings or booleans keys that should be added as string fields.

You specify the field keys that should be read as strings. Give it a shot. However, because you are trying to parse multiple topics with one parser, this may require you to switch to the json_v2 or xpath parser, but try it first.
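As a rough sketch of what that could look like for the config from the question (untested against this setup): json_string_fields keeps the string-valued keys, and a converter processor then turns them into floats. The field names value, raw, and pre come from the sample payload; converting only value and pre, and leaving raw as a string, is an assumption made for illustration.

```toml
[[inputs.mqtt_consumer]]
  servers = ["tcp://192.168.73.238:1883"]
  topics = ["#"]
  qos = 0
  connection_timeout = "30s"
  data_format = "json"
  json_query = "."
  # Keep these keys even though their values are JSON strings.
  json_string_fields = ["value", "raw", "pre"]

# Convert the kept string fields to floats before they are written.
[[processors.converter]]
  [processors.converter.fields]
    float = ["value", "pre"]
```

Because the input subscribes to "#", the processor sees metrics from every topic; it only acts on metrics that actually carry the listed fields.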

Hey, thanks for your engagement. I found another solution: I wrote a Python script which converts the data into floats, and now it works fine.
But despite that, thank you for your help!!


Hello @mallo321123
Would you please be willing to share your solution with the community? Thanks!

Oh, yeah, sure. Here is my Python script:

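import json

import paho.mqtt.client as mqtt

BROKER_HOST = "broker-ip"  # configure your broker here
BROKER_PORT = 1883         # same port as in the Telegraf config; adjust if yours differs


def on_connect(client, userdata, flags, rc):
    # Subscribing here means the subscription is renewed after a reconnect.
    client.subscribe("watermeter/m3/json")


def on_message(client, userdata, msg):
    try:
        data = json.loads(msg.payload.decode())
        # The device sends its numbers as strings; convert them to floats.
        formatted_data = {
            "zaelerstand": float(data["value"]),
            "durchfluss": float(data["rate"]),
        }
    except (ValueError, KeyError, TypeError) as e:
        # Skip payloads that are not valid JSON or lack usable numbers.
        print("fail:", e)
        return
    client.publish("watermeter", json.dumps(formatted_data))


client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client.on_connect = on_connect
client.on_message = on_message

client.connect(BROKER_HOST, BROKER_PORT, 60)
client.loop_forever()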

I set it up as a systemd service in a container. It crashes sometimes, so I just added two lines of config so that it keeps getting restarted:

StartLimitInterval=120
StartLimitBurst=99

It's working great so far.
Just to mention it, this script is for this project: GitHub - jomjol/AI-on-the-edge-device: Easy to use device for connecting "old" measuring units (water, power, gas, ...) to the digital world