Anyone Performing Time Series Forecasting or Anomaly Detection?

Hi @gregory_scafarto,
Sorry, other business priorities took over, but I've been looking forward to this.
It would be awesome if you could share the (working) UDFs, scripts, anything you've got!

Thanks
Ashish


@Ashish_Sikarwar here is the UDF:

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2
import json
import logging
import pickle

import pandas as pd
from adtk.data import validate_series
import adtk.detector  # make ADTK detector classes importable for the pickled model

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()


class Anomalies_detection(Handler):
    def __init__(self, agent):
        """Constructor"""
        logger.info('__init__ trigger')
        self._agent = agent
        self._field = ''
        self._size = 25
        self._state = {}
        self.model = ''

    def info(self):
        """info: Define what your UDF wants and what will it provide in the end"""
        logger.info('info trigger')
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.STREAM
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['size'].valueTypes.append(udf_pb2.INT)
        response.info.options['model'].valueTypes.append(udf_pb2.STRING)
        return response

    def init(self, init_req):
        """init: Define what your UDF expects as parameters when parsing the TICKScript"""
        logger.info('INIT trigger')
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'size':
                self._size = opt.values[0].intValue
            elif opt.name == 'model':
                self.model = opt.values[0].stringValue
        success = True
        msg = ''
        if self._field == '':
            success = False
            msg = 'must provide a field name'
        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg.encode()
        return response

    def begin_batch(self, begin_req):
        """begin_batch: do something at the beginning of the batch"""
        self._batch = AD_model(self._size, self.model)
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response
        logger.info('begin_batch trigger')

    def point(self, point):
        """point: process each point within the batch"""
        logger.info('point trigger')
        self._batch.update(point.fieldsDouble[self._field], point.time)

    def snapshot(self):
        """snapshot: take a snapshot of the current data, if the task stops for some reason """
        data = {}
        for group, state in self._state.items():
            data[group] = state.snapshot()
        response = udf_pb2.Response()
        response.snapshot.snapshot = json.dumps(data).encode()
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = True
        return response

        
    def end_batch(self, batch_meta):
        """end_batch: emit one point (the most recent) with its anomaly flag"""
        results = self._batch.test()
        point = results[-1]
        resp = udf_pb2.Response()
        resp.point.name = batch_meta.name
        resp.point.time = self._batch.get_time(len(results) - 1)
        resp.point.group = batch_meta.group
        resp.point.tags.update(batch_meta.tags)
        resp.point.fieldsString['metrics'] = "anomalies"
        resp.point.fieldsDouble["value"] = self._batch.get_value(len(results) - 1)
        if point == 1.0 or point is True:
            resp.point.fieldsDouble['val_anomalies'] = 100
        else:
            resp.point.fieldsDouble['val_anomalies'] = 1
        if resp is not None:
            self._agent.write_response(resp)
        else:
            logger.info('problem writing response: %s', resp)
        logger.info('end_batch')

    def end_batch_3(self, batch_meta):
        """Alternative end_batch: emit every point in the batch with its anomaly flag"""
        results = self._batch.test()
        for p, point in enumerate(results):
            response = udf_pb2.Response()
            response.point.name = batch_meta.name
            response.point.time = self._batch.get_time(p)
            response.point.group = batch_meta.group
            response.point.tags.update(batch_meta.tags)
            response.point.fieldsString['metrics'] = "anomalies"
            response.point.fieldsDouble["value"] = self._batch.get_value(p)
            if point == 1.0 or point is True:
                response.point.fieldsDouble['val_anomalies'] = 100
            else:
                response.point.fieldsDouble['val_anomalies'] = 1
            self._agent.write_response(response)
        logger.info('end_batch')


class AD_model(object):

    def __init__(self, size, model):
        self.size = size
        self._date = []
        self._value = []
        # 'model' is the path to a pickled ADTK detector; load it once per batch
        with open(model, 'rb') as f:
            self.model = pickle.load(f)

    def update(self, value, time):
        self._value.append(value)
        self._date.append(time)

    def test(self):
        self.df = pd.DataFrame({'time': self._date, 'y': self._value})
        self.df = self.df.fillna(0)
        self.df = self.df.reset_index(drop=True)
        # Kapacitor sends timestamps as nanoseconds since the epoch
        self.df["time"] = self.df["time"].astype('Int64')
        self.df = self.df.set_index("time")
        self.df.index = pd.to_datetime(self.df.index)
        s = validate_series(self.df["y"])
        val = self.model.detect(s, return_list=False)
        logger.info("detection results: %s", val)
        return val

    def get_value(self, p):
        return self._value[p]

    def get_time(self, p):
        return self._date[p]
    
if __name__ == '__main__':
    # Create an agent
    agent = Agent()

    # Create a handler and pass it an agent so it can write points
    h = Anomalies_detection(agent)

    # Set the handler on the agent
    agent.handler = h

    # anything printed to STDERR from a UDF process gets captured into the logs
    logger.info("Starting agent for Anomalies_detection")
    agent.start()
    agent.wait()
    logger.info("Agent finished")
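The model file the UDF unpickles can be produced offline with ADTK. A minimal sketch, assuming an InterQuartileRangeAD detector trained on a synthetic one-minute series (the detector choice, data, and file name are illustrative):

import pickle

import numpy as np
import pandas as pd
from adtk.data import validate_series
from adtk.detector import InterQuartileRangeAD

# Illustrative history: one point per minute.
idx = pd.date_range("2021-01-01", periods=500, freq="T")
series = validate_series(pd.Series(np.random.normal(50, 5, len(idx)), index=idx))

# Fit on history, then persist the detector for the UDF to load.
model = InterQuartileRangeAD(c=3.0)
model.fit(series)

with open("model.pkl", "wb") as f:
    pickle.dump(model, f)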

@Ashish_Sikarwar the class that creates the alert:

import os

# Path to the kapacitor binary, read from an environment variable named 'kapacitor'
path_to_kap = os.environ['kapacitor']


class Alert():
    def __init__(self, host, measurement):
        self.host = host
        self.measurement = measurement
        self.texte = ""

    def create(self, message, form, pas, path_to_model, field, typo, db):
        '''
        Create the TICK alert script.
        Note: the path of the script launched when an alert is triggered must be
        defined as an environment variable.
        Parameters
        ----------
        message : str
            Message shown in the alert (Slack, etc.); must use Kapacitor syntax.
        Returns
        -------
        None.
        '''
        self.form = form
        where_condition = ""
        where = [[element.split("=") for element in form[1:].split(",")][i][0] for i in range(len(form[1:].split(",")))]
        for ele in where:
            where_condition = where_condition + ele + "=" + ele + " AND "
        texte = ""
        tags = ""
        cond = ["var " + (form[1:].replace(",", " AND").split("AND")[i]).replace("=", "='") + "'" for i in
                range(len(form[1:].replace(",", " AND").split("AND")))]
        for element in cond:
            texte = texte + element + "\n"
            tags = tags + ".tag('" + element.split("=")[0] + "'," + element.split("=")[1] + ")\n"
        texte = texte + "\n\n" + """var data = batch
                |query('SELECT """ + typo + """(*) FROM """ + db + """."autogen".""" + self.measurement + """ WHERE """ + where_condition[
                                                                                                                          : -5] + """')
                    .period(""" + str(int(25 * pas)) + """m)
                    .every(""" + str(int(pas)) + """m)
                    .groupBy(time(""" + str(int(pas)) + """m))
                data
                  @udf_test()
                    .field('""" + typo + """_""" + field + """')
                    .size(25)
                    .model('""" + path_to_model + """')
                  |alert()
                    .crit(lambda: "val_anomalies" > 10)
                    .message('""" + message + """')
                    .slack()
                  |influxDBOut()
                        .database('""" + db + """')
                        .retentionPolicy('autogen')
                        .measurement('anomalies')
                        .tag('measurement','""" + self.measurement + """')\n
                        """ + tags.replace("var ", "")

        self.texte = texte

    def save_alert(self):
        self.form = self.form[1:].replace("=", ".")
        self.form = self.form.replace(",", "_")
        self.form = self.form.replace(":", "")
        self.path = r"Alerte/alerte_" + self.measurement + "_" + self.form + ".tick"
        self.path = self.path.replace(" ", "")
        print(self.path)
        with open(self.path, "w") as f:
            f.write(self.texte)

    def define_alert(self):
        self.form = self.form.replace("=", ".")
        self.form = self.form.replace(",", "_")
        self.form = self.form.replace(":", "")
        cmd_define_alert = path_to_kap + " define " + "alerte_AD_" + self.measurement + "_" + self.form + " -type batch -tick " + self.path + " -dbrp telegraf.autogen"
        print(cmd_define_alert)
        # 'cmd /c' is Windows-specific; call the command directly on Linux/macOS
        os.system('cmd /c ' + cmd_define_alert)

    def enable_alert(self):
        self.form = self.form.replace("=", ".")
        self.form = self.form.replace(",", "_")
        self.form = self.form.replace(":", "")
        cmd_enable_alert = path_to_kap + " enable " + "alerte_AD_" + self.measurement + "_" + self.form
        os.system('cmd /c ' + cmd_enable_alert)

    def launch(self):
        self.define_alert()
        self.enable_alert()

I don't have an alert.tick file to show, but the class (and the form parameter) are explained here: Community Highlight: How Supralog Built an Online Incremental Machine Learning Pipeline with InfluxDB OSS for Capacity Planning | InfluxData
pas is the granularity (in minutes)
field is the value you want to monitor
typo is the aggregation function
db is the database
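For illustration, here is a hypothetical way to drive the class. All names (host, measurement, model path, field) are made up, and the form string is assumed to carry a leading character that form[1:] strips:

alert = Alert(host="server01", measurement="cpu")
alert.create(
    message="Anomaly detected on server01",
    form=":host=server01",                    # leading ':' is stripped by form[1:]
    pas=5,                                    # 5-minute granularity -> period(125m), every(5m)
    path_to_model="models/cpu_detector.pkl",  # pickled ADTK detector (see sketch above)
    field="usage_user",                       # the query then selects mean_usage_user
    typo="mean",                              # aggregation function
    db="telegraf",
)
alert.save_alert()  # writes Alerte/alerte_cpu_host.server01.tick
alert.launch()      # runs 'kapacitor define' then 'kapacitor enable'

The period is 25 * pas so that each batch holds exactly the 25 points the UDF's size(25) expects.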


Thanks a lot @gregory_scafarto !!!

Thanks for sharing. I was watching the seminar, and the presenter talked about and demoed anomaly detection via ML (in the services); however, when I checked my InfluxDB v2, I couldn't locate it. Did you manage to find it? Thanks.

Hello @lchunleo,
That idea was never implemented in InfluxDB, but I appreciate your interest in the topic. Could you please link to the webinar (No-Code ML for Forecasting and Anomaly Detection | InfluxData) and create an issue in both:
GitHub - influxdata/influxdb: Scalable datastore for metrics, events, and real-time analytics
GitHub - influxdata/ui: UI for InfluxDB
if you feel so inclined? I would love to help the product team prioritize supporting that framework.
Thank you!