How to route different MQTT Topics to different influxdb databases?

hello

setup:
two sonoff pow r2 sendings mqtt data via mosquitto and telegraf to a single database on a raspberry pi
(see telegraf.conf and stored data in influxdb below).
question:
how can i alter the telegraf.conf file, so that data from the two different sonoff pow r2 (with different topics) are stored in two seperate databases on the raspberry pi?

many thanks

telegraf.conf

[global_tags]

[agent]

  interval = "10s"

  round_interval = true

  metric_batch_size = 1000

  metric_buffer_limit = 10000

  collection_jitter = "0s"


  flush_interval = "10s"

  flush_jitter = "0s"

  precision = ""


#===============================================================================
[[outputs.influxdb]]

urls = ["http://raspberrypi:8086"]

database = "sensors"



[[inputs.mqtt_consumer]]

servers = ["tcp://raspberrypi:1883"]

topics = ["tele/solar/SENSOR","tele/Sonoff_PV001/SENSOR"]

data_format = "json"

influxdb data

select * from "mqtt_consumer" order by time desc Limit 6

name: mqtt_consumer
time                           ENERGY_ApparentPower ENERGY_Current ENERGY_Factor ENERGY_Period ENERGY_Power ENERGY_ReactivePower ENERGY_Today ENERGY_Total ENERGY_Voltage ENERGY_Yesterday host        topic
----                           -------------------- -------------- ------------- ------------- ------------ -------------------- ------------ ------------ -------------- ---------------- ----        -----
2022-03-08T09:42:10.479688066Z 1221                 5.044          0.95          10            1158         386                  2.045        1490.898     242            8.037            raspberrypi tele/solar/SENSOR
2022-03-08T09:42:09.549686515Z 890                  3.671          0.97          7             867          199                  1.624        75.186       242            5.458            raspberrypi tele/Sonoff_PV001/SENSOR
2022-03-08T09:41:40.575211802Z 1204                 4.968          0.96          9             1153         348                  2.036        1490.888     242            8.037            raspberrypi tele/solar/SENSOR
2022-03-08T09:41:39.553784115Z 882                  3.642          0.98          7             860          192                  1.617        75.179       242            5.458            raspberrypi tele/Sonoff_PV001/SENSOR
2022-03-08T09:41:10.504204597Z 1162                 4.809          0.97          2             1125         291                  2.026        1490.879     242            8.037            raspberrypi tele/solar/SENSOR
2022-03-08T09:41:09.639315037Z 870                  3.612          0.98          7             851          177                  1.61         75.172       241            5.458            raspberrypi tele/Sonoff_PV001/SENSOR
> 
-----------------------------------------------------------------------------------------------------------------------------------------------------------

You can either use:

  1. Influxdb output routing by using the options database_tag
  2. Metric filtering which are filters applied to the plugins in order to filter the data based on tags

I think you might want option 2 given your case, it should look like this

[[outputs.influxdb]]
  urls = ["http://influxdb.example.com"]
  database = '__YourDb1__'
  [outputs.influxdb.tagpass]
    topic= ['tele/solar/SENSOR']

[[outputs.influxdb]]
  urls = ["http://influxdb.example.com"]
  database = '__YourDb2__'
  [outputs.influxdb.tagpass]
    topic= ['tele/Sonoff_PV001/SENSOR']

3 Likes

Hello Giovanni
Thousand thanks, it works great !
Thanks

hello again
another question … how does the syntax look like, to write into same database, but into a separate measurement for each sonoff pow r2 ?
many thanks

Not sure what you mean with “for each sonoff pow r2” as I don’t see a significant sample in the data, but I’ll treat it as the topic value

Generally speaking, you have the same two ways as above (plus a mix)

  1. As an “extension” of option one, you get retention_policy_tag in the InfluxDB output.
  2. You can use filtering, but it will require you to have additional outputs (one for each Retention Policy), each one with its own retention_policy

The mixed solution is to add the routing attribute to the data, via processors and let the InflxDB output manage the routing automatically.
To attach it to the correct data some filtering is needed, but routing gets easier and way less verbose.

In the sample below I add the routing data for the RP, but you can also add the DB and manage the routing via the inflxDB Output (database_tag + retention_policy_tag, if you do so filtering in the output is not needed anymore).

[[processors.override]]
  [processors.override.tagpass]
    tagpass= topic= ['tele/Sonoff_PV001/SENSOR','tele/Sonoff_PV002/SENSOR']

  [processors.override.tags]
    ## create an additional tag
    redirect_rp = "__DestinationRp1__"

[[processors.override]]
  [processors.override.tagpass]
    tagpass= topic= ['tele/Sonoff_PV003/SENSOR','tele/Sonoff_PV004/SENSOR']

  [processors.override.tags]
    redirect_rp = "__DestinationRp2__"

[[outputs.influxdb]]
  urls = ["http://influxdb.example.com"]
  database = '__YourDb__'
  ## default destination RP - empty = DB default
  retention_policy = ''
  ## if present overrides the default one
  retention_policy_tag = 'redirect_rp'
  exclude_retention_policy_tag = 'true'

  [outputs.influxdb.tagpass]
    ## note that now it's a glob/regex pattern
    topic= ['tele/Sonoff_*/SENSOR']

many thanks for your effort …
to clarify, i was wondering if it is possible (referencing my above example)
i could write the topic “tele/solar/SENSOR” into the database “sensor” and now not under measurements "mqtt_consumer " but for example under the measurement “mqtt_sensor_1”.
and the topic “tele/Sonoff_PV001/SENSOR” into the same database now “sensor” but under a own measurement, say for example “mqtt_sensor_2” ??
in other words, not two database to seperate the data, but this time two seperate measurements in same database …
thanks

measurements are created based on the data point themselves, in this case by default they are sent to the mqtt_consumer measurement.

You must override the measurement attribute or even better serialize it properly in the input itself.

  • look at the mqtt consumer docs here to see how to extract the measurement name form the data
  • if you want to set the measurement “manually” look here for name_override and add the filtering logic to it

hi Giovanni

many thanks,
i was successfully in manually adding an own measurement with “name_override = “new_measurement””.
perfect