Kapacitor stream processing multiple fields

Hi,

i am trying to do a stream process for downsampling as follows

var period = 1s
var interval = 1s

var data = batch
|from()
    .database('db')
    .retentionPolicy('autogen')
    .measurement('ms1')
    .groupBy(*)
|eval(lambda: "latency1" - "latency2")
    .as('latency_diff')        
|mean('latency_diff')
    .as('latency_diff')
|mean('latency1')
    .as('latency1')
|mean('latency2')
    .as('latency2')
|window()
    .period(period)
    .every(interval)
|influxDBOut()
    .database('db_cold')
    .retentionPolicy('autogen')
    .measurement('ms1')
    .precision('s')   

but it does not work. using kapacitor cli i have found that some of the mean calcs return errors.

is the above possible?

Hi,

Do you get an error message when defining the TICK script in Kapacitor?
Firstly i think you need to change the line

var data = batch to var data = stream

I think you might need to stream each value you want to measure separately and then join them. I’m not 100% sure though.

the link below might help.
joining data

Hi,

the stream/batch was a type, it is stream.
do you suggest that multiple values can only be handled with multiple stream that are then joined in the end?

Based on the little knowledge i have i think that would be the way to go.

I’m relatively new to TICK but from what i read about the join node i think that would work.

it works but i have to create a stream for every filed i want to join in the end.
if like, i have, you have 3 fields you would need 3 streams and join them in order to get the data in db.

Hi,

Yep that’s how i understand it. AFAIK you stream them, do any aggregations on them that you want to do and store the value of that. Then you join them together to build the alert. i think it joins them back together based on the UNIX time stamp in the database.

Also, I’m not sure if you’re using telegraf but your script is looking for data every second but i think the default for telegraf itself is 10s so you might end up with points where there is no data. That might actually cause it to not work if there is no data. Or you might get false alarms.

There might be an easier or better way to do this but i haven’t found it yet. I don’t do much streaming based on multiple measurements. A majority of scripts i write i’m told which measurement i need to alert on and use that.

If you can define and enable your script in Kapacitor then a good tool to use is ‘kapacitor show task_name’ once it is running - That should give you an idea as to where abouts your script gets to before it stops working. I’ve found it quite useful over the last few days.

Hi,

kapacitor show, showed me that the script did not work due to errors.
the metric are custom directly from the application and do not use telegraf.

after watching today’s webinar about advanced kapacitor the problem is solved the following way:

var period = 1s
var interval = 1s

var data = stream
|from()
    .database('db')
    .retentionPolicy('autogen')
    .measurement('ms1')
    .groupBy(*)
|window()
.period(period)
.every(interval)


var diff = data |eval(lambda: "latency1" - "latency2")
           .as('latency_diff')        
          |mean('latency_diff')
          .as('latency_diff')

var latency1 = data | mean('latency1')
                .as('value')

var latency2 = data|mean('latency2')
                .as('value')

diff 
| join(latency1,latency2)
        .as('lat1', 'lat2')
|influxDBOut()
    .database('db_cold')
    .retentionPolicy('autogen')
    .measurement('ms1')
    .precision('s')

thanks to Michael DeSa

1 Like

Ah good news! glad you worked it out. Nice of you to post the solution as well!

i thought it might interest you as well. thanks for your input

It’s appreciated, as it happens I’m working on something to do with latency so it could prove helpful.

No worries.