Tickscript for dynamic number of measurements

Looking for some advice on a scalable tickscript-based solution!

The script is to fire an alert when change in a stock goes above or below that of its corresponding index.

Time IDX S1 S2 Alert
t0 11 9 10
t1 12 S2 above IDX
t1 13 S2 below IDX
t2 11
t3 14 S1 above IDX
t4 12 S1 below IDX

I managed to come up with a tickscript that does the job given an index and a stock.

  • Two batch queries – one to get the latest value of index and the other to get the latest value of stock
  • Join the results
  • Check the value and generate an alert.

However, it’s not clear to me how this can be extended to a dynamic number of stocks.

Join doesn’t seem to be an option. I tried to see if combine can work, but query|last|combine is causing the Kapacitor to crash (unable to see what the error is though).

Any pointers/tips on how to go about this are appreciated!

Hello @prmkbr,
Can you please share your tickscript? That’s a great example, and I think it would be really useful to the community.
Hmm…that’s a tricky question. The best way to probably go is to create a task for each index comparison and use regex with tags that pair stocks with the indexes, to make a massive join.

Sure, @Anaisdg, this is what I’ve got so far. Not sure, but there may be a better way of doing this! :slightly_smiling_face:

var idx_chg = batch
    | query('SELECT "value" FROM "change_bps" WHERE symbol="IDX"')
        .period(span)
        .every(frequency)

    | last('value')
        .as('value')


var stk_chg = batch
    | query('SELECT "value" FROM "change_bps" WHERE symbol="S1"')
        .period(span)
        .every(frequency)

    | last('value')
        .as('value')


idx_chg
    | join (stk_chg)
        .as('idx', 'stk')
        .tolerance(1s)

    | eval(lambda: "idx.value", lamba: "stk.value")
        .as('idx_val', 'stk_val')

    | alert()
        .info(lambda: "stk_val" <= "idx_val")
        .warn(lambda: "stk_val" > "idx_val")
        .stateChangesOnly()
        .message('{{ if eq .Level "INFO" }} S1 below IDX {{ else }} S1 above IDX {{ end }}')
        .log(alert_log)

    | influxDBOut()
        .database(db)
        .retentionPolicy(rp)
        .measurement('alerts')

Looking at Add ability to combine a stream with itself dynamically. by nathanielc · Pull Request #693 · influxdata/kapacitor · GitHub, it does look like combine is the way to go.

I modified the script to use combine, but it’s crashing the Kapacitor and I don’t find any error in the logs.

Are there any know issues with this pipeline – query() | last() | combine()?

I’ve to use last() for the reasons discussed in Unable to join data in TICK script - #10 by rawkode – to get the correct timestamp on the generated alert.

What is your span (how many points in it).

Span is 1day. There could be any number of points (in the order of hundreds). In my test data, I had about 50-60 points.