I am trying to run large queries with window and aggregating (window and limit) in Flux with InfluxDB v1.8. To my understanding, when put after range() in my query, these functions are not working as pushdown functions, and when I query my database flux first pulls all values in the specified time range. This is causing my query to be extremely slow. Is there anyway to not pull all values from the database before aggregating in InfluxDB1.8? I am already using range, filter, and group. Any help to optimize would be appreciated.
Hi @annariley,
To confirm there really isn’t a notion of pushdown functions within 1.X for Flux. This was introduced in 2.X of InfluxDB. As @Pooh said we can at least try to simply the query for you.
Hi @annariley,
so it looks like you want to use pivot?
It seems you have made a lot of your fields generic so it is hard to tell if pivot will work? The join is your particular issue here. The limit is also interesting in one case you limit to 5 rows per window and 1 in the second table. Could use an aggregate window instead for the first table?
thanks @Jay_Clifford I’ll add some clarification to the vague titles. Would aggregate window be more efficient than the decontructed version? I want to limit them to the same number per window but the actual number is trivial. I don’t think pivot would work because I still want the values to be vertical but I want the values in both tables to relate to a single time column. (The goal is to plot the values with table1_value as the x axis and table2_value as the y axis.)
Ah sadly pivoting is not possible across measurements. Pivoting would retain your values to be verticle they would just be indicated as two columns against the same timestamp. I personally would do the following:
table1 = from (bucket:"bucket")
|>range(start: v.timeRangeStart, stop: v.timeRangeStop)
|>filter(fn: (r) => r._measurement == "m1" and r.node == "${Node}" and r.mode == "mode1" and r.path == "path1")
|>filter(fn: (r) => r["_field"] == "field")
|> aggregateWindow(every: v.windowPeriod , fn: first)
table2 = from (bucket:"bucket")
|>range(start: v.timeRangeStart, stop: v.timeRangeStop)
|>filter(fn: (r) => r._measurement == "m2" and r.node == "${Node}" and (r.path == "path2" or r.path == "path3") and r["_field"] == "mean")
|> aggregateWindow(every: v.windowPeriod , fn: first)
join(
tables: {table1:table1, table2:table2},
on: ["_time"]
)|> rename(columns: { _time: "_time_joined"})