Telegraf aggregation - a lot of data missing

Hi there,

I hope you can spot the mistake in my setup that causes missing data when I use aggregation. With drop_original = false, I receive messages from multiple devices within seconds. But when I look at the aggregated data with period set to 3600s, I often get no data for hours. So the issue seems to be narrowed down to the processing/aggregation stage. I used AI to build my config and have spent hours trying to find the issue. What's wrong?

This is my mqtt data:

m22wmbusmeters/ei6500/07289784/{"media":"smoke detector","meter":"ei6500","name":"smokedetector_UgSchlafzimmerMitte","id":"07289784","alarm_counter":0,"duration_removed_h":0,"removed_counter":0,"test_button_counter":0,"battery_level":"3.00V","dust_level":"DUST_0","installation_date":"2025-10-22","last_alarm_date":"2000-01-01","last_remove_date":"2000-01-01","last_sound_check_date":"2025-11-16","message_datetime":"2025-12-13 13:00","obstacle_distance":"","software_version":"020100","status":"OK","test_button_last_date":"2000-01-01","timestamp":"2025-12-13T13:00:44Z","device":"im871a[01000155]","rssi_dbm":-76,"floor":"eg","address":"XXX","city":"XX"}

This is part of the telegraf.conf file:

[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## Maximum number of unwritten metrics per output.  Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Collection offset is used to shift the collection by the given amount.
  ## This can be used to avoid many plugins querying constrained devices
  ## at the same time by manually scheduling them in time.
  # collection_offset = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## Collected metrics are rounded to the precision specified. Precision is
  ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  ##
  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s:
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ##
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  precision = "0s"

  ## Log at debug level.
  debug = true

[[inputs.mqtt_consumer]]
  interval = "120s"
  servers = ["tcp://mqtt.flespi.io:1883"]
  topics = [
    "m22wmbusmeters/#",
  ]
  username = "XX"
  client_id = "telegraf"
  persistent_session = true
  data_format = "json"
  tag_keys = [
    "name",
    "floor",
    "factor",
  ]

  # Keep these payload fields as strings
  json_string_fields = [
    "media","meter","name","dust_level",
    "installation_date","last_alarm_date","last_remove_date",
    "last_sound_check_date","message_datetime",
    "obstacle_distance","software_version","status",
    "test_button_last_date","device",
    "address","city","floor","id","battery_level"
  ]

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "m22wmbusmeters/+/+"
    tags = "platform/devicetype/deviceid"
    measurement = "_/_/measurement"

[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt.flespi.io:1883"]
  topics = [
    "aiontheedge-water-meter/main/value",
  ]
  username = "xx"
  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "aiontheedge-water-meter/main/value"
    tags = "device/_/_"
    measurement = "_/_/measurement"
[[processors.regex]]
  [[processors.regex.fields]]
    key = "battery_level"
    pattern = '.*?([0-9]+(?:\.[0-9]+)?)\s*[Vv]?.*'
    replacement = "${1}"

[[processors.converter]]
  [processors.converter.fields]
    integer = [
      "alarm_counter",
      "duration_removed_h",
      "removed_counter",
      "test_button_counter",
      "rssi_dbm"
    ]
    float = ["battery_level"]

[[processors.starlark]]
  source = '''
def apply(metric):
    # Copy string fields so they survive aggregation
    for field in ["status", "dust_level", "device", "name", "floor", "city"]:
        if field in metric.fields:
            metric.fields["last_" + field] = metric.fields[field]
    return metric
'''

[[aggregators.final]]
  period = "60s"
  series_timeout = "120s"
  drop_original = true
  name_suffix = ""

  [aggregators.final.tagpass]
    platform = ["m22wmbusmeters"]

This is the data written to influxdb according to the telegraf log:

1765626435377002818

07289780,deviceid=07289780,devicetype=ei6500,floor=eg,name=smokedetector_EgFlur,platform=m22wmbusmeters,topic=m22wmbusmeters/ei6500/07289780 last_sound_check_date_final="2025-11-16",duration_removed_h_final=0i,last_dust_level_final="DUST_0",last_device_final="im871a[01000155]",dust_level_final="DUST_0",obstacle_distance_final="",removed_counter_final=0i,test_button_last_date_final="2025-10-22",software_version_final="020100",address_final="XX",rssi_dbm_final=-75i,device_final="im871a[01000155]",installation_date_final="2025-10-22",meter_final="ei6500",city_final="XX",last_remove_date_final="2000-01-01",last_alarm_date_final="2000-01-01",media_final="smoke detector",id_final="07289780",message_datetime_final="2025-12-13 11:46",alarm_counter_final=0i,battery_level_final=3,status_final="OK",test_button_counter_final=1i,last_status_final="OK",last_city_final="XX" 1765626450216738726

This is my InfluxDB graph. The narrow data entries are from when I set drop_original = false, so with the current configuration there should be data every 60s at the end of the graph:

You are doing a lot of processing before the data is ever sent out. processors.regex, processors.converter, and processors.starlark can all change your data, and after them aggregators.final only emits the last metric of each period (60s in the config shown). My guess is that those processors and the aggregator are either dropping metrics or producing duplicates that get deduplicated, and that is causing the gaps. I would comment out everything that is not an input plugin, write all metrics to an output file, and see what they look like. Also try running Telegraf with --debug and look for any dropped-metric messages. If you are using aggregators.final, I assume you have a lot of metrics, so a batch size of 1000 could be getting overrun and dropping them.
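
A stripped-down config for that check could look roughly like the sketch below: only the wmbusmeters input from the question plus a file output, with no processors or aggregators. The output path /tmp/telegraf-debug.out and the shortened json_string_fields list are assumptions for illustration, and client_id is left out on the assumption that this test instance might run alongside the normal one.

```toml
# Debugging sketch: inputs only, no processors or aggregators.
[agent]
  interval = "10s"
  flush_interval = "10s"
  debug = true

[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt.flespi.io:1883"]
  topics = ["m22wmbusmeters/#"]
  username = "XX"
  # client_id intentionally omitted (assumption: avoids clashing with
  # a production instance that already uses client_id = "telegraf").
  data_format = "json"
  tag_keys = ["name", "floor"]
  # Shortened for the sketch; use the full list from the original config.
  json_string_fields = ["status", "dust_level", "battery_level"]

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "m22wmbusmeters/+/+"
    tags = "platform/devicetype/deviceid"
    measurement = "_/_/measurement"

# Write every metric as line protocol to stdout and to a file.
[[outputs.file]]
  files = ["stdout", "/tmp/telegraf-debug.out"]  # path is an assumption
  data_format = "influx"
```

Saved as, say, debug.conf (a hypothetical file name), it can be run with telegraf --config debug.conf --debug. If every device shows up in the file at the expected rate, re-enable the processors and the aggregator one at a time to see which step introduces the gaps.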