Telegraf Kafka Aggregation Issue: Not Processing Historical Data On Kafka Topic

Telegraf’s basicstats aggregator fails to process one-month-old Kafka data despite:

  • ✅ Setting offset = "oldest"
  • ✅ Resetting consumer group offsets
  • ✅ Kafka containing full historical data

# Kafka Input
[[inputs.kafka_consumer]]
  brokers = [xxxxxxxxxxxxx]
  topics = ["sensors_raw"]
  offset = "oldest"
  consumer_group = "telegraf_consumer_group_1h"
  data_format = "json"
  json_time_key = "timestamp"
  json_time_format = "unix_ns"

# Aggregator
[[aggregators.basicstats]]
  period = "1h"
  drop_original = true
  stats = ["count", "min", "max", "mean"]
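
A message shaped the way this config expects might look like the following (a hypothetical example; only the "timestamp" field name and its unix_ns encoding come from the config above, the sensor field is invented):

```python
import json
import time

# Hypothetical sensor reading; "timestamp" must be unix nanoseconds
# to match json_time_key / json_time_format = "unix_ns".
msg = {
    "timestamp": 1718000000_000000000,  # unix nanoseconds
    "temperature": 21.5,                # invented field
}
payload = json.dumps(msg)

# Telegraf's json parser reads the named field at nanosecond precision
# and uses it as the metric timestamp:
decoded = json.loads(payload)
ts_seconds = decoded["timestamp"] / 1e9
print(time.strftime("%Y-%m-%d", time.gmtime(ts_seconds)))  # -> 2024-06-10
```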

Observed Behavior:

  • Telegraf only aggregates new data (from the time Telegraf starts onward)
  • Ignores historical Kafka messages (1 month back)
  • No errors in logs, but old data never appears in InfluxDB

InfluxDB v2.7.8

@akbasferhat,
Hmm I’m not sure… @skartikey might have an idea of what could be going wrong here though.

@akbasferhat Welcome to the InfluxData Community!

When processing historical Kafka data, Telegraf reads all the old messages very quickly, but the aggregator only pushes results based on the current system time intervals, not the timestamps in your historical data.

Why Your Historical Data Isn’t Appearing

  1. Telegraf rapidly consumes 1 month of historical Kafka messages
  2. All historical data gets accumulated in the aggregator’s cache
  3. When the next period interval hits (based on system time), it pushes ONE aggregated result containing stats for ALL historical data
  4. The aggregated result gets timestamped with the current system time, not the historical timestamps

The basicstats aggregator is designed for real-time streaming data, not historical data processing with preserved timestamps.
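
The steps above can be sketched as a toy model (an illustrative simulation, not Telegraf's actual implementation):

```python
import statistics
import time

# Toy model of a period-based aggregator: it buckets by *arrival*,
# not by the metric's own timestamp.
class PeriodAggregator:
    def __init__(self):
        self.values = []

    def add(self, metric_ts_ns, value):
        # The historical timestamp is ignored for windowing.
        self.values.append(value)

    def flush(self):
        # One result for everything consumed during this period,
        # stamped with the current system time.
        result = {
            "time": time.time_ns(),
            "count": len(self.values),
            "mean": statistics.mean(self.values),
        }
        self.values = []
        return result

agg = PeriodAggregator()
one_month_ns = 30 * 24 * 3600 * 10**9
now_ns = time.time_ns()

# A month of hourly historical points, consumed in one fast burst:
for i in range(30 * 24):
    agg.add(now_ns - one_month_ns + i * 3600 * 10**9, float(i))

out = agg.flush()
print(out["count"])           # 720 -- a whole month collapsed into one point
print(out["time"] >= now_ns)  # True -- stamped "now", not historically
```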

Solution: Modify Your Approach and Use Processors Instead

Use processors to enrich data, then let InfluxDB handle the aggregation via Flux queries:

[[inputs.kafka_consumer]]
  brokers = [xxxxxxxxxxxxx]
  topics = ["sensors_raw"]
  offset = "oldest"
  consumer_group = "telegraf_consumer_group_1h"
  data_format = "json"
  json_time_key = "timestamp"
  json_time_format = "unix_ns"
  # Remove the aggregator completely

# Add a time-based tag for later aggregation. processors.date formats
# the metric timestamp (already set from the "timestamp" field by
# json_time_key above) into a new tag; set tag_key OR field_key, not both.
[[processors.date]]
  tag_key = "hour_bucket"
  date_format = "2006-01-02T15"  # Go reference layout, hour-level grouping

[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-token"
  organization = "your-org"
  bucket = "your-bucket"

Then use Flux queries in InfluxDB to aggregate:

from(bucket: "your-bucket")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "sensors_raw")
  |> group(columns: ["hour_bucket"])
  |> aggregateWindow(every: 1h, fn: mean)
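
For a sense of what the hour_bucket values look like, here is a Python sketch: %Y-%m-%dT%H is the strftime equivalent of the Go reference layout 2006-01-02T15, and the sample timestamps and values are invented:

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import mean

# Hypothetical historical points: (unix_ns timestamp, value)
points = [
    (1717200000_000000000, 1.0),  # 2024-06-01T00:00:00Z
    (1717201800_000000000, 3.0),  # 2024-06-01T00:30:00Z
    (1717203600_000000000, 5.0),  # 2024-06-01T01:00:00Z
]

def hour_bucket(ts_ns):
    dt = datetime.fromtimestamp(ts_ns / 1e9, tz=timezone.utc)
    # strftime equivalent of the Go layout "2006-01-02T15"
    return dt.strftime("%Y-%m-%dT%H")

buckets = defaultdict(list)
for ts_ns, value in points:
    buckets[hour_bucket(ts_ns)].append(value)

# Per-hour mean, keyed by the original (historical) hour:
hourly_mean = {k: mean(v) for k, v in sorted(buckets.items())}
print(hourly_mean)  # {'2024-06-01T00': 2.0, '2024-06-01T01': 5.0}
```

Because the bucket key comes from the historical timestamp, the aggregates stay attached to the hours the data was actually recorded in.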

This gives you:

  • Proper historical timestamp handling
  • Flexible aggregation windows
  • Better performance for large historical datasets
  • The ability to re-aggregate data with different time windows without reprocessing