# Calculating the Pearson Correlation Coefficient in Kapacitor

We would like to calculate the Pearson correlation coefficient in a TICKscript in Kapacitor, so we can generate alerts when a correlation exceeds certain bounds.

One of our use cases is the combination of a valve position and a water flow measurement. When the valve is opened, flow increases; when it closes, flow decreases. The two signals are strongly correlated. We know that when the correlation breaks down, something is wrong in our system: either the setpoint for the valve position is not correctly reaching the valve, or the valve is mechanically broken. Either way, when the correlation is gone, we want to be alerted.

Currently I don’t see any way to do this without creating a UDF. I also see @nathaniel is using the `@pearsonr` UDF in this MR.

Is there some working code for this UDF available somewhere? Or maybe some example code to get us going?


@jaapz I have copied the Python UDF code I have been using below. It's a quick implementation, but it has worked well for me so far.

We have plans to support this directly within the query language, but there are some technical challenges we need to overcome with performing the join of the two fields. Until then, the Python UDF code below should work well.

The basic usage of the UDF is:

```
// Input windowed data with two fields.
@pearsonr()
    // Specify the two fields to use.
    .fields('position', 'flow')
    // Specify the name of the resulting correlation field.
    .as('r')
// Output is a single point per window with the correlation field.
```
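For context, a fuller pipeline around the UDF might look like the sketch below. The measurement name, window period, and alert threshold are illustrative assumptions, not part of the original example:

```
stream
    |from()
        .measurement('valve_metrics') // hypothetical measurement name
    |window()
        .period(5m)
        .every(1m)
    @pearsonr()
        .fields('position', 'flow')
        .as('r')
    |alert()
        // Alert when the correlation drops below an assumed threshold.
        .crit(lambda: "r" < 0.5)
        .log('/tmp/correlation_alerts.log')
```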
```python
#!/usr/bin/python2

import sys
import json
import signal
import math
import logging

from kapacitor.udf.agent import Agent, Handler, Server
from kapacitor.udf import udf_pb2

from scipy.stats import pearsonr

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()


# PearsonRHandler computes the Pearson correlation coefficient for a batch of
# points and emits a single point per batch containing the correlation.
class PearsonRHandler(Handler):
    def __init__(self, agent):
        self._agent = agent
        self._as = 'r'
        self._type = 'float'
        self._fields = None
        self._x = None
        self._y = None

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.STREAM
        # 'fields' takes two string arguments: the x and y field names.
        response.info.options['fields'].valueTypes.append(udf_pb2.STRING)
        response.info.options['fields'].valueTypes.append(udf_pb2.STRING)
        response.info.options['as'].valueTypes.append(udf_pb2.STRING)
        response.info.options['type'].valueTypes.append(udf_pb2.STRING)
        return response

    def init(self, init_req):
        success = True
        msg = ''
        for opt in init_req.options:
            if opt.name == 'fields':
                self._fields = [opt.values[0].stringValue, opt.values[1].stringValue]
            if opt.name == 'as':
                self._as = opt.values[0].stringValue
            if opt.name == 'type':
                self._type = opt.values[0].stringValue

        if self._fields is None:
            success = False
            msg += ' must supply fields'

        if self._type not in ['float', 'int']:
            success = False
            msg += ' type must be one of "float" or "int"'

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]

        return response

    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        self._x = []
        self._y = []

    def point(self, point):
        try:
            x = 0
            y = 0
            if self._type == 'float':
                x = point.fieldsDouble[self._fields[0]]
                y = point.fieldsDouble[self._fields[1]]
            elif self._type == 'int':
                x = float(point.fieldsInt[self._fields[0]])
                y = float(point.fieldsInt[self._fields[1]])
            self._x.append(x)
            self._y.append(y)
        except Exception as e:
            logger.error(e)

    def end_batch(self, end_req):
        # Compute Pearson r over the buffered batch.
        r, _ = pearsonr(self._x, self._y)
        if math.isnan(r):
            return

        response = udf_pb2.Response()
        response.point.name = end_req.name
        response.point.time = end_req.tmax
        response.point.group = end_req.group
        for k, v in end_req.tags.iteritems():
            response.point.dimensions.append(k)
            response.point.tags[k] = v
        response.point.fieldsDouble[self._as] = r

        self._agent.write_response(response)


class accepter(object):
    _count = 0

    def accept(self, conn, addr):
        self._count += 1
        a = Agent(conn, conn)
        h = PearsonRHandler(a)
        a.handler = h

        logger.info("Starting Agent for connection %d", self._count)
        a.start()
        a.wait()
        logger.info("Agent finished connection %d", self._count)


if __name__ == '__main__':
    path = "/tmp/pearsonr.sock"
    if len(sys.argv) == 2:
        path = sys.argv[1]
    server = Server(path, accepter())
    logger.info("Started server")
    server.serve()
```
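For completeness: registering a socket-based UDF like this one also requires an entry in `kapacitor.conf`. A minimal sketch, assuming the script is started separately and listens on the socket path from the code above:

```
[udf.functions.pearsonr]
    socket = "/tmp/pearsonr.sock"
    timeout = "10s"
```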

Note that this code does not use an iterative implementation of the Pearson correlation coefficient; rather, it buffers the batch data. If you need better performance, I recommend looking into that, as it's not too difficult to implement a single-pass version.
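As a starting point for that, a single-pass version can maintain running sums and apply the closed-form expression for r at the end of the window. A minimal sketch (the function name is ours; note this formulation can be less numerically stable than a two-pass or Welford-style update on poorly scaled data):

```python
import math

def pearson_r_single_pass(pairs):
    """Compute Pearson r in one pass over an iterable of (x, y) tuples,
    keeping only five running sums instead of buffering the points."""
    n = 0
    sx = sy = sxx = syy = sxy = 0.0
    for x, y in pairs:
        n += 1
        sx += x
        sy += y
        sxx += x * x
        syy += y * y
        sxy += x * y
    # r = (n*Sxy - Sx*Sy) / sqrt((n*Sxx - Sx^2) * (n*Syy - Sy^2))
    denom = math.sqrt((n * sxx - sx * sx) * (n * syy - sy * sy))
    if denom == 0.0:
        # Either series is constant (or empty); correlation is undefined.
        return float('nan')
    return (n * sxy - sx * sy) / denom
```

In the UDF above, `point()` would update the running sums instead of appending to `self._x`/`self._y`, and `end_batch()` would emit the result of this computation.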