Prophet Forecasting UDF - Unable to write response to InfluxDB

I’m writing my first python batch UDF for kapacitor that uses the Facebook Prophet model to forecast future cpu usage on a group of servers. The goal is to create graphs with these metrics that show our prophet prediction vs current usage.

OS: CentOS Linux release 7.2.1511
Kapacitor Version: Kapacitor OSS 1.5.0 (git: HEAD 4f10efc41b4dcac070495cf95ba2c41cfcc2aa3a)
Python Version: Python 2.7.5, protobuf (3.6.1), kapacitor-udf (1.0.0)

TICKscript:
dbrp "telegraf"."autogen"

var data = batch
    |query(''' 
           SELECT mean("usage_user") as value 
           FROM "telegraf"."autogen"."cpu" 
           WHERE "role"='hdap_api_ui' 
    ''')
        .groupBy(time(1h))
        .offset(5m)
        .align()
        .period(30d)
        .every(1m)

data
    @prophet()
        .periods(24)
        .field('value')
        .freq('H')
        .changepoint_prior_scale(0.001)
        .include_history(FALSE)
    |influxDBOut()
        .database('kapacitor')
        .measurement('fbprophet_hdap_api_ui')

UDF:

import sys
import json
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import warnings
warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
warnings.filterwarnings('ignore', message="Conversion of the second argument")
 
from fbprophet import Prophet
import pandas as pd
import numpy as np
  
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()

import pprint
 
class ProphetHandler(Handler):

class state(object):
    def __init__(self):
        self._entries = []

    def reset(self):
        self._entries = []

    def update(self, value, ds):
        self._entries.append((value, ds))

    def get_entries(self):
        return self._entries

def __init__(self, agent):
    self._agent = agent
    self._points = []
    self._field = None
    self._periods = 0
    self._freq = None
    self._changepoint_prior_scale = None
    self._growth = None
    self._cap = None
    self._include_history = None
    self._state  = ProphetHandler.state()
    self._begin_response = None

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.BATCH
    response.info.provides = udf_pb2.BATCH
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    response.info.options['periods'].valueTypes.append(udf_pb2.INT)
    response.info.options['freq'].valueTypes.append(udf_pb2.STRING)
    response.info.options['changepoint_prior_scale'].valueTypes.append(udf_pb2.DOUBLE)
    response.info.options['growth'].valueTypes.append(udf_pb2.STRING)
    response.info.options['cap'].valueTypes.append(udf_pb2.DOUBLE)
    response.info.options['include_history'].valueTypes.append(udf_pb2.BOOL)
    return response

def init(self, init_req):
    success = True
    msg = ''

    for opt in init_req.options:
        if opt.name == 'field':
            self._field = opt.values[0].stringValue
        elif opt.name == 'periods':
            self._periods = opt.values[0].intValue
        elif opt.name == 'cap':
            self._cap = opt.values[0].doubleValue         
        elif opt.name == 'growth':
            self._growth = opt.values[0].stringValue
        elif opt.name == 'freq':  
            self._freq = opt.values[0].stringValue
        elif opt.name == 'changepoint_prior_scale':
            self._changepoint_prior_scale = opt.values[0].doubleValue
        elif opt.name == 'include_history':
            self._include_history = opt.values[0].boolValue

    if self._field is None:
        success = False
        msg += ' must supply field'
    if self._periods <= 0:
        success = False
        msg += ' periods must be > to 0'

    response = udf_pb2.Response()
    response.init.success = success
    response.init.error = msg[1:]
    logger.info('init %s', msg)
    return response

def snapshot(self):
    response = udf_pb2.Response()
    response.snapshot.snapshot = ''
    return response

def restore(self, restore_req):
    response = udf_pb2.Response()
    response.restore.success = False
    response.restore.error = 'not implemented'
    return response

def begin_batch(self, begin_req):
    self._state.reset()

    response = udf_pb2.Response()
    response.begin.CopyFrom(begin_req)
    self._begin_response = response
    logger.info('begin batch')

def point(self, point):
    value = point.fieldsDouble[self._field]
    self._state.update(value, point.time)

def forecast(self):
    entries = self._state.get_entries()

    ds = []
    y = []
    for a, b in entries:
        ds.append(b)
        y.append(a)

    d = {'y': y, 'ds': ds}
    #d['y'] = np.log(d['y'])
    df = pd.DataFrame(d)

    if self._cap is not None:
        df['cap'] = self._cap

    m = None
    if self._changepoint_prior_scale is not None and self._growth is not None:
        m = Prophet(changepoint_prior_scale=self._changepoint_prior_scale, growth=self._growth)
    elif self._changepoint_prior_scale is not None:
        m = Prophet(changepoint_prior_scale=self._changepoint_prior_scale)
    elif self._growth is not None:
        m = Prophet(growth=self._growth)
    else:
        m = Prophet()

    logger.info('fit model')
    m.fit(df)

    future = None
    if self._freq is not None and self._include_history is not None:
        future = m.make_future_dataframe(periods=self._periods, include_history=self._include_history, freq=self._freq)
    elif self._freq is not None:
        future = m.make_future_dataframe(periods=self._periods, freq=self._freq)
    elif self._include_history is not None:
        future = m.make_future_dataframe(periods=self._periods, include_history=self._include_history)
    else:
        future = m.make_future_dataframe(periods=self._periods)

    forecast = m.predict(future)
    logger.info('forecasted')

    return forecast

def end_batch(self, end_req):
    forecast = self.forecast()
    self._begin_response.begin.size = len(forecast)
    self._agent.write_response(self._begin_response)

    response = udf_pb2.Response()

    for index, rows in forecast.iterrows():
        point = {'yhat': rows['yhat'], 'yhat_lower': rows['yhat_lower'], 'yhat_upper': rows['yhat_upper']}
        response.point.time = int(rows['ds'].timestamp()) * 1000000000
        response.point.fieldsDouble['yhat'] = (rows['yhat'])
        response.point.fieldsDouble['yhat_upper'] = (rows['yhat_upper'])
        response.point.fieldsDouble['yhat_lower'] = (rows['yhat_lower'])
        response.point.CopyFrom(response.point)
        self._agent.write_response(response)
        logger.info(response.point)

    logger.info(response.point)
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)
    logger.info('ending batch')


if __name__ == '__main__':
    a = Agent()
    h = ProphetHandler(a)
    a.handler = h
    logger.info("Starting Prophet Agent")
    a.start()
    a.wait()
    logger.info("Prophet Agent finished")

Show Task:

ID: prophet
Error: prophet2: read error: proto: can't skip unknown wire type 6 for agent.Response
Template: 
Type: batch
Status: enabled
Executing: false
Created: 22 Aug 18 16:57 UTC
Modified: 23 Aug 18 18:55 UTC
LastEnabled: 23 Aug 18 18:55 UTC
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
dbrp "telegraf"."autogen"

var data = batch
    |query(''' 
           SELECT mean("usage_user") as value 
           FROM "telegraf"."autogen"."cpu" 
           WHERE "role"='hdap_api_ui' 
    ''')
        .groupBy(time(1h))
        .offset(5m)
        .align()
        .period(30d)
        .every(1m)

data
    @prophet()
        .periods(24)
        .field('value')
        .freq('H')
        .changepoint_prior_scale(0.001)
        .include_history(FALSE)
    |influxDBOut()
        .database('kapacitor')
        .measurement('fbprophet_hdap_api_ui')

DOT:
digraph prophet {
query1 -> prophet2;
prophet2 -> influxdb_out3;
}

The UDF “seems” to be running properly when I output the response to log.

ts=2018-08-23T18:56:00.000Z lvl=debug msg="starting next batch query" service=kapacitor task_master=main task=prophet node=query1 query="SELECT mean(usage_user) AS value FROM telegraf.autogen.cpu WHERE role = 'hdap_api_ui' AND time >= '2018-07-24T18:51:00Z' AND time < '2018-08-23T18:51:00Z' GROUP BY time(1h, 0s)"
ts=2018-08-23T18:56:00.274Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:begin batch"
ts=2018-08-23T18:56:00.287Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:fit model"
ts=2018-08-23T18:56:00.298Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:fbprophet.forecaster:Disabling yearly seasonality. Run prophet with yearly_seasonality=True to override this."
ts=2018-08-23T18:56:02.623Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=INFO:root:forecasted
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:time: 1535050800000000000"
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat\""
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 3.23529776024"
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_lower\""
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 1.97385821819"
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.624Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_upper\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 4.48230925411"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:time: 1535054400000000000"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 3.1808145572"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_lower\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 1.89447694143"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_upper\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 4.40134824909"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:time: 1535058000000000000"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 2.94734546818"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_lower\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 1.64447013801"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_upper\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 4.19842758367"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:time: 1535061600000000000"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 2.69944990441"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_lower\""
ts=2018-08-23T18:56:02.625Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 1.34293183869"
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_upper\""
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 3.95074219364"
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:time: 1535065200000000000"
ts=2018-08-23T18:56:02.626Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"

More data here, but truncated to fit in message.

ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:time: 1535133600000000000"
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat\""
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 2.74559253375"
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_lower\""
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 1.52741850481"
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="fieldsDouble {"
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  key: \"yhat_upper\""
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="  value: 3.98503808403"
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=}
ts=2018-08-23T18:56:02.634Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text=
ts=2018-08-23T18:56:02.635Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:ending batch"
ts=2018-08-23T18:56:02.635Z lvl=info msg="UDF log" service=kapacitor task_master=main task=prophet node=prophet2 text="INFO:root:Prophet Agent finished"

But after the first run, no data is added to InfluxDB, and I receive the error “prophet2: read error: proto: can’t skip unknown wire type 6 for agent.Response”

ts=2018-08-23T18:56:37.548Z lvl=error msg="encountered error" service=kapacitor task_master=main task=prophet node=prophet2 err="keepalive timedout, last keepalive received was: 2018-08-23 18:55:37.547791363 +0000 UTC"
ts=2018-08-23T18:57:00.000Z lvl=debug msg="starting next batch query" service=kapacitor task_master=main task=prophet node=query1 query="SELECT mean(usage_user) AS value FROM telegraf.autogen.cpu WHERE role = 'hdap_api_ui' AND time >= '2018-07-24T18:52:00Z' AND time < '2018-08-23T18:52:00Z' GROUP BY time(1h, 0s)"
ts=2018-08-23T18:57:00.271Z lvl=debug msg="closing edge" service=kapacitor task_master=main task=prophet parent=prophet2 child=influxdb_out3 collected=0 emitted=0
ts=2018-08-23T18:57:00.271Z lvl=error msg="node failed" service=kapacitor task_master=main task=prophet node=prophet2 err="read error: proto: can't skip unknown wire type 6 for agent.Response"
ts=2018-08-23T18:57:00.271Z lvl=debug msg="task finished" service=task_store task=prophet
ts=2018-08-23T18:57:00.271Z lvl=debug msg="closing edge" service=kapacitor task_master=main task=prophet parent=batch child=batch0 collected=2 emitted=2
ts=2018-08-23T18:57:00.271Z lvl=debug msg="closing edge" service=kapacitor task_master=main task=prophet parent=query1 child=prophet2 collected=2 emitted=2
ts=2018-08-23T18:57:00.271Z lvl=error msg="failed to stop task with out error" service=kapacitor task_master=main task=prophet err="prophet2: read error: proto: can't skip unknown wire type 6 for agent.Response"
ts=2018-08-23T18:57:00.271Z lvl=error msg="task finished with error" service=task_store err="prophet2: read error: proto: can't skip unknown wire type 6 for agent.Response" task=prophet

Any help debugging this issue would be much appreciated.

kapacitor.conf

data_dir = "/var/lib/kapacitor"
hostname = "REMOVED"

[hipchat]
enabled = true
room = "Kapacitor Monitoring Dev"
state-changes-only = true
token = "REMOVED"
url = "https://REMOVED/v2/room"

[http]
log-enabled = false

[[influxdb]]
default = true
enabled = true
name = "localhost"

[influxdb.subscriptions]
_internal = ["monitor"]
app = ["autogen"]
telegraf = ["autogen"]

[logging]
file = "/var/log/kapacitor/kapacitor.log"
level = "DEBUG"

[opsgenie2]
api-key = "REMOVED"
enabled = true
recovery_action = "close"

[replay]
dir = "/var/lib/kapacitor/replay"

[reporting]
enabled = false

[storage]
boltdb = "/var/lib/kapacitor/kapacitor.db"

[udf.functions.percentileAbnormality]
args = [""]
prog = "/etc/kapacitor/udf/percentile_abnormality"
timeout = "10s"

[udf.functions.serverVersion]
args = [""]
prog = "/etc/kapacitor/udf/cluster_server_version"
timeout = "10s"

[udf.functions.prophet]
  prog = "/usr/bin/python"
  args = ["-u", "/etc/kapacitor/udf/prophet.py"]
  timeout = "60s"

Hi,

Very interested in this. I hope you don’t mind a couple of questions, I’m really just starting with Kapacitor and there doesn’t seem to be much resources around with this kind of implementations.

Do you mind sharing what you used as the basis for creating the UDF? And The integration with Prophet, would it work generically for any series or does it have specifics to your use case?

Hey nice to see that some people got interested into using prophet too.

For op, it’s seem you used a bit of my work earlier, but choose the child process approach instead of the udp socket server.

Is there any reason you used kapacitor-udf (1.0.0) ?.
Right now you can download the 1.5.1 here https://dl.influxdata.com/kapacitor/releases/python-kapacitor_udf-1.5.1.tar.gz .

On my end i have been able to use this version correctly kapacitor_prophet-udf/prophet.py at master · atroncy/kapacitor_prophet-udf · GitHub with kapacitor 1.4 if i remember correctly. For your info i just updated it earlier to fix some issue with the cap variable and added the floor variable too.

1 Like

Hi,

On my part i used this
kapacitor/udf/agent at master · influxdata/kapacitor · GitHub to understand how it work.
For your python env you need python 2.7, if i remember correctly i had a lot of issue with python 3… and gave up quickly.
Also it was kind of hard to find how to download the basic package, but basically every udf version are located here https://dl.influxdata.com/kapacitor/releases/python-kapacitor_udf-[VERSION].tar.gz
I’m not too sure about the code itself, since i’m not a python dev, but it worked.

For the integration with prophet it does not support every feature that prophet has. But overall it should work fine for any series…

1 Like

@atroncy many thanks for the feedback. Indeed I had to give up on Python3, but I was able to get it going with stock Python2 after all and with the agent code from the master repo. I ended up with the same problem that @dpayne was facing. I’m going to try your code.

And… it’s working :slight_smile: Time to play!

Thanks!

1 Like

@atroncy Many thanks for the Socket udf example which was the basis for this one. It works perfectly in my environment. I preferred an Agent based UDF that Kapacitor would manage, rather than an external process, and playing around with the code gave me some hands on experience with UDFs.

You’re right about the udf package download links, they’re hard to find. Version 1.0.0 was the only one I found through Google searches. After upgrading to kapacitor_udf-1.5.1, the issue still persists. So I’m leaning towards using your script for this proof-of-concept phase.

1 Like

@voiprodrigo I’m glad this post and the guidance from @atroncy helped get you over that last hump. Time to play with some metrics!

On another note, for anyone setting up a socket base UDF, don’t make the same mistake as me. I started the script as root, which caused a Kapacitor startup failures… Make sure you start the script with the same user as your Kapacitor process.

2 Likes

Hello

while trying to use the prophet UDF ( from atroncy) , ( in agent /server mode) , we get stuck as after lauching the py script, the kapacitor daemon hang adn does not finish ti start.

we have upgraded in the latest version of influxdb ( 1.6.2) , kapacitor ( 1.5.1) .

in parllel , if trying to use a server less mode ( STDIN/OUT) then we have an error about “fbprophet: read error: proto: can’t skip unknown wire type 6 for agent.Response”

so one way or the other we are stuck.
Any help would be greatly appreciated

A detail , we have absolutely no log entry whatsover, the latest one being :“INFO::root:Started server”.

The kapacitord and the python script are running as kapacitor UID :slight_smile:

Hence why the sevrer seems to run as root ? could it be a probleme ( as dpayne suggested it) ?
if so how to control the “user” and force the same as kapacitor ?

thanks in advance

Hi @pdl86,

I’m using a systemd unit file to start both, the Socket based python script and Kapacitor. This is where the user for these services are configured in my environment. If you would like to test this setup (I’m using CentOS7), here’s the files:

Kapacitor Server Unit File - /usr/lib/systemd/system/kapacitor.service

[Unit]
Description=Time series data processing engine.
Documentation=https://github.com/influxdb/kapacitor
After=network.target

[Service]
User=kapacitor
Group=kapacitor
LimitNOFILE=65536
EnvironmentFile=-/etc/default/kapacitor
ExecStart=/usr/bin/kapacitord -config /etc/kapacitor/kapacitor.conf $KAPACITOR_OPTS
KillMode=process
Restart=on-failure

[Install]
WantedBy=multi-user.target

Prophet UDF Unit File - /etc/systemd/system/prophet-udf.service

[Unit]
Description=Time series data processing engine.
After=network.target

[Service]
User=kapacitor
Group=kapacitor
LimitNOFILE=65536
ExecStart=/bin/python /etc/kapacitor/udf/prophet_socket.py
KillMode=process
Restart=on-failure

[Install]
WantedBy=multi-user.target

Reload daemon after creating files

systemctl daemon-reload

Start Both Services

systemctl start prophet-udf
systemctl start kapacitor

or

service prophet-udf start
service kapacitor start

In regards to the log file, I would’ve expected more output. Check your kapacitor.conf for the log level setting. If it’s on INFO, change it to DEBUG.

[logging]
file = "/var/log/kapacitor/kapacitor.log"
level = "DEBUG"

Hi @dpayne, @atroncy, @voiprodrigo

Thank you so much for this post and your input. I have been trying to get either Arima or Prophet work with kapacitor and having the same wire type errors.

Now I am trying your code and get this error:

ts=2018-09-03T08:50:28.234+10:00 lvl=error msg="encountered error" service=run err="open server: open service *udf.Service: failed to load process info for \"prophet\": dial unix /tmp/prophet.sock: connect: no such file or directory"

Do i have to create the socket file somehow before I run kapacitor service?

Thanks

I will answer my question in case someone will come up with the same problem.
We need to call python script with kapacitor user which creates socket file then starting kapacitor will see the socket file.

Hi @dpayne

thanks.

the prophet-udf service definition was missing, so we have created it.

But we have a doubt

what is this prophet_socket.py script you are refering to ?

if this is the “full” agent script ( aka : kapacitor_prophet-udf/prophet.py at master · atroncy/kapacitor_prophet-udf · GitHub) , then as it is also referenced in the kapacitor.conf , it is lauched two times with a conflict on the “socket” file ( already in use) .
We are certainly missing something , but what and where ?

thanks in advance
Philippe

Hello @dpayne

did you have a chance to look at our question
thanks in advance
Philippe

Hi I try to run SARIMAX as Kpacitor udf for forecasting.
after some batches_queried
query1 [avg_exec_time_ns=“162.960204ms” batches_queried=“20” errors=“0” points_queried=“201020” working_cardinality=“0” ];

query1 -> sarimax2 [processed=“20”];

And it write this :
ts=2018-09-07T14:31:10.679+02:00 lvl=error msg=“failed to snapshot task” service=kapacitor task_master=main task=sarimax_model1 err=“read error: proto: can’t skip unknown wire type 7 for agent.Response”

and no data point_writen :
influxdb_out3 [avg_exec_time_ns=“0s” errors=“0” points_written=“0” working_cardinality=“0” write_errors=“0” ]

Thanks for your help

GARBA Moussa

@pdl86 Sorry for the delay. Yes, you’ll need this socket based UDF script that was created by @atroncy (kapacitor_prophet-udf/prophet.py at master · atroncy/kapacitor_prophet-udf · GitHub). This UDF will create a socket file (/tmp/udf_prophet.sock) that kapacitor communicates with. It can be started using the systemd method I posted earlier or your own custom startup script. Two things to keep in mind:

  1. Before starting the UDF, remove any existing socket files (/tmp/udf_prophet.sock) that may be stale from previous runs.
  2. Start the UDF with the same user as your kapacitor process. If not, kapacitor with fail to connect to the socket file due to permission issues.

prophet_socket.py is the name of the UDF file in my environment. It’s exactly the same as atroncy’s example.