Struggling to process MQTT topics with similar names

I’ve spent several days trying to write a proper Telegraf configuration, but I couldn’t get it right:

I have a physical device and I want to record information about its components. I subscribe to several similarly named MQTT topics that cover basic information such as serial numbers, installed components, firmware versions, and so on.

So I defined a measurement named device_info that should receive data extracted from multiple MQTT topics. One of the interesting data points is the product name of each of the device’s components.

Example MQTT messages look like this:

t/c0619/battery/1/ProductName
{"value": "LiONbatt"}

t/c0619/cpu/1/ProductName
{"value": "ARM Cortex-A72"}

I set the configuration like this:

...
[[inputs.mqtt_consumer]]
  servers = ["ssl://mqtt.myserver.com:8883"]
  topics = [
    "t/+/cpu/1/ProductName",
    ...
    "t/+/battery/1/ProductName",
    ...
  ]
  topic_tag = ""    # Don't add a tag with the individual topic which would prevent combining different fields into one measurement entry.
  name_override = "device_info"
...

And then I set the parsing rules to feed the MQTT data into the bucket:

  # cpu_product_name
  [[inputs.mqtt_consumer.topic_parsing]]
    alias = "device_info__cpu__product_name"
    topic = "t/+/cpu/1/ProductName"
    tags = "_/site_id/_/_/pivot_field"
    [[inputs.mqtt_consumer.json_v2]]
      [[inputs.mqtt_consumer.json_v2.field]]
        path = "value"
        type = "string"
    [[processors.pivot]]
      # Let the tag become the actual measurement value
      tag_key = "pivot_field"
      value_key = "value"
    [[processors.rename]]
      namepass = ["device_info__cpu__product_name"]
      [[processors.rename.replace]]
        field = "ProductName"
        dest = "cpu_product_name"

  # battery_product_name
  [[inputs.mqtt_consumer.topic_parsing]]
    alias = "device_info__battery__product_name"
    topic = "t/+/battery/1/ProductName"
    tags = "_/site_id/_/_/pivot_field"
    [[inputs.mqtt_consumer.json_v2]]
      [[inputs.mqtt_consumer.json_v2.field]]
        path = "value"
        type = "string"
    [[processors.pivot]]
      # Let the tag become the actual measurement value
      tag_key = "pivot_field"
      value_key = "value"
    [[processors.rename]]
      namepass = ["device_info__battery__product_name"]
      [[processors.rename.replace]]
        field = "ProductName"
        dest = "batt_product_name"

I don’t end up with the CPU product name in the cpu_product_name column and the battery product name in the batt_product_name column. Either the renaming processors don’t fire, or the two values overwrite each other.
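Part of this may come down to how TOML reads the file: indentation carries no meaning, so each [[processors.pivot]] and [[processors.rename]] header starts a top-level processor plugin, not something scoped to the topic_parsing entry it is indented under. Such processors see every metric, and namepass is compared with the metric name, which name_override sets to device_info for everything this input produces. Below, the CPU processors from the configuration above are written flush left, purely to show how Telegraf sees them (an illustration, not a fix):

```toml
# TOML ignores indentation, so these are top-level processors that run on
# every metric, not only on metrics from one topic_parsing entry.
[[processors.pivot]]
  tag_key = "pivot_field"   # tag whose value ("ProductName") becomes the new field key
  value_key = "value"       # field whose value is moved under that key

[[processors.rename]]
  # namepass is matched against the metric name. name_override makes that
  # "device_info", so this filter matches nothing and the rename never runs.
  namepass = ["device_info__cpu__product_name"]
  [[processors.rename.replace]]
    field = "ProductName"
    dest = "cpu_product_name"
```

Without the namepass lines, both rename rules apply to both metrics, so whichever rule renames ProductName first likely takes the CPU and the battery value alike. That would fit the outputs.file result further down, where both values show up as cpu_product_name.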

I’ve tried many different combinations of namepass, tags, fields, and so on, but none of them worked. Your help is much appreciated!

Even ChatGPT couldn’t help me here :face_with_hand_over_mouth:

Can you please use the [[outputs.file]] output and show what you are getting, and then what you expect?

Sure. Sorry if this was not perfectly clear.

My desired output has the battery and CPU information going into separate columns within the same row:

ProductName     batt_product_name  cpu_product_name  site_id   time
                LiONbatt           ARM Cortex-A72    c061abcd  2024-01-27T05:58:40Z

Here’s my current (condensed) configuration file how I tried to achieve it:

# Configuration for telegraf agent
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = true
  debug = true
  quiet = false

# Bucket 'site'
[[outputs.influxdb_v2]]
  urls = ["https://eu-central-1-1.aws.cloud2.influxdata.com"]
  token = "$INFLUX_TOKEN"
  organization = "Testing"
  bucket = "site"
  user_agent = "telegraf_bucket_site"
  namepass = ["device_info"]


[[inputs.mqtt_consumer]]
  servers = ["ssl://mqtt.myserver.com:8883"]
  username = "username"
  password = "pwd"
  topics = [
    "t/+/cpu/1/ProductName",
    "t/+/battery/1/ProductName",
  ]
  topic_tag = ""    # Don't add a tag with the individual topic which would prevent combining different fields into one measurement entry.
  name_override = "device_info"
  data_format = "json_v2"
  precision = "10s"

  # cpu_product_name
  [[inputs.mqtt_consumer.topic_parsing]]
    alias = "device_info__cpu__product_name"
    topic = "t/+/cpu/1/ProductName"
    tags = "_/site_id/_/_/pivot_field"
    [[inputs.mqtt_consumer.json_v2]]
      [[inputs.mqtt_consumer.json_v2.field]]
        path = "value"
        type = "string"
    [[processors.pivot]]
      tag_key = "pivot_field"
      value_key = "value"
    [[processors.rename]]
      namepass = ["device_info__cpu__product_name"]
      [[processors.rename.replace]]
        field = "pivot_field"
        dest = "cpu_product_name"
  
  # batt_product_name
  [[inputs.mqtt_consumer.topic_parsing]]
    alias = "device_info__battery__product_name"
    topic = "t/+/battery/1/ProductName"
    tags = "_/site_id/_/_/pivot_field"
    [[inputs.mqtt_consumer.json_v2]]
      [[inputs.mqtt_consumer.json_v2.field]]
        path = "value"
        type = "string"
    [[processors.pivot]]
      tag_key = "ProductName"
      value_key = "value"
    [[processors.rename]]
      namepass = ["device_info__battery__product_name"]
      [[processors.rename.replace]]
        field = "ProductName"
        dest = "batt_product_name"

With the configuration above I get this:

ProductName     batt_product_name  cpu_product_name  site_id   time
ARM Cortex-A72                                       c061abcd  2024-01-27T05:58:40Z
LiONbatt                                             c061abcd  2024-01-27T05:58:50Z

So both topics end up in the ProductName column instead of their own columns, because the renaming processors never act on them.

In contrast, if I remove the namepass = ... condition from the two renaming sections, the inputs are processed, but only the CPU name ends up in the correct column; the batt_product_name column stays empty:

ProductName     batt_product_name  cpu_product_name  site_id   time
                                   ARM Cortex-A72    c061abcd  2024-01-27T06:05:30Z

Again, my desired output is this:

ProductName     batt_product_name  cpu_product_name  site_id   time
                LiONbatt           ARM Cortex-A72    c061abcd  2024-01-27T05:58:40Z

What do I have to change to have both the cpu_product_name and the batt_product_name in their respective columns?
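One way to avoid the collision, sketched under assumptions rather than tested: pivoting on the last topic segment gives both metrics the same field key, because that segment is "ProductName" for both topics. Pivoting on the component segment ("cpu" / "battery") instead gives each metric a distinct field key before any renaming happens. This assumes topic_parsing accepts a second + wildcard in the component position, just as it already does for the site segment in the configuration above; the tag name component is a hypothetical choice.

```toml
[[inputs.mqtt_consumer]]
  servers = ["ssl://mqtt.myserver.com:8883"]
  # username/password as in your configuration
  topics = [
    "t/+/cpu/1/ProductName",
    "t/+/battery/1/ProductName",
  ]
  topic_tag = ""
  name_override = "device_info"
  data_format = "json_v2"
  precision = "10s"

  # One topic parser for both topics: the 3rd segment becomes the
  # hypothetical "component" tag ("cpu" or "battery").
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "t/+/+/1/ProductName"
    tags = "_/site_id/component/_/_"

  # A single json_v2 table that reads {"value": "..."} into a field named "value".
  [[inputs.mqtt_consumer.json_v2]]
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "value"
      type = "string"

# The component tag's value becomes the field key: cpu="...", battery="...".
[[processors.pivot]]
  namepass = ["device_info"]
  tag_key = "component"
  value_key = "value"

# The fields are now distinct, so each can be renamed without clashing.
[[processors.rename]]
  namepass = ["device_info"]
  [[processors.rename.replace]]
    field = "cpu"
    dest = "cpu_product_name"
  [[processors.rename.replace]]
    field = "battery"
    dest = "batt_product_name"
```

If this works as intended, each message should end up as a metric carrying only cpu_product_name or only batt_product_name. The two still arrive as separate metrics and have to be combined afterwards.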

So that output is not from outputs.file :wink: I wanted you to show that output because I believe you are collecting two different metrics, not one, given that you have two different topic parsers configured.

To merge two different metrics, if they have the same tag set + field + timestamp, you can use the merge processors. Otherwise you have to use an aggregator to combine them together.

Ah, now I understand what you meant by the file output. :slight_smile:
After I replaced the [[outputs.influxdb_v2]] section with

[[outputs.file]]
  files = ["stdout"]

I get this output in the console:

system_info,site_id=c061abcd cpu_product_name="LiONbatt" 1706369720000000000
system_info,site_id=c061abcd cpu_product_name="LiONbatt" 1706369720000000000
system_info,site_id=c061abcd cpu_product_name="ARM Cortex-A72" 1706369720000000000
system_info,site_id=c061abcd cpu_product_name="ARM Cortex-A72" 1706369720000000000

I don’t understand why every line appears twice, but that’s what the logs show.
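A possible cause of the duplicates, offered as a guess rather than a confirmed diagnosis: the json_v2 parser accepts several [[inputs.mqtt_consumer.json_v2]] tables and, as far as I understand it, applies each of them to every incoming message. The configuration above defines two identical ones (one placed under each topic_parsing entry), which would produce two identical metrics per message. Since both topics carry the same payload shape, a single json_v2 table should be enough:

```toml
[[inputs.mqtt_consumer]]
  # ... servers, topics and topic_parsing as before ...
  data_format = "json_v2"

  # Declare the parser once; it is applied to every message this input receives.
  [[inputs.mqtt_consumer.json_v2]]
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "value"
      type = "string"
```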

I understood from your answer that having two [[inputs.mqtt_consumer.topic_parsing]] sections might be why I can’t easily merge both MQTT messages together. Can you give me a sample of how to modify my configuration above so that the two MQTT topics ending with the same suffix (“ProductName”) go into the same metric, with different field names (cpu_ / batt_)?

@jpowers Can you give me a sample of how to modify my configuration above? I would really appreciate it, as it would help me continue. :slight_smile:

Sorry, I missed your reply!

Are you sure they are all called cpu_product_name? That is going to cause some difficulty here because you would need to rename that field first.

So you want to take the following:

system_info,site_id=c061abcd cpu_product_name="LiONbatt" 1706369720000000000
system_info,site_id=c061abcd cpu_product_name="LiONbatt" 1706369720000000000
system_info,site_id=c061abcd cpu_product_name="ARM Cortex-A72" 1706369720000000000
system_info,site_id=c061abcd cpu_product_name="ARM Cortex-A72" 1706369720000000000

And create:

system_info,site_id=c061abcd cpu_product_name="ARM Cortex-A72",batt_product_name="LiONbatt" 1706369720000000000

Essentially, you could try using the merge aggregator. Take a look at the example there. However, if the field names really are the same, that won’t work: the fields need unique names.
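Once the two metrics carry differently named fields (cpu_product_name on one, batt_product_name on the other), a merge aggregator along these lines could fold them into the single line shown above. It only combines metrics whose measurement name, tag set, and timestamp are identical; in the outputs.file result above the timestamps already match (1706369720000000000). The namepass value is an assumption taken from the configuration; adjust it to whatever your metrics are actually named (the console output shows system_info).

```toml
# Merge metrics that share measurement name, tag set and timestamp
# into one metric that holds all of their fields.
[[aggregators.merge]]
  ## Assumed measurement name; use the name your metrics actually have.
  namepass = ["device_info"]
  ## Emit only the merged metric, not the separate per-topic metrics as well.
  drop_original = true
```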