I got mqtt_consumer running, which gathers metrics from a bunch of topics.
All this information is processed, pushed in a influxdb and shown in a grafana dashboard.
(some time series graphs but also some last values are shown in the dashboard)
Some metrics are very fluctuating; so every few seconds/minutes the mqtt topic is updated and written to the influx database. But I got also some metric’s which stay on the same value for hours, even days.
So new new entries in the influxdb for this metrics as they got no new value (mqtt_consumer is a service plugin, the interval does not apply).
For this metrics I got no data in grafana if I select for example last 8 hours.
I looked at aggregators.final, but also that 's a no go; as this looks only applicable for a time-out of the gathering of a metric; but mqtt_consumer doesn’t take any action if there is no new value published in mqtt.
(already tested)
How can I write every x minutes the values of every configure topic to influx without restarting telegraf every x minutes?
[agent]
debug = true
quiet = false
logtarget = "file"
#logfile = "/var/log/telegraf/telegraf.log"
logfile = "/tmp/telegraf.log"
hostname = "influx18"
[[inputs.mqtt_consumer]]
servers = ["tcp://172.16.0.227:1883"]
topics = [
"test/topic1"
]
username = "xxx"
password = "yyy"
name_override = "finaltesting"
data_format = "value"
data_type = "int"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "test/+"
measurement = "measurement/_"
tags = "_/id"
[[processors.pivot]]
tag_key = "id"
value_key = "value"
# Report the final metric of a series
[[aggregators.final]]
## The period on which to flush & clear the aggregator.
period = "30m"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
# drop_original = false
## The time that a series is not updated until considering it final.
series_timeout = "1m"
## Output strategy, supported values:
## timeout -- output a metric if no new input arrived for `series_timeout`;
## useful for filling gaps in input data
## periodic -- output the last received metric every `period`; useful to
## downsample the input data
output_strategy = "timeout"
[[outputs.file]]
files = ["stdout", "/tmp/metrics.out"]
data_format = "influx"
What I did on MQTT:
Mon Apr 15 11:25:59 AM CEST 2024 publish value 0 onto test/topic1
Mon Apr 15 12:05:01 PM CEST 2024 publish value 2 onto test/topic1
The results in /tmp/metrics.out (epoch timestamps converted for readability):
finaltesting,host=influx18,topic=test/topic1 topic1=0i – Mon Apr 15 11:25:59 AM CEST 2024
finaltesting,host=influx18,topic=test/topic1 topic1_final=0i – Mon Apr 15 11:25:59 AM CEST 2024
finaltesting,host=influx18,topic=test/topic1 topic1=2i – Mon Apr 15 12:05:01 PM CEST 2024
So no updates in between the publishes.
My best guess is because mqtt is a service plugin; which works more like “interrupt” based instead of interval based?
There was less than 30 minutes between the 2 pushes, so at what interval do you expect the final metric to be emitted?
Edit: whoops it did indeed. @jpowers This looks like a bug? Or the comment for the timeout strategy option is misleading since the gap is not filled.
@jeroenvd Do you have the same results when using aggregators.basicstats and stats = ["last"]?
For aggregators.basicstats and stats = ["last"]
I got the message:
[aggregators.basicstats] Unrecognized basic stat "last", ignoring
figured out my telegraf container was still running: 1.29.2
Looks like the “last” stat is only available in 1.31 (Due by June 10, 2024)?
In the meantime, I updated my docker container to the newest on docker hub: 1.30
and redo my tests. (but looks like nothing changed; also testing with adding the interval in the agent section)
Timeout is the default strategy and did not guarantee values at each series expiry. The behavior that was added was that when you set the strategy to periodic, you would get a guaranteed metric at every timeout interval. The following would output a metric every 5mins even if one was not received.
It still looks the “useful for” is switched between these options?
## timeout -- output a metric if no new input arrived for `series_timeout`;
## useful for filling gaps in input data
## periodic -- output the last received metric every `period`; useful to
## downsample the input data
I changed timing a little to speed-up my testing; but looks like the periodic doesn’t take any action at all:
[agent]
debug = true
quiet = false
logtarget = "file"
#logfile = "/var/log/telegraf/telegraf.log"
logfile = "/tmp/telegraf.log"
hostname = "influx18"
[[inputs.mqtt_consumer]]
servers = ["tcp://172.16.0.227:1883"]
topics = [
"test/topic1"
]
username = "xxx"
password = "yyy"
name_override = "finaltest2"
data_format = "value"
data_type = "int"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "test/+"
measurement = "measurement/_"
tags = "_/id"
[[processors.pivot]]
tag_key = "id"
value_key = "value"
# Report the final metric of a series
[[aggregators.final]]
## The period on which to flush & clear the aggregator.
period = "30m"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
# drop_original = false
## The time that a series is not updated until considering it final.
series_timeout = "90s"
## Output strategy, supported values:
## timeout -- output a metric if no new input arrived for `series_timeout`;
## useful for filling gaps in input data
## periodic -- output the last received metric every `period`; useful to
## downsample the input data
output_strategy = "periodic"
[[outputs.file]]
files = ["stdout", "/tmp/metrics.out"]
data_format = "influx"
I ran a script with logging, executing 3 commands:
Because you didn’t wait long enough. You set your period to 30mins and only waited ~7mins:
Mon Apr 15 07:13:51 PM CEST 2024
Mon Apr 15 07:20:52 PM CEST 2024
From the readme:
Contrary to this, output_strategy = "periodic" will always output a
final metric at the end of the period irrespectively of when the last
metric arrived, the series_timeout is ignored. This is helpful if you
for example want to downsample input data arriving at a high rate
and require a periodic output of the final metric.
Your totally right about “you didn’t wait long enough”!
# Report the final metric of a series
[[aggregators.final]]
## The period on which to flush & clear the aggregator.
period = "30m"
## The time that a series is not updated until considering it final.
series_timeout = "1m"
## Output strategy, supported values:
## timeout -- output a metric if no new input arrived for `series_timeout`;
## useful for filling gaps in input data
## periodic -- output the last received metric every `period`; useful to
## downsample the input data
output_strategy = "periodic"
So there is 1 topic1_final written, after about 20 minutes,
but also with the timestamp of the originally entry;
looks also a little strange the originally timestamp is used?