How to get scalar from one query and use in another

I am trying to something extremely simple - I want to divide a value from a measure by the value from another measure, but the numerator is a scalar. Specifically I tried this and am just met with errors (Error: error in evaluating AST while starting program: type error 10:6-10:33: missing object properties (schema)):

n_cpus = from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "system" and (r._field == "n_cpus" and r.host == "x"))
  |> last()
  |> limit(n:1)
  |> getColumn(column: "_value")
  |> getRecord(idx: 0)

from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "cpu" and (r._field == "usage_idle" and r.host == "x"))  
  |> window(every: 10s)
  |> mean()  
  |> map(fn: (r) => ({
      r with
      _value: (100.0 - r._value) / float(v: n_cpus[0])
    }))
  |> group(columns: ["_time", "_start", "_stop", "_value"], mode: "except")

Any help? What am I doing wrong? The system does not like getColumn or getRecord, the two functions I thought would help me get a scalar from the stream.

Anyone? I just want to have a way in influxdb using either InfluxQL or Fluxlang to divide one series by a constant value that comes from another measurement. In this case, I want to divide CPU usage by number of cores.

I was using Kapacitor/Ticksript .

This may give you an idea, though i was doing it for Process.
Objective is to get .exe(executable) cpu usage divided by the number of cpu present on the server and then graph it out on Grafana.

Though stream would be better for alerting but if you care to just store the aggregated values then batch would be fine.

Example:
If abc.exe shows 20% of cpu usage on a server which has 4 cpus then do the following math 20\4 = 5% (cpu usage of abc.exe is 5%)
Below tickscript also groups all the similar processes using regex.

    var process = batch
        |query('SELECT Percent_Processor_Time as cpu_time FROM "test_server"."autogen"."Process" WHERE "instance" =~ /^WmiPrvSE*/')
            .period(1m)
            .every(1m)
            .groupBy('host')
        |sum('cpu_time')
            .as('cpu_time')

// This is to get number of cpus from each host
var system = batch
    |query('SELECT n_cpus as n_cpus FROM "test_server"."autogen"."system"')
        .period(1m)
        .every(1m)
        .groupBy('host')
    |last('n_cpus')
        .as('n_cpus')

var joined_data = process
    |join(system)
        .as('process', 'system')
        .fill(0.0)

var perform = joined_data
    |eval(lambda: "process.cpu_time" / "system.n_cpus")
        .as('mean')
        .keep()
    |influxDBOut()
        .database('test_server')
        .measurement('perform')
        .tag('instance', 'Wmi*')

Thanks, that helped put me on the right track.

@pwnell There’s a bug with the stream and table functions that has been fixed in the latest version of Flux, but it isn’t available in InfluxDB 1.x yet. A never version of Flux will get rolled out with InfluxDB 1.7.9.

Flux outputs a stream of tables. The purpose of stream and table functions is to extract tables, rows, and columns out of the stream. To do this, you first have to extract a single table with tableFind(). After that, you can extract values from a column using the getColumn() function. This returns an array of values from which you can extract a value at a specified index.

n_cpus = from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "system" and r.host == "x")
  |> tableFind(fn: (key) => key._field == "n_cpus")
  |> getColumn(column: "_value")

from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "cpu" and (r._field == "usage_idle" and r.host == "x"))  
  |> window(every: 10s)
  |> mean()  
  |> map(fn: (r) => ({
      r with
      _value: (100.0 - r._value) / float(v: n_cpus[0])
    }))
  |> group(columns: ["_time", "_start", "_stop", "_value"], mode: "except")

The only thing you were really missing is using tableFind() to extract a table from the stream of tables.

Note: This may not work for you until the next version of InfluxDB rolls out with a newer version of Flux.

Thanks I’ll give that a shot.