Calculate stateDuration based on conditional logic of another column

I’m trying to count the duration of a fault, but only if the fault is previously in a specific state.

First, I used timeShift() to find the previous state in a simple table like this.

How would I write a stateDuration() call that counts the duration of the yellow faults only if the previous state = “RunPQ-Auto” (in green)? Is there a way to do this? Thank you so much!! @Anaisdg @scott

@Marion_Akagi I’m not sure there’s a way to do this with Flux as it currently stands. The tricky part is grouping by “fault cycles” and then only computing the duration of the relevant fault cycles. There’s a proposed feature for Flux that, if added, would make this totally doable.

Unfortunately, I don’t know when or if this will ever be added.

Ahh gotcha, thank you for letting me know! I have been trying to achieve this for 2 full days now so I will take a break. Thank you so much.

This is as close as I can get, which will probably work once I put the query in Grafana and can filter out some rows. actual_new, prev_record_new, and downtime (mins) are the only columns I need (the other two are there because I had to use aggregateWindow() and couldn’t do that on strings). So, of these three rows, I would keep only the second one: I can see what the previous value was (RunPQ-Auto), the current fault value, and the downtime. Looking forward to a more straightforward way to do this in the future.

Here’s my super messy flux that I will try to optimize!

//import "experimental/query"
import "influxdata/influxdb/monitor"
//import "contrib/tomhollingworth/events"
import "date"
import "strings"
import "join"

left = from(bucket: "test-fault-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "unit")
  |> filter(fn: (r) => r["point_name"] == "InvStatus" or r["point_name"] == "RunMode")
  |> filter(fn: (r) => r.unit_controller_id == "0215D26C-10")
  |> map(fn: (r) => ({ r with concat: r.InvStatus+"-"+r.RunMode })) 
  |> map(fn: (r) => ({ r with string: string(v: r.point_name) }))
  |> group()
  |> pivot(rowKey:["_time"], columnKey: ["string"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with concat: r.InvStatus+"-"+r.RunMode }))  
  |> keep(columns: ["concat", "_time"])
  |> map(fn: (r) => ({ r with _value: if r.concat == "RunPQ-Auto" then 1.0 else if r.concat =~/Fault/ then 2.0 else 0.0 }))
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: true) 
  |> fill(value: 0.0)
  |> sort(columns: ["_time"], desc: false)
  |> rename(columns: {_value: "actual"})
  |> keep(columns: ["_time", "actual"])

right = from(bucket: "test-fault-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "unit")
  |> filter(fn: (r) => r["point_name"] == "InvStatus" or r["point_name"] == "RunMode")
  |> filter(fn: (r) => r.unit_controller_id == "0215D26C-10")
  |> map(fn: (r) => ({ r with concat2: r.InvStatus+"-"+r.RunMode })) 
  |> map(fn: (r) => ({ r with string: string(v: r.point_name) }))
  |> group()
  |> pivot(rowKey:["_time"], columnKey: ["string"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with concat2: r.InvStatus+"-"+r.RunMode }))  
  |> keep(columns: ["concat2", "_time"])
  |> map(fn: (r) => ({ r with _value: if r.concat2 == "RunPQ-Auto" then 1.0 else if r.concat2 =~/Fault/ then 2.0 else 0.0 }))
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: true)
  |> fill(value: 0.0)
  |> sort(columns: ["_time"], desc: false)
  |> keep(columns: ["_time", "_value"])
  |> timeShift(duration: 1m, columns: ["_time"]) 

join.time(method: "full", left: left, right: right, as: (l, r) => ({l with prev_record: r._value}))


 |> map(fn: (r) => ({ r with _level: 
  if r.prev_record == 1.0 and r.actual == 2.0 then "ok" 
  else if r.prev_record == 2.0 and r.actual == 2.0 then "ok"
  else if r.prev_record == 0.0 and r.actual == 2.0 then "ok"
  else "info",}),)


  |> map(fn: (r) => ({ r with actual_new: if r.actual == 1.0 then "RunPQ-Auto" else if r.actual == 2.0 then "Fault" else "NA" }))
  |> map(fn: (r) => ({ r with prev_record_new: if r.prev_record == 1.0 then "RunPQ-Auto" else if r.prev_record == 2.0 then "Fault" else "NA" }))

  |> sort(columns: ["_time"], desc: false)
  |> monitor.stateChangesOnly()
  //|> group()
  |> sort(columns: ["_time"], desc: false)
  |> elapsed(unit: 1m, columnName: "downtime (mins)")

Actually, looking at your data structure that you screenshotted in your original post, there might be a way to do it. I’d recommend using the events.duration() function.

  1. Filter by only records where the current state does not equal the previous state.
  2. Use events.duration() to calculate the duration between the state cycles.
  3. Filter by records where previous state is RunPQ-Auto and current state is some variant of FaultN.

import "contrib/tomhollingworth/events"

// data just represents the data from your screenshot
data
    |> filter(fn: (r) => r["current state"] != r["previous state"])
    |> events.duration(unit: 1m, stop: now())
    |> filter(fn: (r) => r["previous state"] == "RunPQ-Auto" and r["current state"] =~ /^Fault[0-9]+$/)
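
If you want to try the three steps without touching your bucket, here is a minimal sketch that swaps the `data` placeholder for a few rows built with array.from(). The timestamps, the state names (Fault1, Fault2, Standby), and the fixed stop time are all made up for illustration; only the column names come from your screenshot.

```flux
import "array"
import "contrib/tomhollingworth/events"

// Hypothetical stand-in for the screenshot: the current and previous state
// are already side by side on each row. All values below are made up.
data =
    array.from(
        rows: [
            {_time: 2023-01-01T00:00:00Z, "previous state": "RunPQ-Auto", "current state": "RunPQ-Auto"},
            {_time: 2023-01-01T00:01:00Z, "previous state": "RunPQ-Auto", "current state": "Fault1"},
            {_time: 2023-01-01T00:02:00Z, "previous state": "Fault1", "current state": "Fault1"},
            {_time: 2023-01-01T00:03:00Z, "previous state": "Fault1", "current state": "RunPQ-Auto"},
            {_time: 2023-01-01T00:04:00Z, "previous state": "RunPQ-Auto", "current state": "Standby"},
            {_time: 2023-01-01T00:05:00Z, "previous state": "Standby", "current state": "Fault2"},
            {_time: 2023-01-01T00:06:00Z, "previous state": "Fault2", "current state": "RunPQ-Auto"},
        ],
    )

data
    // 1. Keep only the rows where the state changes.
    |> filter(fn: (r) => r["current state"] != r["previous state"])
    // 2. Time from each change to the next one; a fixed stop keeps the sample repeatable.
    |> events.duration(unit: 1m, stop: 2023-01-01T00:10:00Z)
    // 3. Keep only faults that were entered from RunPQ-Auto.
    |> filter(fn: (r) => r["previous state"] == "RunPQ-Auto" and r["current state"] =~ /^Fault[0-9]+$/)
```

With these sample rows, only the 00:01 Fault1 row should come back, with a duration of 2 (it clears at 00:03). Fault2 is dropped because it was entered from Standby rather than RunPQ-Auto.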

Ooh! Let me try that, thank you! I didn’t know I could filter that way. Thank you @scott

@scott Interesting, so that filters down to the correct rows I need but it’s not calculating the duration correctly. Is it possible instead to grab the start and stop timestamps of the faults and then calculate the duration? The issue now is the events.duration is calculating the minutes until the next row starts. But we have the right rows in the results so that’s a start!

Can you provide your full query?

Sure! Here it is, and this is the output below also.

import "experimental/query"
import "influxdata/influxdb/monitor"
import "contrib/tomhollingworth/events"
import "date"
import "strings"
import "join"

//left side of join which outputs _time, and a field called "Actual", which is a concatenation of InvStatus and RunMode
left = from(bucket: "test-fault-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "unit")
  |> filter(fn: (r) => r["point_name"] == "InvStatus" or r["point_name"] == "RunMode")
  |> filter(fn: (r) => r.unit_controller_id == "0215D26C-10")
  |> truncateTimeColumn(unit: 1m)
  |> map(fn: (r) => ({ r with concat: r.InvStatus+"-"+r.RunMode })) 
  |> map(fn: (r) => ({ r with string: string(v: r.point_name) }))
  |> group()
  |> pivot(rowKey:["_time"], columnKey: ["string"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with concat: r.InvStatus+"-"+r.RunMode }))  
  |> keep(columns: ["concat", "_time"])
  |> sort(columns: ["_time"], desc: false)
  |> rename(columns: {concat: "_value"}) 
  |> rename(columns: {_value: "actual"}) 

//right side of join which is the same as left, except time shifted 1min to get 'prev_record' on the same row as 'actual'
  right = from(bucket: "test-fault-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "unit")
  |> filter(fn: (r) => r["point_name"] == "InvStatus" or r["point_name"] == "RunMode")
  |> filter(fn: (r) => r.unit_controller_id == "0215D26C-10")
  |> truncateTimeColumn(unit: 1m)
  |> map(fn: (r) => ({ r with concat2: r.InvStatus+"-"+r.RunMode })) 
  |> map(fn: (r) => ({ r with string: string(v: r.point_name) }))
  |> group()
  |> pivot(rowKey:["_time"], columnKey: ["string"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with concat2: r.InvStatus+"-"+r.RunMode }))  
  |> keep(columns: ["concat2", "_time"])
  |> sort(columns: ["_time"], desc: false)
  |> timeShift(duration: 1m, columns: ["_time"])  
  |> rename(columns: {concat2: "_value"}) 

//join left and right
join.time(method: "full", left: left, right: right, as: (l, r) => ({l with prev_record: r._value}))

//this is the query I added per your earlier comment.  Like I mentioned, the duration output is the whole time between rows, not just the fault time
    |> filter(fn: (r) => r["prev_record"] == "RunPQ-Auto" and r["actual"] =~/Fault/)
    |> events.duration(unit: 1m, stop: now())

This is the output of the query above:

This below is the duration I’m trying to get to from the first row in the above table. Right now, the duration lasts until the second record in the above table (yellow arrow).

I know this is super hard, but it’s super important for our upgrade to our new software system. Thank you for your help!

Try this:

import "experimental/query"
import "influxdata/influxdb/monitor"
import "contrib/tomhollingworth/events"
import "date"
import "strings"
import "join"

//left side of join which outputs _time, and a field called "Actual", which is a concatenation of InvStatus and RunMode
left = from(bucket: "test-fault-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "unit")
  |> filter(fn: (r) => r["point_name"] == "InvStatus" or r["point_name"] == "RunMode")
  |> filter(fn: (r) => r.unit_controller_id == "0215D26C-10")
  |> truncateTimeColumn(unit: 1m)
  |> map(fn: (r) => ({ r with concat: r.InvStatus+"-"+r.RunMode })) 
  |> map(fn: (r) => ({ r with string: string(v: r.point_name) }))
  |> group()
  |> pivot(rowKey:["_time"], columnKey: ["string"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with concat: r.InvStatus+"-"+r.RunMode }))  
  |> keep(columns: ["concat", "_time"])
  |> sort(columns: ["_time"], desc: false)
  |> rename(columns: {concat: "_value"}) 
  |> rename(columns: {_value: "actual"}) 

//right side of join which is the same as left, except time shifted 1min to get 'prev_record' on the same row as 'actual'
  right = from(bucket: "test-fault-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "unit")
  |> filter(fn: (r) => r["point_name"] == "InvStatus" or r["point_name"] == "RunMode")
  |> filter(fn: (r) => r.unit_controller_id == "0215D26C-10")
  |> truncateTimeColumn(unit: 1m)
  |> map(fn: (r) => ({ r with concat2: r.InvStatus+"-"+r.RunMode })) 
  |> map(fn: (r) => ({ r with string: string(v: r.point_name) }))
  |> group()
  |> pivot(rowKey:["_time"], columnKey: ["string"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with concat2: r.InvStatus+"-"+r.RunMode }))  
  |> keep(columns: ["concat2", "_time"])
  |> sort(columns: ["_time"], desc: false)
  |> timeShift(duration: 1m, columns: ["_time"])  
  |> rename(columns: {concat2: "_value"}) 

//join left and right
join.time(method: "full", left: left, right: right, as: (l, r) => ({l with prev_record: r._value}))

//This keeps only the rows where `prev_record` and `actual` differ (the state transitions), calculates the
//duration of each state, then keeps only the `/Fault/` states that were entered from `RunPQ-Auto`.
    |> filter(fn: (r) => r["prev_record"] != r["actual"])
    |> events.duration(unit: 1m, stop: now())
    |> filter(fn: (r) => r["prev_record"] == "RunPQ-Auto" and r["actual"] =~/Fault/)
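
The order of the filters matters because events.duration() measures the time from each row to the next row it receives. The sketch below runs both orderings on a handful of made-up rows so you can compare them side by side. The "Fault-Auto" value, the timestamps, and the `stopTime` variable are assumptions for illustration, not values from your bucket.

```flux
import "array"
import "contrib/tomhollingworth/events"

// Made-up rows: one fault from 00:01 to 00:04, a second from 00:09 to 00:10.
data =
    array.from(
        rows: [
            {_time: 2023-01-01T00:00:00Z, prev_record: "RunPQ-Auto", actual: "RunPQ-Auto"},
            {_time: 2023-01-01T00:01:00Z, prev_record: "RunPQ-Auto", actual: "Fault-Auto"},
            {_time: 2023-01-01T00:02:00Z, prev_record: "Fault-Auto", actual: "Fault-Auto"},
            {_time: 2023-01-01T00:03:00Z, prev_record: "Fault-Auto", actual: "Fault-Auto"},
            {_time: 2023-01-01T00:04:00Z, prev_record: "Fault-Auto", actual: "RunPQ-Auto"},
            {_time: 2023-01-01T00:08:00Z, prev_record: "RunPQ-Auto", actual: "RunPQ-Auto"},
            {_time: 2023-01-01T00:09:00Z, prev_record: "RunPQ-Auto", actual: "Fault-Auto"},
            {_time: 2023-01-01T00:10:00Z, prev_record: "Fault-Auto", actual: "RunPQ-Auto"},
        ],
    )

// Fixed stop time (hypothetical) so the last duration is repeatable.
stopTime = 2023-01-01T00:12:00Z

// Filtering first: only the fault onsets reach events.duration(), so each
// duration runs until the NEXT fault onset (or stopTime), not until recovery.
data
    |> filter(fn: (r) => r.prev_record == "RunPQ-Auto" and r.actual =~ /Fault/)
    |> events.duration(unit: 1m, stop: stopTime)
    |> yield(name: "filter_first")

// Transitions first: the recovery rows are still present when the durations
// are computed, so each fault duration ends at its own recovery row.
data
    |> filter(fn: (r) => r.prev_record != r.actual)
    |> events.duration(unit: 1m, stop: stopTime)
    |> filter(fn: (r) => r.prev_record == "RunPQ-Auto" and r.actual =~ /Fault/)
    |> yield(name: "transitions_first")
```

On this sample, "filter_first" should report 8 minutes for the first fault (the gap to the next fault onset), while "transitions_first" should report 3 minutes, which is the actual fault time.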

@scott That works! So amazing! Thank you so much for working through this with me!

No problem! Happy to help.