Telegraf aggregation: a lot of data is missed

Hi there,

I hope you can spot the mistakes I'm making that lead to missing data when I use aggregation. With drop_original = false I receive messages from multiple devices within seconds. But when I look at the aggregated data with the period set to 3600s, I often get no data for hours, so the issue seems to be narrowed down to the processing/aggregation stage. I used AI to build my config and have spent hours trying to find the problem. What's wrong?

This is my MQTT data (topic, then payload):

m22wmbusmeters/ei6500/07289784

{"media":"smoke detector","meter":"ei6500","name":"smokedetector_UgSchlafzimmerMitte","id":"07289784","alarm_counter":0,"duration_removed_h":0,"removed_counter":0,"test_button_counter":0,"battery_level":"3.00V","dust_level":"DUST_0","installation_date":"2025-10-22","last_alarm_date":"2000-01-01","last_remove_date":"2000-01-01","last_sound_check_date":"2025-11-16","message_datetime":"2025-12-13 13:00","obstacle_distance":"","software_version":"020100","status":"OK","test_button_last_date":"2000-01-01","timestamp":"2025-12-13T13:00:44Z","device":"im871a[01000155]","rssi_dbm":-76,"floor":"eg","address":"XXX","city":"XX"}
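For reference, this is my understanding of what Telegraf's json parser does with that payload (a rough sketch on a trimmed payload, not Telegraf code): keys in tag_keys become tags, keys in json_string_fields are kept as string fields, other numeric values become fields, and remaining strings are dropped.

```python
import json

# Trimmed payload with a few representative keys from the real message
payload = json.loads(
    '{"media":"smoke detector","name":"smokedetector_UgSchlafzimmerMitte",'
    '"alarm_counter":0,"rssi_dbm":-76,"floor":"eg","battery_level":"3.00V"}'
)

tag_keys = ["name", "floor", "factor"]                # from the config
string_fields = ["media", "battery_level"]            # subset of json_string_fields

# tag_keys become tags (factor is absent, so it is skipped)
tags = {k: payload[k] for k in tag_keys if k in payload}

# numbers and whitelisted strings become fields; other strings are dropped
fields = {}
for k, v in payload.items():
    if k in tags:
        continue
    if isinstance(v, (int, float)) or k in string_fields:
        fields[k] = v

print(tags)  # {'name': 'smokedetector_UgSchlafzimmerMitte', 'floor': 'eg'}
print(fields)
```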

This is part of the telegraf.conf file:

[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## Maximum number of unwritten metrics per output.  Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Collection offset is used to shift the collection by the given amount.
  ## This can be be used to avoid many plugins querying constraint devices
  ## at the same time by manually scheduling them in time.
  # collection_offset = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## Collected metrics are rounded to the precision specified. Precision is
  ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  ##
  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s:
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ##
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  precision = "0s"

  ## Log at debug level.
  debug = true

[[inputs.mqtt_consumer]]
  interval = "120s"
  servers = ["tcp://mqtt.flespi.io:1883"]
  topics = [
    "m22wmbusmeters/#",
  ]
  username = "XX"
  client_id = "telegraf"
  persistent_session = true
  data_format = "json"
  tag_keys = [
    "name",
    "floor",
    "factor",
  ]

  # Keep these payload fields as strings
  json_string_fields = [
    "media", "meter", "name", "dust_level",
    "installation_date", "last_alarm_date", "last_remove_date",
    "last_sound_check_date", "message_datetime",
    "obstacle_distance", "software_version", "status",
    "test_button_last_date", "device",
    "address", "city", "floor", "id", "battery_level"
  ]

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "m22wmbusmeters/+/+"
    tags = "platform/devicetype/deviceid"
    measurement = "_/_/measurement"
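My understanding of the topic_parsing block, sketched as a hypothetical Python helper (not Telegraf code): the three topic segments map position by position onto the tags template, and "_/_/measurement" takes the third segment as the measurement name.

```python
def parse_topic(topic):
    """Mimic tags = "platform/devicetype/deviceid" and
    measurement = "_/_/measurement" for topics matching m22wmbusmeters/+/+."""
    parts = topic.split("/")
    tags = dict(zip(["platform", "devicetype", "deviceid"], parts))
    measurement = parts[2]  # third segment, per "_/_/measurement"
    return tags, measurement

tags, measurement = parse_topic("m22wmbusmeters/ei6500/07289784")
print(measurement)  # 07289784
```

This would explain why the device id shows up as the measurement name in the output written to InfluxDB.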
[[inputs.mqtt_consumer]]
  servers = ["tcp://mqtt.flespi.io:1883"]
  topics = [
    "aiontheedge-water-meter/main/value",
  ]
  username = "xx"
  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "aiontheedge-water-meter/main/value"
    tags = "device/_/_"
    measurement = "_/_/measurement"

[[processors.regex]]
  [[processors.regex.fields]]
    key = "battery_level"
    pattern = '.*?([0-9]+(?:\.[0-9]+)?)\s*[Vv]?.*'
    replacement = "${1}"
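To sanity-check the battery_level pattern outside Telegraf, here is the equivalent substitution in Python (the pattern uses nothing Go-specific, so the behavior should match Go's regexp; Python uses \1 where Telegraf uses ${1}):

```python
import re

# Same pattern as in the processors.regex config
pattern = r'.*?([0-9]+(?:\.[0-9]+)?)\s*[Vv]?.*'

value = re.sub(pattern, r'\1', "3.00V")
print(value)         # 3.00
print(float(value))  # 3.0
```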

[[processors.converter]]
  [processors.converter.fields]
    integer = [
      "alarm_counter",
      "duration_removed_h",
      "removed_counter",
      "test_button_counter",
      "rssi_dbm"
    ]
    float = ["battery_level"]

[[processors.starlark]]
  source = '''
def apply(metric):
    # Copy string fields so they survive aggregation
    for field in ["status", "dust_level", "device", "name", "floor", "city"]:
        if field in metric.fields:
            metric.fields["last_" + field] = metric.fields[field]
    return metric
'''
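Simulated on a plain dict instead of a real Starlark metric object, the processor just duplicates those string fields under a last_ prefix (note that name and floor are tags in my setup, so they would not appear in metric.fields in the first place):

```python
def apply(fields):
    # Same copy logic as the Starlark apply(), on a plain dict for illustration
    for field in ["status", "dust_level", "device", "name", "floor", "city"]:
        if field in fields:
            fields["last_" + field] = fields[field]
    return fields

out = apply({"status": "OK", "rssi_dbm": -76})
print(out)  # {'status': 'OK', 'rssi_dbm': -76, 'last_status': 'OK'}
```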

[[aggregators.final]]
  period = "60s"
  series_timeout = "120s"
  drop_original = true
  name_suffix = ""

  [aggregators.final.tagpass]
    platform = ["m22wmbusmeters"]
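As far as I understand aggregators.final, it emits only the last metric of a contiguous series, and only once that series has received no update for series_timeout, so a series that keeps getting updates within the timeout produces no output until it goes quiet. A rough simulation of that rule on (timestamp, value) points (my reading of the docs, not Telegraf code):

```python
def final_aggregate(points, series_timeout):
    """Sketch of aggregators.final: emit the last point of each contiguous
    run, where a gap longer than series_timeout ends the run."""
    emitted = []
    last = None
    for ts, value in points:
        if last is not None and ts - last[0] > series_timeout:
            emitted.append(last)  # previous run went quiet: emit its last point
        last = (ts, value)
    if last is not None:
        emitted.append(last)      # flush the final run at end of input
    return emitted

# A burst at t=0..10 s, then another burst an hour later
points = [(0, 1.0), (5, 2.0), (10, 3.0), (3600, 4.0), (3610, 5.0)]
print(final_aggregate(points, series_timeout=120))
# [(10, 3.0), (3610, 5.0)]
```

If that model is right, a device that publishes in short bursts every hour should still produce one aggregated point per burst, roughly series_timeout after the burst ends.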

This is the data written to InfluxDB according to the Telegraf log:

1765626435377002818

07289780,deviceid=07289780,devicetype=ei6500,floor=eg,name=smokedetector_EgFlur,platform=m22wmbusmeters,topic=m22wmbusmeters/ei6500/07289780 last_sound_check_date_final="2025-11-16",duration_removed_h_final=0i,last_dust_level_final="DUST_0",last_device_final="im871a[01000155]",dust_level_final="DUST_0",obstacle_distance_final="",removed_counter_final=0i,test_button_last_date_final="2025-10-22",software_version_final="020100",address_final="XX",rssi_dbm_final=-75i,device_final="im871a[01000155]",installation_date_final="2025-10-22",meter_final="ei6500",city_final="XX",last_remove_date_final="2000-01-01",last_alarm_date_final="2000-01-01",media_final="smoke detector",id_final="07289780",message_datetime_final="2025-12-13 11:46",alarm_counter_final=0i,battery_level_final=3,status_final="OK",test_button_counter_final=1i,last_status_final="OK",last_city_final="XX" 1765626450216738726

This is my InfluxDB graph. The narrow data entries are from when I set drop_original = false; with the current configuration there should therefore be data at the end of the graph every 60s: