Influx Tasks Not Creating New Columns

Hello, I'm trying to downsample my main bucket with the Flux task below, but it isn't creating the min/max/mean columns in the new bucket that it sends the data to. When I run the Flux query directly against the main bucket it DOES create the new columns/values, but it doesn't when it runs as a rollup task. Any tips or guidance on this one? I'm really lost right now. Any help would be incredible. Thank you!

option task = {name: "DailyRollup", every: 24h}

data = from(bucket: "capacity")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "compute")
  |> filter(fn: (r) => contains(value: r._field, set: ["vcpu", "memory", "instances", "available", "unavailable", "total", "used"]))

data
  |> window(every: 1h)
  |> reduce(identity: {
      _value: 0,
      count: 0.0,
      sum: 0.0,
      min: 0.0,
      max: 0.0,
      mean: 0.0,
    },
    fn: (r, accumulator) =>
      ({
        _value: r._value,
        count: accumulator.count + 1.0,
        sum: float(v: r._value) + accumulator.sum,
        min: if accumulator.count == 0.0 then float(v: r._value) else if float(v: r._value) < accumulator.min then float(v: r._value) else accumulator.min,
        max: if accumulator.count == 0.0 then float(v: r._value) else if float(v: r._value) > accumulator.max then float(v: r._value) else accumulator.max,
        mean: (float(v: r._value) + accumulator.sum) / (accumulator.count + 1.0),
      })
  )
  |> drop(columns: ["count", "sum"])
  |> duplicate(column: "_stop", as: "_time")
  |> to(bucket: "v2")

Hello @lmarzocco,
I’m not sure. Are you getting error messages in the logs? Can you please share them? Is the task doing anything? Can you please share some raw input and task output data?

Thank you :slight_smile:

Hello!

There are never any errors; the tasks run successfully. The task outputs the exact same data (same columns) that the original bucket has and doesn't create any new ones when it should. I just checked the new bucket again and now there's no data; it's as if it's being wiped, and I really don't understand why. Am I doing something wrong with the Flux task? It seems to me that it should just take the last 24 hours of data, group it into 1-hour windows, calculate the min/mean/max for each window, and send that to a new bucket. Is there an easier way to approach this?
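For what it's worth, here's roughly the result I'm after for a single statistic, written with aggregateWindow() instead of reduce() (just an untested sketch of the intent, using the same bucket and measurement as above):

minByHour = from(bucket: "capacity")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "compute")
  // one-hour windows, minimum of each field per window
  |> aggregateWindow(every: 1h, fn: min)

I'd need similar streams with fn: max and fn: mean, which is why I tried to do it all in one reduce() call instead.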

@lmarzocco Are you trying to store min, max, and mean as fields? To write fields, the to() function requires a _time, _measurement, _field and a _value column. You essentially need to unpivot your data to correctly write these calculated values back as fields. With the data structured as is, min, max, and mean are being written back as tags. Unfortunately, Flux doesn’t support unpivoting yet, but there is an issue for it (influxdata/flux#2539).
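For example, to write just the per-window minimum back as a field through the standard to() function, you would have to reshape each row so the computed value lands in _value. A rough, untested sketch; here rollup is a hypothetical variable holding the output of your reduce() call, the "_min" suffix is only for illustration, and tag columns are dropped for brevity:

rollup
  |> duplicate(column: "_stop", as: "_time")
  // build the _time/_measurement/_field/_value shape that to() expects;
  // any tag columns would need to be carried along the same way
  |> map(fn: (r) => ({
      _time: r._time,
      _measurement: r._measurement,
      _field: r._field + "_min",   // e.g. "vcpu_min"
      _value: r.min,
    }))
  |> to(bucket: "v2")

You'd then have to repeat that for max and mean and union() the results, which gets tedious fast.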

The good news is that there is an experimental.to() function that writes pivoted data back to InfluxDB. It writes all columns not in the group key back as fields. I think this will accomplish what you're trying to do:

import "experimental"

option task = {name: "DailyRollup", every: 24h}

fieldSet = ["vcpu", "memory", "instances", "available", "unavailable", "total", "used"]

data = from(bucket: "capacity")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "compute" and contains(value: r._field, set: fieldSet))

data
  |> window(every: 1h)
  |> reduce(identity: {
      _value: 0,
      count: 0.0,
      sum: 0.0,
      min: 0.0,
      max: 0.0,
      mean: 0.0,
    },
    fn: (r, accumulator) =>
      ({
        _value: r._value,
        count: accumulator.count + 1.0,
        sum: float(v: r._value) + accumulator.sum,
        min: if accumulator.count == 0.0 then float(v: r._value) else if float(v: r._value) < accumulator.min then float(v: r._value) else accumulator.min,
        max: if accumulator.count == 0.0 then float(v: r._value) else if float(v: r._value) > accumulator.max then float(v: r._value) else accumulator.max,
        mean: (float(v: r._value) + accumulator.sum) / (accumulator.count + 1.0),
      })
  )
  |> drop(columns: ["count", "sum"])
  |> duplicate(column: "_stop", as: "_time")
  |> experimental.to(bucket: "v2")

@scott - could not execute task run; Err: runtime error @31:5-31:34: to: found column "_field" in the group key; experimental.to() expects pivoted data: runtime error @31:5-31:34: to: found column "_field" in the group key; experimental.to() expects pivoted data

As if it’s not pivoted already? Is this just a bad way of trying to add columns? Seems like the best way from searching around on the web. Any tips?

Thanks!

Lucas

@lmarzocco Ah, right. _field is part of the group key, so it persists through the reduce() operation. Just add _field to the list of columns in the drop() function.

// ...
|> drop(columns: ["count", "sum", "_field"])
//...

@scott - that makes it work and not complain anymore, thanks! :smiley:

The only problem is that the field set gets replaced with min/max/mean. I thought this Flux task would add a min/max/mean column for each data point in the original table, but instead it replaces the data completely. It keeps _field, but the value is now min/max/mean instead of instances, vcpu, etc. I was hoping it would keep the original field and then tack on that field's min, max, and mean per time window (per hour). Is this possible? I hope this makes sense.
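What I'm picturing is fields like vcpu_min, vcpu_max, and vcpu_mean per hour window. Building on the unpivot idea above, would something along these lines be the right direction? Completely untested guess; rollup stands in for the reduced stream from the task, after the duplicate() step:

minOut = rollup
  |> map(fn: (r) => ({_time: r._time, _measurement: r._measurement, _field: r._field + "_min", _value: r.min}))
maxOut = rollup
  |> map(fn: (r) => ({_time: r._time, _measurement: r._measurement, _field: r._field + "_max", _value: r.max}))
meanOut = rollup
  |> map(fn: (r) => ({_time: r._time, _measurement: r._measurement, _field: r._field + "_mean", _value: r.mean}))

union(tables: [minOut, maxOut, meanOut])
  |> to(bucket: "v2")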