I’m having trouble at making an UDF that wants a STREAM and provides a BATCH.
def info(self): response = udf_pb2.Response() response.info.wants = udf_pb2.STREAM response.info.provides = udf_pb2.BATCH response.info.options['field'].valueTypes.append(udf_pb2.STRING) return response
Is there anyone with an example code?? I searched around the web (foruns, documentation) but all the examples are for BATCH-BACH, STREAM-STREAM or BATCH-STREAM.
I saw in the examples that when writing the response to Kapacitor, in the “end_batch(self, end_req)” method, is necessary to kind of “comunicate” that the BATCH has ended, in an example this was made this way:
def end_batch(self, end_req): # Send begin batch with count of outliers self._begin_response.begin.size = len(self._batch) self._agent.write_response(self._begin_response) response = udf_pb2.Response() ... # Send an identical end batch back to Kapacitor # HERE response.end.CopyFrom(end_req) self._agent.write_response(response)
In order to send the BATCH, I have to send it from the “point(self, point)” method, but can’t acess the end_req object and don’t know how to create one.
Thanks in advance!