I am coming from InlfuxDB 1.8 and try learn Flux to develop tasks that replace my continuous queries. I am still writing the data through the v1 compatibility interface.
One of my challenges is with a measurement that required several nested queries with InfluxQL to aggregate the data. I am now trying to do that in Flux and I believe I can do that by splitting the columns in two different data pipelines.
data = from(bucket: "zvm")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sytcup")
|> group(columns: ["cpu", "_value"], mode: "except")
|> drop(columns: ["cpu"])
data1 = data
|> filter(fn: (r) => r["_field"] =~ /.*_meter$/)
|> sum()
data2 = data
|> filter(fn: (r) => r["_field"] !~ /.*_meter$/)
|> max()
How do I join those two sets of tables again for further processing? Even when I specify all the tags in the “on” parameter, I get those all doubled for each of the input tables, which is a royal mess.
The alternative is to duplicate the further processing and write the aggregates separately.
-Rob
Hello @rvdheij,
You have a couple options here.
-
when you perform a join, join on all like columns to reduce the duplicates (so for instance join on “_time”, “_measurement”, “cpu”, and any other tags that have the same value across data1 and data2.
-
use the reduce function like so (I included an example with more than just the sum and max cuz more is better right?):
countMinMaxMean = (tables=<-) =>
tables
|> reduce(
identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
fn: (r, accumulator) => ({
count: accumulator.count + 1.0,
sum: r._value + accumulator.sum,
min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
mean: (r._value + accumulator.sum) / (accumulator.count + 1.0)
})
)
data |> countMinMaxMean()
I also included the function in a custom function, but this is optional.
Let me know if that makes sense.
Here is the documentation for the reduce() function:
Thank you, Anais.
I will have to try (1) again. I thought I tried with all the tags in the “on” argument, but ended up with all those sparse columns.
Your (2) solution sounds interesting, but I need to think. I find the learning curve rather steep with Flux. But thinking about reduce() makes me realize that my max(sum()) could just as well have been sum(max()) when the keys are complete. The max() is because I may have the same data reported by different end points, and I don’t want to double that in the aggregation. -Rob
@rvdheij,
Yes I completely agree, the learning curve with Flux is steep especially if you come from a SQL background. But now that I’m used to it i hate the idea of going back to nested queries, so I suppose it comes down to practice and familiarity. The plus is that there’s wayyy more you can do with Flux than InfluxQL. Please let me know if you have any more specific questions!
PS if you come from a SQL background then this might be helpful:
Also, what are you using InfluxDB for? I love learning about community use cases. Thanks!
@Anaisdg Thank you, I’ll study your blog in more detail. I also wish I had found the pointer to https://awesome.influxdata.com/ sooner…
The InfluxDB Query Builder in Grafana is easy enough to get you started, and encouraging enough to dig deeper for things that don’t fit the pattern. The support for Flux doesn’t get close to that level, at least I’ve not been able to get a non-trivial chart out of it. That, and the initial incompatibility between v1 and v2 made me ignore Flux for a long time.
I might be able to share details about our use case in the near future. I’m sure you can find my address when you can’t wait - Rob