InfluxDB 2 Downsampling

good day. documentation have very little information about how to make complex dowsampling. we have some data from many sensors. this data marked with 3 tags : ControllerName,DeviceName and SensorName select to display sensor mesurment looks like this |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == “mqtt_consumer”)
|> filter(fn: (r) => r[“Controller”] == “ControllerName”)
|> filter(fn: (r) => r[“Device”] == “deviceName”)
|> filter(fn: (r) => r[“Control”] == “SensorName”)
|> filter(fn: (r) => r["_field"] == “value”)
but how can i make one downsampling task that filter out each controller with each device and each sensor and aggregate those seqenses with tags preserved.
i need domething like this:

  foreach(controlerTag: scheme.getTagValues("controller")){
      foreach(deviceTag: scheme.getTagValues("device")){
          foreach(sensorTag: scheme.getTagValues("sensor")){
            data = from(bucket: "Bucket")
            |> range(start: -task.every)
            |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
            |> filter(fn: (r) => r["_measurement"] == "mqtt_consumer")
            |> filter(fn: (r) => r["controller"] == controlerTag)
            |> filter(fn: (r) => r["device"] == deviceTag)
            |> filter(fn: (r) => r["sensor"] == sensorTag)
            |> filter(fn: (r) => r["_field"] == "value")

            data
            |> mean()
            |> set(key: "agg_type", value: "mean")
            |> set(key: "controller", value: controlerTag)
            |> set(key: "device", value: deviceTag)
            |> set(key: "sensor", value: sensorTag)
            |> to(bucket: "5secAggregationFor1Day", org: "Org", tagColumns: ["agg_type"])
            
            data
            |> last()
            |> set(key: "agg_type", value: "last")
            |> set(key: "controller", value: controlerTag)
            |> set(key: "device", value: deviceTag)
            |> set(key: "sensor", value: sensorTag)
            |> to(bucket: "5secAggregationFor1Day", org: "Org", tagColumns: ["agg_type"])
          }
      }
  }

Hello @ScorpioTheDark,
Welcome!
Your data should already be grouped by default by “Controller”, “Device”, and “Control” if you just filter for the measurement.

from(bucket: "test")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "mqtt_comsumer")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

I wrote the following data:

mqtt_comsumer,Controller=N1,Device=N1,Control=N1 value=1
mqtt_comsumer,Controller=N1,Device=N1,Control=N1 value=2
mqtt_comsumer,Controller=N2,Device=N2,Control=N2 value=2
mqtt_comsumer,Controller=N2,Device=N2,Control=N2 value=3

And the query above yielded the following output:

As we can see, the tables are grouped by different controller, device, and control names. And their corresponding tags values are included in the output.
Our to() function could look like:

to(
  bucket: "my-mean-data",
  org: "my-org",
  timeColumn: "_time",
  tagColumns: ["Controller", "Device", "Control"],
  fieldFn: (r) => ({ [r._field]: r._value })
)

I might also recommend using the aggregateWindow() function

 |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)

I might be wrong, but I think gaining a better understanding of annotated CSV could help you here:

Please let me know if this helps? Or if I misunderstood your problem.

Thank you.

to(
  bucket: "my-mean-data",
  org: "my-org",
  timeColumn: "_time",
  tagColumns: ["Controller", "Device", "Control"],
  fieldFn: (r) => ({ [r._field]: r._value })
)

I am facing a similar problem but I am getting tags and everything but is there any way we don’t have to give particular tag names InfluxDB should automatically take all tags.
it will be like:

to(
      bucket: "my-mean-data",
      org: "my-org",
      timeColumn: "_time",
      tagColumns: [code for list of all tags or any variable which place all the tag list here],
      fieldFn: (r) => ({ [r._field]: r._value })
    )

Hello @Ravikant_Gautam,
Yes.

Tag columns in the output. Defaults to all columns with type string , excluding all value columns and columns identified by fieldFn .

good day. thank you for answer. i managed to do downsampling on a small dataset with this script

from(bucket: "Source")
	|> range(start: -task.every)
	|> filter(fn: (r) =>
		(r["_measurement"] == "mqtt_consumer"))
	|> group(columns: ["Controller", "Device", "Control"])
	|> window(every: 5s, timeColumn: "_time")
	|> reduce(fn: (r, accumulator) =>
		({
			count: accumulator.count + 1,
			total: accumulator.total + r._value,
			avg: (accumulator.total + r._value) / float(v: accumulator.count),
			min: if r._value < float(v: accumulator.min) then float(v: r._value) else float(v: accumulator.min),
			max: if r._value > float(v: accumulator.max) then float(v: r._value) else float(v: accumulator.max),
			_measurement: r["_measurement"],
			_time: r["_time"],
			topic: r["topic"],
		}), identity: {
		count: 1,
		total: 0.0,
		avg: 0.0,
		min: 0.0,
		max: 0.0,
		_measurement: "",
		_time: 1677-09-21T00:12:43.145224194Z,
		topic: "",
	})
	|> to(
		bucket: "5secAggregationFor1Day",
		org: "Org",
		timeColumn: "_start",
		tagColumns: ["Controller", "Device", "Control", "topic"],
		fieldFn: (r) =>
			({
				"count": r.count,
				"total": r.total,
				"avg": r.avg,
				"min": r.min,
				"max": r.max,
			}),
	)

but on large data (aprox 40Gb and about 20K of independent mesurments) i have got Out of memory error.
In Docker container this Task take 28Gb Memory , that maximum memory that i manage to allocate, but still i got exit code 137 and docker restart container with OOM error.
can you advice what i can do next? may be script is wrong ?

Hello @ScorpioTheDark,
I’m not sure. You might be able to implement some pushdown patterns. I’ll send your question to the Flux team.

@ScorpioTheDark I think the following task will accomplish what you need and take advantage of available pushdowns.

data = from(bucket: "Source")
	|> range(start: -task.every)
	|> filter(fn: (r) => (r["_measurement"] == "mqtt_consumer"))

count = data |> aggregateWindow(every: 5s, fn: count, timeSrc: "_start",) |> set(key: "_field", as: "count")
total = data |> aggregateWindow(every: 5s, fn: sum, timeSrc: "_start",) |> set(key: "_field", as: "total")
avg = data |> aggregateWindow(every: 5s, fn: mean, timeSrc: "_start",) |> set(key: "_field", as: "avg")
min = data |> aggregateWindow(every: 5s, fn: min, timeSrc: "_start",) |> set(key: "_field", as: "min")
max = data |> aggregateWindow(every: 5s, fn: max, timeSrc: "_start",) |> set(key: "_field", as: "max")

union(tables: [count, total, avg, min, max])
  |> to(bucket: "5secAggregationFor1Day",	org: "Org",)

There is one difference between your query on the one above – I left the data grouped by topic to maintain the pushdowns. If it’s a single topic, it shouldn’t matter. If there are multiple topics, it will change how data is grouped and aggregated.