Parsing messsage and topics from MQTT

Hi all!

Please help me
I have these variable topics
neuron/Site_1/Network_1/Device_1
neuron/Site_1/Network_1/Device_2
neuron/Site_1/Network_1/Device_n

or

neuron/Site_2/Network_1/Device_1
neuron/Site_2/Network_1/Device_2
neuron/Site_2/Network_1/Device_n

or

neuron/Site_3/Network_2/Device_n

my mqtt message is like this
“node”: “Network_1”,
“group”: “Device_1”,
“timestamp”: 1707303642843,
“timestamp”: 1707303643843,
“values”: {
“Tag1”: 134,
“Tag2”: 345,
“Tag3”: 3543
},
“errors”: {}

I want to filter in influxdb by Site / node / group / values .
After a lot of lost nights nothing works.

Help me please!
Thanks

Hello @Catalin_Grancea,
Welcome!
What do you mean you want to filter in influxdb?
Do you mean query?
If youre using 2.x it would look something like:

from(bucket: "your_bucket")
  |> range(start: -1h) // Adjust the range as needed
  |> filter(fn: (r) => r["_measurement"] == "measurements")
  |> filter(fn: (r) => r["Site"] == "Site_1")
  |> filter(fn: (r) => r["node"] == "Network_1")
  |> filter(fn: (r) => r["group"] == "Device_1")
  |> yield()

I think you could do something like:

# Global Telegraf configuration settings
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""

# MQTT Consumer Input Plugin
[[inputs.mqtt_consumer]]
  servers = ["tcp://your.mqtt.broker:1883"]
  topics = [
    "neuron/+/+/+"
  ]
  qos = 0
  connection_timeout = "30s"
  persistent_session = false
  client_id = ""
  username = ""
  password = ""
  data_format = "json"
  json_name_key = "measurement"
  json_time_key = "timestamp"
  json_time_format = "unix_ms"
  tag_keys = [
    "node",
    "group"
  ]

  # Custom processing to extract Site, Network, and Device from the topic
  [[processors.regex]]
    namepass = ["mqtt_consumer"]
    [[processors.regex.tags]]
      key = "topic"
      pattern = "neuron/(.*)/(.*)/(.*)"
      replacement = "${1}"
      result_key = "Site"
    [[processors.regex.tags]]
      key = "topic"
      pattern = "neuron/(.*)/(.*)/(.*)"
      replacement = "${2}"
      result_key = "node"
    [[processors.regex.tags]]
      key = "topic"
      pattern = "neuron/(.*)/(.*)/(.*)"
      replacement = "${3}"
      result_key = "group"

# InfluxDB Output Plugin
[[outputs.influxdb_v2]]
  urls = ["http://your.influxdb.server:8086"]
  token = "YourInfluxDBToken"
  organization = "your_org"
  bucket = "your_bucket"

Although I haven’t tested it.

1 Like

Hi

Thanks very much! :slight_smile:

I did it yesterday night with json_v2. And it works very well. But I couldn’t do the topics either. I tried with what you gave me and it gives me an error:

[processors.regex] tags: Using explicit mode...
[processors.regex] tags: Using explicit mode...
[processors.regex] tags: Using explicit mode...
[processors.regex] tags: Using explicit mode...
[processors.regex] tags: Using explicit mode...
[processors.regex] tags: Using explicit mode...
[inputs.mqtt_consumer] Connected [tcp://192.168.0.203:1883]

I separated the “node” and “group” fields based on the mqtt message. And now I want to display from topics only the “Site” filter and under it to have “Site_1, Site_2, Site_n”. The name of Site_1 is a variable, it can take any other name.

I have attached a picture of how I would like to separate the topics

Certain fields do not write my new name (see in the picture)

I want to exclude the “host” filter, how do I do that?

Thanks a lot again!