How to get a cumulative sum of a product from different series?

I had some time and tried to better understand that Flux query. I have now made it work, but only partly:

  • UNION is not what I want … JOIN is what I need
  • I found fill(usePrevious: true) to help with my very sparsely populated series costKwh
  • I had to drop some columns in my costKwh series, because the entries I added manually (using the HTTP API) do not carry exactly the same tags as the ones created by Home Assistant and would otherwise be handled as a separate series
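To illustrate why JOIN fits here, a minimal sketch on toy data: join puts the value of both series on the same row per _time, so they can be multiplied and then summed up. The numbers below are made up for illustration and are not from my bucket; the sort step is an assumption to make sure cumulativeSum sees the rows in time order.

```flux
import "array"

// made-up daily self-used energy in kWh (illustrative values only)
selfUsedKwh =
  array.from(rows: [
    {_time: 2024-04-01T00:00:00Z, _value: 2.0},
    {_time: 2024-04-02T00:00:00Z, _value: 4.0},
    {_time: 2024-04-03T00:00:00Z, _value: 2.0},
  ])

// made-up price per kWh in EUR (illustrative values only)
costKwh =
  array.from(rows: [
    {_time: 2024-04-01T00:00:00Z, _value: 0.25},
    {_time: 2024-04-02T00:00:00Z, _value: 0.25},
    {_time: 2024-04-03T00:00:00Z, _value: 0.5},
  ])

join(tables: {t1: selfUsedKwh, t2: costKwh}, on: ["_time"])
  // one row per day now holds both values as _value_t1 and _value_t2
  |> map(fn: (r) => ({_time: r._time, _field: "saving", _value: r._value_t1 * r._value_t2}))
  // assumption: sort explicitly so the running total follows time order
  |> sort(columns: ["_time"])
  |> cumulativeSum()
```

With these toy values the daily products are 0.5, 1.0 and 1.0, so the running total is 0.5, 1.5 and 2.5.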

Here is the resulting query:

costKwh =
  from(bucket: "data")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "EUR")
    |> filter(fn: (r) => r["friendly_name"] == "Strom_Preis_pro_kWh")
    |> filter(fn: (r) => r["_field"] == "value")
    // unify manual entries with entries from HA
    |> drop(columns: ["device_class_str","state_class_str"])
    |> aggregateWindow(every: 1d, fn: last, createEmpty: true)
    |> fill(usePrevious: true)

selfUsedKwh =
  from(bucket: "data")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "kWh")
    |> filter(fn: (r) => r["friendly_name"] == "Solar_selbst_verbraucht_Summe")
    |> filter(fn: (r) => r["_field"] == "value")
    |> aggregateWindow(every: 1d, fn: spread, createEmpty: true)

join(tables: {t1: selfUsedKwh, t2: costKwh}, on: ["_time"] )
  |> map(fn: (r) => ({r with _field: "saving", _value: r._value_t1 * r._value_t2}))
  |> cumulativeSum()

And this is the resulting data:

So

  • The cumulative sum works well, nice
  • What remains an issue is that fill(usePrevious: true) only looks for “previous” data inside the query time window, but it should look back without any limit. In the data shown there is an entry for costKwh on 01.04.2024, and this should be used here as well. Any ideas to make that possible?
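One direction that might work, as an untested sketch: widen the range of the price series only, so that fill(usePrevious: true) can see the last price entry before the dashboard window. The fixed start date below is an assumption and just has to lie before the first price entry. Since join only keeps rows whose _time exists in both series, the extra early price rows should drop out again on their own.

```flux
costKwh =
  from(bucket: "data")
    // assumption: a fixed start early enough to include the last price entry before the window
    |> range(start: 2023-06-01T00:00:00Z, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "EUR")
    |> filter(fn: (r) => r["friendly_name"] == "Strom_Preis_pro_kWh")
    |> filter(fn: (r) => r["_field"] == "value")
    |> drop(columns: ["device_class_str","state_class_str"])
    |> aggregateWindow(every: 1d, fn: last, createEmpty: true)
    |> fill(usePrevious: true)

// unchanged: still limited to the dashboard window
selfUsedKwh =
  from(bucket: "data")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "kWh")
    |> filter(fn: (r) => r["friendly_name"] == "Solar_selbst_verbraucht_Summe")
    |> filter(fn: (r) => r["_field"] == "value")
    |> aggregateWindow(every: 1d, fn: spread, createEmpty: true)

// inner join on _time keeps only the days present in selfUsedKwh
join(tables: {t1: selfUsedKwh, t2: costKwh}, on: ["_time"] )
  |> map(fn: (r) => ({r with _field: "saving", _value: r._value_t1 * r._value_t2}))
  |> cumulativeSum()
```

This would only address the missing price at the start of the window. The cumulative sum would still start counting at the beginning of the dashboard window.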

I hope I have not lost you in the meantime, @scott?

Thank you

Ok, I finally made it:

  • I now use a constant start for the time range, so that data selection and the cumulative sum both begin at the point in time where I have the first data. The end of the range is always now(). In Grafana I then select a sub-timeframe of this and so always see the correct data for the time displayed. That works around the fill(usePrevious: true) issue at the cost of a noticeably longer query runtime.
  • I had to add the pivot function at the end to work around a “[sse.readDataError] [C] got error: input data must be a wide series but got type long (input refid)” error from Grafana. I do not understand why, but No Join function support in Grafana? - #7 by tcasanova - InfluxDB - Grafana Labs Community Forums indicates that it is required. When I run the query in the InfluxDB Data Explorer, however, I need to comment out that pivot line for the graph to display correct data. Weird … but ok.
  • I had to drop the source columns from the join so that the InfluxDB Data Explorer displays the correct data. It only shows the cumulative sum values in the graph when those source columns are dropped. For Grafana this step can be skipped.

Here is the final query:

costKwh =
  from(bucket: "data")
    |> range(start: 2023-06-01T00:00:00Z, stop: now() )
    |> filter(fn: (r) => r["_measurement"] == "EUR")
    |> filter(fn: (r) => r["friendly_name"] == "Strom_Preis_pro_kWh")
    |> filter(fn: (r) => r["_field"] == "value")
    // unify manual entries with entries from HA
    |> drop(columns: ["device_class_str","state_class_str"])
    |> aggregateWindow(every: 1d, fn: last, createEmpty: true)
    |> fill(usePrevious: true)

selfUsedKwh =
  from(bucket: "data")
    |> range(start: 2023-06-01T00:00:00Z, stop: now() )
    |> filter(fn: (r) => r["_measurement"] == "kWh")
    |> filter(fn: (r) => r["friendly_name"] == "Solar_selbst_verbraucht_Summe")
    |> filter(fn: (r) => r["_field"] == "value")
    |> aggregateWindow(every: 1d, fn: spread, createEmpty: true)

join(tables: {t1: selfUsedKwh, t2: costKwh}, on: ["_time"] )
  |> map(fn: (r) => ({r with _field: "saving", _value: r._value_t1 * r._value_t2}))
  |> cumulativeSum()
  // add savings from before start of data collection
  |> map(fn: (r) => ({r with _value: (r._value + 38.2)}))
  // drop source columns from joined tables to make influx data explorer know which column to draw, can be ignored with grafana
  |> drop(fn: (column) => column =~ /.*_t1$/)
  |> drop(fn: (column) => column =~ /.*_t2$/)
  // needed with grafana, reasons unclear, comment out when running in influx data explorer
  // |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
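
As a side note, the two drop calls could probably be collapsed into one regular expression. A small sketch on a made-up row shaped like the join output (the values are illustrative only):

```flux
import "array"

// made-up row with the suffixed columns that join produces
array.from(rows: [
    {_time: 2024-04-01T00:00:00Z, _field: "saving", _value: 0.5, _value_t1: 2.0, _value_t2: 0.25},
  ])
  // drops every column whose name ends in _t1 or _t2
  |> drop(fn: (column) => column =~ /_t[12]$/)
```

This leaves only _time, _field and _value in the result.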

Thank you @scott for your help.