Hi @gregory_scafarto,
Sorry other business priorities superseded but I’ve been so much looking forward for this.
It will be awesome if you can share the (working) udfs, script anything you got!
Thanks
Ashish
@Ashish_Sikarwar here is the UDF:
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import json
import pickle
import logging

import pandas as pd

# assumes adtk is installed via pip; the detector import is needed so
# pickle can resolve the detector class when the model is loaded
from adtk.data import validate_series
import adtk.detector

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()


class Anomalies_detection(Handler):
    def __init__(self, agent):
        """Constructor"""
        logger.info('__init__ trigger')
        self._agent = agent
        self._field = ''
        self._size = 25
        self._points = []
        self._time = []
        self._state = {}
        self.model = ""

    def info(self):
        """info: define what the UDF wants and what it provides"""
        logger.info('info trigger')
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.STREAM
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['size'].valueTypes.append(udf_pb2.INT)
        response.info.options['model'].valueTypes.append(udf_pb2.STRING)
        return response

    def init(self, init_req):
        """init: define what the UDF expects as parameters when the TICKscript is parsed"""
        logger.info('INIT trigger')
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'size':
                self._size = opt.values[0].intValue
            elif opt.name == 'model':
                self.model = opt.values[0].stringValue
        success = True
        msg = ''
        if self._field == '':
            success = False
            msg = 'must provide field name'
        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg.encode()
        return response

    def begin_batch(self, begin_req):
        """begin_batch: do something at the beginning of the batch"""
        self._batch = AD_model(self._size, self.model)
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response
        logger.info('begin_batch trigger')

    def point(self, point):
        """point: process each point within the batch"""
        logger.info('point trigger')
        self._batch.update(point.fieldsDouble[self._field], point.time)

    def snapshot(self):
        """snapshot: take a snapshot of the current data, in case the task stops"""
        data = {}
        for group, state in self._state.items():
            data[group] = state.snapshot()
        response = udf_pb2.Response()
        response.snapshot.snapshot = json.dumps(data).encode()
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = True
        return response

    def end_batch(self, batch_meta):
        """end_batch: emit only the last point of the batch with its anomaly flag"""
        results = self._batch.test()
        point = results[-1]
        resp = udf_pb2.Response()
        resp.point.name = batch_meta.name
        resp.point.time = self._batch.get_time(len(results) - 1)
        resp.point.group = batch_meta.group
        resp.point.tags.update(batch_meta.tags)
        resp.point.fieldsString['metrics'] = "anomalies"
        resp.point.fieldsDouble["value"] = self._batch.get_value(len(results) - 1)
        if point == 1.0 or point is True:
            resp.point.fieldsDouble['val_anomalies'] = 100
        else:
            resp.point.fieldsDouble['val_anomalies'] = 1
        self._agent.write_response(resp)
        logger.info('end_batch')

    def end_batch_3(self, batch_meta):
        """alternative end_batch: emit every point of the batch with its anomaly flag"""
        results = self._batch.test()
        for p, point in enumerate(results):
            response = udf_pb2.Response()
            response.point.name = batch_meta.name
            response.point.time = self._batch.get_time(p)
            response.point.group = batch_meta.group
            response.point.tags.update(batch_meta.tags)
            response.point.fieldsString['metrics'] = "anomalies"
            response.point.fieldsDouble["value"] = self._batch.get_value(p)
            if point == 1.0 or point is True:
                response.point.fieldsDouble['val_anomalies'] = 100
            else:
                response.point.fieldsDouble['val_anomalies'] = 1
            self._agent.write_response(response)
        logger.info('end_batch')


class AD_model(object):
    def __init__(self, size, model):
        self.size = size
        self._date = []
        self._value = []
        with open(model, 'rb') as f:
            self.model = pickle.load(f)  # pre-trained adtk detector

    def update(self, value, time):
        self._value.append(value)
        self._date.append(time)

    def test(self):
        self.df = pd.DataFrame({'time': self._date, 'y': self._value})
        self.df = self.df.fillna(0)
        self.df = self.df.reset_index(drop=True)
        self.df["time"] = self.df["time"].astype('Int64')
        self.df = self.df.set_index("time")
        self.df.index = pd.to_datetime(self.df.index)
        s = validate_series(self.df["y"])
        val = self.model.detect(s, return_list=False)
        logger.info("detection results: %s", val)
        return val

    def get_value(self, p):
        return self._value[p]

    def get_time(self, p):
        return self._date[p]


if __name__ == '__main__':
    # Create an agent
    agent = Agent()
    # Create a handler and pass it an agent so it can write points
    h = Anomalies_detection(agent)
    # Set the handler on the agent
    agent.handler = h
    # anything printed to STDERR from a UDF process gets captured into the logs
    logger.info("Starting agent for Anomalies_detection")
    agent.start()
    agent.wait()
    logger.info("Agent finished")
@Ashish_Sikarwar and the class that creates the alert:
import os

path_to_kap = os.environ['kapacitor']


class Alert():
    def __init__(self, host, measurement):
        self.host = host
        self.measurement = measurement
        self.texte = ""

    def create(self, message, form, pas, path_to_model, field, typo, db):
        '''
        Create the TICK alert.

        Note: the path to the kapacitor binary, which is invoked when the
        alert is defined, must be set in the 'kapacitor' environment variable.

        Parameters
        ----------
        message : str
            Message to be shown as an alert on Slack etc.; must be written
            with Kapacitor syntax.

        Returns
        -------
        None.
        '''
        self.form = form
        where_condition = ""
        where = [[element.split("=") for element in form[1:].split(",")][i][0]
                 for i in range(len(form[1:].split(",")))]
        for ele in where:
            where_condition = where_condition + ele + "=" + ele + " AND "
        texte = ""
        tags = ""
        cond = ["var " + (form[1:].replace(",", " AND").split("AND")[i]).replace("=", "='") + "'"
                for i in range(len(form[1:].replace(",", " AND").split("AND")))]
        for element in cond:
            texte = texte + element + "\n"
            tags = tags + ".tag('" + element.split("=")[0] + "'," + element.split("=")[1] + ")\n"
        texte = texte + "\n\n" + """var data = batch
    |query('SELECT """ + typo + """(*) FROM """ + db + """."autogen".""" + self.measurement + """ WHERE """ + where_condition[:-5] + """')
        .period(""" + str(int(25 * pas)) + """m)
        .every(""" + str(int(pas)) + """m)
        .groupBy(time(""" + str(int(pas)) + """m))
data
    @udf_test()
        .field('""" + typo + """_""" + field + """')
        .size(25)
        .model('""" + path_to_model + """')
    |alert()
        .crit(lambda: "val_anomalies" > 10)
        .message('""" + message + """')
        .slack()
    |influxDBOut()
        .database('""" + db + """')
        .retentionPolicy('autogen')
        .measurement('anomalies')
        .tag('measurement','""" + self.measurement + """')\n
""" + tags.replace("var ", "")
        self.texte = texte

    def save_alert(self):
        self.form = self.form[1:].replace("=", ".")
        self.form = self.form.replace(",", "_")
        self.form = self.form.replace(":", "")
        self.path = r"Alerte/alerte_" + self.measurement + "_" + self.form + ".tick"
        self.path = self.path.replace(" ", "")
        print(self.path)
        with open(self.path, "w") as f:
            f.write(self.texte)

    def define_alert(self):
        self.form = self.form.replace("=", ".")
        self.form = self.form.replace(",", "_")
        self.form = self.form.replace(":", "")
        cmd_define_alert = (path_to_kap + " define " + "alerte_AD_" + self.measurement
                            + "_" + self.form + " -type batch -tick " + self.path
                            + " -dbrp telegraf.autogen")
        print(cmd_define_alert)
        os.system('cmd /c ' + cmd_define_alert)

    def enable_alert(self):
        self.form = self.form.replace("=", ".")
        self.form = self.form.replace(",", "_")
        self.form = self.form.replace(":", "")
        cmd_enable_alert = path_to_kap + " enable " + "alerte_AD_" + self.measurement + "_" + self.form
        os.system('cmd /c ' + cmd_enable_alert)

    def launch(self):
        self.define_alert()
        self.enable_alert()
I don't have any alert.tick file to show, but the class (and the form parameter) is explained here: Community Highlight: How Supralog Built an Online Incremental Machine Learning Pipeline with InfluxDB OSS for Capacity Planning | InfluxData

The parameters:
- pas is the granularity (in minutes)
- field is the value you want to monitor
- typo is the aggregation function
- db is the database
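For illustration, a hypothetical usage sketch tying it together (the host, tag values, model path, and message are made up; form follows the ':tag1=value1,tag2=value2' shape the class parses):

import os

# the class shells out to the kapacitor binary found via this environment
# variable, which is read when the module is imported
os.environ['kapacitor'] = '/usr/bin/kapacitor'  # hypothetical path

from alert import Alert  # assuming the class above is saved as alert.py

alert = Alert(host='server1', measurement='cpu')
alert.create(
    message='CPU anomaly on server1',
    form=':host=server1,cpu=cpu-total',    # tags parsed into vars and the WHERE clause
    pas=5,                                 # granularity in minutes
    path_to_model='/models/detector.pkl',  # pickled adtk detector loaded by the UDF
    field='usage_idle',                    # value to monitor
    typo='mean',                           # aggregation function
    db='telegraf',                         # database
)
alert.save_alert()  # writes the generated TICKscript under Alerte/
alert.launch()      # kapacitor define + kapacitor enable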
Thanks a lot @gregory_scafarto !!!
Thanks for sharing. I was watching the seminar, and the presenter talked about and demoed anomaly detection via ML (in the services). However, when I checked my InfluxDB v2, I couldn't locate it. Did you manage to find it? Thanks.
Hello @lchunleo,
That idea was never implemented in InfluxDB. However, I appreciate your interest in the topic. If you feel so inclined, can you please link to the webinar (No-Code ML for Forecasting and Anomaly Detection | InfluxData) and create an issue in both:
GitHub - influxdata/influxdb: Scalable datastore for metrics, events, and real-time analytics
GitHub - influxdata/ui: UI for InfluxDB
I would love to help product prioritize supporting that framework.
Thank you!