Aggregate multiple results from one input

My crawler has collected a lot of bitcoin data, and I need to generate open/high/low/close (OHLC) values from the input stream. I first tried this:

data = from(bucket: "dcoin")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "btcusdt" and (r._field == "price"))

open = data |> aggregateWindow(every: v.windowPeriod, fn: first, createEmpty: false)
high = data |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
low = data |> aggregateWindow(every: v.windowPeriod, fn: min, createEmpty: false)
close = data |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)

join(tables: {open: open, high: high, low: low, close: close}, on: ["_time"])

But it reports:

joins currently must only have two parents

Then I tried another way:

from(bucket: "dcoin")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "btcusdt" and (r._field == "price"))
  |> aggregateWindow(every: v.windowPeriod, fn: (column, tables=<-) => ({
  open : tables |> first(), 
  high : tables |> max(),
  low : tables |> min(),
  close : tables |> last()
}), createEmpty: false)

It’s still not working:

expected [A] but found {open:[B], low:[B], high:[B], close:[B]} for return type (argument fn)

How can I do this in Flux without using InfluxQL?

Thanks in advance!

@lexchou You have a few options. A join can only have two parents, but you can chain multiple joins:

data = from(bucket: "dcoin")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "btcusdt" and (r._field == "price"))

open = data |> aggregateWindow(every: v.windowPeriod, fn: first, createEmpty: false)
high = data |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
low = data |> aggregateWindow(every: v.windowPeriod, fn: min, createEmpty: false)
close = data |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)

openClose = join(tables: {open: open, close: close}, on: ["_time"])
highLow = join(tables: {high: high, low: low}, on: ["_time"])

join(tables: {oc: openClose, hl: highLow}, on: ["_time"])

BUT, this is a very inefficient way to do it. The better way is to write a custom aggregate function using reduce().

The example below defines a custom function called openCloseHighLow and uses it to do the aggregation:

openCloseHighLow = (tables=<-, column) =>
  tables
    |> reduce(
      identity: {index: 0, open: 0.0, close: 0.0, high: 0.0, low: 0.0},
      fn: (r, accumulator) => ({
        index: accumulator.index + 1,
        // The first value seen in the window becomes the open.
        open: if accumulator.index == 0 then r._value else accumulator.open,
        // The most recent value is always the close.
        close: r._value,
        // Track the running max and min, seeding both with the first value.
        high: if accumulator.index == 0 then r._value else if r._value > accumulator.high then r._value else accumulator.high,
        low: if accumulator.index == 0 then r._value else if r._value < accumulator.low then r._value else accumulator.low
      })
    )
    |> drop(columns: ["index"])

data = from(bucket: "dcoin")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "btcusdt" and (r._field == "price"))
  |> aggregateWindow(every: v.windowPeriod, fn: openCloseHighLow)

While it does take more code, this query will be MUCH more efficient.
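To see why the reduce() approach is a single pass, here is the same accumulator logic sketched in Python (a plain list stands in for a Flux table, and the function name is mine, not part of any library):

```python
def open_close_high_low(values):
    """Single-pass fold computing open/high/low/close,
    mirroring the Flux reduce() accumulator above."""
    acc = {"index": 0, "open": 0.0, "close": 0.0, "high": 0.0, "low": 0.0}
    for v in values:
        first = acc["index"] == 0
        acc = {
            "index": acc["index"] + 1,
            "open": v if first else acc["open"],          # first value in the window
            "close": v,                                   # always the latest value
            "high": v if first else max(v, acc["high"]),  # running max, seeded by first value
            "low": v if first else min(v, acc["low"]),    # running min, seeded by first value
        }
    del acc["index"]  # matches drop(columns: ["index"])
    return acc

print(open_close_high_low([41000.0, 42500.0, 40800.0, 41900.0]))
# {'open': 41000.0, 'close': 41900.0, 'high': 42500.0, 'low': 40800.0}
```

Each window's rows are visited exactly once, whereas the multi-join version scans the data four times and then merges the results.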