Joining Kapacitor and UDF data

kapacitor

#1

Hello,

I would like to write a user-defined function with behavior similar to that of Kapacitor’s inbuilt count, min, max etc. aggregation functions. In particular, the data coming from this UDF should be joined with Kapacitor data as in the following example:

dbrp "testdb"."autogen"                                                                                                                                                                             
                                                                                                                                                                                                    
var data = batch                                                                                                                                                                                    
    |query('''                                                                                                                                                                                      
    SELECT * FROM "testdb"."autogen"."tsm_data" WHERE f_l0 = 'Archive_1' AND f_l1 =~ /^dataNode.*$/                                                                                                  
    ''')                                                                                                                                                                                            
        .period(15s)                                                                                                                                                                                
        .every(15s)                                                                                                                                                                                 
        .groupBy('f_l1')                                                                                                                                                                            
        .align()                                                                                                                                                                                    
                                                                                                                                                                                                    
var count = data                                                                                                                                                                                    
    |count('vl_0_int_INT')                                                                                                                                                                          
        .as('value')                                                                                                                                                                                
                                                                                                                                                                                                    
var status = data                                                                                                                                                                                   
    @statusacc()                                                                                                                                                                                  
        .statusFieldName('vl_1_Status_INT')                                                                                                                                                         
                                                                                                                                                                                                    
count                                                                                                                                                                                               
    |join(status)                                                                                                                                                                                   
        .as('count', 'status')                                                                                                                                                                                                                                                                                                                                                             
    |eval(                                                                                                                                                                                          
        lambda: unixNano(now()),                                                                                                                                                                    
        lambda: "count.value",                                                                                                                                                                      
        lambda: int("status.status_value"),                                                                                                                                                         
        lambda: "f_l1"                                                                                                                                                                              
    )                                                                                                                                                                                               
        .as('f_changed', 'vl_1_Count_INT', 'vl_4_Status_INT', 'f_l1')                                                                                                                               
        .tags('f_l1')                                                                                                                                                                               
    |influxDBOut()                                                                                                                                                                                  
        .database('testdb')                                                                                                                                                                         
        .retentionPolicy('autogen')                                                                                                                                                                 
        .measurement('tsm_data')                                                                                                                                                                    
        .tag('f_l0', 'Archive_1_dest_kap')                                                                                                                                                                                                                                                                                                                         
        .precision('ns')

The source measurement has this structure

name: tsm_data
time                f_changed           f_l0      f_l1      vl_0_int_INT vl_1_Status_INT
----                ---------           ----      ----      ------------ ---------------
1543497484673059335 1543497484673057278 Archive_1 dataNode1 10           1
1543497489676660290 1543497489676658110 Archive_1 dataNode1 10           1
1543497494698380454 1543497494698378299 Archive_1 dataNode1 10           1
...

When writing the UDF I used the code on https://docs.influxdata.com/kapacitor/v1.5/guides/socket_udf/ as a template, except that my code is in Go. My problem is that the join in the script above does not work, i.e. that I see the following output in kapacitor show

query1 [avg_exec_time_ns="0s" batches_queried="2" errors="0" points_queried="6" working_cardinality="0" ];
query1 -> statusacc3 [processed="2"];
query1 -> count2 [processed="2"];

statusacc3 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
statusacc3 -> join5 [processed="2"];

count2 [avg_exec_time_ns="160ns" errors="0" working_cardinality="1" ];
count2 -> join5 [processed="2"];

join5 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
join5 -> eval6 [processed="0"];

eval6 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval6 -> influxdb_out7 [processed="0"];

and no data in the InfluxDb destination database. The code sending the point back from the UDF to Kapacitor looks like this:

func (h *StatusHandler) EndBatch(end *agent.EndBatch) error {                                                                                                                                       
    point := &agent.Point{                                                                                                                                                                          
        Time:      end.Tmax,                                                                                                                                                                        
        Name:      end.Name,                                                                                                                                                                        
        Group:     end.Group,                                                                                                                                                                       
        Tags:      end.Tags,                                                                                                                                                                        
        FieldsInt: map[string]int64{"status_value": int64(1)},                                                                                                                                      
    }                                                                                                                                                                                               
                                                                                                                                                                                                    
    h.agent.Responses <- &agent.Response{                                                                                                                                                           
        Message: &agent.Response_Point{                                                                                                                                                             
            Point: point,                                                                                                                                                                           
        },                                                                                                                                                                                          
    }                                                                                                                                                                                               
                                                                                                                                                                                                    
    return nil                                                                                                                                                                                      
}

(the hard-coded status_value is just for demonstration purposes)

I suspect that Kapacitor is not able to merge the points coming directly from Kapacitor and the one coming from the UDF. Is there a way I can debug this? Is there something fundamentally wrong here that I am not seeing?