Converting an InfluxQL continuous query to a Flux task with resample and group

I've spent 6 hours trying to convert this InfluxQL continuous query to a Flux query. Can anyone help?

CREATE CONTINUOUS QUERY trade_to_candles_60 ON mydb 
RESAMPLE EVERY 10s FOR 10m 
BEGIN SELECT first(price) AS open, last(price) AS close, max(price) AS high, min(price) AS low, sum(amount) AS volume INTO 
candles_1m FROM trades WHERE 
GROUP BY time(1m), pair, exchange END

@shclonk You'll need to create a Flux task. If you create the task through the InfluxDB UI, leave out the option task statement; the UI generates it from the task settings you specify. If you add the task through the API or the CLI, leave the option task statement in.

import "experimental"

option task = {
  name: "trade_to_candles_60",
  every: 10s
}

price = from(bucket: "mydb")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades" and r._field == "price")

amount = from(bucket: "mydb")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades" and r._field == "amount")

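// Custom aggregate: tracks the last, min, and max value of each window.
minMaxLast = (tables=<-, column) => tables
  |> reduce(
    identity: {count: 0, last: 0.0, max: 0.0, min: 0.0},
    fn: (r, accumulator) => ({
      count: accumulator.count + 1,
      last: r._value,
      // Seed min and max from the first row so the 0.0 identity never wins.
      min: if accumulator.count == 0 or r._value < accumulator.min then r._value else accumulator.min,
      max: if accumulator.count == 0 or r._value > accumulator.max then r._value else accumulator.max,
    })
  )
  |> drop(columns: ["count"])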

pivotedPrice = price
  |> aggregateWindow(every: 1m, fn: minMaxLast)
  |> drop(columns: ["_field"])

pivotedAmount = amount
  |> aggregateWindow(every: 1m, fn: sum)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

// Join on every shared column so each pair/exchange series only matches itself.
join(tables: {price: pivotedPrice, amount: pivotedAmount}, on: ["_time", "_start", "_stop", "_measurement", "pair", "exchange"])
  |> rename(columns: {min: "low", max: "high", last: "close", amount: "volume"})
  |> experimental.to(bucket: "candles_1m")

Thanks! Wow, that's so hard after SQL :)

The Flux team plans to make the syntax for these multi-aggregate queries a little simpler. The task above is only one approach. Here's another way you could do it:

option task = {
  name: "trade_to_candles_60",
  every: 10s
}

data = from(bucket: "mydb")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades")

close = data
  |> filter(fn: (r) => r._field == "price")
  |> aggregateWindow(every: 1m, fn: last)
  |> set(key: "_field", value: "close") 

high = data
  |> filter(fn: (r) => r._field == "price")
  |> aggregateWindow(every: 1m, fn: max)
  |> set(key: "_field", value: "high") 

low = data
  |> filter(fn: (r) => r._field == "price")
  |> aggregateWindow(every: 1m, fn: min)
  |> set(key: "_field", value: "low") 

amount = data
  |> filter(fn: (r) => r._field == "amount")
  |> aggregateWindow(every: 1m, fn: sum)
  |> set(key: "_field", value: "amount")

union(tables: [close, high, low, amount])
  |> to(bucket: "candles_1m")
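
The original continuous query also selects first(price) AS open, which the task above does not compute. A minimal sketch of that missing stream, assuming the same mydb bucket and trades measurement used above (not tested against a live instance), might look like this:

```flux
// Same source stream as in the task above.
data = from(bucket: "mydb")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades")

// First price in each 1m window, relabeled as the "open" field.
open = data
  |> filter(fn: (r) => r._field == "price")
  |> aggregateWindow(every: 1m, fn: first)
  |> set(key: "_field", value: "open")
```

To write it out with the rest, add open to the list passed to union(). Flux keeps the pair and exchange tags in each table's group key by default, so the per-pair, per-exchange grouping from the GROUP BY clause should carry over without an explicit group() call.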

Does this simpler syntax already work? If not, when will it be available?

It's not available yet, but you can look through the proposal here: Flux Usability Study and Language Proposal (GitHub)

I don’t know the timeframe for when this will be implemented.