InfluxDB monitoring system development error

Hi,

I am working with the InfluxDB monitoring system and I am super excited.

I tried to create a check, an endpoint, and a rule using the InfluxDB API. I ran into a problem with one specific condition and got a reply about it from your team.

package main
//CPU Notification Rule 
import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
option task = {name: "CPU Notification Rule ", 
               every: 10m, 
               offset: 0s}
slack_endpoint = slack["endpoint"](url: "https:hooks.slack.com/services/xxx/xxx/xxx")
notification = {_notification_rule_id: "0758afea09061000",
                _notification_rule_name: "CPU Notification Rule ",
                _notification_endpoint_id: "0754bad181a5b000",
                _notification_endpoint_name: "My slack",}
statuses = monitor["from"](start: -10s, fn: (r) = r["_check_name"] == "CPU Check")
crit = statuses 
       |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit 
               |> filter(fn: (r) => (r["_time"] >= experimental["subDuration"](from: now(), d: 10m)))

count_statuses =  all_statuses |> count() |> findRecord(fn: (key) => true), idx: 0)


notify = (tables<-) => { 
final_status = tables 
|> monitor["notify"](data: notification, 
                     endpoint: slack_endpoint(mapFn: (r) = (
{channel: "", 
 text: "Notification Rule: ${ r._notification_rule_name } triggered by     check: ${ r._check_name }: ${ r._message }", 
 color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))

condition_met = if count_statuses._value > 3 then final_status else 0 

return condition_met
}

all_statuses
|> notify()

I tried this to create a check, but I am getting a syntax error.

I am also confused: the solution provides a rule ID and an endpoint ID but also specifies the Slack endpoint URL. How do the two work together?

What I am trying to do:

I want to get a notification when blood pressure (InfluxDB field bp) crosses 113 three times within the last 15 minutes.

So what is the exact procedure? Should I create a task like this one, or create the check, rule, and endpoint separately?

I don’t know how to start.
Please help me out.

Thanks

Hello @debnath,
There’s just an = missing: notify = (tables<-) => should be notify = (tables=<-) =>

package main
//CPU Notification Rule 
import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
option task = {name: "CPU Notification Rule ", 
               every: 10m, 
               offset: 0s}
slack_endpoint = slack["endpoint"](url: "https:hooks.slack.com/services/xxx/xxx/xxx")
notification = {_notification_rule_id: "0758afea09061000",
                _notification_rule_name: "CPU Notification Rule ",
                _notification_endpoint_id: "0754bad181a5b000",
                _notification_endpoint_name: "My slack",}
statuses = monitor["from"](start: -10s, fn: (r) = r["_check_name"] == "CPU Check")
crit = statuses 
       |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit 
               |> filter(fn: (r) => (r["_time"] >= experimental["subDuration"](from: now(), d: 10m)))

count_statuses =  all_statuses |> count() |> findRecord(fn: (key) => true), idx: 0)


notify = (tables=<-) => { 
final_status = tables 
|> monitor["notify"](data: notification, 
                     endpoint: slack_endpoint(mapFn: (r) = (
{channel: "", 
 text: "Notification Rule: ${ r._notification_rule_name } triggered by     check: ${ r._check_name }: ${ r._message }", 
 color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))

condition_met = if count_statuses._value > 3 then final_status else 0 

return condition_met
}

all_statuses
|> notify()

All checks and notification rules are tasks under the hood. The monitor.notify function uses the notification endpoint ID and name, along with the rule ID and name, to record metadata about your task execution so that you can debug the task more easily later. The endpoint ID and the notification rule ID can be any 16-character alphanumeric string that you want to pick.
I recommend reading:

to learn more

@debnath actually, hold on please. I’m still working something out.

@Anaisdg Thank you for your reply. The article is very useful.
But the solution you posted still has some errors. I put it in a task, and there are still some red lines. Please have a look.

Hello @debnath,
Sorry for the delay, my brain was refusing to cooperate yesterday…anywho you could do:

  1. a check and notification combined into one task.
import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
import "slack"
import "experimental"
import "array"

option task = {
    name: "check_cpu",
    every: 1h,
    offset: 0s,
}



endpoint = slack.endpoint(url: "https://hooks.slack.com/services/xxx/xxx")(
    mapFn: (r) => ({
        channel: "",
        text: r._message,
        color: "danger",
    }),
)

notification_data = {
    _notification_rule_id: "0000000000000001",
    _notification_rule_name: "cpu_check",
    _notification_endpoint_id: "0000000000000002",
    _notification_endpoint_name: "cpu_check_slack",
}

status = from(bucket: "jetson-stats")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "exec_jetson_stats")
    |> filter(fn: (r) => r["_field"] == "jetson_CPU1")
    |> group(columns: ["_measurement"])
    |> monitor.check(
        crit: (r) => r._value > 90.0,
        warn: (r) => r._value > 80.0,
        info: (r) => r._value > 70.0,
        ok: (r) => r._value <= 60.0,
        messageFn: (r) => if r._level == "crit" then
            "Critical alert!! CPU usage is at ${r._value}%!"
        else if r._level == "warn" then
            "Warning! CPU usage is at ${r._value}%."
        else if r._level == "info" then
            "CPU usage is at ${r._value}%."
        else
            "Things are looking good.",
        data: {
            _check_name: "CPU Utilization (Used Percentage)",
            _check_id: "cpu_used_percent",
            _type: "threshold",
            tags: {},
        },
    )

crit = status |> filter(fn: (r) => r._level == "crit")

all_statuses = crit
    |> filter(fn: (r) => r["_time"] >= experimental.subDuration(d: 10m, from: now()))

count_statuses = all_statuses
//why the heck did you add a _time column here?
//because of https://github.com/influxdata/flux/blob/f88b6c8560b248f93e6a9435c6b7278f915b547f/stdlib/influxdata/influxdb/monitor/monitor.flux#L126
    |> map(fn: (r) => ({r with _time: now() }))
    |> count()
    |> yield(name: "count")

record = count_statuses |> findRecord(fn: (key) => key._type == "threshold", idx: 0)

notify = (tables=<-) => {
    final_status = tables
        |> monitor.notify(
            endpoint: endpoint,
            data: notification_data,
        )

    // Why create a noneTable? See the notes below.
    noneTable = array.from(rows: [{ _time: now(), _field: "myCount", _value: 0, _level: "", _notification_rule_name: "", _check_name: "", _sent: "", _message: "", _measurement:"", _status_timestamp: 0 }])

    condition_met = if record._value > 3 then final_status else noneTable

    return condition_met
}

count_statuses |> notify()

Some things to note here:

Why can’t I just return a simpler noneTable, such as noneTable = array.from(rows: [{ _time: now(), value:0.0 }])? In other words, why does the table in the else branch have to match the table in the then branch?

The reason is that an if expression requires both branches to have the same type, so noneTable has to have the same type as final_status. This is why I also can’t do the following:


notify = (tables=<-) => {
    final_status = tables
        |> monitor.notify(
            endpoint: endpoint,
            data: notification_data,
        )

//Originally I wanted to do the following but tables which is all_statuses doesn't match the final_status after the monitor.notify() function adds metadata. 

    condition_met = if record._value > 3 then final_status else tables

    return condition_met
}
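To see the branch-type rule on its own, here is a minimal sketch that is independent of the monitoring code. The table names a, b, and c and their values are made up for illustration; the point is only that both branches of an if expression must be streams whose rows have the same columns and column types.

```flux
import "array"

// a and b have identical columns and types: {_time: time, _value: int}
a = array.from(rows: [{_time: 2021-01-01T00:00:00Z, _value: 1}])
b = array.from(rows: [{_time: 2021-01-01T00:00:00Z, _value: 2}])

// c has a different shape: {_time: time, value: float}
c = array.from(rows: [{_time: 2021-01-01T00:00:00Z, value: 0.0}])

// Accepted: both branches have the same type.
same = if true then a else b

// Rejected by the type checker if uncommented: a and c have different row types.
// different = if true then a else c

same |> yield(name: "same")
```

This is the reason the noneTable above spells out every column instead of just _time and a value.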

Also Note:
I don’t need to write a custom function for this. I took that approach originally because I wanted to compare two tables with different columns (as described in the note above).
Instead you could do:

import "influxdata/influxdb/monitor"
import "influxdata/influxdb/v1"
import "slack"
import "experimental"
import "array"

option task = {
    name: "check_cpu",
    every: 1h,
    offset: 0s,
}



endpoint = slack.endpoint(url: "https://hooks.slack.com/services/xxx/xxx")(
    mapFn: (r) => ({
        channel: "",
        text: r._message,
        color: "danger",
    }),
)

notification_data = {
    _notification_rule_id: "0000000000000001",
    _notification_rule_name: "cpu_check",
    _notification_endpoint_id: "0000000000000002",
    _notification_endpoint_name: "cpu_check_slack",
}

status = from(bucket: "jetson-stats")
    |> range(start: -1h)
    |> filter(fn: (r) => r["_measurement"] == "exec_jetson_stats")
    |> filter(fn: (r) => r["_field"] == "jetson_CPU1")
    |> group(columns: ["_measurement"])
    |> monitor.check(
        crit: (r) => r._value > 90.0,
        warn: (r) => r._value > 80.0,
        info: (r) => r._value > 70.0,
        ok: (r) => r._value <= 60.0,
        messageFn: (r) => if r._level == "crit" then
            "Critical alert!! CPU usage is at ${r._value}%!"
        else if r._level == "warn" then
            "Warning! CPU usage is at ${r._value}%."
        else if r._level == "info" then
            "CPU usage is at ${r._value}%."
        else
            "Things are looking good.",
        data: {
            _check_name: "CPU Utilization (Used Percentage)",
            _check_id: "cpu_used_percent",
            _type: "threshold",
            tags: {},
        },
    )

crit = status |> filter(fn: (r) => r._level == "crit")

all_statuses = crit
    |> filter(fn: (r) => r["_time"] >= experimental.subDuration(d: 10m, from: now()))

count_statuses = all_statuses
//why the heck did you add a _time column here?
//because of https://github.com/influxdata/flux/blob/f88b6c8560b248f93e6a9435c6b7278f915b547f/stdlib/influxdata/influxdb/monitor/monitor.flux#L126
    |> map(fn: (r) => ({r with _time: now() }))
    |> count()
    |> yield(name: "count")

record = count_statuses |> findRecord(fn: (key) => key._type == "threshold", idx: 0)

final_notify = all_statuses |> last() |> map(fn: (r) => ({ r with _time: now() })) |> monitor.notify(endpoint: endpoint, data: notification_data)

noneTable = array.from(rows: [{ _time: now(), _field: "myCount", _value: 0, _level: "", _notification_rule_name: "", _check_name: "", _sent: "", _message: "", _measurement:"", _status_timestamp: 0 }])

if int(v: record._value) > 3 then final_notify else noneTable

@debnath we’re going from worse to better here…

  2. a check created as you normally would, plus a custom notification rule. In other words, you can create the check through the UI and keep the check and the notification rule as two separate tasks, as you’re used to doing.
import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
import "array"

option task = {name: "every 3", every: 15s, offset: 5s}

slack_endpoint = slack["endpoint"](url: "https://hooks.slack.com/services/xxx/xxx")
notification = {_notification_rule_id: "0870018fd4453000", _notification_rule_name: "every 3", _notification_endpoint_id: "078311d230cc1000", _notification_endpoint_name: "My Slack"}
statuses = monitor["from"](start: -30s)
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))


count_statuses = all_statuses
    |> count()
    |> yield(name: "count")

record = count_statuses |> findRecord(fn: (key) => true, idx: 0)


final_notify = all_statuses |> last() |> map(fn: (r) => ({ r with _time: now() })) |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "", text: "Notification Rule: ${r._notification_rule_name} triggered by check: ${r._check_name}: ${r._message}", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))

if int(v: record._value) > 3 then final_notify else all_statuses

Getting a little smarter:
3) use stateCount instead:

import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
import "array"

option task = {name: "every 3", every: 15s, offset: 5s}

slack_endpoint = slack["endpoint"](url: "https://hooks.slack.com/services/xxx/xxx")
notification = {_notification_rule_id: "0870018fd4453000", _notification_rule_name: "every 3", _notification_endpoint_id: "078311d230cc1000", _notification_endpoint_name: "My Slack"}
statuses = monitor["from"](start: -30s)
crit = statuses |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit |> filter(fn: (r) => r["_time"] >= experimental["subDuration"](from: now(), d: 1h))

all_statuses
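  // stateCount writes its result to the "stateCount" column by default, not to _value
  |> stateCount(fn: (r) => true, column: "stateCount")
  |> filter(fn: (r) => r.stateCount >= 3)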
  |> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) => ({channel: "", text: "Notification Rule: ${r._notification_rule_name} triggered by check: ${r._check_name}: ${r._message}", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))

@debnath
This solution is more or less equivalent to number 3)
4) a) not shown: create a check like you normally would
4) b) create a task to count the levels/statuses on top of the check
4) c) notify on top of that count:
the task to count the level=crit instances

//Query Rate Limit Counter Task 
// Queries the statuses from the "_monitoring" bucket and counts the number of 
// times the query rate limit has been surpassed ("_level" == "crit"), then
// writes the count to a new measurement, "statuses_counter", in the "rate_limits" bucket
import "date"
today = date.truncate(t: now(), unit: 1d)
option task = { 
  name: "Query Rate Limit Counter Task",
  every: 1h,
  offset: 5m
}
check_query_rate_limit_data = 
    from(bucket: "_monitoring")
    |> range(start: today)
    |> filter(fn: (r) => r["_measurement"] == "statuses")
    |> filter(fn: (r) => r["_check_name"] == "check_query_rate_limit")
    |> filter(fn: (r) => r["_field"] == "_message")
check_query_rate_limit_data
  |> stateCount(fn: (r) => r._level == "crit", column: "crit_query_counter")
  |> filter(fn: (r) => r.crit_query_counter % 10 == 0)
  |> map(fn:(r) => ({ r with crit_query_counter: string(v: r.crit_query_counter)}))
  |> map(fn:(r) => ({r with _measurement: "statuses_counter"}))
  |> to(bucket: "rate_limits")

the notification rule:

//Query Rate Limit Notification Rule 
//Query notifications for every time the write limit is hit 10 times within a day
package main
import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
option task = {name: "Query Rate Limit Notification Rule", 
               every: 1h, 
               offset: 5m}
slack_endpoint = slack["endpoint"](url: "https://hooks.slack.com/services/xxx/xxx/xxx")
notification = {_notification_rule_id: "0758afea09061000",
                _notification_rule_name: "Query Rate Limit Notification Rule",
                _notification_endpoint_id: "0754bad181a5b000",
                _notification_endpoint_name: "My slack",}
// Read from the bucket that the counter task writes to
statuses = from(bucket: "rate_limits")
|> range(start: -task.every)
|> filter(fn: (r) => r["_measurement"] == "statuses_counter")
|> filter(fn: (r) => r["_check_name"] == "Query Rate Limit Check")
crit = statuses 
       |> filter(fn: (r) => r["_measurement"] == "statuses_counter")
       |> filter(fn: (r) => r["_level"] == "crit")
all_statuses = crit 
               |> filter(fn: (r) => (r["_time"] >= experimental["subDuration"](from: now(), d: 10m)))
all_statuses 
|> monitor["notify"](data: notification, 
                     endpoint: slack_endpoint(mapFn: (r) => ({
    channel: "",
    text: "Notification Rule: ${ r._notification_rule_name } triggered by check: ${ r._check_name }: ${ r._message }",
    color: "danger"
  })))

@debnath what I like about number 4) is that it’ll give you a notification each time you reach 3 (or, in that example, 10) level=crit statuses.
However, it assumes that you create a new bucket (“rate_limits”) to write that count to.

Finally, a friendly reminder that you can visualize all of these outputs in the Data Explorer to test them.

@debnath please note that you’ll have to adjust the scripts to fit your data, but the logic is there.
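For the original question (a notification when bp crosses 113 three times within 15 minutes), the adjustment could look roughly like the sketch below. The bucket "health", the measurement "vitals", the task name, and the webhook URL are placeholders made up for illustration, and the sketch assumes bp is stored as a float field. It skips monitor.notify and calls the Slack endpoint directly, so nothing is written to the _monitoring bucket.

```flux
import "slack"

// Hypothetical task name; runs every 15 minutes over the last 15 minutes of data
option task = {name: "bp_alert", every: 15m}

// Placeholder webhook URL
toSlack = slack.endpoint(url: "https://hooks.slack.com/services/xxx/xxx/xxx")

from(bucket: "health")
    |> range(start: -15m)
    |> filter(fn: (r) => r._measurement == "vitals" and r._field == "bp")
    // keep only the readings above the threshold (assumes float values)
    |> filter(fn: (r) => r._value > 113.0)
    // merge all series into one table so that count() counts every reading
    |> group()
    |> count()
    // continue only when the threshold was crossed at least three times
    |> filter(fn: (r) => r._value >= 3)
    |> toSlack(
        mapFn: (r) => ({
            channel: "",
            text: "bp was above 113 ${string(v: r._value)} times in the last 15 minutes",
            color: "danger",
        }),
    )()
```

If no reading is above 113 in the window, the count produces no row and no message is sent. Running everything up to the toSlack call in the Data Explorer first is a cheap way to check the count before wiring up the notification.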

@Anaisdg first, sorry for the late reply; I was away at a company meeting. Thank you for the beautiful explanation. I really appreciate it.

Can I debug my task line by line? Something is wrong: the task runs successfully, but my Slack endpoint is not receiving any message.

  1. This is my check, which sets the crit status on that query

  2. This is my endpoint

  3. This is my notification rule -

  4. My task -

import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "experimental"
import "array"

option task = {name: "Notification Task", every: 1m, offset: 5s}

slack_endpoint = slack["endpoint"](url: "https://hooks.slack.com/services/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")
notification = {
	_notification_rule_id: "08762fc8a72f2000",
	_notification_rule_name: "My Test Rule",
	_notification_endpoint_id: "07fd2b39e56ee000",
	_notification_endpoint_name: "Slack Service",
}
statuses = monitor["from"](start: -1h)
crit = statuses
	|> filter(fn: (r) =>
		(r["_level"] == "crit"))
all_statuses = crit
	|> filter(fn: (r) =>
		(r["_time"] >= experimental["subDuration"](from: now(), d: 1h)))

all_statuses
	|> stateCount(fn: (r) =>
		(true))
	|> filter(fn: (r) =>
		(r._value >= 3))
	|> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>
		({channel: "", text: "Custom Task Notification Rule: ${r._notification_rule_name} triggered by check: ${r._check_name}: ${r._message}", color: if r["_level"] == "crit" then "danger" else if r["_level"] == "warn" then "warning" else "good"})))

Result - When the check goes to crit status I get the message in Slack, but nothing is sent from the task.

Am I misunderstanding the flow?
Or is there more to learn?

Problem - My expectation is that whenever the count of crit states on testField of myMeasurement is >= 3, a message is sent to the Slack endpoint.

I am new to tasks, so I may ask silly questions.
Please help me learn this, as I’m super excited about it.

Hello @debnath,
how are you getting the message in slack if it’s not firing from the task? Are you using that endpoint in any other code?

The code is in the first example above.

To debug your code, I always recommend running the script in the data explorer and including multiple yield() statements throughout the script to make sure you’re getting what you expect.
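As a rough sketch of that approach, applied to the stateCount version of the notification task: the yield names and the "crit_count" column name are arbitrary labels chosen for illustration. Each yield shows up as a separate result in the Data Explorer, so you can see at which step the rows disappear.

```flux
import "influxdata/influxdb/monitor"

// Step 1: every status written by the checks in the last hour
statuses = monitor.from(start: -1h)

statuses |> yield(name: "1_all_statuses")

// Step 2: only the crit statuses
crit = statuses |> filter(fn: (r) => r._level == "crit")

crit |> yield(name: "2_crit_only")

// Step 3: running count of consecutive crit rows, stored in "crit_count"
counted = crit |> stateCount(fn: (r) => true, column: "crit_count")

counted |> yield(name: "3_counted")

// Step 4: the rows that would be passed on to monitor.notify
counted
    |> filter(fn: (r) => r.crit_count >= 3)
    |> yield(name: "4_would_notify")
```

If the last result is empty while the earlier ones are not, the problem is in the counting or filtering step, not in the Slack endpoint.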

Hi @Anaisdg,

Thanks a lot for the help. It’s working now. I am improving day by day and am very excited to learn more new things. :grinning:

One more thing: I am trying to add a new task with the code below.

import "influxdata/influxdb/monitor"
import "slack"
import "influxdata/influxdb/secrets"
import "array"

option task = {name: "No Task", every: 1m, offset: 2s}

slack_endpoint = slack["endpoint"](url: "https://hooks.slack.com/services/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")
notification = {
	_notification_rule_id: "08762fc8a72f2000",
	_notification_rule_name: "My Test Rule",
	_notification_endpoint_id: "07fd2b39e56ee000",
	_notification_endpoint_name: "Slack Service",
}


from(bucket: "SENSOR_DATA_DB")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "testField")
  |> aggregateWindow(every: 1m, fn: count, createEmpty: false)
  |> filter(fn: (r) => r["_value"] > 3)
	|> monitor["notify"](data: notification, endpoint: slack_endpoint(mapFn: (r) =>
		({channel: "", text: "Count is getting higher...", color: "danger"})))

But the task is not saving; it raises an error.

My Task - [screenshot not preserved]

Error - [screenshot not preserved]

I followed this link, and I think they are doing the same thing - monitor.notify() function | Flux 0.x Documentation

Can you tell me where it is wrong?

One user suggestion (from my point of view) - When I try to add a task and there is an error in the script, the whole form vanishes and only the error is shown. I then have to write the whole thing again and save. It would help users if the modal stayed open when there is an error.

Hello @debnath,
That might be a bug in the UI.
Sometimes, to create a new task script through the UI, I first have to create a dummy one:
I configure the options through the UI only and make the script "foo". I hit save. Then I reopen and edit the task.
Can you create an issue/feature request here?

Thank you

@Anaisdg
Yes sure. I will.
Thank you.
