I have data being stored with microsecond precision, but when I aggregate the data, the result seems to use the timestamp of the batch write rather than the timestamps of the individual rows. This happens with both Flux and InfluxQL.
If I grab individual rows, the timestamps look good: roughly one every µs.
from(bucket: "flormo")
|> range(start: 2023-10-12T14:12:31.644Z, stop: 2023-10-12T14:12:42.741Z)
|> filter(fn: (r) => r["_measurement"] == "input_raw")
|> filter(fn: (r) => r["_field"] == "value")
|> keep(columns: ["_time", "_value", "channel"])
|> limit(n: 5)
// Result: _result
// Table: keys: [channel]
// channel:string _time:time _value:float
// ---------------------- ------------------------------ ----------------------------
// 0 2023-10-12T14:12:31.708456000Z 0.00042799814
// 0 2023-10-12T14:12:31.708457000Z 0.00033371995
// 0 2023-10-12T14:12:31.708458000Z 0.00045473385
// 0 2023-10-12T14:12:31.708459000Z 0.00016864754
// 0 2023-10-12T14:12:31.708460000Z -0.000039179366
// Table: keys: [channel]
// channel:string _time:time _value:float
// ---------------------- ------------------------------ ----------------------------
// 1 2023-10-12T14:12:31.708456000Z 0.000617619
// 1 2023-10-12T14:12:31.708457000Z 0.00023457911
// 1 2023-10-12T14:12:31.708458000Z 0.00018064602
// 1 2023-10-12T14:12:31.708459000Z 0.00007176242
// 1 2023-10-12T14:12:31.708460000Z -0.00011675003
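As an extra check on the spacing, I believe elapsed() should show the gap between consecutive points directly — something like this untested sketch, which would add an elapsed column in µs:
from(bucket: "flormo")
|> range(start: 2023-10-12T14:12:31.644Z, stop: 2023-10-12T14:12:42.741Z)
|> filter(fn: (r) => r["_measurement"] == "input_raw")
|> filter(fn: (r) => r["_field"] == "value")
// elapsed() adds an integer "elapsed" column with the time since the previous
// row of the same series, here expressed in microseconds
|> elapsed(unit: 1us)
|> keep(columns: ["_time", "elapsed", "channel"])
|> limit(n: 5)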
The total count of rows also looks good:
from(bucket: "flormo")
|> range(start: 2023-10-12T14:12:31.644Z, stop: 2023-10-12T14:12:42.741Z)
|> filter(fn: (r) => r["_measurement"] == "input_raw")
|> filter(fn: (r) => r["_field"] == "value")
|> keep(columns: ["_time", "_value", "channel"])
|> count()
|> group()
// channel:string _value:int
// ---------------------- --------------------------
// 0 11223
// 1 11223
However, once I try to aggregate the data into 1ms windows, it goes wonky:
from(bucket: "flormo")
|> range(start: 2023-10-12T14:12:31.644Z, stop: 2023-10-12T14:12:42.741Z)
|> filter(fn: (r) => r["_measurement"] == "input_raw")
|> filter(fn: (r) => r["_field"] == "value")
|> keep(columns: ["_time", "_value", "channel"])
|> aggregateWindow(every: 1ms, fn: mean, createEmpty: false)
|> drop(columns: ["_start", "_stop"])
|> limit(n: 5)
// channel:string _time:time _value:float
// ---------------------- ------------------------------ ----------------------------
// 0 2023-10-12T14:12:31.709000000Z 0.000010776063363953497
// 0 2023-10-12T14:12:31.794000000Z -0.000011274311311627904
// 0 2023-10-12T14:12:31.880000000Z 0.000009607767158850571
// 0 2023-10-12T14:12:31.965000000Z -0.00000459027025813954
// 0 2023-10-12T14:12:32.050000000Z 0.0000038424563610465085
// Table: keys: [channel]
// channel:string _time:time _value:float
// ---------------------- ------------------------------ ----------------------------
// 1 2023-10-12T14:12:31.709000000Z 0.000005660186117441864
// 1 2023-10-12T14:12:31.794000000Z -0.000008333603168604645
// 1 2023-10-12T14:12:31.880000000Z 0.000002323025393103451
// 1 2023-10-12T14:12:31.965000000Z 0.0000030712996627907035
// 1 2023-10-12T14:12:32.050000000Z 0.0000003046433469767358
If I don’t use createEmpty: false, then additional rows are created at each ms, but they have a value of 0 (that variant is sketched below).
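For reference, that variant is just the query above with createEmpty left at its default of true:
from(bucket: "flormo")
|> range(start: 2023-10-12T14:12:31.644Z, stop: 2023-10-12T14:12:42.741Z)
|> filter(fn: (r) => r["_measurement"] == "input_raw")
|> filter(fn: (r) => r["_field"] == "value")
|> keep(columns: ["_time", "_value", "channel"])
// createEmpty defaults to true, so empty 1ms windows also produce output rows
|> aggregateWindow(every: 1ms, fn: mean)
|> drop(columns: ["_start", "_stop"])
|> limit(n: 5)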
Despite there being data points every µs, and despite asking for 1ms windows, the _time of the output rows advances every 86-87ms. I can actually see this clearly if I use count rather than mean:
from(bucket: "flormo")
|> range(start: 2023-10-12T14:12:31.644Z, stop: 2023-10-12T14:12:42.741Z)
|> filter(fn: (r) => r["_measurement"] == "input_raw")
|> filter(fn: (r) => r["_field"] == "value")
|> keep(columns: ["_time", "_value", "channel"])
|> aggregateWindow(every: 1ms, fn: count, createEmpty: false)
|> limit(n: 5)
|> drop(columns: ["_start", "_stop"])
// channel:string _time:time _value:int
// ---------------------- ------------------------------ --------------------------
// 0 2023-10-12T14:12:31.709000000Z 86
// 0 2023-10-12T14:12:31.794000000Z 86
// 0 2023-10-12T14:12:31.880000000Z 87
// 0 2023-10-12T14:12:31.965000000Z 86
// 0 2023-10-12T14:12:32.050000000Z 86
// Table: keys: [channel]
// channel:string _time:time _value:int
// ---------------------- ------------------------------ --------------------------
// 1 2023-10-12T14:12:31.709000000Z 86
// 1 2023-10-12T14:12:31.794000000Z 86
// 1 2023-10-12T14:12:31.880000000Z 87
// 1 2023-10-12T14:12:31.965000000Z 86
// 1 2023-10-12T14:12:32.050000000Z 86
The same grouping happens if I use InfluxQL as well:
select mean(value) from "input_raw" where time >= 1697119951644000000 and time <= 1697119962741000000 group by time(1ms), "channel"::tag fill(none) limit 5
Name: input_raw
Tags: channel=0
┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ index ┃ time ┃ mean ┃
┣━━━━━━━╋━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╋━━━━━━━━━━━━━━━┫
┃ 1┃ 1697119951708000000.0000000000┃ 0.0000107761┃
┃ 2┃ 1697119951792999936.0000000000┃ -0.0000112743┃
┃ 3┃ 1697119951879000064.0000000000┃ 0.0000096078┃
┃ 4┃ 1697119951964000000.0000000000┃ -0.0000045903┃
┃ 5┃ 1697119952048999936.0000000000┃ 0.0000038425┃
┣━━━━━━━┻━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━┫
┃ 3 Columns, 5 Rows, Page 1/1┃
┃ Table 1/2, Statement 1/1┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
Given the output from using count with aggregateWindow, it looks like the data is being aggregated to 1ms correctly, but then another aggregation is somehow happening on top of that.
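From the docs, my understanding is that aggregateWindow(every: 1ms, fn: count) is roughly shorthand for an explicit window/aggregate/unwindow pipeline, so I would expect this untested sketch to produce the same 1ms windows:
from(bucket: "flormo")
|> range(start: 2023-10-12T14:12:31.644Z, stop: 2023-10-12T14:12:42.741Z)
|> filter(fn: (r) => r["_measurement"] == "input_raw")
|> filter(fn: (r) => r["_field"] == "value")
|> window(every: 1ms)                      // split each series into explicit 1ms windows
|> count()                                 // count the rows in each window
|> duplicate(column: "_stop", as: "_time") // use the window stop as the output _time
|> window(every: inf)                      // merge back into one table per series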
Is there something I’m missing?