MQTT consumer: how to split data into different measurements based on topics

Hello!

I’m currently splitting data received over MQTT into different measurements by using two separate [[inputs.mqtt_consumer]] blocks, each with a “name_override” setting, like this:

[[inputs.mqtt_consumer]]
    servers = ["tcp://mymqtt:1883"]
    qos = 0
    connection_timeout = "30s"
    topics = ["/topic/split1/#"]
    name_override = "split_1_data"
    data_format = "json"

[[inputs.mqtt_consumer]]
    servers = ["tcp://mymqtt:1883"]
    qos = 0
    connection_timeout = "30s"
    topics = ["/root/split2/#"]
    name_override = "split_2_data"
    data_format = "json"

My question: is this the best way? With this setup, I see two MQTT connections opened at startup. When splitting data across different databases, I was able to use tagpass/tagdrop together with the database setting over a single MQTT connection, but I could not find a similar approach that works with a single database and multiple measurements.
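
A minimal sketch of the multi-database routing mentioned above, assuming the default "topic" tag that mqtt_consumer adds to each metric. The InfluxDB URL, database names, and topic patterns are hypothetical and only illustrate the idea:

```toml
# Each output writes to its own database and only accepts metrics
# whose "topic" tag matches its tagpass pattern.
[[outputs.influxdb]]
  urls = ["http://localhost:8086"]  # assumed InfluxDB address
  database = "split_1_db"           # hypothetical database name
  [outputs.influxdb.tagpass]
    topic = ["/root/split1/*"]

[[outputs.influxdb]]
  urls = ["http://localhost:8086"]  # assumed InfluxDB address
  database = "split_2_db"           # hypothetical database name
  [outputs.influxdb.tagpass]
    topic = ["/root/split2/*"]
```

Both outputs are fed by the same input, so only one MQTT connection is needed; the filtering happens on the output side.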

Thanks!

Alessio

You could try capturing both with a single mqtt_consumer plugin and then using two override processors to rename the measurements:

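[[processors.override]]
  namepass = ["foo"] # namepass will make matching faster if you have lots of other metrics.
  name_override = "split_1_data"
  # tagpass is a table keyed by tag name and must come last in the plugin block.
  [processors.override.tagpass]
    topic = ["/root/split1/foo"]
[[processors.override]]
  namepass = ["foo"]
  name_override = "split_2_data"
  [processors.override.tagpass]
    topic = ["/root/split2/foo"]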

Hello Daniel, thank you for your answer.

I tried this, but I’m still stuck.

This is my current setup:

[[inputs.mqtt_consumer]]
    servers = ["tcp://mybroker:1883"]
    qos = 0
    connection_timeout = "30s"
    topics = ["/root/topics/#"]
    data_format = "json"

[[processors.override]]
	order = 1
	tagpass = ["/root/topics/group1/*"]
	name_override = "group1"
[[processors.override]]
	order = 2
	tagpass = ["/root/topics/group2/*"]
	name_override = "group2"

The MQTT broker publishes many topics, for example:

/root/topics/group1/A
/root/topics/group1/B
/root/topics/group1/C
/root/topics/group2/yellow
/root/topics/group2/red
/root/topics/group2/blue
/root/topics/group3/apple
/root/topics/group3/banana
/root/topics/group3/raspberry

With this [[processors.override]] configuration, I now get a single measurement called “group2” (the name set by the last processor in order) that contains all the topics, not only the ones matched by its tagpass pattern; for example, it also contains the group3 topics.

Ok, solved!

The final configuration, after a small fix, is:

[[inputs.mqtt_consumer]]
    servers = ["tcp://mybroker:1883"]
    qos = 0
    connection_timeout = "30s"
    topics = ["/root/topics/#"]
    data_format = "json"

[[processors.override]]
    order = 1
    name_override = "group1"
    [processors.override.tagpass]
        topic = ["/root/topics/group1/*"]

[[processors.override]]
    order = 2
    name_override = "group2"
    [processors.override.tagpass]
        topic = ["/root/topics/group2/*"]

Now I have three measurements:

  • group1
  • group2
  • mqtt_consumer

The first two contain the topics belonging to each group, and the third contains all the other topics (a nice side effect, since it lets me catch any rogue topics).

Thanks!


What if the measurement name needs to be generated dynamically from the topic? For example, the topic is something like /root/topic/<userId> and we need name_override = <userId>. We do not want to add an override processor for each user.
Thank you.
Philippe
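
One possible approach, offered as an untested sketch rather than a confirmed answer from this thread, is Telegraf’s Starlark processor, which can set the measurement name from the topic tag at runtime. It assumes a Telegraf build that includes processors.starlark, topics of the form /root/topic/<userId>, and the default "topic" tag added by mqtt_consumer; the prefix value is an assumption taken from the example in the question:

```toml
[[processors.starlark]]
  # Assumption: the mqtt_consumer input stores the full topic in its
  # default "topic" tag, and topics look like /root/topic/<userId>.
  source = '''
def apply(metric):
    prefix = "/root/topic/"
    if "topic" in metric.tags:
        topic = metric.tags["topic"]
        if topic.startswith(prefix):
            # Use the rest of the topic (the user ID) as the measurement name.
            metric.name = topic[len(prefix):]
    return metric
'''
```

Metrics whose topic does not start with the prefix keep their original name. If a topic has further segments after the user ID, they would end up in the measurement name too, so the slicing would need adjusting for that case.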