Flux query over multiple downsampled buckets with different retention policies

Hi,

Assume that a sensor reports a value every second which is stored in a bucket with a retention policy of one day.
Then there is a continuous query accumulating that data in hour intervals and writing it to a second bucket with, for simplicity, an infinite retention policy.

Now assume that I want to have the data from the last two days.
Is there any way to have a single flux query that returns the low resolution data from the downsampled bucket for day -2 to -1 and the high resolution data from day -1 to now?

I have only found solutions discussing this for InfluxDB v1.x, which do not seem to be directly applicable to v2.

Hello @cinhcet,
Welcome!
I believe so, if I’m understanding you correctly, though I fear I might not be, since the solution seems a little too obvious to me.
I would employ two queries.
Something like:

//for downsampled data 
from(bucket: "low resolution data bucket")
|> range(start: -2d, stop: -1d)
// for high precision data
from(bucket: "raw data bucket")
|> range(start: -1d,  stop: now())

Can you share the solutions in 1.x that you are referring to so that I can help you better?

Thank you :slight_smile:

Thanks for the reply.

Sure, in my example where it is clear what the boundaries of the retention policies are, your solution works.
Let us say we want to make a query where the range might span multiple retention policies but does not start neatly at well-defined borders.
Then one needs to work out which buckets to query and which time range to use for each, like in this image

taken from https://github.com/grafana/grafana/issues/4262, where solutions are discussed for 1.x (I have not tried them)

The simple workaround is to calculate the ranges for the different buckets on the client side, which is fine, but not general.
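To make the client-side workaround concrete, here is a minimal Python sketch of how the per-bucket ranges could be calculated. The bucket names, the retention periods, and the `split_range` helper are all made up for illustration, and it assumes each bucket holds exactly the last retention period of data:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical buckets, ordered from finest to coarsest resolution,
# with the retention period assumed for each (None = infinite retention).
BUCKETS = [
    ("bucket_1", timedelta(days=1)),
    ("bucket_2", timedelta(weeks=1)),
    ("bucket_3", None),
]


def split_range(start, stop, now, buckets=BUCKETS):
    """Split [start, stop) into non-overlapping (bucket, start, stop) parts,
    always preferring the finest bucket that still retains the data."""
    parts = []
    upper = stop  # upper bound of the part of the range not yet covered
    for name, retention in buckets:
        if upper <= start:
            break  # the whole requested range is already covered
        # Oldest timestamp this bucket is assumed to still hold
        oldest = start if retention is None else max(start, now - retention)
        if oldest < upper:
            parts.append((name, oldest, upper))
            upper = oldest
    return parts


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    for name, lo, hi in split_range(now - timedelta(days=3), now, now):
        print(name, lo.isoformat(), hi.isoformat())
```

Each returned tuple could then be used as the bucket name and the range(start:, stop:) arguments of one sub-query, with the sub-queries combined afterwards.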

I recently found out that you can do time arithmetic within flux by converting time to an integer, so it should be possible to do that also within one flux query.

Hello @cinhcet,
I apologize. I’m still having trouble understanding your problem. Reading through the issue, it seems that users are wanting retention policies to be automatically rolled up based on queries in a dashboard.

My confusion stems from that image you posted. In that example, what data is the user trying to select? The Dashboard timepicker range selects for a duration that doesn’t match the duration of any of the types of data. Is the assumption that all of that data (coarse, medium, and fine) exists in one bucket?

Thank you for helping me understand.

A sensor generates data every second. This data is stored in bucket_1.
Then a task downsamples this data by averaging it over 1-hour windows, and the result is stored in bucket_2.
Repeating this, we have, let us say, 5 buckets, bucket_i for i = 1, …, 5. Each of these buckets has a different, increasing retention policy.

Now I want to get the data of that sensor for a specific time range and always get the highest “precision” or data frequency that is stored. This means I need to query multiple buckets and only get those parts of each bucket that do not overlap with a bucket of higher precision.

In the image, you want to query data from __from to __to. The data from the bucket `Medium, 1 week` should be taken from __from until the point where data exists in the bucket `Fine, 1 day`. From that timestamp on, you want to get the data from the bucket `Fine, 1 day` until __to.

Does it now make sense?

@cinhcet I think the problem you’re going to face here is efficiency. Querying that many buckets is definitely possible, but they all have to essentially be separate queries that then get unioned together. All that is fairly simple. Where it gets tricky (and likely very inefficient) is when you try to filter out overlapping data. In order to do that, you’d have to detect the earliest timestamp from each bucket, then filter out data newer than that timestamp in the other buckets. These essentially have to be run as separate queries as well.

findFirstTime = (data) => {
  firstTime = data
    |> first()
    |> keep(columns: ["_time"])
    |> tableFind(fn: (key) => true)
    |> getRecord(idx: 0)
  return firstTime._time
}

// Placeholder that keeps every row; replace with your own filter predicate
predicate = (r) => true

b1 = from(bucket: "bucket_1")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn:  predicate)

b2 = from(bucket: "bucket_2")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn:  predicate)

b3 = from(bucket: "bucket_3")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn:  predicate)

b4 = from(bucket: "bucket_4")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn:  predicate)

b5 = from(bucket: "bucket_5")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn:  predicate)

b1_first = findFirstTime(data: b1)
b2_first = findFirstTime(data: b2)
b3_first = findFirstTime(data: b3)
b4_first = findFirstTime(data: b4)

b2_noOverlap = b2 |> range(start: v.timeRangeStart, stop: b1_first)
b3_noOverlap = b3 |> range(start: v.timeRangeStart, stop: b2_first)
b4_noOverlap = b4 |> range(start: v.timeRangeStart, stop: b3_first)
b5_noOverlap = b5 |> range(start: v.timeRangeStart, stop: b4_first)

union(tables: [b1,b2_noOverlap,b3_noOverlap,b4_noOverlap,b5_noOverlap])

Disclaimer: This is somewhat untested and has the potential to be a very heavy operation.


Hi,

Thanks for your answer!
I think that would do the job.
I have not tested it yet, but I wonder what, for example, b1_first is if there is no data within range(start: v.timeRangeStart, stop: v.timeRangeStop) for b1.
One might also make it more efficient by not querying every bucket up to timeRangeStop, but instead finding the first time of the previous bucket first and stopping there.

Is what I want to do exotic? It seems to me that this is what one always wants to do if one has retention policies and downsampled data. Or not?
It just somehow does not fit the paradigm of “bucket is a table plus retention policy”.

That’s a good point. You’d have to adjust the findFirstTime() function to return a default if nothing is found.

findFirstTime = (data, default=now()) => {
  firstTime = data
    |> first()
    |> keep(columns: ["_time"])
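    // findColumn() returns an empty array when no table matches,
    // whereas tableFind() raises an error on an empty stream
    |> findColumn(fn: (key) => true, column: "_time")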
  output = if length(arr: firstTime) == 0 then default else firstTime[0]
  return output
}

You’d then need to update the calls to that function:

b1_first = findFirstTime(data: b1, default: v.timeRangeStop)
b2_first = findFirstTime(data: b2, default: b1_first)
b3_first = findFirstTime(data: b3, default: b2_first)
b4_first = findFirstTime(data: b4, default: b3_first)

You could try that, but it seems like it would create a circular dependency between the bucket queries (b1, b2, etc.) and the _first variables (b1_first, b2_first, etc).

This is definitely not standard practice. Generally you query one bucket and your returned data is in a single precision. Another approach you may consider is determining which bucket/retention period to query based on the total range of the query. For example, if the queried range is greater than 24 hours, query bucket x. If it’s greater than 7 days, query bucket y, etc.
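As a rough illustration of that range-based approach, here is a small Python sketch that could run on the client before the query is built. The thresholds, the bucket names, and the `pick_bucket` helper are assumptions made up for this example:

```python
from datetime import timedelta

# Hypothetical thresholds: (maximum query span, bucket to use), finest first.
THRESHOLDS = [
    (timedelta(hours=24), "bucket_1"),  # up to 24 hours: raw data
    (timedelta(days=7), "bucket_2"),    # up to 7 days: first downsampled bucket
]
FALLBACK_BUCKET = "bucket_3"            # anything longer: coarsest bucket


def pick_bucket(span):
    """Return the bucket to query for a query range of the given length."""
    for max_span, bucket in THRESHOLDS:
        if span <= max_span:
            return bucket
    return FALLBACK_BUCKET


if __name__ == "__main__":
    for span in (timedelta(hours=6), timedelta(days=3), timedelta(days=30)):
        print(span, "->", pick_bucket(span))
```

The whole result then comes from one bucket in a single precision, so there is no overlap to filter out, at the cost of coarser data for the most recent part of a long range.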

It shouldn’t be.

Exotic for Influxdb perhaps. :wink:

It’s the one feature from Graphite that I sorely miss in Influxdb, being able to transparently query across retention policies with one query and having it return data in whichever precision is available, without having to put enormous amounts of logic into every dashboard panel, or alternatively without having to resort to Munin-style fixed period dashboards.

@Tubemeister There have been updates to Flux since this post was originally written that make this a lot easier/simpler.

Assume I have a downsampling pipeline that takes data from my example bucket and downsamples the data into two other buckets with longer retention periods: example-1d and example-1w.
I can write a query that queries all three of these buckets and unions the results together:

import "array"

// Define query variables
bucket = "example"
start = -1y
stop = now()

// List of all associated buckets
allBuckets = [bucket, bucket + "-1d", bucket + "-1w"]

// Base query for each bucket
queryBucket = (bucket) =>
    from(bucket: bucket)
        |> range(start: start, stop: stop)
        |> filter(fn: (r) => r._measurement == "m")

// Iterate through the list of buckets and query each
allData = array.map(arr: allBuckets, fn: (x) => queryBucket(bucket: x))

// Union the results from all the queried buckets
union(tables: allData)

Sorry if this has been answered elsewhere, but I haven’t been able to find it. Your query helped me out a lot, but because of the way retention works, there is often an overlap in timestamps between the buckets that can cause illegibility on a line graph.

Is there a way to prioritize buckets? Ideally, example + "-1w" wouldn’t output data past the first timestamp of example + "-1d", and example + "-1d" wouldn’t output data past example’s first timestamp.

@John_3 There isn’t a way to prioritize buckets. You would have to explicitly define the time range to query for each bucket. You could actually do this in the allBuckets array and then update the allData variable to use the new structure of the array:

import "array"
import "date"

// Define query variables
bucket = "example"

// List of all associated buckets and time ranges
allBuckets = [
    {name: bucket, start: date.time(t: -1d), stop: now()},
    // start must be earlier than stop, so each range runs from the older bound to the newer one
    {name: bucket + "-1d", start: date.time(t: -1w), stop: date.time(t: -1d)},
    {name: bucket + "-1w", start: date.time(t: -1y), stop: date.time(t: -1w)},
]

// Base query for each bucket
queryBucket = (bucket, start, stop) =>
    from(bucket: bucket)
        |> range(start: start, stop: stop)
        |> filter(fn: (r) => r._measurement == "m")

// Iterate through the list of buckets and query each
allData = array.map(arr: allBuckets, fn: (x) => queryBucket(bucket: x.name, start: x.start, stop: x.stop))

// Union the results from all the queried buckets
union(tables: allData)