I’m struggling to understand how to build the Response object in my UDF when it’s the Batch type.
The only example I can find is outliers.py, so I’m trying to copy it.
My scenario is straightforward: all I’m doing is running a batch query, then a UDF against the returned data. I’d then like to return multiple points from the UDF and raise alerts on all of them (or a selection of them, based on the .crit)…
var sample = batch
    |query('''SELECT col1, col2, last(col3)
        FROM "db"."rp".measurement
    ''')
        .period(90d)
        .every(1m)
        .groupBy(time(1h), 'col1')

sample
    @anomalyDetect()
        .tollerance(6.25)
    |alert()
        .id('anomaly_{{ index .Tags "value1"}}_{{ index .Tags "value2"}}')
        .stateChangesOnly()
        .crit(lambda: True)
        .message('some text here')
        .log('/tmp/udf/output.log')
        .slack()
        .channel('#whatever')
My alert is firing, but only for the first point in my response (maybe it’s the only point; I think this is where I’m going wrong). My info looks like this…
def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.BATCH
    response.info.provides = udf_pb2.BATCH
    response.info.options['tollerance'].valueTypes.append(udf_pb2.DOUBLE)
    return response
My begin_batch looks like this; it’s pretty much taken straight from the example above…
def begin_batch(self, begin_req):
    response = udf_pb2.Response()
    response.begin.CopyFrom(begin_req)
    self._begin_response = response
My end_batch does some analysis of the data and stores the points which I’d like to return inside self._points[]. I’m attempting to return those points like this…
def end_batch(self, end_req):
    # a bunch of analysis code on the points here
    # points of interest stored in self._points[]
    self._begin_response.begin.size = len(self._points)
    self._agent.write_response(self._begin_response)
    response = udf_pb2.Response()
    for p in self._points:
        response.point.CopyFrom(p)
        self._agent.write_response(response)
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)
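
To make the sequence I’m aiming for concrete, here is a minimal, self-contained sketch of the order of writes: one begin message carrying the size, one message per point, then one end message. It does not use the real udf_pb2 messages or the Kapacitor agent; FakeAgent, emit_batch and the dict-based messages are hypothetical stand-ins I made up purely for illustration.

```python
# Stand-ins only: the real UDF uses udf_pb2.Response and the Kapacitor agent.
class FakeAgent:
    """Hypothetical agent that records every response written to it."""

    def __init__(self):
        self.written = []

    def write_response(self, response):
        # Store a copy so later changes to the dict do not alter the history.
        self.written.append(dict(response))


def emit_batch(agent, begin, points, end):
    """Write begin (with size), then each point, then end, in that order."""
    agent.write_response({"kind": "begin", **begin, "size": len(points)})
    for p in points:
        agent.write_response({"kind": "point", **p})
    agent.write_response({"kind": "end", **end})


agent = FakeAgent()
emit_batch(
    agent,
    begin={"name": "xx"},
    points=[{"time": t, "value": v} for t, v in [(1, 0.5), (2, 7.1), (3, 9.9)]],
    end={"name": "xx"},
)
kinds = [m["kind"] for m in agent.written]
print(kinds)  # ['begin', 'point', 'point', 'point', 'end']
```

As far as I can tell this is the same shape as what my end_batch writes, so everything I send goes out as a single batch.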
When I look in the /tmp/udf/output.log I’m seeing one entry for the one alert that was generated, but it has 12 items inside values (i.e. the 12 items from my self._points[])
"level": "CRITICAL",
"data": {
    "series": [
        {
            "name": "xx",
            "tags": {
                "key": "value"
            },
            "columns": [
                "aa",
                "bb",
                "cc",
                "dd"
            ],
            "values": [ [list_of_12_items_here] ]
I’m not sure what I’m doing wrong.
Can I generate an alert for each point in my response?
Am I even generating the response correctly?