Kapacitor udf - read error: proto: can't skip unknown wire type 7 for agent.Response

Hi all,

I have integrated an ARIMA forecast into Kapacitor as a UDF. I am getting a read error after a number of batches have been processed successfully. Any help is appreciated.

Here is my configuration and code:

I downloaded the Kapacitor agent library from GitHub:

git clone https://github.com/influxdata/kapacitor.git /tmp/kapacitor_udf/kapacitor

Set up the Kapacitor config:

[udf.functions]
    [udf.functions.myarima]
        prog = "/usr/bin/python2"
        args = ["-u", "/tmp/kapacitor_udf/models/myarima.py"]
        timeout = "10s"
       [udf.functions.myarima.env]
           PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"
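
For context, with prog/args Kapacitor starts the script as a child process and exchanges protobuf messages with it over stdin/stdout; the PYTHONPATH entry is only there so the agent library can be imported. A quick check I run by hand to confirm that path resolves (illustrative only, not part of the task):

# Sanity check that the agent library is importable under the PYTHONPATH
# configured above. Run manually; the UDF itself never executes this.
import sys
sys.path.insert(0, "/tmp/kapacitor_udf/kapacitor/udf/agent/py")

from kapacitor.udf.agent import Agent, Handler   # should import cleanly
from kapacitor.udf import udf_pb2                 # generated protobuf messages

print(udf_pb2.BATCH)   # the edge type the handler later declares in info()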

TICKscript:

dbrp "telegraf"."autogen"

var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(24h)
        .every(1m)
        .groupBy(1m)
data
    @myarima()
        .field('usage_system')
        .predict(4)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('arima')

Python script (myarima.py):

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import sys
import numpy
import pandas as pd
from scipy import stats
import math
from statsmodels.tsa.stattools import arma_order_select_ic
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima_model import ARIMA
from pandas.tseries.offsets import Second

import logging
logging.basicConfig(filename='/tmp/myArima.log',level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()
FIELD_TYPES = ['int', 'double']

def is_stationary(ts):
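    # Augmented Dickey-Fuller test (constant + trend): treat the series as
    # stationary when the test statistic is below the 5% critical value.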
    results = adfuller(ts, regression='ct')
    return results[0] < results[4]['5%']

class state(object):
    def __init__(self):
        self._dates   = []
        self._values  = []
        self._predict = 0
        self._period  = None    # period to be calculated by Arima
        self._order   = None    #order of Arima to be calculated by brute_force
        self._model   = None
    def update(self, date, value):
        self._dates.append(date)
        self._values.append(value)

    def drop(self):
        self._dates  = []
        self._values = []
        self._order  = None

    def get_series(self):
        return pd.Series(data=self._values, index=pd.to_datetime(self._dates))      #prepare date for Arima forecast

    def min(self):
        return min(self._values)

    def max(self):
        return max(self._values)

    def select_order_brute_force(self):
        def objfunc(order, endog, exog):
            from statsmodels.tsa.arima_model import ARIMA
            fit = ARIMA(endog, order, exog).fit(full_output=False)
            return fit.aic

        ts = self.get_series()
        bic = arma_order_select_ic(ts).bic_min_order
        grid = (slice(bic[0], bic[0] + 2, 1), slice(1, 2, 1), slice(bic[1], bic[1] + 2, 1))
        from scipy.optimize import brute
        return brute(objfunc, grid, args=(ts, None), finish=None)

    def select_order(self):
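        # Pick (p, d, q): use the series as-is if the ADF test says it is
        # stationary, otherwise fall back to the lag-1 and then lag-2
        # differenced series, taking (p, q) from the BIC-minimising ARMA order.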
        ts = self.get_series()
        if is_stationary(ts):
            bic = arma_order_select_ic(ts).bic_min_order
            return bic[0], 0, bic[1]

        ts1diff = ts.diff(periods=1).dropna()
        if is_stationary(ts1diff):
            bic = arma_order_select_ic(ts1diff).bic_min_order
            return bic[0], 1, bic[1]

        ts2diff = ts.diff(periods=2).dropna()
        bic = arma_order_select_ic(ts2diff).bic_min_order

        return bic[0], 2, bic[1]

    def arima(self):
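        # Infer the sampling period from the first two timestamps, fit an ARIMA
        # with the selected order, and forecast self._predict steps beyond the
        # last input point.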
        logger.info("arima \n")
        ts = self.get_series()
#        logger.info("size of input: %s" %len(ts))
        self._period = ts.index[1] - ts.index[0]
        frequency = Second(self._period.total_seconds())
        self._order = self.select_order()
        logger.debug(self._order)
#        logger.debug(frequency)
        self._model = ARIMA(self.get_series(), order=self._order, freq=frequency).fit()
        start_date = ts.index[-1] + self._period
        end_date = start_date + (self._predict -1) * self._period
        forecast = self._model.predict(start_date.isoformat(), end_date.isoformat())

        if self._order[1] > 0:
            shift = self.max() - self.min()
            forecast += shift
        logger.debug(forecast)
        return forecast

class ForecastHandler(Handler):
    def __init__(self, agent, state):
        self._agent = agent
        self._state = state
        self._field = None
        self._field_type = None
        self._predict = 0
        self._begin_response = None
        self._point = None

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH

        response.info.options['predict'].valueTypes.append(udf_pb2.INT)
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['type'].valueTypes.append(udf_pb2.STRING)
        logger.info("info")
        return response

    def init(self, init_req):
        success = True
        msg = ''

        for opt in init_req.options:
            if opt.name == 'predict':
                self._predict = opt.values[0].intValue
                self._state._predict = self._predict
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            if opt.name == 'type':
                self._field_type = opt.values[0].stringValue
        if self._predict < 1:
            success = False
            msg += ' must supply number of values to be predicted > 0'
        if self._field is None:
            success = False
            msg += ' must supply a field name'
        if self._field_type not in FIELD_TYPES:
            success = False
            msg += ' field type must be one of {}'.format(FIELD_TYPES)

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]
        return response

    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        self._state.drop()
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response

    def point(self, point):
        value = point.fieldsDouble[self._field]
        self._state.update(pd.to_datetime(point.time), value)
        self._point = point
        #logger.debug(str(point))

    def end_batch(self, end_req):
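        # Echo the batch framing back to Kapacitor: a begin message resized to
        # the number of forecast points, one point message per forecast value,
        # then the end message copied from the request.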
        forecast  = self._state.arima()
        logger.debug("size of forecast %s" %len(forecast))
        self._begin_response.begin.size = self._predict
        self._agent.write_response(self._begin_response)
        response = udf_pb2.Response()
        response.point.CopyFrom(self._point)

        for i in range(0, self._predict):
            response.point.time = forecast.index[i].value
            response.point.fieldsDouble[self._field] = forecast[i]
            self._agent.write_response(response)

        response.end.CopyFrom(end_req)
        self._agent.write_response(response)

if __name__ == '__main__':
    a = Agent()
    h = ForecastHandler(a,state())
    a.handler = h

    logger.info("Starting Agent")
    a.start()
    a.wait()
    logger.info("Agent finished")

When I start Kapacitor and show the task:

ID: arima_task
Error:
Template:
Type: batch
Status: enabled
Executing: true
Created: 24 Jul 18 03:33 UTC
Modified: 21 Aug 18 01:53 UTC
LastEnabled: 21 Aug 18 01:53 UTC
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
dbrp "telegraf"."autogen"

var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(24h)
        .every(1m)
        .groupBy(1m)

data
    @myarima()
        .field('usage_system')
        .predict(4)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('arima')

DOT:
digraph arima_task {
graph [throughput="0.00 batches/s"];

query1 [avg_exec_time_ns="0s" batches_queried="8" errors="0" points_queried="11528" working_cardinality="0" ];
query1 -> myarima2 [processed="8"];

myarima2 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
myarima2 -> influxdb_out3 [processed="8"];

influxdb_out3 [avg_exec_time_ns="4.2µs" errors="0" points_written="32" working_cardinality="0" write_errors="0" ];
}

Then it throws this error on the 9th batch:

ID: arima_task
Error: myarima2: read error: proto: can't skip unknown wire type 7 for agent.Response
Template:
Type: batch
Status: enabled
Executing: false
Created: 24 Jul 18 03:33 UTC
Modified: 21 Aug 18 01:53 UTC
LastEnabled: 21 Aug 18 01:53 UTC
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
dbrp "telegraf"."autogen"

var data = batch
    |query('''
        SELECT max(usage_system) as usage_system
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu-total'
    ''')
        .period(24h)
        .every(1m)
        .groupBy(1m)

data
    @myarima()
        .field('usage_system')
        .predict(4)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('arima')

DOT:
digraph arima_task {
query1 -> myarima2;
myarima2 -> influxdb_out3;
}

The log file shows that the data in the 9th batch is no different from the previous ones.

Thanks
Turker

Hi Turker,

Were you able to solve this issue?

Hi Ashish,
No, I could not solve the issue and switched to Facebook Prophet with a socket-based UDF. You can see here:

OK, I see.
I would appreciate it if you could share any documentation and scripts I can test on Windows.

Hi @ttopal
Are you using Linux?
I have had huge problems using it on Windows, so I will be switching to Linux.
Which versions are you using for Kapacitor and the Kapacitor UDF?
For Python, I am sure it is Python 2.

I would appreciate it if you could share your kapacitor.conf, or tell me whether you used it as a socket-based UDF.

Thanks
Ashish

Hi Ashish,

Sorry for the late reply. We have moved away from InfluxDB for various reasons, and I am not using the forum as much.

Yes, I got Prophet working on Linux (Ubuntu 16.04). I don't think it would be much different on other distros.

As far as I remember, the code and Kapacitor config setup for Prophet shared here: Prophet Forecasting UDF - Unable to write response to InfluxDB - #15 by dpayne works fine. The important thing was to switch to the kapacitor user on Linux to run the Python script, which creates the socket file; then you start Kapacitor and it can read the socket. Roughly, the socket side looks like the sketch below.
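
In case it is useful: the socket flavour swaps prog/args for a socket = "/path/to.sock" entry under [udf.functions.<name>], and the Python side serves connections on that unix socket instead of talking over stdin/stdout. A minimal sketch of that loop, adapted from the mirror.py example shipped with the Kapacitor agent library (the handler class, socket path and log path here are placeholders, not the actual Prophet code from the linked thread):

# Sketch of a socket-based UDF agent. MyHandler stands in for whatever
# Handler subclass you use (e.g. the Prophet handler from the linked post).
from kapacitor.udf.agent import Agent, Server
import logging

logging.basicConfig(filename='/tmp/myprophet.log', level=logging.INFO)
logger = logging.getLogger()

class Accepter(object):
    _count = 0
    def accept(self, conn, addr):
        # Kapacitor calls this for every connection it opens to the socket.
        self._count += 1
        agent = Agent(conn, conn)          # read/write the connection instead of stdin/stdout
        agent.handler = MyHandler(agent)   # placeholder Handler subclass
        logger.info("Starting agent for connection %d", self._count)
        agent.start()
        agent.wait()
        logger.info("Agent finished connection %d", self._count)

if __name__ == '__main__':
    # Start this as the kapacitor user so the daemon can reach the socket file.
    server = Server('/tmp/myprophet.sock', Accepter())
    server.serve()

The kapacitor.conf entry for it then uses socket = "/tmp/myprophet.sock" and a timeout, with no prog/args.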

I hope it helps, good luck.

Thank you @ttopal for your response!
I am now able to access Prophet from a TICKscript :slight_smile:
Now I need to figure out creating, testing, and tuning the models. Do write if you were able to reach that level.
I am sorry to hear that you have moved away from InfluxDB; you will be missed on the forum.
I like InfluxDB and hope to get the work done with it.

Thanks and take care!