How to reference yielded stream data to apply more functions to it

I’m currently working on a query that outputs the losses of some machinery. I calculate each loss as the difference between the input power and the output power. I want to get the losses of multiple machines and add them all together. Here is what I have:

from(bucket: "someBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  ...
  |> filter(fn: (r) => r["name"] == "machine1input" or r["name"] == "machine1output")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  // get positive values
  |> map(fn: (r) => ({ r with _value: math.abs(x: r._value) }))
  // group them by each timestamp
  |> group(columns: ["_time"], mode:"by")
  // get the difference
  |> difference(nonNegative: false, columns: ["_value"])
  // use only positive values
  |> map(fn: (r) => ({ r with _value: math.abs(x: r._value) }))
  // group them by "name" which does nothing in this step, but is needed
  // to know which difference corresponds to which machine later on
  |> group(columns: ["name"], mode: "by")
  |> yield(name: "diff_machine1")

from(bucket: "someBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  ...
  |> filter(fn: (r) => r["name"] == "machine2input" or r["name"] == "machine2output")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> map(fn: (r) => ({ r with _value: math.abs(x: r._value) }))
  |> group(columns: ["_time"], mode:"by")
  |> difference(nonNegative: false, columns: ["_value"])
  |> map(fn: (r) => ({ r with _value: math.abs(x: r._value) }))
  |> group(columns: ["name"], mode: "by")
  |> yield(name: "diff_machine2")

...

In total, I am calculating the losses of twelve machines. In the UI using the table visualization, I can see all the tables and entries of all the different losses. Now the tricky part:
I want to pick up all the data I yielded and run further functions on it, something like:

currentStreamedData
  |> group(columns: ["_time"], mode: "by")
  // and so on

but I haven’t found any information on how to do something like this. My only other idea would be saving all the tables in variables and joining them together one by one, but I think there must be a more elegant way to do this.

Appreciate any help!

@brimstone If you’re going to do it per machine (one stream of tables per machine), variable assignment is the way to do it. But I don’t think you need to do it that way. I think you can do what you’re trying to do with a single stream and pivot().

import "math"
import "regexp"

from(bucket: "someBucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    // ...
    // the =~ operator needs a regular expression literal, not a string
    |> filter(fn: (r) => r["name"] =~ /machine[1-2](input|output)/)
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
    |> map(fn: (r) => ({r with _value: math.abs(x: r._value)}))
    // remap each row with a machine tag and input or output field based on 'name'
    |> map(
        fn: (r) => {
            machine = regexp.findString(r: /machine\d/, v: r.name)
            field = regexp.findString(r: /input|output/, v: r.name)

            return {r with machine: machine, _field: field}
        },
    )
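    // regroup by machine so each machine's input and output rows share a table
    // (needed if "name" is part of the group key, as it is for a tag)
    |> group(columns: ["machine"])
    |> pivot(rowKey: ["_time", "machine"], columnKey: ["_field"], valueColumn: "_value")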
    // get the difference and make it positive
    |> map(fn: (r) => ({r with _value: math.abs(x: r.input - r.output)}))

This should output a single stream of tables with a table for each machine and the difference between input and output per timestamp.
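
For completeness, here is a rough sketch of the variable-assignment route mentioned at the start of this reply, followed by the step that adds the losses together per timestamp. The variable names `diffMachine1` and `diffMachine2`, the sample rows built with `array.from()`, and the yield name `total_loss` are all made up for illustration. In a real query each variable would hold one of the `from() |> ...` pipelines from the question, without the trailing `yield()`. It uses `union()` rather than a chain of joins, which is a different technique from the one-by-one joining described in the question.

```flux
import "array"

// Hypothetical sample data standing in for two per-machine loss streams.
// In a real query, assign each from() |> ... pipeline to a variable instead.
diffMachine1 =
    array.from(
        rows: [
            {_time: 2023-01-01T00:00:00Z, name: "machine1", _value: 1.5},
            {_time: 2023-01-01T00:01:00Z, name: "machine1", _value: 1.0},
        ],
    )

diffMachine2 =
    array.from(
        rows: [
            {_time: 2023-01-01T00:00:00Z, name: "machine2", _value: 2.0},
            {_time: 2023-01-01T00:01:00Z, name: "machine2", _value: 2.5},
        ],
    )

// Merge the streams into one, then add up all machine losses per timestamp
union(tables: [diffMachine1, diffMachine2])
    |> group(columns: ["_time"])
    |> sum(column: "_value")
    // collapse the per-timestamp tables back into a single table
    |> group()
    |> sort(columns: ["_time"])
    |> yield(name: "total_loss")
```

The same `group()`, `sum()`, `group()` tail should also work when appended to the pivoted stream above, since that stream ends with one `_value` per machine and timestamp.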

@scott Thanks for your answer. I had thought about using pivot() to reorganize the table, but hadn’t thought of using regexes to do so. I’ll have to adjust the regex in my case because, unfortunately, the real machine names are not that straightforward :sweat_smile: Thanks again, it helped me a lot.
