Combine two Rows

Hi I am currently doing this in R but would like to know if there is a way for me to do it in Flux instead:

I have a time series that tracks a value and only stores a point when the signal is turned on or off. The nature of the machine I am tracking only allows the data to be recorded this way. As a result, two rows of the table/measurement together describe a single event (e.g. the start and end of a fault). How do I query the data with Flux to combine those two rows into one (with “start” and “stop” as tags/fields)?

I currently use the elapsed()-function to calculate the time difference/duration of my value

time                      value     field          measurement    equipmentNumber    workplace    duration
2021-01-29 07:11:17.496   1         FAULT_LASER    FAULT_LASER    L5211M0855         0            188
2021-01-29 07:12:03.332   0         FAULT_LASER    FAULT_LASER    L5211M0855         0            45835
2021-01-29 07:12:19.618   1         FAULT_LASER    FAULT_LASER    L5211M0855         0            16285
2021-01-29 07:12:19.618   0         FAULT_LASER    FAULT_LASER    L5211M0855         0            161725

I am doing this in R at the moment:

# Pair each row that opens a fault with the row that follows it
for (i in 1:(nrow(df_f) - 1)) {
  if (df_f[i, "duration"] > 0) {
    df_fdur[i, "start"] <- df_f[i, "time"]
    df_fdur[i, "stop"] <- df_f[i + 1, "time"]
    df_fdur[i, "type"] <- df_f[i, "value"]
    df_fdur[i, "duration"] <- df_f[i, "duration"]
    df_fdur[i, "workplace"] <- df_f[i, "workplace"]
    df_fdur[i, "equipmentNumber"] <- df_f[i, "equipmentNumber"]
  }
}

Any ideas on how I can do that?
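For reference, the same row-pairing can be sketched in pandas. This is a sketch, not my production code: the column names come from the table above, and I pair on value == 1 (an assumption; the R loop keys on duration > 0, which in the sample data amounts to the same rows).

```python
import pandas as pd

# Sample data modeled on the table above
df_f = pd.DataFrame({
    "time": pd.to_datetime([
        "2021-01-29 07:11:17.496", "2021-01-29 07:12:03.332",
        "2021-01-29 07:12:19.618", "2021-01-29 07:12:19.618",
    ]),
    "value": [1, 0, 1, 0],
    "duration": [188, 45835, 16285, 161725],
})

rows = []
# Walk the rows and pair each "on" row (value == 1) with the row that follows it,
# mirroring the R loop above
for i in range(len(df_f) - 1):
    if df_f.loc[i, "value"] == 1:
        rows.append({
            "start": df_f.loc[i, "time"],
            "stop": df_f.loc[i + 1, "time"],
            "duration": df_f.loc[i, "duration"],
        })
df_fdur = pd.DataFrame(rows)
```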

@strittmm The following query takes the sample data you provided and adds an event column based on the existing _value (1 = “start”, 0 = “stop”). It then creates two separate streams for start and stop events, uses cumulativeSum() to generate an event order (index), and then joins the two streams. After the join, it just cleans up the columns a little.

data = // Your query to get the data

events = data
  |> map(fn: (r) => ({ r with event: if r._value == 1 then "start" else "stop", index: 1 }))
  |> group(columns: ["event"])

startEvents = events |> filter(fn: (r) => r.event == "start") |> cumulativeSum(columns: ["index"])
stopEvents = events |> filter(fn: (r) => r.event == "stop") |> cumulativeSum(columns: ["index"])

join(tables: {start: startEvents, stop: stopEvents}, on: ["index", "equipmentNumber", "_measurement", "_field", "workplace"])
  |> drop(columns: ["event_start", "event_stop", "index", "_value_start", "_value_stop"])
  |> rename(columns: {duration_start: "start", duration_stop: "stop"})

Using the data sample you provided above, this query returns:

_field       _measurement  _time_start               _time_stop                start  stop    equipmentNumber  workplace
FAULT_LASER  FAULT_LASER   2021-01-29T07:11:17.496Z  2021-01-29T07:12:03.332Z  188    45835   L5211M0855       0
FAULT_LASER  FAULT_LASER   2021-01-29T07:12:19.618Z  2021-01-29T07:12:19.618Z  16285  161725  L5211M0855       0
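The cumulativeSum()-as-index-then-join trick can be sketched in pandas for anyone who wants to see the mechanics outside Flux (timestamps are the sample values above; the column names are my own):

```python
import pandas as pd

# Start and stop events, as in the two Flux streams above
starts = pd.DataFrame({"time": pd.to_datetime(["2021-01-29 07:11:17.496",
                                               "2021-01-29 07:12:19.618"])})
stops = pd.DataFrame({"time": pd.to_datetime(["2021-01-29 07:12:03.332",
                                              "2021-01-29 07:12:19.618"])})

# Give each stream a running index (the role cumulativeSum() plays in Flux) ...
starts["index"] = range(1, len(starts) + 1)
stops["index"] = range(1, len(stops) + 1)

# ... then join the streams on that index to pair each start with its stop
paired = starts.merge(stops, on="index", suffixes=("_start", "_stop"))
```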

Hi @scott, I haven’t tested your answer yet because I ran into a problem with the elapsed()-function before I could try it:

The function returns, for every row, the time that has passed since the previous row. That is not what I want. I would like, for every row, the time that passes until the NEXT entry (not since the previous one). How would I do that?

EDIT: @scott your solution works great! But I ran into a fairly simple problem I can’t figure out for some reason. I use my data to calculate the total duration of FAULT_LASER and to count how often the laser was on fault. I simply would like to divide these two numbers but can’t get it to work. What am I doing wrong?

data = from(bucket: "plcview_4/autogen")
  |> range(start: 2021-01-29T00:00:00.000Z, stop: now())
  |> filter(fn: (r) => r._measurement == "FAULT_LASER")

events = data
  |> map(fn: (r) => ({ r with event: if r._value == 1 then "start" else "stop", index: 1 }))
  |> group(columns: ["event"])

startEvents = events |> filter(fn: (r) => r.event == "start") |> cumulativeSum(columns: ["index"])
stopEvents = events |> filter(fn: (r) => r.event == "stop") |> cumulativeSum(columns: ["index"])

faultEvents = join(tables: {start: startEvents, stop: stopEvents}, 
	 on: ["index", "equipmentNumber", "_measurement", "_field", "workplace"])
  |> drop(columns: ["event_start", "event_stop", "index", "_start_start", "_start_stop", "_stop_start", "_stop_stop"])
  |> rename(columns: {_time_start: "start", _time_stop: "stop"})
  
// Calculate duration for every fault 
faultDurations = faultEvents
  |> map(fn: (r) => ({ r with start: uint(v: r.start), stop: uint(v: r.stop) }))
  |> map(fn: (r) => ({ r with duration: (r.stop - r.start)/uint(v: 1000000) }))
  |> map(fn: (r) => ({ r with start: time(v: r.start), stop: time(v: r.stop) }))

// Get total fault time
totalFaultTime = faultDurations
  |> sum(column: "duration")

// Get number of faults
nFaults = faultDurations
  |> count(column: "_field")


mttr = totalFaultTime / nFaults

@strittmm to get proper elapsed values, I use events.duration() (from the contrib/tomhollingworth/events Flux package).

There are a couple of different ways to do what you’re trying to do. One way is to extract totalFaultTime and nFaults as scalar values and operate on them as scalar values, but I found as I was testing this method in the InfluxDB Data Explorer, it would result in an error. I believe it has something to do with multiple calls to findColumn() in the UI, but I’m not sure.

Another method is to use reduce() to create a custom aggregate function that calculates both the fault count and total fault duration in a single pass:

events = data
  |> map(fn: (r) => ({ r with event: if r._value == 1 then "start" else "stop", index: 1 }))
  |> group(columns: ["event"])

startEvents = events |> filter(fn: (r) => r.event == "start") |> cumulativeSum(columns: ["index"])
stopEvents = events |> filter(fn: (r) => r.event == "stop") |> cumulativeSum(columns: ["index"])

faultEvents = join(tables: {start: startEvents, stop: stopEvents}, 
	 on: ["index", "equipmentNumber", "_measurement", "_field", "workplace"])
  |> drop(columns: ["event_start", "event_stop", "index", "_start_start", "_start_stop", "_stop_start", "_stop_stop", "_value_start", "_value_stop"])
  |> rename(columns: {_time_start: "start", _time_stop: "stop"})

// Calculate duration for every fault 
faultDurations = faultEvents
  |> map(fn: (r) => ({ r with duration: (int(v: r.stop) - int(v: r.start))/1000000 }))

mttr = faultDurations
  |> reduce(
    identity: {totalFaultTime: 0, nFaults: 0},
    fn: (r, accumulator) => ({
        totalFaultTime: r.duration + accumulator.totalFaultTime,
        nFaults: accumulator.nFaults + 1
    })
  )
  |> map(fn: (r) => ({ _value: r.totalFaultTime / r.nFaults }))

mttr
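The single-pass reduce() pattern above can be sketched in Python for reference (the durations are the sample values from the joined table, in ms):

```python
from functools import reduce

# Fault durations in ms, as produced by the join/map steps above (sample values)
durations = [45835, 161725]

# Single-pass aggregation mirroring the Flux reduce():
# accumulate total fault time and fault count together
acc = reduce(
    lambda a, d: {"totalFaultTime": a["totalFaultTime"] + d,
                  "nFaults": a["nFaults"] + 1},
    durations,
    {"totalFaultTime": 0, "nFaults": 0},
)
mttr = acc["totalFaultTime"] / acc["nFaults"]  # mean time to repair, in ms
```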

@scott is it possible to use a variable within the reduce()-function that I created outside of the function? For example identity: { totalFaultTime: 0, nFaults: nWorkplaces }, with nWorkplaces being the number of distinct values for the tag workplace that I calculate at some point before.

I believe you could do that, yes.

I tried that but can’t get it to work. What do I need to do to just store a single number in a variable? All I get after applying functions like count() and distinct() is what looks to be a table with just one column and one row (containing my value). I can’t get at that value by doing something like nWorkplaces[0].

What query do you use to get nWorkplaces?

I use:

nWorkplaces = from(bucket: "plcview_4/autogen")
  |> filter(fn: (r) => r._measurement == "FAULT_LASER")
  |> group()
  |> distinct(column: "workplace")
  |> count()

when I try to use |> findRecord() or |> findColumn() I can’t seem to find them in the script editor (v1.8.4). I only can use |> getRecord() or |> getColumn() but they give me an error…

Oh, yeah, I don’t think findColumn() and findRecord() are supported in 1.8.4. I think a fairly old version of Flux is packaged with 1.8.4. In that case, you’d have to do:

nWorkplaces = (from(bucket: "plcview_4/autogen")
  |> filter(fn: (r) => r._measurement == "FAULT_LASER")
  |> group()
  |> distinct(column: "workplace")
  |> count()
  |> tableFind(fn: (key) => true)
  |> getColumn(column: "_value")
)[0]
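The extract-a-scalar pattern (distinct, count, then pull the single value out of the one-row result) can be sketched in pandas (sample values, hypothetical tag data):

```python
import pandas as pd

# Rows carrying a "workplace" tag, as in the FAULT_LASER measurement (sample values)
df = pd.DataFrame({"workplace": ["0", "0", "1", "1", "2"]})

# distinct() then count() leaves a one-row, one-column result;
# the final step is pulling that single value out as a plain scalar
n_workplaces = int(df["workplace"].drop_duplicates().count())
```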

Ok thanks, I’ll give it a shot.

@scott I tried it and get error: type error: cannot unify array with semantic.array
What does that mean?

@strittmm I realized that you probably need a range() call in your query. I think this is caused by tableFind() returning no results and not handling the error (this is why findRecord() and findColumn() were introduced in newer versions of Flux). Are you certain count() is returning results?

@scott the range() wasn’t the problem. There was some kind of problem with the way I tried to access the first value within the column I extracted. This worked for me:

nWorkplaces = from(bucket: "plcview_4/autogen")
  |> range(start: 2021-01-29T00:00:00.000Z, stop: now())
  |> filter(fn: (r) => r._measurement == "FAULT_LASER")
  |> group()
  |> distinct(column: "workplace")
  |> count()
  |> tableFind(fn: (key) => true)
  |> getColumn(column: "_value")

and then later I could use this column and extract the value in the calculation:

mtbf = noFaultDurations
  |> reduce(
  	fn: (r, accumulator) => ({ 
    	totalNoFaultTime: r.duration + accumulator.totalNoFaultTime,
        nFaults: accumulator.nFaults + 1
    }),
    identity: { totalNoFaultTime: 0, nFaults: nWorkplaces[0] }
  )
  |> map(fn: (r) => ({ _value: r.totalNoFaultTime / r.nFaults }))
  |> yield(name: "mtbf")
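For reference, seeding the accumulator with an externally computed value, as the identity record does above, can be sketched in Python (the durations and the workplace count are invented sample values):

```python
from functools import reduce

# No-fault gap durations in ms (sample values) and a count computed elsewhere
no_fault_durations = [30000, 60000, 90000]
n_workplaces = 1  # assumed: distinct workplace count extracted earlier

# Seed the accumulator with the external value, mirroring the Flux identity record
acc = reduce(
    lambda a, d: {"totalNoFaultTime": a["totalNoFaultTime"] + d,
                  "nFaults": a["nFaults"] + 1},
    no_fault_durations,
    {"totalNoFaultTime": 0, "nFaults": n_workplaces},
)
mtbf = acc["totalNoFaultTime"] / acc["nFaults"]  # mean time between failures, in ms
```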

@scott Do you know if it is possible to translate all of what I did here into TICKscript? I am trying to automate the calculation, run it every day, and store the result in another database with a different retention policy. Since I am using v1.8.4 I can’t use Flux tasks…

I created a separate topic for this issue: Translate Flux into TICK script

My TICKscript is a little rusty, so I may not be the best person to help translate this. @Emrys_Landivar, thoughts on doing this operation :point_up_2: in TICKscript?