Calculate data from different flux queries

I am storing electricity meter readings in InfluxDB in a measurement called "meterreadings", with the tag type="electricity", and I also have an additional tag billing="yes" | "no".
I want to calculate how much electricity I used since the last billing-relevant reading. Basically, take the last reading with billing = "no" and subtract the last reading where billing = "yes".

So to select the last billing-relevant reading, I did this:

from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "yes")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop"])

This gives me a result like this:

I can get the current reading by running the same query, but with r["billing"] == "no". I understand that I can also put the results into different variables, like billed and current.

But now I want to calculate a new value. Essentially, I need a single value like -(current.peak_used - billed.peak_used) - (current.offpeak - billed.offpeak) + (current.peak_generated - billed.peak_generated)

I don't know how to generate a "new table" just from calculation rules?

Hi @nygma2004,
So what you could do is the following:

current = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "yes")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop"])

table2 = current
 |> map(fn: (r) => ({ r with newColumnName: r.peak_used - r.offpeak + r.peak_generated }))

map() enables you to create a new column and perform a calculation on each row to fill it. From there you can drop the columns you don't need. Let me know if I have the right idea or if you meant something different 🙂
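For instance, if only the calculated column is wanted in the output, keep() can trim everything else (a sketch building on the example above; newColumnName is just a placeholder):

table2 = current
  |> map(fn: (r) => ({ r with newColumnName: r.peak_used - r.offpeak + r.peak_generated }))
  |> keep(columns: ["_time", "newColumnName"])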

Not exactly. I have used map() before, but here I would need to calculate a new field from two different tables, which I select with separate Flux queries:

billed = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "yes")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop"])

current = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "no")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop"])

newValue = -(current.peak_used - billed.peak_used) - (current.offpeak - billed.offpeak) + (current.peak_generated - billed.peak_generated)

And I don’t know how that last part will work. It is a calculation from two tables.

@nygma2004, could you merge the tables using a join() on time? See the join() function in the Flux 0.x documentation.

Then perform the map?

Oh yes, the join worked:

billed = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "yes")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop","billing","_measurement"])

current = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "no")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop","billing","_measurement"])

join(tables: {billed: billed, current: current}, on: ["type"])
  |> map(fn: (r) => ({ r with balance: -(r.peak_used_current - r.peak_used_billed) - (r.offpeak_current - r.offpeak_billed) + (r.peak_generated_current - r.peak_generated_billed) }))

Since the current and the billed tables contain data from different times, I had to use one of the tags to join the entries. It took some time until I figured that out.
Now I can see the value in the balance column.

Now I have a new idea: I also want to select some real-time usage data and add that to this calculated field. But that table has nothing in common with these, so I am not yet sure how I will do that.

Could you map a custom tag based on the results of both tables to create a connection? If not, falling back on the timestamp to relate the two tables always helps. You can also combine tables using union(), though this would leave null values in columns where data does not exist.
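A minimal union() sketch, assuming the same bucket and the "sensors" measurement from your setup (the variable names are placeholders) — columns that exist in only one of the input streams come back as null in the combined result:

meter = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> last()

usage = from(bucket: "nodered")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "sensors")
  |> last()

// Rows from both streams in one output; non-shared columns become null
union(tables: [meter, usage])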

I tried continuing with this complex query, but I got stuck again.

So, as you can see in my screenshot from the first post, I get a single record for my last meter reading.
And now I want to run another selection on a different measurement, which has data in a different time interval, and I want to use the _time from the selection above in the range.

billed = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "yes")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop","billing","_measurement"])

from(bucket: "nodered")
  |> range(start: billed._time)
  |> filter(fn: (r) => r["_time"] < current._time)
  |> filter(fn: (r) => r["_measurement"] == "sensors")
  |> filter(fn: (r) => r["device"] == "house_phase0")
  |> first()

I think I know what the issue is. billed._time is not a single value, even though I have one record in the table. So I probably need to use tableFind() and getRecord(idx: 0), but I cannot figure out how.

The billed table was generated using a pivot, so when I check it, it appears to be a table and not a table stream, hence I don't see what key for that table I can specify in tableFind().

If I just try to do getRecord(idx: 0), I get the following error: error calling function "getRecord" @10:6-10:23: unexpected type for table: want {schema: [{grouped: bool, label: string, type: string}]}, got [t5462]

Hi @nygma2004 ,
So your analysis of the problem is correct: getRecord(idx: 0) expects a single table, not a stream of tables. You can extract one table from the stream using the tableFind() function. Here is an example:

raw = from(bucket: "Jetson")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "exec_jetson_stats")
  |> filter(fn: (r) => r["_field"] == "jetson_CPU1")
  |> last()
  |> tableFind(fn: (key) => key._field == "jetson_CPU1")
  |> getRecord(idx: 0)

from(bucket: "Jetson")
  |> range(start: raw._time)
  |> filter(fn: (r) => r["_measurement"] == "exec_jetson_stats")
  |> filter(fn: (r) => r["_field"] == "jetson_CPU2")
  |> yield(name: "custom-name")

I would advise pivoting within your secondary query rather than your first.

Thanks a lot. I decided just to run the query again without the pivot, and I was able to use tableFind() and getRecord(). I guess I could have done this in stages: run the selection into a variable, run tableFind() and store the result in one variable, then run the pivot and put that into a different variable. But it is a small table and it worked this way.
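For reference, the staged approach described above might look something like this (a sketch reusing the bucket, tags, and field names from the earlier posts; the variable names are placeholders):

billedRaw = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "yes")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()

// Extract one record from the un-pivoted stream to get a scalar _time
billedRecord = billedRaw
  |> tableFind(fn: (key) => key._field == "peak_used")
  |> getRecord(idx: 0)

// The pivoted table can still be built separately for the join/map step
billed = billedRaw
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop","billing","_measurement"])

from(bucket: "nodered")
  |> range(start: billedRecord._time)
  |> filter(fn: (r) => r["_measurement"] == "sensors")
  |> filter(fn: (r) => r["device"] == "house_phase0")
  |> first()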
