Hello friends!
I’m having trouble writing a UDF that wants a STREAM and provides a BATCH.
Like this:
def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.STREAM
    response.info.provides = udf_pb2.BATCH
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    return response
Does anyone have example code? I searched around the web (forums, documentation), but all the examples are for BATCH-BATCH, STREAM-STREAM or BATCH-STREAM.
I saw in the examples that when writing the response to Kapacitor in the “end_batch(self, end_req)” method, it is necessary to “communicate” that the BATCH has ended. In one example this was done this way:
def end_batch(self, end_req):
    # Send begin batch with count of outliers
    self._begin_response.begin.size = len(self._batch)
    self._agent.write_response(self._begin_response)
    response = udf_pb2.Response()
    ...
    # Send an identical end batch back to Kapacitor
    # HERE
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)
To send the BATCH, I have to send it from the “point(self, point)” method, but there I can’t access the end_req object and I don’t know how to create one.
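For illustration, here is a minimal sketch of one way the end-of-batch message might be built by hand instead of copied from an end_req. It assumes the BeginBatch message has "name", "group" and "size" fields and the EndBatch message has "name", "group" and "tmax" fields, as in the udf.proto I know of; check the udf_pb2 version you have installed. The StreamToBatchBuffer class and its "size" threshold are hypothetical helpers, not part of the Kapacitor agent, and the sketch assumes all points belong to a single group and ignores tags.

```python
class StreamToBatchBuffer:
    """Hypothetical helper: buffers stream points and writes them out as one batch."""

    def __init__(self, agent, new_response, size):
        self._agent = agent                # anything with write_response(response)
        self._new_response = new_response  # factory, e.g. udf_pb2.Response
        self._size = size                  # assumed policy: flush every `size` points
        self._points = []

    def add(self, point):
        """Call this from the handler's point(self, point) method."""
        self._points.append(point)
        if len(self._points) >= self._size:
            self.flush()

    def flush(self):
        if not self._points:
            return
        first = self._points[0]

        # Begin message: field names assumed from udf.proto (BeginBatch).
        begin = self._new_response()
        begin.begin.name = first.name
        begin.begin.group = first.group
        begin.begin.size = len(self._points)
        self._agent.write_response(begin)

        # One response per buffered point.
        for p in self._points:
            resp = self._new_response()
            resp.point.CopyFrom(p)
            self._agent.write_response(resp)

        # End message built by hand, since no end_req exists for a stream input.
        end = self._new_response()
        end.end.name = first.name
        end.end.group = first.group
        end.end.tmax = max(p.time for p in self._points)
        self._agent.write_response(end)

        self._points = []
```

In a handler this might be created in __init__ as StreamToBatchBuffer(agent, udf_pb2.Response, 10) and fed from point(self, point) with self._buffer.add(point). Flushing on a point count is only one possible choice; flushing on a time window would follow the same begin/points/end pattern.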
Thanks in advance!
Bye bye!