A sample UDF throws an error while starting the kapacitor daemon: msg="encountered error" service=run err="open server: open service *udf.Service: failed to load process info for \"my_udf\": write error: write |1: broken pipe"

Hi, I was trying to experiment with a UDF that takes a point and returns the same point back to the TICKscript, using a non-socket-based UDF.
Unfortunately, as soon as I restart the Kapacitor daemon, the log throws the following error for the sample UDF:

ts=2018-02-26T04:01:46.085+05:30 lvl=error msg="encountered error" service=run err="open server: open service *udf.Service: failed to load process info for \"my_udf\": write error: write |1: broken pipe"

I am not sure where I am going wrong; any help would be really encouraging. I tried my best to follow all the rules of UDF writing.

Here is the sample code of the UDF (udf_exa.py):

import sys
import json
from kapacitor.udf.agent import Agent, Handler
import kapacitor.udf.udf_pb2 as udf_pb2

import logging
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()

class my_udf(Handler):

    def __init__(self, agent):
        # print >> sys.stderr, "DAS: Inside lin_reg constructor"
        self._agent = agent
        self.path = "DAS PATH"

    def info(self):
        # print >> sys.stderr, "DAS: Inside lin_reg info"
        # print("DAS: Inside info")
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        response.info.options['path'].valueTypes.append(udf_pb2.STRING)
        # response.info.options['size'].valueTypes.append(udf_pb2.INT)
        # response.info.options['alpha'].valueTypes.append(udf_pb2.DOUBLE)

        return response

    def init(self, init_req):
        # print >> sys.stderr, "DAS: Inside lin_reg init"
        # print("DAS: Inside init")
        response = udf_pb2.Response()
        response.init.success = True

        return response

    def begin_batch(self, begin_req):
        raise Exception("Begin_batch not supported")
        #print >> sys.stderr, "DAS: Inside lin_reg begin_batch"
        # print("DAS:Called begin_batch of UDF\n")
        # print(begin_req)

    def point(self, passed_point):
        response = udf_pb2.Response()
        response.point.CopyFrom(passed_point)
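        # the second argument (True) flushes the response back to Kapacitor immediately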
        self._agent.write_response(response, True)
        # print("DAS:Called point method\n")
        # print(point)

    def end_batch(self, batch_meta):
        raise Exception("END_BATCH: not supported")
        # print >> sys.stderr, "DAS: Inside lin_reg end_batch"
        # print(batch_meta)

if __name__ == '__main__':
    # create an agent
    agent = Agent()
    myUDF = my_udf(agent)

    agent.handler = myUDF

    print >> sys.stderr, "DAS: STARTING MY UDF AGENT"
    agent.start()
    agent.wait()
    print >> sys.stderr, "DAS: FINISHED AGENT FOR MY UDF AGENT"

Below is a snippet of kapacitor.conf describing the sample UDF config.
[udf]
# Configuration for UDFs (User Defined Functions)
[udf.functions]
----------------------------------snip---------------
#[udf.functions.goavg]
# prog = "./avg_udf"
# args = []
# timeout = "10s"
[udf.functions.my_udf]
prog = "/usr/bin/python2"
args = ["-u", "/home/lalatendu/influx/kapa/udf/udf_exa.py"]
timeout = "10s"
[udf.functions.my_udf.env]
PYTHONPATH = "/home/lalatendu/influx/kapa/udf/kapacitor/udf/agent/py"
# Run python
---------------------------------snip-----------------------------

I am clueless about how to debug this issue. I need some help here.

I figured out what the problem was. I should have realised that it was crashing approximately 100s after enabling the task.
It turned out that I was sending an empty response from the snapshot handler, which was crashing Kapacitor after the task had run for 100s. What a great way to waste a few hours. I think there should be a more detailed explanation of the expected return values for each of the handler methods in a UDF.
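For anyone who lands here later: a snapshot/restore pair that returns well-formed responses, modelled on the mirror example that ships with the Kapacitor Python agent (a sketch based on that example, not necessarily the exact code I ended up with), looks like this:

    def snapshot(self):
        # Return a well-formed Response with the snapshot field set, even
        # when there is no state to save; an empty Response here kills the
        # agent process and Kapacitor logs the broken-pipe error above.
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        # No state to restore, so report failure explicitly rather than
        # returning an empty Response.
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

Both methods go on the Handler subclass (my_udf above).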

So what is the practical solution?

I am experiencing the same issue.

This is my default snapshot func:

def snapshot(self):
    logger.info("************** Hello SNAPSHOT")
    response = udf_pb2.Response()
    response.snapshot.snapshot = ''
    logger.info("************** ByeBye SNAPSHOT")
    return response

I really don't care about it; how can I fix this?

Thanks.

I had a similar problem while running the moving average tutorial from Kapacitor. I was getting "failed to load process info" while restarting Kapacitor.
For me, the issue was with the python2 environment.
I am using the Anaconda distribution, so I created a new environment for Python 2.7.14 and used that bin/python2 for the UDF (i.e. updated the conf file with the new python2). I also installed protobuf.
It was a hunch, but it worked.
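A quick way to sanity-check an interpreter before pointing kapacitor.conf at it (the file name check_env.py is just illustrative; run it with the exact prog and PYTHONPATH from your conf):

# e.g. PYTHONPATH=/home/lalatendu/influx/kapa/udf/kapacitor/udf/agent/py /usr/bin/python2 check_env.py
import google.protobuf
print(google.protobuf.__version__)

# this import fails if PYTHONPATH does not point at the agent library
from kapacitor.udf.agent import Agent, Handler
print('UDF environment looks OK')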