Kapacitor data flow

Hi,

I am pretty confused about Kapacitor. I have started Kapacitor on one node, and my InfluxDB is running on another node.

I have set up the InfluxDB connection in kapacitor.conf, and the subscriptions section says:

  [influxdb.subscriptions]
    # Set of databases and retention policies to subscribe to.
    # If empty will subscribe to all, minus the list in
    # influxdb.excluded-subscriptions
    #
    # Format
    # db_name = <list of retention policies>
    #
    # Example:
    # my_database = [ "default", "longterm" ]
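
If you only want one database's data copied over, you can restrict the subscription instead of taking everything. A sketch of what that section could look like, following the same format as the snippet above; the database and retention policy names are just examples for a typical Telegraf setup:

```toml
[influxdb.subscriptions]
  # Subscribe only to the telegraf database, autogen retention policy,
  # instead of the default "subscribe to everything".
  telegraf = [ "autogen" ]
```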

So it will subscribe to all databases by default. Then I read about subscriptions in the docs, which say:

Rather than querying InfluxDB for data (except when using the BatchNode), all data is copied to your Kapacitor server or cluster through an InfluxDB subscription.

So it means that all data will be copied to Kapacitor. For how long will it be stored on the Kapacitor machine? If the telegraf database has a retention policy of 7 days, will Kapacitor also store this data for 7 days?

Doesn't that introduce the extra overhead of needing a bigger machine for Kapacitor so that it can handle the data properly?
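
To get a feel for the overhead question, here is a rough back-of-envelope calculation. The point rate and per-point size below are made-up numbers for illustration; measure your own workload:

```python
# Rough estimate of the memory a stream task's window needs.
# All numbers here are assumptions for illustration, not measurements.

points_per_second = 1000      # ingest rate of the subscribed data (assumed)
bytes_per_point = 200         # average in-memory size of one point (assumed)
window_seconds = 5 * 60       # a 5-minute window() period

# Kapacitor only needs to hold enough points to cover the window,
# not the full 7-day retention policy of the source database.
window_bytes = points_per_second * bytes_per_point * window_seconds
print(f"~{window_bytes / 1024**2:.0f} MiB held in memory for the window")
```

So even a fairly busy stream with a short window stays in the tens of megabytes; it is the window period and ingest rate that drive memory, not the source retention policy.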

I just want alerts on my telegraf database. This is my TICKscript, generated by Chronograf:

    var db = 'telegraf'

    var rp = 'autogen'

    var measurement = 'cpu'

    var groupBy = ['host']

    var whereFilter = lambda: TRUE

    var name = 'Custom CPU'

    var idVar = name + '-{{.Group}}'

    var message = 'CPU high'

    var idTag = 'alertID'

    var levelTag = 'level'

    var messageField = 'message'

    var durationField = 'duration'

    var outputDB = 'chronograf'

    var outputRP = 'autogen'

    var outputMeasurement = 'alerts'

    var triggerType = 'threshold'

    var crit = 5

    var data = stream
        |from()
            .database(db)
            .retentionPolicy(rp)
            .measurement(measurement)
            .groupBy(groupBy)
            .where(whereFilter)
        |eval(lambda: "usage_idle")
            .as('value')

    var trigger = data
        |alert()
            .crit(lambda: "value" > crit)
            .message(message)
            .id(idVar)
            .idTag(idTag)
            .levelTag(levelTag)
            .messageField(messageField)
            .durationField(durationField)
            .log('/tmp/alerts.log')
            .slack()
            .workspace('xxxxx')

    trigger
        |eval(lambda: float("value"))
            .as('value')
            .keep()
        |influxDBOut()
            .create()
            .database(outputDB)
            .retentionPolicy(outputRP)
            .measurement(outputMeasurement)
            .tag('alertName', name)
            .tag('triggerType', triggerType)

    trigger
        |httpOut('output')

This is just a test alert that triggers whenever idle CPU is above 5%, which is practically always true, so I should be getting alerts all the time.

But I have gotten no alerts. Also, there is no proper tutorial on how to set up Kapacitor with alerts; it's all very scattered.

This might help in case anybody else lands here:

All data from the subscriptions will be streamed to Kapacitor. If you don't configure any stream tasks, that data is simply discarded. If you configure a stream task with a window node with a certain period, Kapacitor will continuously hold in memory the data it needs to cover those windows.
If you configure a batch task, Kapacitor will use the memory required to store the results of the query (or queries); once the task pipeline finishes, that memory is released.
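
The "only keeps what the window needs" behaviour can be sketched in Python. This is just an illustration of the idea, not Kapacitor's actual implementation:

```python
from collections import deque

class SlidingWindow:
    """Holds only the points whose timestamps fall inside the last
    `period` seconds, evicting older ones as new points stream in.
    Memory use is bounded by the window period, not by the source
    database's retention policy."""

    def __init__(self, period_seconds):
        self.period = period_seconds
        self.points = deque()  # (timestamp, value) pairs, oldest first

    def push(self, timestamp, value):
        self.points.append((timestamp, value))
        # Evict everything that fell out of the window.
        while self.points and self.points[0][0] <= timestamp - self.period:
            self.points.popleft()

# A 10-second window over a stream of one point per second:
w = SlidingWindow(10)
for t in range(30):
    w.push(t, t * 1.0)
print(len(w.points))  # the window never holds more than 10 points
```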

It's another process on your system consuming CPU and RAM. Whether the overhead is significant depends on the volume of data you subscribe to and on the volume and computational complexity of the tasks you define (including the scope of your queries and the window periods you work with).
