Kapacitor UDF - read error: proto: can't skip unknown wire type 7 for agent.Response

Hi all,

I have integrated an ARIMA forecast into Kapacitor as a UDF. The UDF processes the first eight batches successfully and then fails with a read error on the ninth. Any help is appreciated.

Here is my configuration and code:

I cloned the Kapacitor repo from GitHub to get the agent library:

git clone https://github.com/influxdata/kapacitor.git /tmp/kapacitor_udf/kapacitor

Then I set up the Kapacitor config:

[udf.functions]
    [udf.functions.myarima]
        prog = "/usr/bin/python2"
        args = ["-u", "/tmp/kapacitor_udf/models/myarima.py"]
        timeout = "10s"
        [udf.functions.myarima.env]
            PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"
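
For reference, a quick way to check that the agent library is importable under that PYTHONPATH (run manually with the same python2 the config points at):

import sys
sys.path.insert(0, '/tmp/kapacitor_udf/kapacitor/udf/agent/py')
# an ImportError on either line below would mean the path above is wrong
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
print('agent library found')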

TICKscript:

dbrp "telegraf"."autogen"

var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(24h)
        .every(1m)
        .groupBy(1m)
data
    @myarima()
        .field('usage_system')
        .predict(4)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('arima')

Python script (myarima.py):

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import sys
import numpy
import pandas as pd
from scipy import stats
import math
from statsmodels.tsa.stattools import arma_order_select_ic
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima_model import ARIMA
from pandas.tseries.offsets import Second

import logging
logging.basicConfig(filename='/tmp/myArima.log',level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()
FIELD_TYPES = ['int', 'double']

def is_stationary(ts):
    results = adfuller(ts, regression='ct')
    return results[0] < results[4]['5%']

class state(object):
    def __init__(self):
        self._dates   = []
        self._values  = []
        self._predict = 0
        self._period  = None    # sampling period, to be calculated from the series
        self._order   = None    # ARIMA order, to be calculated by brute force
        self._model   = None
    def update(self, date, value):
        self._dates.append(date)
        self._values.append(value)

    def drop(self):
        self._dates  = []
        self._values = []
        self._order  = None

    def get_series(self):
        return pd.Series(data=self._values, index=pd.to_datetime(self._dates))      # prepare the data for the ARIMA forecast

    def min(self):
        return min(self._values)

    def max(self):
        return max(self._values)

    def select_order_brute_force(self):
        def objfunc(order, endog, exog):
            # fit a candidate order and score it by AIC
            fit = ARIMA(endog, order, exog).fit(full_output=False)
            return fit.aic

        ts = self.get_series()
        bic = arma_order_select_ic(ts).bic_min_order
        grid = (slice(bic[0], bic[0] + 2, 1), slice(1, 2, 1), slice(bic[1], bic[1] + 2, 1))
        from scipy.optimize import brute
        return brute(objfunc, grid, args=(ts, None), finish=None)

    def select_order(self):
        ts = self.get_series()
        if is_stationary(ts):
            bic = arma_order_select_ic(ts).bic_min_order
            return bic[0], 0, bic[1]

        ts1diff = ts.diff(periods=1).dropna()
        if is_stationary(ts1diff):
            bic = arma_order_select_ic(ts1diff).bic_min_order
            return bic[0], 1, bic[1]

        ts2diff = ts.diff(periods=2).dropna()
        bic = arma_order_select_ic(ts2diff).bic_min_order

        return bic[0], 2, bic[1]

    def arima(self):
        logger.info("arima \n")
        ts = self.get_series()
#        logger.info("size of input: %s" %len(ts))
        # infer the sampling period from the first two timestamps
        self._period = ts.index[1] - ts.index[0]
        frequency = Second(self._period.total_seconds())
        self._order = self.select_order()
        logger.debug(self._order)
#        logger.debug(frequency)
        self._model = ARIMA(self.get_series(), order=self._order, freq=frequency).fit()
        start_date = ts.index[-1] + self._period
        end_date = start_date + (self._predict - 1) * self._period
        forecast = self._model.predict(start_date.isoformat(), end_date.isoformat())

        if self._order[1] > 0:
            # the model was fit on a differenced series, so re-level the
            # forecast by the observed range of the original values
            shift = self.max() - self.min()
            forecast += shift
        logger.debug(forecast)
        return forecast

class ForecastHandler(Handler):
    def __init__(self, agent, state):
        self._agent = agent
        self._state = state
        self._field = None
        self._field_type = None
        self._predict = 0
        self._begin_response = None
        self._point = None

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH

        response.info.options['predict'].valueTypes.append(udf_pb2.INT)
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['type'].valueTypes.append(udf_pb2.STRING)
        logger.info("info")
        return response

    def init(self, init_req):
        success = True
        msg = ''

        for opt in init_req.options:
            if opt.name == 'predict':
                self._predict = opt.values[0].intValue
                self._state._predict = self._predict
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            if opt.name == 'type':
                self._field_type = opt.values[0].stringValue
        if self._predict < 1:
            success = False
            msg += ' must supply number of values to be predicted > 0'
        if self._field is None:
            success = False
            msg += ' must supply a field name'
        if self._field_type not in FIELD_TYPES:
            success = False
            msg += ' field type must be one of {}'.format(FIELD_TYPES)

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]
        return response

    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        self._state.drop()
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response

    def point(self, point):
        value = point.fieldsDouble[self._field]
        self._state.update(pd.to_datetime(point.time), value)
        self._point = point
        #logger.debug(str(point))

    def end_batch(self, end_req):
        forecast = self._state.arima()
        logger.debug("size of forecast %s" % len(forecast))
        self._begin_response.begin.size = self._predict
        self._agent.write_response(self._begin_response)
        response = udf_pb2.Response()
        response.point.CopyFrom(self._point)

        for i in range(0, self._predict):
            response.point.time = forecast.index[i].value
            response.point.fieldsDouble[self._field] = forecast[i]
            self._agent.write_response(response)

        response.end.CopyFrom(end_req)
        self._agent.write_response(response)

if __name__ == '__main__':
    a = Agent()
    h = ForecastHandler(a, state())
    a.handler = h

    logger.info("Starting Agent")
    a.start()
    a.wait()
    logger.info("Agent finished")

When I start Kapacitor and show the task, everything looks healthy at first:

ID: arima_task
Error:
Template:
Type: batch
Status: enabled
Executing: true
Created: 24 Jul 18 03:33 UTC
Modified: 21 Aug 18 01:53 UTC
LastEnabled: 21 Aug 18 01:53 UTC
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
dbrp "telegraf"."autogen"

var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(24h)
        .every(1m)
        .groupBy(1m)

data
    @myarima()
        .field('usage_system')
        .predict(4)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('arima')

DOT:
digraph arima_task {
graph [throughput="0.00 batches/s"];

query1 [avg_exec_time_ns="0s" batches_queried="8" errors="0" points_queried="11528" working_cardinality="0" ];
query1 -> myarima2 [processed="8"];

myarima2 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
myarima2 -> influxdb_out3 [processed="8"];

influxdb_out3 [avg_exec_time_ns="4.2µs" errors="0" points_written="32" working_cardinality="0" write_errors="0" ];
}

Then, on batch 9, it throws this error:

ID: arima_task
Error: myarima2: read error: proto: can't skip unknown wire type 7 for agent.Response
Template:
Type: batch
Status: enabled
Executing: false
Created: 24 Jul 18 03:33 UTC
Modified: 21 Aug 18 01:53 UTC
LastEnabled: 21 Aug 18 01:53 UTC
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
dbrp "telegraf"."autogen"

var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(24h)
        .every(1m)
        .groupBy(1m)

data
    @myarima()
        .field('usage_system')
        .predict(4)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('arima')

DOT:
digraph arima_task {
query1 -> myarima2;
myarima2 -> influxdb_out3;
}

The log file shows that the data in the 9th batch is no different from the previous ones. Since protobuf only defines wire types 0 through 5, I assume "wire type 7" means the response stream is getting corrupted somewhere, but I cannot see where.
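
One thing I am wondering about: the agent exchanges length-delimited protobuf messages with Kapacitor over stdin/stdout, so any stray write to stdout from the Python process would corrupt the stream. I have not confirmed that this is the cause here, but older statsmodels versions print optimizer convergence messages during fit() unless told otherwise; if that were the problem, silencing it in state.arima() would look like this (disp=0 suppresses the solver output):

# hedged guess, not a confirmed fix: keep ARIMA.fit() from writing to stdout
self._model = ARIMA(self.get_series(), order=self._order, freq=frequency).fit(disp=0)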

Thanks
Turker
