Python UDF callback's argument description

This is about using a UDF (user-defined function) with Kapacitor.
Is there any pointer or documentation that explains the arguments passed to the callbacks defined by a Python UDF agent? Because these callbacks are invoked by Kapacitor itself, it is unclear what each argument passed to callbacks such as point(), end_batch(), and init() contains.

For example:

  1. What are the members of the point structure, and which component fills in each member?
  2. Similarly, what does the batch_meta structure represent, and what do its members mean?
  3. How should the response be formed in each callback, and is there any callback for which sending a response is mandatory?

Any code-level pointers would also be useful. I am experimenting with a Python-based UDF agent.

Here is the sample code I was trying. The intention is to see, even before using the UDF in a TICKscript, how Kapacitor loads it into its environment by calling info() and init(). Hence I have left the begin_batch(), point(), and end_batch() methods without any real logic.

udf_exa.py (the file that defines the UDF):
import sys
import json
from kapacitor.udf.agent import Agent, Handler
import kapacitor.udf.udf_pb2 as udf_pb2

import logging
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()

class lin_reg(Handler):

    def __init__(self, agent):
        print >> sys.stderr, "DAS: Inside lin_reg constructor"
        self._agent = agent
        self._field = ''
        self._history = None
        self._batch = None
        self._alpha = 0.0

    def info(self):
        print >> sys.stderr, "DAS: Inside lin_reg info"
        # print("DAS: Inside info")
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['size'].valueTypes.append(udf_pb2.INT)
        response.info.options['as'].valueTypes.append(udf_pb2.DOUBLE)

        return response

    def init(self, init_req):
        print >> sys.stderr, "DAS: Inside lin_reg init"
        # print("DAS: Inside init")
        # Start from success so the flag is defined when every check passes.
        success = True
        msg = ''
        size = 0

        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'size':
                size = opt.values[0].intValue
            elif opt.name == 'alpha':
                self._alpha = opt.values[0].doubleValue

        if size <= 1:
            success = False
            msg += ' must supply window size > 1'
        if self._field == '':
            success = False
            msg += ' must supply a field name'
        if self._alpha == 0:
            success = False
            msg += ' must supply an alpha value'

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]

        return response

    def begin_batch(self, begin_req):
        print >> sys.stderr, "DAS: Inside lin_reg begin_batch"
        # print("DAS:Called begin_batch of UDF\n")
        # print(begin_req)

    def point(self, point):
        print >> sys.stderr, "DAS: Inside lin_reg point"
        # print("DAS:Called point method\n")
        # print(point)

    def end_batch(self, batch_meta):
        print >> sys.stderr, "DAS: Inside lin_reg end_batch"
        # Log to stderr: stdout carries the agent's traffic to Kapacitor.
        print >> sys.stderr, batch_meta

if __name__ == '__main__':
    # create an agent
    agent = Agent()
    lr = lin_reg(agent)
    agent.handler = lr

    print >> sys.stderr, "DAS: Starting agent for lin_reg"
    agent.start()
    agent.wait()
    print >> sys.stderr, "DAS: Finished agent for lin_reg"
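Since the goal is to see how init() behaves before wiring the UDF into a TICKscript, one option is to run the same option checks against hand-built requests, without Kapacitor or the agent library. The following is only a rough sketch: OptionValue, Option, InitRequest, make_option, and validate_options are hypothetical stand-ins written for this illustration, not part of the Kapacitor package. They model just the attributes that init() above reads (name, values, stringValue, intValue, doubleValue).

```python
from collections import namedtuple

# Hypothetical stand-ins for the request objects handed to init().
# They are NOT the real protobuf classes; they only carry the
# attributes that the sample handler reads.
OptionValue = namedtuple("OptionValue", "stringValue intValue doubleValue")
Option = namedtuple("Option", "name values")
InitRequest = namedtuple("InitRequest", "options")


def make_option(name, string_value="", int_value=0, double_value=0.0):
    """Build a fake option that carries a single value."""
    return Option(name, [OptionValue(string_value, int_value, double_value)])


def validate_options(init_req):
    """Repeat the checks from lin_reg.init() and return (success, error)."""
    field, size, alpha = "", 0, 0.0
    for opt in init_req.options:
        if opt.name == "field":
            field = opt.values[0].stringValue
        elif opt.name == "size":
            size = opt.values[0].intValue
        elif opt.name == "alpha":
            alpha = opt.values[0].doubleValue

    errors = []
    if size <= 1:
        errors.append("must supply window size > 1")
    if field == "":
        errors.append("must supply a field name")
    if alpha == 0:
        errors.append("must supply an alpha value")
    # Success only when no check failed; messages are joined by spaces.
    return (not errors, " ".join(errors))


if __name__ == "__main__":
    good = InitRequest([
        make_option("field", string_value="value"),
        make_option("size", int_value=10),
        make_option("alpha", double_value=0.5),
    ])
    print(validate_options(good))
    print(validate_options(InitRequest([])))
```

Keeping the checks in a plain function like this makes it possible to try different option combinations quickly; the real handler would still have to copy the result into response.init.success and response.init.error.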

Similarly, here is the [udf] section of my kapacitor.conf:
[udf]
# Configuration for UDFs (User Defined Functions)
[udf.functions]
# Example go UDF.
# First compile example:
# go build -o avg_udf ./udf/agent/examples/moving_avg.go
#
# Use in TICKscript like:
# stream.goavg()
# .field('value')
# .size(10)
# .as('m_average')
#
# uncomment to enable
#[udf.functions.goavg]
# prog = "./avg_udf"
# args = []
# timeout = "10s"
[udf.functions.my_udf]
prog = "/usr/bin/python2"
args = ["-u", "/home/lalatendu/influx/kapa/udf/udf_exa.py"]
timeout = "10s"
[udf.functions.my_udf.env]
PYTHONPATH = "/home/lalatendu/influx/kapa/udf/kapacitor/udf/agent/py"
# Run python