Is it possible to do stateful aggregations with influx 2.0 and flux?

Hi! :wave:

I’m measuring our energy consumption and supply, store it in influxdb (latest, 2.0.4) and visualize it using Grafana.

Now I want to simulate a battery, for which I need a “cumulativeSum with constraints (battery full/empty)”. I’m basically stuck with these two issues:

It seems to be impossible to do a stateful aggregation with flux out-of-the-box. Fine with me, as long as I can hack around that.

I tried the following (full example):

consumed = from(bucket: "default")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r.phase == "Totals" and r._name == "sma_hm_measurements_pconsume_actual")
|> aggregateWindow(every: 1m, fn: mean)
|> keep(columns:["_field", "_time", "_value"])

supplied = from(bucket: "default")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r.phase == "Totals" and r._name == "sma_hm_measurements_psupply_actual")
|> aggregateWindow(every: 1m, fn: mean)
|> keep(columns:["_field", "_time", "_value"])

joined = join(
    tables: {consumed:consumed, supplied:supplied},
    on: ["_time"]
)
|> map(fn: (r) => {
    return ({
        _time: r._time,
        _value: r._value,
        index: 1
    })
})
|> cumulativeSum(columns: ["index"])

joined
|> map(fn: (r) => {
    // questionable part start
    previousIndexRaw = r.index - 2
    previousIndex = if previousIndexRaw <= 0 then 0 else previousIndexRaw
    previousR = joined
        |> getRecord(idx: previousIndex)
    previousValue = previousR._value
    // questionable part end

    diff = r._value_supplied - r._value_consumed
    diffScaled = diff / 60.0

    newValue = previousValue + diffScaled
    newValueClamped = if newValue >= 20000.0 then 20000.0 else if newValue <= 0.0 then 0.0 else newValue

    return ({
        _time: r._time,
        _value: newValueClamped
    })
})
|> cumulativeSum()
|> aggregateWindow(every: v.windowPeriod, fn: mean)
|> set(key: "_field", value: "Battery")
|> yield()

This gives the following error:
invalid: runtime error @27:4-46:3: map: failed to evaluate map function: unexpected type for table: want {schema: [{grouped: bool | label: string | type: string | t0}]}, got [t5689]

Can someone help me understand why? :thinking:
Thx!