I’m trying to consume MQTT topics that have varying data types (data_format = "value"). I saw this post: https://community.influxdata.com/t/mqtt-consumer-with-multiple-value-types/5666/5
It seems that I need to create an mqtt_consumer for each set of topics with a given data type. When I try to do this with a single server, I see disconnection errors in the logs stating: [inputs.mqtt_consumer]: Error in plugin: connection lost: EOF. It seems that multiple connections to the same MQTT broker are being attempted and causing issues.
Am I misunderstanding how to handle multiple MQTT value data types?
I can see this as a potential problem with the multiple plugin method. How many plugins do you have defined, and what MQTT broker are you using?
I’m currently trying this with just two MQTT plug-ins connecting to the same mosquitto broker. Long term, it seems that it would be best to have the ability to define data types with each topic in a single plug-in definition. I have not looked at this plug-in code. If you are familiar with it, can you comment on whether this is worth pursuing?
Is this with Telegraf 1.9? I would be surprised if two connections were enough to make the broker close the connection.
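One thing that may be worth ruling out (this is an assumption, nothing in the logs above confirms it): an MQTT broker drops an existing session when a second client connects with the same client ID, so two plugin instances that share a client_id would keep disconnecting each other. A minimal sketch of the two-plugin setup with explicit, distinct IDs follows. The broker address, topic names, and ID strings are placeholders.

```toml
# Sketch only: one consumer per value type, each with its own client ID.
[[inputs.mqtt_consumer]]
  servers = ["tcp://localhost:1883"]   # placeholder broker address
  client_id = "telegraf-integer"       # placeholder; must differ per instance
  topics = ["telegraf/integer/#"]      # placeholder topic filter
  data_format = "value"
  data_type = "integer"

[[inputs.mqtt_consumer]]
  servers = ["tcp://localhost:1883"]   # same broker, second connection
  client_id = "telegraf-float"         # placeholder; distinct from the one above
  topics = ["telegraf/float/#"]        # placeholder topic filter
  data_format = "value"
  data_type = "float"
```

If client_id is left unset, the plugin is documented to generate a random one, so this only matters when the same ID has been set explicitly in both instances.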
The plugin is using the data format system to parse messages, which only allows for a single parser configuration per plugin. I think it would be possible to create a data_type = "auto" that converts based on the data (42 -> int, 42.0 -> float, etc).
I wonder if a better way to go about this would be grabbing all values as strings, and then converting them with a processor. Here is an example that uses the topic name to pick a type:
# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  servers = ["tcp://debian-stretch-mqtt:1883"]
  qos = 0
  connection_timeout = "30s"
  topics = [
    "telegraf/integer/#",
    "telegraf/float/#",
  ]
  # Read every payload as a string; the processors below assign the real type.
  data_type = "string"
  data_format = "value"

# Convert values from the integer topics
[[processors.converter]]
  namepass = ["mqtt_consumer"]
  [processors.converter.tagpass]
    topic = ["telegraf/integer/*"]
  [processors.converter.fields]
    integer = ["value"]

# Convert values from the float topics
[[processors.converter]]
  namepass = ["mqtt_consumer"]
  [processors.converter.tagpass]
    topic = ["telegraf/float/*"]
  [processors.converter.fields]
    float = ["value"]
Thanks, @daniel. I’m new to Telegraf and am not aware of all the possibilities. This looks promising. I’ll play with it.
@daniel, thanks for the pointers here. I think this could work but I’m now hitting type conflicts when writing to Influx.
I’m trying to understand the converters but I don’t see any documentation on things like “namepass” and “tagpass”. Can you point me to the relevant info?
In a specific case, I’d be happy to convert both the integer and float strings to integer values but the converter is just dropping all the floats when I try to do this. If I leave them as floats, the type conflict occurs.
I’d also be happy creating separate fields for the floats and integers but I don’t see how to do that either.
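One possible way to get separate fields, offered as an untested sketch rather than a confirmed recipe: rename the field on the float topics with the rename processor and convert it under its new name, so that integers stay in "value" and floats end up in a second field. The field name value_float is hypothetical, and the topic pattern is taken from the example earlier in the thread.

```toml
# Sketch only: move float readings into their own field.
[[processors.rename]]
  [[processors.rename.replace]]
    field = "value"
    dest = "value_float"               # hypothetical field name
  [processors.rename.tagpass]
    topic = ["telegraf/float/*"]

[[processors.converter]]
  [processors.converter.tagpass]
    topic = ["telegraf/float/*"]
  [processors.converter.fields]
    # Both names are listed so the conversion applies whichever processor runs first.
    float = ["value", "value_float"]
```

The integer converter from the earlier example would stay as it is, leaving integer topics in "value".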
Here is a link to the metric-filtering documentation; check the examples lower on the page as well.
When a metric is selected (namepass/tagpass/etc.), the conversions take place. If it is not selected, it is skipped and the next processor has a chance to handle it.
You are able to convert from a string like 42.3 to integer 42, although it always seems to round down: 42.7 → 42.
Thanks, @daniel. I’ll look at the documentation. It seems that my experience is not matching what you stated. I changed your example so that both converters were integer = ... . The MQTT topic that was originally an integer (no decimal in the string) gets written to the DB just fine but the float topic does not. I do not see anything in the log and assume it is just being dropped.
I do see logging if I convert the float topic to a float but the log is from the Influx error.
I read through the metric-filtering documentation. Do I understand correctly that I could combine the two converters under one namepass = "mqtt_consumer" if I’m trying to convert everything to an integer?
I just tried this and now see this error in the log any time a float value comes in:
[processors.converter] error converting to integer [string]: 64.1241
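For reference, the combined converter described here would look roughly like the sketch below. This is a reconstruction from the description, not the exact config that was tested.

```toml
# Sketch only: a single converter for every mqtt_consumer metric.
[[processors.converter]]
  namepass = ["mqtt_consumer"]
  [processors.converter.fields]
    # Convert the "value" field to an integer regardless of topic.
    integer = ["value"]
```

With no tagpass, every metric from the consumer is selected, so float payloads such as 64.1241 reach the integer conversion as well.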
Sorry, I must have been testing it incorrectly. The converter can’t handle numeric strings with a decimal part. I opened 5518 with a fix, which we can include in 1.10.0, coming out soon. This also adds the usual rounding to the conversion (2.5 → 3).
Thanks, @daniel. That should resolve my issue.