How to turn influxQL sum(*) mean(*) into flux?

I am new to Flux and have been trying to find the right way to translate my continuous query to a flux task.
In my opinion the query is very simple, but anything i try using flux grows very complex and fails to give me a good result.

cq_power_h  CREATE CONTINUOUS QUERY cq_power_h ON "device-data"
  SELECT count(*), mean(*), sum(*)
  INTO "device-data".power_h
  FROM "device-data".power
  GROUP BY time(1h), *

My input data in Influx 1.x looks like this:

time                deviceId                 direction duration energy              watt
1665568744022000000 5ee0a4984a711920b2fdcc12 source    60735    0.0039433549597625  233.738007

My downsampled data in Influx 1.x looks like this:

time                count_direction count_duration count_energy count_watt deviceId                 mean_duration      mean_energy           mean_watt          sum_duration sum_energy         sum_watt
1665561600000000000 58              58             58           58         5ee0a4984a711920b2fdcc12 60737.016949152545 0.0054816733539213135 324.90935576271175 3583484      0.3234187278813575 19169.651989999995

Problems I have:

  • Doing multiple different aggregates on the same data window in a performant way (aka query input data once). I have tried using reduce (like this Downsampling with InfluxDB v2.0 | InfluxData), but it errors with “float != int” and it is recommended to avoid reduce due to bad performance.
  • Selecting all fields, but only executing aggregations on fields with number values (currently I get errors about string values if I don’t filter out fields).
  • Not having one aggregate overwrite another, or giving errors, when pushing the data into another bucket.
  • Dynamically renaming fields based on original name (duration => count_duration, mean_duration, sum_duration). I believe this would solve the overwriting problem.

I would very much appreciate if someone can point me in the right direction to solve this.

@Erik I think this will give you what you’re looking for:

import "types"

option task = {name: "cq_power_h", every: 1h}

data = () =>
    from(bucket: "device-data/power")
        |> range(start: -1h)
        |> filter(fn: (r) =>
            types.isType(v: r._value, type: "int") or
            types.isType(v: r._value, type: "uint") or
            types.isType(v: r._value, type: "float")

        data() |> count() |> map(fn: (r) => ({r with _field: "count_${r._field}"})),
        data() |> mean() |> map(fn: (r) => ({r with _field: "mean_${r._field}"})),
        data() |> sum() |> map(fn: (r) => ({r with _field: "sum_${r._field}"})),
    |> to(bucket: "device-data/power_h")

@scott you’re amazing! Much appreciated, this helped me solve my struggles.
I modified it by adding “, _time: r._stop” in the maps and dropped the _start and _stop.

Do you know if there is any benefit to using union or simply have all three streams write to the bucket? I’ve tried it and my bucket seem to fill up the same way. Like this:

data() |> count() |> map(fn: (r) => ({r with _field: "count_${r._field}", _time: r._stop})) |> to(bucket: "device-data/power_h")

data() |> mean() |> map(fn: (r) => ({r with _field: "mean_${r._field}", _time: r._stop})) |> to(bucket: "device-data/power_h")

data() |> sum() |> map(fn: (r) => ({r with _field: "sum_${r._field}", _time: r._stop})) |> to(bucket: "device-data/power_h")

@Erik, unioning the streams together makes it so you only have one write call back to InfluxDB instead of three, but I don’t know that it matters much.

@scott that makes sense. There will be multiple tasks running every hour, so I’ll keep an eye on it.
Thank you very much for your quick replies!