Calculating the Pearson Correlation Coefficient in Kapacitor

We would like to calculate the Pearson correlation coefficient in a TICKscript in Kapacitor, so that we can generate alerts when the correlation moves outside certain bounds.

One of our use cases is the combination of a valve position and a water flow measurement. When the valve opens, flow increases; when it closes, flow decreases. The two signals are strongly correlated, and we know that when the correlation breaks down, something is wrong in our system: either the setpoint for the valve position is not reaching the valve correctly, or the valve is mechanically broken. Either way, when the correlation is gone, we want to be alerted.

Currently I don't see any way to do this without creating a UDF. I also see @nathaniel is using the @pearsonr UDF in this MR.

Is there some working code for this UDF available somewhere? Or maybe some example code to get us going?


@jaapz I have copied the Python UDF code I have been using below. It's a quick implementation, but it has worked well for me so far.

We have plans to support this directly within the query language, but there are some technical challenges we need to overcome with performing the join of the two fields. Until then, the Python UDF code below should work well.
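
Note that the UDF expects both fields on the same point. If the valve position and flow arrive as separate measurements, a TICKscript join can bring them onto one point first; the measurement names below are hypothetical:

var position = stream
    |from()
        .measurement('valve_position')

var flow = stream
    |from()
        .measurement('water_flow')

position
    |join(flow)
        .as('pos', 'flow')

After the join, the fields are prefixed with the names given to .as(), e.g. 'pos.value' and 'flow.value'.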

The basic usage of the UDF is:

// Input windowed data with two fields.
@pearsonr()
    // Specify the two fields to use
    .fields('position', 'flow')
    // Specify the name of the resulting correlation field.
    .as('r')
// Output is a single point per window with the correlation field.
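
For reference, a complete pipeline might look something like the following; the measurement name, window sizes, and alert threshold are illustrative only, not taken from the original task:

stream
    |from()
        .measurement('valves')
    |window()
        .period(10m)
        .every(1m)
    @pearsonr()
        .fields('position', 'flow')
        .as('r')
    |alert()
        .crit(lambda: "r" < 0.5)
        .log('/tmp/correlation_alerts.log')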

The full Python UDF code follows:

#!/usr/bin/python2

import sys
import json
from kapacitor.udf.agent import Agent, Handler, Server
from kapacitor.udf import udf_pb2
import signal
import math

from scipy.stats import pearsonr

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()


# PearsonR computes the Pearson correlation coefficient for a batch of points
class PearsonRHandler(Handler):
    def __init__(self, agent):
        self._agent = agent
        self._as = 'r'
        self._type = 'float'
        self._fields = None
        self._x = None
        self._y = None


    def info(self):
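        # Declare edge types and options: this UDF wants batched data in and
        # provides stream data out, emitting one point per window.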
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.STREAM
        response.info.options['fields'].valueTypes.append(udf_pb2.STRING)
        response.info.options['fields'].valueTypes.append(udf_pb2.STRING)
        response.info.options['as'].valueTypes.append(udf_pb2.STRING)
        response.info.options['type'].valueTypes.append(udf_pb2.STRING)
        return response

    def init(self, init_req):
        success = True
        msg = ''
        for opt in init_req.options:
            if opt.name == 'fields':
                self._fields = [opt.values[0].stringValue, opt.values[1].stringValue]
            if opt.name == 'as':
                self._as = opt.values[0].stringValue
            if opt.name == 'type':
                self._type = opt.values[0].stringValue

        if self._fields is None:
            success = False
            msg += ' must supply fields'

        if self._type not in ['float', 'int']:
            success = False
            msg += ' type must be one of "float" or "int"'


        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]

        return response

    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        self._x = []
        self._y = []

    def point(self, point):
        try:
            x = 0
            y = 0
            if self._type == 'float':
                x = point.fieldsDouble[self._fields[0]]
                y = point.fieldsDouble[self._fields[1]]
            elif self._type == 'int':
                x = float(point.fieldsInt[self._fields[0]])
                y = float(point.fieldsInt[self._fields[1]])
            self._x.append(x)
            self._y.append(y)
        except Exception as e:
            logger.error(e)


    def end_batch(self, end_req):

        # Compute Pearson r
        r, _ = pearsonr(self._x, self._y)
        if math.isnan(r):
            # pearsonr returns NaN when a field has zero variance in the window; emit nothing.
            return

        response = udf_pb2.Response()
        response.point.name = end_req.name
        response.point.time = end_req.tmax
        response.point.group = end_req.group
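        # Carry the group-by dimensions and tag values over to the output point.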
        for k,v in end_req.tags.iteritems():
            response.point.dimensions.append(k)
            response.point.tags[k] = v
        response.point.fieldsDouble[self._as] = r

        self._agent.write_response(response)


class accepter(object):
    _count = 0
    def accept(self, conn, addr):
        self._count += 1
        a = Agent(conn, conn)
        h = PearsonRHandler(a)
        a.handler = h

        logger.info("Starting Agent for connection %d", self._count)
        a.start()
        a.wait()
        logger.info("Agent finished connection %d",self._count)

if __name__ == '__main__':
    path = "/tmp/pearsonr.sock"
    if len(sys.argv) == 2:
        path = sys.argv[1]
    server = Server(path, accepter())
    logger.info("Started server")
    server.serve()
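
To wire the UDF into Kapacitor, register it as a socket UDF in kapacitor.conf; the socket path and timeout below are examples and should match wherever you run the script:

[udf]
[udf.functions]
    [udf.functions.pearsonr]
        socket = "/tmp/pearsonr.sock"
        timeout = "10s"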

Note that this code does not use an iterative implementation of the Pearson correlation coefficient; instead, it buffers all of the data for the window. If you need better performance, I recommend looking into an iterative approach, as a single-pass implementation is not too difficult to write.
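
As a rough sketch of that idea (untested, and using standard Welford-style running updates rather than anything from the UDF above):

import math

class RunningPearson(object):
    """Single-pass Pearson accumulator using Welford-style updates."""
    def __init__(self):
        self.n = 0
        self.mean_x = 0.0
        self.mean_y = 0.0
        self.m2_x = 0.0  # running sum of squared deviations of x
        self.m2_y = 0.0  # running sum of squared deviations of y
        self.co = 0.0    # running sum of co-deviations of x and y

    def add(self, x, y):
        self.n += 1
        dx = x - self.mean_x  # deviation from the old mean
        dy = y - self.mean_y
        self.mean_x += dx / self.n
        self.mean_y += dy / self.n
        # Second moments use the old deviation times the new deviation.
        self.m2_x += dx * (x - self.mean_x)
        self.m2_y += dy * (y - self.mean_y)
        self.co += dx * (y - self.mean_y)

    def r(self):
        if self.n < 2 or self.m2_x == 0.0 or self.m2_y == 0.0:
            return float('nan')
        return self.co / math.sqrt(self.m2_x * self.m2_y)

In the handler above, begin_batch would reset one of these accumulators, point would call add(x, y), and end_batch would emit r() instead of calling scipy.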