Flux: operations on multiple transformations

Hello

I have some CPU time measurements from the Telegraf cgroup plugin. They are reported as the cumulative number of nanoseconds of CPU time used up to the measurement time. To get CPU usage as a percentage, I need to divide the result of the difference() function by the result of elapsed(). In InfluxQL, I can do it like this:

select difference(cpuacct.usage)/elapsed(cpuacct.usage) * 100
from cgroup
where path = '/sys/fs/cgroup/cpu/some-cgroup-name'
order by time desc
limit 10
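For reference, this is the quantity the InfluxQL query computes (and the Flux attempts below try to reproduce). Here u is the cumulative cpuacct.usage counter and t is the timestamp, both in nanoseconds, for two consecutive samples i-1 and i. The numbers on the right are a made-up example, not data from this thread:

```latex
\text{cpu}_i\,[\%] = 100 \times \frac{u_i - u_{i-1}}{t_i - t_{i-1}}
\qquad \text{e.g.} \qquad
100 \times \frac{3.0 \times 10^{10}\,\text{ns}}{1.5 \times 10^{10}\,\text{ns}} = 200\,\%
```

In words: 30 s of CPU time used during a 15 s interval gives 200%. cpuacct.usage counts CPU time summed over all CPUs, so values above 100% (like the ones in the output further down) mean the cgroup kept more than one core busy on average.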

I’m trying to translate this query to Flux, but I don’t know the correct way to call the two transformation functions in a pipeline. I’m trying to use a join:

cgroup = (path) =>
  from(bucket:"telegraf")
    |> range(start:-3m)
    |> filter(fn:(r) =>
         r._measurement == "cgroup"
         and r._field == "cpuacct.usage"
         and r.path == path
       )

cpu_time = (path) => cgroup(path: path)
  |> difference()
  |> keep(columns: ["_time", "_value"])

elapsed_time = (path) => cgroup(path: path)
  |> elapsed(unit: 1ns)
  |> keep(columns: ["_time", "elapsed"])

my_cpu_time     = cpu_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")
my_elapsed_time = elapsed_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")

join(tables: {cpu: my_cpu_time, elapsed: my_elapsed_time}, on: ["_time"]) 
  |> map(fn: (r) => {
       x = r._value * 100
       y = r.elapsed
       return {_time: r._time, _value: x / y}
     })
  |> sort(columns: ["_time"], desc: true)
  |> limit(n: 10)

Is this the correct way to replicate the Influxql query? I’m multiplying by 100 and doing integer division because if I use the float() function, I get “Error: unknown server error: 500 Internal Server Error”.

So I have two questions: is this how the InfluxQL query above should be written in Flux, and how do I do the float conversion correctly?

Thanks

I’ve just noticed that the internal server error only happens when I run the query in the InfluxDB console. The version that uses float() works when I send it as an API request with curl:

$ curl -XPOST localhost:8086/api/v2/query -sS \
  -H 'Accept: application/csv' \
  -H 'Content-type: application/vnd.flux' \
  -H 'Authorization: Token user:pass' \
  -d 'cgroup = (path) =>
        from(bucket: "telegraf")
          |> range(start: -5m)
          |> filter(fn: (r) =>
               r._measurement == "cgroup"
               and r._field == "cpuacct.usage"
               and r.path == path
             )

      cpu_time = (path) => cgroup(path: path)
        |> difference()
        |> keep(columns: ["_time", "_value"])

      elapsed_time = (path) => cgroup(path: path)
        |> elapsed(unit: 1ns)
        |> keep(columns: ["_time", "elapsed"])

      my_cpu_time     = cpu_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")
      my_elapsed_time = elapsed_time(path: "/sys/fs/cgroup/cpu/some-cgroup-name")

      join(tables: {cpu: my_cpu_time, elapsed: my_elapsed_time}, on: ["_time"])
        |> map(fn: (r) =>
             ({_value: float(v: r._value) / float(v: r.elapsed) * 100.0, _time: r._time})
           )
        |> sort(columns: ["_time"], desc: true)'

#datatype,string,long,dateTime:RFC3339,double
#group,false,false,false,false
#default,_result,,,
,result,table,_time,_value
,,0,2020-09-03T17:43:00Z,462.36519068666666
,,0,2020-09-03T17:42:45Z,482.74031537999997
,,0,2020-09-03T17:42:30Z,613.1715823866666
...

Hello @andrenth,
Welcome! Thanks for your question. I believe you’re overcomplicating the query a little, unless I’ve misunderstood you.
I think you want something like the following:

data = from(bucket: "System")
  |> range(start: 2020-08-05T17:31:56Z, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_user")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> elapsed(unit: 1s)
  |> difference(nonNegative: false, columns: ["_value"])
  |> map(fn: (r) => ({ r with percent: r._value / float(v: r.elapsed) * 100.0 }))
  |> yield()
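Why no join is needed: elapsed() and difference() both work row by row on the same table, so the two results stay in the same row. Below is a minimal self-contained sketch of the cgroup case. It uses made-up counter values and array.from, which are assumptions for illustration and not from the thread. It shows what each step does to the rows:

```flux
import "array"

// Hypothetical cumulative counter (made-up values): total CPU ns used by a cgroup,
// sampled every 16 s.
array.from(rows: [
    {_time: 2020-09-03T17:42:00Z, _value: 100000000000},
    {_time: 2020-09-03T17:42:16Z, _value: 107000000000},
    {_time: 2020-09-03T17:42:32Z, _value: 139000000000}
])
    // Adds an "elapsed" column (ns since the previous row); the first row has no
    // predecessor and is dropped, leaving the 17:42:16 and 17:42:32 rows.
    |> elapsed(unit: 1ns)
    // Replaces _value with its change since the previous remaining row; "elapsed" is
    // carried along unchanged, and the first remaining row is dropped again.
    |> difference(nonNegative: false, columns: ["_value"])
    // One row survives (17:42:32): (139e9 - 107e9) / 16e9 * 100.0 = 200.0
    |> map(fn: (r) => ({_time: r._time, _value: float(v: r._value) / float(v: r.elapsed) * 100.0}))
```

A design note, based on my understanding that each of these functions drops the first row of its input: chaining them drops two rows, so the earliest interval in the range is missing from the output. In this example that is the 43.75% interval between 17:42:00 and 17:42:16. The join version drops only one row from each stream before joining on _time, so it keeps that interval.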

I believe you’re overcomplicating the query a little

It would seem your belief is correct :slight_smile:

I got the expected results with the query below. Thanks!

from(bucket: "telegraf")
  |> range(start: -5m)
  |> filter(fn: (r) =>
       r._measurement == "cgroup"
       and r._field == "cpuacct.usage"
       and r.path == "/sys/fs/cgroup/cpu/some-cgroup-name"
     )
  |> elapsed(unit: 1ns)
  |> difference(nonNegative: false, columns: ["_value"])
  |> map(fn: (r) => ({
       _time: r._time, 
       _value: float(v: r._value) / float(v: r.elapsed) * 100.0 
     }))
  |> sort(columns: ["_time"], desc: true) 
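An alternative sketch, not what was used in this thread, is derivative(). It divides the change in _value by the change in _time for each pair of consecutive rows. With unit: 1ns and a counter measured in nanoseconds, that ratio is the fraction of one CPU in use. As far as I know, derivative() returns float values and drops only the first row, so it would need neither float() nor the two-step elapsed()/difference() chain. Check this against the derivative() documentation for your Flux version:

```flux
from(bucket: "telegraf")
  |> range(start: -5m)
  |> filter(fn: (r) =>
       r._measurement == "cgroup"
       and r._field == "cpuacct.usage"
       and r.path == "/sys/fs/cgroup/cpu/some-cgroup-name"
     )
  // change in CPU ns per 1 ns of wall time; negative values are kept, as with
  // difference(nonNegative: false) above
  |> derivative(unit: 1ns, nonNegative: false)
  // the derivative is already a float, so scale it to a percentage directly
  |> map(fn: (r) => ({_time: r._time, _value: r._value * 100.0}))
  |> sort(columns: ["_time"], desc: true)
```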

Do you know if there’s a performance penalty for using multiple filter() calls instead of a single one, as I did above?
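On the filter() question: the chained form and a single combined predicate select exactly the same rows. A row survives a chain of filter() calls only if every predicate returns true, which is exactly what `and` expresses. Whether they run differently depends on how the query planner in a given Flux version pushes filters down to storage, and the thread does not settle that. The sketch below runs both forms side by side under separate yield() names so their results can be compared. The names `chained` and `combined` are arbitrary:

```flux
// One predicate per filter() call, as in the reply above
chained = from(bucket: "telegraf")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "cgroup")
  |> filter(fn: (r) => r._field == "cpuacct.usage")
  |> filter(fn: (r) => r.path == "/sys/fs/cgroup/cpu/some-cgroup-name")

// The same condition as one predicate joined with `and`
combined = from(bucket: "telegraf")
  |> range(start: -5m)
  |> filter(fn: (r) =>
       r._measurement == "cgroup"
       and r._field == "cpuacct.usage"
       and r.path == "/sys/fs/cgroup/cpu/some-cgroup-name"
     )

chained |> yield(name: "chained")
combined |> yield(name: "combined")
```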
