Task sending alerts randomly

I have a Flux task that monitors Windows devices and should send alerts according to configured thresholds. In my case the thresholds are: if r._value > 92.0 then "crit" else if r._value > 80.0 then "warn" else "ok".
However, I keep getting wrong alerts: a critical alert at 0%, a normal one at 62%, or a warning at 3%. Later on I get, for example, a critical alert at 56% and a normal one at 0%. I can’t find any pattern in the alerts and can’t figure out what the problem is. I have more tasks like this one, just with different measurements, and all of them except this one work fine. The whole code is below.

import "http"
import "influxdata/influxdb/schema"
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
import "influxdata/influxdb/secrets"
import "math"
import "date"
import "json"
import "internal/debug"

// Task options: runs every 5 minutes with a 1-minute timeout (name left blank here).
option task = {name: "", every: 5m, timeout: 1m}

// Value written into the `msg_grp` column of every row before state changes are computed.
msg_grp = ""

// Value written into the `_check_name` column of every row before state changes are computed.
_check_name = ""

// Exclusive upper bound on the per-host count of level changes; rows whose
// `changesCount` is not below this value are filtered out before alerting.
FLUCTUATION_CAP = 4

// NOTE(review): not referenced anywhere in the visible script — confirm whether it is still needed.
ALERTNAME = "win_cpu"

// Measurement read from the source bucket, used to look up previous alerts,
// and set on the rows written to the alerts bucket.
MEASUREMENT = "win_cpu"

// Field whose value is compared against the warn/crit thresholds.
FIELD = "Percent_Processor_Time"


// Notification endpoint. For every alert row it builds one HTTP request whose
// body is the JSON-encoded row extended with a human-readable `_message`.
// NOTE(review): `headers` is not defined in the visible script — it must be
// declared elsewhere (or was stripped before posting); confirm before deploying.
endpoint =
    http.endpoint(url: "")(
        mapFn: (r) => {
            // Reported percentage, rounded to two decimal places for display.
            percent = math.round(x: float(v: r.Percent_Processor_Time) * 100.0) / 100.0
            alert_msg = "Percent processor time on ${r.host} is  ${percent}% Full. Level is ${r._level}"

            return {headers: headers, data: json.encode(v: {r with _message: alert_msg})}
        },
    )

// Processor instance that is evaluated against the thresholds.
// NOTE(review): assumes the data comes from Telegraf's win_perf_counters input,
// which tags every series with `instance` (one per core plus the aggregate
// "_Total") — confirm the tag name and value against the bucket schema.
CPU_INSTANCE = "_Total"

// Readings from the last 5 minutes, one series per host, each row labelled with
// the alert level derived from its value.
//
// Without the instance restriction every per-core series of a host ends up in
// the same per-host group further down the script, so the level computed for
// one core can be attached to the value of another (e.g. "crit" reported next
// to 0%).
data =
    from(bucket: "bucket")
        |> range(start: -5m)
        |> filter(
            fn: (r) =>
                r._measurement == MEASUREMENT and r._field == FIELD and r.instance == CPU_INSTANCE,
        )
        // Keep the fractional part: truncating to an integer would classify
        // e.g. 92.9 as "warn" because 92 is not greater than 92.0.
        |> toFloat()
        |> map(
            fn: (r) =>
                ({r with _level:
                        if r._value > 92.0 then
                            "crit"
                        else if r._value > 80.0 then
                            "warn"
                        else
                            "ok",
                }),
        )

// Latest value stored in the alerts bucket for each host within the past
// 7 days, reduced to the columns `host` and `lastReportedLevel`.
lastAlertData =
    from(bucket: "bucketalerts")
        |> range(start: -7d)
        |> filter(fn: (r) => r._measurement == MEASUREMENT)
        |> group(columns: ["host"])
        |> sort(columns: ["_time"], desc: false)
        |> last(column: "_value")
        |> rename(columns: {_value: "lastReportedLevel"})
        |> keep(columns: ["lastReportedLevel", "host"])

// Level-only view of `data`: just the computed `_level` with its timestamp and host.
levelInfoData =
    data
        |> filter(fn: (r) => r._field == "Percent_Processor_Time")
        |> keep(columns: ["host", "_time", "_level"])

// Value view of `data`: fields pivoted into columns so the reading is available
// as `Percent_Processor_Time`; rows without that column value are discarded.
pivotedData =
    data
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> filter(fn: (r) => exists r.Percent_Processor_Time)

// Combines the level rows and the pivoted value rows into one row per
// (_time, host) that carries both `_level` and `Percent_Processor_Time`,
// then tags every row with `msg_grp` and `_check_name`.
// NOTE(review): the fill/tail step relies on row order inside each
// (_time, host) group — presumably the level row precedes the pivoted row and
// there is exactly one of each; confirm, since extra series per host would
// pair a level with an unrelated value.
unionedData =
    union(tables: [levelInfoData, pivotedData])
        |> group(columns: ["_time", "host"], mode: "by")
        |> sort(columns: ["host"])
        // Copy the level from the preceding row onto rows that have none.
        |> fill(column: "_level", usePrevious: true)
        // Keep only the last row of each (_time, host) group.
        |> tail(n: 1)
        // Merge everything into a single table and order it by host, then time.
        |> group()
        |> sort(columns: ["host", "_time"])
        |> map(fn: (r) => ({r with msg_grp: msg_grp}))
        |> map(fn: (r) => ({r with _check_name: _check_name}))

// Per-host rows at which the level differs from the previous one, ordered by time.
statusChanges =
    unionedData
        |> group(columns: ["host"])
        |> monitor.stateChangesOnly()
        |> group(columns: ["host"])
        |> sort(columns: ["_time"], desc: false)

// Number of level changes per host, exposed in the `changesCount` column.
changesPerHost = statusChanges |> count(column: "_level")

countChanges = changesPerHost |> rename(columns: {_level: "changesCount"})

// Selects, per host, the latest state change that should actually be alerted:
// the level must differ from the last reported one, the host must not be
// fluctuating (fewer than FLUCTUATION_CAP changes), and the value must be
// outside the hysteresis bands checked below.
// NOTE(review): the count and last-alert rows carry no `_time`; the fills
// below presumably rely on those rows sorting before the timed rows — confirm.
keepLowCountAndChange =
    union(tables: [statusChanges, countChanges, lastAlertData])
        |> group(columns: ["host"], mode: "by")
        |> sort(columns: ["_time"])
        // Propagate the per-host change count and last reported level onto the following rows.
        |> fill(column: "changesCount", usePrevious: true)
        |> fill(column: "lastReportedLevel", usePrevious: true)
        // Keep the last row per host, and only if it is a real reading.
        |> tail(n: 1)
        |> filter(fn: (r) => exists r.Percent_Processor_Time)
        // Default the last reported level to "none" when no previous alert exists
        // (done through a temporary column that is then renamed back).
        |> map(
            fn: (r) =>
                ({r with lastReportedLevelTemp:
                        if exists r.lastReportedLevel then r.lastReportedLevel else "none",
                }),
        )
        |> drop(columns: ["lastReportedLevel"])
        |> rename(columns: {lastReportedLevelTemp: "lastReportedLevel"})
        // Only alert when the level changed and the host is not fluctuating.
        |> filter(fn: (r) => r["_level"] != r["lastReportedLevel"])
        |> filter(fn: (r) => r["changesCount"] < FLUCTUATION_CAP)
        // Exclude warn -> ok while the CPU percentage is still 75 or higher
        |> filter(
            fn: (r) =>
                not (r["lastReportedLevel"] == "warn" and r["_level"] == "ok"
                    and
                    r["Percent_Processor_Time"] >= 75),
        )
        // Exclude ok -> warn while the CPU percentage is 85 or lower
        |> filter(
            fn: (r) =>
                not (r["lastReportedLevel"] == "ok" and r["_level"] == "warn"
                    and
                    r["Percent_Processor_Time"] <= 85),
        )
        // Exclude crit -> warn while the CPU percentage is still 89 or higher
        |> filter(
            fn: (r) =>
                not (r["lastReportedLevel"] == "crit" and r["_level"] == "warn"
                    and
                    r["Percent_Processor_Time"] >= 89),
        )
        // Exclude warn -> crit while the CPU percentage is 92 or lower
        |> filter(
            fn: (r) =>
                not (r["lastReportedLevel"] == "warn" and r["_level"] == "crit"
                    and
                    r["Percent_Processor_Time"] <= 92),
        )

// Pass the selected rows through the notification endpoint, then record the
// alerted level in the alerts bucket under MEASUREMENT; that bucket is what
// the last-reported-level lookup reads on later runs.
keepLowCountAndChange
    |> endpoint()
    // Disabled: would skip hosts that have no previously reported level.
    //|> filter(fn: (r) => r["lastReportedLevel"] != "none")
    |> set(key: "_measurement", value: MEASUREMENT)
    // Only `_level` is written as a field.
    |> to(org: "org", bucket: "bucketalerts", fieldFn: (r) => ({"_level": r._level}))

Hello @Zcarevic,
We’re talking on Slack. Let’s continue the conversation there. But I would love it if you could come back here and add your answer once we figure it out :slight_smile: