Goal
To subtract a field in one chain from the same field in another chain, both fed from the same stream. I'm not sure this is possible, and I haven't found any examples like it.
The two chains need to stay synchronized: there will eventually be more topics and hosts, so both must be observing the same topic and host at the same moment, only offset by a week (or, in this test case, by roughly a minute).
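While digging through the docs I came across the shift() node, which looks like it might be the way to line the two branches up in time, though I haven't confirmed that. This is roughly the shape I think I'm after (untested; attr_value, measurement, and groupBy are just stand-ins from my setup):

// Sketch only: shift one copy of the stream forward by the offset so the join
// pairs each point with the point from one minute (or one week) earlier.
var now_branch = stream
    |from()
        .measurement(measurement)
        .groupBy(groupBy)

var past_branch = stream
    |from()
        .measurement(measurement)
        .groupBy(groupBy)
    |shift(1m)

now_branch
    |join(past_branch)
        .as('current', 'past')
        // tolerance is a guess; the two points won't land on exactly the same timestamp
        .tolerance(1s)
    |eval(lambda: float("past.attr_value" - "current.attr_value") / float("past.attr_value") * 100.0)
        .as('pct_change')

My actual script so far: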
var whereFilter = lambda: ("attribute" == 'OneMinuteRate') AND ("beanMeasure" == 'MessagesInPerSec') AND ("beanTopic" == 'test_test')
var offsetWhereFilter = lambda: ("attribute" == 'OneMinuteRate') AND ("beanMeasure" == 'MessagesInPerSec') AND ("beanTopic" == 'test_test') AND "time" > now() - 2m AND "time" < now() - 1m
var groupBy = ['host', 'topic']

var data = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement(measurement)
        .groupBy(groupBy)
        .quiet()

// Branch 1: values from roughly one to two minutes ago
var past = data
    |where(offsetWhereFilter)
    |eval(lambda: "attr_value")
        .as('pastvalue')
        .keep()
        .quiet()

// Branch 2: current values
var present = data
    |where(whereFilter)
    |eval(lambda: "attr_value")
        .as('nowvalue')
        .keep()
        .quiet()

// Join the two branches and alert on the percentage change
var trigger = present
    |join(past)
        .as('current', 'past')
    |eval(lambda: float("pastvalue" - "nowvalue") / float("pastvalue") * 100.0)
        .keep()
        .as('newValue')
    |log()
    |alert()
        .info(lambda: "current.value" > 0)
        .stateChangesOnly()
        .message(message + 'past.value' + 'current.value')
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
A good portion of the logs Kapacitor shows for this task look like the following:
ts=2019-07-25T19:50:55.502Z lvl=error msg="error while evaluating expression" service=kapacitor task_master=main task=partialTest node=influxdb_out12 err="invalid math operator - for type time"
ts=2019-07-25T19:50:58.330Z lvl=error msg="error while evaluating expression" service=kapacitor task_master=main task=partialTest node=influxdb_out12 err="left reference value "topic" is missing value"
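My best guess on the "invalid math operator - for type time" error is the eval right after the join. As far as I can tell, join().as('current', 'past') prefixes the field names with those aliases, so "pastvalue" and "nowvalue" probably aren't visible under those names anymore and the expression may be picking up something else entirely. If that's the case, I assume that part of the script would need to look more like this (untested):

    |join(past)
        .as('current', 'past')
    |eval(lambda: float("past.pastvalue" - "current.nowvalue") / float("past.pastvalue") * 100.0)
        .as('newValue')
        .keep()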
And for more context, here is the task's DOT graph:
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="102735"];
from1 [avg_exec_time_ns="9.412µs" errors="0" working_cardinality="0" ];
from1 -> where4 [processed="102735"];
from1 -> where2 [processed="102735"];
where4 [avg_exec_time_ns="36.334µs" errors="435" working_cardinality="112" ];
where4 -> eval5 [processed="144"];
eval5 [avg_exec_time_ns="12.05µs" errors="0" working_cardinality="9" ];
eval5 -> join7 [processed="144"];
where2 [avg_exec_time_ns="9.934µs" errors="579" working_cardinality="112" ];
where2 -> eval3 [processed="0"];
eval3 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval3 -> join7 [processed="0"];
join7 [avg_exec_time_ns="8.827µs" errors="0" working_cardinality="9" ];
join7 -> eval8 [processed="0"];
eval8 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval8 -> log9 [processed="0"];
log9 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log9 -> alert10 [processed="0"];
alert10 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="0s" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="0" ];
alert10 -> http_out13 [processed="0"];
alert10 -> eval11 [processed="0"];
http_out13 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval11 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval11 -> influxdb_out12 [processed="0"];
influxdb_out12 [avg_exec_time_ns="0s" errors="0" points_written="0" working_cardinality="0" write_errors="0" ];
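The "left reference value "topic" is missing value" error also makes me wonder about my groupBy: the filters reference a "beanTopic" tag, but I'm grouping by 'topic', which may not exist as a tag on these points. Maybe it should be something like this instead (guessing):

var groupBy = ['host', 'beanTopic']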
I'm still researching, but I've been at it for a few hours and am not finding much documentation for what I'm trying to do. If anyone wants to throw in suggestions, I'd be more than grateful.