Single MQTT Consumer for int, float & string?

Hi there!

I’m trying to setup a telegraf mqtt_consumer config. My MQTT Server get’s lots of simple valued topics that I would like to log in my influxDB.

I have topics of various data types: int, float & enum (=string?).

Do I need to handle those in 3 different [[inputs.mqtt_consumer]] instances (with data_format = "value" & data_type = "*" where *= integer, float & string?

Or is there a (reasonable) way to get it done with a single MQTT-client over a single connection to the MQTT-server?

To make my desire to handle multiple data_types in a single MQTT-client easier to understand, see my current config of two clients where most lines are annoying applications of the copy-paste-pattern:


[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  username = "*****"
  password = "*****"
  topics = [
    "valetudo/+/BatteryStateAttribute/level",
    "valetudo/+/WifiConfigurationCapability/signal",
  ]
  topic_tag = ""
  data_format = "value"
  data_type = "integer"
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "valetudo/+/+/+"
  measurement = "_/measurement/_/_"
  tags = "_/_/category/variable"
  [[processors.regex]]
  order = 1
  [[processors.regex.tags]]
  key = "category"
  pattern = "(.*)(StateAttribute)|(ConfigurationCapability)"
  replacement = "${1}"
  [[processors.template]]
  order = 2
  tag = "joined"
  template = '{{ .Tag "category" }}.{{ .Tag "variable" }}'
  [[processors.pivot]]
  order = 3
  tag_key = "joined"
  value_key = "value"
  tagexclude = ["category","variable"]

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  username = "*****"
  password = "*****"
  topics = [
    "valetudo/+/BatteryStateAttribute/status",
    "valetudo/+/StatusStateAttribute/status",
    "valetudo/+/StatusStateAttribute/detail",
    "valetudo/+/StatusStateAttribute/error",
  ]
  topic_tag = ""
  data_format = "value"
  data_type = "string"
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "valetudo/+/+/+"
  measurement = "_/measurement/_/_"
  tags = "_/_/category/variable"
  [[processors.regex]]
  [[processors.regex.tags]]
  order = 1
  key = "category"
  pattern = "(.*)StateAttribute"
  replacement = "${1}"
  [[processors.template]]
  order = 2
  tag = "joined"
  template = '{{ .Tag "category" }}.{{ .Tag "variable" }}'
  [[processors.pivot]]
  order = 3
  tag_key = "joined"
  value_key = "value"
  tagexclude = ["category","variable"]

UPDATE: okey, so I think maybe I’d be able to only use a single mqtt_consumer with data_type = "string" and use [[processors.parser]] to parse integers, floats & booleans out of their string versions afterwards.

UPDATE2: by now I’ve learned that by using stupid config file formatting I’ve deluded myself into believing those [[processors.*]] blocks to only apply to exactly their immediately preceding [[inputs.*]] block. That was a false believe.

My current state of the relevant Telegraf config segment is:


[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  username = "*****"
  password = "*****"
  topics = [
    "valetudo/+/BatteryStateAttribute/level",
    "valetudo/+/BatteryStateAttribute/status",
    "valetudo/+/StatusStateAttribute/status",
    "valetudo/+/StatusStateAttribute/detail",
    "valetudo/+/StatusStateAttribute/error",
    "valetudo/+/WifiConfigurationCapability/signal",
  ]
  topic_tag = ""
  data_format = "value"
  data_type = "string"
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "valetudo/+/+/+"
  measurement = "_/measurement/_/_"
  tags = "_/_/category/variable"

[[processors.regex]]
[[processors.regex.tags]]
  namepass = ["roborock", "dreame"]
  order = 1
  key = "category"
  pattern = "(.*)(StateAttribute)|(ConfigurationCapability)"
  replacement = "${1}"

[[processors.template]]
  namepass = ["roborock", "dreame"]
  order = 2
  tag = "joined"
  template = '{{ .Tag "category" }}.{{ .Tag "variable" }}'

[[processors.pivot]]
  namepass = ["roborock", "dreame"]
  order = 3
  tag_key = "joined"
  value_key = "value"
  tagexclude = ["category","variable"]

[[processors.parser]]
  namepass = ["roborock", "dreame"]
  order = 4
  parse_fields = ["Battery.level", "Wifi.signal"]
  drop_original = false
  merge = "override"
  data_format = "value"
  data_type = "integer"

The problem with this config is, that the [[processors.parser]] writes the parsed values into a new measurement called parser instead of the source measurement where I want them and I would have expected them with an option merge = "override" set…

The measurement being called parser instead of the source measurement is a bug. It can be tracked in this issue.

Glad to hear you figured out the rest :slight_smile:

1 Like

Thank you for that hint!

For now I’ve switched back to using two separate mqtt clients:

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  username = "*****"
  password = "*****"
  topics = [
    "valetudo/+/BatteryStateAttribute/level",
    "valetudo/+/WifiConfigurationCapability/signal",
  ]
  topic_tag = ""
  data_format = "value"
  data_type = "integer"
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "valetudo/+/+/+"
  measurement = "_/measurement/_/_"
  tags = "_/_/category/variable"

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  username = "*****"
  password = "*****"
  topics = [
    "valetudo/+/BatteryStateAttribute/status",
    "valetudo/+/StatusStateAttribute/status",
    "valetudo/+/StatusStateAttribute/detail",
    "valetudo/+/StatusStateAttribute/error",
    "valetudo/+/AttachmentStateAttribute/watertank",
    "valetudo/+/AttachmentStateAttribute/mop",
  ]
  topic_tag = ""
  data_format = "value"
  data_type = "string"
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "valetudo/+/+/+"
  measurement = "_/measurement/_/_"
  tags = "_/_/category/variable"

[[processors.strings]]
  namepass = ["roborock", "dreame"]
  order = 1
  [[processors.strings.trim_suffix]]
  tag = "category"
  suffix = "StateAttribute"
  [[processors.strings.trim_suffix]]
  tag = "category"
  suffix = "ConfigurationCapability"

[[processors.template]]
  namepass = ["roborock", "dreame"]
  order = 2
  tag = "joined"
  template = '{{ .Tag "category" }}.{{ .Tag "variable" }}'

[[processors.pivot]]
  namepass = ["roborock", "dreame"]
  order = 3
  tag_key = "joined"
  value_key = "value"
  tagexclude = ["category","variable"]

Though I hope in the future I’ll be able to use the processors.parser plugin to get back to using a single one to have a more concise config and less resource usage.

@medavoc139 any way you could test the artifact of this pr and see if it changes the measurement from parser to the source of the measurement?

Tiger bot will post the artifacts after the ci finishes (it normally takes about 45 minutes).

I’ve upgraded to

# telegraf --version
Telegraf 1.25.0-44fe0e84 (git: pull/12116@44fe0e84)

by downloading and installing https://output.circle-artifacts.com/output/job/e8ce735b-f1e1-4751-8627-7f57d50baff8/artifacts/0/build/dist/telegraf_1.25.0~44fe0e84-0_arm64.deb

With this the parser plugin preserves the measurement name, but it seems like it passes the string values through without parsing them to integers:

E! [outputs.influxdb_v2] Failed to write metric to default (will be dropped: 422 Unprocessable Entity): unprocessable entity: failure writing points to database: partial write: field type conflict: input field “Wifi.signal” on measurement “dreame” is type string, already exists as type integer dropped=1

UPDATE: okey so it now outputs all parsed values to the field named “value” leaving the original fields unchanged. If we instead specify drop_original = true the original fields are deleted, and only the parsed values are output with the field name = “value”.

So I guess the merge = "override" option doesn’t work for data_format = “value”, or I’ve misunderstood how it does.

UPDATE2: So since parser writes it’s result to the field = “value” when using data_format = “value”, and doesn’t preserve the input field name, I thought maybe I could resolve my problem by switching the order of my pivot & parser processors:

[[processors.parser]]
  namepass = ["roborock", "dreame"]
  order = 3
  parse_fields = ["value"]
  drop_original = false
  merge = "override"
  data_format = "value"
  data_type = "integer"
  [[processors.parser.tagpass]]
  joined = ["Battery.level", "Wifi.signal"]

[[processors.pivot]]
  namepass = ["roborock", "dreame"]
  order = 4
  tag_key = "joined"
  value_key = "value"
  tagexclude = ["category","variable"]

This works, but prints errors for all the string values that can’t be parsed into integers. I’m trying to filter those out with the “tagpass” option, but it seems this is either ignored or only processed after parsing.

UPDATE3: I’ve now learned that those “subsections” of telegraf plugins need a single set of square brackets, even though many sections of documentation display them with two, and in some cases like with [[inputs.mqtt_consumer.topic_parsing]] and [[processors.strings.trim_suffix]] two sets work fine (haven’t tested those with a single set yet), just like the main section headers. Using just one set of square brackets makes those tagpass sections work as intended:


[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  username = "*********"
  password = "*********"
  topics = [
    "valetudo/+/WifiConfigurationCapability/signal",
    "valetudo/+/BatteryStateAttribute/level",
    "valetudo/+/BatteryStateAttribute/status",
    "valetudo/+/StatusStateAttribute/status",
    "valetudo/+/StatusStateAttribute/detail",
    "valetudo/+/StatusStateAttribute/error",
    "valetudo/+/StatusStateAttribute/error_description",
    "valetudo/+/AttachmentStateAttribute/watertank",
    "valetudo/+/AttachmentStateAttribute/mop",
  ]
  topic_tag = ""
  data_format = "value"
  data_type = "string"
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "valetudo/+/+/+"
  measurement = "_/measurement/_/_"
  tags = "_/_/category/variable"

[[processors.strings]]
  namepass = ["roborock", "dreame"]
  order = 1
  [[processors.strings.trim_suffix]]
  tag = "category"
  suffix = "StateAttribute"
  [[processors.strings.trim_suffix]]
  tag = "category"
  suffix = "ConfigurationCapability"

[[processors.template]]
  namepass = ["roborock", "dreame"]
  order = 2
  tag = "joined"
  template = '{{ .Tag "category" }}.{{ .Tag "variable" }}'

# couldn't use with telegraf 1.24.2, because of this bug:
# https://github.com/influxdata/telegraf/issues/12115
# https://community.influxdata.com/t/single-mqtt-consumer-for-int-float-string/27137
[[processors.parser]]
  namepass = ["roborock", "dreame"]
  order = 3
  #parse_fields = ["Battery.level", "Wifi.signal"] # before pivot, field = value, correct name = tag
  parse_fields = ["value"]
  drop_original = false
  merge = "override"
  data_format = "value"
  data_type = "integer"
  [processors.parser.tagpass]
  joined = ["Battery.level", "Wifi.signal"]

[[processors.parser]]
  namepass = ["roborock", "dreame"]
  order = 3
  parse_fields = ["value"]
  drop_original = false
  merge = "override"
  data_format = "value"
  data_type = "boolean"
  [processors.parser.tagpass]
  joined = ["Attachment.*"]

[[processors.pivot]]
  namepass = ["roborock", "dreame"]
  order = 4
  tag_key = "joined"
  value_key = "value"
  tagexclude = ["category","variable"]