Multiple mqtt_consumer

I’m trying to consume MQTT topics that have varying data types (data_format = "value"). I saw this post: https://community.influxdata.com/t/mqtt-consumer-with-multiple-value-types/5666/5
It seems that I need to create an mqtt_consumer for each set of topics with a given data type. When I try to do this, with a single server, I see disconnection errors in the logs stating [inputs.mqtt_consumer]: Error in plugin: connection lost: EOF. It seems that multiple connections to the same MQTT broker are being attempted and causing issues.
Am I misunderstanding how to handle multiple MQTT value data types?

I can see this as a potential problem with the multiple-plugin method. How many plugins do you have defined, and what MQTT broker are you using?

I’m currently trying this with just two MQTT plug-ins connecting to the same mosquitto broker. Long term, it seems that it would be best to have the ability to define data types with each topic in a single plug-in definition. I have not looked at this plug-in code. If you are familiar with it, can you comment on whether this is worth pursuing?

Is this with Telegraf 1.9? I would be surprised if 2 connections were enough to cause the MQTT broker to close the connection.

The plugin uses the data format system to parse messages, which only allows a single parser configuration per plugin. I think it would be possible to create a data_type = "auto" that converts based on the data (42 -> int, 42.0 -> float, etc.).

I wonder if a better way to go about this would be grabbing all values as strings, and then converting them with a processor. Here is an example that uses the topic name to pick a type:


# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  servers = ["tcp://debian-stretch-mqtt:1883"]
  qos = 0
  connection_timeout = "30s"
  topics = [
    "telegraf/integer/#",
    "telegraf/float/#",
  ]
  data_type = "string"
  data_format = "value"

# Convert "value" to an integer for metrics from the integer topics
[[processors.converter]]
  namepass = ["mqtt_consumer"]
  [processors.converter.tagpass]
    topic = ["telegraf/integer/*"]
  [processors.converter.fields]
    integer = ["value"]

# Convert "value" to a float for metrics from the float topics
[[processors.converter]]
  namepass = ["mqtt_consumer"]
  [processors.converter.tagpass]
    topic = ["telegraf/float/*"]
  [processors.converter.fields]
    float = ["value"]

Thanks, @daniel. I’m new to Telegraf and am not aware of all the possibilities. This looks promising. I’ll play with it.

@daniel, thanks for the pointers here. I think this could work but I’m now hitting type conflicts when writing to Influx.

I’m trying to understand the converters but I don’t see any documentation on things like “namepass” and “tagpass”. Can you point me to the relevant info?

In a specific case, I’d be happy to convert both the integer and float strings to integer values but the converter is just dropping all the floats when I try to do this. If I leave them as floats, the type conflict occurs.

I’d also be happy creating separate fields for the floats and integers but I don’t see how to do that either.

Here is a link to the metric-filtering documentation; check the examples lower on the page as well.

When a metric is selected (namepass/tagpass/etc.), the conversions take place. If it is not selected, the processor skips it and the next processor has a chance to handle it.

You are able to convert from a string like 42.3 to the integer 42, although it always seems to round down (42.7 → 42).
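For the separate-fields idea, here is a minimal sketch that might work. It assumes the rename processor is available in your Telegraf version and that the `order` option is honored so the renames run before the conversion. The field names value_int and value_float are hypothetical, chosen only for illustration. Each topic group gets its own field name, so the two types never share a field in the database:

```toml
# Sketch only: value_int and value_float are made-up field names.

# Rename "value" to "value_int" for metrics from the integer topics
[[processors.rename]]
  order = 1
  namepass = ["mqtt_consumer"]
  [[processors.rename.replace]]
    field = "value"
    dest = "value_int"
  [processors.rename.tagpass]
    topic = ["telegraf/integer/*"]

# Rename "value" to "value_float" for metrics from the float topics
[[processors.rename]]
  order = 2
  namepass = ["mqtt_consumer"]
  [[processors.rename.replace]]
    field = "value"
    dest = "value_float"
  [processors.rename.tagpass]
    topic = ["telegraf/float/*"]

# Convert each renamed field to its own type; a field that is
# absent from a given metric is simply not converted
[[processors.converter]]
  order = 3
  namepass = ["mqtt_consumer"]
  [processors.converter.fields]
    integer = ["value_int"]
    float = ["value_float"]
```

If a field named value was already written to the measurement with a different type, the conflict on that existing field would remain until the old data is dealt with; the sketch only keeps new writes apart.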

Thanks, @daniel. I’ll look at the documentation. It seems that my experience is not matching what you stated. I changed your example so that both converters were integer = .... The MQTT topic that was originally an integer (no decimal in the string) gets written to the DB just fine but the float topic does not. I do not see anything in the log and assume it is just being dropped.

I do see logging if I convert the float topic to a float but the log is from the Influx error.

I read through the metric-filtering documentation. Do I understand correctly that I could combine the two converters under one namepass = "mqtt_consumer" if I’m trying to convert everything to an integer?

I just tried this and now see this error in the log any time a float value comes in:

[processors.converter] error converting to integer [string]: 64.1241

Sorry, I must have been testing it incorrectly; it can’t handle numeric strings with a decimal part. I opened 5518 with a fix, and we can include it in 1.10.0, which is coming out soon. This also adds the usual rounding to the conversion (2.5 → 3).
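With that fix in place, converting everything to an integer should only need one converter. A minimal sketch, assuming a Telegraf build that includes the fix (1.10.0 or later, per the plan above) and the same mqtt_consumer input as in the earlier example:

```toml
# Sketch only: assumes the converter can parse strings such as
# "64.1241" as integers, which requires the fix described above.
[[processors.converter]]
  namepass = ["mqtt_consumer"]
  [processors.converter.fields]
    integer = ["value"]
```

No tagpass is needed here because every topic is converted the same way.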

Thanks, @daniel. That should resolve my issue.