Same query in Data Explorer and Task appears to work differently

I have a fairly complicated query, which runs through different measures, calculates and combines lot of data. The result of this query is a single field that I display on a dashboard. But I also wanted to send this data over MQTT so I can use it for something else. I copy and pasted the query into a task and the result goes to MQTT. But when I get in MQTT is a different value. I looked at it from many different angle, but I can’t figure out why the two results are different.

I know it is hard to decipher what the query does, but I wanted to include it for the sake of completeness.

billed = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "yes")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop","billing","_measurement"])

current = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "no")
  |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> drop(columns: ["_start","_stop","billing","_measurement"])

meterreadings = join(tables: {billed: billed, current: current}, on: ["type"])
  |> map(fn: (r) => ({ r with balance: -(r.peak_used_current-r.peak_used_billed)+(r.peak_generated_current-r.peak_generated_billed) }))
  |> map(fn: (r) => ({ r with join: 1 }))

// Get the time for the last meter reading
current_time = from(bucket: "nodered")
  |> range(start: -2y)
  |> filter(fn: (r) => r["_measurement"] == "meterreadings")
  |> filter(fn: (r) => r["type"] == "electricity")
  |> filter(fn: (r) => r["billing"] == "no")
  |> last()
  |> tableFind(fn: (key) => key._field == "offpeak")
  |> getRecord(idx: 0)

// Get the usage from Shelly before the meter reading
old_usage = from(bucket: "nodered")
  |> range(start: -2y, stop:current_time._time)
  |> filter(fn: (r) => r["_measurement"] == "sensors")
  |> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
  |> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
  |> last()
  |> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")

// Get the latest usage from Shelly
current_usage = from(bucket: "nodered")
  |> range(start: -1w)
  |> filter(fn: (r) => r["_measurement"] == "sensors")
  |> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
  |> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
  |> last()
  |> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")

// Combine the tables containing the Shelly readings and calculate the combined difference
shelly = join(tables: {old_usage: old_usage, current_usage: current_usage}, on: ["_measurement"])
  |>  map(fn: (r) => ({ r with shelly_usage: (-(r.house_phase0_total_current_usage-r.house_phase0_total_old_usage)-
    (r.house_phase1_total_current_usage-r.house_phase1_total_old_usage)-
    (r.house_phase2_total_current_usage-r.house_phase2_total_old_usage)+
    (r.house_phase0_total_returned_current_usage-r.house_phase0_total_returned_old_usage)+
    (r.house_phase1_total_returned_current_usage-r.house_phase1_total_returned_old_usage)+
    (r.house_phase2_total_returned_current_usage-r.house_phase2_total_returned_old_usage)
    )/1000.0 }))
  |> map(fn: (r) => ({ r with join: 1 }))

join(tables: {meterreadings : meterreadings , shelly: shelly}, on: ["join"])
  |>  map(fn: (r) => ({ r with balance_est: r.shelly_usage+r.balance}))
  |> keep(columns: ["balance_est"])

Please note, that all the ranges use a fixed period, so the time interval in the data explorer or the task should not make a difference.

If I run the above query in the Data Explorer, I get this:

‘balance_est = 921.1785’

Now, I have copy and pasted the query, added a _time field (needed for mqtt.to) and also an mqtt.to to send out the results:

import "experimental/mqtt"

option task = {
    name: "Send Electricity Estimated Balance",
    every: 1h,
}

billed = from(bucket: "nodered")
    |> range(start: -2y)
    |> filter(fn: (r) => r["_measurement"] == "meterreadings")
    |> filter(fn: (r) => r["type"] == "electricity")
    |> filter(fn: (r) => r["billing"] == "yes")
    |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
    |> last()
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "billing", "_measurement"])

current = from(bucket: "nodered")
    |> range(start: -2y)
    |> filter(fn: (r) => r["_measurement"] == "meterreadings")
    |> filter(fn: (r) => r["type"] == "electricity")
    |> filter(fn: (r) => r["billing"] == "no")
    |> filter(fn: (r) => r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used")
    |> last()
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "billing", "_measurement"])

meterreadings = join(tables: {billed: billed, current: current}, on: ["type"])
    |> map(fn: (r) => ({r with balance: -r.peak_used_current - r.peak_used_billed + (r.peak_generated_current - r.peak_generated_billed)}))
    |> map(fn: (r) => ({r with join: 1}))

// Get the time for the last meter reading
current_time = from(bucket: "nodered")
    |> range(start: -2y)
    |> filter(fn: (r) => r["_measurement"] == "meterreadings")
    |> filter(fn: (r) => r["type"] == "electricity")
    |> filter(fn: (r) => r["billing"] == "no")
    |> last()
    |> tableFind(fn: (key) => key._field == "offpeak")
    |> getRecord(idx: 0)

// Get the usage from Shelly before the meter reading
old_usage = from(bucket: "nodered")
    |> range(start: -2y, stop: current_time._time)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
    |> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
    |> last()
    |> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")

// Get the latest usage from Shelly
current_usage = from(bucket: "nodered")
    |> range(start: -1w)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "house_phase0" or r["device"] == "house_phase1" or r["device"] == "house_phase2")
    |> filter(fn: (r) => r["_field"] == "total" or r["_field"] == "total_returned")
    |> last()
    |> pivot(rowKey: ["_measurement"], columnKey: ["device", "_field"], valueColumn: "_value")

// Combine the tables containing the Shelly readings and calculate the combined difference
shelly = join(tables: {old_usage: old_usage, current_usage: current_usage}, on: ["_measurement"])
    |> map(
        fn: (r) => ({r with
            shelly_usage: (-r.house_phase0_total_current_usage - r.house_phase0_total_old_usage - (r.house_phase1_total_current_usage - r.house_phase1_total_old_usage) - (r.house_phase2_total_current_usage - r.house_phase2_total_old_usage) + (r.house_phase0_total_returned_current_usage - r.house_phase0_total_returned_old_usage) + (r.house_phase1_total_returned_current_usage - r.house_phase1_total_returned_old_usage) + (r.house_phase2_total_returned_current_usage - r.house_phase2_total_returned_old_usage)) / 1000.0,
        }),
    )
    |> map(fn: (r) => ({r with join: 1}))

join(tables: {meterreadings: meterreadings, shelly: shelly}, on: ["join"])
    |> map(fn: (r) => ({r with balance_est: r.shelly_usage + r.balance}))
    |> keep(columns: ["balance_est"])
    |> map(fn: (r) => ({r with _time: now()}))
    |> mqtt.to(
        broker: "tcp://192.168.1.80:1883",
        topic: "growatt/balance_est",
        clientid: "cpu-flux",
        name: "balance_est",
        username: "xxx",
        password: "xxx",
        valueColumns: [
            "balance_est",
        ],
    )

And this is what I get in MQTT:
balance_est balance_est=-49211.7565 1654111026000000000

This is nowhere near the value I get in the Data Explorer. And of course when I go back to the Data Explorer and run the query again, the value is still around 921…

I have no idea how to “debug” this. Should I suspect that something is wrong with the map function, and the calculation logic? What do you recommend for me to do?