Flux max() - min() for two different tags aggregated per day

Hello,

I’m an old-school relational SQL guy, and I’m trying to build an energy dashboard in Grafana using Flux. Home Assistant is feeding InfluxDB 2.0. I have a kWh measurement and an entity_id tag (among other tags irrelevant here). My smart meter exposes two readings (two tariffs), which is why there are two entity_id values.

This is my data. For simplicity, let’s assume _time is hourly on the same day, but I actually get a point every 30 seconds or so…

_time | entity_id | _value
1h | power_consumption_1 | 1000
1h | power_consumption_2 | 50
2h | power_consumption_1 | 1010
2h | power_consumption_2 | 50
3h | power_consumption_1 | 1020
3h | power_consumption_2 | 50
4h | power_consumption_1 | 1020
4h | power_consumption_2 | 60
5h | power_consumption_1 | 1020
5h | power_consumption_2 | 70

What I want per day is (max - min) for each entity, summed: for this day that would be (1020 - 1000) + (70 - 50) = 40.

I came up with this working, but monstrous, query:

data = from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> drop(columns: ["source", "friendly_name"])

min1 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: min)

max1 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: max)

min2 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: min)

max2 = data
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["entity_id"])
  |> aggregateWindow(every: 1d, fn: max)

join1 = join(tables: {min1: min1, max1: max1}, on: ["_start", "_stop", "_time", "_field", "_measurement", "domain"], method: "inner")
  |> map(fn: (r) => ({ r with _value: r._value_max1 - r._value_min1 }))

join2 = join(tables: {min2: min2, max2: max2}, on: ["_start", "_stop", "_time", "_field", "_measurement", "domain"], method: "inner")
  |> map(fn: (r) => ({ r with _value: r._value_max2 - r._value_min2 }))

join(tables: {join1: join1, join2: join2}, on: ["_start", "_stop", "_time", "_field", "_measurement", "domain"], method: "inner")
  |> map(fn: (r) => ({ r with _value: r._value_join1 + r._value_join2 }))
  |> drop(columns: ["_value_join1", "_value_join2", "_value_max1", "_value_max2", "_value_min1", "_value_min2"])

Now, this is working, and it builds the table I want. But there should be a simpler/faster/more optimized way; can somebody help me here?

Best regards,
Trunet

I managed to optimize it a little bit. Is there any further optimization to be had here?

data1 = from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1")
  |> drop(columns: ["source", "friendly_name"])
  |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)

data2 = from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["source", "friendly_name"])
  |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)

join(tables: {data1: data1, data2: data2}, on: ["_start", "_stop", "_field", "_measurement", "domain", "_time"])
  |> map(fn: (r) => ({ r with _value : r._value_data1 + r._value_data2 }))
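(For anyone else reading: spread() returns max minus min over each window, so it replaces each min/max pair and its join from my first attempt in a single step. A minimal sketch of the idea with hypothetical in-memory readings via the array package; the values are invented for illustration:)

```flux
import "array"

// Hypothetical counter readings for one day (values invented)
array.from(rows: [
  {entity_id: "energy_consumption_tarif_1", _value: 1000.0},
  {entity_id: "energy_consumption_tarif_1", _value: 1020.0},
  {entity_id: "energy_consumption_tarif_2", _value: 50.0},
  {entity_id: "energy_consumption_tarif_2", _value: 70.0},
])
  |> group(columns: ["entity_id"])  // one table per entity, like tag grouping on a real query
  |> spread()                       // max - min per table: 20 for each entity
```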

Hello @trunet,
First of all thanks for giving Flux a try. It definitely takes a second to transition your thinking. Can you try the following?

from(bucket: "homeassistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "kWh")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["entity_id"] == "energy_consumption_tarif_1" or r["entity_id"] == "energy_consumption_tarif_2")
  |> drop(columns: ["source", "friendly_name"])
  |> aggregateWindow(every: 1d, fn: spread, createEmpty: false)
  |> group()
  |> pivot(rowKey: ["_time"], columnKey: ["entity_id"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with _value: r.energy_consumption_tarif_1 + r.energy_consumption_tarif_2 }))
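The key step is pivot(): after group() merges the per-entity tables into one, pivot() turns each entity_id value into its own column keyed on _time, so the two daily spreads land on the same row and can be summed with map() instead of a join(). A minimal sketch with hypothetical values (one day, both spreads invented for illustration):

```flux
import "array"

// Hypothetical daily spreads, one row per entity per day
array.from(rows: [
  {_time: 2021-01-01T00:00:00Z, entity_id: "energy_consumption_tarif_1", _value: 20.0},
  {_time: 2021-01-01T00:00:00Z, entity_id: "energy_consumption_tarif_2", _value: 20.0},
])
  |> pivot(rowKey: ["_time"], columnKey: ["entity_id"], valueColumn: "_value")
  // now one row per _time with a column per entity; sum them into _value
  |> map(fn: (r) => ({r with _value: r.energy_consumption_tarif_1 + r.energy_consumption_tarif_2}))
```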

Please let me know if it doesn’t work, or if you’d like me to explain further.

This worked perfectly, thank you. Didn’t know about the pivot() function.