How to merge monitor.stateChanges and aggregateWindow

I can query stateChanges or aggregateWindow separately,
but I don't know how to query stateChanges and count them with every: 1h, period: 1d.
Expected output: the number of sensor state changes per hour, for each day.
Thanks.

@Erikson You could define a custom aggregate (reducer) function to do this. It would essentially compare the state of the current row against the state of the previous row and increment the count if they're different. It would look something like this:

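// Counts how many times the state value changes within each input table.
countStateChanges = (tables=<-) => tables
    |> reduce(
        // _prevState starts as "", so the first row of each table counts as a change.
        identity: {_prevState: "", count: 0},
        fn: (r, accumulator) => ({
            _prevState: r.state,
            count: if r.state == accumulator._prevState then accumulator.count else accumulator.count + 1,
        })
    )
    // Drop the helper column (note the leading underscore).
    |> drop(columns: ["_prevState"])
    |> rename(columns: {count: "_value"})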

data
    |> aggregateWindow(every: 1h, period: 1d, fn: (tables=<-, column) => tables |> countStateChanges())

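Note: In the reduce function, replace r.state with the name of the column you're storing your state in, for example r.your_state_column. Because the identity initializes _prevState as a string (""), that column should hold string values.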
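To make the accumulator logic concrete, here is a rough Python model of what the reduce() call does to a single table. It is not Flux, and `count_state_changes` and `initial_count` are hypothetical names used only for illustration. It assumes states are strings and rows arrive in time order.

```python
from typing import Iterable


def count_state_changes(states: Iterable[str], initial_count: int = 0) -> int:
    """Python model of the countStateChanges reduce() for one table.

    prev_state mirrors the accumulator's _prevState and starts as "",
    like the identity record; count mirrors accumulator.count.
    """
    prev_state = ""
    count = initial_count
    for state in states:  # rows are visited in table order, like reduce()
        if state != prev_state:
            count += 1
        prev_state = state
    return count


# With count: 0 in the identity, the first row always counts, because ""
# never equals a real state value: 1 (first row) + 1->0 + 0->1 = 3.
print(count_state_changes(["1", "1", "0", "0", "1"]))  # 3
# Starting at -1 instead cancels out that first "change".
print(count_state_changes(["1", "1", "0", "0", "1"], initial_count=-1))  # 2
```

The choice of starting value only affects how the very first row of each table is treated; every later comparison is between two real rows.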

Thanks @scott.
I tried the script below and it works for me.
Are there any performance issues with using a custom function versus monitor.stateChanges?

import "influxdata/influxdb/monitor"
import "influxdata/influxdb/schema"

from(bucket: "Mybucket")
    |> range(start: 2021-12-22T00:00:00Z, stop: 2021-12-22T23:59:59Z)
    |> filter(fn: (r) => r["_measurement"] == "Sensor")
    |> filter(fn: (r) => r["SensorId"] == "sensor0")
    |> rename(columns: {"MyLevel": "_level"})
    |> monitor.stateChanges(fromLevel: "0", toLevel: "1")
    |> aggregateWindow(every: 1h, fn: count, column: "MyState", createEmpty: true)
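For comparison, the query above detects transitions over the whole time-ordered series before windowing, so the "previous" row for a transition can come from an earlier hour. A rough Python model of that ordering follows; `hourly_transition_counts` is a hypothetical name, it only approximates monitor.stateChanges (it is not its exact implementation), and empty windows (createEmpty) are not modeled.

```python
from collections import Counter
from datetime import datetime, timedelta


def hourly_transition_counts(rows, from_level="0", to_level="1"):
    """Rough model of stateChanges(fromLevel, toLevel) followed by an hourly count.

    rows: (timestamp, level) pairs sorted by time. Transitions are detected
    over the whole series first; only afterwards are they bucketed by hour.
    Hours without transitions are simply absent from the result.
    """
    counts = Counter()
    prev_level = None  # the first row has no predecessor, so it is never a transition
    for ts, level in rows:
        if prev_level == from_level and level == to_level:
            # Label the hour window by its stop time, as aggregateWindow does by default.
            window_stop = ts.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
            counts[window_stop] += 1
        prev_level = level
    return dict(counts)


t0 = datetime(2021, 12, 22, 0, 50)
rows = [
    (t0, "0"),                          # 00:50
    (t0 + timedelta(minutes=20), "1"),  # 01:10, 0 -> 1 across the hour boundary
    (t0 + timedelta(minutes=40), "0"),  # 01:30
    (t0 + timedelta(minutes=50), "1"),  # 01:40, 0 -> 1
]
print(hourly_transition_counts(rows))  # {datetime.datetime(2021, 12, 22, 2, 0): 2}
```

Both transitions land in the 01:00-02:00 window, including the one whose previous row was recorded at 00:50.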

You'd have to test it, but I think that in this case the custom function would be faster. In your current query, once you use rename(), all the data returned by filter() has to be pulled into memory, where rename() |> monitor.stateChanges() |> aggregateWindow() then run.

With the custom function, all the operations can be pushed down to the storage tier (where they run MUCH faster) until the custom aggregate actually has to be applied to the data. In this case, that happens after the data is windowed. Try running this and see if it's more performant:

// Same reducer, applied directly to MyLevel (no rename needed).
countStateChanges = (tables=<-) => tables
    |> reduce(
        // count starts at -1 so the first row (always different from "") is not counted.
        identity: {_prevState: "", count: -1},
        fn: (r, accumulator) => ({
            _prevState: r.MyLevel,
            count: if r.MyLevel == accumulator._prevState then accumulator.count else accumulator.count + 1,
        })
    )
    |> drop(columns: ["_prevState"])
    |> rename(columns: {count: "_value"})

from(bucket: "Mybucket")
    |> range(start: 2021-12-22T00:00:00Z, stop: 2021-12-22T23:59:59Z)
    |> filter(fn: (r) => r["_measurement"] == "Sensor")
    |> filter(fn: (r) => r["SensorId"] == "sensor0")
    |> aggregateWindow(every: 1h, period: 1d, fn: (tables=<-, column) => tables |> countStateChanges())
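Because aggregateWindow hands each window to countStateChanges as a separate table, the accumulator starts over in every window. A rough Python model of the query above, assuming 1h windows aligned to whole UTC hours and labeled by their stop time (aggregateWindow's default `_time`); `window_stop` and `windowed_state_changes` are hypothetical names, and empty windows are not modeled:

```python
from datetime import datetime, timedelta
from itertools import groupby

EPOCH = datetime(1970, 1, 1)


def window_stop(ts, every=timedelta(hours=1)):
    """Stop time of the epoch-aligned window that contains ts."""
    start = EPOCH + ((ts - EPOCH) // every) * every
    return start + every


def windowed_state_changes(rows, every=timedelta(hours=1), initial_count=-1):
    """Model of aggregateWindow(every: 1h, fn: countStateChanges) with count: -1.

    rows: (timestamp, state) pairs sorted by time. Each window is reduced on
    its own, so _prevState restarts at "" at every window boundary.
    """
    result = {}
    for stop, group in groupby(rows, key=lambda r: window_stop(r[0], every)):
        prev_state, count = "", initial_count
        for _, state in group:
            if state != prev_state:
                count += 1
            prev_state = state
        result[stop] = count
    return result


rows = [
    (datetime(2021, 12, 22, 0, 50), "1"),
    (datetime(2021, 12, 22, 1, 10), "0"),  # real change 1 -> 0, but first row of its window
    (datetime(2021, 12, 22, 1, 20), "1"),  # real change 0 -> 1
]
print(windowed_state_changes(rows))  # only one change counted, although the series changes twice
```

The 1 -> 0 change at 01:10 is lost because, inside its window, that row is compared against the reset "" rather than against the 00:50 value.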

Hi @scott, thank you for the valuable advice. I'll test it.

Hi @scott. It is faster. I didn't add the period parameter, because I got the error message "unexpected argument period". Maybe I misunderstand its usage. We expect that if we query a 7-day range, we get 7 tables (windows), each with 24 hours of data for that day.
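As a rough illustration of that expected shape (outside Flux; `split_by_day` is a hypothetical helper, not a Flux function), hourly results keyed by window stop time can be grouped into one table per UTC day. The sample values are placeholder zeros, not real data.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def split_by_day(hourly):
    """Group {window_stop: value} hourly results into one list per UTC day.

    A window that stops at 00:00 covers 23:00-24:00 of the previous day,
    so each stop time is shifted back by one hour before taking its date.
    """
    days = defaultdict(list)
    for stop, value in sorted(hourly.items()):
        days[(stop - timedelta(hours=1)).date()].append((stop, value))
    return dict(days)


# Seven days of hourly counts starting 2021-12-22T00:00 (placeholder zeros).
start = datetime(2021, 12, 22)
hourly = {start + timedelta(hours=h + 1): 0 for h in range(7 * 24)}
tables = split_by_day(hourly)
print(len(tables), {len(rows) for rows in tables.values()})  # 7 {24}
```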

Hi @scott,
The identity _prevState will reset to "" every hour, right? That will produce incorrect counts.
Is there any way to refer to the previous hour's value?
Thanks.

There isn't a way to refer to the previous hour's value from outside each window. What error are you getting? What version of InfluxDB/Flux are you using?

Hi @scott ,

InfluxDB 2.0.7
Testing with Data Explorer and C#

Original data:
_time,sensor,state
2022-01-03T19:01:06.927720627Z,Sensor1,1
2022-01-03T21:01:13.075139235Z,Sensor1,1
2022-01-03T21:10:12.425344976Z,Sensor1,1
2022-01-03T22:36:34.043182427Z,Sensor1,1
2022-01-03T22:48:43.080439153Z,Sensor1,1
2022-01-03T22:55:09.177597618Z,Sensor1,1

Actual output:
sensor,stateChangeCount,_time
Sensor1,1,2022-01-03T20:00:00Z
Sensor1,0,2022-01-03T21:00:00Z
Sensor1,1,2022-01-03T22:00:00Z
Sensor1,1,2022-01-03T23:00:00Z

Expected output:
sensor,stateChangeCount,_time
Sensor1,1,2022-01-03T20:00:00Z
Sensor1,0,2022-01-03T21:00:00Z
Sensor1,0,2022-01-03T22:00:00Z
Sensor1,0,2022-01-03T23:00:00Z
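The posted numbers can be reproduced with a small Python model, assuming the state is stored as a string, windows are 1h aligned to whole UTC hours and labeled by their stop time, and empty windows yield 0 (createEmpty: true). `hourly_counts` and `carry_state` are hypothetical names. Resetting the previous state in every window, with count starting at 0 as in the first countStateChanges version, gives the actual output; carrying the last state across windows gives the expected output. This suggests the mismatch comes from the per-window reset of _prevState.

```python
from datetime import datetime, timedelta

# The six rows posted above (state as a string, sub-second precision dropped).
ROWS = [
    (datetime(2022, 1, 3, 19, 1, 6), "1"),
    (datetime(2022, 1, 3, 21, 1, 13), "1"),
    (datetime(2022, 1, 3, 21, 10, 12), "1"),
    (datetime(2022, 1, 3, 22, 36, 34), "1"),
    (datetime(2022, 1, 3, 22, 48, 43), "1"),
    (datetime(2022, 1, 3, 22, 55, 9), "1"),
]


def hourly_counts(rows, start, stop, carry_state):
    """Count state changes per 1h window, keyed by the window's stop time (HH:MM).

    carry_state=False resets prev to "" in every window (what reduce() does);
    carry_state=True keeps the last state seen in earlier windows.
    Empty windows yield 0, like createEmpty: true.
    """
    result = {}
    prev = ""
    window_start = start
    while window_start < stop:
        window_stop = window_start + timedelta(hours=1)
        if not carry_state:
            prev = ""
        count = 0
        for ts, state in rows:
            if window_start <= ts < window_stop:
                if state != prev:
                    count += 1
                prev = state
        result[window_stop.strftime("%H:%M")] = count
        window_start = window_stop
    return result


start, stop = datetime(2022, 1, 3, 19), datetime(2022, 1, 3, 23)
print(hourly_counts(ROWS, start, stop, carry_state=False))
# {'20:00': 1, '21:00': 0, '22:00': 1, '23:00': 1}  (the actual output)
print(hourly_counts(ROWS, start, stop, carry_state=True))
# {'20:00': 1, '21:00': 0, '22:00': 0, '23:00': 0}  (the expected output)
```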