I was trying to replicate the above scenario, forecasting a time series with the Prophet model in Kapacitor, so I followed these steps:
Objective: To fetch the data from Influxdb through udf function in kapacitor and apply time series forecasting over the same and write the response back to the Influxdb
OS: Ubuntu 18.04
Kapacitor version: Kapacitor OSS 1.5.0
Python version: 2.7.4
TICK Script:
// NOTE: all quotes must be plain ASCII (" and ') — typographic quotes (“ ” ‘ ’)
// are not valid TICKscript and will fail to parse.
// NOTE(review): the "no method or property \"prophet\" on *pipeline.QueryNode"
// error means Kapacitor does not know the UDF — make sure
// [udf.functions.prophet] is present in kapacitor.conf AND kapacitord has been
// restarted so the function is registered before `kapacitor define` is run.
dbrp "test"."autogen"

var data = batch
    |query('''
        SELECT mean("usage_user") as value
        FROM "telegraf"."autogen"."cpu"
        limit 400
    ''')
        .groupBy(time(1h))
        .offset(5m)
        .align()
        .period(30d)
        .every(1m)

data
    // UDFs are invoked with '@'; options mirror those declared in info().
    @prophet()
        .periods(24)
        .field('value')
        .freq('H')
        .changepoint_prior_scale(0.001)
        .include_history(FALSE)
    |influxDBOut()
        .database('kapacitor')
        .measurement('fbprophet_hdap_api_ui')
UDF:
# Module setup for the Prophet Kapacitor UDF.
# NOTE: quotes must be plain ASCII; typographic quotes are Python syntax errors.
import sys
import json
import logging
import pprint
import warnings

# Silence known-noisy warnings emitted when numpy/fbprophet binary versions
# differ slightly from the ones pandas was compiled against.
warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
warnings.filterwarnings("ignore", message="Conversion of the second argument")

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
from fbprophet import Prophet
import pandas as pd
import numpy as np

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()
class ProphetHandler(Handler):
    """Kapacitor batch UDF: fits a Prophet model over each incoming batch
    and emits the forecast (yhat / yhat_lower / yhat_upper) as a new batch.

    Wants BATCH, provides BATCH.  Options (set from TICKscript):
      field (string, required), periods (int, required > 0), freq (string),
      changepoint_prior_scale (double), growth (string), cap (double),
      include_history (bool).
    """

    class state(object):
        # Accumulates (value, timestamp) pairs for the batch in flight.
        # NOTE: markdown in the original post ate the '__' of '__init__'.
        def __init__(self):
            self.entries = []

        def reset(self):
            self.entries = []

        def update(self, value, ds):
            self.entries.append((value, ds))

        def get_entries(self):
            return self.entries

    def __init__(self, agent):
        self._agent = agent
        self._field = None
        self._periods = 0
        self._freq = None
        self._changepoint_prior_scale = None
        self._growth = None
        self._cap = None
        self._include_history = None
        self._state = ProphetHandler.state()
        # begin-of-batch response is buffered until end_batch(), because the
        # batch size is only known after forecasting.
        self._begin_response = None

    def info(self):
        """Declare the UDF's direction (batch->batch) and its options."""
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['periods'].valueTypes.append(udf_pb2.INT)
        response.info.options['freq'].valueTypes.append(udf_pb2.STRING)
        response.info.options['changepoint_prior_scale'].valueTypes.append(udf_pb2.DOUBLE)
        response.info.options['growth'].valueTypes.append(udf_pb2.STRING)
        response.info.options['cap'].valueTypes.append(udf_pb2.DOUBLE)
        response.info.options['include_history'].valueTypes.append(udf_pb2.BOOL)
        return response

    def init(self, init_req):
        """Consume the options from the TICKscript and validate them."""
        success = True
        msg = ''
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'periods':
                self._periods = opt.values[0].intValue
            elif opt.name == 'cap':
                self._cap = opt.values[0].doubleValue
            elif opt.name == 'growth':
                self._growth = opt.values[0].stringValue
            elif opt.name == 'freq':
                self._freq = opt.values[0].stringValue
            elif opt.name == 'changepoint_prior_scale':
                self._changepoint_prior_scale = opt.values[0].doubleValue
            elif opt.name == 'include_history':
                self._include_history = opt.values[0].boolValue

        if self._field is None:
            success = False
            msg += ' must supply field'
        if self._periods <= 0:
            success = False
            msg += ' periods must be > to 0'

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]  # drop the leading space
        logger.info('init %s', msg)
        return response

    def snapshot(self):
        # Snapshot/restore are not supported by this UDF.
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        """Reset per-batch state and buffer the begin message (written later
        from end_batch once the forecast size is known)."""
        self._state.reset()
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response
        logger.info('begin batch')

    def point(self, point):
        # point.time is epoch nanoseconds; pandas converts it to a Timestamp
        # when the DataFrame column is fed to Prophet.
        value = point.fieldsDouble[self._field]
        self._state.update(value, point.time)

    def forecast(self):
        """Fit Prophet on the accumulated points and return the forecast frame."""
        ds = []
        y = []
        for value, t in self._state.get_entries():
            y.append(value)
            ds.append(t)
        df = pd.DataFrame({'y': y, 'ds': ds})
        if self._cap is not None:
            df['cap'] = self._cap

        # Pass only the options the user actually set, instead of an
        # if/elif ladder over every combination.
        model_kwargs = {}
        if self._changepoint_prior_scale is not None:
            model_kwargs['changepoint_prior_scale'] = self._changepoint_prior_scale
        if self._growth is not None:
            model_kwargs['growth'] = self._growth
        m = Prophet(**model_kwargs)

        logger.info('fit model')
        m.fit(df)

        future_kwargs = {'periods': self._periods}
        if self._freq is not None:
            future_kwargs['freq'] = self._freq
        if self._include_history is not None:
            future_kwargs['include_history'] = self._include_history
        future = m.make_future_dataframe(**future_kwargs)

        forecast = m.predict(future)
        logger.info('forecasted')
        return forecast

    def end_batch(self, end_req):
        """Forecast, announce the outgoing batch size, stream the points,
        then close the batch."""
        forecast = self.forecast()
        self._begin_response.begin.size = len(forecast)
        self._agent.write_response(self._begin_response)

        response = udf_pb2.Response()
        for _, row in forecast.iterrows():
            # pd.Timestamp.value is epoch nanoseconds already;
            # datetime.timestamp() does not exist on Python 2, so avoid it.
            response.point.time = int(row['ds'].value)
            response.point.fieldsDouble['yhat'] = row['yhat']
            response.point.fieldsDouble['yhat_upper'] = row['yhat_upper']
            response.point.fieldsDouble['yhat_lower'] = row['yhat_lower']
            self._agent.write_response(response)
            logger.info(response.point)

        # Setting the 'end' oneof clears the point payload.
        response.end.CopyFrom(end_req)
        self._agent.write_response(response)
        logger.info('ending batch')
# Entry point: hand the handler to the Kapacitor agent and serve until
# Kapacitor closes the connection.
# NOTE: must be the dunder form — markdown stripped the underscores from
# the original 'if name == main' line, which would raise NameError.
if __name__ == '__main__':
    a = Agent()
    h = ProphetHandler(a)
    a.handler = h
    logger.info("Starting Prophet Agent")
    a.start()
    a.wait()
    logger.info("Prophet Agent finished")
KAPACITOR.conf
# NOTE: TOML requires plain ASCII double quotes — typographic quotes (“ ”)
# make the file unparseable. Restart kapacitord after editing so the
# [udf.functions.prophet] function is registered before `kapacitor define`.
[[influxdb]]
  # Connect to an InfluxDB cluster
  # Kapacitor can subscribe, query and write to this cluster.
  # Using InfluxDB is not required and can be disabled.
  enabled = true
  default = true
  name = "test"
  urls = ["http://172.19.99.31:8086"]
  username = ""
  password = ""
  timeout = 0

[udf]
  [udf.functions]
    [udf.functions.prophet]
      prog = "/usr/bin/python"
      args = ["-u", "/etc/kapacitor/udf/prophet.py"]
      timeout = "60s"
After completing the above configuration, I executed the command below to define the TICK script:
$ kapacitor define cpu_alert -tick cpu_alert.tick
Note: cpu_alert.tick is the TICK Script defined above.
Error faced:
root@kapacitor:/etc/kapacitor# kapacitor define cpu_alert -tick cpu_alert.tick
invalid TICKscript: line 16 char 6: no method or property “prophet” on *pipeline.QueryNode
Can you please tell me what I am doing wrong here?