Flux: operations on multiple transformations


I have some CPU time measurements from the Telegraf cgroup plugin. The cpuacct.usage field is a cumulative counter: the total nanoseconds of CPU time consumed up to the measurement time. To get percent CPU usage, I need to divide the result of difference() by the result of elapsed(). In InfluxQL, I can do it like this:

select difference("cpuacct.usage") / elapsed("cpuacct.usage") * 100
from cgroup
where path = '/sys/fs/cgroup/cpu/some-cgroup-name'
order by time desc
limit 10
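The arithmetic this query performs can be modelled in a few lines of Python. This is a minimal sketch with made-up samples, not InfluxDB code. Each point's CPU-time delta (what difference() yields) is divided by the wall-clock delta (what elapsed() yields). Both are in nanoseconds, so the units cancel and the result is the percentage of one CPU used over that interval. The function name `cpu_percent` and the sample values are hypothetical.

```python
# Hypothetical model of difference(x) / elapsed(x) * 100 on a cumulative
# CPU-time counter. Samples are (timestamp_ns, cpuacct_usage_ns) pairs.

def cpu_percent(samples):
    """Return [(t, percent)] for each pair of consecutive samples."""
    out = []
    for (t0, u0), (t1, u1) in zip(samples, samples[1:]):
        cpu_delta = u1 - u0    # CPU nanoseconds consumed in the interval
        wall_delta = t1 - t0   # wall-clock nanoseconds in the interval
        out.append((t1, cpu_delta / wall_delta * 100))
    return out

samples = [
    (0, 0),
    (10_000_000_000, 2_500_000_000),   # 2.5 s of CPU in 10 s
    (20_000_000_000, 12_500_000_000),  # 10 s of CPU in 10 s
]
print(cpu_percent(samples))  # [(10000000000, 25.0), (20000000000, 100.0)]
```

Because cpuacct.usage counts CPU time across all CPUs, the result can exceed 100 on a multi-core host.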

I’m trying to translate this query to Flux, but I don’t know the correct way to call the two transformation functions in a pipeline. I’m trying to use a join:

cgroup = (path) =>
  from(bucket: "telegraf")
    |> range(start: -3m)
    |> filter(fn: (r) =>
         r._measurement == "cgroup"
         and r._field == "cpuacct.usage"
         and r.path == path)

cpu_time = (path) => cgroup(path: path)
  |> difference()
  |> keep(columns: ["_time", "_value"])

elapsed_time = (path) => cgroup(path: path)
  |> elapsed(unit: 1ns)
  |> keep(columns: ["_time", "elapsed"])

my_cpu_time     = cpu_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")
my_elapsed_time = elapsed_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")

join(tables: {cpu: my_cpu_time, elapsed: my_elapsed_time}, on: ["_time"])
  |> map(fn: (r) => {
       x = r._value * 100
       y = r.elapsed
       return {_time: r._time, _value: x / y}
     })
  |> sort(columns: ["_time"], desc: true)
  |> limit(n: 10)

Is this the correct way to replicate the InfluxQL query? I’m multiplying by 100 and using integer division because if I use the float() function, I get “Error: unknown server error: 500 Internal Server Error”.
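The cost of the integer-division workaround can be seen in a small Python sketch. It assumes Flux integer division truncates the fractional part, which matches Python's `//` for the non-negative values used here. The deltas and function names are hypothetical. Multiplying by 100 before dividing keeps whole percentage points, but everything after the decimal point is lost. Converting both operands to float first, as float(v: ...) does in Flux, keeps it.

```python
# Hypothetical comparison of integer vs float division for a CPU percentage.

def percent_int(cpu_delta: int, wall_delta: int) -> int:
    # Multiply first, then integer-divide: the fractional part is truncated.
    return cpu_delta * 100 // wall_delta

def percent_float(cpu_delta: int, wall_delta: int) -> float:
    # Convert both operands to float before dividing.
    return float(cpu_delta) / float(wall_delta) * 100.0

cpu_delta, wall_delta = 1_234_567_890, 10_000_000_000
print(percent_int(cpu_delta, wall_delta))    # 12
print(percent_float(cpu_delta, wall_delta))  # approximately 12.3456789
```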

So I guess I have two questions: Is this how the above InfluxQL query should be written in Flux, and how do I do the float conversion correctly?


I’ve just noticed that the internal server error only happens when running the query in the InfluxDB console. The same query, with the float() conversions, works via an API request using curl:

$ curl -XPOST localhost:8086/api/v2/query -sS \
  -H 'Accept: application/csv' \
  -H 'Content-type: application/vnd.flux' \
  -H 'Authorization: Token user:pass' \
  -d 'cgroup = (path) =>
        from(bucket: "telegraf")
          |> range(start: -5m)
          |> filter(fn: (r) =>
               r._measurement == "cgroup"
               and r._field == "cpuacct.usage"
               and r.path == path)

      cpu_time = (path) => cgroup(path: path)
        |> difference()
        |> keep(columns: ["_time", "_value"])

      elapsed_time = (path) => cgroup(path: path)
        |> elapsed(unit: 1ns)
        |> keep(columns: ["_time", "elapsed"])

      my_cpu_time     = cpu_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")
      my_elapsed_time = elapsed_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")

      join(tables: {cpu: my_cpu_time, elapsed: my_elapsed_time}, on: ["_time"])
        |> map(fn: (r) =>
             ({_value: float(v: r._value) / float(v: r.elapsed) * 100.0, _time: r._time}))
        |> sort(columns: ["_time"], desc: true)'
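The join-based query pairs two streams row by row on _time. Flux's join() is an inner join, so rows without a match on the other side are dropped. The result is then mapped to a float percentage and sorted newest first. Here is a rough Python model of that flow. The row dicts and helper names are hypothetical, and the join is simplified: only the columns this query keeps are modelled.

```python
# Hypothetical model of join(on: ["_time"]) |> map(...) |> sort(desc: true).

def join_on_time(cpu_rows, elapsed_rows):
    # Inner join: keep only _time values present in both streams.
    elapsed_by_time = {r["_time"]: r["elapsed"] for r in elapsed_rows}
    return [
        {"_time": r["_time"], "_value": r["_value"], "elapsed": elapsed_by_time[r["_time"]]}
        for r in cpu_rows
        if r["_time"] in elapsed_by_time
    ]

def to_percent(rows):
    # Same expression as the map() above, including the float conversions.
    return [
        {"_time": r["_time"], "_value": float(r["_value"]) / float(r["elapsed"]) * 100.0}
        for r in rows
    ]

cpu_rows = [{"_time": 10, "_value": 5}, {"_time": 20, "_value": 8}, {"_time": 30, "_value": 1}]
elapsed_rows = [{"_time": 10, "elapsed": 10}, {"_time": 20, "elapsed": 10}]
result = sorted(to_percent(join_on_time(cpu_rows, elapsed_rows)),
                key=lambda r: r["_time"], reverse=True)
print(result)
```

The row at _time 30 has no elapsed partner, so the inner join drops it.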


Hello @andrenth,
Welcome! Thanks for your question. I believe you’re over complicating the query a little? Unless I’ve misunderstood you.
I think you want something like the following:

data = from(bucket: "System")
  |> range(start: 2020-08-05T17:31:56Z, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_user")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> elapsed(unit: 1s)
  |> difference(nonNegative: false, columns: ["_value"])
  |> map(fn: (r) => ({ r with percent: r._value / float(v: r.elapsed) * 100.0 }))
  |> yield()
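The join is unnecessary because elapsed() and difference() both work on the same table, one after the other. elapsed() adds an elapsed column computed from the previous row. difference() then replaces _value with the delta from the previous row. The Python sketch below models this, assuming (as the Flux documentation describes) that each function drops the first row because it has no predecessor. Under that assumption every remaining row carries a _value delta and an elapsed value for the same interval, at the cost of one extra dropped row. The helper names and sample rows are hypothetical.

```python
# Hypothetical model of: elapsed() |> difference() |> map(percent).

def elapsed(rows):
    # Add elapsed = t[i] - t[i-1]; the first row has no predecessor and is dropped.
    return [dict(cur, elapsed=cur["_time"] - prev["_time"])
            for prev, cur in zip(rows, rows[1:])]

def difference(rows):
    # Replace _value with _value[i] - _value[i-1]; the first row is dropped.
    return [dict(cur, _value=cur["_value"] - prev["_value"])
            for prev, cur in zip(rows, rows[1:])]

def percent(rows):
    return [dict(r, _value=float(r["_value"]) / float(r["elapsed"]) * 100.0)
            for r in rows]

# Cumulative CPU-time samples (nanoseconds), made up for illustration.
raw = [
    {"_time": 0, "_value": 0},
    {"_time": 10, "_value": 5},
    {"_time": 20, "_value": 7},
    {"_time": 40, "_value": 17},
]
rows = percent(difference(elapsed(raw)))
print([(r["_time"], r["_value"]) for r in rows])
```

The interval ending at _time 10 disappears because two first rows are dropped. The intervals ending at 20 and 40 still pair each delta with its own elapsed time.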

I believe you’re over complicating the query a little?

It would seem your belief is correct 🙂

I got the expected results with the query below. Thanks!

from(bucket: "telegraf")
  |> range(start: -5m)
  |> filter(fn: (r) =>
       r._measurement == "cgroup"
       and r._field == "cpuacct.usage"
       and r.path == "/sys/fs/cgroup/cpu/some-cgroup-name")
  |> elapsed(unit: 1ns)
  |> difference(nonNegative: false, columns: ["_value"])
  |> map(fn: (r) => ({
       _time: r._time,
       _value: float(v: r._value) / float(v: r.elapsed) * 100.0
     }))
  |> sort(columns: ["_time"], desc: true)

Do you know if there’s a performance penalty to multiple filter() calls as opposed to a single one as I did above?
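As for the result (not the performance), chaining filter() calls keeps exactly the same rows as one filter() whose predicate joins the conditions with "and". The Python sketch below shows that equivalence on hypothetical rows. It says nothing about how InfluxDB plans or executes either form, so it does not answer the performance question. The row values and the second path are made up.

```python
# Hypothetical check: chained filters vs one combined predicate.

PATH = "/sys/fs/cgroup/cpu/some-cgroup-name"

rows = [
    {"_measurement": "cgroup", "_field": "cpuacct.usage", "path": PATH},
    {"_measurement": "cgroup", "_field": "cpuacct.usage", "path": "/sys/fs/cgroup/cpu/other"},
    {"_measurement": "cgroup", "_field": "other_field", "path": PATH},
    {"_measurement": "cpu", "_field": "cpuacct.usage", "path": PATH},
]

def chained(rows):
    # filter() |> filter() |> filter(): each stage sees only the previous survivors.
    s1 = [r for r in rows if r["_measurement"] == "cgroup"]
    s2 = [r for r in s1 if r["_field"] == "cpuacct.usage"]
    return [r for r in s2 if r["path"] == PATH]

def combined(rows):
    # A single filter() with the conditions joined by "and".
    return [r for r in rows
            if r["_measurement"] == "cgroup"
            and r["_field"] == "cpuacct.usage"
            and r["path"] == PATH]

print(chained(rows) == combined(rows))  # True
```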
