Creating Kapacitor UDF that wants a STREAM and provides a BATCH (Python)

Hello friends!

I’m having trouble at making an UDF that wants a STREAM and provides a BATCH.

This way:

def info(self):
		response = udf_pb2.Response()
		response.info.wants = udf_pb2.STREAM
		response.info.provides = udf_pb2.BATCH
		response.info.options['field'].valueTypes.append(udf_pb2.STRING)
		return response

Is there anyone with an example code?? I searched around the web (foruns, documentation) but all the examples are for BATCH-BACH, STREAM-STREAM or BATCH-STREAM.

I saw in the examples that when writing the response to Kapacitor, in the “end_batch(self, end_req)” method, is necessary to kind of “comunicate” that the BATCH has ended, in an example this was made this way:

def end_batch(self, end_req):
		# Send begin batch with count of outliers
		self._begin_response.begin.size = len(self._batch)
		self._agent.write_response(self._begin_response)

		response = udf_pb2.Response()


                                      ...    


		# Send an identical end batch back to Kapacitor
            # HERE
		response.end.CopyFrom(end_req)
		self._agent.write_response(response)

In order to send the BATCH, I have to send it from the “point(self, point)” method, but can’t acess the end_req object and don’t know how to create one.

Thanks in advance!
Bye bye!

Hi @Vittorfp were you able to get this resolved? I am trying something similar and an example would really help. Thanks!