Kapacitor UDF is giving read error: bad wiretype for oneof field in *udf.Response

Hi

Python Script UDF

# GridSearch the hyperparameters of p, d, q and P, D, Q, m
    p = d = q = range(0, 2)
    param_grid = list(itertools.product(p, d, q))
    seasonal_param_grid = [(x[0], x[1], x[2], 30) for x in list(itertools.product(p, d, q))]
    logger.debug("param_grid %s",param_grid)
    logger.debug("seasonal_param_grid = %s",seasonal_param_grid)
    ts = self.get_series()
    ts.index = pd.DatetimeIndex(ts.index.values, freq=ts.index.inferred_freq)

    for param in param_grid:
        for param_seasonal in seasonal_param_grid:
            try:
                 mdl = sm.tsa.statespace.SARIMAX(ts, order=param, seasonal_order=param_seasonal, enforce_stationarity=True, enforce_invertibility=True)
                 results = mdl.fit(maxiter=1000, method='nm')

            # Store results
                 current_aic = results.aic
                 logger.debug("current_aic %s",current_aic)

            # Set baseline for aic

                 if (lowest_aic == None):
                     lowest_aic = results.aic
                     logger.debug("lowest_aic %s",lowest_aic)

            # Compare results

                 if (current_aic <= lowest_aic):

                     lowest_aic = current_aic
                     lowest_parm = param
                     lowest_param_seasonal = param_seasonal

            #print ('SARIMA{}x{} - AIC:{}'.format(param, param_seasonal, results.aic))
            except:

                 continue

    return lowest_parm,lowest_param_seasonal



def sarimax3(self):
    logger.info("sarimax3 \n")
    ts = self.get_series()
    logger.info("size of input: %s" %len(ts))
    self._period = ts.index[1] - ts.index[0]
    frequency = Second(self._period.total_seconds())
    self._Gridsearch = self.Gridsearch()
    logger.debug(self._Gridsearch)
    self._model = SARIMAX(ts, order=lowest_parm, seasonal_order=lowest_param_seasonal, enforce_stationarity=False, enforce_invertibility=False).fit()
    logger.debug(' model  %s', self._model)
    start_date = ts.index[-1] + self._period
    end_date = start_date + (self._predict -1) * self._period
    forecast = self._model.predict(start_date.isoformat(), end_date.isoformat())

    if self._Gridsearch[1] > 0:
        shift = self.max() - self.min()
        forecast += shift
    logger.debug(forecast)
    return forecast

class ForecastHandler(Handler):
def init(self, agent, state):
self._agent = agent
self._state = state
self._field = None
self._field_type = None
self._predict = 0
self._begin_response = None
self._point = None

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.BATCH
    response.info.provides = udf_pb2.BATCH

    response.info.options['predict'].valueTypes.append(udf_pb2.INT)
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    response.info.options['type'].valueTypes.append(udf_pb2.STRING)
    logger.info("info")
    return response


def init(self, init_req):
    success = True
    msg = ''

    for opt in init_req.options:
        if opt.name == 'predict':
            self._predict = opt.values[0].intValue
            self._state._predict = self._predict
        if opt.name == 'field':
            self._field = opt.values[0].stringValue
        if opt.name == 'type':
            self._field_type = opt.values[0].stringValue
    if self._predict < 1:
        succ = False
        msg += ' must supply number of values to be predicted > 0'
    if self._field is None:
        success = False
        msg += ' must supply a field name'
    if self._field_type not in FIELD_TYPES:
        succ = False
        msg += ' field type must be one of {}'.format(FIELD_TYPES)

    response = udf_pb2.Response()
    response.init.success = success
    response.init.error = msg[1:]
    return response

 def snapshot(self):
    response = udf_pb2.Response()
    response.snapshot.snapshot = ''
    return response

def restore(self, restore_req):
    response = udf_pb2.Response()
    response.restore.success = False
    response.restore.error = 'not implemented'
    return response

def begin_batch(self, begin_req):
    self._state.drop()
    response = udf_pb2.Response()
    response.begin.CopyFrom(begin_req)
    self._begin_response = response

def point(self, point):
    value = point.fieldsDouble[self._field]
    self._state.update(pd.to_datetime(point.time), value)
    self._point = point
    #logger.debug(str(point))


 def end_batch(self, end_req):
    forecast  = self._state.sarimax3()
    logger.debug("size of forecast %s" %len(forecast))
    self._begin_response.begin.size = self._predict
    self._agent.write_response(self._begin_response)
    response = udf_pb2.Response()
    response.point.CopyFrom(self._point)

    for i in range(0, self._predict):
        response.point.time = forecast.index[i].value
        response.point.fieldsDouble[self._field] = forecast[i]
        self._agent.write_response(response)

    response.end.CopyFrom(end_req)
    self._agent.write_response(response)

if name == ‘main’:
a = Agent()
h = ForecastHandler(a,state())
a.handler = h

logger.info("Starting Agent")
a.start()
a.wait()
logger.info("Agent finished")

Influxquery :

var data = batch
|query(’’’
SELECT max(usage_iowait) as usage_iowait
FROM “telegraf”.“autogen”.“cpu”
WHERE “cpu”=‘cpu1’
‘’’)
.period(7d)
.every(1m)
.groupBy(1m)

data
@sarimax3()
.field(‘usage_iowait’)
.predict(96)
.type(‘double’)
|influxDBOut()
.create()
.database(‘telegraf’)
.retentionPolicy(‘autogen’)
.measurement(‘sarimax1_model1’)

ts=2018-09-18T10:50:46.155+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,155 INFO:root: sarimax3 "
ts=2018-09-18T10:50:46.155+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=
ts=2018-09-18T10:50:46.164+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,159 INFO:root: size of input: 10081”
ts=2018-09-18T10:50:46.248+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,248 DEBUG:matplotlib: CACHEDIR=/var/lib/kapacitor/.cache/ma tplotlib”
ts=2018-09-18T10:50:46.250+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,250 DEBUG:matplotlib.font_manager: Using fontManager instan ce from /var/lib/kapacitor/.cache/matplotlib/fontList.json”
ts=2018-09-18T10:50:46.388+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,388 DEBUG:root: param_grid [(0, 0, 0), (0, 0, 1), (0, 1, 0) , (0, 1, 1), (1, 0, 0), (1, 0, 1), (1, 1, 0), (1, 1, 1)]”
ts=2018-09-18T10:50:46.388+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,388 DEBUG:root: seasonal_param_grid = [(0, 0, 0, 30), (0, 0 , 1, 30), (0, 1, 0, 30), (0, 1, 1, 30), (1, 0, 0, 30), (1, 0, 1, 30), (1, 1, 0, 30), (1, 1, 1, 30)]”
ts=2018-09-18T10:50:46.659+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=”/usr/local/lib/python2.7/dist-packages/statsmodels/tsa/statespace/representatio n.py:375: FutureWarning: Using a non-tuple sequence for multidimensional indexing is deprecated; use arr[tuple(seq)] instead of arr[seq]. In the future this will be interpreted as an array index, arr[np.arra y(seq)], which will result either in an error or a different result."
ts=2018-09-18T10:50:46.659+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=" return matrix[[slice(None)]*(matrix.ndim-1) + [0]]"
ts=2018-09-18T10:50:46.680+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,680 DEBUG:root: current_aic 24827.404489948818”
ts=2018-09-18T10:50:46.680+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:46,680 DEBUG:root: lowest_aic 24827.404489948818”
ts=2018-09-18T10:50:50.105+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-09-18T10:50:50.053157437+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor pro tocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=f13dad7b-bb1f-11e8-8030-000000000000 duration=52.219741ms
ts=2018-09-18T10:50:50.302+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-09-18T10:50:50.301991656+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen prot ocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=f163a591-bb1f-11e8-8031-000000000000 duration=432.009µs
ts=2018-09-18T10:50:53.925+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:53,925 DEBUG:root: current_aic 24025.79640798875”
ts=2018-09-18T10:50:57.380+02:00 lvl=info msg=“UDF log” service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=“2018-09-18 10:50:57,379 DEBUG:root: current_aic 27612.28552365586”
ts=2018-09-18T10:51:44.364+02:00 lvl=error msg=“failed to snapshot task” service=kapacitor task_master=main task=sarimax3_model1 err=“read error: bad wiretype for oneof field in *agent.Response”
ts=2018-09-18T10:51:50.087+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-09-18T10:51:50.026337265+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor pro tocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=14fcdbd1-bb20-11e8-803d-000000000000 duration=60.946178ms
ts=2018-09-18T10:51:50.302+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-09-18T10:51:50.302212239+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen prot ocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=1526f42f-bb20-11e8-803e-000000000000 duration=398.773µs
ts=2018-09-18T10:52:00.076+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-09-18T10:52:00.03753731+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor prot ocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=1af47252-bb20-11e8-803f-000000000000 duration=39.167629ms
ts=2018-09-18T10:52:00.302+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-09-18T10:52:00.302125499+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen prot ocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=1b1cd1cb-bb20-11e8-8040-000000000000 duration=444.006µs
ts=2018-09-18T10:52:01.128+02:00 lvl=info msg=“http request” service=http host=::1 username=- start=2018-09-18T10:52:01.124457997+02:00 method=GET uri=/kapacitor/v1/tasks/sarimax3_model1?dot-view=attributes&repla y-id=&script-format=formatted protocol=HTTP/1.1 status=200 referer=- user-agent=KapacitorClient request-id=1b9a4c29-bb20-11e8-8041-000000000000 duration=4.372499ms
ts=2018-09-18T10:52:02.467+02:00 lvl=error msg=“failed to snapshot task” service=kapacitor task_master=main task=sarimax3_model1 err=“read error: bad wiretype for oneof field in *agent.Response”

I just create kapacitor udf for forecastin and when I run i get this as error : err="read error: bad wiretype for oneof field in *agent.Response.

what mean this error ?

Thanks
G Moussa

Hi!

There are a couple posts related to this problem.

Try reading through the solutions here, and in this GitHub issue. Take a look and see if this helps!

If not, let us know and we will keep digging. :slight_smile:

2 Likes

Thanks for answer but I try this solution doesn’t work. Aka I have the probleme of type wire I don’t have the alternative way that I can use to solve this error.

Thanks

Hello @garba_moussa what version of python are you using?
Thank you.

Hello @Anaisdg I use Python 2
[udf.functions.ARIMA]
prog = “/usr/bin/python2”
args = ["-u", “/tmp/…f/agent/py/ARIMA.py”]
timeout = “10s”
[udf.functions.ARIMA.env]
PYTHONPATH = “/tmp/…or/udf/agent/py”