Kapacitor UDF (python)

Hi all,
I need some help with Kapacitor UDF (python). I’ve got implementation, seems like it works, i can log data from, and i see that data is correct. Here is tick blob of it:

var online_users_rate = online_users_count
    |join(online_users_diff)
        .as('count', 'diff')
        .tolerance(1s)
    |eval(lambda: "diff.value" / "count.value")
        .as('value')

var online_users_rate_hpf = online_users_rate
    @hpf()
        .field('value')
        .warmup(0)
        .cutoff(0.0001)
        .as('value')

online_users_diff and online_users_count are batches.

When I try to join:

online_users_rate
    |join(online_users_rate_hpf)
        .as('rate', 'rate_hpf')
        .tolerance(1s)
    |log()

it fails: there are no results. But if I join like this:
online_users_rate_hpf
    |join(online_users_rate_hpf)
        .as('rate', 'rate_hpf')
        .tolerance(1s)
    |log()

it works. So joining the results of my @hpf UDF with itself works, but joining it with anything else doesn't. One side note: I added the following logging:

def begin_batch(self, begin_req):
    logger.info("Starting batch {}".format(begin_req))
    self._input_points = []
    self._filter_state.reset()

    # Keep copy of begin_batch
    response = udf_pb2.Response()
    response.begin.CopyFrom(begin_req)
    self._begin_response = response
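For readers unfamiliar with the handler shape: begin_batch is one of the begin_batch / point / end_batch callbacks a batch UDF implements. Below is a dependency-free sketch of that lifecycle, using plain dicts in place of the protobuf messages and of the agent's response writing; the class name and the filter_fn hook are illustrative stand-ins, not Kapacitor API:

```python
# Simplified model of the batch lifecycle a Kapacitor Python UDF handler
# goes through (begin_batch -> point* -> end_batch). The real handler
# receives protobuf messages (udf_pb2) and writes responses through the
# agent; plain dicts and a list stand in for both here.

class BatchHandlerModel:
    def __init__(self, filter_fn):
        self._filter_fn = filter_fn   # e.g. a high-pass filter over the points
        self._input_points = []
        self._begin = None
        self.emitted = []             # stands in for agent.write_response

    def begin_batch(self, begin):
        # Reset per-batch state and keep a copy of the begin message, so the
        # same batch metadata (name, group, tags) can be replayed on output.
        self._input_points = []
        self._begin = dict(begin)

    def point(self, pt):
        # Buffer points until the batch is complete.
        self._input_points.append(pt)

    def end_batch(self, end):
        # Emit begin, then the filtered points, then end, mirroring how a
        # protobuf-based handler forwards BeginBatch/Point/EndBatch.
        self.emitted.append(("begin", self._begin))
        for pt in self._filter_fn(self._input_points):
            self.emitted.append(("point", pt))
        self.emitted.append(("end", dict(end)))
```

The point is that whatever the UDF emits between begin and end is what the downstream join node sees as one batch.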

In the Kapacitor log I then see the following from that logger.info call:

ts=2020-04-29T04:03:54.278Z lvl=info msg="UDF log" service=kapacitor task_master=main task=test node=hpf6 text="INFO:root:Starting batch name: "online_users""

Where is it getting the batch name from?

Please, I urgently need help. I'm trying to ship the product, and without the custom UDF there is no chance. In the example above, hpf is a high-pass filter.
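For context: the @hpf UDF's actual algorithm isn't shown in this thread, but a first-order discrete high-pass filter is one plausible shape for it. The coefficient formula below is an assumption for illustration, not the code from the UDF:

```python
import math

def high_pass(samples, cutoff_hz, dt):
    """First-order (single-pole) discrete high-pass filter.

    y[i] = alpha * (y[i-1] + x[i] - x[i-1]), where alpha comes from the
    RC constant for the given cutoff frequency and sample spacing dt.
    A constant (DC) input therefore decays toward zero, while fast
    changes pass through.
    """
    rc = 1.0 / (2.0 * math.pi * cutoff_hz)
    alpha = rc / (rc + dt)
    out = []
    prev_x = prev_y = 0.0
    for i, x in enumerate(samples):
        y = x if i == 0 else alpha * (prev_y + x - prev_x)
        out.append(y)
        prev_x, prev_y = x, y
    return out
```

With a very low cutoff like 0.0001 Hz (as in the script above), alpha is close to 1, so the filter removes only very slow trends from the rate.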

Hello @borei,
I'm not sure; I'm asking around. Out of curiosity, what's your Kapacitor alert name?

Hi, at the moment there is no alert yet.

The same situation happens with UDFs in Go. For tests I use the outlier.go example.
I can't join the outlier node with any other node, but I can join the outlier node with itself. It seems there is something common at a low level.

Hello @borei,
The name comes from:
https://github.com/influxdata/kapacitor/blob/85403fa0115fd4fc0248057b700fc37b38b7bc7f/edge/messages.go#L806

Can you please share the logs from where it fails when you try to join:

online_users_rate
    |join(online_users_rate_hpf)
        .as('rate', 'rate_hpf')
        .tolerance(1s)
    |log()

Thank you.

It doesn't actually fail; I don't see any errors in the log, it's just silence.
I gathered some debug info. When I try to join my UDF node with any other node, I get the following:

DOT:
digraph test {
graph [throughput="0.00 batches/s"];

query2 [avg_exec_time_ns="1.135153ms" batches_queried="5" errors="0" points_queried="10" working_cardinality="0" ];
query2 -> join4 [processed="5"];

query1 [avg_exec_time_ns="0s" batches_queried="5" errors="0" points_queried="15" working_cardinality="0" ];
query1 -> join4 [processed="5"];

join4 [avg_exec_time_ns="0s" errors="0" working_cardinality="1" ];
join4 -> eval5 [processed="5"];

eval5 [avg_exec_time_ns="1.654µs" errors="0" working_cardinality="1" ];
eval5 -> hpf6 [processed="5"];
eval5 -> join8 [processed="5"];

hpf6 [avg_exec_time_ns="43.21µs" errors="0" working_cardinality="0" ];
hpf6 -> join8 [processed="5"];

join8 [avg_exec_time_ns="19.439µs" errors="0" working_cardinality="1" ];
join8 -> log9 [processed="8"];

log9 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
}

log9 has processed 8 points, but only 5 came in on each side.

<------------------------------------------------->
If I join the UDF node with itself, the number of processed points is the same for all nodes in the pipeline:
query2 [avg_exec_time_ns="0s" batches_queried="6" errors="0" points_queried="13" working_cardinality="0" ];
query2 -> join4 [processed="6"];

query1 [avg_exec_time_ns="0s" batches_queried="6" errors="0" points_queried="19" working_cardinality="0" ];
query1 -> join4 [processed="6"];

join4 [avg_exec_time_ns="0s" errors="0" working_cardinality="1" ];
join4 -> eval5 [processed="6"];

eval5 [avg_exec_time_ns="1.659µs" errors="0" working_cardinality="1" ];
eval5 -> hpf6 [processed="6"];

hpf6 [avg_exec_time_ns="27.043µs" errors="0" working_cardinality="0" ];
hpf6 -> join8 [processed="6"];
hpf6 -> join8 [processed="6"];

join8 [avg_exec_time_ns="11.165µs" errors="0" working_cardinality="1" ];
join8 -> log9 [processed="6"];

log9 [avg_exec_time_ns="172.365µs" errors="0" working_cardinality="0" ];
<---------------------------------->

I forgot one thing, the TICKscript itself:
var title_name = 'user_title'

var sliding_window = 190s

var frequency = 10s

var online_users_count = batch
    |query(
        'SELECT "0" AS value FROM "graphite"."autogen"."online_users" ' +
        'WHERE "title" = ' +
        '\'' + title_name + '\''
    )
        .every(frequency)
        .period(sliding_window)

var online_users_diff = batch
    |query(
        'SELECT difference("0") AS value FROM "graphite"."autogen"."online_users" ' +
        'WHERE "title" = ' +
        '\'' + title_name + '\''
    )
        .every(frequency)
        .period(sliding_window)

var online_users_rate = online_users_count
    |join(online_users_diff)
        .as('count', 'diff')
        .tolerance(1s)
    |eval(lambda: "diff.value" / "count.value")
        .as('value')

var online_users_rate_hpf = online_users_rate
    @hpf()
        .field('value')
        .warmup(0)
        .cutoff(0.0001)
        .as('value')

var online_users = online_users_rate_hpf
    |join(online_users_rate_hpf)
        .as('rate', 'rate_hpf')
        .tolerance(2s)
    |log()
        .level('DEBUG')

It looks like there is some "misalignment"; I don't know how to express it properly in Kapacitor's terms.

Over time, that ratio asymptotically approaches 2:

DOT:
digraph test {
graph [throughput="0.00 batches/s"];

query2 [avg_exec_time_ns="1.366191ms" batches_queried="40" errors="0" points_queried="87" working_cardinality="0" ];
query2 -> join4 [processed="40"];

query1 [avg_exec_time_ns="1.27682ms" batches_queried="40" errors="0" points_queried="127" working_cardinality="0" ];
query1 -> join4 [processed="40"];

join4 [avg_exec_time_ns="10.204µs" errors="0" working_cardinality="1" ];
join4 -> eval5 [processed="40"];

eval5 [avg_exec_time_ns="4.719µs" errors="0" working_cardinality="1" ];
eval5 -> hpf6 [processed="40"];
eval5 -> join8 [processed="40"];

hpf6 [avg_exec_time_ns="39.474µs" errors="0" working_cardinality="0" ];
hpf6 -> join8 [processed="40"];

join8 [avg_exec_time_ns="25.082µs" errors="0" working_cardinality="1" ];
join8 -> log9 [processed="78"];

log9 [avg_exec_time_ns="67.416µs" errors="0" working_cardinality="0" ];
}

After several hours of investigation I'm coming to the conclusion that there is a problem with the join node; more specifically, it's not happy with batches whose timestamps have different precision. I'm not the first to mention it, either: https://github.com/influxdata/kapacitor/issues/2083
For example, my initial batch, as the result of a query to the DB, has a timestamp like:
time=2020-05-04T16:13:42.184755156Z
while the derivative of the original batch has the format:
time=2020-05-04T16:13:42Z
Those timestamps are well within my tolerance window of 1 s, but the join fails to match them because of the differing precision.
I created a workaround for the problem, but it would be cool to have a timestamp-precision-agnostic join node.
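The actual workaround isn't shown here, but one sketch of that shape (an assumption about the approach, not the code that was used) is to truncate each point's nanosecond timestamp to whole seconds inside the UDF, before the data reaches the join:

```python
NS_PER_SECOND = 1_000_000_000

def truncate_to_second(time_ns):
    """Truncate a nanosecond-precision epoch timestamp to whole seconds.

    Kapacitor point times are int64 nanoseconds since the epoch; dropping
    the sub-second remainder makes 16:13:42.184755156Z and 16:13:42Z
    identical, so the join node sees matching timestamps on both sides.
    """
    return time_ns - (time_ns % NS_PER_SECOND)

# e.g. 2020-05-04T16:13:42.184755156Z as epoch nanoseconds
raw = 1588608822184755156
truncated = truncate_to_second(raw)  # 2020-05-04T16:13:42.000000000Z
```

In a Python UDF this would be applied to each point's time field as the point passes through, so both sides of the join carry second-precision timestamps.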

Hello @borei,
Thanks for sharing that issue. Is there anything else I can help with at the moment?

Hi @Anaisdg,
Is it a known bug? What are the plans for it?

Hi @Anaisdg,
Any word on it?

Hello @borei,
Thanks for reminding me to get back to you! Yes, I asked the team; that issue has been moved to the backlog, so it should be addressed soon. It accidentally slipped through the cracks; thanks for bringing it to our attention.

Cool, I hope it will be addressed soon. It's a pretty critical one.

@borei,
Agreed. Commenting/checking in on the issue can’t hurt either :wink:
