Why during downsampling the start range duration must be doubled?

In the documentation examples (such as here), when extracting data (to be further downsampled) the start range is always doubled. Why?

Consider this example:

option task = {name: "downsample_task", every: 1h, offset: 5m}

from(bucket: "original_bucket")
	|> range(start: -duration(v: int(v: task.every) * 2))
	|> aggregateWindow(fn: mean, every: task.every, createEmpty: false)
	|> to(bucket: "destination_bucket_1h")

The task runs every 1h, but the start range is -2h. So if “now” - when the task runs - is 8:05 (due to the offset), the data window starts from 6:00 (-2h) to 8:05 and the aggregation should produce 3 results for each measurement, with the following _time:

  1. 7:00 (window from 6:00 to 7:00)
  2. 8:00 (window from 7:00 to 8:00)
  3. 8:05 (window from 8:00 to 8:05)

Then, during the next task run at 9:05, the aggregation retrieves the following _time:

  1. 8:00 (window from 7:00 to 8:00)
  2. 9:00 (window from 8:00 to 9:00)
  3. 9:05 (window from 9:00 to 9:05)

Is that correct? But then, what happens in destination_bucket_1h? Measurements [2] (8:00), [3] (8:05) and [4] (8:00) are consolidated? Or only [2] and [4] that have the same _time?

If not, is still correct to double the start time, or would be better to have:

from(bucket: "original_bucket")
	|> range(start: -task.every) // <--- LIKE THIS?
	|> aggregateWindow(fn: mean, every: task.every, createEmpty: false)
	|> to(bucket: "destination_bucket_1h")

Hello @virtualdj,
It’s generally better not to double the start time and have the latter.

It’s the official documentation that provides the doubled start time example:

|> range(start: -duration(v: int(v: task.every) * 2))   // <-- Two

… hence my question.

in this case the values will be overwritten with the most recent ones.

Yeah, I can confirm that: nothing changes in the results data while using start: -duration(v: int(v: task.every) * 2) or start: -task.every.

I made a test and would like to share the results, for anyone interested (now or in the future). Sorry for the long post but I think it might be useful!

Bash script to create sample data

I created a bash script that inserts an integer value that is incremented every time the script is run, in this way:

#!/bin/bash
# Set the InfluxDB host, port, and token
INFLUXDB_HOST="192.168.10.2"
INFLUXDB_PORT="8086"
INFLUXDB_ORG="org"
INFLUXDB_BUCKET="sample"
INFLUXDB_TOKEN="==replace=with=yours=="

# Read the current variable value (latest one)
cval=$(/sbin/curl -s \
  "http://$INFLUXDB_HOST:$INFLUXDB_PORT/api/v2/query?org=$INFLUXDB_ORG" \
  --header "Authorization: Token $INFLUXDB_TOKEN" \
  --header "Content-Type: application/vnd.flux" \
  --header "Accept: application/csv" \
  --data-binary "
  from(bucket: \"$INFLUXDB_BUCKET\")
  |> range(start: 0)
  |> filter(fn: (r) => r[\"_field\"] == \"value\")
  |> last()
  |> keep(columns: [\"_value\"])
  " \
  | /bin/awk 'BEGIN { FS=","; RS="\r\n" } /^,_result/{print $4; exit}')

# Print the data
/bin/echo "Current value = |$cval|"
cval=$((cval + 1))
/bin/echo "New value = |$cval|"

# Insert the new data into InfluxDB
/sbin/curl --request POST \
  "http://$INFLUXDB_HOST:$INFLUXDB_PORT/api/v2/write?org=$INFLUXDB_ORG&bucket=$INFLUXDB_BUCKET" \
  --header "Authorization: Token $INFLUXDB_TOKEN" \
  --header "Content-Type: text/plain; charset=utf-8" \
  --header "Accept: application/json" \
  --data-binary "
  test value=$cval
  "

I added a cron line to run this script every 30 minutes (actually at 0 and 30 minutes):

# m h dom m dow cmd
*/30 * * * * /influx-test.sh

InfluxDB task settings

Then I prepared the task in InfluxDB that downsamples the data from the original bucket sample (with the incremental values) into a new one named sample_1h:

// Task settings
option task = {name: "sample_1h", every: 1h, offset: 2m}

// Defines the data source
data =
    from(bucket: "sample")
        |> range(start: -task.every) // START WITH THIS
        //|> range(start: -duration(v: int(v: task.every) * 2)) // THEN THIS
        |> filter(fn: (r) => r._field == "value")

// Write aggregated data
data
    |> aggregateWindow(fn: mean, every: task.every, createEmpty: false)
    |> set(key: "agg_type", value: "mean")
    |> to(bucket: "sample_1h", org: "org")
data
    |> aggregateWindow(fn: min, every: task.every, createEmpty: false)
    |> set(key: "agg_type", value: "min")
    |> to(bucket: "sample_1h", org: "org")
data
    |> aggregateWindow(fn: max, every: task.every, createEmpty: false)
    |> set(key: "agg_type", value: "max")
    |> to(bucket: "sample_1h", org: "org")

Results

Simple range

Let’s start with the simple range(start: -task.every), you can query the original database with:

from(bucket: "sample")
  |> range(start: 0)
  |> filter(fn: (r) => r["_measurement"] == "test")
  |> filter(fn: (r) => r["_field"] == "value")

and these are the results (look ath the one second delay in the _time column, due to the time the Bash script takes to read and reinsert the new value):

-------------------------------------------------------
_value	_time
-------------------------------------------------------
122     2023-07-22T06:00:01.898Z
123     2023-07-22T06:30:01.740Z
124     2023-07-22T07:00:01.633Z   // 1st
125     2023-07-22T07:30:01.388Z   // 1st
126     2023-07-22T08:00:01.508Z   // 2nd
127     2023-07-22T08:30:01.276Z   // 2nd
128     2023-07-22T09:00:01.818Z

Then I queried the downsampled database after the first and second task iteraction, with the following query:

from(bucket: "sample_1h")
  |> range(start: 0)
  |> filter(fn: (r) => r["_measurement"] == "test")
  |> filter(fn: (r) => r["_field"] == "value")

In the first task iteraction, at about 08:03:00.000Z I got:

-------------------------------------------------------
_value	_time				        agg_type
-------------------------------------------------------
124	    2023-07-22T08:00:00.000Z	min	    // NEW
124.5	2023-07-22T08:00:00.000Z	mean	// NEW
125	    2023-07-22T08:00:00.000Z	max	    // NEW

In the second task iteraction, at 09:11:00.000Z I got:

-------------------------------------------------------
_value	_time				        agg_type
-------------------------------------------------------
124	    2023-07-22T08:00:00.000Z	min
126	    2023-07-22T09:00:00.000Z	min	    // NEW
124.5	2023-07-22T08:00:00.000Z	mean
126.5	2023-07-22T09:00:00.000Z	mean	// NEW
125	    2023-07-22T08:00:00.000Z	max
127	    2023-07-22T09:00:00.000Z	max	    // NEW

So everytime the task runs, 3 new lines are added and as the aggregation window is 1 hour (see: aggregateWindow(fn: mean, every: task.every, createEmpty: false) with task.every = 1h in the task) the min/mean/max values are calculated from the previous hour to the current hour.

You can notice that in the first iteraction the max value is 125 because the task runs at 2023-07-22T08:00:00.000Z and in the time window 2023-07-22T07:00:00.000Z - 2023-07-22T08:00:00.000Z (1 hour) 125 is the maximum value, as 126 is recorded 1 second later (at 2023-07-22T08:00:01.508Z).

Doubled duration range

After emptying the start_1h bucket and changing the task above with the double duration, i.e. like so:

// Defines the data source
data =
    from(bucket: "sample")
        |> range(start: -duration(v: int(v: task.every) * 2)) // THEN THIS
        |> filter(fn: (r) => r._field == "value")

and with this data in the sample bucket:

-------------------------------------------------------
_value	_time
-------------------------------------------------------
127	    2023-07-22T08:30:01.276Z
128	    2023-07-22T09:00:01.818Z   // 1st_1
129	    2023-07-22T09:30:01.682Z   // 1st_1
130	    2023-07-22T10:00:01.499Z   // 1st_2 / 2nd_1
131	    2023-07-22T10:30:01.215Z   // 1st_2 / 2nd_1
132	    2023-07-22T11:00:01.043Z   // 2nd_2
133	    2023-07-22T11:30:01.691Z   // 2nd_2

these are the results of the first task iteraction, at 11:34:00.000Z

-------------------------------------------------------
_value	_time				        agg_type
-------------------------------------------------------
128	    2023-07-22T10:00:00.000Z	min	    // NEW
130	    2023-07-22T11:00:00.000Z	min	    // NEW
128.5	2023-07-22T10:00:00.000Z	mean	// NEW
130.5	2023-07-22T11:00:00.000Z	mean	// NEW
129	    2023-07-22T10:00:00.000Z	max	    // NEW
131	    2023-07-22T11:00:00.000Z	max	    // NEW

In the second task iteraction, at 12:02:00.000Z I got:

-------------------------------------------------------
_value	_time				        agg_type
-------------------------------------------------------
128	    2023-07-22T10:00:00.000Z	min
130	    2023-07-22T11:00:00.000Z	min
132	    2023-07-22T12:00:00.000Z	min	    // NEW
128.5	2023-07-22T10:00:00.000Z	mean
130.5	2023-07-22T11:00:00.000Z	mean
132.5	2023-07-22T12:00:00.000Z	mean	// NEW
129	    2023-07-22T10:00:00.000Z	max
131	    2023-07-22T11:00:00.000Z	max
133	    2023-07-22T12:00:00.000Z	max	    // NEW

You can spot that the differences apply only in the first run of the task, as doubling the range creates 2 lines for each aggregation, meaning I had 6 new lines. But then, after the second run, always 3 lines are added (or the other 3 are overwritten, as @fercasjr wrote).

Anyway this doesn’t change the results, other that starting from 2 hours before the task first run.

Final considerations and TL;DR

Just stick with the simple:

// Defines the data source
data =
    from(bucket: "sample")
        |> range(start: -task.every) // <-- THAT'S OK!
        |> filter(fn: (r) => r._field == "value")

that is perfect for the goal! Hope this helps.