Flux language join question

I put my running data into a influxdb2 and it has fields like: heartRate, distance etc. and a tag runName which is unique for every run. What I want as a result is what is the furthest distance I was able to run in 12 minutes, where the stddev of HR was below 5 for each run. (That is a metric that can be calculated to VO2Max).

The first three queries work as expected, but then I get a bit lost. First of all: Do I really need to make that three queries, or is there a smart way to re-use the heartRate window to yield avg and stddev at the same time?

bucket1 = from(bucket: "Running")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "FitnessPoint")
  |> filter(fn: (r) => r["_field"] == "heartRate")
  |> window(every: 30s, period: 12m)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  |> yield(name: "hrMean")

bucket2 = from(bucket: "Running")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "FitnessPoint")
  |> filter(fn: (r) => r["_field"] == "heartRate")
  |> window(every: 30s, period: 12m)
  |> stddev()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  |> yield(name: "hrStdDev")

bucket3 = from(bucket: "Running")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "FitnessPoint")
  |> filter(fn: (r) => r["_field"] == "distance")
  |> window(every: 30s, period: 12m)
  |> spread()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  |> yield(name: "cooperDistance")

Then I try to join those three tables into one table and return the biggest distance where my stddev in HR was below 5, but somehow the data explorer seems to completly ignore everything below the first three results. Am I doing something wrong?

j1 = join(
  tables: {t1: bucket1, t2:bucket2},
  on: ["_time", "type", "runName"]
)

j2 = join(tables: {t1: j1, t2:bucket3}, on: ["_time", "type", "runName"])
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "FitnessPoint")
  |> filter(fn: (r) => r["hrStdDev"] <= 5)
  |> max(column: "cooperDistance")

Update: I got it almost working, but I encounter two problems. The second merge does not like it when I join on type and runName, although I can see them in the sub results, so this works:

j1 = join(
  tables: {hrMean: bucket1, hrStdDev:bucket2},
  on: ["_time", "_measurement", "_start", "_stop", "_field", "type", "runName"]
)
//  |> yield(name:"SubMerge")

j2 = join(tables: {t1: j1, t2:bucket3}, on: ["_time","_start", "_stop"])
  |> filter(fn: (r) => r["_value_hrStdDev"] <=5)
  |> duplicate(column: "_value", as: "_value_distance")
  |> max()
  |> yield(name:"Result")

The second problem is that when I try to drop some columns the InfluxDB process load increases to 100% and I don’t get any data back.

I really appreciate any help I can get here.

Hello @KarstenB,
Sorry for the delay.

Since the filter is at the top of the query, I think writing it the way you have makes sense. I would make the following changes to your flux query though:

bucket1 = from(bucket: "Running")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "FitnessPoint")
  |> filter(fn: (r) => r["_field"] == "heartRate")
  |> aggregateWindow(every: 30s, fn: mean)
// rather than duplicating and windowing with inf, use an empty group() to ungroup the tables
  |> group()

If you want get really fancy you could encapsulate all of that initial data preparation in a custom function to reuse it like you want:

  preProcessing = (tables=<-, filterfor="heartRate") => tables 
  |> filter(fn: (r) => r["_measurement"] == filterfor)
// please be generally aware of the optional createEmpty param for aggrageWindow
  |> aggregateWindow(every: 30s, fn: mean)
  |> group()

from(bucket: "Running")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> preProcessing(filterfor:"distance")

I don’t think that’s really necessary here, but it’s cool to know about. Please see these docs for how to write custom functions.

Can you please export some data to annotated csv? I think it might be easier for me to help you with the second part.

I’m thinking you might want to be using findRecord though to extract values and filter? Although I’m not entirely sure yet. findRecord() function | InfluxDB OSS 2.0 Documentation

Hello @KarstenB ,
Do you mind me asking what you’re doing with InfluxDB?
It looks pretty interesting!