Telegraf’s basicstats aggregator fails to process one-month-old Kafka data despite:
- Setting offset = "oldest"
- Resetting consumer group offsets
- Kafka containing full historical data
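For reference, the consumer group reset was done along these lines (illustrative command; the broker address is a placeholder):
# run with Telegraf stopped, since the group must be inactive for a reset
kafka-consumer-groups.sh --bootstrap-server <broker:9092> --group telegraf_consumer_group_1h --topic sensors_raw --reset-offsets --to-earliest --execute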
# Kafka Input
[[inputs.kafka_consumer]]
brokers = ["xxxxxxxxxxxxx"]
topics = ["sensors_raw"]
offset = "oldest"
consumer_group = "telegraf_consumer_group_1h"
data_format = "json"
json_time_key = "timestamp"
json_time_format = "unix_ns"
# Aggregator
[[aggregators.basicstats]]
period = "1h"
drop_original = true
stats = ["count", "min", "max", "mean"]
Observed Behavior:
- Telegraf only aggregates new data (from start time)
- Ignores historical Kafka messages (1 month back)
- No errors in logs, but old data never appears in InfluxDB
InfluxDB v2.7.8
@akbasferhat,
Hmm I’m not sure… @skartikey might have an idea of what could be going wrong here though.
@akbasferhat Welcome to the InfluxData Community!
When Telegraf processes historical Kafka data, it reads all of the old messages very quickly, but the aggregator flushes results on intervals derived from the current system time, not from the timestamps embedded in your historical data.
Why Your Historical Data Isn’t Appearing
- Telegraf rapidly consumes 1 month of historical Kafka messages
- All historical data gets accumulated in the aggregator’s cache
- When the next period interval hits (based on system time), it pushes ONE aggregated result containing stats for ALL historical data
- The aggregated result gets timestamped with the current system time, not the historical timestamps
The basicstats aggregator is designed for real-time streaming data, not historical data processing with preserved timestamps.
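To make that concrete, here is a hypothetical sketch in InfluxDB line protocol (measurement, field, and values are invented for illustration): month-old points go into the aggregator, and a single stats point comes out stamped at the flush time.
# Input: historical points carrying their original (month-old) timestamps
sensors_raw temperature=21.3 1717200000000000000
sensors_raw temperature=22.1 1717203600000000000
# Output after one period flush: a single stats point timestamped ~now, not at the data's time
sensors_raw temperature_count=2,temperature_min=21.3,temperature_max=22.1,temperature_mean=21.7 1719878400000000000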
Solution: Modify Your Approach - Use Processors Instead
Use processors to enrich data, then let InfluxDB handle the aggregation via Flux queries:
[[inputs.kafka_consumer]]
brokers = ["xxxxxxxxxxxxx"]
topics = ["sensors_raw"]
offset = "oldest"
consumer_group = "telegraf_consumer_group_1h"
data_format = "json"
json_time_key = "timestamp"
json_time_format = "unix_ns"
# Remove the aggregator completely
# Add time-based tags for later aggregation
[[processors.date]]
# The date processor derives the tag from the metric timestamp (already parsed
# from the "timestamp" JSON field above); it cannot set both field_key and tag_key
tag_key = "hour_bucket"
date_format = "2006-01-02T15" # Hour-level grouping
[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "your-token"
organization = "your-org"
bucket = "your-bucket"
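With this setup, each historical point keeps its original timestamp and gains an hour_bucket tag. In line protocol it would look roughly like this (field name and values are illustrative; the measurement name matches the one assumed in the Flux query below):
sensors_raw,hour_bucket=2024-06-01T00 temperature=21.3 1717200000000000000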
Then use Flux queries in InfluxDB to aggregate:
from(bucket: "your-bucket")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "sensors_raw")
  |> group(columns: ["hour_bucket"])
  |> aggregateWindow(every: 1h, fn: mean)
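Your original basicstats config also produced count, min, and max; those fall out of the same query by swapping the aggregate function (a sketch, using the same assumed bucket and measurement names):
from(bucket: "your-bucket")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "sensors_raw")
  |> aggregateWindow(every: 1h, fn: count)
  // swap in fn: min or fn: max for the other basicstats outputs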
This gives you:
- Proper historical timestamp handling
- Flexible aggregation windows
- Better performance for large historical datasets
- The ability to re-aggregate data with different time windows without reprocessing