I have an MQTT server with three topics that each need slightly different processing, and I want to use Telegraf to collect, process, and sort them into the corresponding InfluxDB databases.
What I tried is to attach a custom tag (`influx_db`) to each input, use `tagpass` to restrict each processor to the matching input, and use that tag (via `database_tag`) to route the data into the right database. But for some reason all inputs end up in the `telegraf` database. I am not sure I fully understand how to keep the different data streams separate.
Any feedback is highly appreciated. Thanks.
Karsten
# Single InfluxDB output shared by every MQTT input in this config.
[[outputs.influxdb]]
urls = ["http://127.0.0.1:8086"]
username = "telegraf"
password = "sdasd"
# Telegraf is told not to create databases itself.
# NOTE(review): presumably the target databases already exist - confirm.
skip_database_creation = true
# Each input in this config attaches an `influx_db` tag; this option names
# that tag as the per-metric database selector.
database_tag = "influx_db"
# Owntracks stream: derive `owner` and `device` tags from the MQTT `topic` tag.
[[processors.regex]]
  # Second "/"-separated topic segment -> `owner`.
  [[processors.regex.tags]]
  key = "topic"
  result_key = "owner"
  pattern = '^.*?/(.*?)/.*$'
  replacement = "${1}"

  # Third "/"-separated topic segment -> `device`.
  [[processors.regex.tags]]
  key = "topic"
  result_key = "device"
  pattern = '^.*?/.*?/(.*?)($|/.*)'
  replacement = "${1}"

  # Only metrics tagged influx_db = "owntracks" pass through this processor.
  [processors.regex.tagpass]
  influx_db = ["owntracks"]
# MQTT input for the Owntracks topics (JSON payloads).
[[inputs.mqtt_consumer]]
# Broker connection and credentials.
servers = ["tcp://127.0.0.1:1883"]
client_id = "telegrafMQTTOwntracks"
username = "telegraf"
password = "sdasd"

# Subscription.
topics = ["owntracks/#"]
qos = 1
persistent_session = true

# Payload parsing: JSON, with the timestamp read from `tst` in unix format
# and the `_type` field used as the name key.
data_format = "json"
json_name_key = "_type"
json_time_key = "tst"
json_time_format = "unix"
# JSON fields to store as tags.
tag_keys = ["t", "tid", "con", "event", "desc", "uuid", "action"]

  # Routing tag: matched by the owntracks regex processor's tagpass and
  # referenced by the output's database_tag.
  [inputs.mqtt_consumer.tags]
  influx_db = "owntracks"
# Sensors stream: split the dotted `qfn` tag (three "."-separated parts)
# into `host`, `channel` and `sensor` tags. The same pattern is applied three
# times, each time keeping a different capture group.
[[processors.regex]]
  # First part -> `host`.
  [[processors.regex.tags]]
  key = "qfn"
  result_key = "host"
  pattern = '^(.*?)\.(.*?)\.(.*?)$'
  replacement = "${1}"

  # Second part -> `channel`.
  [[processors.regex.tags]]
  key = "qfn"
  result_key = "channel"
  pattern = '^(.*?)\.(.*?)\.(.*?)$'
  replacement = "${2}"

  # Third part -> `sensor`.
  [[processors.regex.tags]]
  key = "qfn"
  result_key = "sensor"
  pattern = '^(.*?)\.(.*?)\.(.*?)$'
  replacement = "${3}"

  # Only metrics tagged influx_db = "sensors" pass through this processor.
  [processors.regex.tagpass]
  influx_db = ["sensors"]
# MQTT input for the sensor topics (payloads in influx data format).
[[inputs.mqtt_consumer]]
# Broker connection and credentials.
servers = ["tcp://127.0.0.1:1883"]
client_id = "telegrafMQTTSensors"
username = "telegraf"
password = "sdasd"

# Subscription: `+` is a single-level wildcard, so one topic per sensor name.
topics = ["sensors/+/influxformat"]
qos = 1
persistent_session = true

data_format = "influx"

  # Routing tag: matched by the sensors regex processor's tagpass and
  # referenced by the output's database_tag.
  [inputs.mqtt_consumer.tags]
  influx_db = "sensors"
# MQTT input for metrics published under telegraf/myHost/ (influx data format).
[[inputs.mqtt_consumer]]
# Broker connection and credentials.
servers = ["tcp://127.0.0.1:1883"]
client_id = "telegrafMQTTKBServer"
username = "telegraf"
password = "sdasd"

# Subscription.
topics = ["telegraf/myHost/#"]
qos = 1
persistent_session = true

data_format = "influx"

  # Routing tag referenced by the output's database_tag; no processor in this
  # config has a tagpass for this value.
  [inputs.mqtt_consumer.tags]
  influx_db = "telegraf"