Multiple mqtt_consumer to multiple influx databases

Hello,
I’d like to have one telegraf instance gather information from different mqtt servers, and put the data from one mqtt server into one database and the data from another mqtt server into another database. But I’m not sure how to do that. There is an [outputs.influxdb] plugin that seems to just define which database all the inputs use, and there doesn’t seem to be a way to specify (in the inputs.mqtt_consumer) which output plugin to use. So my configuration below would just write both mqtt_consumers to the same database.

# InfluxDB output: destination for the metrics Telegraf collects
[[outputs.influxdb]]
  # Connection settings (urls is required)
  urls = ["http://localhost:8086"]
  timeout = "37s"
  user_agent = "telegraf"

  # Credentials used for the connection
  username = "telegraf"
  password = "mypasswordhere"

  # Write target
  database = "telegraf_metrics_01"
  retention_policy = ""
  write_consistency = "any"


########################################################################

# MQTT consumer for the broker at 192.168.1.2
[[inputs.mqtt_consumer]]
  servers = ["tcp://192.168.1.2:1883"]
  qos = 2
  topics = [
    "/+/metrics1/#",
  ]
  persistent_session = false
  client_id = ""
  # TOML allows only one key/value pair per line, so the credentials
  # are written as two separate entries.
  username = "telegraf"
  password = "metricsmetricsmetricsmetrics"
  data_format = "value"

########################################################################

# MQTT consumer for the broker at 192.168.1.3
[[inputs.mqtt_consumer]]
  servers = ["tcp://192.168.1.3:1883"]
  qos = 2
  topics = [
    "/+/metrics2/#",
  ]
  persistent_session = false
  client_id = ""
  # TOML allows only one key/value pair per line, so the credentials
  # are written as two separate entries.
  username = "telegraf"
  password = "metricsmetricsmetricsmetrics"
  data_format = "value"

########################################################################
1 Like

Hi @jason2, welcome to the community.
This should work for you. Best regards.

# Configuration for sending metrics to InfluxDB

# Output for metrics tagged destinationdb = "db01".
[[outputs.influxdb]]
urls = ["http://localhost:8086"] # required
database = "telegraf_metrics_01"
retention_policy = ""
write_consistency = "any"
timeout = "37s"
username = "telegraf"
password = "mypasswordhere"
user_agent = "telegraf"
# Strip the routing tag so it is not stored with the data.
tagexclude = ["destinationdb"]
# Only accept metrics carrying the matching routing tag. This sub-table must
# stay at the end of the plugin definition; any option written after it would
# be parsed as part of the tagpass table.
[outputs.influxdb.tagpass]
destinationdb = ["db01"]

# Output for metrics tagged destinationdb = "db02".
[[outputs.influxdb]]
urls = ["http://localhost:8086"] # required
database = "telegraf_metrics_02"
retention_policy = ""
write_consistency = "any"
timeout = "37s"
username = "telegraf"
password = "mypasswordhere"
user_agent = "telegraf"
# Strip the routing tag so it is not stored with the data.
tagexclude = ["destinationdb"]
# Only accept metrics carrying the matching routing tag. This sub-table must
# stay at the end of the plugin definition; any option written after it would
# be parsed as part of the tagpass table.
[outputs.influxdb.tagpass]
destinationdb = ["db02"]

########################################################################

# MQTT consumer for the broker at 192.168.1.2.
[[inputs.mqtt_consumer]]
servers = ["tcp://192.168.1.2:1883"]
qos = 2
topics = [
    "/+/metrics1/#",
]
persistent_session = false
client_id = ""
# TOML allows only one key/value pair per line.
username = "telegraf"
password = "metricsmetricsmetricsmetrics"
data_format = "value"
# Routing tag added to every metric from this input; an output selects on it
# with a tagpass filter. Keep this sub-table last in the plugin definition.
[inputs.mqtt_consumer.tags]
destinationdb = "db01"
########################################################################

# MQTT consumer for the broker at 192.168.1.3.
[[inputs.mqtt_consumer]]
servers = ["tcp://192.168.1.3:1883"]
qos = 2
topics = [
    "/+/metrics2/#",
]
persistent_session = false
client_id = ""
# TOML allows only one key/value pair per line.
username = "telegraf"
password = "metricsmetricsmetricsmetrics"
data_format = "value"
# Routing tag added to every metric from this input; an output selects on it
# with a tagpass filter. Keep this sub-table last in the plugin definition.
[inputs.mqtt_consumer.tags]
destinationdb = "db02"

1 Like

mqtt 关键配置

###############################################################

# First database: filtered output. When device_id = d1, data is written to the
# mqtt_consumer measurement of the "tcp" database.
[[outputs.influxdb]]
# Database address and port.
urls = ["http://127.0.0.1:8086"]
# Database name.
database = "tcp"
# Plugin options must come before the tagdrop/tagpass sub-tables; anything
# written after a sub-table header is parsed as part of that sub-table.
precision = "s"
retention_policy = ""
write_consistency = "any"
timeout = "5s"

[outputs.influxdb.tagdrop]
name = ["*"]

# Filter condition: device_id = d1.
[outputs.influxdb.tagpass]
device_id = ["d1"]

# Second database.
[[outputs.influxdb]]
urls = ["http://127.0.0.1:8086"]
database = "udp"
# Plugin options must come before the tagdrop/tagpass sub-tables; anything
# written after a sub-table header is parsed as part of that sub-table.
precision = "s"
retention_policy = ""
write_consistency = "any"
timeout = "5s"

[outputs.influxdb.tagdrop]
name = ["*"]

# Filter condition: device_id = d2.
[outputs.influxdb.tagpass]
device_id = ["d2"]

########################################################################

# First subscribed MQTT server.
[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
qos = 2
topics = [
    "metrics1",
    "metrics3",
]
persistent_session = false
client_id = ""
insecure_skip_verify = true
# Without this setting the plugin reports:
# Error in plugin: metric parse error: expected field at 1:23
data_format = "json"
# JSON keys that are stored as tags; device_id is the tag the outputs filter on.
tag_keys = [
    "device_id",
]

# Alternative (disabled): set the tag statically instead of reading it from
# the payload.
# [inputs.mqtt_consumer.tags]
# device_id = "d1"

# For multiple servers, add the section below; with a single server the
# following configuration is not needed.
[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
qos = 2
topics = [
    "metrics2",
]
persistent_session = false
client_id = ""
insecure_skip_verify = true
data_format = "json"
# JSON keys that are stored as tags; device_id is the tag the outputs filter on.
tag_keys = [
    "device_id",
]

Thank you for posting this - greatly appreciated by this Newbie.

I did find that the “database” and “destinationdb” had to be the same… not sure what I missed.

Also for others - I’m sure most worked out that the following should be preceded by a “[”.
outputs.influxdb.tagpass]

1 Like