Newbie's problem with a task not reflecting the "min" aggregation

I’ve got a bucket of source data called “ha - Tyden 2”. A task reads the data from this bucket and writes it to the destination bucket called “ha - Mesic 3”. The task runs every 3h and produces a “min” aggregation. The task source code:

option task = {name: "Downsampling - 3h", every: 3h}

data = from(bucket: "ha - Tyden 2")
    |> range(start: -task.every)
    |> filter(fn: (r) => (r["_measurement"] != "state"))
    |> filter(fn: (r) => (r["_field"] == "value"))

data
    |> filter(fn: (r) => (r["agregace"] == "min"))
    |> min()
    |> duplicate(as: "_time", column: "_stop")
    |> to(bucket: "ha - Mesic 3")

In the source bucket I have two recent values of 70.6, which are the minimum. The problem is that the task does not pick up these two minimal values and writes the destination data without them.

I tried to test everything in the explorer. This is the test code:

from(bucket: "ha - Tyden 2")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "kg")
    |> filter(fn: (r) => r["entity_id"] == "ble_weight_c8478cd9480e")
    |> filter(fn: (r) => r["agregace"] == "min")
    |> yield(name: "ha - Tyden 2")

from(bucket: "ha - Tyden 2")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "kg")
    |> filter(fn: (r) => r["entity_id"] == "ble_weight_c8478cd9480e")
    |> filter(fn: (r) => r["agregace"] == "min")
    |> aggregateWindow(every: 3h, fn: min, column: "_value", timeSrc: "_stop", timeDst: "_time")
    |> yield(name: "Task reproduction")

from(bucket: "ha - Mesic 3")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "kg")
    |> filter(fn: (r) => r["entity_id"] == "ble_weight_c8478cd9480e")
    |> filter(fn: (r) => r["agregace"] == "min")
    |> yield(name: "ha - Mesic 3")

The first block simply takes the data from the source bucket without any aggregation.
The second block tries to simulate what the task does and yields it as “Task reproduction”.
The third block simply takes the data from the destination bucket without any aggregation.

The third block doesn’t show those two minimal values of 70.6, but the second block (which is just the task reproduction) correctly shows the minimum of 70.6.
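To check whether those two points might simply land after their window has been processed, I also looked at the raw 3h window boundaries around them (this block is just my extra check; the -24h range is arbitrary):

from(bucket: "ha - Tyden 2")
    |> range(start: -24h)
    |> filter(fn: (r) => r["_measurement"] == "kg")
    |> filter(fn: (r) => r["entity_id"] == "ble_weight_c8478cd9480e")
    |> filter(fn: (r) => r["agregace"] == "min")
    |> window(every: 3h)
    |> keep(columns: ["_start", "_stop", "_time", "_value"])

Comparing each point’s _time against the task’s run times should show whether a point arrived too late to be caught by range(start: -task.every).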

So the problem must lie somewhere in the task, but I cannot figure out where. Or I badly misunderstand some basics :smirk:

Hello @acerot,
If you want to find the total minimum value across series or tables, you’ll need to ungroup your data first with an empty group() call, like so:

|> group()
|> min()
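One caveat worth knowing before dropping this into a task: group() also empties the group key, and to() by default derives its tag columns from the group key, so points written this way may lose their tags; if downstream queries filter on those tags, the downsampled min can look like it disappeared. A sketch in the context of your task (the explicit tagColumns list is my assumption about which tags you need back):

data
    |> filter(fn: (r) => (r["agregace"] == "min"))
    |> group()    // merge all series; note: this also empties the group key
    |> min()
    |> duplicate(as: "_time", column: "_stop")
    // hypothetical: re-declare the tags explicitly since they are no longer in the group key
    |> to(bucket: "ha - Mesic 3", tagColumns: ["entity_id", "agregace"])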

Try that and let me know how it goes!
Thanks

Hello @Anaisdg ,

thanks for the tip.

However, after putting “group()” before “min()”, the task stopped producing the aggregated output entirely for some data (not for all :confused: )

To be honest, my task structure is more complicated than I originally posted (I simplified the post, guessing the extra info was not needed). I have the default bucket “ha” and 4 other buckets with different retentions (2 days, 2 weeks, 3 months, forever), plus 4 tasks; each task takes data from the previous bucket and aggregates it into the next one (5 min, 1 hour, 3 hours, 1 day). All tasks share the same code, as the procedure is always the same; only the source/destination buckets and the aggregate window change.

The very first task, which takes data from the default bucket and writes it to the second bucket, has the following (full) code. Already this very first task stopped producing the MIN output for most of the data after I put the “group()” in it, so the rest of the chain cannot produce the MIN either, as it is missing from the output of the very first task (again, for some data, not for all). You can see I put “group()” only into the MIN section; the other aggregations still produce output.

option task = {name: "Downsampling - vsechno krome state - 5m", every: 5m}

data = from(bucket: "ha")
    |> range(start: -task.every)
    |> filter(fn: (r) =>
        (r["_measurement"] != "state"))
    |> filter(fn: (r) =>
        (r["_field"] == "value"))

data
    |> mean()
    |> set(key: "agregace", value: "mean")
    |> duplicate(as: "_time", column: "_stop")
    |> to(bucket: "ha - Den 2", org: "homeassistant")

data
    |> group()
    |> min()
    |> set(key: "agregace", value: "min")
    |> duplicate(as: "_time", column: "_stop")
    |> to(bucket: "ha - Den 2", org: "homeassistant")

data
    |> max()
    |> set(key: "agregace", value: "max")
    |> duplicate(as: "_time", column: "_stop")
    |> to(bucket: "ha - Den 2", org: "homeassistant")

data
    |> last()
    |> set(key: "agregace", value: "last")
    |> duplicate(as: "_time", column: "_stop")
    |> to(bucket: "ha - Den 2", org: "homeassistant")

data
    |> first()
    |> set(key: "agregace", value: "first")
    |> duplicate(as: "_time", column: "_stop")
    |> to(bucket: "ha - Den 2", org: "homeassistant")

data
    |> count()
    |> set(key: "agregace", value: "count")
    |> duplicate(as: "_time", column: "_stop")
    |> set(key: "_measurement", value: "pocet")
    |> to(bucket: "ha - Den 2", org: "homeassistant")

See also the picture with some visual explanation.

Hello @acerot,
Can you please export some of your data (just a little bit, like a couple of minutes or hours) to annotated CSV and share it with me?
I’d like to dig into it on my machine. Sometimes it’s hard to debug Flux without looking at the raw data.

Did you run the task previously without the group()?
When you apply group(), I’d expect you to get only one row representing the min value across all of your series. Right now it looks like a previous task may have written the min value for ALL fields before 08/04, and then your new task with the group() wrote a SINGLE min value ACROSS all fields after 08/04.
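To illustrate the difference (a schematic, using data as a stand-in for your source stream):

// Per-series minimum: min() runs on each table separately,
// so you get one row per series (per tag set).
data
    |> min()

// Global minimum: group() merges all series into one table first,
// so min() returns a single row across everything.
data
    |> group()
    |> min()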

Finally, make sure to include an offset in your task to avoid read/write conflicts: a point that is written slightly after the task fires will otherwise fall outside range(start: -task.every) and never be aggregated. This could be the sole cause of your problems, assuming my hypothesis above is wrong.
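For example (the 5m offset value here is my assumption; tune it to how late your writes typically arrive):

// With offset: 5m the task still covers 3h windows, but fires 5 minutes
// after each boundary, giving late-arriving points time to land first.
option task = {name: "Downsampling - 3h", every: 3h, offset: 5m}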