Creating moving averages of the same metric with 4 lengths

Hello, I have a metric and I've been asked to do some calculations before producing the graphs, so I thought: there's this cool Flux language, let's use it. This is what I came up with:

import "math"
import "experimental"

// Raw points for the measurement over the dashboard time range
data = from(bucket: "latest")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "moving_averages")

// Pivot the fields into columns, then average the fast and rapid fields into _value
meanData = data
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with _value: (r.fast + r.rapid) / 2 }))
  |> drop(columns: ["fast", "rapid", "slow", "standard"])

// Moving averages over 15, 30, 60, and 120 points
moving1 = meanData
  |> movingAverage(n: 15)

moving5 = meanData
  |> movingAverage(n: 30)

moving10 = meanData
  |> movingAverage(n: 60)

moving20 = meanData
  |> movingAverage(n: 120)

// Join the four moving averages back into a single table, one column per average
firstJoin = experimental.join(left: moving5, right: moving10, fn: (left, right) => ({
    left with
    moving5: left._value,
    moving10: right._value,
  }))

secondJoin = experimental.join(left: firstJoin, right: moving20, fn: (left, right) => ({
  left with
  moving5: left.moving5,
  moving10: left.moving10,
  moving20: right._value
}))

thirdJoin = experimental.join(left: secondJoin, right: moving1, fn: (left, right) => ({
  left with
  moving5: left.moving5,
  moving10: left.moving10,
  moving20: left.moving20,
  moving1: right._value
}))
  // Take the row-wise minimum of the four moving averages
  |> map(fn: (r) => {
    min1 = math.mMin(x: r.moving5, y: r.moving10)
    min2 = math.mMin(x: min1, y: r.moving20)
    min3 = math.mMin(x: min2, y: r.moving1)
    return { r with _value: min3 }
  })
  |> yield(name: "min")

// The combined series itself, downsampled to the dashboard window
meanData
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

The calculation is to average two field values, run some moving averages over the result, and then take the minimum of them.

The query runs quite slowly on my small VM; I think there must be some basic mistake that multiplies the rows or something.
Help! Thanks.

Hello @colthreepv,
Your script looks good to me. I’m not sure how to optimize it further.
But are you aware of the Flux profiler?
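
If not: enabling it is just an import and an option at the top of the script, and the profiler results come back as extra tables alongside your query output. Something like:

import "profiler"

// Emits per-query and per-operator timing tables with the results
option profiler.enabledProfilers = ["query", "operator"]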

How long is the query taking?
What execution time are you expecting?
How many rows of data are you returning?

I ran a simplified version of your query with two joins and everything performed well.
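
A simplified two-join version, for reference, looks something like this (reusing your bucket and measurement names; the field and window lengths here are arbitrary):

import "experimental"

data = from(bucket: "latest")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "moving_averages")
  |> filter(fn: (r) => r["_field"] == "fast") // arbitrary single field for the test

ma15 = data |> movingAverage(n: 15)
ma30 = data |> movingAverage(n: 30)
ma60 = data |> movingAverage(n: 60)

j1 = experimental.join(left: ma15, right: ma30, fn: (left, right) => ({
    left with
    ma15: left._value,
    ma30: right._value
  }))

experimental.join(left: j1, right: ma60, fn: (left, right) => ({
    left with
    ma60: right._value
  }))
  |> yield(name: "joined")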

Thanks a lot for your time.
I thought I had missed something obvious; it seems I got the Flux part right, but I took the wrong approach.

I just realized this query repeats the whole computation for every point in the scanned range; that's the problem. (It's fine with a 5m window; it takes around 6s on 1h, then goes to hell.)

What I really wanted to achieve is, for each point produced on that measurement, to add 4 moving averages.
This is much easier to compute in code and ingest into InfluxDB, so the computation touches at most 120 points per point sent (every 10s or so), which is much less computationally expensive than this query.

Hello @colthreepv,
That's what I was going to suggest next: perform this logic in a task and execute it on a schedule. Do you need any help creating a task?
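
For example, a task is essentially the same script with a task option at the top and a to() call at the end to write the results back. A minimal sketch, where the task name, schedule, lookback window, destination bucket, and output field name are all placeholders to adjust:

import "math"         // needed once the min logic is added back
import "experimental" // needed once the joins are added back

option task = {name: "moving-average-min", every: 1m}

from(bucket: "latest")
  // Look back far enough to cover the longest window: 120 points at ~10s, plus margin
  |> range(start: -30m)
  |> filter(fn: (r) => r["_measurement"] == "moving_averages")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with _value: (r.fast + r.rapid) / 2 }))
  |> drop(columns: ["fast", "rapid", "slow", "standard"])
  // ... the same movingAverage / join / min pipeline as in the query above ...
  |> set(key: "_field", value: "ma_min") // to() expects a _field column after the pivot
  |> to(bucket: "latest_downsampled")    // placeholder destination bucket

The dashboard query then becomes a plain from/range/filter on the precomputed field, which stays cheap no matter the time range.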