TICKscript: Getting field values from multiple where() nodes based on a single stream

Goal
I want to subtract a field in one chain from the same field in another chain, with both chains fed by the same stream. I’m not sure whether this is possible, and I haven’t found any examples like it.

The chains need to be synchronized. There will be more topics and hosts, so both chains must observe the same topic and host at the same time, only a week apart (or, in this test case, about a minute apart).

var whereFilter = lambda: ("attribute" == 'OneMinuteRate') AND ("beanMeasure" == 'MessagesInPerSec') AND ("beanTopic" == 'test_test')

var offsetWhereFilter = lambda: ("attribute" == 'OneMinuteRate') AND ("beanMeasure" == 'MessagesInPerSec') AND ("beanTopic" == 'test_test') AND "time" > now() - 2m AND "time" < now() - 1m
var groupBy = ['host', 'topic']

var data = stream
    |from()
        .database(db)
        .retentionPolicy(rp)
        .measurement(measurement)
        .groupBy(groupBy)
        .quiet()

var past = data
    |where(offsetWhereFilter)
    |eval(lambda: "attr_value")
        .as('pastvalue')
        .keep()
        .quiet()

var present = data
    |where(whereFilter)
    |eval(lambda: "attr_value")
        .as('nowvalue')
        .keep()
        .quiet()

var trigger = present
    |join(past)
        .as('current', 'past')
    |eval(lambda: float("pastvalue" - "nowvalue") / float("pastvalue") * 100.0)
        .keep()
        .as('newValue')
    |log()
    |alert()
        .info(lambda: "current.value" > 0)
        .stateChangesOnly()
        .message(message + 'past.value' + 'current.value')
        .id(idVar)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)

A good portion of the logs Kapacitor displays look like this:

ts=2019-07-25T19:50:55.502Z lvl=error msg="error while evaluating expression" service=kapacitor task_master=main task=partialTest node=influxdb_out12 err="invalid math operator - for type time"
ts=2019-07-25T19:50:58.330Z lvl=error msg="error while evaluating expression" service=kapacitor task_master=main task=partialTest node=influxdb_out12 err="left reference value "topic" is missing value"

And for more context, here are the per-node stats for the task:

stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="102735"];

from1 [avg_exec_time_ns="9.412µs" errors="0" working_cardinality="0" ];
from1 -> where4 [processed="102735"];
from1 -> where2 [processed="102735"];

where4 [avg_exec_time_ns="36.334µs" errors="435" working_cardinality="112" ];
where4 -> eval5 [processed="144"];

eval5 [avg_exec_time_ns="12.05µs" errors="0" working_cardinality="9" ];
eval5 -> join7 [processed="144"];

where2 [avg_exec_time_ns="9.934µs" errors="579" working_cardinality="112" ];
where2 -> eval3 [processed="0"];

eval3 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval3 -> join7 [processed="0"];

join7 [avg_exec_time_ns="8.827µs" errors="0" working_cardinality="9" ];
join7 -> eval8 [processed="0"];

eval8 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval8 -> log9 [processed="0"];

log9 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log9 -> alert10 [processed="0"];

alert10 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="0s" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="0" ];
alert10 -> http_out13 [processed="0"];
alert10 -> eval11 [processed="0"];

http_out13 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];

eval11 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval11 -> influxdb_out12 [processed="0"];

influxdb_out12 [avg_exec_time_ns="0s" errors="0" points_written="0" working_cardinality="0" write_errors="0" ];

I’m still researching but I’ve been at it for a few hours and am not finding much documentation to support what I’m trying to do. If anyone wants to throw in suggestions I’d be more than grateful.

Hi @nated099,

Please check out the ShiftNode … in combination with the BatchNode …
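
As a rough illustration of that suggestion, here is a minimal sketch of how a batch query plus a shift might replace the two where() chains. It is untested against your data and rests on several assumptions: 'mydb', 'autogen' and 'kafka_metrics' are placeholder names for your database, retention policy and measurement; "beanTopic" is assumed to be the tag that holds the topic; and mean() is just one possible way to reduce each window to a single value. The idea is to query the current window and the earlier window separately, then shift the earlier one forward in time so the join can match points by timestamp.

```
// Sketch only. 'mydb', 'autogen' and 'kafka_metrics' are placeholders;
// the field name and filter values are copied from the question.
var period = 1m
// How far back the comparison window lies. Use 1w for week-over-week.
var lag = 1m

var present = batch
    |query('''
        SELECT mean("attr_value") AS "value"
        FROM "mydb"."autogen"."kafka_metrics"
        WHERE "attribute" = 'OneMinuteRate' AND "beanMeasure" = 'MessagesInPerSec'
    ''')
        .period(period)
        .every(period)
        // Assumes the topic is stored in the "beanTopic" tag.
        .groupBy('host', 'beanTopic')

var past = batch
    |query('''
        SELECT mean("attr_value") AS "value"
        FROM "mydb"."autogen"."kafka_metrics"
        WHERE "attribute" = 'OneMinuteRate' AND "beanMeasure" = 'MessagesInPerSec'
    ''')
        .period(period)
        .every(period)
        // Query the window that ended `lag` ago...
        .offset(lag)
        .groupBy('host', 'beanTopic')
    // ...then move its timestamps forward so they line up with `present`.
    |shift(lag)

present
    |join(past)
        .as('current', 'past')
    // Percent change of the current window relative to the past window.
    |eval(lambda: ("past.value" - "current.value") / "past.value" * 100.0)
        .as('pct_change')
        .keep()
    |log()
```

Because both queries group by the same tags, the join pairs each host and topic with its own earlier window. Filtering on "time" inside a stream where() is not needed here, since the time offset is handled by .offset() and |shift(). If the past value can be zero, the division would need a guard before it feeds an alert.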