Why do we have to create a "field" for udf?

kapacitor

#1

Greetings,

I have been trying to write a Python UDF example that makes future predictions using batch input and batch output.

I always have a single series and don't need to define a field, but all the examples come with one. Hence I wrote this starting example with "field", which only returns:

invalid TICKscript: line 14 char 3: no method or property "field" on *pipeline.UDFNode

Below are my Python and TICKscript files. Any help would be appreciated.

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import sys
from scipy import stats
import math
import sys

import logging
# Configure the root logger once: DEBUG and above, timestamped records,
# written to basicConfig's default stream (stderr).
logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
    level=logging.DEBUG,
)
# The root logger itself; the handler below logs through it.
logger = logging.getLogger()

class ForecastHandler(Handler):
    """Batch-in/batch-out Kapacitor UDF that emits a placeholder forecast.

    Each incoming point contributes the double value of the field named by the
    ``field`` option. At the end of the batch, one output point is emitted per
    recorded input point, carrying the prediction in the ``predict`` field.
    """

    class state(object):
        """Collects ``(value, point)`` pairs for the batch being processed."""

        def __init__(self):
            self._entries = []

        def reset(self):
            """Discard everything collected for the previous batch."""
            self._entries = []

        def update(self, value, point):
            """Remember one field value together with the point it came from."""
            self._entries.append((value, point))

        def arima(self):
            """Return ``(point, prediction)`` pairs for the collected entries.

            Placeholder model: the prediction is ``value + 1``. The previous
            ``numpy.array(self._entries) + 1`` could not work: numpy is not
            imported in this script, and the entries hold protobuf points
            rather than plain numbers.
            """
            return [(point, value + 1) for value, point in self._entries]

    def __init__(self, agent):
        self._agent = agent
        self._field = None               # name of the field to read, from the 'field' option
        self._state = ForecastHandler.state()
        self._begin_response = None      # begin message held back until end_batch

    def info(self):
        """Advertise batch in/out and the single string option ``field``."""
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)

        logger.info("info")
        return response

    def init(self, init_req):
        """Read the ``field`` option and fail initialisation when it is absent."""
        success = True
        msg = ''
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue

        if self._field is None:
            success = False
            msg += ' must supply a field name'

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]  # drop the leading separator space
        return response

    def snapshot(self):
        """Return an empty snapshot; this handler keeps no durable state."""
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        """Restoring from a snapshot is not supported."""
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        """Start a new batch: clear the state and hold back the begin message.

        The begin message is written in ``end_batch``, once the number of
        output points is known.
        """
        # print >> sys.stderr is Python-2-only syntax (a TypeError on Python 3).
        logger.debug("START!!!!")
        self._state.reset()
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response

    def point(self, point):
        """Record the configured field's value from one incoming point.

        Points without the field in ``fieldsDouble`` are logged and skipped.
        Indexing a protobuf scalar map with a missing key would silently
        return (and insert) 0.0.
        """
        if self._field not in point.fieldsDouble:
            logger.warning("point has no double field %r; skipping", self._field)
            return
        self._state.update(point.fieldsDouble[self._field], point)

    def end_batch(self, end_req):
        """Emit begin, one point per prediction, then end for this batch."""
        predictions = self._state.arima()

        # The batch is sent as begin -> points -> end. The previous version
        # never sent the begin message and tried to store a whole array in a
        # single double field.
        self._begin_response.begin.size = len(predictions)
        self._agent.write_response(self._begin_response)

        for source_point, prediction in predictions:
            response = udf_pb2.Response()
            response.point.CopyFrom(source_point)  # keep time/tags of the input point
            response.point.fieldsDouble["predict"] = prediction
            self._agent.write_response(response)

        # Send an identical end batch back to Kapacitor
        response = udf_pb2.Response()
        response.end.CopyFrom(end_req)
        self._agent.write_response(response)

if __name__ == '__main__':
    # Wire the handler into the agent, then run until the agent stops.
    agent = Agent()
    agent.handler = ForecastHandler(agent)

    logger.info("Starting Agent")
    agent.start()
    agent.wait()
    logger.info("Agent finished")

and

// Run this task against the telegraf database, autogen retention policy.
dbrp "telegraf"."autogen"

// Every 10s, query the last 5m of the total-CPU series, taking
// max(usage_system) per 1m time bucket.
var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(5m)
        .every(10s)
        .groupBy(time(1m))
// Pass each batch through the "forecast" UDF ([udf.functions.forecast] in
// kapacitor.conf) and write its output to measurement "c".
// NOTE(review): .field() is only recognised if Kapacitor obtains the UDF's
// info() response advertising a "field" option; the 'no method or property
// "field"' error suggests that did not happen — confirm the UDF process starts.
data
    @forecast()
        .field('usage_system')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('c')

and the kapacitor.conf:

[udf]
    [udf.functions.forecast]
       prog = "/usr/bin/python2"
       args = ["-u", "/tmp/kapacitor_udf/forecast.py"]
       timeout = "10s"
       [udf.functions.forecast.env]
           PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"

#2

Hello friend!
What does your configuration in kapacitor.conf look like?


#3

Hello mate, I have updated the post. Thank you!


#4

This solved the field problem:

// Run this task against the telegraf database, autogen retention policy.
dbrp "telegraf"."autogen"

// Every minute, query the last 24h of the total-CPU series as
// max(usage_system).
// NOTE(review): the usual form is .groupBy(time(1m)); confirm that a bare
// duration is accepted by the query node here.
var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(24h)
        .every(1m)
        .groupBy(1m)
// Feed each batch to the "myarima" UDF: model usage_system (read as a double)
// and forecast the next 4 points, then write them to measurement "arima".
data
    @myarima()
        .field('usage_system')
        .predict(4)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('arima')

myarima:

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import sys
import numpy
import pandas as pd
from scipy import stats
import math
import sys
from statsmodels.tsa.stattools import arma_order_select_ic
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima_model import ARIMA
from pandas.tseries.offsets import Second

import logging
# Send DEBUG-and-above records from the root logger to a log file.
logging.basicConfig(
    format='%(asctime)s %(levelname)s:%(name)s: %(message)s',
    filename='/tmp/myArima.log',
    level=logging.DEBUG,
)
logger = logging.getLogger()  # the root logger

# Values accepted for the UDF's 'type' option.
FIELD_TYPES = ['int', 'double']

def is_stationary(ts):
    """Augmented Dickey-Fuller check with a constant-plus-trend regression.

    Returns True when the ADF test statistic is below the 5% critical value,
    i.e. the unit-root hypothesis is rejected at the 5% level.
    """
    adf_result = adfuller(ts, regression='ct')
    test_statistic = adf_result[0]
    critical_values = adf_result[4]
    return test_statistic < critical_values['5%']

class state(object):
    """Holds one batch of (date, value) samples and forecasts it with ARIMA.

    ``_predict`` (how many future points to forecast) is assigned from outside
    by the handler's ``init``.
    """

    def __init__(self):
        self._dates   = []
        self._values  = []
        self._predict = 0
        self._period  = None    # sampling period, inferred in arima() from the first two dates
        self._order   = None    # (p, d, q) chosen by select_order()
        self._model   = None    # fitted model of the most recent arima() call

    def update(self, date, value):
        """Append one sample to the current batch."""
        self._dates.append(date)
        self._values.append(value)

    def drop(self):
        """Forget the samples and chosen order of the previous batch."""
        self._dates  = []
        self._values = []
        self._order  = None

    def get_series(self):
        """Return the collected samples as a Series indexed by timestamp."""
        return pd.Series(data=self._values, index=pd.to_datetime(self._dates))

    def min(self):
        """Smallest collected value (raises ValueError when empty)."""
        return min(self._values)

    def max(self):
        """Largest collected value (raises ValueError when empty)."""
        return max(self._values)

    @staticmethod
    def _difference(ts, d):
        """Return ``ts`` differenced ``d`` times (order-d differencing), NaNs dropped.

        Second-order differencing is the difference of the first difference
        (x_t - 2*x_{t-1} + x_{t-2}). ``ts.diff(periods=2)`` is instead the
        lag-2 difference x_t - x_{t-2}, which is not what d = 2 means.
        """
        for _ in range(d):
            ts = ts.diff(periods=1).dropna()
        return ts

    def select_order_brute_force(self):
        """Grid-search (p, 1, q) around the BIC-optimal ARMA order by AIC.

        Not called by arima(); kept as an alternative to select_order().
        """
        def objfunc(order, endog, exog):
            fit = ARIMA(endog, order, exog).fit(full_output=False)
            return fit.aic

        ts = self.get_series()
        bic = arma_order_select_ic(ts).bic_min_order
        grid = (slice(bic[0], bic[0] + 2, 1), slice(1, 2, 1), slice(bic[1], bic[1] + 2, 1))
        from scipy.optimize import brute
        return brute(objfunc, grid, args=(ts, None), finish=None)

    def select_order(self):
        """Choose (p, d, q) for the current batch.

        d is the smallest of 0 or 1 for which the differenced series passes
        ``is_stationary``; otherwise d = 2 is used without further testing.
        p and q are the BIC-minimising ARMA order of the differenced series.
        """
        ts = self.get_series()
        for d in (0, 1):
            diffed = self._difference(ts, d)
            if is_stationary(diffed):
                bic = arma_order_select_ic(diffed).bic_min_order
                return bic[0], d, bic[1]

        bic = arma_order_select_ic(self._difference(ts, 2)).bic_min_order
        return bic[0], 2, bic[1]

    def arima(self):
        """Fit ARIMA to the batch and forecast ``_predict`` future points.

        The sampling period is taken from the first two timestamps, so at
        least two samples are required. The forecast covers the ``_predict``
        periods following the last sample.
        """
        logger.info("arima \n")
        ts = self.get_series()
        self._period = ts.index[1] - ts.index[0]
        frequency = Second(self._period.total_seconds())
        self._order = self.select_order()
        logger.debug(self._order)
        self._model = ARIMA(ts, order=self._order, freq=frequency).fit()
        start_date = ts.index[-1] + self._period
        end_date = start_date + (self._predict - 1) * self._period

        if self._order[1] > 0:
            # With d > 0, ARIMAResults.predict defaults to typ='linear', which
            # predicts the *differenced* series. typ='levels' returns values
            # on the original scale. This replaces the former "+ (max - min)"
            # shift, which did not undo the differencing. With d == 0, the
            # model is a plain ARMA, whose predict() takes no typ argument.
            forecast = self._model.predict(start_date.isoformat(), end_date.isoformat(), typ='levels')
        else:
            forecast = self._model.predict(start_date.isoformat(), end_date.isoformat())
        logger.debug(forecast)
        return forecast

class ForecastHandler(Handler):
    """Batch-in/batch-out Kapacitor UDF that emits an ARIMA forecast per batch.

    Options (advertised in ``info``):
      * ``predict`` (int)  - number of future points to forecast, must be > 0
      * ``field``   (str)  - name of the field to model
      * ``type``    (str)  - one of FIELD_TYPES; selects fieldsInt or fieldsDouble
    Forecast values are written as doubles under the same field name.
    """

    def __init__(self, agent, state):
        self._agent = agent
        self._state = state
        self._field = None
        self._field_type = None
        self._predict = 0
        self._begin_response = None  # begin message held back until end_batch
        self._point = None           # last point of the current batch; template for output

    def info(self):
        """Advertise batch in/out and the predict/field/type options."""
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH

        response.info.options['predict'].valueTypes.append(udf_pb2.INT)
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['type'].valueTypes.append(udf_pb2.STRING)
        logger.info("info")
        return response

    def init(self, init_req):
        """Read and validate the options, reporting every problem at once."""
        success = True
        msg = ''

        for opt in init_req.options:
            if opt.name == 'predict':
                self._predict = opt.values[0].intValue
                self._state._predict = self._predict
            elif opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'type':
                self._field_type = opt.values[0].stringValue

        # The previous version set a misspelled `succ` variable in the first and
        # last checks, so invalid 'predict'/'type' options were accepted.
        if self._predict < 1:
            success = False
            msg += ' must supply number of values to be predicted > 0'
        if self._field is None:
            success = False
            msg += ' must supply a field name'
        if self._field_type not in FIELD_TYPES:
            success = False
            msg += ' field type must be one of {}'.format(FIELD_TYPES)

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]  # drop the leading separator space
        return response

    def snapshot(self):
        """Return an empty snapshot; no state survives between batches."""
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        """Restoring from a snapshot is not supported."""
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        """Reset per-batch state and hold back the begin message.

        The begin message is sent from ``end_batch``, once its size (the
        number of forecast points) is known.
        """
        self._state.drop()
        self._point = None
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response

    def point(self, point):
        """Record the configured field of one incoming point.

        The map is chosen by the 'type' option. Reading an int field from
        fieldsDouble would silently return the protobuf default 0.0.
        """
        if self._field_type == 'int':
            value = point.fieldsInt[self._field]
        else:
            value = point.fieldsDouble[self._field]
        self._state.update(pd.to_datetime(point.time), value)
        self._point = point

    def end_batch(self, end_req):
        """Forecast the collected batch and emit begin, forecast points, end."""
        n_samples = len(self._state._values)
        if n_samples < 2:
            # state.arima() infers the sampling period from the first two
            # samples, so a smaller batch cannot be forecast; emit an empty batch.
            logger.warning("batch has %d point(s); need at least 2 to forecast", n_samples)
            forecast = pd.Series([], dtype=float)
        else:
            forecast = self._state.arima()
        forecast = forecast.iloc[:self._predict]  # never emit more than requested
        logger.debug("size of forecast %s", len(forecast))

        # The announced batch size matches the number of points actually sent.
        self._begin_response.begin.size = len(forecast)
        self._agent.write_response(self._begin_response)

        if len(forecast):
            response = udf_pb2.Response()
            response.point.CopyFrom(self._point)  # reuse name/tags/group of the batch
            # Forecasts are written as doubles; drop an int copy of the field
            # so the same name is not present in both maps.
            if self._field in response.point.fieldsInt:
                del response.point.fieldsInt[self._field]
            for timestamp, value in forecast.items():
                response.point.time = timestamp.value  # Timestamp -> integer nanoseconds
                response.point.fieldsDouble[self._field] = value
                self._agent.write_response(response)

        response = udf_pb2.Response()
        response.end.CopyFrom(end_req)
        self._agent.write_response(response)

if __name__ == '__main__':
    # Build the agent, attach a handler backed by a fresh state, and serve
    # requests until the agent finishes.
    agent = Agent()
    agent.handler = ForecastHandler(agent, state())

    logger.info("Starting Agent")
    agent.start()
    agent.wait()
    logger.info("Agent finished")