Calculate sum of multiple tags

Hi,

I have this base query:

from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
  |> filter(fn: (r) =>
      r._measurement == "Power" and
      r._field == "value" and
      r.device == "device1"
      )
  |> aggregateWindow(every: v.windowPeriod, fn: max)
  |> map(fn: (r) => ({ _value:r._value, _time:r._time, _field:"Power (W)" }))

This returns the power for “device1”. I need to expand this and calculate the sum of multiple devices, for example “device1” + “device2” + “device3”.

For example, let’s say I execute this query 3 times but change “device1” each time. The results are the following:
device1: 50.15
device2: 18.00
device3: 0.1

The desired result would be 68.25

This seems like it should be very simple but I tried numerous things but so far could not come up with a solution that works.

Thanks!

I would appreciate your help… Maybe @Anaisdg could help?

Thanks!

Hello @Hukondora,
Can you try:

import "influxdata/influxdb/schema"

from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
  |> filter(fn: (r) =>
      r._measurement == "Power" and
      r._field == "value" and
      r.device =~ /device[1-3]$/
    |> schema.fieldsAsCols()
    |> map(fn: (r) => ({ _sum_across_fields: r.device1 + r.device2 + r.device3}))

I’m assuming you want the sum of values across the same timestamp.
Otherwise if you want the total sum of all values from every timestamp and every device use group() to ungroup your data and then apply a sum(), like:

from(bucket: "system")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_user" or r["_field"] == "usage_system")
  |> filter(fn: (r) => r["cpu"] == "cpu2")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> group()
  |> sum(column: "_value")
  |> yield(name: "mean")

H @Anaisdg,

thanks for your reply. I tried your code but there is an issue (besides the missing “)”:

invalid: type error @9:8-9:29: expected [A] but found regexp (argument tables)

Not sure what that means. Also, the devices are not actually named like “device1”. So I replaced the code with:

import "influxdata/influxdb/schema"

from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
  |> filter(fn: (r) =>
      r._measurement == "Power" and
      r._field == "value" and
      (r.device == "someDevice" or r.device == "anotherDevice" or r.device == "andAnotherDevice")
    |> schema.fieldsAsCols()
    |> map(fn: (r) => ({ _sum_across_fields: r.someDevice+ r.anotherDevice+ r.andAnotherDevice}))
  )

Then the error changes to:

invalid: type error @9:8-9:29: expected [A] but found bool (argument tables)

Thank you for your help!

Hello @Hukondora,
Sorry you might want to use pivot instead:

import "influxdata/influxdb/schema"
from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "average_temperature")
  |> filter(fn: (r) => r["_field"] == "degrees")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> limit(n: 2)
  |> pivot(rowKey:["_time"], columnKey: ["location"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with sum: r.coyote_creek + r.santa_monica})) 

Since you’re pivoting on the tag and not the field key.
The above worked for me (I have two tags santa monica and coyote creek)

Hello @Anaisdg,

after changing the map() call to:

|> map(fn: (r) => ({ _value: r.someDevice+ r.anotherDevice+ r.andAnotherDevice, _time:r._time, _field:"Power (W)" }))

it appears to be working. There is one issue with this though… If there is no data available for one (ore more) devices in the specified time-range, it will not return any data for the whole set. For example, if “anotherDevice” has no data in the given time-range but the other 2 devices do, it will still not return anything because r.anotherDevice results in:

Cannot read property ‘type’ of undefined

How can I avoid this?

Thank you!

Hello @Hukondora,
Are you able to run the query up until the map? i.e. when you pivot do you have a column for andAnotherDevice but with null values?
if so you can replace null values with 0

  |> fill(column: "andAnotherDevice", value: 0.0)
  |> fill(column: "AnotherDevice", value: 0.0)
  |> fill(column: "Device", value: 0.0)

Hi @Anaisdg

I believe the answer to your question is no. The devices are specified through the “device” tag. Let’s say I select the last 30 days as the time-range but one of the devices in the list has just been added yesterday.

Here is my current query:

import "influxdata/influxdb/schema"
from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "Power")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
  |> pivot(rowKey:["_time"], columnKey: ["device"], valueColumn: "_value")
  |> map(fn: (r) => ({  _value: r.dishwasher + r.fridge + r.oven, _time:r._time, _field:"Power (W)" }))

The device “fridge” has just been added recently. If I select a range that contains data where “fridge” did not exist yet, it will not return the data for the other (existing) devices.

I tried adding fill() like so:

import "influxdata/influxdb/schema"
from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "Power")
  |> filter(fn: (r) => r["_field"] == "value")
  |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
  |> pivot(rowKey:["_time"], columnKey: ["device"], valueColumn: "_value")
  |> fill(column: "fridge", value: 0.0)
  |> map(fn: (r) => ({  _value: r.dishwasher + r.fridge + r.oven, _time:r._time, _field:"Power (W)" }))

but that errors out with:

invalid: runtime error @8:6-8:40: fill: fill column not found: fridge

Thanks!

ah okay…
in that case you’ll have to do some conditional mapping like so:

import "influxdata/influxdb/schema"
from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "average_temperature")
  |> filter(fn: (r) => r["_field"] == "degrees")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> limit(n: 2)
  |> pivot(rowKey:["_time"], columnKey: ["location"], valueColumn: "_value")
   |> map(fn: (r) => ({ r with device1: if exists r.device1
        then r.device1 else 0.0, device2: if exists r.device2
        then r.device2 else 0.0}))

After you pivot, but with your devices.

@Anaisdg How can I incorporate the _time:r._time, _field:"Power (W)" into the map() call (see in the query that I posted previously)? That is required for the data to be rendered properly.

I am also not sure if the map() call will work as expected if it contains “with” because it will return results for all existing devices, not just the specified ones. I had to adjust the map() call as shown in my previous response to avoid that. See:

For example, “entertainment” has not included in the query but is returned anyways (see above screenshot).

Thanks for your help!

You can include them like how you had before, but they’re redundant if you use r with.
Then you need to filter for the devices you want.

import "influxdata/influxdb/schema"
from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "Power")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["device"] == "dishwasher" or r["device"] == "fridge" or r["device"] == "oven")
  |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
  |> pivot(rowKey:["_time"], columnKey: ["device"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with dishwasher: if exists r.dishwasher
        then r.dishwasher else 0.0, fridge: if exists r.fridge
        then r.fridge, oven: if exists r.oven
        then r.oven else 0.0}))
  |> map(fn: (r) => ({  r with _value: r.dishwasher + r.fridge + r.oven, _time:r._time,  _field:"Power (W)" }))

There might be a typo in there hopefully not but I don’t have your dataset so it’s hard for me to test. The _time:r._time is redundant but of course you’re welcome to include it.

You can also grouping by time and then applying the sum() function and adding a column to set a new field and then ungroup. Up to you.

from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "Power")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["device"] == "dishwasher" or r["device"] == "fridge" or r["device"] == "oven")
  |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
  |> group(columns: ["_time"], mode:"by")
  |> sum()
  |> set(key: "_field", value: "Power (W)")
  |> group()

But this solution assumes that all your fields are writing data at the same timestamps. If theyre not and you need to normalize them you can use
https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/built-in/transformations/truncatetimecolumn/

I just tried both of your queries, the first one does not group the results properly.

The second one appears to be working much better, the only issue is that the returned data is labeled as “_value”, despite the value: "Power (W)". Any idea why?

And just to make sure: You wrote, that the data needs to have the same timestamp. Does it have to be the exact same one or within the specified aggregateWindow (or more specifically: v.windowPeriod)?

I fetch and send the data to Influx every 30 seconds but it takes a bit until all devices have been processed, so the timestamp is not exactly the same one. It seems to work, but not sure if that is just luck.

Thanks again

Hello @Hukondora,
What do you mean by the first one doesn’t group the results properly? There isn’t a group.
We set the field to power W not the value. You can rename the value column if you want.

Hi @Anaisdg,

sorry if I worded it poorly, English is not my mother tongue. What I mean with “grouping” is that the result (the sum of the specified devices) is returned as a single value. If you look at the screenshot in my previous reply, you will see that there are many separate results returned (all the different colored labels) when using your first query.

Your second query only returns a single value as desired. But the label states “_value” instead of “Power (W)”.

I figured out for your second query though. This sets the correct label:

from(bucket: "bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "Power")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["device"] == "dishwasher" or r["device"] == "fridge" or r["device"] == "oven")
  |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
  |> group(columns: ["_time"], mode:"by")
  |> sum()
  |> group()
  |> rename(columns: {_value: "Power (W)"})

So the only question remaining is the one regarding the timestamp:

You wrote, that the data needs to have the same timestamp. Does it have to be the exact same one or within the specified aggregateWindow (or more specifically: v.windowPeriod )?

I fetch and send the data to Influx every 30 seconds but it takes a bit until all devices have been processed, so the timestamp is not exactly the same one. There will usually be a difference of a couple of seconds between the first processed device and the last one.

You mentioned the truncateTimeColumn function. Should it be applied in this case? I looked at documentation for it but quite honestly don’t really understand how it would be used in this case.

Thanks again for all you help…

Have a great weekend! :grinning_face_with_smiling_eyes: