Flux operation that returns a bool whether every eache and every line respect the delegate function

Here’s a bit more insight on what I am trying to accomplish. I am creating threshold checks for various metrics. As of right now this is what I use :

data =
from(bucket: “telemetry”)
|> range(start: -30m)
|> filter(fn: (r) => r[“_measurement”] == “bacnet”)
|> filter(fn: (r) => r[“_field”] == “value”)
|> filter(fn: (r) => r[“deviceInstance”] == “213003”)
|> filter(fn: (r) => r[“network”] == “0”)
|> filter(fn: (r) => r[“objectId”] == “OBJECT_ANALOG_VALUE:0”)
|> last()

option task = {name: “2345”, every: 5s}

check = {_check_id: “0b18ff8b54569000”, _check_name: “My Check”, _type: “threshold”, tags: {}}
crit = (r) => r[“value”] > 2.0
ok = (r) => r[“value”] < 2.0
messageFn = (r) => “Check: ${ r._check_name } is: ${ r._level }”

data
|> v1"fieldsAsCols"
|> monitor[“check”](data: check, messageFn: messageFn, crit: crit, ok: ok)

It works fine for most use cases… If some metric is over 2.0 (in that example) that check is we get an alarm and every one is happy.

But a use case that this does not cover is delayed alarm. What I mean by that is I want the check to become critical only if the threshold has been consistently breached for more than 5 minutes (or more)

My idea was to modify it this way :

data =
from(bucket: “telemetry”)
|> range(start: -5m)
|> filter(fn: (r) => r[“_measurement”] == “bacnet”)
|> filter(fn: (r) => r[“_field”] == “value”)
|> filter(fn: (r) => r[“deviceInstance”] == “213003”)
|> filter(fn: (r) => r[“network”] == “0”)
|> filter(fn: (r) => r[“objectId”] == “OBJECT_ANALOG_VALUE:0”)

option task = {name: “2345”, every: 5s}

check = {_check_id: “0b18ff8b54569000”, _check_name: “My Check”, _type: “threshold”, tags: {}}
// LINQ style representation of what I try to accomplish
crit = (r) => r.all( (r) => r.[“value”] > 2.0 // True if all of the queried data exceeds 2.0
ok = (r) => r.any( (r) => r.[“value”] < 2.0 // True if any of the queried data is lower than 2.0
messageFn = (r) => “Check: ${ r._check_name } is: ${ r._level }”

data
|> v1"fieldsAsCols"
|> monitor[“check”](data: check, messageFn: messageFn, crit: crit, ok: ok)

But alas, the world seems to be devoid of any and all that could solve this quandary. The great lords of Influx, with their magical powers, could perhaps provide a solution.

Thanks!

I am just leaving the office, but I have done something like you are trying to accomplish.

there is a hint on how to do it, tag me if you get stuck!.

stateDuration() function | Flux 0.x Documentation (influxdata.com)

bye :wink:

@fercasjr Thanks I think that will do the trick!

This is what I came up with:

import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"

data =
    from(bucket: "telemetry")
        |> range(start: -30m)
        |> filter(fn: (r) => r["_measurement"] == "bacnet")
        |> filter(fn: (r) => r["_field"] == "value")
        |> filter(fn: (r) => r["deviceInstance"] == "213005")
        |> filter(fn: (r) => r["network"] == "0")
        |> filter(fn: (r) => r["objectId"] == "OBJECT_BINARY_OUTPUT:4")
        |> stateDuration(fn: (r) => r["_value"] > 0.0)
        |> last()

option task = {name: "bo4", every: 5s}

check = {_check_id: "0b2d715921fe5000", _check_name: "bo4", _type: "threshold", tags: {}}
crit = (r) => r.stateDuration >= 30
ok = (r) => r.stateDuration < 30
messageFn = (r) => "Check: ${r._check_name} is: ${r._level}"

data
    |> v1["fieldsAsCols"]()
    |> monitor["check"](data: check, messageFn: messageFn, crit: crit, ok: ok)

There’s no error syntax but the script fails with error message :

error @3:19-3:20: record is missing label timeRangeStart error @3:43-3:44: record is missing label timeRangeStop

image

Do you have any idea what causes this?

Thanks a lot!

Also is there a way to put printing traces for debugging?

I could get it the work!

The issue was that |> v1"fieldsAsCols" function removes all other columns and for some reasons that I do not fully understand removing that function also ends up in execution failure so the solution was :

import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"

data =
    from(bucket: "telemetry")
        |> range(start: -30m)
        |> filter(fn: (r) => r["_measurement"] == "bacnet")
        |> filter(fn: (r) => r["_field"] == "value")
        |> filter(fn: (r) => r["deviceInstance"] == "213005")
        |> filter(fn: (r) => r["network"] == "0")
        |> filter(fn: (r) => r["objectId"] == "OBJECT_BINARY_OUTPUT:4")
        |> aggregateWindow(every: 10s, fn: last, createEmpty: true)
        |> fill(usePrevious: true)
        |> stateDuration(fn: (r) => r["_value"] > 0.0, unit: 1s)
        |> last()
        |> map(fn: (r) => ({r with _value: float(v: r.stateDuration)}))
        |> v1["fieldsAsCols"]()

option task = {name: "bo4", every: 5s}

check = {_check_id: "0b2d715921fe5000", _check_name: "bo4", _type: "threshold", tags: {}}
crit = (r) => r.value >= 30
ok = (r) => r.value < 30
messageFn = (r) => "Check: ${r._check_name} is: ${r._level}. Breached for ${r.value}"

data |> monitor["check"](data: check, messageFn: messageFn, crit: crit)

Note that r.stateDuration MUST be converted to float or it also results in an execution error.

That was quite a pain to debug having no logs pointing me to what was the exact issue. Is there something I am missing? A debug tool or something?

Thanks!

I don’t know about fieldsAsCols , but I use pivot() and that one does not remove any other columns. however, sometimes you need to pay attention to group your data.

that sounds weird to me, maybe its related on how fieldAsCols works, maybe it kind of needs to be the same data type in order to “merge” the 2 fields on the same table?

I have the same question, that would be helpful. :thinking: