Specific filtering of data: removing chronological duplicates

Hey everyone I have a question.
Say I have some data along the lines of:

2021-11-16T14:45:04.959918682Z,start
2021-11-16T14:45:22.888960013Z,stop
2021-11-17T09:17:34.493371966Z,start
2021-11-17T09:18:41.713114886Z,stop
2021-11-17T11:06:31.444954418Z,stop

Where name is the name of the field and is either a start or stop event. But I want to remove duplicate events. So in the end I would only like the start and stop events to be interlaced [start, stop, start, ...]. And not multiple of the same events after each other. This can be seen in the example data as the last two stop’s at the end. I tried something along the lines of:

import "dict"

l = ["name": "None"]

dedup = (lastValue, newName) => {
  result = dict.get(dict: lastValue, key: "name", default: "None") != newName
  lastValue = dict.insert(dict: lastValue, key: "name", value: newName)
  return result  
}

from(bucket: "sr-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "events" and r._field == "name")
  |> filter(fn: (r) => dedup(lastValue: l, newName: r._value))

But this is kind of mutable imperative code, as we are not allowed to modify lastValue. Which I understand, but to do this we need some kind of look-ahead or backtracking. As in: have we seen this a stop event before the current one that we are processing, any idea how to do this with flux?

The use-case is that I want to calculate the time between a start and a stop event. So either with a duration or the contrib events.duration. But because the program can crash for example, there can be a start without a corresponding stop.

Thanks! :slight_smile: (edited)

Someone in the slack channel recommended the function monitor.stateChangesOnly. I’ve tried this but it does not give any results. Renaming to _level works, but it doesn’t to be in the correct form somehow.

from(bucket: "sr-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "events" and r._field == "name")
  |> rename(columns: {_value: "_level"})
  |> monitor.stateChangesOnly()

I think I got it to work, by looking at the source for stateChangesOnly

This is what I ended up with:

import "experimental"

stateChangesOnly2 = (tables=<-) => {
    return tables
        |> map(
            fn: (r) => ({r with
                level_value: if r._level == "start" then
                    3
                else if r._level == "stop" then
                    4
                else
                    0,
            }),
        )
        |> duplicate(column: "_level", as: "____temp_level____")
        |> drop(columns: ["_level"])
        |> rename(columns: {"____temp_level____": "_level"})
        |> sort(columns: ["_time"], desc: false)
        |> difference(columns: ["level_value"], keepFirst: true)
        |> filter(fn: (r) => r.level_value != 0 or not exists r.level_value)
        |> drop(columns: ["level_value"])
}

from(bucket: "sr-data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "events" and r._field == "name")
  |> rename(columns: {_value: "_level"})
  |> keep(columns: ["_level", "_time"])
  |> stateChangesOnly2()

The main gist is converting it to a numeric value, so that you can get a difference between the values and filter out the zero’s as these correspond to identical events, and can subsequently be filtered out. I also want to keep the first event (something that the original does not seem to do?), so that’s why I kept it.