Setup:
Points from two sources using the streamNode are continuously logged. They have identical tag and field keys. I am attempting to send an alert with the most recent data from s1 and s2 (cache each point and send an alert with the combined most recent data point from each stream on each new event).
Attempts:
I attempted several solutions using the joinNode, but I need to send the alert on every new update and joinNode reformats the timestamps to the specified interval. I most recently attempted using the unionNode, but the alert output only shows the most recent point’s data. I’ve tried to use flatten and intermediary points similar to another topic, but the result is the same: [kapacitor] Collect and compare two streams with sparse data
// STREAM 1
var s1 = stream
|from()
.database(db_in)
.retentionPolicy(rp_in)
.measurement(ms_in)
.where(f1)
|eval(lambda: "f1", lambda: "f2")
.as('s1_f1', 's1_f2')
// STREAM 2
var s2 = stream
|from()
.database(db_in)
.retentionPolicy(rp_in)
.measurement(ms_in)
.where(f2)
|eval(lambda: "f1", lambda: "f2")
.as('s2_f1', 's2_f2')
// UNION
s1
|union(s1, s2)
.rename('sp')
|alert()
.crit(lambda: 2 > 1)
.message('sp alert')
.log('/tmp/alerts.log')
Is there an alternate solution? Should I try and edit each last element of both stream node’s timestamps and then join after? Open to any and all suggestions.