I have a fairly complicated query that runs through different measurements, and calculates and combines a lot of data. The result of this query is a single field that I display on a dashboard. I also wanted to send this data over MQTT so I can use it for something else, so I copied and pasted the query into a task, where the result goes to MQTT. But the value I get in MQTT is different. I have looked at it from many different angles, but I can't figure out why the two results differ.
I know it is hard to decipher what the query does, but I wanted to include it for the sake of completeness.
billed = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop","billing","_measurement"])
current = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "no")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start","_stop","billing","_measurement"])
meterreadings = join(tables: {billed: billed, current: current}, on: ["type"])
|> map(fn: (r) => ({ r with balance: -(r.peak_used_current-r.peak_used_billed)+(r.peak_generated_current-r.peak_generated_billed) }))
|> map(fn: (r) => ({ r with join: 1 }))
// Get the time for the last meter reading
current_time = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "no")
|> last()
|> tableFind(fn: (key) => key._field == "offpeak")
|> getRecord(idx: 0)
// Get the usage from Shelly before the meter reading
old_usage = from(bucket: "nodered")
|> range(start: -2y, stop:current_time._time)
|> filter(fn: (r) => r["_measurement"] == "sensors")
|> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
|> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
|> last()
|> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")
// Get the latest usage from Shelly
current_usage = from(bucket: "nodered")
|> range(start: -1w)
|> filter(fn: (r) => r["_measurement"] == "sensors")
|> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
|> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
|> last()
|> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")
// Combine the tables containing the Shelly readings and calculate the combined difference
shelly = join(tables: {old_usage: old_usage, current_usage: current_usage}, on: ["_measurement"])
|> map(fn: (r) => ({ r with shelly_usage: (-(r.house_phase0_total_current_usage-r.house_phase0_total_old_usage)-
(r.house_phase1_total_current_usage-r.house_phase1_total_old_usage)-
(r.house_phase2_total_current_usage-r.house_phase2_total_old_usage)+
(r.house_phase0_total_returned_current_usage-r.house_phase0_total_returned_old_usage)+
(r.house_phase1_total_returned_current_usage-r.house_phase1_total_returned_old_usage)+
(r.house_phase2_total_returned_current_usage-r.house_phase2_total_returned_old_usage)
)/1000.0 }))
|> map(fn: (r) => ({ r with join: 1 }))
join(tables: {meterreadings : meterreadings , shelly: shelly}, on: ["join"])
|> map(fn: (r) => ({ r with balance_est: r.shelly_usage+r.balance}))
|> keep(columns: ["balance_est"])
Please note that all the ranges use a fixed period, so the time interval selected in the Data Explorer or the task should not make a difference.
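To be extra sure, I suppose I could also pin the ranges to absolute timestamps instead of relative ones, something like this sketch with made-up dates, which would take the time picker out of the equation entirely:
// Just a sketch with made-up dates: an absolute range instead of -2y
|> range(start: 2020-06-01T00:00:00Z, stop: 2022-06-01T00:00:00Z)
But since every range is already an explicit period, I don't think the selected interval matters here.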
If I run the above query in the Data Explorer, I get this:
balance_est = 921.1785
Now I have copied and pasted the query into a task, added a _time column (needed for mqtt.to), and added an mqtt.to call to send out the results:
import "experimental/mqtt"
option task = {
name: "Send Electricity Estimated Balance",
every: 1h,
}
billed = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "billing", "_measurement"])
current = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "no")
|> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "billing", "_measurement"])
meterreadings = join(tables: {billed: billed, current: current}, on: ["type"])
|> map(fn: (r) => ({r with balance: -r.peak_used_current - r.peak_used_billed + (r.peak_generated_current - r.peak_generated_billed)}))
|> map(fn: (r) => ({r with join: 1}))
// Get the time for the last meter reading
current_time = from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "no")
|> last()
|> tableFind(fn: (key) => key._field == "offpeak")
|> getRecord(idx: 0)
// Get the usage from Shelly before the meter reading
old_usage = from(bucket: "nodered")
|> range(start: -2y, stop: current_time._time)
|> filter(fn: (r) => r["_measurement"] == "sensors")
|> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
|> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
|> last()
|> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")
// Get the latest usage from Shelly
current_usage = from(bucket: "nodered")
|> range(start: -1w)
|> filter(fn: (r) => r["_measurement"] == "sensors")
|> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
|> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
|> last()
|> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")
// Combine the tables containing the Shelly readings and calculate the combined difference
shelly = join(tables: {old_usage: old_usage, current_usage: current_usage}, on: ["_measurement"])
|> map(
fn: (r) => ({r with
shelly_usage: (-r.house_phase0_total_current_usage - r.house_phase0_total_old_usage - (r.house_phase1_total_current_usage - r.house_phase1_total_old_usage) - (r.house_phase2_total_current_usage - r.house_phase2_total_old_usage) + (r.house_phase0_total_returned_current_usage - r.house_phase0_total_returned_old_usage) + (r.house_phase1_total_returned_current_usage - r.house_phase1_total_returned_old_usage) + (r.house_phase2_total_returned_current_usage - r.house_phase2_total_returned_old_usage)) / 1000.0,
}),
)
|> map(fn: (r) => ({r with join: 1}))
join(tables: {meterreadings: meterreadings, shelly: shelly}, on: ["join"])
|> map(fn: (r) => ({r with balance_est: r.shelly_usage + r.balance}))
|> keep(columns: ["balance_est"])
|> map(fn: (r) => ({r with _time: now()}))
|> mqtt.to(
broker: "tcp://192.168.1.80:1883",
topic: "growatt/balance_est",
clientid: "cpu-flux",
name: "balance_est",
username: "xxx",
password: "xxx",
valueColumns: [
"balance_est",
],
)
And this is what I get in MQTT:
balance_est balance_est=-49211.7565 1654111026000000000
This is nowhere near the value I get in the Data Explorer. And of course when I go back to the Data Explorer and run the query again, the value is still around 921…
I have no idea how to "debug" this. Should I suspect that something is wrong with the map() functions and the calculation logic? What would you recommend I do?
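For example, would it make sense to split out the intermediate tables with yield() and compare them between the Data Explorer and the task? Something along these lines, just a sketch with made-up result names:
// Just a sketch: expose the intermediate results so I can compare them side by side
meterreadings
|> yield(name: "meterreadings_check")
shelly
|> yield(name: "shelly_check")
But I am not sure whether that is the right approach, or how I would even see those intermediate results from a task.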