Greetings,
I have been trying to write a Python UDF example that makes future predictions using batch input / batch output.
I always have a single series and don't need to define a field, but all the examples come with one. Hence I wrote this starting example with "field", which only returns:
invalid TICKscript: line 14 char 3: no method or property "field" on *pipeline.UDFNode
Below are my Python and TICK files. Any help would be appreciated.
import logging
import math
import sys

import numpy
from scipy import stats

from kapacitor.udf import udf_pb2
from kapacitor.udf.agent import Agent, Handler
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()
class ForecastHandler(Handler):
class state(object):
def __init__(self):
self._entries = []
def reset(self):
self._entries = []
def update(self, value, point):
self._entries.append((value, point))
def arima(self):
arima = numpy.array(self._entries)+1
return arima
#Create a handler class
def __init__(self, agent):
self._agent = agent
self._field = None
self._state = ForecastHandler.state()
self._begin_response = None
def info(self):
response = udf_pb2.Response()
response.info.wants = udf_pb2.BATCH
response.info.provides = udf_pb2.BATCH
response.info.options['field'].valueTypes.append(udf_pb2.STRING)
logger.info("info")
return response
def init(self, init_req):
success = True
msg = ''
size = 0
for opt in init_req.options:
if opt.name == 'field':
self._field = opt.values[0].stringValue
if self._field is None:
success = False
msg += ' must supply a field name'
response = udf_pb2.Response()
response.init.success = success
response.init.error = msg[1:]
return response
def snapshot(self):
response = udf_pb2.Response()
response.snapshot.snapshot = ''
return response
def restore(self, restore_req):
response = udf_pb2.Response()
response.restore.success = False
response.restore.error = 'not implemented'
return response
def begin_batch(self, begin_req):
print >> sys.stderr, "START!!!!"
self._state.reset()
response = udf_pb2.Response()
response.begin.CopyFrom(begin_req)
self._begin_response = response
def point(self, point):
print >> sys.stderr, self._field
value = point.fieldsDouble[self._field]
self._state.update(value, point)
def end_batch(self, end_req):
predict = self._state.arima()
# Send begin batch with count of outliers
response = udf_pb2.Response()
response.point.fieldsDouble["predict"] = predict
self._agent.write_response(response)
# Send an identical end batch back to Kapacitor
response.end.CopyFrom(end_req)
self._agent.write_response(response)
if __name__ == '__main__':
a = Agent()
h = ForecastHandler(a)
a.handler = h
logger.info("Starting Agent")
a.start()
a.wait()
logger.info("Agent finished")
and
dbrp "telegraf"."autogen"
var data = batch
|query('''
SELECT max(usage_system) as usage_system
FROM "telegraf"."autogen"."cpu"
WHERE "cpu"='cpu-total'
''')
.period(5m)
.every(10s)
.groupBy(time(1m))
data
@forecast()
.field('usage_system')
|influxDBOut()
.create()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('c')
and the kapacitor.conf:
[udf]
[udf.functions.forecast]
prog = "/usr/bin/python2"
args = ["-u", "/tmp/kapacitor_udf/forecast.py"]
timeout = "10s"
[udf.functions.forecast.env]
PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"