in tasks


I am using a task to send data from my bucket to a broker, however the query doesn’t seem to work well, as you can see below:

I get far fewer messages from my broker than what gets written to the database: usually around 3 to 6 points per second over MQTT, while something like 50 points per second are registered in the database.
Here is the script used for the task. I also can’t seem to get the _time value into the message payload; does someone know how to extract the time?

import "experimental/mqtt"

option task = {
    name: "Mqtt",
    every: 1s,
}

from(bucket: "test")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "accel")
    |> filter(fn: (r) => r["_field"] == "Newtons")
    |> filter(fn: (r) => r["host"] == "ESP32")
    |> filter(fn: (r) => r["topic"] == "test")
    |> aggregateWindow(every: 1ms, fn: mean, createEmpty: false)
    |> yield(name: "mean")
    // publish each row to the MQTT broker
    |> mqtt.to(
        broker: "tcp://host.docker.internal:1883",
        topic: "DB",
        clientid: "flux",
        timeColumn: "_time",
        qos: 0,
        valueColumns: ["_value"],
        tagColumns: ["test"],
    )

The tables (~50 points per second) do appear when I run the same query in the normal query interface, so why doesn’t it work well in tasks?

Thanks to anyone who can help

Ok, I just found part of the problem: when I move the range start further back, I get more points per second (around |> range(start: -10s) I get all the points).
Does someone have an explanation, and a solution to counter this?
I need to process data close to real time; waiting 10 seconds for each frame of data is not the goal of my work.
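If the missing points are simply arriving in the bucket later than the task reads them, the task `offset` option might be worth trying: it delays each run without changing the queried window, so the window has time to fill before being read. A minimal sketch, assuming a 2s write latency (the offset value is a guess to tune against the observed lag):

```flux
import "experimental/mqtt"

option task = {
    name: "Mqtt",
    every: 1s,
    // offset delays execution of each run; the queried range
    // still covers the scheduled 1s window, so late-arriving
    // points have 2s (assumed value, tune as needed) to land
    // in the bucket before the task reads them.
    offset: 2s,
}

from(bucket: "test")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "accel")
```

This keeps the per-run window at 1s (so it stays close to real time) instead of widening the range to 10s, at the cost of a fixed 2s delivery delay.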