Hi there,
I hope you can find the mistakes in my setup that lead to missing data when I use aggregation. With drop_original = false I receive messages from multiple devices within seconds. But when I look at the aggregated data with the period set to 3600s, I often get no data at all for hours. So the issue seems to be narrowed down to the processing/aggregation step. I used AI to build my config and have spent hours trying to find the problem. What am I doing wrong?
This is my MQTT data:
m22wmbusmeters/ei6500/07289784/{"media":"smoke detector","meter":"ei6500","name":"smokedetector_UgSchlafzimmerMitte","id":"07289784","alarm_counter":0,"duration_removed_h":0,"removed_counter":0,"test_button_counter":0,"battery_level":"3.00V","dust_level":"DUST_0","installation_date":"2025-10-22","last_alarm_date":"2000-01-01","last_remove_date":"2000-01-01","last_sound_check_date":"2025-11-16","message_datetime":"2025-12-13 13:00","obstacle_distance":"","software_version":"020100","status":"OK","test_button_last_date":"2000-01-01","timestamp":"2025-12-13T13:00:44Z","device":"im871a[01000155]","rssi_dbm":-76,"floor":"eg","address":"XXX","city":"XX"}
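For context, here is my rough understanding of how this payload should be split into tags and fields, given the tag_keys and json_string_fields in the config below. This is a simplified Python sketch of the behavior I expect (the payload is abbreviated; it is not Telegraf's actual parser logic):

```python
import json

# Abbreviated sample payload; quotes fixed to plain ASCII.
payload = json.loads(
    '{"media":"smoke detector","name":"smokedetector_X","alarm_counter":0,'
    '"battery_level":"3.00V","rssi_dbm":-76,"floor":"eg"}'
)

# Mirrors tag_keys / json_string_fields from my telegraf.conf (subset).
tag_keys = {"name", "floor", "factor"}
string_fields = {"media", "battery_level", "floor", "name"}

# Keys listed in tag_keys become tags; numeric values and whitelisted
# strings become fields; other strings would be dropped.
tags = {k: v for k, v in payload.items() if k in tag_keys}
fields = {
    k: v for k, v in payload.items()
    if k not in tag_keys and (isinstance(v, (int, float)) or k in string_fields)
}
```

So per message I expect tags name/floor and fields such as media, alarm_counter, battery_level, and rssi_dbm — which matches what I see when drop_original = false.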
This is part of the telegraf.conf file:
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## Maximum number of unwritten metrics per output. Increasing this value
## allows for longer periods of output downtime without dropping metrics at the
## cost of higher maximum memory usage.
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Collection offset is used to shift the collection by the given amount.
## This can be used to avoid many plugins querying constrained devices
## at the same time by manually scheduling them in time.
# collection_offset = "0s"
## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Collected metrics are rounded to the precision specified. Precision is
## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
##
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s:
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
##
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
precision = "0s"
## Log at debug level.
debug = true
[[inputs.mqtt_consumer]]
interval = "120s"
servers = ["tcp://mqtt.flespi.io:1883"]
topics = [
"m22wmbusmeters/#",
]
username = "XX"
client_id = "telegraf"
persistent_session = true
data_format = "json"
tag_keys = [
"name",
"floor",
"factor",
]
# Keep these payload fields as strings
json_string_fields = [
"media","meter","name","dust_level",
"installation_date","last_alarm_date","last_remove_date",
"last_sound_check_date","message_datetime",
"obstacle_distance","software_version","status",
"test_button_last_date","device",
"address","city","floor","id","battery_level"
]
[[inputs.mqtt_consumer.topic_parsing]]
topic = "m22wmbusmeters/+/+"
tags = "platform/devicetype/deviceid"
measurement = "_/_/measurement"
[[inputs.mqtt_consumer]]
servers = ["tcp://mqtt.flespi.io:1883"]
topics = [
"aiontheedge-water-meter/main/value",
]
username = "xx"
data_format = "value"
data_type = "float"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "aiontheedge-water-meter/main/value"
tags = "device/_/_"
measurement = "_/_/measurement"
[[processors.regex]]
[[processors.regex.fields]]
key = "battery_level"
pattern = '.*?([0-9]+(?:\.[0-9]+)?)\s*[Vv]?.*'
replacement = "${1}"
[[processors.converter]]
[processors.converter.fields]
integer = [
"alarm_counter",
"duration_removed_h",
"removed_counter",
"test_button_counter",
"rssi_dbm"
]
float = ["battery_level"]
[[processors.starlark]]
source = '''
def apply(metric):
    # Copy string fields so they survive aggregation
    for field in ["status", "dust_level", "device", "name", "floor", "city"]:
        if field in metric.fields:
            metric.fields["last_" + field] = metric.fields[field]
    return metric
'''
[[aggregators.final]]
period = "60s"
series_timeout = "120s"
drop_original = true
name_suffix = ""
[aggregators.final.tagpass]
platform = ["m22wmbusmeters"]
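To see what the aggregator actually emits, I also tried temporarily adding a second output that prints every metric to stdout. This is just a debugging sketch using Telegraf's standard file output plugin, not part of my normal config:

```toml
# Temporary debugging output: mirrors every metric that Telegraf would
# write to InfluxDB to stdout in influx line protocol, so the
# aggregator's output (or its absence) shows up in the service log.
[[outputs.file]]
files = ["stdout"]
data_format = "influx"
```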
This is the data written to InfluxDB according to the Telegraf log:
07289780,deviceid=07289780,devicetype=ei6500,floor=eg,name=smokedetector_EgFlur,platform=m22wmbusmeters,topic=m22wmbusmeters/ei6500/07289780 last_sound_check_date_final="2025-11-16",duration_removed_h_final=0i,last_dust_level_final="DUST_0",last_device_final="im871a[01000155]",dust_level_final="DUST_0",obstacle_distance_final="",removed_counter_final=0i,test_button_last_date_final="2025-10-22",software_version_final="020100",address_final="XX",rssi_dbm_final=-75i,device_final="im871a[01000155]",installation_date_final="2025-10-22",meter_final="ei6500",city_final="XX",last_remove_date_final="2000-01-01",last_alarm_date_final="2000-01-01",media_final="smoke detector",id_final="07289780",message_datetime_final="2025-12-13 11:46",alarm_counter_final=0i,battery_level_final=3,status_final="OK",test_button_counter_final=1i,last_status_final="OK",last_city_final="XX" 1765626450216738726
This is my InfluxDB graph. The narrow data entries are from when I set drop_original = false; with the current configuration there should be data at the end of the graph every 60s:

