Help with Batch response from Python UDF (kapacitor)

I’m struggling to understand how to build the Response object in my UDF when it’s the Batch type.

The only example I can find is outliers.py, so I’m trying to copy it.

My scenario is straightforward: all I’m doing is running a batch query, then a UDF against the returned data. I’d like the UDF to return multiple points and raise alerts on all of them (or a selection of them, based on the .crit)…

var sample = batch
    |query('''SELECT col1, col2, last(col3)
              FROM "db"."rp".measurement
           ''')
        .period(90d)
        .every(1m)
        .groupBy(time(1h), 'col1')

sample
    @anomalyDetect()
        .tolerance(6.25)
    |alert()
        .id('anomaly_{{ index .Tags "value1"}}_{{ index .Tags "value2"}}')
        .stateChangesOnly()
        .crit(lambda: True)
        .message('some text here')
        .log('/tmp/udf/output.log')
        .slack()
            .channel('#whatever')

My alert is firing, but only for the first point in my response (maybe it’s the only point; I think this is where I’m going wrong).

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.BATCH
    response.info.provides = udf_pb2.BATCH
    response.info.options['tolerance'].valueTypes.append(udf_pb2.DOUBLE)
    return response

My begin_batch looks like this; it’s pretty much taken straight from the example above…

def begin_batch(self, begin_req):
    # save the begin response; its size is set later, once the
    # number of output points is known (as in outliers.py)
    response = udf_pb2.Response()
    response.begin.CopyFrom(begin_req)
    self._begin_response = response

My end_batch does some analysis of the data and stores the points I’d like to return inside self._points[]. I’m attempting to return those points like this…

def end_batch(self, end_req):
    # a bunch of analysis code on the points here
    # points of interest stored in self._points[]

    # announce the batch with its final size, as outliers.py does
    self._begin_response.begin.size = len(self._points)
    self._agent.write_response(self._begin_response)

    # write each point of interest...
    response = udf_pb2.Response()
    for p in self._points:
        response.point.CopyFrom(p)
        self._agent.write_response(response)

    # ...then close the batch
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)

When I look in /tmp/udf/output.log I see one entry for the one alert that was generated, but it has 12 items inside values (i.e. the 12 items from my self._points[]):

"level": "CRITICAL",
  "data": {
    "series": [
      {
        "name": "xx",
        "tags": {
          "key": "value"
        },
        "columns": [
          "aa",
          "bb",
          "cc",
          "dd"
        ],
        "values": [ [list_of_12_items_here] ]

I’m not sure what I’m doing wrong.

Can I generate an alert for each point in my response?

Am I even generating the response correctly ?

:confused:

The alert node will generate one alert per batch. Since you are passing all points back as a single batch, only one alert is triggered.

If you want each point to be treated individually, have your UDF return points as a stream rather than a batch. That means declaring response.info.provides = udf_pb2.STREAM and then not writing begin/end messages to the agent.
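For reference, a minimal sketch of that batch-in/stream-out shape, reusing the names from the snippets above (self._points, self._agent); treat it as untested pseudocode rather than a verified implementation:

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.BATCH      # still consume batches...
    response.info.provides = udf_pb2.STREAM  # ...but emit individual points
    response.info.options['tolerance'].valueTypes.append(udf_pb2.DOUBLE)
    return response

def begin_batch(self, begin_req):
    # providing a stream, so no begin message is forwarded to the agent
    self._points = []

def end_batch(self, end_req):
    # ... analysis as before, points of interest in self._points ...

    # write each point as its own response, with no begin/end framing;
    # the downstream alert node then evaluates every point separately
    response = udf_pb2.Response()
    for p in self._points:
        response.point.CopyFrom(p)
        self._agent.write_response(response)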

Can I tack on a question about this script?
In the stream definition there is a group by…

It looks like you are saving state between runs inside of Kapacitor, and only running the anomaly detection on new points… what I can’t figure out is how you keep your states separate per “group”.

Is this handled by Kapacitor under the covers somewhere?
(we are planning to implement some of the anomaly detection algorithms, like Twitter’s, into Kapacitor in the near future)
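For what it’s worth, the bundled Python UDF examples (e.g. moving_avg.py) show the usual pattern: Kapacitor stamps each point it sends with its group, and the UDF keeps a dict of state keyed on point.group, so the separation is done in the UDF but driven by what Kapacitor supplies. A hedged sketch of that pattern, with the field name 'value' as a placeholder:

def __init__(self, agent):
    self._agent = agent
    self._state = {}  # group id -> per-group algorithm state

def point(self, point):
    # point.group identifies the groupBy bucket this point belongs to,
    # so each group reads and updates only its own state
    state = self._state.setdefault(point.group, [])
    state.append(point.fieldsDouble['value'])  # 'value' is a placeholder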