Get at least every 5 min datapoint by mqtt consumer

Hello,

I got mqtt_consumer running, which gathers metrics from a bunch of topics.
All this information is processed, pushed in a influxdb and shown in a grafana dashboard.
(some time series graphs but also some last values are shown in the dashboard)

Some metrics are very fluctuating; so every few seconds/minutes the mqtt topic is updated and written to the influx database. But I got also some metric’s which stay on the same value for hours, even days.
So new new entries in the influxdb for this metrics as they got no new value (mqtt_consumer is a service plugin, the interval does not apply).
For this metrics I got no data in grafana if I select for example last 8 hours.

I looked at aggregators.final, but also that 's a no go; as this looks only applicable for a time-out of the gathering of a metric; but mqtt_consumer doesn’t take any action if there is no new value published in mqtt.
(already tested)

How can I write every x minutes the values of every configure topic to influx without restarting telegraf every x minutes?

Did you also try with output_strategy = "timeout"? As that seems what you are looking for.

Hello Hipska,

Thanks for your reply!

This is the config I used for testing:

[agent]  
  debug = true
  quiet = false
  logtarget = "file"
  #logfile = "/var/log/telegraf/telegraf.log"
  logfile = "/tmp/telegraf.log"
  hostname = "influx18"

[[inputs.mqtt_consumer]]
  servers = ["tcp://172.16.0.227:1883"]
  topics = [
    "test/topic1"
  ]
  username = "xxx"
  password = "yyy"
  name_override = "finaltesting"
  data_format = "value"
  data_type = "int"

[[inputs.mqtt_consumer.topic_parsing]]
  topic = "test/+"
  measurement = "measurement/_"
  tags = "_/id"

[[processors.pivot]]
    tag_key = "id"
    value_key = "value"

# Report the final metric of a series
[[aggregators.final]]
  ## The period on which to flush & clear the aggregator.
  period = "30m"
  ## If true, the original metric will be dropped by the
  ## aggregator and will not get sent to the output plugins.
  # drop_original = false

  ## The time that a series is not updated until considering it final.
  series_timeout = "1m"

  ## Output strategy, supported values:
  ##   timeout  -- output a metric if no new input arrived for `series_timeout`;
  ##               useful for filling gaps in input data
  ##   periodic -- output the last received metric every `period`; useful to
  ##               downsample the input data
  output_strategy = "timeout"


[[outputs.file]]
  files = ["stdout", "/tmp/metrics.out"]
  data_format = "influx"

What I did on MQTT:
Mon Apr 15 11:25:59 AM CEST 2024 publish value 0 onto test/topic1
Mon Apr 15 12:05:01 PM CEST 2024 publish value 2 onto test/topic1

The results in /tmp/metrics.out (epoch timestamps converted for readability):
finaltesting,host=influx18,topic=test/topic1 topic1=0i – Mon Apr 15 11:25:59 AM CEST 2024
finaltesting,host=influx18,topic=test/topic1 topic1_final=0i – Mon Apr 15 11:25:59 AM CEST 2024
finaltesting,host=influx18,topic=test/topic1 topic1=2i – Mon Apr 15 12:05:01 PM CEST 2024

So no updates in between the publishes.
My best guess is because mqtt is a service plugin; which works more like “interrupt” based instead of interval based?

There was less than 30 minutes between the 2 pushes, so at what interval do you expect the final metric to be emitted?
Edit: whoops it did indeed. @jpowers This looks like a bug? Or the comment for the timeout strategy option is misleading since the gap is not filled.

@jeroenvd Do you have the same results when using aggregators.basicstats and stats = ["last"]?

Hello Hipska,

For aggregators.basicstats and stats = ["last"]
I got the message:

[aggregators.basicstats] Unrecognized basic stat "last", ignoring

figured out my telegraf container was still running: 1.29.2

Looks like the “last” stat is only available in 1.31 (Due by June 10, 2024)?

In the meantime, I updated my docker container to the newest on docker hub: 1.30
and redo my tests. (but looks like nothing changed; also testing with adding the interval in the agent section)

Timeout is the default strategy and did not guarantee values at each series expiry. The behavior that was added was that when you set the strategy to periodic, you would get a guaranteed metric at every timeout interval. The following would output a metric every 5mins even if one was not received.

[[aggregators.final]]
  series_timeout = "5m"
  output_strategy = "periodic"

For this metrics I got no data in grafana if I select for example last 8 hours.

Grafana has the null value option as well. This would allow you to connect previous values.

1 Like

Okay that looks like what is needed here indeed.

It still looks the “useful for” is switched between these options?

  ##   timeout  -- output a metric if no new input arrived for `series_timeout`;
  ##               useful for filling gaps in input data
  ##   periodic -- output the last received metric every `period`; useful to
  ##               downsample the input data

Agreed, I’ve put up docs(aggregators.final): Swap useful statements by powersj · Pull Request #15159 · influxdata/telegraf · GitHub

1 Like

Hello Hipska,

I changed timing a little to speed-up my testing; but looks like the periodic doesn’t take any action at all:

[agent]  
  debug = true
  quiet = false
  logtarget = "file"
  #logfile = "/var/log/telegraf/telegraf.log"
  logfile = "/tmp/telegraf.log"
  hostname = "influx18"

[[inputs.mqtt_consumer]]
  servers = ["tcp://172.16.0.227:1883"]
  topics = [
    "test/topic1"
  ]
  username = "xxx"
  password = "yyy"
  name_override = "finaltest2"
  data_format = "value"
  data_type = "int"

[[inputs.mqtt_consumer.topic_parsing]]
  topic = "test/+"
  measurement = "measurement/_"
  tags = "_/id"

[[processors.pivot]]
    tag_key = "id"
    value_key = "value"

# Report the final metric of a series
[[aggregators.final]]
  ## The period on which to flush & clear the aggregator.
  period = "30m"
  ## If true, the original metric will be dropped by the
  ## aggregator and will not get sent to the output plugins.
  # drop_original = false

  ## The time that a series is not updated until considering it final.
  series_timeout = "90s"

  ## Output strategy, supported values:
  ##   timeout  -- output a metric if no new input arrived for `series_timeout`;
  ##               useful for filling gaps in input data
  ##   periodic -- output the last received metric every `period`; useful to
  ##               downsample the input data
  output_strategy = "periodic"


[[outputs.file]]
  files = ["stdout", "/tmp/metrics.out"]
  data_format = "influx"

I ran a script with logging, executing 3 commands:

  • date
  • cat tmp/metrics.out
  • stat tmp/metrics.out
Mon Apr 15 07:13:51 PM CEST 2024


  File: tmp/metrics.out
  Size: 0         	Blocks: 0          IO Block: 4096   regular empty file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 19:13:51.893377457 +0200
Modify: 2024-04-15 19:13:30.009193410 +0200
Change: 2024-04-15 19:13:39.825275968 +0200
 Birth: 2024-04-15 19:13:30.009193410 +0200
------------

Mon Apr 15 07:14:21 PM CEST 2024

finaltest2,host=influx18,topic=test/topic1 topic1=20i  -- Mon Apr 15 07:14:12 PM CEST 2024

  File: tmp/metrics.out
  Size: 74        	Blocks: 8          IO Block: 4096   regular file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 19:14:17.993596932 +0200
Modify: 2024-04-15 19:14:12.549551154 +0200
Change: 2024-04-15 19:14:12.549551154 +0200
 Birth: 2024-04-15 19:13:30.009193410 +0200
------------

Mon Apr 15 07:14:51 PM CEST 2024

finaltest2,host=influx18,topic=test/topic1 topic1=20i  -- Mon Apr 15 07:14:12 PM CEST 2024

  File: tmp/metrics.out
  Size: 74        	Blocks: 8          IO Block: 4096   regular file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 19:14:17.993596932 +0200
Modify: 2024-04-15 19:14:12.549551154 +0200
Change: 2024-04-15 19:14:12.549551154 +0200
 Birth: 2024-04-15 19:13:30.009193410 +0200
------------
...

------------
Mon Apr 15 07:20:52 PM CEST 2024

finaltest2,host=influx18,topic=test/topic1 topic1=20i  -- Mon Apr 15 07:14:12 PM CEST 2024

  File: tmp/metrics.out
  Size: 74        	Blocks: 8          IO Block: 4096   regular file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 19:14:17.993596932 +0200
Modify: 2024-04-15 19:14:12.549551154 +0200
Change: 2024-04-15 19:14:12.549551154 +0200
 Birth: 2024-04-15 19:13:30.009193410 +0200
------------

Because you didn’t wait long enough. You set your period to 30mins and only waited ~7mins:

Mon Apr 15 07:13:51 PM CEST 2024
Mon Apr 15 07:20:52 PM CEST 2024

From the readme:

Contrary to this, output_strategy = "periodic" will always output a
final metric at the end of the period irrespectively of when the last
metric arrived, the series_timeout is ignored. This is helpful if you
for example want to downsample input data arriving at a high rate
and require a periodic output of the final metric.

Your totally right about “you didn’t wait long enough”!

# Report the final metric of a series
[[aggregators.final]]
  ## The period on which to flush & clear the aggregator.
  period = "30m"

  ## The time that a series is not updated until considering it final.
  series_timeout = "1m"

  ## Output strategy, supported values:
  ##   timeout  -- output a metric if no new input arrived for `series_timeout`;
  ##               useful for filling gaps in input data
  ##   periodic -- output the last received metric every `period`; useful to
  ##               downsample the input data
  output_strategy = "periodic"
------------
Mon Apr 15 09:09:06 PM CEST 2024


  File: tmp/metrics.out
  Size: 0         	Blocks: 0          IO Block: 4096   regular empty file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 20:49:06.166371015 +0200
Modify: 2024-04-15 20:47:24.768513398 +0200
Change: 2024-04-15 20:47:29.356597380 +0200
 Birth: 2024-04-15 20:47:24.768513398 +0200
------------
Mon Apr 15 09:11:06 PM CEST 2024

finaltesting,host=influx18,topic=test/topic1 topic1=1i  -- Mon Apr 15 09:09:29 PM CEST 2024

  File: tmp/metrics.out
  Size: 75        	Blocks: 8          IO Block: 4096   regular file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 21:09:34.276999418 +0200
Modify: 2024-04-15 21:09:32.708970458 +0200
Change: 2024-04-15 21:09:32.708970458 +0200
 Birth: 2024-04-15 20:47:24.768513398 +0200
------------
...
------------
Mon Apr 15 09:29:06 PM CEST 2024

finaltesting,host=influx18,topic=test/topic1 topic1=1i  -- Mon Apr 15 09:09:29 PM CEST 2024

  File: tmp/metrics.out
  Size: 75        	Blocks: 8          IO Block: 4096   regular file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 21:09:34.276999418 +0200
Modify: 2024-04-15 21:09:32.708970458 +0200
Change: 2024-04-15 21:09:32.708970458 +0200
 Birth: 2024-04-15 20:47:24.768513398 +0200
------------
Mon Apr 15 09:31:06 PM CEST 2024

finaltesting,host=influx18,topic=test/topic1 topic1=1i  -- Mon Apr 15 09:09:29 PM CEST 2024
finaltesting,host=influx18,topic=test/topic1 topic1_final=1i  -- Mon Apr 15 09:09:29 PM CEST 2024

  File: tmp/metrics.out
  Size: 156       	Blocks: 8          IO Block: 4096   regular file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 21:31:06.511365319 +0200
Modify: 2024-04-15 21:30:02.782349078 +0200
Change: 2024-04-15 21:30:02.782349078 +0200
 Birth: 2024-04-15 20:47:24.768513398 +0200
------------
...
------------
Mon Apr 15 10:33:07 PM CEST 2024

finaltesting,host=influx18,topic=test/topic1 topic1=1i  -- Mon Apr 15 09:09:29 PM CEST 2024
finaltesting,host=influx18,topic=test/topic1 topic1_final=1i  -- Mon Apr 15 09:09:29 PM CEST 2024

  File: tmp/metrics.out
  Size: 156       	Blocks: 8          IO Block: 4096   regular file
Device: 254,0	Inode: 8331        Links: 1
Access: (0777/-rwxrwxrwx)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2024-04-15 21:31:06.511365319 +0200
Modify: 2024-04-15 21:30:02.782349078 +0200
Change: 2024-04-15 21:30:02.782349078 +0200
 Birth: 2024-04-15 20:47:24.768513398 +0200

So there is 1 topic1_final written, after about 20 minutes,
but also with the timestamp of the originally entry;
looks also a little strange the originally timestamp is used?

1 Like

Hello everyone,

I discussed this with a colleague this morning,
he was looking into grafana again to fix this and come with “custom” queries in grafana.
more like Get last value even if outside of time - InfluxDB - Grafana Labs Community Forums

Which is a solution for my usecase. (preventing also to have more data in influx than strictly needed)

But for Hipska, I still open to figure out the aggregators.final function.
Only to have a working example on the internet of this function.