Hi all,
I have integrated an ARIMA forecast into Kapacitor via a UDF. I am getting a read error after successfully processing a number of batches. Any help is appreciated.
Here is my configuration and code:
I have downloaded the Kapacitor agent library from GitHub.
git clone https://github.com/influxdata/kapacitor.git /tmp/kapacitor_udf/kapacitor
Set-up kapacitor config:
[udf.functions]
[udf.functions.myarima]
prog = "/usr/bin/python2"
args = ["-u", "/tmp/kapacitor_udf/models/myarima.py"]
timeout = "10s"
[udf.functions.myarima.env]
PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"
Tick Script:
dbrp "telegraf"."autogen"
var data = batch
|query('''
SELECT max(usage_system) as usage_system
FROM "telegraf"."autogen"."cpu"
WHERE "cpu"='cpu-total'
''')
.period(24h)
.every(1m)
.groupBy(1m)
data
@myarima()
.field('usage_system')
.predict(4)
.type('double')
|influxDBOut()
.create()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('arima')
Python Script (myarima.py):
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import sys
import numpy
import pandas as pd
from scipy import stats
import math
import sys
from statsmodels.tsa.stattools import arma_order_select_ic
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima_model import ARIMA
from pandas.tseries.offsets import Second
import logging
# Log to a file, never to stdout/stderr: the Kapacitor agent protocol speaks
# length-prefixed protobuf over STDIN/STDOUT, so any stray bytes written to
# stdout would corrupt the stream.
logging.basicConfig(filename='/tmp/myArima.log',level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()
# Values accepted for the TICKscript `.type(...)` option (validated in init()).
FIELD_TYPES = ['int', 'double']
def is_stationary(ts):
    """Augmented Dickey-Fuller test (constant + trend regression).

    Returns True when the test statistic is below the 5% critical value,
    i.e. the unit-root null is rejected at the 5% level.
    """
    adf = adfuller(ts, regression='ct')
    test_statistic = adf[0]
    critical_values = adf[4]
    return test_statistic < critical_values['5%']
class state(object):
    """Accumulates one batch of (timestamp, value) points and produces an
    ARIMA forecast over them.

    An instance is shared with ForecastHandler, which writes ``_predict``
    directly during its init().
    """
    def __init__(self):
        self._dates = []    # point timestamps, in arrival order
        self._values = []   # point values, parallel to _dates
        self._predict = 0   # number of future points to forecast (set by the handler)
        self._period = None # period to be calculated by Arima
        self._order = None  # order of Arima to be calculated by brute_force
        self._model = None  # fitted ARIMA results from the most recent arima() call

    def update(self, date, value):
        """Append one incoming point to the batch buffers."""
        self._dates.append(date)
        self._values.append(value)

    def drop(self):
        """Reset the per-batch buffers (called at begin_batch)."""
        self._dates = []
        self._values = []
        self._order = None

    def get_series(self):
        """Return the buffered points as a datetime-indexed pandas Series."""
        return pd.Series(data=self._values, index=pd.to_datetime(self._dates)) #prepare date for Arima forecast

    def min(self):
        """Smallest buffered value."""
        return min(self._values)

    def max(self):
        """Largest buffered value."""
        return max(self._values)

    def select_order_brute_force(self):
        """Grid-search an (p, d, q) order by minimizing AIC around the
        BIC-minimal ARMA order.

        NOTE(review): defined but never called anywhere in this file;
        select_order() is used instead.
        """
        def objfunc(order, endog, exog):
            # Fit a candidate order and score it by AIC.
            from statsmodels.tsa.arima_model import ARIMA
            fit = ARIMA(endog, order, exog).fit(full_output=False)
            return fit.aic
        ts = self.get_series()
        bic = arma_order_select_ic(ts).bic_min_order
        # Search p and q in [bic, bic+1] with d fixed to 1.
        grid = (slice(bic[0], bic[0] + 2, 1), slice(1, 2, 1), slice(bic[1], bic[1] + 2, 1))
        from scipy.optimize import brute
        return brute(objfunc, grid, args=(ts, None), finish=None)

    def select_order(self):
        """Choose an (p, d, q) order: difference the series up to twice until
        it passes the stationarity test, picking p and q by BIC each time."""
        ts = self.get_series()
        if is_stationary(ts):
            bic = arma_order_select_ic(ts).bic_min_order
            return bic[0], 0, bic[1]
        ts1diff = ts.diff(periods=1).dropna()
        if is_stationary(ts1diff):
            bic = arma_order_select_ic(ts1diff).bic_min_order
            return bic[0], 1, bic[1]
        # Fall through: assume second-order differencing suffices.
        ts2diff = ts.diff(periods=2).dropna()
        bic = arma_order_select_ic(ts2diff).bic_min_order
        return bic[0], 2, bic[1]

    def arima(self):
        """Fit ARIMA on the buffered series and return a forecast of
        ``self._predict`` points starting one period after the last point."""
        logger.info("arima \n")
        ts = self.get_series()
        # logger.info("size of input: %s" %len(ts))
        # Infer the sampling period from the first two points; assumes the
        # batch is evenly spaced — TODO confirm against the query's groupBy.
        self._period = ts.index[1] - ts.index[0]
        frequency = Second(self._period.total_seconds())
        self._order = self.select_order()
        logger.debug(self._order)
        # logger.debug(frequency)
        self._model = ARIMA(self.get_series(), order=self._order, freq=frequency).fit()
        start_date = ts.index[-1] + self._period
        end_date = start_date + (self._predict -1) * self._period
        forecast = self._model.predict(start_date.isoformat(), end_date.isoformat())
        if self._order[1] > 0:
            # Differenced models predict on the differenced scale; shift by the
            # observed value range as a crude level correction.
            # NOTE(review): verify against ARIMAResults.predict(typ='levels'),
            # which returns predictions on the original scale directly.
            shift = self.max() - self.min()
            forecast += shift
        logger.debug(forecast)
        return forecast
class ForecastHandler(Handler):
    """Kapacitor UDF handler: consumes one batch of points, then emits a
    batch of ``predict`` ARIMA-forecast points for the configured field."""

    def __init__(self, agent, state):
        self._agent = agent
        self._state = state
        self._field = None           # field name to read and forecast
        self._field_type = None      # one of FIELD_TYPES ('int' / 'double')
        self._predict = 0            # number of points to forecast per batch
        self._begin_response = None  # cached BeginBatch, re-sent with corrected size
        self._point = None           # last point seen; used as template for output points

    def info(self):
        """Advertise capabilities: wants batches, provides batches, and the
        three TICKscript options (predict, field, type)."""
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH
        response.info.options['predict'].valueTypes.append(udf_pb2.INT)
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['type'].valueTypes.append(udf_pb2.STRING)
        logger.info("info")
        return response

    def init(self, init_req):
        """Validate TICKscript options; accumulate all error messages.

        BUG FIX: the original assigned failures to a typo'd name ``succ``
        instead of ``success``, so init() reported success even when
        ``predict < 1`` or the field type was invalid.
        """
        success = True
        msg = ''
        for opt in init_req.options:
            if opt.name == 'predict':
                self._predict = opt.values[0].intValue
                self._state._predict = self._predict
            elif opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'type':
                self._field_type = opt.values[0].stringValue
        if self._predict < 1:
            success = False
            msg += ' must supply number of values to be predicted > 0'
        if self._field is None:
            success = False
            msg += ' must supply a field name'
        if self._field_type not in FIELD_TYPES:
            success = False
            msg += ' field type must be one of {}'.format(FIELD_TYPES)
        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]  # drop the leading separator space
        return response

    def snapshot(self):
        """No state worth snapshotting; return an empty snapshot."""
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        """Restore is not supported by this handler."""
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        """Start a fresh batch; defer sending BeginBatch until end_batch,
        when the true output size (self._predict) is known."""
        self._state.drop()
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response

    def point(self, point):
        """Buffer one point's value; keep the point as a template so the
        forecast points inherit its group/tags."""
        value = point.fieldsDouble[self._field]
        self._state.update(pd.to_datetime(point.time), value)
        self._point = point
        #logger.debug(str(point))

    def end_batch(self, end_req):
        """Fit the model and emit BeginBatch + forecast points + EndBatch."""
        forecast = self._state.arima()
        logger.debug("size of forecast %s" %len(forecast))
        self._begin_response.begin.size = self._predict
        self._agent.write_response(self._begin_response)
        response = udf_pb2.Response()
        response.point.CopyFrom(self._point)
        for i in range(0, self._predict):
            # Cast explicitly: forecast carries numpy scalars, and protobuf
            # field assignment expects plain Python int/float.
            response.point.time = int(forecast.index[i].value)  # ns since epoch
            response.point.fieldsDouble[self._field] = float(forecast[i])
            self._agent.write_response(response)
        response.end.CopyFrom(end_req)
        self._agent.write_response(response)
if __name__ == '__main__':
    # Wire the handler (with a fresh state) into the agent and block until
    # Kapacitor closes the connection.
    agent = Agent()
    handler = ForecastHandler(agent, state())
    agent.handler = handler
    logger.info("Starting Agent")
    agent.start()
    agent.wait()
    logger.info("Agent finished")
When I start Kapacitor and show the task:
ID: arima_task
Error:
Template:
Type: batch
Status: enabled
Executing: true
Created: 24 Jul 18 03:33 UTC
Modified: 21 Aug 18 01:53 UTC
LastEnabled: 21 Aug 18 01:53 UTC
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
dbrp "telegraf"."autogen"
var data = batch
|query('''
SELECT max(usage_system) as usage_system
FROM "telegraf"."autogen"."cpu"
WHERE "cpu"='cpu-total'
''')
.period(24h)
.every(1m)
.groupBy(1m)
data
@myarima()
.field('usage_system')
.predict(4)
.type('double')
|influxDBOut()
.create()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('arima')
DOT:
digraph arima_task {
graph [throughput="0.00 batches/s"];
query1 [avg_exec_time_ns="0s" batches_queried="8" errors="0" points_queried="11528" working_cardinality="0" ];
query1 -> myarima2 [processed="8"];
myarima2 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
myarima2 -> influxdb_out3 [processed="8"];
influxdb_out3 [avg_exec_time_ns="4.2µs" errors="0" points_written="32" working_cardinality="0" write_errors="0" ];
Then it throws this error on batch 9.
ID: arima_task
Error: myarima2: read error: proto: can't skip unknown wire type 7 for agent.Response
Template:
Type: batch
Status: enabled
Executing: false
Created: 24 Jul 18 03:33 UTC
Modified: 21 Aug 18 01:53 UTC
LastEnabled: 21 Aug 18 01:53 UTC
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
dbrp "telegraf"."autogen"
var data = batch
|query('''
SELECT max(usage_system) as usage_system
FROM "telegraf"."autogen"."cpu"
WHERE "cpu"='cpu-total'
''')
.period(24h)
.every(1m)
.groupBy(1m)
data
@myarima()
.field('usage_system')
.predict(4)
.type('double')
|influxDBOut()
.create()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('arima')
DOT:
digraph arima_task {
query1 -> myarima2;
myarima2 -> influxdb_out3;
}
The log file shows that the data in the 9th batch is not any different from the previous ones.
Thanks
Turker