Prophet Forecasting UDF - Unable to write response to InfluxDB

kapacitor

#23

Hi @dpayne

we are using atroncy's prophet.py
we remove the udf_prophet.sock file each time, if it exists
the UDF runs as the same user as the kapacitor process

Again, what we see is that the prophet.py script is launched twice, hence a conflict on the socket file: once at the service start (systemctl start prophet-udf) and then once at the kapacitor service start (from the kapacitor.conf)

Again, I have the feeling that this is not normal.

Said in other words: if we use the systemctl service to launch the prophet(_socket).py script, should we then REMOVE the reference to the prophet.py script from the kapacitor.conf file?
thanks in advance
Philippe
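For what it's worth, Kapacitor supports two mutually exclusive ways of attaching a UDF, and only one should appear in kapacitor.conf: either Kapacitor spawns the process itself (prog/args), or it connects to an already-running socket server (the systemd case). A sketch of the two options — the socket path is an assumption; use whatever your systemd-managed prophet_socket.py actually binds:

```toml
# Option A: Kapacitor launches the agent itself -- no systemd unit needed.
[udf.functions.prophet]
  prog = "/usr/bin/python"
  args = ["-u", "/etc/kapacitor/udf/prophet.py"]
  timeout = "60s"

# Option B: systemd runs prophet_socket.py, which listens on a unix socket;
# Kapacitor only connects to it. Use this INSTEAD of option A, never both.
#[udf.functions.prophet]
#  socket = "/tmp/udf_prophet.sock"
#  timeout = "60s"
```

Configuring both at once gives exactly the symptom described: two copies of the script competing for the same socket file.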


#24

Hi @dpayne
Thanks for help.

As you suggested for @pdl86, I removed any socket files in /tmp… but I get this error: lvl=error msg=“task finished with error” service=task_store err=“myfbprophet2: read error: proto: can’t skip unknown wire type 6 for agent.Response” task=fbprophet-atroncy-temp

And when I look at the log I can see the forecast values:
ts=2018-09-11T13:51:04.576+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=fbprophet-atroncy-temp node=myfbprophet2 text=“5 2018-09-11 11:56:00 … 1211.216024”
ts=2018-09-11T13:51:04.576+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=fbprophet-atroncy-temp node=myfbprophet2 text=“6 2018-09-11 11:57:00 … 1211.238898”
ts=2018-09-11T13:51:04.576+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=fbprophet-atroncy-temp node=myfbprophet2 text=“7 2018-09-11 11:58:00 … 1211.261784”
ts=2018-09-11T13:51:04.576+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=fbprophet-atroncy-temp node=myfbprophet2 text=“8 2018-09-11 11:59:00 … 1211.284684”

ts=2018-09-11T13:51:04.600+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=fbprophet-atroncy-temp node=myfbprophet2 text=“INFO:root:{‘yhat’: 1212.2844465557, ‘yhat_lower’: 1202.0588667123775, ‘yhat_upper’: 1222.3605993409883}”
ts=2018-09-11T13:51:04.600+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=fbprophet-atroncy-temp node=myfbprophet2 text=“INFO:root:{‘yhat’: 1212.3065496127754, ‘yhat_lower’: 1202.6517603521183, ‘yhat_upper’: 1222.6249815382316}”

I don’t know what this error means:

ts=2018-09-11T13:52:00.039+02:00 lvl=error msg=“task finished with error” service=task_store err=“myfbprophet2: read error: proto: can’t skip unknown wire type 6 for agent.Response” task=fbprophet-atroncy-temp

And how can I solve it?

Thanks.

G Moussa
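For anyone hitting the same thing: “proto: can’t skip unknown wire type … for agent.Response” usually means Kapacitor read bytes on the child's stdout that were not protobuf frames. A process-based UDF talks to Kapacitor over stdin/stdout, so anything a library prints to stdout corrupts the stream — and fbprophet/pystan and scipy optimizers can print from C/Fortran code that bypasses sys.stdout entirely. A workaround sketch (the helper name is made up, not part of the kapacitor agent library) that reroutes file descriptor 1 to stderr while the model runs:

```python
import contextlib
import os


@contextlib.contextmanager
def protect_udf_stdout():
    """Temporarily point fd 1 (stdout) at fd 2 (stderr).

    Kapacitor's process-based UDF protocol exchanges protobuf messages over
    the child's stdin/stdout, so stray prints on fd 1 -- even from C/Fortran
    code that never touches sys.stdout -- corrupt the stream and can surface
    as "can't skip unknown wire type" read errors.
    """
    saved = os.dup(1)
    try:
        os.dup2(2, 1)          # stray prints now land on stderr
        yield
    finally:
        os.dup2(saved, 1)      # restore the real stdout (the protocol pipe)
        os.close(saved)
```

Wrapping the model calls (e.g. `with protect_udf_stdout(): m.fit(df)`) keeps the protocol pipe clean while still letting the noise show up in the Kapacitor log via stderr.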


#25

Hi @dpayne @nathaniel

I am trying to run a Kapacitor UDF with a SARIMAX model, but I get this error after one query, and the measurement doesn’t show up in my database even though the forecast values appear in the log:

ts=2018-10-10T10:45:06.276+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:45:06,276 INFO:root: Starting Agent”
ts=2018-10-10T10:45:10.054+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:10.02820637+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=cba87cd4-cc68-11e8-8049-000000000000 duration=26.672267ms
ts=2018-10-10T10:45:10.303+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:10.30317943+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=cbd271f9-cc68-11e8-804a-000000000000 duration=722.474µs
ts=2018-10-10T10:45:20.097+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:20.054581439+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=d1a2641d-cc68-11e8-804b-000000000000 duration=43.147544ms
ts=2018-10-10T10:45:20.303+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:20.303221763+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=d1c854a0-cc68-11e8-804c-000000000000 duration=731.569µs
ts=2018-10-10T10:45:30.079+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:30.032903754+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=d794f653-cc68-11e8-804d-000000000000 duration=46.723668ms
ts=2018-10-10T10:45:30.304+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:30.303489883+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=d7be4019-cc68-11e8-804e-000000000000 duration=754.5µs
ts=2018-10-10T10:45:39.550+02:00 lvl=debug msg=“linking subscription for cluster” service=influxdb cluster=localhost cluster=localhost
ts=2018-10-10T10:45:40.088+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:40.050050565+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=dd8d7520-cc68-11e8-804f-000000000000 duration=38.045208ms
ts=2018-10-10T10:45:40.304+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:40.303439195+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=ddb41f20-cc68-11e8-8050-000000000000 duration=785.681µs
ts=2018-10-10T10:45:50.067+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:50.026203898+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=e37fb29c-cc68-11e8-8051-000000000000 duration=40.865767ms
ts=2018-10-10T10:45:50.303+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:45:50.303220581+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=e3a9f794-cc68-11e8-8052-000000000000 duration=730.981µs
ts=2018-10-10T10:46:00.000+02:00 lvl=debug msg=“starting next batch query” service=kapacitor task_master=main task=sarimax_sinus node=query1 query=“SELECT Sinus FROM “test_arima-sinus”.autogen.value WHERE time >= ‘2018-10-09T12:46:00Z’ AND time < ‘2018-10-10T08:46:00Z’”
ts=2018-10-10T10:46:00.108+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:00,108 INFO:root: SARIMAX "
ts=2018-10-10T10:46:00.108+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=
ts=2018-10-10T10:46:00.110+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:00,110 INFO:root: size of input: 240”
ts=2018-10-10T10:46:00.111+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:46:00.055918394+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=e97a1c56-cc68-11e8-8053-000000000000 duration=55.84566ms
ts=2018-10-10T10:46:00.302+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-10-10T10:46:00.302508819+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=e99fbcc5-cc68-11e8-8054-000000000000 duration=448.928µs
ts=2018-10-10T10:46:01.345+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:01,345 DEBUG:root: ORDER (1, 1, 0)”
ts=2018-10-10T10:46:02.272+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:02,271 DEBUG:root: SEASONAL (1, 0, 0, 4)”
ts=2018-10-10T10:46:02.322+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:02,322 DEBUG:root: MODEL <statsmodels.tsa.statespace.sarimax.SARIMAXResultsWrapper object at 0x7fb679e4bad0>”
ts=2018-10-10T10:46:02.342+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:02,333 DEBUG:root: 2018-10-10 08:50:00 2.863662”
ts=2018-10-10T10:46:02.342+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 08:55:00 2.575143”
ts=2018-10-10T10:46:02.342+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 09:00:00 2.326858”
ts=2018-10-10T10:46:02.342+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 09:05:00 2.111898”
ts=2018-10-10T10:46:02.342+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 09:10:00 1.913609”
ts=2018-10-10T10:46:02.342+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“Freq: 5T, dtype: float64”
ts=2018-10-10T10:46:02.343+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:02,342 DEBUG:root: forecast 2018-10-10 08:50:00 2.863662”
ts=2018-10-10T10:46:02.343+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 08:55:00 2.575143”
ts=2018-10-10T10:46:02.343+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 09:00:00 2.326858”
ts=2018-10-10T10:46:02.343+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 09:05:00 2.111898”
ts=2018-10-10T10:46:02.343+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 09:10:00 1.913609”
ts=2018-10-10T10:46:02.343+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“Freq: 5T, dtype: float64”
ts=2018-10-10T10:46:02.343+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=“2018-10-10 10:46:02,343 DEBUG:root: size of forecast 5”
ts=2018-10-10T10:46:02.425+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax_sinus node=sarimax2 text=" This problem is unconstrained."
(the line above repeats ~30 more times with the same timestamp)
I don’t know if someone can help me.
Thanks


#26

I was trying to replicate the above scenario, forecasting a time series with a Prophet model in Kapacitor. I followed these steps:

Objective: fetch data from InfluxDB through a UDF in Kapacitor, apply time-series forecasting to it, and write the response back to InfluxDB.

OS: Ubuntu 18.04
Kapacitor version: Kapacitor OSS 1.5.0
Python version: 2.7.4

TICK Script:

```
dbrp "test"."autogen"

var data = batch
    |query('''
        SELECT mean("usage_user") as value
        FROM "telegraf"."autogen"."cpu"
        limit 400
    ''')
        .groupBy(time(1h))
        .offset(5m)
        .align()
        .period(30d)
        .every(1m)

data
    @prophet()
        .periods(24)
        .field('value')
        .freq('H')
        .changepoint_prior_scale(0.001)
        .include_history(FALSE)
    |influxDBOut()
        .database('kapacitor')
        .measurement('fbprophet_hdap_api_ui')
```

UDF:

```python
import sys
import json
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import warnings
warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
warnings.filterwarnings("ignore", message="Conversion of the second argument")

from fbprophet import Prophet
import pandas as pd
import numpy as np

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()


class ProphetHandler(Handler):

    class state(object):
        def __init__(self):
            self.entries = []

        def reset(self):
            self.entries = []

        def update(self, value, ds):
            self.entries.append((value, ds))

        def get_entries(self):
            return self.entries

    def __init__(self, agent):
        self.agent = agent
        self.points = []
        self.field = None
        self.periods = 0
        self.freq = None
        self.changepoint_prior_scale = None
        self.growth = None
        self.cap = None
        self.include_history = None
        self.state = ProphetHandler.state()
        self.begin_response = None

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['periods'].valueTypes.append(udf_pb2.INT)
        response.info.options['freq'].valueTypes.append(udf_pb2.STRING)
        response.info.options['changepoint_prior_scale'].valueTypes.append(udf_pb2.DOUBLE)
        response.info.options['growth'].valueTypes.append(udf_pb2.STRING)
        response.info.options['cap'].valueTypes.append(udf_pb2.DOUBLE)
        response.info.options['include_history'].valueTypes.append(udf_pb2.BOOL)
        return response

    def init(self, init_req):
        success = True
        msg = ''

        for opt in init_req.options:
            if opt.name == 'field':
                self.field = opt.values[0].stringValue
            elif opt.name == 'periods':
                self.periods = opt.values[0].intValue
            elif opt.name == 'cap':
                self.cap = opt.values[0].doubleValue
            elif opt.name == 'growth':
                self.growth = opt.values[0].stringValue
            elif opt.name == 'freq':
                self.freq = opt.values[0].stringValue
            elif opt.name == 'changepoint_prior_scale':
                self.changepoint_prior_scale = opt.values[0].doubleValue
            elif opt.name == 'include_history':
                self.include_history = opt.values[0].boolValue

        if self.field is None:
            success = False
            msg += ' must supply field'
        if self.periods <= 0:
            success = False
            msg += ' periods must be > 0'

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]
        logger.info('init %s', msg)
        return response

    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        self.state.reset()

        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self.begin_response = response
        logger.info('begin batch')

    def point(self, point):
        value = point.fieldsDouble[self.field]
        self.state.update(value, point.time)

    def forecast(self):
        entries = self.state.get_entries()

        ds = []
        y = []
        for a, b in entries:
            ds.append(b)
            y.append(a)

        d = {'y': y, 'ds': ds}
        #d['y'] = np.log(d['y'])
        df = pd.DataFrame(d)

        if self.cap is not None:
            df['cap'] = self.cap

        if self.changepoint_prior_scale is not None and self.growth is not None:
            m = Prophet(changepoint_prior_scale=self.changepoint_prior_scale, growth=self.growth)
        elif self.changepoint_prior_scale is not None:
            m = Prophet(changepoint_prior_scale=self.changepoint_prior_scale)
        elif self.growth is not None:
            m = Prophet(growth=self.growth)
        else:
            m = Prophet()

        logger.info('fit model')
        m.fit(df)

        if self.freq is not None and self.include_history is not None:
            future = m.make_future_dataframe(periods=self.periods, include_history=self.include_history, freq=self.freq)
        elif self.freq is not None:
            future = m.make_future_dataframe(periods=self.periods, freq=self.freq)
        elif self.include_history is not None:
            future = m.make_future_dataframe(periods=self.periods, include_history=self.include_history)
        else:
            future = m.make_future_dataframe(periods=self.periods)

        forecast = m.predict(future)
        logger.info('forecasted')

        return forecast

    def end_batch(self, end_req):
        forecast = self.forecast()
        self.begin_response.begin.size = len(forecast)
        self.agent.write_response(self.begin_response)

        response = udf_pb2.Response()

        for index, rows in forecast.iterrows():
            response.point.time = int(rows['ds'].timestamp()) * 1000000000
            response.point.fieldsDouble['yhat'] = rows['yhat']
            response.point.fieldsDouble['yhat_upper'] = rows['yhat_upper']
            response.point.fieldsDouble['yhat_lower'] = rows['yhat_lower']
            self.agent.write_response(response)
            logger.info(response.point)

        # Response.message is a oneof, so setting end clears the point field.
        response.end.CopyFrom(end_req)
        self.agent.write_response(response)
        logger.info('ending batch')


if __name__ == '__main__':
    a = Agent()
    h = ProphetHandler(a)
    a.handler = h
    logger.info("Starting Prophet Agent")
    a.start()
    a.wait()
    logger.info("Prophet Agent finished")
```
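One detail in end_batch worth noting: `int(rows['ds'].timestamp()) * 1000000000` drops any sub-second component of the forecast index. That is harmless for hourly forecasts, but a pandas-native conversion keeps full nanosecond precision; a sketch (the helper name is just for illustration):

```python
import pandas as pd

def to_influx_ns(ts):
    # Kapacitor/InfluxDB point times are int64 nanoseconds since the epoch.
    # pd.Timestamp.value is exactly that (UTC-based), so no manual
    # multiplication or truncation is needed.
    return int(pd.Timestamp(ts).value)
```

In the loop above this would be used as `response.point.time = to_influx_ns(rows['ds'])`.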

KAPACITOR.conf

```
[[influxdb]]
  # Connect to an InfluxDB cluster
  # Kapacitor can subscribe, query and write to this cluster.
  # Using InfluxDB is not required and can be disabled.
  enabled = true
  default = true
  name = "test"
  urls = ["http://172.19.99.31:8086"]
  username = ""
  password = ""
  timeout = 0

[udf]
[udf.functions]
  [udf.functions.prophet]
    prog = "/usr/bin/python"
    args = ["-u", "/etc/kapacitor/udf/prophet.py"]
    timeout = "60s"
```

After the above configuration, I executed the command below to define the TICK script:
$ kapacitor define cpu_alert -tick cpu_alert.tick

Note: cpu_alert.tick is the TICK Script defined above.

Error faced:

root@kapacitor:/etc/kapacitor# kapacitor define cpu_alert -tick cpu_alert.tick
invalid TICKscript: line 16 char 6: no method or property "prophet" on *pipeline.QueryNode

Can you please tell me what I am doing wrong here?


#27

Hi, the UDF script might be failing on Kapacitor startup. Restart and check the logs; there should be a few lines like these indicating a successful startup:

ts=2018-11-28T17:02:14.503Z lvl=debug msg=“opening service” source=srv service=*udf.Service
ts=2018-11-28T17:02:15.174Z lvl=info msg=“UDF log” service=udf text=“INFO:root:Starting Prophet Agent”
ts=2018-11-28T17:02:15.174Z lvl=info msg=“UDF log” service=udf text=“INFO:root:Prophet Agent finished”
ts=2018-11-28T17:02:15.230Z lvl=debug msg=“loaded UDF info” service=udf udf=prophet
ts=2018-11-28T17:02:15.230Z lvl=debug msg=“opened service” source=srv service=*udf.Service

If not, try running the script manually ("python -u <script>") and fix any errors. If the script was copied/pasted verbatim from this thread, you might have to fix indentation that was stripped in previous responses.

[~]$ python -u /etc/kapacitor/udf/prophet.py
INFO:root:Started server

Also, just want to point out that I wasn’t able to get the agent UDF working from my original example. I had to replace it with this socket UDF.


#28

Thanks for the prompt response.
After restarting Kapacitor, the error logs below were captured in /var/log/kapacitor/kapacitor.log:

ts=2018-11-29T15:18:23.923+05:30 lvl=info msg=“kapacitor starting” service=run version=1.5.1 branch=HEAD commit=89828ffff6cf5cd4cb2b34bf883e134395f734de
ts=2018-11-29T15:18:23.923+05:30 lvl=info msg=“go version” service=run version=go1.10.2
ts=2018-11-29T15:18:23.923+05:30 lvl=info msg=“listing Kapacitor hostname” source=srv hostname=172.19.99.38
ts=2018-11-29T15:18:23.924+05:30 lvl=info msg=“listing ClusterID and ServerID” source=srv cluster_id=ebd783bc-57bc-4a37-93f7-f00ade773b4f server_id=578f14f1-835d-46d4-a0cb-c88cb10347af
ts=2018-11-29T15:18:23.924+05:30 lvl=info msg=“opened task master” service=kapacitor task_master=main
ts=2018-11-29T15:18:23.972+05:30 lvl=info msg=“UDF log” service=udf text=“Traceback (most recent call last):”
ts=2018-11-29T15:18:23.972+05:30 lvl=info msg=“UDF log” service=udf text=" File “/etc/kapacitor/udf/prophet.py”, line 3, in "
ts=2018-11-29T15:18:23.972+05:30 lvl=info msg=“UDF log” service=udf text=" from kapacitor.udf.agent import Agent, Handler, Server"
ts=2018-11-29T15:18:23.973+05:30 lvl=info msg=“UDF log” service=udf text=" File “/etc/kapacitor/udf/agent/py/kapacitor/udf/agent.py”, line 7, in "
ts=2018-11-29T15:18:23.973+05:30 lvl=info msg=“UDF log” service=udf text=" import udf_pb2"
ts=2018-11-29T15:18:23.973+05:30 lvl=info msg=“UDF log” service=udf text=" File “/etc/kapacitor/udf/agent/py/kapacitor/udf/udf_pb2.py”, line 6, in "
ts=2018-11-29T15:18:23.973+05:30 lvl=info msg=“UDF log” service=udf text=" from google.protobuf.internal import enum_type_wrapper"
ts=2018-11-29T15:18:23.974+05:30 lvl=info msg=“UDF log” service=udf text=“ImportError: No module named google.protobuf.internal”
ts=2018-11-29T15:18:53.928+05:30 lvl=info msg=“closed HTTP service” service=http
ts=2018-11-29T15:18:53.928+05:30 lvl=info msg=“closed HTTP service” service=http
ts=2018-11-29T15:18:53.928+05:30 lvl=info msg=“closed task master” service=kapacitor task_master=main
ts=2018-11-29T15:18:53.928+05:30 lvl=error msg=“encountered error” service=run err=“open server: open service *udf.Service: failed to load process info for “prophet”: write error: write |1: broken pipe”

As per your suggestion, I executed the prophet.py script manually, but it cannot import the Agent module:

root@kapacitor:/etc/kapacitor/udf# python -u prophet.py
Traceback (most recent call last):
File “prophet.py”, line 3, in
from kapacitor.udf.agent import Agent, Handler, Server
ImportError: No module named kapacitor.udf.agent

What should be done to get this working?


#29

You’re missing the kapacitor udf agent module. It can be installed with pip:

pip install -Iv https://dl.influxdata.com/kapacitor/releases/python-kapacitor_udf-1.5.1.tar.gz