Flux max() - min() for two different tags aggregated per day

Hello,

I’m an old-school relational SQL guy. I’m trying to build an energy dashboard in Grafana using Flux. Home Assistant is feeding InfluxDB 2.0. I have a kWh measurement and an entity_id tag (among other tags irrelevant to this). My smart meter reports two readings, which is why there are two entity_id values.

This is my data. For simplicity, assume _time is in hours on the same day; in reality I have a reading every 30 seconds or so…

_time | entity_id | _value
--- | --- | ---
1h | power_consumption_1 | 1000
1h | power_consumption_2 | 50
2h | power_consumption_1 | 1010
2h | power_consumption_2 | 50
3h | power_consumption_1 | 1020
3h | power_consumption_2 | 50
4h | power_consumption_1 | 1020
4h | power_consumption_2 | 60
5h | power_consumption_1 | 1020
5h | power_consumption_2 | 70

I came up with this working but monstrous query:

data = from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> drop(columns: ["source", "friendly_name"])

min1 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: min)

max1 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: max)

min2 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: min)

max2 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: max)

join1 = join(tables: {min1: min1, max1: max1}, on: ["_start", "_stop", "_time", "_field", "_measurement", "domain"], method: "inner")
  |> map(fn: (r) => ({ r with _value: r._value_max1 - r._value_min1 }))

join2 = join(tables: {min2: min2, max2: max2}, on: ["_start", "_stop", "_time", "_field", "_measurement", "domain"], method: "inner")
  |> map(fn: (r) => ({ r with _value: r._value_max2 - r._value_min2 }))

join(tables: {join1: join1, join2: join2}, on: ["_start", "_stop", "_time", "_field", "_measurement", "domain"], method: "inner")
  |> map(fn: (r) => ({ r with _value: r._value_join1 + r._value_join2 }))
  |> drop(columns: ["_value_join1", "_value_join2", "_value_max1", "_value_max2", "_value_min1", "_value_min2"])

This works and builds the table I want, but there should be a simpler, faster way. Can somebody help me here?

Best regards,
Trunet

I managed to optimize it a little. Is there a better optimization here?

data1 = from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1")
  |> drop(columns: ["source", "friendly_name"])
  |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)

data2 = from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["source", "friendly_name"])
  |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)

join(tables: {data1: data1, data2: data2}, on: ["_start", "_stop", "_field", "_measurement", "domain", "_time"])
  |> map(fn: (r) => ({ r with _value : r._value_data1 + r._value_data2 }))
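The reason spread() can replace the min/max joins is that it returns max() - min() for each window. Here is a self-contained sketch of the per-entity step, assuming `array.from()` as a stand-in for the bucket; the rows reproduce the sample table, relabeled with the entity_id values used in the queries, and the dates are hypothetical.

```flux
import "array"

// Hypothetical stand-in for the bucket: the sample table from the first post,
// using the entity_id values from the queries. Both are cumulative kWh counters.
array.from(
    rows: [
        {_time: 2021-01-01T01:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 1000.0},
        {_time: 2021-01-01T02:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 1010.0},
        {_time: 2021-01-01T03:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 1020.0},
        {_time: 2021-01-01T04:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 1020.0},
        {_time: 2021-01-01T05:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 1020.0},
        {_time: 2021-01-01T01:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 50.0},
        {_time: 2021-01-01T02:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 50.0},
        {_time: 2021-01-01T03:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 50.0},
        {_time: 2021-01-01T04:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 60.0},
        {_time: 2021-01-01T05:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 70.0},
    ],
)
    // array.from() returns one table; split it per entity, as from() would.
    |> group(columns: ["entity_id"])
    // Bound the data so the daily windows have a start and stop.
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-02T00:00:00Z)
    // spread() = max() - min() within each window, so this one call gives the
    // same per-day value as the min/max streams plus join and map().
    |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)
```

For this sample, each tariff should come back as one row for the day with a spread of 20 kWh (1020 - 1000 and 70 - 50).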

Hello @trunet,
First of all, thanks for giving Flux a try. It definitely takes a moment to shift your thinking. Can you try the following?

from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1" or r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["source", "friendly_name"])
  |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)
  |> group()
  |> pivot(rowKey: ["_time"], columnKey: ["entity_id"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with _value: r.energy_consumption_tarif_1 + r.energy_consumption_tarif_2 }))

Please let me know if it doesn’t work, or if I should explain further.
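If only the combined daily total is needed, an alternative to pivot() (a different technique from the reply above, sketched here with hypothetical rows via `array.from()`) is to regroup the daily spreads by _time and sum() them:

```flux
import "array"

// Hypothetical readings: two cumulative kWh counters, two samples each.
array.from(
    rows: [
        {_time: 2021-01-01T01:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 1000.0},
        {_time: 2021-01-01T05:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 1020.0},
        {_time: 2021-01-01T01:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 50.0},
        {_time: 2021-01-01T05:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 70.0},
    ],
)
    // One table per tariff, as from() would return them.
    |> group(columns: ["entity_id"])
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-02T00:00:00Z)
    // Daily consumption per tariff (max - min of the counter).
    |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)
    // Put both tariffs' rows for the same day into one table...
    |> group(columns: ["_time"])
    // ...and add them: one row per day with the combined consumption.
    |> sum()
```

With this sample the single output row should hold 40 (20 + 20). One difference from the pivot version: if a tariff has no data for a day, sum() just adds whatever rows exist, whereas the map() addition would produce null for that day.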

This worked perfectly, thank you. I didn’t know about the pivot() function.

Hi, I have grouped several measurements into one table using pivot(), but is there a way to select the maximum of each row using map()? I now have 8 measurements per time sample and would like to select only the maximum measurement per sample.

The max() function seems to work only on a column, not row-wise.
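On the literal question: map() is evaluated once per row, so a row-wise maximum over pivoted columns can be written as chained if/then/else comparisons. This is a minimal sketch with hypothetical column names (Temperature3 to Temperature5) and sample rows. It shows only three of the eight columns, and nulls are not handled.

```flux
import "array"

// Hypothetical pivoted rows: one column per sensor, one row per time sample.
// Only three columns are shown; the real data has eight.
array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, Temperature3: 21.5, Temperature4: 23.0, Temperature5: 22.1},
        {_time: 2021-01-01T00:01:00Z, Temperature3: 24.2, Temperature4: 23.1, Temperature5: 22.0},
    ],
)
    // map() runs once per row, so chained comparisons pick the largest
    // column value in that row. Add one comparison per extra column.
    |> map(
        fn: (r) => {
            m = if r.Temperature3 > r.Temperature4 then r.Temperature3 else r.Temperature4
            rowMax = if m > r.Temperature5 then m else r.Temperature5

            return {_time: r._time, _value: rowMax}
        },
    )
```

This gets verbose with eight columns, which is one reason the group()-based approach later in the thread is usually easier.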

What are you looking for as your output? Do you just want the value, or do you need the value and the column heading?

Really just looking for the value.

I’m planning to calculate the max and min values, then join those two queries and use map() to calculate the difference between them. If there’s a better way to do this in Flux, I’m all ears…

Thanks
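The max/min join may not be necessary. Assume the pipeline first averages each field per minute, as the query later in this thread does. One way to get the gap between the highest and lowest field for each time sample is then to regroup the per-field means by _time and apply spread(), which returns max minus min. The field names and values below are hypothetical.

```flux
import "array"

// Hypothetical raw readings from three of the eight fields.
array.from(
    rows: [
        {_time: 2021-01-01T00:00:10Z, _field: "Temperature3", _value: 21.0},
        {_time: 2021-01-01T00:00:40Z, _field: "Temperature3", _value: 22.0},
        {_time: 2021-01-01T00:00:10Z, _field: "Temperature4", _value: 23.0},
        {_time: 2021-01-01T00:00:40Z, _field: "Temperature4", _value: 23.0},
        {_time: 2021-01-01T00:00:10Z, _field: "Temperature5", _value: 20.0},
        {_time: 2021-01-01T00:00:40Z, _field: "Temperature5", _value: 21.0},
    ],
)
    // One table per field, as from() returns them.
    |> group(columns: ["_field"])
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T00:01:00Z)
    // One mean per field per minute.
    |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
    // Collect every field's mean for the same minute into one table...
    |> group(columns: ["_time"])
    // ...and take (highest mean) - (lowest mean) in one step, with no join.
    |> spread()
```

For this sample the means are 21.5, 23.0 and 20.5, so the result should be 2.5.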

@Anaisdg, could you please assist? It looks like you resolved the previous issue easily.

I think this should be basic functionality, and I’m wondering if I’m missing something that’s available in Flux.

Thanks

If you want the max of these 8 measurements per time sample, use the aggregateWindow() function without grouping or pivoting.

from(bucket: "mybucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "1" or r["_measurement"] == "2"... or r["_measurement"] == "8")
  |> aggregateWindow(every: 1m, fn: max)

I’m already using aggregateWindow(), but it returns the max for each measurement separately per time window. What I want is to return only the maximum across all eight measurements.

from(bucket: "bucketName")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "Battery Container" and
    (r._field == "Temperature3" or r._field == "Temperature4" or r._field == "Temperature5" or r._field == "Temperature6" 
    or r._field == "Temperature11" or r._field == "Temperature12" or r._field == "Temperature13" or r._field == "Temperature14") and 
    r.instance == "${container}")
  |> aggregateWindow(every: 60s, fn: mean, createEmpty: false)
  |> drop(columns: ["source"])

Ah, I added a group() call and this seems to do what I was looking for. *facepalm*

Thanks for responding
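The post doesn’t say where group() went. One placement that should return a single max per window across all fields is to ungroup before aggregating, sketched here with hypothetical data. This takes the max over the raw points rather than over per-field means, so it can differ from a mean-then-max pipeline.

```flux
import "array"

// Hypothetical raw readings from three of the eight fields.
array.from(
    rows: [
        {_time: 2021-01-01T00:00:10Z, _field: "Temperature3", _value: 21.0},
        {_time: 2021-01-01T00:00:40Z, _field: "Temperature3", _value: 22.0},
        {_time: 2021-01-01T00:00:10Z, _field: "Temperature4", _value: 23.0},
        {_time: 2021-01-01T00:00:40Z, _field: "Temperature5", _value: 21.0},
    ],
)
    |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T00:01:00Z)
    // Merge all fields into one table (from() would return one per field);
    // without this, aggregateWindow() returns a separate max for each field.
    |> group()
    // One row per minute with the largest value seen in any field.
    |> aggregateWindow(every: 1m, fn: max, createEmpty: false)
```

Because max() is a selector, the output row should also keep the _field of the winning reading (here Temperature4 with 23.0). That covers the "value and column heading" case too.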


@epraetor_flux
Facepalm is my MO; that’s what we’re here for. Out of curiosity, what are you using Influx for?
Thanks for sharing your solution.

Visualizing battery energy storage data with Grafana