shclonk
February 9, 2021, 4:10pm
#1
I've spent 6 hours trying to convert this InfluxQL continuous query to Flux. Can anyone help?
CREATE CONTINUOUS QUERY trade_to_candles_60 ON mydb
RESAMPLE EVERY 10s FOR 10m
BEGIN
    SELECT first(price) AS open, last(price) AS close, max(price) AS high, min(price) AS low, sum(amount) AS volume
    INTO candles_1m
    FROM trades WHERE
    GROUP BY time(1m), pair, exchange
END
scott
February 9, 2021, 4:32pm
#2
@shclonk You’re going to need to create a Flux task. If you create the task through the InfluxDB UI, leave out the option task statement; the UI autogenerates it from the specified task settings. If you add the task through the API or the CLI, leave the option task statement in.
import "experimental"

option task = {
    name: "trade_to_candles_60",
    every: 10s,
}

// Price points from the last 10 minutes
price = from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades" and r._field == "price")

// Amount points from the same time range
amount = from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades" and r._field == "amount")
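
// Custom aggregate: track the last, min, and max value of each window in one pass.
// count lets the first row seed min and max instead of the 0.0 identity.
minMaxLast = (tables=<-, column) => tables
    |> reduce(
        identity: {count: 0, last: 0.0, min: 0.0, max: 0.0},
        fn: (r, accumulator) => ({
            count: accumulator.count + 1,
            last: r._value,
            min: if accumulator.count == 0 or r._value < accumulator.min then r._value else accumulator.min,
            max: if accumulator.count == 0 or r._value > accumulator.max then r._value else accumulator.max,
        }),
    )

pivotedPrice = price
    |> aggregateWindow(every: 1m, fn: minMaxLast)
    |> drop(columns: ["count", "_field"])

pivotedAmount = amount
    |> aggregateWindow(every: 1m, fn: sum)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Join per series. Columns listed in "on" keep their names; min, max, last, and amount
// each exist in only one input, so they should come through without a table suffix.
join(tables: {price: pivotedPrice, amount: pivotedAmount}, on: ["_time", "_start", "_stop", "_measurement", "pair", "exchange"])
    |> rename(columns: {min: "low", max: "high", last: "close", amount: "volume"})
    |> experimental.to(bucket: "candles_1m")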
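
A custom reduce() aggregate like minMaxLast can be tried on inline data before it goes into a task. This is only a minimal sketch: it assumes array.from() from the array package is available in your Flux version, and the three prices are made up. The count field seeds min and max from the first row; with a plain 0.0 identity, min would stay at 0.0 for positive prices.

```flux
import "array"

// Three made-up prices standing in for one 1m window of trades
array.from(rows: [{_value: 10.0}, {_value: 12.5}, {_value: 9.0}])
    |> reduce(
        identity: {count: 0, last: 0.0, min: 0.0, max: 0.0},
        fn: (r, accumulator) => ({
            count: accumulator.count + 1,
            last: r._value,
            min: if accumulator.count == 0 or r._value < accumulator.min then r._value else accumulator.min,
            max: if accumulator.count == 0 or r._value > accumulator.max then r._value else accumulator.max,
        }),
    )
// Expected single row: count 3, last 9.0, min 9.0, max 12.5
```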
shclonk
February 11, 2021, 8:49am
#3
Thanks! Wow, this is so hard after SQL :)
scott
February 11, 2021, 5:24pm
#4
The Flux team has plans to make the syntax for these multi-aggregate queries a little simpler. This is only one approach. Here’s another way you could do it:
option task = {
    name: "trade_to_candles_60",
    every: 10s,
}

// All trade points from the last 10 minutes
data = from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades")

close = data
    |> filter(fn: (r) => r._field == "price")
    |> aggregateWindow(every: 1m, fn: last)
    |> set(key: "_field", value: "close")

high = data
    |> filter(fn: (r) => r._field == "price")
    |> aggregateWindow(every: 1m, fn: max)
    |> set(key: "_field", value: "high")

low = data
    |> filter(fn: (r) => r._field == "price")
    |> aggregateWindow(every: 1m, fn: min)
    |> set(key: "_field", value: "low")

amount = data
    |> filter(fn: (r) => r._field == "amount")
    |> aggregateWindow(every: 1m, fn: sum)
    |> set(key: "_field", value: "amount")

union(tables: [close, high, low, amount])
    |> to(bucket: "candles_1m")
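
The original continuous query also selects first(price) AS open, which the streams above don't compute. A minimal sketch of that missing stream follows. It assumes the same bucket, measurement, and field names as the query above, and that the first selector can be passed to aggregateWindow like last, max, and min are:

```flux
// Same source data as the task: trade points from the last 10 minutes
data = from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades")

// Open price: the first trade price in each 1m window
open = data
    |> filter(fn: (r) => r._field == "price")
    |> aggregateWindow(every: 1m, fn: first)
    |> set(key: "_field", value: "open")

// Yield the stream so the result can be inspected before writing anywhere
open
    |> yield(name: "open")
```

To fold it into the task, define open alongside the other streams and add it to the list passed to union.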
shclonk
February 14, 2021, 6:17pm
#5
Does this syntax already work? If not, when will it be available?
scott
February 16, 2021, 5:02pm
#6
It’s not available yet, but you can look through the proposal here: Flux Usability Study and Language Proposal · GitHub
I don’t know the timeframe for when this will be implemented.
Hello. A year has passed. Has the new syntax not appeared?