Hi
Python UDF script (excerpt):
    def Gridsearch(self):
        # Grid-search the hyperparameters p, d, q and P, D, Q, m
        lowest_aic = None
        lowest_parm = None
        lowest_param_seasonal = None
        p = d = q = range(0, 2)
        param_grid = list(itertools.product(p, d, q))
        seasonal_param_grid = [(x[0], x[1], x[2], 30) for x in list(itertools.product(p, d, q))]
        logger.debug("param_grid %s", param_grid)
        logger.debug("seasonal_param_grid = %s", seasonal_param_grid)
        ts = self.get_series()
        ts.index = pd.DatetimeIndex(ts.index.values, freq=ts.index.inferred_freq)
        for param in param_grid:
            for param_seasonal in seasonal_param_grid:
                try:
                    mdl = sm.tsa.statespace.SARIMAX(ts, order=param, seasonal_order=param_seasonal,
                                                    enforce_stationarity=True, enforce_invertibility=True)
                    results = mdl.fit(maxiter=1000, method='nm')
                    # Store results
                    current_aic = results.aic
                    logger.debug("current_aic %s", current_aic)
                    # Set baseline for AIC
                    if lowest_aic is None:
                        lowest_aic = results.aic
                        logger.debug("lowest_aic %s", lowest_aic)
                    # Compare results
                    if current_aic <= lowest_aic:
                        lowest_aic = current_aic
                        lowest_parm = param
                        lowest_param_seasonal = param_seasonal
                    # print('SARIMA{}x{} - AIC:{}'.format(param, param_seasonal, results.aic))
                except Exception:
                    # Skip parameter combinations that fail to fit
                    continue
        return lowest_parm, lowest_param_seasonal
    def sarimax3(self):
        logger.info("sarimax3 \n")
        ts = self.get_series()
        logger.info("size of input: %s" % len(ts))
        self._period = ts.index[1] - ts.index[0]
        frequency = Second(self._period.total_seconds())
        self._Gridsearch = self.Gridsearch()
        logger.debug(self._Gridsearch)
        lowest_parm, lowest_param_seasonal = self._Gridsearch
        self._model = SARIMAX(ts, order=lowest_parm, seasonal_order=lowest_param_seasonal,
                              enforce_stationarity=False, enforce_invertibility=False).fit()
        logger.debug(' model %s', self._model)
        start_date = ts.index[-1] + self._period
        end_date = start_date + (self._predict - 1) * self._period
        forecast = self._model.predict(start_date.isoformat(), end_date.isoformat())
        if self._Gridsearch[1] > 0:
            shift = self.max() - self.min()
            forecast += shift
        logger.debug(forecast)
        return forecast
class ForecastHandler(Handler):
    def __init__(self, agent, state):
        self._agent = agent
        self._state = state
        self._field = None
        self._field_type = None
        self._predict = 0
        self._begin_response = None
        self._point = None

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH
        response.info.options['predict'].valueTypes.append(udf_pb2.INT)
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['type'].valueTypes.append(udf_pb2.STRING)
        logger.info("info")
        return response
    def init(self, init_req):
        success = True
        msg = ''
        for opt in init_req.options:
            if opt.name == 'predict':
                self._predict = opt.values[0].intValue
                self._state._predict = self._predict
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            if opt.name == 'type':
                self._field_type = opt.values[0].stringValue
        if self._predict < 1:
            success = False
            msg += ' must supply number of values to be predicted > 0'
        if self._field is None:
            success = False
            msg += ' must supply a field name'
        if self._field_type not in FIELD_TYPES:
            success = False
            msg += ' field type must be one of {}'.format(FIELD_TYPES)
        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]
        return response
    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = ''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        self._state.drop()
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response

    def point(self, point):
        value = point.fieldsDouble[self._field]
        self._state.update(pd.to_datetime(point.time), value)
        self._point = point
        # logger.debug(str(point))

    def end_batch(self, end_req):
        forecast = self._state.sarimax3()
        logger.debug("size of forecast %s" % len(forecast))
        self._begin_response.begin.size = self._predict
        self._agent.write_response(self._begin_response)
        response = udf_pb2.Response()
        response.point.CopyFrom(self._point)
        for i in range(0, self._predict):
            response.point.time = forecast.index[i].value
            response.point.fieldsDouble[self._field] = forecast[i]
            self._agent.write_response(response)
        response.end.CopyFrom(end_req)
        self._agent.write_response(response)
if __name__ == '__main__':
    a = Agent()
    h = ForecastHandler(a, state())
    a.handler = h
    logger.info("Starting Agent")
    a.start()
    a.wait()
    logger.info("Agent finished")
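For context, outside of Kapacitor the model-fitting part of the script boils down to roughly the sketch below. It is a minimal standalone example, not the UDF itself: the function name build_forecast, the synthetic sine series, the seasonal period m=30, and the number of forecast steps are all placeholders I picked for illustration.

# Standalone sketch: AIC grid search over SARIMAX orders, then forecast.
# All names here are illustrative; they are not part of the UDF.
import itertools

import numpy as np
import pandas as pd
import statsmodels.api as sm


def build_forecast(ts, steps, m=30):
    # Small (p, d, q) x (P, D, Q, m) grid, scored by AIC.
    grid = list(itertools.product(range(2), range(2), range(2)))
    best = None
    for order in grid:
        for seasonal in [(P, D, Q, m) for P, D, Q in grid]:
            try:
                res = sm.tsa.statespace.SARIMAX(
                    ts, order=order, seasonal_order=seasonal,
                    enforce_stationarity=True, enforce_invertibility=True,
                ).fit(disp=False, maxiter=50, method='nm')
                if best is None or res.aic < best[0]:
                    best = (res.aic, order, seasonal)
            except Exception:
                # Skip combinations that fail to fit
                continue
    _, order, seasonal = best
    model = sm.tsa.statespace.SARIMAX(
        ts, order=order, seasonal_order=seasonal,
        enforce_stationarity=False, enforce_invertibility=False,
    ).fit(disp=False)
    # Forecast the next `steps` points past the end of the series.
    return model.forecast(steps=steps)


if __name__ == '__main__':
    idx = pd.date_range('2018-09-01', periods=300, freq='1min')
    ts = pd.Series(np.sin(np.arange(300) * 2 * np.pi / 30)
                   + np.random.normal(scale=0.1, size=300), index=idx)
    print(build_forecast(ts, steps=10))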
Kapacitor TICKscript:
var data = batch
    |query('''
        SELECT max(usage_iowait) as usage_iowait
        FROM "telegraf"."autogen"."cpu"
        WHERE "cpu"='cpu1'
    ''')
        .period(7d)
        .every(1m)
        .groupBy(1m)

data
    @sarimax3()
        .field('usage_iowait')
        .predict(96)
        .type('double')
    |influxDBOut()
        .create()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('sarimax1_model1')
Kapacitor log:

ts=2018-09-18T10:50:46.155+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,155 INFO:root: sarimax3 "
ts=2018-09-18T10:50:46.155+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text=
ts=2018-09-18T10:50:46.164+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,159 INFO:root: size of input: 10081"
ts=2018-09-18T10:50:46.248+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,248 DEBUG:matplotlib: CACHEDIR=/var/lib/kapacitor/.cache/matplotlib"
ts=2018-09-18T10:50:46.250+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,250 DEBUG:matplotlib.font_manager: Using fontManager instance from /var/lib/kapacitor/.cache/matplotlib/fontList.json"
ts=2018-09-18T10:50:46.388+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,388 DEBUG:root: param_grid [(0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1), (1, 0, 0), (1, 0, 1), (1, 1, 0), (1, 1, 1)]"
ts=2018-09-18T10:50:46.388+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,388 DEBUG:root: seasonal_param_grid = [(0, 0, 0, 30), (0, 0, 1, 30), (0, 1, 0, 30), (0, 1, 1, 30), (1, 0, 0, 30), (1, 0, 1, 30), (1, 1, 0, 30), (1, 1, 1, 30)]"
ts=2018-09-18T10:50:46.659+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="/usr/local/lib/python2.7/dist-packages/statsmodels/tsa/statespace/representation.py:375: FutureWarning: Using a non-tuple sequence for multidimensional indexing is deprecated; use arr[tuple(seq)] instead of arr[seq]. In the future this will be interpreted as an array index, arr[np.array(seq)], which will result either in an error or a different result."
ts=2018-09-18T10:50:46.659+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="  return matrix[[slice(None)]*(matrix.ndim-1) + [0]]"
ts=2018-09-18T10:50:46.680+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,680 DEBUG:root: current_aic 24827.404489948818"
ts=2018-09-18T10:50:46.680+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:46,680 DEBUG:root: lowest_aic 24827.404489948818"
ts=2018-09-18T10:50:50.105+02:00 lvl=info msg="http request" service=http host=::1 username=- start=2018-09-18T10:50:50.053157437+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=f13dad7b-bb1f-11e8-8030-000000000000 duration=52.219741ms
ts=2018-09-18T10:50:50.302+02:00 lvl=info msg="http request" service=http host=::1 username=- start=2018-09-18T10:50:50.301991656+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=f163a591-bb1f-11e8-8031-000000000000 duration=432.009µs
ts=2018-09-18T10:50:53.925+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:53,925 DEBUG:root: current_aic 24025.79640798875"
ts=2018-09-18T10:50:57.380+02:00 lvl=info msg="UDF log" service=kapacitor task_master=main task=sarimax3_model1 node=sarimax32 text="2018-09-18 10:50:57,379 DEBUG:root: current_aic 27612.28552365586"
ts=2018-09-18T10:51:44.364+02:00 lvl=error msg="failed to snapshot task" service=kapacitor task_master=main task=sarimax3_model1 err="read error: bad wiretype for oneof field in *agent.Response"
ts=2018-09-18T10:51:50.087+02:00 lvl=info msg="http request" service=http host=::1 username=- start=2018-09-18T10:51:50.026337265+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=14fcdbd1-bb20-11e8-803d-000000000000 duration=60.946178ms
ts=2018-09-18T10:51:50.302+02:00 lvl=info msg="http request" service=http host=::1 username=- start=2018-09-18T10:51:50.302212239+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=1526f42f-bb20-11e8-803e-000000000000 duration=398.773µs
ts=2018-09-18T10:52:00.076+02:00 lvl=info msg="http request" service=http host=::1 username=- start=2018-09-18T10:52:00.03753731+02:00 method=POST uri=/write?consistency=&db=_internal&precision=ns&rp=monitor protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=1af47252-bb20-11e8-803f-000000000000 duration=39.167629ms
ts=2018-09-18T10:52:00.302+02:00 lvl=info msg="http request" service=http host=::1 username=- start=2018-09-18T10:52:00.302125499+02:00 method=POST uri=/write?consistency=&db=telegraf&precision=ns&rp=autogen protocol=HTTP/1.1 status=204 referer=- user-agent=InfluxDBClient request-id=1b1cd1cb-bb20-11e8-8040-000000000000 duration=444.006µs
ts=2018-09-18T10:52:01.128+02:00 lvl=info msg="http request" service=http host=::1 username=- start=2018-09-18T10:52:01.124457997+02:00 method=GET uri=/kapacitor/v1/tasks/sarimax3_model1?dot-view=attributes&replay-id=&script-format=formatted protocol=HTTP/1.1 status=200 referer=- user-agent=KapacitorClient request-id=1b9a4c29-bb20-11e8-8041-000000000000 duration=4.372499ms
ts=2018-09-18T10:52:02.467+02:00 lvl=error msg="failed to snapshot task" service=kapacitor task_master=main task=sarimax3_model1 err="read error: bad wiretype for oneof field in *agent.Response"
I just created this Kapacitor UDF for forecasting, and when I run it I get the error: err="read error: bad wiretype for oneof field in *agent.Response".
What does this error mean?
Thanks
G Moussa