I tried really hard to achieve this output with Flux without succeeding.
My initial intuition is to try to group consecutive records in a given state, but I can't find a way to do it. Maybe my approach is wrong.
Is it possible to achieve this output with Flux?
The calculated duration records would reside in another measurement, which could be joined on time with the records of the original measurement. But that would result in:
| _time | _value | _tag | duration |
|-------|--------|------|----------|
| 1     | 1      | B    | 0        |
| 4     | 0      | A    | 1        |
| 6     | 1      | B    | 3        |
| 7     | 0      | A    | 2        |
As you can see, the duration is added to the next record.
Using the stateDuration function would be another option (sorry, I'm not allowed to post more links).
But that function requires at least two consecutive records per state to calculate the duration; otherwise the duration is -1.
Maybe someone has an idea how to take the duration calculated with the elapsed function and attach it to the first of the two compared records?
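To make clearer what I mean by attaching the duration to the first record: an elapsed-style duration sits on the later record of each compared pair, so shifting the durations back one row would attach each gap to the record that started it. A rough sketch in plain Python (the timestamps and durations are hypothetical, just for illustration):

```python
def shift_durations(rows):
    """rows: list of (time, tag, elapsed_from_previous) tuples.

    Move each elapsed duration back one row, so the gap between two
    records is stored on the earlier record instead of the later one.
    """
    shifted = []
    for i, (t, tag, _) in enumerate(rows):
        # Take the duration that elapsed *after* this record, if any.
        d = rows[i + 1][2] if i + 1 < len(rows) else None
        shifted.append((t, tag, d))
    return shifted

rows = [(1, "B", None), (4, "A", 3), (6, "B", 2), (7, "A", 1)]
print(shift_durations(rows))
# → [(1, 'B', 3), (4, 'A', 2), (6, 'B', 1), (7, 'A', None)]
```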
Thank you @sjahreis,
The main difficulty is grouping consecutive records, and yours seems to be a very nice solution for that. I'll give it a try.
I'm still curious whether this is possible using only Flux.
I believe I found an important logic flaw in my question.
Example input data with two states (0/1 or A/B):
| _time | _value | _tag |
|-------|--------|------|
| 1     | 1      | B    |
| 2     | 0      | A    |
| 3     | 0      | A    |
| 4     | 0      | A    |
| 5     | 1      | B    |
| 6     | 1      | B    |
| 7     | 0      | A    |
I realized that the target output I mentioned in my initial post doesn't represent the information I have been looking for. In fact, that output doesn't represent any accurate and meaningful data.
Looking more carefully at the input data, it can be observed that the last time point of a state (_tag), before it changes to another state, is not the timestamp of the last row/record with that state (_tag).
For example, the first "A" state in this time range starts at timestamp 2 but doesn't end at the row with timestamp 4. This "A" state extends in time up to timestamp 5. Timestamp 5 is the point where state "A" ends and state "B" starts.
So within an inclusive time range of 1 to 7, I believe a target output like this would be more meaningful.
Target output:
| _time | _value | _tag | duration |
|-------|--------|------|----------|
| 2     | 1      | B    | 1        |
| 5     | 0      | A    | 3        |
| 7     | 1      | B    | 2        |
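To check that this target output is well defined, the logic can be expressed outside Flux in plain Python (purely for illustration): collapse consecutive records with the same _tag into runs, then emit each completed run with the start time of the *next* run as its end time and duration = end − start. The last, still-open run is not emitted.

```python
from itertools import groupby

def state_durations(records):
    """records: list of (_time, _value, _tag) tuples, sorted by _time."""
    # Collapse consecutive rows sharing a _tag into (start_time, value, tag) runs.
    runs = [next(g) for _, g in groupby(records, key=lambda r: r[2])]
    out = []
    # Each run ends where the next run starts.
    for (start, value, tag), (end, _, _) in zip(runs, runs[1:]):
        out.append((end, value, tag, end - start))
    return out

data = [(1, 1, "B"), (2, 0, "A"), (3, 0, "A"), (4, 0, "A"),
        (5, 1, "B"), (6, 1, "B"), (7, 0, "A")]
print(state_durations(data))
# → [(2, 1, 'B', 1), (5, 0, 'A', 3), (7, 1, 'B', 2)]
```

This reproduces the target table above from the example input.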
What do you think?
Is this approach a more accurate way of expressing state duration?
Any ideas?
Combining duplicate/difference with a map and a cumulative sum can generate a state column that increments any time a change occurs in either of multiple fields A and B, grouping if desired by another tag column that could signify something like a plant identifier. If the contributed package tomhollingworth/events is available in your installation, you can use it to get the duration of each state.
```
import "influxdata/influxdb/v1"
import "contrib/tomhollingworth/events"

from(bucket: "db/autogen")
    |> range(start: -24h)
    // Select a measurement of interest
    |> filter(fn: (r) => r._measurement == "m1")
    // Select our fields of interest
    |> filter(fn: (r) => r._field == "A" or r._field == "B")
    // Group by a tag column for plant ID
    |> group(columns: ["plantId"])
    // Pivot so we can operate on fields
    |> v1.fieldsAsCols()
    // Make placeholder columns to calculate changes in A or B
    |> duplicate(column: "A", as: "A_diff")
    |> duplicate(column: "B", as: "B_diff")
    // Calculate changes
    |> difference(columns: ["A_diff", "B_diff"], keepFirst: true)
    // Fill the first value with 1.0 to signify a new state at the beginning of the series
    |> fill(column: "A_diff", value: 1.0)
    |> fill(column: "B_diff", value: 1.0)
    // Flag rows where the value of either A or B changed
    |> map(fn: (r) => ({r with stateChange: if r.A_diff != 0.0 or r.B_diff != 0.0 then 1 else 0}))
    // Generate a run-length identifier for each new state
    |> cumulativeSum(columns: ["stateChange"])
    // Group on that run-length identifier
    |> group(columns: ["plantId", "stateChange"])
    // Get the first value of each state grouping
    |> limit(n: 1)
    // Regroup on only the tag column to calculate the duration of each state
    |> group(columns: ["plantId"])
    // Calculate a duration in each state
    |> events.duration()
    // Drop grouping (if desired)
    |> group()
```
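For intuition, the change-flag plus cumulative-sum trick in the query above can be sketched in plain Python (field names A and B follow the example; this is an illustration of the idea, not a Flux replacement):

```python
def run_ids(rows):
    """rows: list of dicts with keys 'A' and 'B'.

    Return a run-length state id per row that increments whenever
    A or B changes — the Python analogue of difference + map +
    cumulativeSum on a stateChange flag.
    """
    ids, state, prev = [], 0, None
    for r in rows:
        # First row, or a change in either field, starts a new state.
        if prev is None or r["A"] != prev["A"] or r["B"] != prev["B"]:
            state += 1
        ids.append(state)
        prev = r
    return ids

rows = [{"A": 0, "B": 1}, {"A": 0, "B": 1}, {"A": 1, "B": 1},
        {"A": 1, "B": 0}, {"A": 1, "B": 0}]
print(run_ids(rows))  # → [1, 1, 2, 3, 3]
```

Grouping on these ids and keeping the first row of each group corresponds to the `group`/`limit(n: 1)` steps above.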
Also, here is an option that does not require any contributed packages. However, due to the number of maps involved, I can't guarantee it will be performant enough for a production environment; this would really be better implemented as a compiled Go function…
```
import "influxdata/influxdb/v1"

from(bucket: "db/autogen")
    |> range(start: -24h)
    // Select a measurement of interest
    |> filter(fn: (r) => r._measurement == "m1")
    // Select our fields of interest
    |> filter(fn: (r) => r._field == "A" or r._field == "B")
    // Group by a tag column for plant ID
    |> group(columns: ["plantId"])
    // Pivot so we can operate on fields
    |> v1.fieldsAsCols()
    // Make placeholder columns to calculate changes in A or B
    |> duplicate(column: "A", as: "A_diff")
    |> duplicate(column: "B", as: "B_diff")
    // Calculate changes
    |> difference(columns: ["A_diff", "B_diff"], keepFirst: true)
    // Fill the first value with 1.0 to signify a new state at the beginning of the series
    |> fill(column: "A_diff", value: 1.0)
    |> fill(column: "B_diff", value: 1.0)
    // Flag rows where the value of either A or B changed
    |> map(fn: (r) => ({r with stateChange: if r.A_diff != 0.0 or r.B_diff != 0.0 then 1 else 0}))
    // Generate a run-length identifier for each new state
    |> cumulativeSum(columns: ["stateChange"])
    // Group on that run-length identifier
    |> group(columns: ["plantId", "stateChange"])
    // Get the first value of each state grouping
    |> limit(n: 1)
    // Regroup on only the tag column to calculate the duration of each state
    |> group(columns: ["plantId"])
    // Sort backwards so that we get the time in state, not the time since the change
    |> sort(columns: ["_time"], desc: true)
    // Get a float value for time and convert from nanoseconds to seconds
    |> map(fn: (r) => ({r with stateDuration: float(v: uint(v: r._time)) / 1000000000.0}))
    // Calculate the time until the next change
    |> difference(columns: ["stateDuration"], keepFirst: true)
    // Fill the most recent row with its start time relative to now
    |> map(fn: (r) => ({r with duration: if exists r.stateDuration then -r.stateDuration else float(v: uint(v: now()) - uint(v: r._time)) / 1000000000.0}))
    // Sort back into time order
    |> sort(columns: ["_time"], desc: false)
    // Drop grouping (if desired)
    |> group()
```
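The final sort/difference/map steps compute, for each run start, the time until the next run starts, with the still-open run measured up to now(). A minimal Python sketch of that arithmetic, using made-up run-start timestamps in seconds:

```python
def durations_until_next_change(times, now):
    """times: run start timestamps (seconds), ascending.

    Duration of each run is the gap to the next run's start; the
    last (open) run is measured up to `now` — mirroring the
    sort-descending / difference / negate steps in the query above.
    """
    out = []
    for i, t in enumerate(times):
        nxt = times[i + 1] if i + 1 < len(times) else now
        out.append(nxt - t)
    return out

print(durations_until_next_change([1, 2, 5, 7], now=10))  # → [1, 3, 2, 3]
```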