Kapacitor: ShiftNode and Join in Stream

Hi, I am using Kapacitor v1.3.1, below TICKscript is not working for me in the Join & Eval section, please help.

join15 [avg_exec_time_ns="95.639µs" errors="0" working_cardinality="14" ];
join15 -> eval16 [processed="392"];

eval16 [avg_exec_time_ns="116.981µs" errors="392" working_cardinality="14" ];
eval16 -> influxdb_out20 [processed="0"];

The log say:

2017/06/22 15:21:04 E! Cannot call function "float" with args signature (missing), available signatures are [(float), (boolean), (string), (int)].

All the field type is float from the measurement I am trying to query and in my eval, i have add float(…), i am not sure what is going on, I even added fill(‘null’) in eval statement as I see there was a github issue saying the first point is alway empty.

> show field keys from analysis
name: btm_sem_click_analysis
fieldKey         fieldType
--------         ---------
current_clicks   float
hour2hour_change float
lasthour_clicks  float
lastweek_clicks  float
threshold        float
week2week_change float
var past = stream
    |from()
        .database(database)
        .retentionPolicy(retention_policy)
        .measurement(measurement)
        .where(where)
        .groupBy(groups)
    |window()
        .period(period)
        .every(every)
        .align()
    |shift(last_mins)

var pastWk_change = past
    |mean('week2week_change')
        .as('pastWk_change')

var pastHr_change = past
    |mean('hour2hour_change')
        .as('pastHr_change')

var past_threshold = past
    |last('threshold')
        .as('threshold')

var current = stream
    |from()
        .database(database)
        .retentionPolicy(retention_policy)
        .measurement(measurement)
        .where(where)
        .groupBy(groups)
    |window()
        .period(period)
        .every(every)
        .align()

var currWk_change = current
    |mean('week2week_change')
        .as('currWk_change')

var currHr_change = current
    |mean('hour2hour_change')
        .as('currHr_change')

var joinData = pastWk_change
    |join(pastHr_change, past_threshold, currHr_change, currWk_change)
        .as('pastWk', 'pastHr', 'threshold', 'currHr', 'currWk')
        .tolerance(tolerence)
        .fill('null')
    |eval(lambda: (float("currWk.currWk_change") - float("pastWk.pastWk_change")), lambda: (float("currHr.currHr_change") - float("pastHr.pastHr_change")), lambda: float("pastWk.pastWk_change"), lambda: float("pastHr.pastWk_change"), lambda: float("currWk.currWk_change"), lambda: float("currHr.currHr_change"), lambda: float("threshold.threshold"))
        .as('differenceWk', 'differenceHr', 'past_w2w_change', 'past_h2h_change', 'current_w2w_change', 'current_h2h_change', 'threshold')

joinData
    |stateCount(lambda: "differenceWk" <= "threshold" AND "differenceHr" <= "threshold")
    |log()
        .prefix('btm_alert')
    |alert()
        .crit(lambda: "state_count" >= statecount)
        .stateChangesOnly(5m)
        .message('Product: {{ index .Tags "product" }}, Hour2Hour Change:  {{ index .Fields "differenceHr" }}, Week2Week Change: {{ index .Fields "differenceWk" }}')
        .slack()

Yeah, that error message is not very helpful, it should read more like field XXX is missing cannot call float(). Unfortunately the error message is not stating which field is missing. We will file an issue to fix the error message.

In the meantime since the the fields are already all floats, you could remove the calls to float and you will probably get a better error message.

1 Like

it’s weird when i try to query or log() my query, i don’t see the missing fields, and yet when i do the join, it’s reporting missing fields.

instead of one big join, if i break up into smaller joins, then the “missing field value” errors are gone…perhaps i am confuse how this works.

var joinDataWk = pastWk_change
  |join(currWk_change, past_threshold)
    .as('past', 'curr', 'threshold')
    .tolerance(tolerence)
  |eval(lambda: float("curr.currWk_change") - float("past.pastWk_change"), lambda: float("past.pastWk_change"), lambda: float("curr.currWk_change"), lambda: float("threshold.threshold"))
    .as('differenceWk', 'past_w2w_change', 'current_w2w_change', 'threshold')

var joinDataHr = pastHr_change
  |join(currHr_change)
    .as('past', 'curr')
    .tolerance(tolerence)
  |eval(lambda: float("curr.currHr_change") - float("past.pastHr_change"), lambda: float("past.pastHr_change"), lambda: float("curr.currHr_change"))
    .as('differenceHr', 'past_h2h_change', 'current_h2h_change')

var joinData = joinDataWk
  |join(joinDataHr)
    .as('joinWk', 'joinHr')
    .tolerance(tolerence)