Issues in setting up Kapacitor Template Tasks

I am trying to create a template task in Kapacitor, the same as the one described in the documentation (Template tasks | Kapacitor 1.5 Documentation). After days of drilling down, it turned out that this segment of the script

    |alert()
        .warn(warn)
        .crit(crit)

throws an error, and I fixed it by replacing the segment with

    |alert()
        .warn(lambda: warn)
        .crit(lambda: crit)

keeping the rest of the script the same, along with this vars.json:

{
    "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" },
    "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }
}
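
For completeness, this is roughly how I create and enable the task from the template (the template and task names are the ones used in the documentation example; the tick file name is just a placeholder):

kapacitor define-template generic_mean_alert -tick template.tick
kapacitor define cpu_alert -template generic_mean_alert -vars vars.json -dbrp telegraf.autogen
kapacitor enable cpu_alert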

Can someone confirm whether this is the expected behavior, and whether it is missing from the documentation?

Hi, I have tried following the same documentation,
and I didn't have to make the modification you did …

kapacitor show cpu_alert
ID: cpu_alert
Error:
Template: generic_mean_alert
Type: stream
Status: enabled
Executing: true
Created: 16 Apr 19 10:25 CEST
Modified: 16 Apr 19 10:35 CEST
LastEnabled: 16 Apr 19 10:35 CEST
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
// Which measurement to consume
var measurement string

// Optional where filter
var where_filter = lambda: TRUE

// Optional list of group by dimensions
var groups = [*]

// Which field to process
var field string

// Warning criteria, has access to 'mean' field
var warn lambda

// Critical criteria, has access to 'mean' field
var crit lambda

// How much data to window
var window = 5m

// The slack channel for alerts
// var slack_channel = '#alerts'
stream
    |from()
        .measurement(measurement)
        .where(where_filter)
        .groupBy(groups)
    |window()
        .period(window)
        .every(window)
    |mean(field)
    |alert()
        .warn(warn)
        .crit(crit)
        .slack()

//         .channel(slack_channel)


Vars:
Name                          Type      Value
crit                          lambda    "mean" < 10.0
field                         string    usage_idle
groups                        list      [host, dc]
measurement                   string    cpu
warn                          lambda    "mean" < 80.0
where_filter                  lambda    "cpu" == 'cpu-total'
window                        duration  1m0s
DOT:
digraph cpu_alert {
graph [throughput="0.00 points/s"];

stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="54"];

from1 [avg_exec_time_ns="19.145µs" errors="0" working_cardinality="0" ];
from1 -> window2 [processed="18"];

window2 [avg_exec_time_ns="0s" errors="0" working_cardinality="1" ];
window2 -> mean3 [processed="2"];

mean3 [avg_exec_time_ns="1.963µs" errors="0" working_cardinality="1" ];
mean3 -> alert4 [processed="2"];

alert4 [alerts_inhibited="0" alerts_triggered="2" avg_exec_time_ns="0s" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="1" warns_triggered="1" working_cardinality="1" ];
}

I used the following script and vars, plus three commands: define-template, define, and enable.


var measurement = 'measurement_name'

var groupBy = []

var name = 'executions'

var idVar = name

var field = 'field_name_from_influx'

var message = ' {{.ID}} {{.Name}} {{.TaskName}} {{ index .Fields "value" }} {{.Level}} {{.Time}}'

var idTag = 'alertID'

var levelTag = 'level'

var messageField = 'message'

var durationField = 'duration'

var outputDB = 'chronograf'

var outputRP = 'autogen'

var outputMeasurement = 'chronograf_executions'

var triggerType = 'threshold'

var crit lambda

stream
    |from()
        .measurement(measurement)
        .where(lambda: ("dc" == 'cloudwatch'))
    |mean(field)
        .as('value')
    |alert()
        .crit(crit)
        .message(message)
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .stateChangesOnly()
        .log('/tmp/lambdaExecutions.log')
    |influxDBOut()
        .create()
        .database(outputDB)
        .retentionPolicy(outputRP)
        .measurement(outputMeasurement)
        .tag('alertName', name)
        .tag('triggerType', triggerType)

vars.json

{
        "crit": {"type" : "lambda", "value" : "\"value\" > 100"},
        "warn": {"type" : "lambda", "value" : "\"value\" > 6000"}
}

and kept getting errors
like
ts= lvl=error msg="failed to realize reduce context from fields" service=kapacitor task_master=main task=executions node=mean2 err="field \"field_name\" missing from point"
and
error evaluating expression for level" service=kapacitor task_master=main task=executions node=alert4 err="mismatched type to binary operator. got string >= float. see bool(), int(), float(), string(), duration()"

Only after making the modifications from the original post did it seem to work and report alerts.

Hi @mandeep147,

Is that your complete vars.json file?

I see no variable for measurement or field
(that is what causes the error: node=mean2 err="field \"field_name\" missing from point"),
and the second error is a consequence of the first one.
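
For reference, if you wanted to pass measurement and field through vars.json instead of hard-coding them in the script, the entries would look roughly like this (the values here are only examples):

{
    "measurement": {"type" : "string", "value" : "cpu"},
    "field": {"type" : "string", "value" : "usage_idle"},
    "crit": {"type" : "lambda", "value" : "\"value\" > 100"}
}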

I am defining those variables in the script itself:

var field = 'field_name_from_influx'

and

var measurement = 'measurement_name'

I am using the exact measurement and field names.

Hi, I used your template and vars.
$ kapacitor version
Kapacitor OSS 1.5.2

I modified:

  • var measurement = 'cpu'
  • var field = 'usage_idle'
  • where(lambda: ("cpu" == 'cpu-total'))

and I had no errors …

var measurement = 'cpu'
var groupBy = []
var name = 'executions'
var idVar = name
var field = 'usage_idle'
var message = ' {{.ID}} {{.Name}} {{.TaskName}} {{ index .Fields "value" }} {{.Level}} {{.Time}}'
var idTag = 'alertID'
var levelTag = 'level'
var messageField = 'message'
var durationField = 'duration'
var outputDB = 'chronograf'
var outputRP = 'autogen'
var outputMeasurement = 'chronograf_executions'
var triggerType = 'threshold'
var crit lambda

stream
    |from()
        .measurement(measurement)
        .where(lambda: ("cpu" == 'cpu-total'))
    |mean(field)
        .as('value')
    |alert()
        .crit(crit)
        .message(message)
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .stateChangesOnly()
        .log('/tmp/lambdaExecutions.log')
    |influxDBOut()
        .create()
        .database(outputDB)
        .retentionPolicy(outputRP)
        .measurement(outputMeasurement)
        .tag('alertName', name)
        .tag('triggerType', triggerType)




$ kapacitor show cpu_alert
ID: cpu_alert
Error:
Template: tmp2
Type: stream
Status: enabled
Executing: true
Created: 16 Apr 19 16:56 CEST
Modified: 16 Apr 19 16:57 CEST
LastEnabled: 16 Apr 19 16:57 CEST
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
var measurement = 'cpu'

var groupBy = []

var name = 'executions'

var idVar = name

var field = 'usage_idle'

var message = ' {{.ID}} {{.Name}} {{.TaskName}} {{ index .Fields "value" }} {{.Level}} {{.Time}}'

var idTag = 'alertID'

var levelTag = 'level'

var messageField = 'message'

var durationField = 'duration'

var outputDB = 'chronograf'

var outputRP = 'autogen'

var outputMeasurement = 'chronograf_executions'

var triggerType = 'threshold'

var crit lambda

stream
    |from()
        .measurement(measurement)
        .where(lambda: ("cpu" == 'cpu-total'))
    |mean(field)
        .as('value')
    |alert()
        .crit(crit)
        .message(message)
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .stateChangesOnly()
        .log('/tmp/lambdaExecutions.log')
    |influxDBOut()
        .create()
        .database(outputDB)
        .retentionPolicy(outputRP)
        .measurement(outputMeasurement)
        .tag('alertName', name)
        .tag('triggerType', triggerType)

Vars:
Name                          Type      Value
crit                          lambda    "value" > 100
warn                          lambda    "value" > 6000
DOT:
digraph cpu_alert {
graph [throughput="0.00 points/s"];

stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="102"];

from1 [avg_exec_time_ns="22.17µs" errors="0" working_cardinality="0" ];
from1 -> mean2 [processed="34"];

mean2 [avg_exec_time_ns="16.614µs" errors="0" working_cardinality="1" ];
mean2 -> alert3 [processed="33"];

alert3 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="52.556µs" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="1" ];
alert3 -> influxdb_out4 [processed="0"];

influxdb_out4 [avg_exec_time_ns="0s" errors="0" points_written="0" working_cardinality="0" write_errors="0" ];
}

If I change my field to usage_idle_from_measurement,
I can reproduce your error.

You use var field = 'field_name_from_influx', but your error says:
err="field \"field_name\" missing from point"
So the script is looking for a field named field_name and not for 'field_name_from_influx'?
Maybe there is a special character in var field = 'field_name_from_influx'?
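
One way to double-check the exact field key names stored in InfluxDB (and spot stray quote or hidden characters) is an InfluxQL query along these lines, with the database and measurement names replaced by yours:

SHOW FIELD KEYS ON "yourdb" FROM "your_measurement"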

My error was:
ts=2019-04-16T17:19:10.001+02:00 lvl=error msg="failed to realize reduce context from fields" service=kapacitor task_master=main task=cpu_alert node=influxdb_out4 err="field \"usage_idle_from_measurement\" missing from point"

I tried again with the following script

dbrp "database"."retentionpolicy"

var measurement = 'cloudwatch_aws_lambda'

var groupBy = []

var name = 'executions'

var idVar = name

var field = 'duration_maximum'

var message = ' {{.ID}} {{.Name}} {{.TaskName}} {{ index .Fields "value" }} {{.Level}} {{.Time}}'

var idTag = 'alertID'

var levelTag = 'level'

var messageField = 'message'

var durationField = 'duration'

var outputDB = 'chronograf'

var outputRP = 'autogen'

var outputMeasurement = 'lambda_executions'

var triggerType = 'threshold'

var crit lambda

stream
    |from()
        .measurement(measurement)
        .where(lambda: ("dc" == 'ea-cloudwatch'))
    |mean(field)
        .as('value')
    |alert()
        .crit(crit)
        .message(message)
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .stateChangesOnly()
        .log('/tmp/lambdaExecutions.log')
    |influxDBOut()
        .create()
        .database(outputDB)
        .retentionPolicy(outputRP)
        .measurement(outputMeasurement)
        .tag('alertName', name)
        .tag('triggerType', triggerType)

Vars:
Name                          Type      Value                                   
crit                          lambda    "value" > 100  
kapacitor define-template template -tick lambdaExecutions.tick 
kapacitor define alerts -template template -vars vars.json 
kapacitor enable alerts

msg="failed to realize reduce context from fields" service=kapacitor task_master=main task=alerts node=mean2 err="field \"duration_maximum\" missing from point"
The field names and measurement names are correct, as I copied them directly from my Influx database.

I also tried

stream
    |from()
        .measurement(measurement)
        .where(lambda: ("dc" == 'ea-cloudwatch'))
    |default()
        .field(field, 0)
    |eval(lambda: field)
        .as('value')

keeping everything else the same in the latest script, but to no avail. The only thing that worked was having

alert().crit(lambda: crit)

My tests were done on Linux el7.x86_64 / Kapacitor OSS 1.5.2.
What is your environment?

Can you try:

stream
    |from()
        .measurement(measurement)
        .where(lambda: ("dc" == 'ea-cloudwatch'))
    |default()
        .field('field', 0)
    |eval(lambda: "field")
        .as('value')

I am on Kapacitor 1.5.2 and InfluxDB 1.7.0, on an EC2 Linux machine. Also, new data is inserted every hour.

Does this work if you add the quotes?

Can you post the first few lines from your measurement? Maybe you have fields defined with single or double quotes?

For example:

> insert quotefields "field1"=10
> select * from quotefields
name: quotefields
time                "field1"
----                --------
1555430814755737029 10
dbrp "database"."retentionpolicy"

var measurement = 'cloudwatch_aws_lambda'

var groupBy = []

var name = 'executions'

var idVar = name

var field = 'duration_maximum'

var message = ' {{.ID}} {{.Name}} {{.TaskName}} {{ index .Fields "value" }} {{.Level}} {{.Time}}'

var idTag = 'alertID'

var levelTag = 'level'

var messageField = 'message'

var durationField = 'duration'

var outputDB = 'chronograf'

var outputRP = 'autogen'

var outputMeasurement = 'lambda_executions'

var triggerType = 'threshold'

var crit lambda

stream
    |from()
        .measurement(measurement)
        .where(lambda: ("dc" == 'ea-cloudwatch'))
    |default()
        .field('field', 0)
    |eval(lambda: "field")
        .as('value')
    |alert()
        .crit(crit)
        .message(message)
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .stateChangesOnly()
        .log('/tmp/lambdaExecutions.log')
    |influxDBOut()
        .create()
        .database(outputDB)
        .retentionPolicy(outputRP)
        .measurement(outputMeasurement)
        .tag('alertName', name)
        .tag('triggerType', triggerType)

Vars:
Name                          Type      Value                                   
crit                          lambda    "value" > 100                           
DOT:
digraph alerts {
graph [throughput="0.00 points/s"];

stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="8"];

from1 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
from1 -> default2 [processed="8"];

default2 [avg_exec_time_ns="0s" errors="0" fields_defaulted="8" tags_defaulted="0" working_cardinality="0" ];
default2 -> eval3 [processed="8"];

eval3 [avg_exec_time_ns="1.527µs" errors="0" working_cardinality="1" ];
eval3 -> alert4 [processed="8"];

alert4 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="0s" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="1" ];
alert4 -> influxdb_out5 [processed="0"];

influxdb_out5 [avg_exec_time_ns="0s" errors="0" points_written="0" working_cardinality="0" write_errors="0" ];
}

It did not generate any errors in kapacitor.log, but it didn't trigger any alerts either. For the same input, the other script triggers alerts.

In my Influx, all field names and entries are stored without quotes in the queried database.

Is it possible to reference template variables from eval()?