mqtt.to() not working in v2.5.1

About a year ago I set up a task that runs a query and sends the result over MQTT. With InfluxDB v2.5.1 I had to set up my tasks again (I lost my complete Influx setup), and the same query is no longer working.

This is the original community discussion: Using mqtt.to() in Tasks - #7 by nygma2004

That setup worked fine for about a year. Now I have set up the exact same Flux query as a task and it does not do anything. If I run the task, it reports “success” with no errors, but I don’t see the data on my MQTT server.

Below is my full query:

import "experimental/mqtt"

option task = {name: "Send Electricity Balance", every: 1h}

billed =
    from(bucket: "nodered")
        |> range(start: -2y)
        |> filter(fn: (r) => r["_measurement"] == "meterreadings")
        |> filter(fn: (r) => r["type"] == "electricity")
        |> filter(fn: (r) => r["billing"] == "yes")
        |> filter(
            fn: (r) =>
                r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used",
        )
        |> last()
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> drop(columns: ["_start", "_stop", "billing", "_measurement"])

current =
    from(bucket: "nodered")
        |> range(start: -2y)
        |> filter(fn: (r) => r["_measurement"] == "meterreadings")
        |> filter(fn: (r) => r["type"] == "electricity")
        |> filter(fn: (r) => r["billing"] == "no")
        |> filter(
            fn: (r) =>
                r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"] == "peak_used",
        )
        |> last()
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> drop(columns: ["_start", "_stop", "billing", "_measurement"])

join(tables: {billed: billed, current: current}, on: ["type"])
    |> map(
        fn: (r) =>
            ({r with balance:
                    -r.peak_used_current - r.peak_used_billed + (r.peak_generated_current - r.peak_generated_billed),
            }),
    )
    |> map(fn: (r) => ({r with _time: now()}))
    |> mqtt.to(
        broker: "tcp://192.168.1.80:1883",
        topic: "growatt/balance",
        clientid: "cpu-flux",
        name: "balance",
        username: "xxxx",
        password: "xxxx",
        valueColumns: [
            "balance",
            "peak_used_current",
            "peak_used_billed",
            "peak_generated_current",
            "peak_generated_billed",
        ],
        tagColumns: ["type"],
    )

If I run the same query in the Data Explorer without the mqtt.to() call, I get the data back as expected.

So I am not sure what has gone wrong. According to the documentation, you still need import "experimental/mqtt" to use MQTT. I am monitoring the MQTT traffic in MQTT Explorer, where I used to see the data posted to the growatt/balance topic.

Hi @nygma2004, that is odd. If you run the query including mqtt.to() in the Data Explorer as well, do you see any errors, either there or in the InfluxDB logs?

@Jay_Clifford: when I run it in the explorer, I just get the table back in the result and nothing else. It feels like mqtt.to() is not even executed.
In fact, when I was testing it, I copied the example above without realizing I had xxx-ed out the username and the password. So even running it with xxx as the credentials, I got no error at all.

For the logs, I checked with sudo journalctl -u influxdb.service --since "10 minutes ago", but I am not seeing anything there. Absolutely nothing for the period when I was running it.

Anything else I should check?
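One further check, offered only as a sketch: take the bucket and the join out of the picture and publish a single hard-coded row. The script below uses array.from() to build a one-row table and pipes it straight into mqtt.to() with the same broker settings as the task. The topic growatt/test, the client ID cpu-flux-test, the timestamp, and the row values are made up for this test and are not part of the original setup.

```flux
import "array"
import "experimental/mqtt"

// A single hard-coded row, so the test does not depend on the "nodered" bucket or on the join.
// The timestamp and values are placeholders chosen for this test only.
array.from(rows: [{_time: 2022-01-01T00:00:00Z, type: "electricity", balance: 1.0}])
    |> mqtt.to(
        broker: "tcp://192.168.1.80:1883",
        // Hypothetical test topic and client ID, kept separate from the real ones.
        topic: "growatt/test",
        clientid: "cpu-flux-test",
        name: "balance",
        username: "xxxx",
        password: "xxxx",
        valueColumns: ["balance"],
        tagColumns: ["type"],
    )
```

If a message shows up on growatt/test in MQTT Explorer, the connection and credentials are probably fine, and the problem is more likely in what the full query feeds into mqtt.to(). If nothing arrives here either, the query itself is unlikely to be the cause.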