How does MQTT consumer topic parsing work?

Good day! I recently updated my Telegraf and InfluxDB Docker environments to v1.21.4 and v2.1.1, respectively. I am adding aggregate tasks in InfluxDB, and for that I want to transform the data sent by my Telegraf clients that use the mqtt_consumer input plugin. I would like to use the topic levels to determine the measurement name and the topic tag, so I can aggregate based on the topic. I came across the topic parsing feature, which sounds like it does what I want. I could not get it to work, so I set up a separate test environment with the following Telegraf configuration:

[agent]
  interval = "60s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  flush_interval = "60s"
  debug = true
  omit_hostname = true

[[outputs.influxdb_v2]]
  urls = ["$INFLUX_URL"]
  token = "$INFLUX_TOKEN"
  organization = "$INFLUX_ORG"
  bucket = "$INFLUX_BUCKET"
  timeout = "10s"

[[inputs.mqtt_consumer]]
  servers = [
    "tcp://$MQTT_IP:$MQTT_PORT",
  ]

  qos = 1
  connection_timeout = "300s"
  topics = [
    "homewizard/#",
    "parser/#"
  ]
  persistent_session = false
  client_id = "Telegraf_HomeWizard_Test"
  data_format = "json"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "_/topic"
    measurement = "measurement/_"

I added the topic_parsing section. I only specified the topic and measurement patterns, because the fields come from the JSON payload. The parser/# topic was added for testing. I have a Docker container that sends MQTT messages to, for example, the topic homewizard/dbtemperatureall, with a JSON message as the payload. The message format is:
[{"Id":1,"Temp":8.2,"Date":"2022-02-20T11:27:00.000Z","Name":"Buiten"}]
I also tried sending test messages interactively to topic parser/test with message: {"contents": 1}
I expected these messages to be put in the measurements homewizard and parser, with the topics being dbtemperatureall and test. In other words, I expected the first level of the topic to become the measurement name, and the second level to become the topic. However, the data is stored in the measurement mqtt_consumer, and the topic tag holds the complete topic, for example homewizard/dbtemperatureall. The same happens for my test message. I know that the parser is active, because Telegraf won't start if I comment out just the measurement line in the topic_parsing section. I am also sending to an InfluxDB v2 environment set up specifically for this parsing test, so I know the data is being received by InfluxDB. It looks like the messages are not parsed at all.

I read this article but do not see where my logic is failing. The feature is fairly new, so there is not a lot of information to be found.

Does anyone see where I go wrong?

Telegraf will use the input plugin name by default to set the measurement name. You can override this with the name_override option. If you want to have different measurements, you would want to have multiple mqtt_consumer definitions, one for each measurement.

Hope that helps!


Thanks for your reply. However, I do not understand. I suppose that what you describe here is the way to go if you do not use topic parsing. However, I thought that topic parsing could be used to extract information from the actual topic, and use it for measurement name and tags, so one would be more flexible. As described in the MQTT consumer input plugin code / example config:

[[inputs.mqtt_consumer.topic_parsing]]
  topic = "telegraf/one/cpu/23"
  measurement = "_/_/measurement/_"
  tags = "tag/_/_/_"
  fields = "_/_/_/test"
  [inputs.mqtt_consumer.topic_parsing.types]
    test = "int"


Result:

```shell
cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291
```

As I read this, the measurement used is not the default, but changed to “cpu” based on topic parsing?
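To make that reading concrete, here is a minimal Python sketch of how the example appears to map topic segments to a measurement name, tags, and fields. It is a simplified model built only from the example above, not Telegraf's actual implementation; the function name `parse_topic` and its return structure are hypothetical, and it assumes that `_` in a pattern means "ignore this segment".

```python
def parse_topic(topic, measurement="", tags="", fields="", types=None):
    """Map topic segments to a measurement name, tags, and fields.

    Simplified, hypothetical model: "_" in a pattern skips that segment,
    any other word names the value found at the same position in the topic.
    """
    types = types or {}
    segments = topic.split("/")
    result = {"measurement": None, "tags": {}, "fields": {}}

    # The first non-"_" entry in the measurement pattern marks where the name sits.
    if measurement:
        for index, key in enumerate(measurement.split("/")):
            if key != "_" and index < len(segments):
                result["measurement"] = segments[index]
                break

    # Every non-"_" entry in the tags pattern becomes a tag key.
    if tags:
        for key, value in zip(tags.split("/"), segments):
            if key != "_":
                result["tags"][key] = value

    # Every non-"_" entry in the fields pattern becomes a field key,
    # converted to int when the types table asks for it.
    if fields:
        for key, value in zip(fields.split("/"), segments):
            if key != "_":
                result["fields"][key] = int(value) if types.get(key) == "int" else value

    return result


parsed = parse_topic(
    "telegraf/one/cpu/23",
    measurement="_/_/measurement/_",
    tags="tag/_/_/_",
    fields="_/_/_/test",
    types={"test": "int"},
)
print(parsed)
```

Under these assumptions the sketch yields the measurement cpu, the tag tag=telegraf, and the integer field test=23, which lines up with the result line shown in the example. The host and topic tags and the value field in that line come from elsewhere (the agent and the message payload) and are not modeled here.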

Ah right, when using topic parsing you can set the measurement name as a part of the config. If you are not seeing your custom measurement name, then something is up with the config.

As far as your config, can you provide what output you are currently getting?

I deleted the bucket parser and created it anew.
I sent this to the topic parser/lvl1: {"temp": 21}
I used the InfluxDB Data Explorer and downloaded all the bucket data as CSV. This is what I get:

#group	FALSE	FALSE	TRUE	TRUE	FALSE	FALSE	TRUE	TRUE	TRUE
#datatype	string	long	dateTime:RFC3339	dateTime:RFC3339	dateTime:RFC3339	double	string	string	string
#default	mean								
	result	table	_start	_stop	_time	_value	_field	_measurement	topic
		0	2022-03-02T15:42:03.085912783Z	2022-03-02T16:42:03.085912783Z	2022-03-02T16:40:40Z	21	temp	mqtt_consumer	parser/lvl1

I expected the measurement to be parser and the topic lvl1, based on the topic parsing in my config (which is exactly the same as in the original post):

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "_/topic"
    measurement = "measurement/_"
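
One difference from the example config is that its `topic` setting holds a literal topic (telegraf/one/cpu/23) rather than a `_` pattern. The following minimal Python sketch shows what that would imply if, as an assumption not verified against the Telegraf 1.21.4 source, `topic` is compared segment by segment with the incoming MQTT topic and only the MQTT single-level wildcard `+` is treated specially. The function name `topic_matches` is hypothetical.

```python
def topic_matches(configured, incoming):
    """Return True if the configured topic matches the incoming topic.

    Assumption (not confirmed): both must have the same number of levels,
    and each configured level must equal the incoming one or be "+".
    """
    configured_parts = configured.split("/")
    incoming_parts = incoming.split("/")
    if len(configured_parts) != len(incoming_parts):
        return False
    return all(
        want == "+" or want == got
        for want, got in zip(configured_parts, incoming_parts)
    )


# The value from my config: "_" is not a wildcard under this assumption.
print(topic_matches("_/topic", "parser/lvl1"))

# A hypothetical alternative using the MQTT single-level wildcard.
print(topic_matches("parser/+", "parser/lvl1"))

# The literal topic from the example config.
print(topic_matches("telegraf/one/cpu/23", "telegraf/one/cpu/23"))
```

If that assumption holds, `_/topic` would never match parser/lvl1, so the measurement pattern would never be applied, which would be consistent with the unparsed output above.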