Joining Kapacitor and UDF data




I would like to write a user-defined function (UDF) that behaves like Kapacitor's built-in aggregation functions such as count, min, and max. In particular, the data coming from this UDF should be joined with Kapacitor data, as in the following example:

dbrp "testdb"."autogen"
var data = batch
    SELECT * FROM "testdb"."autogen"."tsm_data" WHERE f_l0 = 'Archive_1' AND f_l1 =~ /^dataNode.*$/
var count = data
var status = data
        .as('count', 'status')
        lambda: unixNano(now()),
        lambda: "count.value",
        lambda: int("status.status_value"),
        lambda: "f_l1"
        .as('f_changed', 'vl_1_Count_INT', 'vl_4_Status_INT', 'f_l1')
        .tag('f_l0', 'Archive_1_dest_kap')

The source measurement has this structure:

name: tsm_data
time                f_changed           f_l0      f_l1      vl_0_int_INT vl_1_Status_INT
----                ---------           ----      ----      ------------ ---------------
1543497484673059335 1543497484673057278 Archive_1 dataNode1 10           1
1543497489676660290 1543497489676658110 Archive_1 dataNode1 10           1
1543497494698380454 1543497494698378299 Archive_1 dataNode1 10           1

When writing the UDF I used an existing example as a template, except that my code is in Go. My problem is that the join in the script above does not work: the output of kapacitor show reports that the join node receives points from both parents but passes none on to the eval node:

query1 [avg_exec_time_ns="0s" batches_queried="2" errors="0" points_queried="6" working_cardinality="0" ];
query1 -> statusacc3 [processed="2"];
query1 -> count2 [processed="2"];

statusacc3 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
statusacc3 -> join5 [processed="2"];

count2 [avg_exec_time_ns="160ns" errors="0" working_cardinality="1" ];
count2 -> join5 [processed="2"];

join5 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
join5 -> eval6 [processed="0"];

eval6 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval6 -> influxdb_out7 [processed="0"];

and no data arrives in the InfluxDB destination database. The code that sends the point back from the UDF to Kapacitor looks like this:

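func (h *StatusHandler) EndBatch(end *agent.EndBatch) error {
    // Emit one point per batch, stamped with the batch's maximum time.
    point := &agent.Point{
        Time:      end.Tmax,
        Name:      end.Name,
        Group:     end.Group,
        Tags:      end.Tags,
        FieldsInt: map[string]int64{"status_value": int64(1)},
    }
    // Hand the point back to Kapacitor through the agent's response channel.
    h.agent.Responses <- &agent.Response{
        Message: &agent.Response_Point{
            Point: point,
        },
    }
    return nil
}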

(The hard-coded status_value is for demonstration purposes only.)

I suspect that Kapacitor is not able to merge the points coming directly from Kapacitor with the ones coming from the UDF. Is there a way I can debug this? Is there something fundamentally wrong here that I am not seeing?
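
One way to narrow this down might be to compare what each parent of the join emits. As far as I understand, the join node pairs points by timestamp (within a tolerance) and by group, so a mismatch in either would explain the processed="0" on join5 -> eval6. The sketch below is only an illustration of that comparison: emittedPoint, describe and sameJoinKey are hypothetical names, not part of the Kapacitor agent package, and the sample values are made up from the table above. It assumes a process-based UDF, where stdout carries the protocol and diagnostics therefore go to stderr.

```go
package main

import (
	"fmt"
	"os"
	"sort"
	"strings"
)

// emittedPoint is a hypothetical stand-in holding only the fields that
// matter for matching two join parents; it is not a Kapacitor type.
type emittedPoint struct {
	Time  int64             // timestamp in nanoseconds
	Group string            // group ID as reported by the node
	Tags  map[string]string // tags attached to the point
}

// describe renders a point in a stable, sorted form so that log lines
// from two different sources can be compared by eye or with diff.
func describe(source string, p emittedPoint) string {
	keys := make([]string, 0, len(p.Tags))
	for k := range p.Tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+p.Tags[k])
	}
	return fmt.Sprintf("%s time=%d group=%q tags=[%s]",
		source, p.Time, p.Group, strings.Join(pairs, ","))
}

// sameJoinKey reports whether two points share the timestamp and group,
// which is the assumed matching rule (ignoring any tolerance setting).
func sameJoinKey(a, b emittedPoint) bool {
	return a.Time == b.Time && a.Group == b.Group
}

func main() {
	// Made-up sample values; in the UDF these would come from end.Tmax,
	// end.Group and end.Tags inside EndBatch.
	fromUDF := emittedPoint{
		Time:  1543497494698380454,
		Group: "",
		Tags:  map[string]string{"f_l0": "Archive_1", "f_l1": "dataNode1"},
	}
	fromCount := emittedPoint{
		Time:  1543497494698380454,
		Group: "",
		Tags:  map[string]string{},
	}
	// Write diagnostics to stderr so they do not corrupt the UDF protocol.
	fmt.Fprintln(os.Stderr, describe("udf", fromUDF))
	fmt.Fprintln(os.Stderr, describe("count", fromCount))
	fmt.Fprintln(os.Stderr, "same join key:", sameJoinKey(fromUDF, fromCount))
}
```

Logging the equivalent of describe("udf", ...) from EndBatch and comparing it with what the count branch produces would show whether the two sides agree on time and group.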