Hi, beginner here.
So far I’ve only used the API with line protocol to write data.
I am trying to set up a logger that writes to InfluxDB via Telegraf, with an MQTT broker as the source.
I managed to establish a connection and even got some data into the database,
but I’m having trouble getting it into the right format.
The upper one is what I have managed to get in there so far.
But it should look more like the lower one:
The payload of the MQTT topic is just a float.
Below is the config I am currently using, along with the error it gives me:
2025-05-20T13:53:00Z E! [inputs.mqtt_consumer] Error in plugin: unable to convert field 'Waschtank-Ist-Temperatur' to type float: strconv.ParseFloat: parsing "Waschtank-Ist-Temperatur": invalid syntax
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "5s"
precision = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
[[outputs.influxdb_v2]]
## The URLs of the InfluxDB cluster nodes.
urls = ["http://10.*.*.*:8086"]
## Token for authentication.
token = "$INFLUX_TOKEN"
## Organization is the name of the organization you wish to write to; must exist.
organization = "SMI"
## Destination bucket to write into.
bucket = "testbucket"
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
servers = ["tcp://10.*.*.*:1883"]
## Topics that will be subscribed to.
topics = ["Weichenachse/BVL/SPS/LOG/9999/+"]
#
#
# ## The message topic will be stored in a tag specified by this value. If set
# ## to the empty string no topic tag will be created.
#
client_id = "InfluxDB-WeichenachseBVL"
#
# ## Username and password to connect MQTT server.
username = "username"
password = "password"
##test json payload
#data_format = "json"
#json_string_fields = ["uplink_message_frm_payload"]
#
data_format = "value"
data_type = "float"
#
[[inputs.mqtt_consumer.topic_parsing]]
topic = "Weichenachse/BVL/SPS/LOG/9999/+"
measurement = "_/measurement/_/_/_/_"
tags = "_/_/_/_/tag/_"
fields = "_/_/_/_/_/value"
#[inputs.mqtt_consumer.topic_parsing.types]
# value = "float"
I see that you’ve made significant progress by changing your approach. Using JSON format is a good solution since it allows you to include both the measurement name and its value in a structured way.
Let’s analyze your current configuration:
You’ve shortened the MQTT topic to Weichenachse/BVL/SPS/LOG/9999 (removing the measurement name from the topic)
You’re sending the data in JSON format: {"Waschtank-Ist-Temperatur":24.7}
You’ve configured Telegraf to parse this JSON with data_format = "json"
You’re setting a static measurement name using name_override = "BVL"
However, there’s a potential issue with your configuration. The topic_parsing section might not be working as expected because:
Your actual topic is Weichenachse/BVL/SPS/LOG/9999 but your parsing pattern is Weichenachse/BVL/SPS/LOG/+
You’re trying to extract a tag called “Maschinennummer” but that doesn’t appear to be in your topic
Here’s a refined configuration that should work better with your JSON payload approach:
[[inputs.mqtt_consumer]]
servers = ["tcp://10.*.*.*:1883"]
topics = ["Weichenachse/BVL/SPS/LOG/9999"]
client_id = "InfluxDB-WeichenachseBVL"
username = "username"
password = "password"
# Parse the payload as JSON
data_format = "json"
# Set static tags that you want for all measurements
[inputs.mqtt_consumer.tags]
Maschinennummer = "9999"
location = "BVL"
This configuration:
Parses the JSON payload, which will create a field for each key-value pair in your JSON
Adds static tags for machine number and location
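To check what the parser actually produces before it reaches InfluxDB, one option is to temporarily add a second output that prints every metric to stdout as line protocol. This is only a debugging sketch, assuming you run Telegraf in the foreground (or can read its console output):

```toml
# Temporary debug output: print each metric to stdout in line protocol
[[outputs.file]]
files = ["stdout"]
data_format = "influx"
```

With the JSON payload above, each printed line should show the measurement name, the Maschinennummer and location tags, and a Waschtank-Ist-Temperatur field. Remove the block again once the format looks right.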
If you want field or measurement names that differ from what the input produces (the JSON key becomes the field name), you can rename them with a processor after the data is ingested:
[[processors.rename]]
# Processors run after the MQTT consumer input
# Rename the field that comes from the JSON key
[[processors.rename.replace]]
field = "Waschtank-Ist-Temperatur"
dest = "temperature"
# Rename the measurement. "mqtt_consumer" is the default name;
# use your name_override value here if you set one.
[[processors.rename.replace]]
measurement = "mqtt_consumer"
dest = "Waschtank-Ist"
Alternatively, if you want to have different machine numbers in the tag, you could extract it from the topic:
[[inputs.mqtt_consumer]]
servers = ["tcp://10.*.*.*:1883"]
topics = ["Weichenachse/BVL/SPS/LOG/+"] # Use + wildcard for machine number
client_id = "InfluxDB-WeichenachseBVL"
username = "username"
password = "password"
data_format = "json"
topic_tag = "" # Don't add the full topic as a tag
# Use topic parsing to extract the machine number from the last topic level
[[inputs.mqtt_consumer.topic_parsing]]
topic = "Weichenachse/BVL/SPS/LOG/+"
tags = "_/_/_/_/Maschinennummer"
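If you would rather keep your original setup, where the payload is a bare float and the last topic level carries the name, the Telegraf mqtt_consumer documentation describes a pattern for this: parse that topic level into a tag, then pivot the tag into a field name. A rough sketch, assuming topics of the form Weichenachse/BVL/SPS/LOG/<machine number>/<name>; the tag name "field" is just an illustrative choice, and the username, password and client_id lines are omitted for brevity:

```toml
[[inputs.mqtt_consumer]]
servers = ["tcp://10.*.*.*:1883"]
topics = ["Weichenachse/BVL/SPS/LOG/+/+"]
topic_tag = ""
# The payload is a single float; the value parser stores it in a field named "value"
data_format = "value"
data_type = "float"

[[inputs.mqtt_consumer.topic_parsing]]
topic = "Weichenachse/BVL/SPS/LOG/+/+"
measurement = "_/measurement/_/_/_/_"
# Last topic level goes into a temporary tag called "field" (assumed name)
tags = "_/_/_/_/Maschinennummer/field"

# Use the content of the "field" tag as the field key for the "value" field
[[processors.pivot]]
tag_key = "field"
value_key = "value"
```

Under these assumptions, a message on Weichenachse/BVL/SPS/LOG/9999/Waschtank-Ist-Temperatur should end up in the measurement BVL with the tag Maschinennummer=9999 and a float field named Waschtank-Ist-Temperatur.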
Let me know if this aligns with what you’re trying to achieve, or if you need further adjustments!
Is there a way to make Telegraf forget a message it received but cannot write into a bucket because the token does not have permission to write there?