I’m fairly new to working with InfluxDB and time-series databases in general. I was reading the Kapacitator TICKscript documentation to learn a little more about how I might be able to write a Kapacitor TICKscript to aggregate some data that I’m working with.
Problem
In one database, there are multiple measurements with various data spread across them (e.g. cpu_user_seconds_total, cpu_system_seconds_total, etc.). I would like to combine these multiple measurements into one so I can better model them in a visual tool.
Currently, these metrics are collected via Telegraf and pushed to an InfluxDB database. It is not an option for me to alter how these measurements are originally collected. Eventually, this data is modeled in a Grafana dashboard.
Idea
The idea I had was about writing a TICKscript that would…
Take the various measurements from one Influx database
Rewrite them together with different selectors into a single measurement
Push the new measurement back to the Influx database
I was trying to figure out how to do this with a TICKscript and Kapacitor, but it isn’t entirely clear to me about where to begin on figuring out how to do this since there isn’t a whole lot of documentation about the TICKscript DSL (or at least in a way that I understand).
Could anyone offer some pointers on how I could set out to do this? Or is there a totally obvious and easier way to do this than I am missing?
Thanks!
EDIT: This is similar to this thread, but this is the problem I’m trying to circumvent by rewriting the multiple measurements into one.
EDIT2: This post was flagged as spam and I was required to edit the post for it to show back up again.
There is an open request in Influx to do what you want (math across measurements), but right now you’re correct the only way to do this is to combine the two measurements into separate field of a new third measurement.
If you’re not using kapacitor for anything else, it may just be easier to accomplish this via continuous queries in InfluxDB itself. I’m doing so for exactly the same thing, which takes the two metrics and populates them into a new one as separate fields. I can then add them via my querry in grafana:
q=CREATE CONTINUOUS QUERY \"sum_cpu_sys\" ON mydb BEGIN SELECT mean(value) as \"cpu_sys\" INTO \"system_cpu\" FROM \"system_cpu_sys\" GROUP BY time(1m),* END"
q=CREATE CONTINUOUS QUERY \"sum_cpu_user\" ON mydb BEGIN SELECT mean(value) as \"cpu_user\" INTO \"system_cpu\" FROM \"system_cpu_user\" GROUP BY time(1m),* END"
If you want to use kapacitor for this, this page on their docs was really helpful to me
It should look something like this in a tick script, which does the same as above but goes another step and actually does the addition and saves it to its own 3rd field in the new measurement
// Get cpu user
var cpu_user = batch
|query('SELECT mean(cpu_user) FROM "mydb"."autogen".cpu_user_seconds_total')
.period(2m)
.every(1m)
.groupBy(time(30s),'server')
.align()
// Get cpu sys
var cpu_sys = batch
|query('SELECT mean(cpu_sys) FROM "mydb"."autogen".cpu_system_seconds_total')
.period(2m)
.every(1m)
.groupBy(time(30s),'server')
.align()
var cpu_total = cpu_user
|join(cpu_sys)
.as('cpu_user', 'cpu_sys')
.tolerance(30s)
.on('server')
//Calculate sum
|eval(lambda: "cpu_user.mean" + "cpu_sys.mean")
// Give the resulting field a name
.as('cpu_total')
|influxDBOut()
.database('mydb')
.retentionPolicy('autogen')
.measurement('cpu_stats')
.precision('s')
Thanks! This is wildly helpful and was exactly what I was looking to do. The docs page on continuous queries was definitely insightful too, I completely missed that.
Perhaps until mathematical operations across different measurements makes its way into InfluxDB, even a docs page with some of the info you put here would be extremely helpful. I see that GitHub issue is locked, but perhaps @pauldix or another Influx team member could link this thread on that issue (not to pick on Paul, just saw that he had commented in that issue).
EDIT: This post was flagged as spam and I was required to edit the post for it to show back up again.
Thanks, this is a super helpful reference. I’ve been playing with this for the past few days and I’m back to a gridlock. The environment I’m working with is unique because Telegraf is pulling from Prometheus endpoints and writing the data back to Influx, so it’s not quite in a Prometheus format. I also realized that for my use case, it needs to be a stream type instead of a batch type.
I’ve set up a local testing environment, and following the instructions listed here, I was able run my TICKscript successfully and see some data retrieved, but not joined. For example, from the DOT output of Kapacitor…
I’m still working through this now, but what I’m finding is that the data isn’t actually being written out into InfluxDB. I’m going to try shrinking my TICKscript to something a little smaller and see if I can debug a little more finely. In either case, I’ll share my findings back here when I figure it out. If there’s anything that stands out or might be helpful for me to look at for debugging purposes, feedback or pointers are welcome.
EDIT: I figured out the mistake on why the join() calls weren’t working. Still debugging why it isn’t getting written to the database with influxDBOut(). Once I figure it out, I’ll make a new post here describing what I did…
EDIT2: This post was flagged as spam and I was required to edit the post for it to show back up again.
As an update, I did figure it out. I needed to write the script as a stream instead of batch type for my environment, after poking around with things for a bit. I had hit a silly error in the below thread, and also saw that you can combine multiple operations into a single eval() node. This simplified what I was trying to do.
You can see more of that in this thread:
For anyone who stumbles on this later, here is a short example of how to write the TICKscript as a stream type.
// Get cpu_sys
var cpu_sys = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('container_cpu_system_seconds_total')
// Get cpu_user
var cpu_user = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('container_cpu_user_seconds_total')
// Get cpu_total
var cpu_total = stream
|from()
.database(db)
.retentionPolicy(rp)
.measurement('container_cpu_usage_seconds_total')
// Aggregate metrics
cpu_total
|join(cpu_user, cpu_sys)
.as('cpu_total', 'cpu_user', 'cpu_sys')
.tolerance(10s)
.fill('null')
|eval(
lambda: "cpu_total.counter",
lambda: "cpu_sys.counter",
lambda: "cpu_user.counter"
)
.as('total', 'system', 'user')
|influxDBOut()
.database(db)
.retentionPolicy(rp)
.measurement('container_cpu_seconds')
.precision('s')