Kapacitor constantly crashing

client-libraries
kapacitor
#1

Kapacitor just keeps crashing on me. On inspecting the kapacitord.err file I can see:

"panic: unexpected response message

goroutine 107 [running]:
github.com/influxdata/kapacitor/udf.(*Server).handleResponse(0xc4205689a0, 0xc420b618c0, 0x0, 0x0)
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:757 +0x13c1
github.com/influxdata/kapacitor/udf.(*Server).readData(0xc4205689a0, 0x0, 0x0)
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:658 +0xa5
github.com/influxdata/kapacitor/udf.(*Server).Start.func2(0xc4205689a0)
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:165 +0x2f
created by github.com/influxdata/kapacitor/udf.(*Server).Start
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:164 +0x171

I’ve tried to diagnose the issue using the other log file, kapacitor.log, but it contains no error message at all. I have a single TICKscript enabled, which looks like this:

stream
    // Act on all received SMSes
    |from()
        .measurement('recv_sms')
    // launch the @sms_machine UDF
    @sms_machine()
        .fromNumFieldName('phNum')
        .SMSBodyFieldName('SMSBody')
        .countryFieldName('SMSCountry')
    // Add the SMS requests into influxDB
    |influxDBOut()
        .measurement('send_sms_req')

I suspect that the issue arises because my UDF calls into an external Python module, influxdb, which then outputs strings to the logger. It seems that the point method actually completes, and Kapacitor crashes some time after that.

My UDF is as follows:

from future import print_function
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
from pylibs import sms_handler

import logging
logging.basicConfig(level=logging.DEBUG, format=’%(asctime)s %(levelname)s:%(name)s: %(message)s’)
logger = logging.getLogger()

class SmsMachineHandler(Handler):
    """Kapacitor stream UDF handler that processes received-SMS points.

    Each incoming point is handed to ``sms_handler.process_sms_contents`` and
    the dict it returns becomes the fields of the point written back to
    Kapacitor.  The names of the fields to read from the incoming point are
    passed in as string options from the TICKscript (``fromNumFieldName``,
    ``SMSBodyFieldName`` and ``countryFieldName``).
    """

    def __init__(self, agent):
        """Remember the agent and start with no field names configured.

        :param agent: the agent whose ``write_response`` is used to emit points.
        """
        self._agent = agent
        # Option name -> field name supplied by the TICKscript; an entry stays
        # None until init() has seen the corresponding option.
        self._mapped_fields = {'fromNumFieldName': None,
                               'SMSBodyFieldName': None,
                               'countryFieldName': None}

    def add_field_name_to_field_name_map(self, internal_field_name, supplied_field_name):
        """Record the field name the TICKscript supplied for one option.

        :param internal_field_name: option name (a key of ``_mapped_fields``).
        :param supplied_field_name: field name to read from incoming points.
        """
        self._mapped_fields[internal_field_name] = supplied_field_name

    def info(self):
        """Describe the UDF: stream in, stream out, one string value per option.

        :return: a ``udf_pb2.Response`` with its ``info`` member populated.
        """
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM

        # We want to pass in 3 field names from the tick script to make changing fields easier
        for internalfieldname in self._mapped_fields:
            response.info.options[internalfieldname].valueTypes.append(udf_pb2.STRING)
        return response

    def init(self, init_req):
        """Read the field-name options and check that all were supplied.

        :param init_req: init request; each entry of ``init_req.options`` has a
            ``name`` and a list of ``values``.
        :return: a ``udf_pb2.Response`` whose ``init.success`` is False, with
            ``init.error`` naming every missing option, when any option was
            not supplied.
        """
        msg = ''
        success = True

        # Populate the fields from the tick script
        for opt in init_req.options:
            if opt.name in self._mapped_fields:
                self.add_field_name_to_field_name_map(opt.name, opt.values[0].stringValue)

        # Check that all values have been provided
        for internalfieldname, passedfieldname in self._mapped_fields.items():
            if passedfieldname is None:
                success = False
                msg += ' must supply {}'.format(internalfieldname)

        response = udf_pb2.Response()
        response.init.success = success
        # Each message fragment starts with a space; drop the first one.
        response.init.error = msg[1:]

        return response

    def snapshot(self):
        """Return a snapshot response carrying an empty state blob.

        This handler has nothing to persist, but the ``snapshot`` member of
        the response must still be populated.  A bare ``udf_pb2.Response()``
        has no message set at all.
        NOTE(review): returning such an empty response is the likely cause of
        Kapacitor's "panic: unexpected response message" in
        ``udf.(*Server).handleResponse`` -- confirm against udf/server.go.

        :return: a ``udf_pb2.Response`` with its ``snapshot`` member set.
        """
        response = udf_pb2.Response()
        # Assigning a field of the sub-message marks ``snapshot`` as the
        # message carried by this response.
        response.snapshot.snapshot = b''
        return response

    def restore(self, restore_req):
        """Acknowledge a restore request; there is no state to restore.

        :param restore_req: the restore request (unused).
        :return: a ``udf_pb2.Response`` with ``restore.success`` set to True.
        """
        response = udf_pb2.Response()
        response.restore.success = True
        return response

    def begin_batch(self, begin_req):
        """Batches are not supported by this stream-only UDF.

        :raises Exception: always.
        """
        raise Exception("not supported")

    def point(self, point):
        """Process one received SMS and write the resulting point.

        The response starts as a copy of the incoming point with its int,
        string and double fields cleared, then receives the incoming timestamp
        and every entry of the dict returned by
        ``sms_handler.process_sms_contents``.

        :param point: the incoming point; the sender number and SMS body are
            read from ``point.fieldsString`` using the configured field names.
        """
        response = udf_pb2.Response()
        response.point.CopyFrom(point)
        response.point.ClearField('fieldsInt')
        response.point.ClearField('fieldsString')
        response.point.ClearField('fieldsDouble')

        # Get the data from the point
        from_number = point.fieldsString[self._mapped_fields['fromNumFieldName']]
        #from_country = point.fieldsString[self._mapped_fields['countryFieldName']]
        sms_body = point.fieldsString[self._mapped_fields['SMSBodyFieldName']]

        logger.info("Inbound SMS for processing")
        logger.info("Calling external module")

        result_dict = sms_handler.process_sms_contents(from_number, sms_body)

        # take data from the sms handler and create a new point in measurement 'sent_sms' based on the data returned
        response.point.fieldsInt['SMSInTimeStamp'] = point.time  # Allows tracing from inbound sms to response sms

        logger.info("The returned dict looks like: {}".format(result_dict))

        # items() works on both Python 2 and 3 (iteritems() is Python 2 only).
        for key, value in result_dict.items():
            # bool is a subclass of int; keep it out of fieldsInt, as the
            # exact-type check used here previously did.
            if isinstance(value, int) and not isinstance(value, bool):
                response.point.fieldsInt[key] = value
            else:
                response.point.fieldsString[key] = value

        # Debug trace of every emitted point.  The file is closed again right
        # away so that handles do not accumulate, one per processed point.
        with open('/tmp/point_log.txt', 'a') as logfile:
            logfile.write("{}\n".format(response.point))

        self._agent.write_response(response)

        logger.info("Written response")

    def end_batch(self, end_req):
        """Batches are not supported by this stream-only UDF.

        :raises Exception: always.
        """
        raise Exception("not supported")

if __name__ == '__main__':
    # Wire the handler into a new agent, then run until the agent finishes.
    a = Agent()
    h = SmsMachineHandler(a)
    a.handler = h

    logger.info("Starting Agent")
    a.start()
    a.wait()
    logger.info("Agent finished")

I see the logger message “Written response” in the log file, and some time later the panic shown above appears in kapacitord.err and Kapacitor stops.

#2

It seems that it consistently crashes after ten of these log entries:
ts=2018-05-06T20:31:42.029+10:00 lvl=info msg="http request" service=http host=127.0.0.1 username=- start=2018-05-06T20:31:42.027562124+10:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=aabb6faf-5118-11e8-800a-000000000000 duration=2.267648ms