Sure, @Anaisdg, this is what I’ve got so far. Not sure, but there may be a better way of doing this!

    // Latest IDX value per batch. InfluxQL string values take single quotes,
    // escaped here because they sit inside a TICKscript string.
    var idx_chg = batch
        |query('SELECT "value" FROM "change_bps" WHERE "symbol" = \'IDX\'')
            .period(span)
            .every(frequency)
        |last('value')
            .as('value')

    // Latest S1 value per batch.
    var stk_chg = batch
        |query('SELECT "value" FROM "change_bps" WHERE "symbol" = \'S1\'')
            .period(span)
            .every(frequency)
        |last('value')
            .as('value')

    // Join the two series on time and compare them.
    idx_chg
        |join(stk_chg)
            .as('idx', 'stk')
            .tolerance(1s)
        |eval(lambda: "idx.value", lambda: "stk.value")
            .as('idx_val', 'stk_val')
        |alert()
            .info(lambda: "stk_val" <= "idx_val")
            .warn(lambda: "stk_val" > "idx_val")
            .stateChangesOnly()
            .message('{{ if eq .Level "INFO" }} S1 below IDX {{ else }} S1 above IDX {{ end }}')
            .log(alert_log)
        |influxDBOut()
            .database(db)
            .retentionPolicy(rp)
            .measurement('alerts')

Looking at influxdata/kapacitor Pull Request #693, "Add ability to combine a stream with itself dynamically" by nathanielc, it does look like `combine` is the way to go.
I modified the script to use `combine`, but it’s crashing Kapacitor and I can’t find any error in the logs.
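
For reference, here is a minimal sketch of the general shape of a `combine` pipeline, following the pattern of the example in the Kapacitor `combine` docs. It is not the modified script that crashed, and it has not been run. It assumes a stream task rather than the batch `query() | last()` pipeline, and it assumes `symbol` is a tag on `change_bps`.

```
// Untested sketch: assumes a stream task and that "symbol" is a tag.
stream
    |from()
        .measurement('change_bps')
    // Pair each IDX point with an S1 point that arrives within the tolerance.
    |combine(lambda: "symbol" == 'IDX', lambda: "symbol" == 'S1')
        .as('idx', 'stk')
        .tolerance(1s)
    // combine prefixes the fields, so "value" becomes "idx.value" and "stk.value".
    |alert()
        .info(lambda: "stk.value" <= "idx.value")
        .warn(lambda: "stk.value" > "idx.value")
        .stateChangesOnly()
```

Because `combine` already prefixes the field names, the separate `eval` step from the `join` version should not be needed here.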
Are there any known issues with this pipeline: `query() | last() | combine()`?
I have to use `last()` for the reason discussed in Unable to join data in TICK script - #10 by rawkode: to get the correct timestamp on the generated alert.