Downsample in the same bucket

Hi,

I would like to have multiple granularities in one bucket. For example, collect data every 100 milliseconds, then downsample data older than 1 minute to 10s resolution, and data older than 1 hour to 1min resolution.

I tried to make a task for that:

option task = {name: "test", every: 10s}

// Defines a data source
data =
    from(bucket: "sensors")
        |> range(start: -1m, stop: -40s)

data
    |> aggregateWindow(fn: mean, every: 10s)
    |> to(bucket: "sensors", org: "orga")

The downsampling part works, but I would still need to delete the raw data before writing the downsampled data back into the bucket.

Any ideas how I can do that?

@steph2795 From what I understand, your use case is exactly why you should have multiple buckets, each with its own retention period. A bucket’s retention period is essentially an expiration time for the data in the bucket, after which the data is deleted. Note, however, that the minimum retention period is 1 hour.

The process of deleting data through the delete API can be tricky as well. Data deletion through the API is an async process, so data doesn’t necessarily get deleted right away. I assume you’d want to delete data by time range, but since deletes are asynchronous, you’d likely end up writing new points that would immediately be deleted by a delayed deletion process. In short, this process would be very brittle and prone to failure.

If you’re trying to save on disk space, I think the best approach is to have different buckets with different retention periods, each meant for a different granularity. Your downsampling task could then process all the granularities at the same time and write each one to the appropriate bucket.

For example, let’s assume we have the following buckets with their respective retention periods:

  • sensors_raw with 1h retention period
  • sensors_downsampled_10s with 1h retention period
  • sensors_downsampled_1m with 90d retention period
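
As a quick sanity check (purely optional, and the filter below just hard-codes the example bucket names from the list above), you can verify what retention period each bucket actually ended up with by querying the buckets() function in Flux:

buckets()
    // keep only the three example buckets
    |> filter(fn: (r) => r.name == "sensors_raw" or r.name == "sensors_downsampled_10s" or r.name == "sensors_downsampled_1m")
    // keep just the bucket name and its retention period
    |> keep(columns: ["name", "retentionPeriod"])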

Your downsampling task would look something like this:

option task = {name: "test", every: 1m}

// Define bucket variables
raw = "sensors_raw"
ds1 = "sensors_downsampled_10s"
ds2 = "sensors_downsampled_1m"

// Downsample the last minute of raw data to 10s averages
data_ds1 =
    from(bucket: raw)
        |> range(start: -1m, stop: now())
        |> aggregateWindow(every: 10s, fn: mean)

// Downsample the previous minute of 10s data to 1m averages
data_ds2 =
    from(bucket: ds1)
        |> range(start: -2m, stop: -1m)
        |> aggregateWindow(every: 1m, fn: mean)

// Write each downsampled stream to its target bucket
data_ds1 |> to(bucket: ds1)
data_ds2 |> to(bucket: ds2)
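
One small tweak you may want on top of the task above (not part of the original example): aggregateWindow creates empty windows by default, which show up as null values and aren’t useful to write into the downsampled buckets. Setting createEmpty: false drops them, for example:

data_ds1 =
    from(bucket: raw)
        |> range(start: -1m, stop: now())
        // skip windows that contain no raw points so no null means are produced
        |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)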

Then, to query all the different granularities, you can define time ranges to query from each bucket based on your query’s start and stop times, then union all the different streams together:

import "date"

// Define bucket variables
raw = "sensors_raw"
ds1 = "sensors_downsampled_10s"
ds2 = "sensors_downsampled_1m"

// Example query window: the last hour
startTime = date.time(t: -1h)
stopTime = date.time(t: now())

timeRanges = {
    // raw data covers the most recent 10 seconds
    raw: {start: date.add(d: -10s, to: stopTime), stop: stopTime},
    // the 10s downsample covers from 1 minute ago up to where the raw range begins
    ds1: {start: date.add(d: -1m, to: stopTime), stop: date.add(d: -10s, to: stopTime)},
    // the 1m downsample covers everything from the query start up to 1 minute ago
    ds2: {start: startTime, stop: date.add(d: -1m, to: stopTime)},
}

data_raw =
    from(bucket: raw)
        |> range(start: timeRanges.raw.start, stop: timeRanges.raw.stop)

data_ds1 =
    from(bucket: ds1)
        |> range(start: timeRanges.ds1.start, stop: timeRanges.ds1.stop)

data_ds2 =
    from(bucket: ds2)
        |> range(start: timeRanges.ds2.start, stop: timeRanges.ds2.stop)

union(tables: [data_raw, data_ds1, data_ds2])
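
Note that union() makes no guarantee about row ordering across the combined streams, so if whatever consumes this query expects chronological data, it may be worth adding a sort (a small addition, not in the original query):

union(tables: [data_raw, data_ds1, data_ds2])
    // put rows from all three buckets back in chronological order
    |> sort(columns: ["_time"])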

Really nice.
Thanks for the explanation!