@shclonk You’re going to need to create a Flux task. If you create the task through the InfluxDB UI, leave out the task options
statement; the UI autogenerates it from the task settings you specify. If you’re adding the task through the API or the CLI, leave the
task options statement in.
import "experimental"
// Task definition: the task is named "trade_to_candles_60" and runs every
// 10 seconds. The Flux keyword is `option` (singular).
option task = {
    name: "trade_to_candles_60",
    every: 10s,
}
// Trade prices from the last 10 minutes of the "mydb" bucket.
// range() sets the time bounds; from() only selects the bucket.
price = from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades" and r._field == "price")
// Trade amounts from the last 10 minutes of the "mydb" bucket.
// Assigned to `amount` only, so the separate `price` stream is not redefined.
amount = from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades" and r._field == "amount")
// minMaxLast folds each input table into a single row holding the minimum,
// maximum and last `_value` seen in that table.
//
// Parameters:
//   tables - piped-forward input stream.
//   column - accepted so the function can be passed as `fn` to
//            aggregateWindow(); the reducer itself always reads `_value`.
//
// Output columns added by the reducer: `last`, `max`, `min`.
minMaxLast = (tables=<-, column) => tables
    |> reduce(
        // `count` records whether a row has been folded in yet, so the first
        // row seeds min/max instead of being compared against the 0.0
        // placeholders (which would pin `min` at 0 for positive prices).
        identity: {count: 0, last: 0.0, max: 0.0, min: 0.0},
        fn: (r, accumulator) => ({
            count: accumulator.count + 1,
            last: r._value,
            min: if accumulator.count == 0 or r._value < accumulator.min then r._value else accumulator.min,
            max: if accumulator.count == 0 or r._value > accumulator.max then r._value else accumulator.max,
        }),
    )
    // `count` is bookkeeping only; keep the output schema to last/max/min.
    |> drop(columns: ["count"])
// Price stream reduced per one-minute window with minMaxLast.
pivotedPrice = price
    |> aggregateWindow(fn: minMaxLast, every: 1m)

// Amount stream summed per one-minute window, then pivoted so each `_field`
// value becomes a column keyed by `_time`.
pivotedAmount = amount
    |> aggregateWindow(fn: sum, every: 1m)
    |> pivot(
        valueColumn: "_value",
        columnKey: ["_field"],
        rowKey: ["_time"],
    )
// Combine the price and amount streams on their window timestamp, give the
// columns candle-style names, and write the result to "candles_1m".
//
// join() takes `tables` as a record and `on` as an array of column names.
// rename() is used for the column mapping; experimental.set() would assign
// constant string values to columns instead of renaming them.
//
// NOTE(review): the source names assume join() suffixes every non-`on` column
// with its table key (`min_price`, `amount_amount`, ...) — confirm against the
// actual join output before deploying.
// NOTE(review): experimental.to() needs a `_measurement` column — TODO confirm
// it is still present under that name after the join.
join(tables: {price: pivotedPrice, amount: pivotedAmount}, on: ["_time"])
    |> rename(columns: {min_price: "low", max_price: "high", last_price: "close", amount_amount: "value"})
    |> experimental.to(bucket: "candles_1m")