How to downsample cumulative data

I am collecting network data from my router via telegraf and displaying them as monthly aggregates with this rather complicated query:

import "date"

from(bucket: bucket_name)
  |> range(start: -12mo)
  |> filter(fn: (r) => r["_measurement"] == "net")
  |> filter(fn: (r) => r["_field"] == "bytes_recv" or r["_field"] == "bytes_sent")
  |> filter(fn: (r) => r["host"] == "OPNsense")
  |> filter(fn: (r) => r["interface"] == "vtnet0")
  |> derivative(unit: 1s, nonNegative: true)
  |> map(fn: (r) => ({ r with _value: float(v: r._value) * 8.0 }))
  |> aggregateWindow(every: 1mo, fn: sum, createEmpty: false)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with month: string(v: date.sub(from: r["_time"], d: 1h)), count: r["_value"] }))
  |> yield(name: "sum")

This is obviously also slow, so after confirming it works I downsampled the data to a new bucket with the following task:

from(bucket: bucket_name)
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "net")
    |> filter(fn: (r) => r["_field"] == "bytes_recv" or r["_field"] == "bytes_sent")
    |> filter(fn: (r) => r["host"] == "OPNsense")
    |> increase()
    |> aggregateWindow(every: 1h, fn: sum, createEmpty: false)
    |> to(bucket: "opnsense-net_1h")
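
(For `range(start: -task.every)` to resolve, the task script also needs an `option task` header; the name and interval below are just placeholders for whatever your task actually uses.)

option task = {name: "opnsense-net-downsample", every: 1h}  // placeholder name and interval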

Then I made a query over the downsampled data like this:

from(bucket: "opnsense-net_1h")
  |> range(start: -12mo)
  |> filter(fn: (r) => r["_measurement"] == "net")
  |> filter(fn: (r) => r["_field"] == "bytes_recv" or r["_field"] == "bytes_sent")
  |> filter(fn: (r) => r["host"] == "OPNsense")
  |> filter(fn: (r) => r["interface"] == "vtnet0")
  |> derivative(unit: 1s, nonNegative: true)
  |> map(fn: (r) => ({ r with _value: float(v: r._value) * 8.0 }))
  |> aggregateWindow(every: 1mo, fn: sum, createEmpty: false)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with month: string(v: date.sub(from: r["_time"], d: 1h)), count: r["_value"] }))
  |> yield(name: "sum")

However, I am getting substantially different results when querying the downsampled data compared to the original data: the downsampled data shows around 30-70% higher values.

I suspect this is caused by resets of the cumulative counter values within the downsampled periods. I thought the increase() function would mitigate this, but apparently it does not do so very well.

Any suggestions on how to improve these queries so that I get more realistic output from the downsampled data?

@molnart
Wait, sorry, I feel dense, but why would the results be the same if you're applying that logic to data that has already been downsampled and summed?

Like, increase() returns the cumulative sum of non-negative differences between subsequent values, and then you're taking the sum of those cumulative values again with aggregateWindow().
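
Here's a minimal sketch of what that double counting looks like (the values are made up, not from your data):

import "array"

// Made-up sample: three counter readings within one hour
array.from(rows: [
    {_time: 2024-01-01T00:00:00Z, _value: 10.0},
    {_time: 2024-01-01T00:20:00Z, _value: 12.0},
    {_time: 2024-01-01T00:40:00Z, _value: 15.0},
])
    |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-01T01:00:00Z)
    |> increase()                           // running totals: 0, 2, 5
    |> aggregateWindow(every: 1h, fn: sum)  // sum = 7, but the counter only grew by 5

The last value of increase() within a window is already the total increase for that window, so summing the running totals over-counts every step.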

Instead of applying increase() over the entire hourly window, maybe calculate increase() over shorter intervals first (e.g. 5m) and then aggregate to 1h:

from(bucket: bucket_name)
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "net")
    |> filter(fn: (r) => r["_field"] == "bytes_recv" or r["_field"] == "bytes_sent")
    |> filter(fn: (r) => r["host"] == "OPNsense")
    |> filter(fn: (r) => r["interface"] == "vtnet0")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)  // Smooth out small fluctuations
    |> increase()  // Now apply increase() after downsampling
    |> aggregateWindow(every: 1h, fn: sum, createEmpty: false)  // Sum over hourly intervals
    |> to(bucket: "opnsense-net_1h")

I think this works better because the shorter increase() window (5m) prevents large gaps from accumulating resets. Pre-smoothing with mean reduces the impact of outliers before increase() is applied, and the final aggregation at 1h keeps storage efficient while avoiding compounding errors from increase(). Your monthly aggregates should now align much more closely with the original dataset. This is what I would check first in your downsampled data.

Let me know if this helps!


@Anaisdg thanks a lot for your insight, it's really appreciated. One thing I am unsure of is where I should apply the derivative() function to be able to meaningfully summarize cumulative data. For testing purposes I tried monthly aggregation with the derivative function, but the results I get are not plausible:

    |> derivative(unit: 1s, nonNegative: true)
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)  // Smooth out small fluctuations
    |> increase()  // Now apply increase() after downsampling
    |> aggregateWindow(every: 1mo, fn: sum, createEmpty: false)  // Sum over monthly intervals

I also tried using derivative() in the final query, but I was unsure what unit I should use. I tried 1s, 5m, and 1mo, but none of the results made any sense.