Hi all,
I need some help with a Kapacitor UDF (Python). I have an implementation that seems to work: I can log the data coming out of it, and the data looks correct. Here is the relevant part of the TICKscript:
var online_users_rate = online_users_count
    |join(online_users_diff)
        .as('count', 'diff')
        .tolerance(1s)
    |eval(lambda: "diff.value" / "count.value")
        .as('value')
var online_users_rate_hpf = online_users_rate
    @hpf()
        .field('value')
        .warmup(0)
        .cutoff(0.0001)
        .as('value')
Both online_users_diff and online_users_count are batches.
When I try to join:
online_users_rate
    |join(online_users_rate_hpf)
        .as('rate', 'rate_hpf')
        .tolerance(1s)
    |log()
it fails: there are no results. But if I join like this:
online_users_rate_hpf
    |join(online_users_rate_hpf)
        .as('rate', 'rate_hpf')
        .tolerance(1s)
    |log()
it works. So joining the output of my @hpf UDF with itself works, but joining it with anything else doesn't. One side note: I added the following logging:
def begin_batch(self, begin_req):
    logger.info("Starting batch {}".format(begin_req))
    self._input_points = []
    self._filter_state.reset()
    # Keep copy of begin_batch
    response = udf_pb2.Response()
    response.begin.CopyFrom(begin_req)
    self._begin_response = response
and in the log I see the following:
ts=2020-04-29T04:03:54.278Z lvl=info msg="UDF log" service=kapacitor task_master=main task=test node=hpf6 text="INFO:root:Starting batch name: "online_users""
Where is it getting the batch name from?
Please, I urgently need help. I'm trying to ship the product, and without a custom UDF there is no chance. In the example above, hpf is a high-pass filter.
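To make the join symptom above easier to reason about, here is a minimal Python sketch of how a time-based join with a tolerance could behave. It is only a mental model, not Kapacitor's implementation: the function names, the millisecond timestamps, and the 5-second shift are all hypothetical, and for batches Kapacitor may match on the batch time rather than on individual point times. It assumes timestamps are rounded to the nearest multiple of the tolerance and that only equal rounded times are paired. Under that assumption, a stream whose times were changed by the UDF would still join with itself but not with the original stream.

```python
def round_to_tolerance(ts_ms, tolerance_ms):
    """Round a millisecond timestamp to the nearest multiple of the tolerance."""
    return (ts_ms + tolerance_ms // 2) // tolerance_ms * tolerance_ms


def join(left, right, tolerance_ms):
    """Pair (time, value) points whose rounded times are equal.

    Points without a partner on the other side are dropped.
    """
    right_by_time = {round_to_tolerance(t, tolerance_ms): v for t, v in right}
    joined = []
    for t, v in left:
        key = round_to_tolerance(t, tolerance_ms)
        if key in right_by_time:
            joined.append((key, v, right_by_time[key]))
    return joined


# Hypothetical data: the original rate series and two versions of the UDF output.
rate = [(1000, 0.10), (2000, 0.12), (3000, 0.11)]
hpf_same_times = [(1000, 0.00), (2000, 0.02), (3000, -0.01)]
# Assumption for illustration: the UDF emitted points with shifted timestamps.
hpf_shifted = [(t + 5000, v) for t, v in hpf_same_times]

print(join(rate, hpf_same_times, 1000))     # three joined points
print(join(rate, hpf_shifted, 1000))        # empty list: no times line up
print(join(hpf_shifted, hpf_shifted, 1000)) # three joined points: self-join still works
```

If this model is close to what happens, comparing the timestamps (and the batch end time) that go into the UDF with the ones it sends back would be the first thing to check.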