Creating a moving average of the same metric with 3 lengths

Hello, I have a metric and I have been asked to do some calculations on it before producing the graphs, so I thought: there is this cool Flux language, let’s use it. This is what I came up with:

import "math"
import "experimental"

data = from(bucket: "latest")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "moving_averages")

meanData = data
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with _value: (r.fast + r.rapid) / 2 }))
  |> drop(columns: ["fast", "rapid", "slow", "standard"])

moving1 = meanData
  |> movingAverage(n: 15)

moving5 = meanData
  |> movingAverage(n: 30)

moving10 = meanData
  |> movingAverage(n: 60)

moving20 = meanData
  |> movingAverage(n: 120)

firstJoin = experimental.join(left: moving5, right: moving10, fn: (left, right) => ({
    left with
    moving5: left._value,
    moving10: right._value,
  }))

secondJoin = experimental.join(left: firstJoin, right: moving20, fn: (left, right) => ({
  left with
  moving5: left.moving5,
  moving10: left.moving10,
  moving20: right._value
}))

thirdJoin = experimental.join(left: secondJoin, right: moving1, fn: (left, right) => ({
  left with
  moving5: left.moving5,
  moving10: left.moving10,
  moving20: left.moving20,
  moving1: right._value
}))
  |> map(fn: (r) => {
    min1 = math.mMin(x: r.moving5, y: r.moving10)
    min2 = math.mMin(x: min1, y: r.moving20)
    min3 = math.mMin(x: min2, y: r.moving1)
    return { r with _value: min3 }
  })
  |> yield(name: "min")

meanData
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

The calculation is to average 2 fields (fast and rapid), then compute moving averages of that value over several lengths, then take the minimum of them.

The query runs quite slowly on my small VM. I think there must be some basic mistake that multiplies the rows or something.
Help! Thanks

Hello @colthreepv,
Your script looks good to me. I’m not sure how to optimize it further.
But are you aware of the Flux profiler?
https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/

How long is the query taking?
What execution time are you expecting?
How many rows of data are you returning?

I performed a simplified version of your query with two joins and everything ran well.

Thanks a lot for the time.
I thought I had missed something obvious. It seems I got the Flux part right, but I took the wrong approach.

I just realized this query redoes the whole computation for every point in the scanned time range, and that’s the problem. (It’s fine with a 5m window, it takes around 6s on 1h, then goes to hell.)

What I really wanted to achieve is, for each point produced on that measurement, to add 4 moving averages.
This is much easier to compute in code before ingesting into InfluxDB, so the computation is done on a max of 120 points “per each point sent” (every 10s or so), which is much less computationally expensive than this query.
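
For anyone landing here later, a minimal sketch of that ingest-side idea in Python. The class name, the output field names and the "min" key are only illustrative, and the actual write to InfluxDB is left out. The window lengths (15, 30, 60, 120) and the fast/rapid average are taken from the query above. Like the joined query, it only reports the minimum once every window is full.

```python
from collections import deque

# Window lengths copied from the Flux query; the names are illustrative.
WINDOWS = {"moving1": 15, "moving5": 30, "moving10": 60, "moving20": 120}


class RollingAverages:
    """Keeps only the most recent values and derives the averages per point."""

    def __init__(self, windows=WINDOWS):
        self.windows = dict(windows)
        # The buffer never grows past the longest window (120 by default).
        self.buffer = deque(maxlen=max(self.windows.values()))

    def add(self, fast, rapid):
        """Add one sample and return the fields to attach to that point."""
        self.buffer.append((fast + rapid) / 2)
        values = list(self.buffer)
        fields = {}
        for name, n in self.windows.items():
            # A window only yields a value once it has n samples.
            if len(values) >= n:
                fields[name] = sum(values[-n:]) / n
        # Report the minimum only when every moving average is available.
        if len(fields) == len(self.windows):
            fields["min"] = min(fields.values())
        return fields
```

Each call to add() touches at most 120 buffered values, and the returned dict can be sent as extra fields on the point being written.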

Hello @colthreepv,
That’s what I was going to suggest next: perform this logic in a task and execute it on a schedule. Do you need any help creating a task?