Avoid multiple notifications of same state

Hi Team,
I need to implement a notification system that alerts users when a state has been maintained for a given amount of time ("x time").
I think the correct function to use is stateDuration.
But how can I avoid multiple notifications for the same state?
On every iteration I check one period, but I have no way of knowing whether the "new" state was already notified during the previous check.

first period

ACTIVE
ERROR
ERROR
ERROR
ERROR

I notify ERROR

second period

ERROR
ERROR
ERROR
ERROR

I must not notify ERROR again
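In Flux, the detection part seems straightforward; something like this (just a sketch, with placeholder bucket and measurement names) finds a state that has been held for more than 10 seconds, but it would keep matching on every later check too:

```flux
// Placeholder bucket/measurement names, for illustration only
from(bucket: "device_data")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "device_status" and r._field == "status")
    // seconds spent consecutively in the ERROR state
    |> stateDuration(fn: (r) => r._value == "ERROR", unit: 1s)
    |> last()
    // matches on this check AND on every following check while ERROR persists
    |> filter(fn: (r) => r.stateDuration > 10)
```

It is this repeated matching across checks that I don't know how to suppress.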

I hope I’ve been sufficiently clear :frowning:
Thanks in advance for your help.
Benjamin

Hello @bjbezzi,
Does this solution help?

Hello @Anaisdg ,
thank you very much for the suggestion, very interesting.
Inspired by the article, I wrote the following code. I need to notify only when the device has been in FAIL status for more than x seconds, and I must avoid multiple notifications.

import "contrib/tomhollingworth/events"
import "array"

// Statuses already notified during previous checks
previous_data = from(bucket: "device_data")
    |> range(start: 0)
    |> filter(fn: (r) => r._measurement == "previous_device_status")
    |> filter(fn: (r) => r["_field"] == "status")
    |> filter(fn: (r) => r["device_id"] == "dev001")
    |> findColumn(fn: (key) => true, column: "_value")

dataL = length(arr: previous_data)

// Debug only: inspect how many previous notifications were found
array.from(rows: [{vv: dataL}])

// Current status; candidate for notification after more than 8 s in FAIL
current_data = from(bucket: "device_data")
    |> range(start: 0)
    |> filter(fn: (r) => r["_measurement"] == "device_status")
    |> filter(fn: (r) => r["_field"] == "status")
    |> filter(fn: (r) => r["device_id"] == "dev001")
    |> stateDuration(fn: (r) => r._value == "FAIL", unit: 1s)
    |> last()
    |> filter(fn: (r) => r._value == "FAIL")
    |> filter(fn: (r) => r.stateDuration > 8)
    |> yield(name: "current_data")
    // Record that this status has been notified
    |> set(key: "_measurement", value: "previous_device_status")
    |> to(bucket: "device_data")
    |> yield(name: "saved_data")

data_to_send = current_data
    |> filter(fn: (r) => dataL == 0)
    |> yield(name: "datatosend")

I know it's a very textbook implementation :face_with_hand_over_mouth:, but it's the first complex script I've written.
Do you have any suggestions?
Thanks in advance,
Benjamin

Hello @bjbezzi,
I’m confused by

data_to_send = current_data
    |> filter(fn: (r) => dataL == 0)
    |> yield(name: "datatosend")

What are you trying to do here? Your current_data doesn't contain a dataL column, so you can't use it to filter current_data.

Hi @Anaisdg,
thanks for the quick reply.
This, in fact, is the part about which I had the greatest doubts.
To avoid reporting the same status twice, I verify that I did not already report it during the previous check (in that case the length of previous_data is zero).
If I had already sent the notification, previous_data would have a length greater than zero.
Please ignore the range() values; they are for testing purposes only.
I hope I have clarified enough :slightly_smiling_face:

Benjamin

Hi @Anaisdg,
do you have any suggestions for my Flux script?
What is the best strategy for verifying that I have already sent a notification for a particular state?
Thank you very much.

I apologize for the insistence.
Benjamin

Hello @bjbezzi,
I’m sorry for the delay.
Unfortunately stateDuration() and stateCount() don't pick up from where they left off, and I'm not sure how to accomplish this elegantly.
Here is one of my coworkers' solutions for carrying the stateCount across task executions. It calculates the Nth consecutive time level == "crit" across state changes, i.e. the "true" Nth time the state is reached. It does this by joining the stateCount from the last run with the new run and filtering for the last time the stateCount was reset. Perhaps something from it can be repurposed.

import "influxdata/influxdb/tasks"
import "array"
import "join"
import "date"

today = date.truncate(t: now(), unit: 1d)
cutoff = tasks.lastSuccess(orTime: -6m)

// This will fetch the last status-count for each table, regardless of status
fetch =
    from(bucket: "status-count")
        |> range(start: -12m)
        |> filter(fn: (r) => r["_measurement"] == "statuses_counter")
        |> filter(fn: (r) => r["_check_id"] == "09952d758d3d6000")
        |> filter(fn: (r) => r["_field"] == "_message")
        |> group(
            columns: [
//                 "_level",
                "_type",
                "device_id",
                "topic",
            ],
        )
        |> sort(columns: ["_time"])
        |> last()
        |> keep(columns: ["crit_query_counter", "device_id", "_level"])
        |> group()

// A baseline record of count "0" ensures there is a table to pass into join (otherwise it fails)
sampleData = array.from(rows: [{_level: "xxxx", crit_query_counter: "0", device_id: "xxxx"}])
right = union(tables: [sampleData, fetch])

// count the new statuses since last run
left =
    from(bucket: "_monitoring")
        |> range(start: cutoff)
        |> filter(fn: (r) => r["_measurement"] == "statuses")
        |> filter(fn: (r) => r["_check_id"] == "09952d758d3d6000")
        |> filter(fn: (r) => r["_field"] == "_message")
        |> keep(
            columns: [
                "_measurement",
                "_field",
                "_value",
                "_check_id",
                "_check_name",
                "_level",
                "_source_measurement",
                "_start",
                "_stop",
                "_time",
                "_type",
                "device_id",
                "topic",
            ],
        )
        |> group()
        |> group(
            columns: [
                "_type",
                "device_id",
                "topic",
            ],
        )
        |> sort(columns: ["_time"])
        |> stateCount(fn: (r) => r._level == "crit", column: "crit_query_counter")
        |> group()

// join the tables on device_id and _level, if there's no count in the right table, right record is null
merge =
    join.left(
        left: left,
        right: right,
        on: (l, r) => l.device_id == r.device_id and l._level == r._level,
        as: (l, r) =>
            ({
                _measurement: l._measurement,
                _field: l._field,
                _value: l._value,
                _check_id: l._check_id,
                _check_name: l._check_name,
                _level: l._level,
                _source_measurement: l._source_measurement,
                _start: l._start,
                _stop: l._stop,
                _time: l._time,
                _type: l._type,
                crit_query_counter_new: l.crit_query_counter,
                crit_query_counter_prev: int(v: r.crit_query_counter),
                device_id: l.device_id,
                topic: l.topic,
            }),
    )
        |> fill(column: "crit_query_counter_prev", value: 0)
        |> group(
            columns: [
                "_type",
                "device_id",
                "topic",
            ],
        )
        |> sort(columns: ["_time"])

// fetch the record where the stateCount resets (i.e. not CRIT, therefore counter == -1)
right05 =
    merge
        |> filter(fn: (r) => r["crit_query_counter_new"] == -1)
        |> sort(columns: ["_time"])
        |> first()
        |> keep(columns: ["_check_id", "device_id", "_time"])
        |> group()
// Again, add in dummy row so join doesn't break
sampleData2 = array.from(rows: [{_check_id: "xxxx", device_id: "xxxx", _time: today}])
right2 = union(tables: [sampleData2, right05])

left2 =
    merge
        |> group()

// Join tables, adding new column with the _time where the stateCount is reset (we don't want to add the rolling stateCount beyond this)
join.left(
    left: left2,
    right: right2,
    on: (l, r) => l.device_id == r.device_id and l._check_id == r._check_id,
    as: (l, r) =>
        ({
            _measurement: l._measurement,
            _field: l._field,
            _value: l._value,
            _check_id: l._check_id,
            _check_name: l._check_name,
            _level: l._level,
            _source_measurement: l._source_measurement,
            _start: l._start,
            _stop: l._stop,
            _time: l._time,
            _type: l._type,
            crit_query_counter_new: l.crit_query_counter_new,
            crit_query_counter_prev: l.crit_query_counter_prev,
            break_date: r._time, // New column
            device_id: l.device_id,
            topic: l.topic,
        }),
)
    |> group(
        columns: [
            "_type",
            "device_id",
            "topic",
        ],
    )
    // For all records that exist greater than the stateCount reset, DO NOT add the rolling stateCount
    |> map(
        fn: (r) =>
            ({r with crit_query_counter:
                    if r._time >= r.break_date then
                        string(v: r.crit_query_counter_new)
                    else
                        string(v: r.crit_query_counter_new + r.crit_query_counter_prev),
            }),
    )
    |> drop(columns: ["crit_query_counter_new", "crit_query_counter_prev", "break_date"])
    |> set(key: "_measurement", value: "statuses_counter")
    |> to(bucket: "status-count")

The following Flux query notifies on the Nth time level == “crit” within 1 day (or specified time period):

import "date"
import "influxdata/influxdb/tasks"

today = date.truncate(t: now(), unit: 1d)
cutoff = tasks.lastSuccess(orTime: -1m) // runs every minute

from(bucket: "_monitoring")
    |> range(start: today)
    |> filter(fn: (r) => r["_measurement"] == "statuses")
    |> filter(fn: (r) => r["_check_name"] == "Query Rate Limit Check")
    |> filter(fn: (r) => r["_field"] == "_message")
    |> filter(fn: (r) => r["_level"] == "crit" and exists r._value)
    |> stateCount(fn: (r) => r._level == "crit", column: "crit_query_counter")
    |> filter(fn: (r) => r.crit_query_counter % 10 == 0 and r._time >= cutoff)
    |> map(fn:(r) => ({ r with crit_query_counter: string(v: r.crit_query_counter)}))
    |> map(fn:(r) => ({r with _measurement: "statuses_counter"}))
    |> to(bucket: "_monitoring")

Perhaps this last Flux script can be amended to fit your needs. I realize you're asking about stateDuration, not stateCount, but I think the challenges overlap, because neither stateCount nor stateDuration lets you specify a column from which to pick up where it left off.
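For your stateDuration case specifically, a simpler variant of the same idea might be enough: write a marker point when you notify, and gate future notifications on the absence of that marker. Roughly (an untested sketch; bucket, measurement, and device_id values are borrowed from your script, and the threshold of 8 s is your example value):

```flux
import "influxdata/influxdb/tasks"

// Has a notification already been recorded for this FAIL episode?
already_notified =
    from(bucket: "device_data")
        |> range(start: 0)
        |> filter(fn: (r) => r._measurement == "previous_device_status")
        |> filter(fn: (r) => r.device_id == "dev001")
        |> findColumn(fn: (key) => true, column: "_value")

from(bucket: "device_data")
    |> range(start: 0)
    |> filter(fn: (r) => r._measurement == "device_status" and r._field == "status")
    |> filter(fn: (r) => r.device_id == "dev001")
    |> stateDuration(fn: (r) => r._value == "FAIL", unit: 1s)
    |> last()
    // notify only on the first run that crosses the threshold
    |> filter(fn: (r) => r.stateDuration > 8 and length(arr: already_notified) == 0)
    // record the marker so the next run stays silent
    |> set(key: "_measurement", value: "previous_device_status")
    |> to(bucket: "device_data")
```

You would still need a way to clear the marker once the device leaves FAIL (for example a second task that deletes or overwrites previous_device_status, or a short retention period on a dedicated marker bucket), otherwise a new FAIL episode would never be notified.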