Hello,
I would like to write a user-defined function (UDF) that behaves like Kapacitor’s built-in aggregation functions such as count, min, and max. In particular, the data coming from this UDF should be joined with Kapacitor data, as in the following example:
dbrp "testdb"."autogen"

var data = batch
    |query('''
        SELECT * FROM "testdb"."autogen"."tsm_data" WHERE f_l0 = 'Archive_1' AND f_l1 =~ /^dataNode.*$/
    ''')
        .period(15s)
        .every(15s)
        .groupBy('f_l1')
        .align()

var count = data
    |count('vl_0_int_INT')
        .as('value')

var status = data
    @statusacc()
        .statusFieldName('vl_1_Status_INT')

count
    |join(status)
        .as('count', 'status')
    |eval(
        lambda: unixNano(now()),
        lambda: "count.value",
        lambda: int("status.status_value"),
        lambda: "f_l1"
    )
        .as('f_changed', 'vl_1_Count_INT', 'vl_4_Status_INT', 'f_l1')
        .tags('f_l1')
    |influxDBOut()
        .database('testdb')
        .retentionPolicy('autogen')
        .measurement('tsm_data')
        .tag('f_l0', 'Archive_1_dest_kap')
        .precision('ns')
The source measurement has this structure:
name: tsm_data
time                f_changed           f_l0      f_l1      vl_0_int_INT vl_1_Status_INT
----                ---------           ----      ----      ------------ ---------------
1543497484673059335 1543497484673057278 Archive_1 dataNode1 10           1
1543497489676660290 1543497489676658110 Archive_1 dataNode1 10           1
1543497494698380454 1543497494698378299 Archive_1 dataNode1 10           1
...
When writing the UDF, I used the code from the Kapacitor 1.5 documentation page "Write socket-based user-defined functions (UDFs)" as a template, except that my code is in Go. My problem is that the join in the script above does not work: the join node receives points from both parents but emits none, as I see in the following output of kapacitor show
query1 [avg_exec_time_ns="0s" batches_queried="2" errors="0" points_queried="6" working_cardinality="0" ];
query1 -> statusacc3 [processed="2"];
query1 -> count2 [processed="2"];
statusacc3 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
statusacc3 -> join5 [processed="2"];
count2 [avg_exec_time_ns="160ns" errors="0" working_cardinality="1" ];
count2 -> join5 [processed="2"];
join5 [avg_exec_time_ns="0s" errors="0" working_cardinality="2" ];
join5 -> eval6 [processed="0"];
eval6 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval6 -> influxdb_out7 [processed="0"];
and no data arrives in the InfluxDB destination database. The code that sends the point back from the UDF to Kapacitor looks like this:
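// EndBatch emits a single point for the finished batch, stamped with the
// batch's maximum time and carrying the batch's name, group, and tags.
func (h *StatusHandler) EndBatch(end *agent.EndBatch) error {
	point := &agent.Point{
		Time:      end.Tmax,
		Name:      end.Name,
		Group:     end.Group,
		Tags:      end.Tags,
		FieldsInt: map[string]int64{"status_value": int64(1)},
	}
	// Send the point back to Kapacitor on the agent's response channel.
	h.agent.Responses <- &agent.Response{
		Message: &agent.Response_Point{
			Point: point,
		},
	}
	return nil
}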
(The hard-coded status_value is just for demonstration purposes.)
I suspect that Kapacitor is not able to match the points coming from its own count node with the ones coming from the UDF. Is there a way I can debug this? Is there something fundamentally wrong here that I am not seeing?
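One thing I could try is to log exactly what the UDF emits and compare it with what the count node produces. The following is only a minimal sketch, assuming (from my reading of the JoinNode documentation) that join pairs points by timestamp within a group. The names pointInfo and describePoint are hypothetical helpers of my own and not part of the Kapacitor agent API; pointInfo simply mirrors the Time, Name, Group, and Tags fields I set in EndBatch.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// pointInfo is a hypothetical stand-in for the fields of agent.Point that
// EndBatch fills in. It is not a Kapacitor type.
type pointInfo struct {
	Time  int64
	Name  string
	Group string
	Tags  map[string]string
}

// describePoint renders the point as one line. Tags are sorted by key so that
// lines from different runs or different nodes can be compared directly.
func describePoint(p pointInfo) string {
	keys := make([]string, 0, len(p.Tags))
	for k := range p.Tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+"="+p.Tags[k])
	}

	return fmt.Sprintf("name=%q group=%q time=%d tags=[%s]",
		p.Name, p.Group, p.Time, strings.Join(pairs, ","))
}
```

Inside EndBatch I would call describePoint(pointInfo{Time: end.Tmax, Name: end.Name, Group: end.Group, Tags: end.Tags}) and write the result to the UDF's log just before sending the response. If the time or group printed there differs from what the count branch produces, that might explain why join5 never emits anything.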