@lexchou You have a few options. You can only join two streams at a time, but you can do multiple joins:
data = from(bucket: "dcoin")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "btcusdt" and (r._field == "price"))
open = data |> aggregateWindow(every: v.windowPeriod, fn: first, createEmpty: false)
high = data |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
low = data |> aggregateWindow(every: v.windowPeriod, fn: min, createEmpty: false)
close = data |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
open-close = join(tables: {open: open, close: close}, on: ["_time"])
high-low = join(tables: {high: high, low: low}, on: ["_time"])
join(tables: {oc: open-close, hl: high-low}, on: ["_time"])
BUT, this is a very inefficient way to do this. The better way would be to write a custom aggregate function using reduce()
.
This example below defines a custom function called openCloseHighLow
and uses that to do the aggregation:
openCloseHighLow = (tables=<-, column) =>
tables
|> reduce(
identity: { index: 0, open: 0.0, close: 0.0, high: 0.0, low: 0.0 },
fn: (r, accumulator) => ({
index: accumulator.index + 1,
open: if accumulator.index == 0 then r._value else accumulator.open,
close: r._value,
high: if accumulator.index == 0 then r._value else if r._value > accumulator.high then r._value else accumulator.high,
low: if accumulator.index == 0 then r._value else if r._value < accumulator.low then r._value else accumulator.low
})
)
|> drop(columns: ["index"])
data = from(bucket: "dcoin")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "btcusdt" and (r._field == "price"))
|> aggregateWindow(every: v.windowPeriod, fn: openCloseHighLow)
While it does take more code, this query will be MUCH more efficient.