Merging two tables keeps failing

Hi, I’m making a flux task that sends alerts only if lastReportedLevel and level are both WARN. The problem is that once an alert sends the same hosts keep sending alerts until the level is back down at OK, and I need to send just one alert.
To solve this I added a counter that counts the number of WARN levels in lastReportedLevel and level and stops sending alerts if it is above 1.
I’m having issues with merging that table with the final table. I keep getting two different tables everytime.
I’ll post my code below, please give me a hand.

import "http"
import "influxdata/influxdb/schema"
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
import "influxdata/influxdb/secrets"
import "math"
import "date"
import "json"
import "internal/debug"

option task = {name: "", every: 10m}

msg_grp = "WIN"

_check_name = "a1hr_task_win_cpu"

FLUCTUATION_CAP = 8

ALERTNAME = "win_cpu"

MEASUREMENT = "win_cpu"

FIELD = "Percent_Processor_Time"

headers = {

}

endpoint =
    http.endpoint(url: "")(
        mapFn: (r) => {
            body = {r with _version: 1}
            alert_msg = "Percent processor time on ${r.host} is  ${math.round(
                    x: float(v: r.Percent_Processor_Time) * 100.0,
                ) / 100.0}% Full. Level is ${r._level}"

            //bytes(v: "The value is ${r._value}")
            return {headers: headers, data: json["encode"](v: {r with _message: alert_msg})}
        },
    )

data =
    from(bucket: "a1hrinfrawin/autogen")
        |> range(start: -10m)
        |> filter(fn: (r) => r["_measurement"] == MEASUREMENT)
        |> filter(fn: (r) => r["_field"] == FIELD)
        |> toInt()
        //   |> last()
        |> map(
            fn: (r) =>
                ({r with _level:
                        if r._value < 90.0 then
                            "warn"
                        else
                            "ok",
                }),
        )
        |> filter(fn: (r) => r._level == "ok" or r._level == "warn")

lastAlertData =
    from(bucket: "a1hrwinalerts")
        |> range(start: -7d)
        |> filter(fn: (r) => r["_measurement"] == MEASUREMENT)
        |> group(columns: ["host"], mode: "by")
        |> sort(columns: ["_time"])
        |> last()
        |> keep(columns: ["_value", "host"])
        |> rename(columns: {_value: "lastReportedLevel"})

levelInfoData =
    data
        |> filter(fn: (r) => r["_field"] == "Percent_Processor_Time")
        |> keep(columns: ["_level", "_time", "host"])

pivotedData =
    data
        |> schema.fieldsAsCols()
        |> filter(fn: (r) => exists r.Percent_Processor_Time)

unionedData =
    union(tables: [levelInfoData, pivotedData])
        |> group(columns: ["_time", "host"], mode: "by")
        |> sort(columns: ["host"])
        |> fill(column: "_level", usePrevious: true)
        |> tail(n: 1)
        |> group()
        |> sort(columns: ["host", "_time"])
        |> map(fn: (r) => ({r with msg_grp: msg_grp}))
        |> map(fn: (r) => ({r with _check_name: _check_name}))

statusChanges =
    unionedData
        |> group(columns: ["host"], mode: "by")
        |> monitor.stateChangesOnly()
        |> group(columns: ["host"], mode: "by")
        |> sort(columns: ["_time"])

countChanges =
    statusChanges
        |> count(column: "_level")
        |> rename(columns: {_level: "changesCount"})

warn_level =
    from(bucket: "a1hrwinalerts")
        |> range(start: -7d)
        |> filter(fn: (r) => r["_measurement"] == "win_cpu")
        |> stateCount(
            fn: (r) => r._value == "warn" and r.lastReportedLevel == "warn",
            column: "warn_level",
        )
        |> group()
        |> last()

keepLowCountAndChange =
    union(tables: [statusChanges, countChanges, lastAlertData])
        |> group(columns: ["host"], mode: "by")
        |> sort(columns: ["_time"])
        |> fill(column: "changesCount", usePrevious: true)
        |> fill(column: "lastReportedLevel", usePrevious: true)
        |> tail(n: 1)
        |> filter(fn: (r) => exists r.Percent_Processor_Time)
        |> map(
            fn: (r) =>
                ({r with lastReportedLevelTemp:
                        if exists r.lastReportedLevel then r.lastReportedLevel else "none",
                }),
        )
        |> drop(columns: ["lastReportedLevel"])
        |> rename(columns: {lastReportedLevelTemp: "lastReportedLevel"})
        //|> filter(fn: (r) => r["_level"] != r["lastReportedLevel"])
        |> filter(fn: (r) => r["changesCount"] < FLUCTUATION_CAP)

keepLowCountAndChange
    |> map(
        fn: (r) =>
            ({r with _level:
                    if r.Percent_Processor_Time > 90.0 then
                        "warn"
                    else
                        "ok",
            }),
    )
    |> set(key: "_measurement", value: MEASUREMENT)
    |> to(org: "", bucket: "a1hrwinalerts", fieldFn: (r) => ({"_level": r._level}))
    |> filter(fn: (r) => r["environment"] == "production")
    |> filter(
        fn: (r) =>
            r["_level"] == "warn" and r["lastReportedLevel"] == "warn" or r["_level"] == "ok",
    )
    |> filter(fn: (r) => not r["_level"] == "ok" and r["lastReportedLevel"] == "ok")
    |> filter(fn: (r) => r["warn_level"] < 2)
    |> endpoint()

So I need to merge warn_level with keepLowCountAndChange so that I get a warn_level column in keepLowCountAndChange

Hello @Zcarevic,
I believe I messaged you in the community slack. Lets keep our conversation going there :slight_smile: