MQTT -> Telegraf -> InfluxDB beginner's question

Hi there,

This is undoubtedly a beginner's question, but I keep reading and trying without any success. I have an application that can send its measurements via MQTT. Approximately every 10 seconds I receive a bunch of measurements like this:

solaranzeige/pv-garage/ac_ausgangsfrequenz 49.986938476562
solaranzeige/pv-garage/solarspannung_string_2 298.82556152344
solaranzeige/pv-garage/solarstrom_string_2 0.98286706209183
solaranzeige/pv-garage/ac_wirkleistung 526.43493652344

I would love to translate this into an InfluxDB measurement pv-garage with

timestamp, ac_ausgangsfrequenz, solarspannung_string_2, solarstrom_string_2, ac_wirkleistung
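
In line protocol that would be one point per interval, something like:

pv-garage ac_ausgangsfrequenz=49.986938476562,solarspannung_string_2=298.82556152344,solarstrom_string_2=0.98286706209183,ac_wirkleistung=526.43493652344 <timestamp>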

When I use

[[inputs.mqtt_consumer]]
  ## Topics that will be subscribed to.
  ### MQTT server data...

  topics = [
    "solaranzeige/#",
  ]

  data_format="value"
  data_type="float"

  [[inputs.mqtt_consumer.topic_parsing]]
     topic = "solaranzeige/+/+"
     measurement = "_/measurement/_"
     tags = "_/_/field"

I get

pv-garage,field=ac_ausgangsfrequenz,host=analysis,topic=solaranzeige/pv-garage/ac_ausgangsfrequenz value=49.97631072998 1678718599895328449
pv-garage,field=solarspannung_string_2,host=analysis,topic=solaranzeige/pv-garage/solarspannung_string_2 value=277.36282348633 1678718599895343882
pv-garage,field=solarstrom_string_2,host=analysis,topic=solaranzeige/pv-garage/solarstrom_string_2 value=0.37698721885681 1678718599895359030
pv-garage,field=ac_wirkleistung,host=analysis,topic=solaranzeige/pv-garage/ac_wirkleistung value=172.58512878418 1678718599895385720

First question: how do I get rid of the “host” tag, since I do not need it, and where does it come from in the first place? I also do not really need the topic tag in the output, especially since it breaks the pivoting. I tried

 [[processors.pivot]]
   tag_key = "field"
   value_key = "value"

which brings me to:

pv-garage,host=analysis,topic=solaranzeige/pv-garage/ac_ausgangsfrequenz ac_ausgangsfrequenz=49.994220733643 1678718789339766130
pv-garage,host=analysis,topic=solaranzeige/pv-garage/solarspannung_string_2 solarspannung_string_2=282.73272705078 1678718789339772460
pv-garage,host=analysis,topic=solaranzeige/pv-garage/solarstrom_string_2 solarstrom_string_2=0.42668148875237 1678718789339779801
pv-garage,host=analysis,topic=solaranzeige/pv-garage/ac_wirkleistung ac_wirkleistung=180.20916748047 1678718789339838840

I would have expected one line with the four fields as columns. The first problem is the changing topic tag (which I wanted to get rid of in the first place). The second is the timestamp, since the MQTT datagrams are processed at slightly different times. The latter I could maybe work around with an aggregator and a suitable period, but I still would not get everything onto one line. I must be missing something obvious, but what?
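
If I read the mqtt_consumer docs correctly, the topic tag could at least be dropped at the source via the plugin's topic_tag option, something like:

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = ["solaranzeige/#"]
  data_format = "value"
  data_type = "float"
  ## an empty topic_tag suppresses the topic tag on every metric
  topic_tag = ""

but even then every message keeps its own timestamp, so they still would not end up on one line.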

Any help would be greatly appreciated.

Hi and welcome. Have you ever used NodeRED? It can do everything you described.

Hi grant1,

I was able to solve the problem with Telegraf and all sorts of aggregations etc., but it is somewhat clumsy. I have used Node-RED in the past. Very good suggestion. Question: how would you go about receiving the datagrams, aggregating the incoming messages within the last x seconds, pivoting them and then sending them to InfluxDB?

Regards
JP

I could do a mockup in NodeRED but am short on time today. Their forum is excellent and you would likely get help within hours of posting.

I envision using regex, change nodes, join nodes and perhaps JSONata to get the output format needed. The InfluxDB nodes are excellent for sending data to or retrieving data from InfluxDB.

@j.koopmann I would be interested in your solution and in a suggestion on how we can improve Telegraf. The main issue with MQTT is the different timing of messages; this makes merging them complex, as we currently do not know which messages to expect, when they will arrive and how long to wait…

One possible solution would be to introduce a synchronization strategy in the merge aggregator…
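
For context, the merge aggregator today only combines metrics that already share the same measurement name, tag set and timestamp, so its whole configuration is essentially just:

[[aggregators.merge]]
  ## merge metrics with identical name, tag set and timestamp
  ## into a single metric containing all of their fields
  drop_original = true

and that is exactly the condition that MQTT messages arriving a few milliseconds apart do not meet.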

@j.koopmann In case you understand German (as your MQTT topics suggest), you may want to have a look at this discussion in the ioBroker forum. The underlying question there was a bit different, looking for a solution on the source side, but the workarounds involving Node-RED are similar to what you would need here.

Use tagexclude = ["host", "topic"] in the [[outputs.influxdb_v2]] section of telegraf.conf
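
A minimal sketch (URL, token, organization and bucket are placeholders for your own setup):

[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]   # placeholder
  token = "$INFLUX_TOKEN"            # placeholder, e.g. from an environment variable
  organization = "my-org"            # placeholder
  bucket = "solaranzeige"            # placeholder
  ## tags that should not be written to InfluxDB
  tagexclude = ["host", "topic"]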

Hi @srebhan,

this is what seems to help:

[[outputs.influxdb]]
  urls = ["http://192.168.1.1:8086"]   # ServerIP:InfluxDB-Port
  database = "solaranzeige"
  write_consistency = "any"
  timeout = "5s"
  namepass = ["pv-garage"]

[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "solaranzeige/pv-garage/wattstundengesamtheute",
    "solaranzeige/pv-garage/solarspannung_string_1",
    "solaranzeige/pv-garage/solarspannung_string_2",
  ]

  data_format = "value"
  data_type = "float"

  ## Do not add the MQTT topic as a tag.
  topic_tag = ""

  ## Use the middle topic level as the measurement name and the last
  ## level as a "field" tag.
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "solaranzeige/+/+"
    measurement = "_/measurement/_"
    tags = "_/_/field"

## Turn the "field" tag into a field named after the tag value.
[[processors.pivot]]
  tag_key = "field"
  value_key = "value"

## Strip the "_mean" suffix that basicstats appends to the field names.
[[processors.regex]]
  [[processors.regex.field_rename]]
    pattern = "(.*)_mean"
    replacement = "${1}"

## Collect the messages of each 2s window and emit their mean as one
## metric, which aligns the timestamps.
[[aggregators.basicstats]]
  period = "2s"
  drop_original = true
  stats = ["mean"]
  namepass = ["pv-garage"]

The aggregator is sort of “collecting” the incoming MQTT messages so that the timestamps are aligned. Then the pivot seems to work, and the regex processor gets rid of the _mean suffix. Far from beautiful.
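
To make the data flow concrete, the metrics roughly take these shapes along the way (values and timestamps are placeholders):

# after topic parsing and pivot: one metric per MQTT message, each with its own timestamp
pv-garage,host=analysis solarspannung_string_1=<value> <t1>
pv-garage,host=analysis solarspannung_string_2=<value> <t2>

# after basicstats (2s period) and the field_rename: one point per period
pv-garage,host=analysis solarspannung_string_1=<mean>,solarspannung_string_2=<mean> <period timestamp>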

It would be great to have the ability to consolidate incoming MQTT messages over a certain period of time and then treat them as “one” message or one “cluster” of messages. The rest would then fall into place immediately.

@bmenard I am not sure this would help. The pivoting would still not work, because the different topic tags are still in place during the input phase.

topic_tag = ""

and

omit_hostname = true

did the trick.
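
For anyone else stumbling over the host tag: it is added globally by the Telegraf agent, so the switch lives in the [agent] section (a minimal sketch):

[agent]
  ## suppress the host tag that the agent adds to every metric by default
  omit_hostname = true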

@j.koopmann can you please open an issue for Telegraf describing your input (as in this thread), your expected output and potentially your solution? We can then discuss the assumptions you want to make (e.g. collect everything within a maximum timespan, sync until every field has been received once, etc.). The easiest is probably to extend the merge aggregator to either have a grace timespan for merging or to use a barrier-sync kind of strategy…