InfluxQL to new task FLUX with resample and group

@shclonk You’re going to need to create a Flux task. If you create the task through the InfluxDB UI, leave out the options statement. The UI autogenerates that using the specified task settings. If you’re add the task through the API or the CLI, leave the options statement in.

import "experimental"

options task = {
  name: "trade_to_candles_60",
  every: 10s
}

price = from(bucket: "mydb")
  |> from(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades" and r._field == "price")

amount = price = from(bucket: "mydb")
  |> from(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades" and r._field == "amount")

minMaxLast = (tables=<-, column) => tables |> reduce(
  identity: {last: 0.0, max: 0.0, min: 0.0},
  fn: (r, accumulator) => {(
    last: r._value,
    min: if r._value < accumulator.min then r._value else accumlator.min,
    max: if r._value > accumlator.max then r._value else accumlator.max,
  )}
)

pivotedPrice = price |> aggregateWindow(every: 1m, fn: minMaxLast)

pivotedAmount = amount
  |> aggregateWindow(every: 1m, fn: sum)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

join(tables: [price: pivotedPrice, amount: pivotedAmount], on: "_time")
  |> experimental.set(o: {_min_price: "low", _max_price: "high", _last_price: "close", _amount_amount: "value"})
  |> experimental.to(bucket: "candles_1m")
1 Like