I am storing electricity meter readings in InfluxDB in a measurement called "meterreadings", with the tag type="electricity", and I also have an additional tag "billing" = true | false.
I want to calculate how much electricity I used since the last billing-relevant reading. Basically, take the last reading with billing = false and subtract the last reading where billing = true.
So to select the last billing-relevant reading, I did this:
from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop"])
This gives me a result like this: [screenshot: a single pivoted row with the offpeak, peak_generated and peak_used columns]
I can get the current reading by running the same query, but with r["billing"] == "no". I understand that I can also put the results into different variables like billed and current.
But now I want to calculate a new value. Essentially, I need a single value like -(current.peak_used-billed.peak_used)-(current.offpeak-billed.offpeak)+(current.peak_generated-billed.peak_generated)
I don't know how to generate a "new table" just from calculation rules, though.
Hi @nygma2004,
So what you could do is the following:
current = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop"])
table2 = current
|> map(fn: (r) => ({ r with newColumnName: r.peak_used - r.offpeak + r.peak_generated }))
map() enables you to create a new column and perform a calculation on each row to fill it. From there you can drop the columns you don't need. Let me know if I have the right idea or whether you meant something different.
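For example, a minimal sketch continuing from the table2 example above (the column list is just illustrative, adjust it to the fields you actually want to discard):
// Sketch only: keep just the computed value by dropping the source fields.
table2
|> drop(columns: ["offpeak", "peak_used", "peak_generated"])
|> yield(name: "calculated")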
Not exactly. I have used map() before, but here I would need to calculate a new field from two different tables that I select with separate Flux queries:
billed = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop"])
current = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "no")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop"])
newValue = -(current.peak_used-billed.peak_used)-(current.offpeak-billed.offpeak)+(current.peak_generated-billed.peak_generated)
And I don’t know how that last part will work. It is a calculation from two tables.
@nygma2004, could you merge the tables using a join() on time (see the join() function in the Flux 0.x documentation)? Then perform the map?
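Roughly, the pattern from the docs looks like the sketch below (joining on _time here just for illustration; as the next post shows, a shared tag is the better join key when the two readings have different timestamps):
// Sketch only: join the two pivoted streams, then compute across them.
// Columns present in both streams get the _billed / _current suffixes.
joined = join(tables: {billed: billed, current: current}, on: ["_time"])

joined
|> map(fn: (r) => ({ r with diff: r.peak_used_current - r.peak_used_billed }))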
Oh yes, the join worked:
billed = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop","billing","_measurement"])
current = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "no")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop","billing","_measurement"])
join(tables: {billed: billed, current: current}, on: ["type"])
|> map(fn: (r) => ({ r with balance: -(r.peak_used_current-r.peak_used_billed)-(r.offpeak_current-r.offpeak_billed)+(r.peak_generated_current-r.peak_generated_billed) }))
Since the current and the billed tables contain data from different times, I had to use one of the tags to join the entries. It took some time until I figured that out. Now I can see the value in the balance column.
Now I have a new idea: I also want to select some real-time usage data and add that to this calculated field. But that table has nothing in common with these, so I am not yet sure how I will do that.
Could you map a custom tag based on the results of both tables to create a connection? If not, falling back on the timestamp to relate the two tables always helps. You can also combine tables using a union(), though this would leave null values in columns where data does not exist.
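For what it's worth, a minimal sketch of the union() option with the billed and current streams from above (just to illustrate the null behaviour, not a recommendation over join()):
// Sketch only: union() stacks the rows of both streams into one output.
// Any column present in only one of the streams is null in rows from the other.
union(tables: [billed, current])
|> sort(columns: ["_time"])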
I tried continuing with this complex query, but I got stuck again.
So, as you can see on my screenshot from the first post, I get a single record for my last meter reading.
And now I want to run another selection on a different measurement, which has data in a different time interval, and I want to use the _time from the selection above in the range.
billed = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop","billing","_measurement"])
from(bucket: "nodered")
|> range(start: billed._time)
|> filter(fn: (r) => r["_time"] < current._time)
|> filter(fn: (r) => r["_measurement"] == "sensors")
|> filter(fn: (r) => r["device"] == "house_phase0")
|> first()
I think I know what the issue is. billed._time is not a single value, even though I have one record in the table. So I probably need to use tableFind() and getRecord(idx: 0), but I cannot figure out how.
The billed table was generated using a pivot, so when I check, it appears to be a table and not a table stream, hence I don't see what the key is for that table that I could specify in tableFind().
If I just try to do getRecord(idx: 0), I get the following error: error calling function "getRecord" @10:6-10:23: unexpected type for table: want {schema: [{grouped: bool, label: string, type: string}]}, got [t5462]
Hi @nygma2004 ,
So your analysis of the problem is correct: getRecord(idx: 0) is expecting a table. You can extract one using the tableFind() function. Here is an example:
raw = from(bucket: "Jetson")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "exec_jetson_stats")
|> filter(fn: (r) => r["_field"] == "jetson_CPU1")
|> last()
|> tableFind(fn: (key) => key._field == "jetson_CPU1")
|> getRecord(idx: 0)
from(bucket: "Jetson")
|> range(start: raw._time)
|> filter(fn: (r) => r["_measurement"] == "exec_jetson_stats")
|> filter(fn: (r) => r["_field"] == "jetson_CPU2")
|> yield(name: "custom-name")
I would advise pivoting within your secondary query rather than your first.
Thanks a lot. I decided just to run the query again without the pivot, and I was able to use tableFind and getRecord. I guess I could have done this in stages: run the selection into a variable, run tableFind and store the result in one variable, then run the pivot and put that into a different variable. But it is a small table and it worked this way.
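For anyone following along later, that approach might look roughly like the sketch below, based on the queries earlier in the thread (not the exact final query):
// Sketch only: take the last billing-relevant reading without pivoting,
// pull out its record, and use its _time as the start of the second range.
billedRaw = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "peak_used")
|> last()

billedRecord = billedRaw
|> tableFind(fn: (key) => key._field == "peak_used")
|> getRecord(idx: 0)

from(bucket: "nodered")
|> range(start: billedRecord._time)
|> filter(fn: (r) => r["_measurement"] == "sensors")
|> filter(fn: (r) => r["device"] == "house_phase0")
|> first()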