Kapacitor just keeps crashing on me. On inspecting the kapacitord.err file I can see:
"panic: unexpected response message
goroutine 107 [running]:
github.com/influxdata/kapacitor/udf.(*Server).handleResponse(0xc4205689a0, 0xc420b618c0, 0x0, 0x0)
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:757 +0x13c1
github.com/influxdata/kapacitor/udf.(*Server).readData(0xc4205689a0, 0x0, 0x0)
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:658 +0xa5
github.com/influxdata/kapacitor/udf.(*Server).Start.func2(0xc4205689a0)
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:165 +0x2f
created by github.com/influxdata/kapacitor/udf.(*Server).Start
/root/go/src/github.com/influxdata/kapacitor/udf/server.go:164 +0x171"
I've tried to diagnose the issue using the other log file, kapacitor.log, but there is NO error message in it at all. I have a single TICKscript enabled, which looks like this:
// Pipeline: every point written to 'recv_sms' is handed to the sms_machine
// UDF, and whatever the UDF emits is written to 'send_sms_req'.
stream
    // Act on all received SMSes
    |from()
        .measurement('recv_sms')
    // launch the @sms_machine UDF
    @sms_machine()
        .fromNumFieldName('phNum')
        .SMSBodyFieldName('SMSBody')
        .countryFieldName('SMSCountry')
    // Add the SMS requests into influxDB
    |influxDBOut()
        .measurement('send_sms_req')
I suspect that my issue arises because I call into an external Python module, influxdb, which then outputs strings to the logger. It seems that the point method actually completes and Kapacitor crashes after that.
My UDF is as follows:
from __future__ import print_function

import logging

from kapacitor.udf import udf_pb2
from kapacitor.udf.agent import Agent, Handler
from pylibs import sms_handler

# logging.basicConfig() with no stream/filename argument writes to stderr.
# NOTE(review): the Kapacitor agent is expected to use stdin/stdout for its
# protocol traffic, so log output must stay off stdout -- confirm that no
# imported module prints to stdout.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()


class SmsMachineHandler(Handler):
    """Stream UDF handler that turns received SMS points into SMS send requests.

    The TICKscript supplies three string options (``fromNumFieldName``,
    ``SMSBodyFieldName``, ``countryFieldName``) naming the point fields to
    read.  For every incoming point the sender number and SMS body are passed
    to ``sms_handler.process_sms_contents`` and the returned dict is emitted
    as the fields of a new point.
    """

    def __init__(self, agent):
        """Store the agent used to write responses and reset the option map.

        Args:
            agent: the UDF agent; ``point`` calls its ``write_response``.
        """
        self._agent = agent
        # Internal option name -> field name supplied by the TICKscript.
        # A value of None means "not supplied yet"; ``init`` rejects that.
        self._mapped_fields = {'fromNumFieldName': None,
                               'SMSBodyFieldName': None,
                               'countryFieldName': None}

    def add_field_name_to_field_name_map(self, internal_field_name, supplied_field_name):
        """Record the point field name supplied for one internal option name."""
        self._mapped_fields[internal_field_name] = supplied_field_name

    def info(self):
        """Describe the UDF: stream in, stream out, three string options.

        Returns:
            A ``udf_pb2.Response`` with its ``info`` message populated.
        """
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        # We want to pass in 3 field names from the tick script to make
        # changing fields easier
        for internal_field_name in self._mapped_fields:
            response.info.options[internal_field_name].valueTypes.append(udf_pb2.STRING)
        return response

    def init(self, init_req):
        """Read the option values from the TICKscript and validate them.

        Args:
            init_req: init request whose ``options`` each carry a ``name``
                and a list of ``values``.

        Returns:
            A ``udf_pb2.Response`` whose ``init.success`` is False, with an
            explanatory ``init.error``, if any option was not supplied.
        """
        # Populate the fields from the tick script
        for opt in init_req.options:
            if opt.name in self._mapped_fields:
                self.add_field_name_to_field_name_map(opt.name, opt.values[0].stringValue)

        # Check that all values have been provided
        msg = ''
        success = True
        for internal_field_name, passed_field_name in self._mapped_fields.items():
            if passed_field_name is None:
                success = False
                msg += ' must supply {}'.format(internal_field_name)

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]  # drop the leading space
        return response

    def snapshot(self):
        """Return an (empty) snapshot of the handler state.

        The handler keeps no state between points, so the snapshot payload is
        empty.  The ``snapshot`` message must still be explicitly populated:
        a bare ``Response()`` has no message type set, and Kapacitor's UDF
        server panics with "unexpected response message" when it receives one.
        """
        response = udf_pb2.Response()
        response.snapshot.snapshot = b''
        return response

    def restore(self, restore_req):
        """Acknowledge a restore request; there is no state to restore."""
        response = udf_pb2.Response()
        response.restore.success = True
        return response

    def begin_batch(self, begin_req):
        """Batches are not supported; this UDF only handles streams."""
        raise Exception("not supported")

    def point(self, point):
        """Process one received-SMS point and emit the resulting request point.

        Args:
            point: incoming point; the sender number and SMS body are read
                from its ``fieldsString`` map using the configured names.
        """
        response = udf_pb2.Response()
        response.point.CopyFrom(point)
        # Keep the copied metadata but start from an empty set of fields.
        response.point.ClearField('fieldsInt')
        response.point.ClearField('fieldsString')
        response.point.ClearField('fieldsDouble')

        # Get the data from the point
        from_number = point.fieldsString[self._mapped_fields['fromNumFieldName']]
        #from_country = point.fieldsString[self._mapped_fields['countryFieldName']]
        sms_body = point.fieldsString[self._mapped_fields['SMSBodyFieldName']]

        logger.info("Inbound SMS for processing")
        logger.info("Calling external module")
        result_dict = sms_handler.process_sms_contents(from_number, sms_body)

        # take data from the sms handler and create a new point in measurement
        # 'sent_sms' based on the data returned
        # Allows tracing from inbound sms to response sms
        response.point.fieldsInt['SMSInTimeStamp'] = point.time
        logger.info("The returned dict looks like: %s", result_dict)
        for key, value in result_dict.items():
            # bool is a subclass of int; keep it out of the integer fields,
            # as the original exact-type check did.
            if isinstance(value, int) and not isinstance(value, bool):
                response.point.fieldsInt[key] = value
            else:
                response.point.fieldsString[key] = value

        # Debug dump of the outgoing point; the context manager closes the
        # file so handles are not leaked on every point.
        with open('/tmp/point_log.txt', 'a') as logfile:
            logfile.write("{}\n".format(response.point))

        self._agent.write_response(response)
        logger.info("Written response")

    def end_batch(self, end_req):
        """Batches are not supported; this UDF only handles streams."""
        raise Exception("not supported")
if __name__ == '__main__':
    # Create the agent, attach the handler, then run until the agent exits.
    a = Agent()
    h = SmsMachineHandler(a)
    a.handler = h

    logger.info("Starting Agent")
    a.start()
    a.wait()
    logger.info("Agent finished")
I see the logger message “Written response” in the log file, and some time later the panic shown above appears in kapacitord.err and Kapacitor stops.