Windowed Delta (Energy consumption)

Hello, Influx Community

My team and I are developing an application that gathers sensor and metering (cumulative) information from heating controllers.
One of the features we offer our customers is exporting some of the data gathered from their devices as a user-defined CSV of windowed aggregations over that sensor and cumulative data.
That is, the customer selects which data to get (sensor, cumulative, or a mixture), the overall period (start and stop), the window size, and the aggregation to perform on the data (min, max, mean, first, last, or delta).

Data comes in this format:

11111111111111111111111111111111,deviceId=237140 EM1_Energy=124642,EM1_Energy_Tariff1=0,EM1_Energy_Tariff2=0,EM1_Flow=361,EM1_Flow_Temperature=64.18,EM1_Power=8.1,EM1_Return_Temperature=44.44,EM1_Serial=224212,EM1_Uptime=3192,EM1_Volume=4237,HC1_FlowTemperature=34.64,HC1_FlowTemperature_Ref=11,HC1_ReturnTemperature=30.96,HC1_ReturnTemperature_Extra=192,HC1_ReturnTemperature_Ref=40,HC1_RoomTemperature=192,HC1_RoomTemperature_Limit=18,HC2_FlowTemperature=49.3,HC2_FlowTemperature_Ref=52,HC2_ReturnTemperature=192,HC2_ReturnTemperature_Extra=0.01,HC2_ReturnTemperature_Ref=50,Outdoor_Temperature=27.64,Primary1_FlowTemperature=192,Primary1_ReturnTemperature=192 1625492641215000000

Currently this feature is implemented with an InfluxQL query executed from our Node.js application, which fetches the data for the export.

select 
   Last("EM1_Energy") AS "1",
   First("EM1_Energy") AS "2",
   (Last("EM1_Energy") - First("EM1_Energy")) AS "3",
   Mean("Outdoor_Temperature") AS "4"
from "11111111111111111111111111111111"
where ("deviceId" = '179474' or "deviceId" = '237140')
and time >= 1617235200000ms and time <= 1617321599999ms
group by "deviceId",time(60m) fill(previous)

In this example, we would be exporting the energy consumption for an energy meter (delta energy), alongside the meter values at the beginning and end of each window, as well as averaged sensor data over the hourly window.

We came up with this Flux query:

every = duration(v: "60m")

data = () => from(bucket: "dev_readouts/high")
    |> range(start: time(v: "2021-04-01T00:00:00.000Z"), stop: time(v: "2021-04-01T23:59:59.999Z"))
    |> filter(fn: (r) => r._measurement == "11111111111111111111111111111111")
    |> filter(fn: (r) => r.deviceId == "179474" or r.deviceId == "237140")

t1 = data()
    |> filter(fn: (r) => r._field == "EM1_Energy")
    |> aggregateWindow(every: every, fn: last, timeSrc: "_stop")
    |> set(key: "_field", value: "1")

t2 = data()
    |> filter(fn: (r) => r._field == "EM1_Energy")
    |> aggregateWindow(every: every, fn: first, timeSrc: "_stop")
    |> set(key: "_field", value: "2")

t3_first = data()
    |> filter(fn: (r) => r._field == "EM1_Energy")
    |> aggregateWindow(every: every, fn: first, timeSrc: "_stop")
    |> set(key: "_field", value: "_first")

t3_last = data()
    |> filter(fn: (r) => r._field == "EM1_Energy")
    |> aggregateWindow(every: every, fn: last, timeSrc: "_stop")
    |> set(key: "_field", value: "_last")

t3 = union(tables: [t3_first, t3_last])
    |> pivot(
        rowKey: ["deviceId", "_time"],
        columnKey: ["_field"],
        valueColumn: "_value"
    )
    |> map(fn: (r) => ({ r with _value: r._last - r._first, _field: "3" }))
    |> drop(columns: ["_first", "_last"])

t4 = data()
    |> filter(fn: (r) => r._field == "Outdoor_Temperature")
    |> aggregateWindow(every: every, fn: mean, timeSrc: "_stop")
    |> set(key: "_field", value: "4")

union(tables: [t1, t2, t3, t4])
    |> pivot(
        rowKey: ["deviceId", "_time"],
        columnKey: ["_field"],
        valueColumn: "_value"
    )
    |> group(columns: ["deviceId"], mode:"by")
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> rename(columns: {_time: "_t", deviceId: "_d"})
    |> sort(columns: ["_t"])
    |> yield(name: "detailed")

The query sort of “works” and gives us the data we need in the shape we want EXCEPT when we include the delta calculation for cumulative values.

When the delta calculation is included, the results are “misaligned”: the delta values appear in a different row than expected, with all the other values as null, and vice versa, the non-delta values all appear in their own rows with a null delta.

How can we calculate the windowed consumption and get all the values (delta or otherwise) as columns?

Thanks in advance

Hello @danielgonnet,
I’m going to try translating your InfluxQL query first and see if that helps you; if not, I’ll debug your Flux script.

select 
   Last("EM1_Energy") AS "1",
   First("EM1_Energy") AS "2",
   (Last("EM1_Energy") - First("EM1_Energy")) AS "3",
   Mean("Outdoor_Temperature") AS "4"
from "11111111111111111111111111111111"
where ("deviceId" = '179474' or "deviceId" = '237140')
and time >= 1617235200000ms and time <= 1617321599999ms
group by "deviceId",time(60m) fill(previous)

I got:

import "experimental"
import "influxdata/influxdb/schema"

data = from(bucket: "cpu")
  |> range(start: 2022-02-15T16:00:00.000Z, stop: 2022-02-15T16:02:00.000Z)
  |> filter(fn: (r) => r["_field"] == "usage_system" or r["_field"] == "usage_user" )
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["cpu"] == "cpu-total" or r["cpu"] == "cpu0")
  |> yield(name: "data")

prep = data
  |> filter(fn: (r) => r["_field"] == "usage_user")
  |> window(every: 60s)
  |> experimental.fill(usePrevious: true)
  // |> yield(name: "prep")

lastE = prep
  |> last()
  |> set(key: "selection", value: "last E in window")
  // |> yield(name: "1")

firstE = prep
  |> first()
  |> set(key: "selection", value: "first E in window")
  // |> yield(name: "2")

firstLastdiff = union(tables: [lastE, firstE])
  |> difference(nonNegative: false, columns: ["_value"])
  // |> yield(name: "3")

meanOutdoorTemp = data
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> window(every: 60s)
  |> experimental.fill(usePrevious: true)
  |> mean()
  // |> yield(name: "4")

You’ll want to uncomment the yield() functions, and substitute your own tags and fields. I use usage_user in place of EM1_Energy and usage_system in place of Outdoor_Temperature.
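To get the four aggregates back as columns per window, the way your InfluxQL GROUP BY did, you could union the four streams and pivot them. This is only a sketch reusing the names above (the "1"–"4" labels mirror your original aliases, and dropping the extra selection column before the pivot is an assumption about schema alignment):

```flux
// Label each stream, union them, and pivot so every aggregate
// becomes its own column per group/window.
one   = lastE |> drop(columns: ["selection"]) |> set(key: "_field", value: "1")
two   = firstE |> drop(columns: ["selection"]) |> set(key: "_field", value: "2")
three = firstLastdiff |> set(key: "_field", value: "3")
four  = meanOutdoorTemp |> set(key: "_field", value: "4")

union(tables: [one, two, three, four])
  |> pivot(rowKey: ["cpu", "_time"], columnKey: ["_field"], valueColumn: "_value")
  |> yield(name: "columns")
```

In your case the rowKey would be ["deviceId", "_time"] instead of ["cpu", "_time"].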

Where my data is:

import "csv"

csvData = "#group,false,false,true,true,false,false,true,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
#default,data,,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host
,,0,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:00Z,3.716216216215638,usage_system,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,0,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:20Z,3.993986673166662,usage_system,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,0,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:40Z,3.3034784419923344,usage_system,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,0,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:00Z,3.6254531816471727,usage_system,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,0,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:20Z,4.563070383797323,usage_system,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,0,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:40Z,3.2415316028496495,usage_system,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,1,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:00Z,12.049999999999272,usage_system,cpu,cpu0,Anaiss-MacBook-Pro.local
,,1,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:20Z,12.456228114056076,usage_system,cpu,cpu0,Anaiss-MacBook-Pro.local
,,1,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:40Z,11.344327836080714,usage_system,cpu,cpu0,Anaiss-MacBook-Pro.local
,,1,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:00Z,11.9500000000005,usage_system,cpu,cpu0,Anaiss-MacBook-Pro.local
,,1,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:20Z,13.656828414211072,usage_system,cpu,cpu0,Anaiss-MacBook-Pro.local
,,1,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:40Z,10.994502748624795,usage_system,cpu,cpu0,Anaiss-MacBook-Pro.local
,,2,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:00Z,8.587068080737994,usage_user,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,2,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:20Z,9.479115878431193,usage_user,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,2,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:40Z,7.669235575923854,usage_user,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,2,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:00Z,8.463557944741813,usage_user,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,2,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:20Z,9.530357961410974,usage_user,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,2,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:40Z,8.716303487354386,usage_user,cpu,cpu-total,Anaiss-MacBook-Pro.local
,,3,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:00Z,25.74999999999818,usage_user,cpu,cpu0,Anaiss-MacBook-Pro.local
,,3,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:20Z,27.263631815907136,usage_user,cpu,cpu0,Anaiss-MacBook-Pro.local
,,3,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:00:40Z,24.23788105946734,usage_user,cpu,cpu0,Anaiss-MacBook-Pro.local
,,3,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:00Z,25.89999999999918,usage_user,cpu,cpu0,Anaiss-MacBook-Pro.local
,,3,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:20Z,26.463231615815236,usage_user,cpu,cpu0,Anaiss-MacBook-Pro.local
,,3,2022-02-15T16:00:00Z,2022-02-15T16:02:00Z,2022-02-15T16:01:40Z,27.23638180909404,usage_user,cpu,cpu0,Anaiss-MacBook-Pro.local"

csv.from(csv: csvData)

Let me know if that helps!

You can also use rename() if you want to change not just the result name to “1”, “2”, “3”, and “4”, but the actual field values.

We will try your suggestions and come back when we have.

Thanks


We realized that the way we were calculating deltas (consumptions) was not entirely correct, since we were not accounting for readings that may occur between the start of the first window and the first captured data point.

As a result, we ended up calculating deltas as the difference between the last values of consecutive windows (and fetching one extra initial window to fill in the gap at the start).

Then, a delta column is calculated like so:

t3 = data()
    |> filter(fn: (r) => r._field == "EM1_Energy")
    |> aggregateWindow(timeSrc: "_stop", every: duration(v: "60m"), fn: last)
    |> difference(nonNegative: true)
    |> set(key: "_field", value: "1")

Data queried in this manner “aligns” perfectly with the other columns when pivoting.
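Spelled out end to end, the approach looks roughly like this (a sketch with the bucket, measurement, and device names from our earlier posts; the range starts one 60m window before the export's real start of 2021-04-01T00:00:00Z so that difference() has a baseline):

```flux
every = duration(v: "60m")

// Start the range one window early: difference() drops the first record of
// each table, so the extra leading window is consumed as the baseline and
// the remaining deltas cover exactly the requested export period.
from(bucket: "dev_readouts/high")
    |> range(start: 2021-03-31T23:00:00.000Z, stop: 2021-04-01T23:59:59.999Z)
    |> filter(fn: (r) => r._measurement == "11111111111111111111111111111111")
    |> filter(fn: (r) => r.deviceId == "179474" or r.deviceId == "237140")
    |> filter(fn: (r) => r._field == "EM1_Energy")
    |> aggregateWindow(every: every, fn: last, timeSrc: "_stop")
    |> difference(nonNegative: true)
    |> set(key: "_field", value: "3")
```

Because every delta row carries the same window-end _time as the other aggregated columns, the rows line up one-to-one in the final pivot.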

Thanks for all the ideas.