Calculate usage today so far from incrementing total

I recently migrated my home automation to InfluxDB 2 to gain experience with it, so that I can better advise my customers.
Before that I was running InfluxDB 1.8 in a TICK stack, but professionally I have more experience with Prometheus+Grafana.

The challenge:
I have hooked up my electricity smart meter to MQTT to monitor energy and gas consumption/production. It sends out measurements multiple times per minute as absolute total values.
What I would like is to know how much energy I have used ‘today’ and compare it with ‘yesterday’ and the same day last week (so 7 days ago).

Although this is quite trivial with Prometheus+Grafana, I cannot figure out how to do it with Flux on an InfluxDB 2 dashboard.

  • From this incrementing ‘total’ value, how to calculate the usage per day?
  • And how to get this usage to ‘today so far’?


@diversit Flux 0.116.0 just introduced a today() function that would help in your use case, but right now, 0.116.0 is only available in InfluxDB Cloud (unless you build InfluxDB OSS from source). Either way, the today function is pretty simple to replicate. You can also use some functions in the experimental package to help as well:

import "date"
import "experimental"

today = date.truncate(t: now(), unit: 1d)
yesterday = { start: date.truncate(t: -24h, unit: 1d), stop: experimental.subDuration(d: 1ns, from: today) }
weekAgo = { start: experimental.subDuration(d: 6d, from: yesterday.start), stop: experimental.subDuration(d: 6d, from: yesterday.stop) }

todayData = from(bucket: "example-bucket") |> range(start: today)
yesterdayData = from(bucket: "example-bucket") |> range(start: yesterday.start, stop: yesterday.stop)
weekAgoData = from(bucket: "example-bucket") |> range(start: weekAgo.start, stop: weekAgo.stop)

union(tables: [todayData, yesterdayData, weekAgoData])
  // If you want to align data to the same time frame in your visualization
  |> experimental.alignTime(alignTo: today)

@scott Selecting the right data seems to work. Thanks.
I still cannot figure out how to calculate the usage though.
The metric is an incrementing total. Only changed values are reported, so they don’t arrive at regular intervals. The latest value might be more than an hour old. I saw a case yesterday where the first value was at around 02:12 and the previous value was at 22:45 the day before.

I have tried the functions window, aggregateWindow, difference, derivative and aggregate.rate, but none of them seems to give the wanted result: the difference between the latest and the first value of the stream.
Another challenge is that the calculation should also include the latest value of the previous day.

The closest I got was to use separate queries for the last value of yesterday and the last value of today, merge the streams, and calculate the difference with a map function. But this is quite cumbersome.
Calculating the difference between the first and last value of a stream seems like something that is needed quite often, so shouldn’t there be a simpler way to do it?
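
For reference, the two-query approach described above can be sketched roughly as follows. This is only an illustration under assumptions: the readings are made-up sample data built with array.from (modelled on the 22:45 / 02:12 case), the day boundaries are fixed UTC timestamps rather than computed from now(), and difference() stands in for the map function.

```flux
import "array"

// Hypothetical cumulative meter readings, reported only when the value changes.
readings = array.from(rows: [
    {_time: 2021-05-01T22:45:00Z, _value: 100.0},
    {_time: 2021-05-02T02:12:00Z, _value: 101.5},
    {_time: 2021-05-02T18:00:00Z, _value: 104.0},
])

// Last reading of the previous day.
lastYesterday = readings
    |> range(start: 2021-05-01T00:00:00Z, stop: 2021-05-02T00:00:00Z)
    |> last()

// Last reading of the current day.
lastToday = readings
    |> range(start: 2021-05-02T00:00:00Z, stop: 2021-05-03T00:00:00Z)
    |> last()

union(tables: [lastYesterday, lastToday])
    // Merge both results into a single table and restore time order.
    |> group()
    |> sort(columns: ["_time"])
    // Subtract the earlier closing value from the later one.
    |> difference()
```

Because the starting point is yesterday's closing value, usage between the last reading of yesterday and the first reading of today is included.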

Any ideas how to do this?

So you tried something like this (after your range and filters):

  |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
  |> difference(nonNegative: false, columns: ["_value"])

and this doesn’t get you what you are looking for?
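
To make it easier to compare results, here is a small self-contained sketch of what that pipeline computes. The sample readings and the fixed time range are assumptions for illustration only; with real data you would keep your own from(), range() and filter() calls.

```flux
import "array"

// Hypothetical cumulative meter readings spanning three days.
readings = array.from(rows: [
    {_time: 2021-05-01T08:00:00Z, _value: 95.0},
    {_time: 2021-05-01T22:45:00Z, _value: 100.0},
    {_time: 2021-05-02T02:12:00Z, _value: 101.5},
    {_time: 2021-05-02T18:00:00Z, _value: 104.0},
    {_time: 2021-05-03T09:30:00Z, _value: 106.0},
])

readings
    |> range(start: 2021-05-01T00:00:00Z, stop: 2021-05-04T00:00:00Z)
    // Keep the last reading of each day (day boundaries are UTC in this sketch).
    |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
    // Difference between consecutive daily closing values.
    |> difference(nonNegative: false, columns: ["_value"])
```

Two things to keep in mind when reading the output: the first day in the range has no earlier closing value to subtract from, so it produces no row, and aggregateWindow stamps each result with the end of its window by default.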


If you want the energy consumption (kWh) per day, you can use an InfluxQL query like the one below. It works because the kWh value only ever increases:

SELECT (max("kWh")) - (min("kWh")) FROM "EnergyMeters" WHERE ("meter" = 'Meter1') AND $timeFilter GROUP BY time(24h) fill(null)

@diversit Flux / InfluxDB doesn’t support returning values outside of the queried time range. All queries are time-bound, so it currently isn’t possible to get the latest reported point if that point falls outside of the queried time range. There are a few issues on both the Flux and InfluxDB repos requesting this functionality.

Something you can do is “seed” the query with additional data, use aggregateWindow to create empty values at regular intervals, fill the null values, then re-range the data to today. For example:

todayData = from(bucket: "example-bucket")
  |> range(start: -2d)
  |> filter(...)
  |> aggregateWindow(every: 5m, fn: max, createEmpty: true)
  |> fill(usePrevious: true)
  |> range(start: today())

With an incrementing counter, where you are just trying to calculate the difference between the starting and ending values, I’d just use spread():

todayData = from(bucket: "example-bucket")
  |> range(start: -2d)
  |> filter(...)
  |> aggregateWindow(every: 5m, fn: max, createEmpty: true)
  |> fill(usePrevious: true)
  |> range(start: today())
  |> spread()
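
Here is the same seeding idea as a self-contained sketch you can run without a bucket. The readings, the 1h window and the fixed dayStart/dayStop values (stand-ins for today() and the end of the day) are assumptions chosen for illustration.

```flux
import "array"

// Hypothetical cumulative readings: the last one of the previous day at 22:45,
// the first one of the current day at 02:12.
readings = array.from(rows: [
    {_time: 2021-05-01T22:45:00Z, _value: 100.0},
    {_time: 2021-05-02T02:12:00Z, _value: 101.5},
    {_time: 2021-05-02T18:00:00Z, _value: 104.0},
])

// Fixed stand-ins for today() and the end of the day, so the result is reproducible.
dayStart = 2021-05-02T00:00:00Z
dayStop = 2021-05-03T00:00:00Z

readings
    // Seed the query with the previous day as well.
    |> range(start: 2021-05-01T00:00:00Z, stop: dayStop)
    // Create one row per hour, empty where nothing was reported.
    |> aggregateWindow(every: 1h, fn: max, createEmpty: true)
    // Carry the last known counter value forward into the empty hours.
    |> fill(usePrevious: true)
    // Cut the result back to the day of interest.
    |> range(start: dayStart, stop: dayStop)
    // Highest minus lowest counter value within the day.
    |> spread()
```

Since the 22:45 value is carried forward to midnight, the spread should start from yesterday's closing value (100.0) rather than from the first reading of the day (101.5), so the overnight usage is counted.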

Thanks all for the tips.
aggregateWindow and difference did not work for me.
I did not know about the spread() function. Using it in combination with aggregateWindow seems to give me more or less the info I’d like.
I am now using a Flux query like the one below for gauges:

import "date"

today = date.truncate(t: now(), unit: 1d)

from(bucket: "mqtt")
  |> range(start: today)
  |> filter(fn: (r) => r["_measurement"] == "mqtt_consumer")
  |> filter(fn: (r) => r["topic"] == "esp-dsmr/fdc4f6/gas/total")
  |> toFloat()
  |> aggregateWindow(every: 1d, fn: spread, createEmpty: true)

Changing the range to multiple days and changing the aggregateWindow every parameter to 1h also seems to give nice graphs.
Since InfluxDB 2 dashboards do not support bar charts yet, I created some bar charts in Grafana using queries like this. Works well.
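
One caveat with spread per window: it only sees changes inside each window, so usage that happens between the last reading of one window and the first reading of the next is not counted. With readings arriving several times per minute this hardly matters, but with sparse readings it can. As a possible alternative for hourly bars, here is a sketch that takes the last value per hour, carries it forward, and then takes the difference between consecutive hours. The sample readings and the fixed range are assumptions for illustration.

```flux
import "array"

// Hypothetical sparse cumulative readings around midnight.
readings = array.from(rows: [
    {_time: 2021-05-01T22:45:00Z, _value: 100.0},
    {_time: 2021-05-02T02:12:00Z, _value: 101.5},
])

readings
    |> range(start: 2021-05-01T22:00:00Z, stop: 2021-05-02T04:00:00Z)
    // Closing counter value of every hour, empty where nothing was reported.
    |> aggregateWindow(every: 1h, fn: last, createEmpty: true)
    // Hours without a reading keep the previous counter value.
    |> fill(usePrevious: true)
    // Usage per hour as the change between consecutive hourly closing values.
    |> difference(nonNegative: true)
```

Hours without any change come out as 0, and the jump from 100.0 to 101.5 is attributed to the hour in which the 02:12 reading arrived, so the hourly values add up to the change of the counter over the range.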


No problem. Happy to help!