Hi, I was experimenting with a UDF that takes a point and returns the same point to the TICKscript, using a non-socket (process-based) UDF.
Unfortunately, as soon as I restart the kapacitor daemon, the log shows the following error for the sample UDF.
ts=2018-02-26T04:01:46.085+05:30 lvl=error msg="encountered error" service=run err="open server: open service *udf.Service: failed to load process info for "my_udf": write error: write |1: broken pipe"
I am not sure where I am going wrong; any help would be really encouraging. I tried my best to follow all the rules for writing a UDF.
Here is the sample code of the UDF (udf_exa.py):
import sys
import json
from kapacitor.udf.agent import Agent, Handler
import kapacitor.udf.udf_pb2 as udf_pb2
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()


class my_udf(Handler):
    def __init__(self, agent):
        # print >> sys.stderr, "DAS: Inside lin_reg constructor"
        self._agent = agent
        self.path = "DAS PATH"

    def info(self):
        # print >> sys.stderr, "DAS: Inside lin_reg info"
        # print("DAS: Inside info")
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        response.info.options['path'].valueTypes.append(udf_pb2.STRING)
        # response.info.options['size'].valueTypes.append(udf_pb2.INT)
        # response.info.options['alpha'].valueTypes.append(udf_pb2.DOUBLE)
        return response

    def init(self, init_req):
        # print >> sys.stderr, "DAS: Inside lin_reg init"
        # print("DAS: Inside init")
        response = udf_pb2.Response()
        response.init.success = True
        return response

    def begin_batch(self, begin_req):
        raise Exception("Begin_batch not supported")
        # print >> sys.stderr, "DAS: Inside lin_reg begin_batch"
        # print("DAS:Called begin_batch of UDF\n")
        # print(begin_req)

    def point(self, passed_point):
        response = udf_pb2.Response()
        response.point.CopyFrom(passed_point)
        self._agent.write_response(response, True)
        # print("DAS:Called point method\n")
        # print(point)

    def end_batch(self, batch_meta):
        raise Exception("END_BATCH: not supported")
        # print >> sys.stderr, "DAS: Inside lin_reg end_batch"
        # print(batch_meta)


if __name__ == '__main_':
    # create an agent
    agent = Agent()
    myUDF = my_udf(agent)
    agent.handler = myUDF
    print >> sys.stderr, "DAS: STARTING MY UDF AGENT"
    agent.start()
    agent.wait()
    print >> sys.stderr, "DAS: FINISHED AGENT FOR MY UDF AGENT"
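One thing that may be worth checking in the script above: the guard at the bottom compares `__name__` with `'__main_'` (a single trailing underscore), which never matches when the file is run as a script. If that is what is actually deployed, the process would exit without starting the agent, and any later write to its stdin would fail with a broken pipe. The sketch below reproduces that failure mode outside Kapacitor. It assumes a POSIX system, and `write_to_exited_child` is a hypothetical helper name, not part of the Kapacitor agent library.

```python
import errno
import subprocess
import sys


def write_to_exited_child(child_source):
    """Run child_source in a Python child process, wait for it to exit,
    then write to its stdin.

    Returns True if the write fails with EPIPE (broken pipe).
    """
    proc = subprocess.Popen([sys.executable, "-c", child_source],
                            stdin=subprocess.PIPE)
    proc.wait()  # the child has exited, so nobody reads the pipe any more
    try:
        proc.stdin.write(b"placeholder for a request\n")
        proc.stdin.flush()
    except (IOError, OSError) as exc:
        return exc.errno == errno.EPIPE
    finally:
        try:
            proc.stdin.close()
        except (IOError, OSError):
            pass  # closing may retry the flush and hit the same error
    return False


# Same shape as the guard in udf_exa.py: the comparison is never true,
# so the body (which would block reading stdin) is skipped entirely.
MISTYPED_GUARD = (
    "import sys\n"
    "if __name__ == '__main_':\n"
    "    sys.stdin.read()\n"
)

if __name__ == "__main__":
    print(write_to_exited_child(MISTYPED_GUARD))
```

If this turns out to be the cause, the check would be to compare the guard against `'__main__'` (two underscores on each side) and restart the daemon.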
Below is the snippet of kapacitor.conf that configures the sample UDF:
[udf]
# Configuration for UDFs (User Defined Functions)
[udf.functions]
----------------------------------snip---------------
#[udf.functions.goavg]
# prog = "./avg_udf"
# args = []
# timeout = "10s"
[udf.functions.my_udf]
prog = "/usr/bin/python2"
args = ["-u", "/home/lalatendu/influx/kapa/udf/udf_exa.py"]
timeout = "10s"
[udf.functions.my_udf.env]
PYTHONPATH = "/home/lalatendu/influx/kapa/udf/kapacitor/udf/agent/py"
# Run python
---------------------------------snip-----------------------------
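One way to narrow this down might be to launch the UDF by hand with the same program, arguments and environment as in the config above, then look at its exit code and stderr (an import error or traceback would show up there). This is a minimal sketch only: `run_udf_once` is a hypothetical helper, and it assumes that closing stdin right away (EOF) makes the process stop. If the process keeps running instead, the call will block.

```python
import os
import subprocess
import sys


def run_udf_once(prog, args, extra_env):
    """Start prog with args and extra environment variables, send EOF on
    stdin, and return (exit_code, stderr_text)."""
    env = dict(os.environ)
    env.update(extra_env)
    proc = subprocess.Popen([prog] + list(args),
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=env)
    # communicate() with no input closes stdin and collects the output.
    _, err = proc.communicate()
    return proc.returncode, err.decode("utf-8", "replace")


if __name__ == "__main__":
    # Demo with a stand-in child; for the real check, pass the prog, args
    # and PYTHONPATH values from kapacitor.conf instead.
    code, stderr_text = run_udf_once(
        sys.executable,
        ["-c", "import sys; sys.stderr.write('started\\n')"],
        {"PYTHONPATH": "."},
    )
    print(code)
    print(stderr_text)
```

An exit code of 0 with empty stderr and no "STARTING MY UDF AGENT" message would suggest that the script ran but never reached the agent start-up code.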
I have no idea how to debug this issue. Any help would be appreciated.