MQTT message to InfluxDB using Telegraf

Hello,

I am having trouble “translating” an MQTT message into an InfluxDB data point.
The MQTT message has a very simple form (topic, then payload), following this pattern:

/field1/field2/field3 XXX

Many other programs consume this message, so I cannot change its format to suit my needs; otherwise the payload would simply be an InfluxDB line.

The influxdb line I would like to build is

field1,location=field3 field2=XXX timestamp
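For reference, the mapping being asked for can be sketched in plain Python, outside Telegraf. The function name `to_line_protocol` is hypothetical, and the sketch assumes a numeric payload and a topic of exactly three segments; it does no line-protocol escaping.

```python
def to_line_protocol(topic, payload, timestamp_ns=None):
    """Map '/field1/field2/field3' plus a payload to one line-protocol string."""
    # The leading '/' yields an empty first element, so the real
    # segments start at index 1.
    parts = topic.split("/")
    if len(parts) != 4 or parts[0] != "":
        raise ValueError("unexpected topic layout: %r" % topic)
    measurement, field_key, location = parts[1], parts[2], parts[3]
    line = "%s,location=%s %s=%s" % (measurement, location, field_key, payload)
    if timestamp_ns is not None:
        line += " %d" % timestamp_ns
    return line


print(to_line_protocol("/field1/field2/field3", "42", 12000000000))
# prints: field1,location=field3 field2=42 12000000000
```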

Setting the measurement to field1 is not a problem, thanks to Telegraf's processors.regex and processors.converter plugins. I also think I will be able to build location=field3. However, I have no idea how to build field2=XXX.

Right now, the best I can do is

field1,location=field3,topic=/field1/field2/field3 value=XXX timestamp

Any help would be greatly appreciated.

Bill

I think I have achieved what I wanted. However, it seems very complex for something I thought would be quite basic.

I am sharing it for anyone who would like to do the same thing. Do not hesitate to comment if you think it could be optimised, because I am very new to Telegraf.

The goal is to transform this MQTT message, read by inputs.mqtt_consumer

/field1/field2/field3 XXX

To this influxdb line

field1,location=field3 field2=XXX

My telegraf configuration is the following (commented)

# Create the "bucket" tag that will route the message to the correct bucket
# In: mqtt_consumer,topic=/field1/field2/field3 value=XXX
# Out: mqtt_consumer,topic=/field1/field2/field3,bucket=field1 value=XXX
[[processors.regex]]
  order = 1
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^/([^/]*)/.*"
    replacement = "$1"
    result_key = "bucket"

# Rename to get the correct measurement name
# In: mqtt_consumer,topic=/field1/field2/field3,bucket=field1 value=XXX
# Out: field1,topic=/field1/field2/field3,bucket=field1 value=XXX
[[processors.converter]]
  order = 2
  [processors.converter.tags]
    measurement = ["bucket"]

# Create a garbage tag, that will be exchanged with the field
# In: field1,topic=/field1/field2/field3,bucket=field1 value=XXX
# Out: field1,topic=/field1/field2/field3,bucket=field1,garbage=field2 value=XXX
[[processors.regex]]
  order = 3
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^/[^/]+/([^/]+).*"
    replacement = "$1"
    result_key = "garbage"

# Extract the tag
# In: field1,topic=/field1/field2/field3,bucket=field1,garbage=field2 value=XXX
# Out: field1,topic=/field1/field2/field3,bucket=field1,garbage=field2,location=field3 value=XXX
[[processors.regex]]
  order = 4
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^/[^/]+/[^/]+/(.*)"
    replacement = "$1"
    result_key = "location"

# Pivot the tag with the field
# In: field1,topic=/field1/field2/field3,bucket=field1,garbage=field2,location=field3 value=XXX
# Out: field1,topic=/field1/field2/field3,bucket=field1,location=field3 field2=XXX
[[processors.pivot]]
  order = 5
  tag_key = "garbage"
  value_key = "value"

The remaining topic and bucket tags are removed by the output plugin.

Hope it will help someone.

Bill

Your solution works, but it might be simpler to use the starlark processor. Then you don’t have to deal with multiple regular expressions plus the converter and pivot processors. The starlark processor was made for just this kind of transformation! It requires you to write Python-like code (Starlark is a dialect of Python), but that’s often not a problem for users.

I think this does the metric transformation that you are looking for:

[[processors.starlark]]
  # we have this: mqtt_consumer,topic=/field1/field2/field3 value="myvalue" 12000000000
  # we want this: field1,location=field3 field2="myvalue" 12000000000
  source = '''
def apply(metric):
	topic = metric.tags["topic"]
	# The leading "/" makes s[0] empty: s = ["", "field1", "field2", "field3"]
	s = topic.split('/')
	m = Metric(s[1])
	m.tags["location"] = s[3]
	m.fields[s[2]] = metric.fields["value"]
	m.time = metric.time
	return m
'''
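The same logic can be tried in plain Python before loading it into Telegraf. The `Metric` class below is a hypothetical stand-in for the type Telegraf provides to the script and is not part of the processor configuration. The extra guard, which is an addition to the script above, passes through any metric whose topic does not have exactly three segments or that has no "value" field.

```python
class Metric:
    """Hypothetical stand-in for the Metric type available inside the script."""

    def __init__(self, name):
        self.name = name
        self.tags = {}
        self.fields = {}
        self.time = 0


def apply(metric):
    s = metric.tags.get("topic", "").split("/")
    # Pass through anything that is not exactly /a/b/c with a "value" field.
    if len(s) != 4 or s[0] != "" or "value" not in metric.fields:
        return metric
    m = Metric(s[1])                          # measurement = field1
    m.tags["location"] = s[3]                 # location tag = field3
    m.fields[s[2]] = metric.fields["value"]   # field key = field2
    m.time = metric.time
    return m


src = Metric("mqtt_consumer")
src.tags["topic"] = "/field1/field2/field3"
src.fields["value"] = "myvalue"
src.time = 12000000000
out = apply(src)
print(out.name, out.tags, out.fields, out.time)
```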

Oh! I was not aware of this plugin! Amazing :)
I am used to coding in Python, so it’s not a problem. Thanks a lot!