About a year ago I set up a task that runs a query and send the result in MQTT. With Influx v2.5.1 I had to set up my tasks again (I lost my completed Influx setup), and the same question is no longer working.
This is the original community discussion: Using mqtt.to() in Tasks - #7 by nygma2004
And this has been working fine for about a year. Now I set up the exact same flux query for the task and it does not do anything. If I run the task, it says “success”, no errors, nothing, but I don’t see the data in my MQTT server.
Below is my full query:
import "experimental/mqtt"
option task = {name: "Send Electricity Balance", every: 1h}
billed =
from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "yes")
|> filter(
fn: (r) =>
r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"]
==
"peak_used",
)
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "billing", "_measurement"])
current =
from(bucket: "nodered")
|> range(start: -2y)
|> filter(fn: (r) => r["_measurement"] == "meterreadings")
|> filter(fn: (r) => r["type"] == "electricity")
|> filter(fn: (r) => r["billing"] == "no")
|> filter(
fn: (r) =>
r["_field"] == "offpeak" or r["_field"] == "peak_generated" or r["_field"]
==
"peak_used",
)
|> last()
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "billing", "_measurement"])
join(tables: {billed: billed, current: current}, on: ["type"])
|> map(
fn: (r) =>
({r with balance:
-r.peak_used_current - r.peak_used_billed + (r.peak_generated_current
-
r.peak_generated_billed),
}),
)
|> map(fn: (r) => ({r with _time: now()}))
|> mqtt.to(
broker: "tcp://192.168.1.80:1883",
topic: "growatt/balance",
clientid: "cpu-flux",
name: "balance",
username: "xxxx",
password: "xxxx",
valueColumns: [
"balance",
"peak_used_current",
"peak_used_billed",
"peak_generated_current",
"peak_generated_billed",
],
tagColumns: ["type"],
)
If I run the same query in Explorer without the mqtt.to line, I get data back as I expect it:
So I am not sure what has gone wrong. According to the documentation you still need import "experimental/mqtt"
to use MQTT. I am monitoring the MQTT traffic in MQTT explorer where I used to see the data posted to the growatt/balance topic in the past.