Hi all, I know there are quite a few threads on transforming stock data in different ways (I've written a few myself), but here's a topic I can't find any proper info on.
I have 1-minute candles in the form of:
Tag: symbol (AAPL, for example); fields: O, H, L, C, V (open, high, low, close, and volume)
Super standard. Now, if I would like 5-minute bars or perhaps daily bars, what would be the best way to build them from the 1-minute data?
I have found two ways, but each is quite a bit of code for a very simple operation. The two ways also seem quite similar to me in how they work, but they perform very differently.
So, my questions: 1: Is this a reasonable way to do this, or is there a MUCH simpler and faster way? And 2: Why is the performance so different for these two? The first example takes half or even a third of the time the second one takes to run, according to my small tests.
First example:
OPEN = from(bucket:"stock_data_collector_v2")
|> range(start: 2022-09-21T13:30:00.000Z, stop: 2022-09-21T20:00:00.000Z)
|> filter(fn: (r) => r["ticker"] == "F" or r["ticker"] == "AAPL" or r["ticker"] == "MSFT" or r["ticker"] == "AA")
|> filter(fn: (r) => r._field == "o")
|> window(every: 5m)
|> first()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
|> drop(columns: ["_start", "_stop", "_measurement", "_field"])
HIGH = from(bucket:"stock_data_collector_v2")
|> range(start: 2022-09-21T13:30:00.000Z, stop: 2022-09-21T20:00:00.000Z)
|> filter(fn: (r) => r["ticker"] == "F" or r["ticker"] == "AAPL" or r["ticker"] == "MSFT" or r["ticker"] == "AA")
|> filter(fn: (r) => r._field == "h")
|> window(every: 5m)
|> max()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
|> drop(columns: ["_start", "_stop", "_measurement", "_field"])
OH = join(tables: {o: OPEN, h: HIGH}, on: ["_time", "ticker"])
LOW = from(bucket:"stock_data_collector_v2")
|> range(start: 2022-09-21T13:30:00.000Z, stop: 2022-09-21T20:00:00.000Z)
|> filter(fn: (r) => r["ticker"] == "F" or r["ticker"] == "AAPL" or r["ticker"] == "MSFT" or r["ticker"] == "AA")
|> filter(fn: (r) => r._field == "l")
|> window(every: 5m)
|> min()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
|> drop(columns: ["_start", "_stop", "_measurement", "_field"])
CLOSE = from(bucket:"stock_data_collector_v2")
|> range(start: 2022-09-21T13:30:00.000Z, stop: 2022-09-21T20:00:00.000Z)
|> filter(fn: (r) => r["ticker"] == "F" or r["ticker"] == "AAPL" or r["ticker"] == "MSFT" or r["ticker"] == "AA")
|> filter(fn: (r) => r._field == "c")
|> window(every: 5m)
|> last()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
|> drop(columns: ["_start", "_stop", "_measurement", "_field"])
LC = join(tables: {l: LOW, c: CLOSE}, on: ["_time", "ticker"])
VOLUME = from(bucket:"stock_data_collector_v2")
|> range(start: 2022-09-21T13:30:00.000Z, stop: 2022-09-21T20:00:00.000Z)
|> filter(fn: (r) => r["ticker"] == "F" or r["ticker"] == "AAPL" or r["ticker"] == "MSFT" or r["ticker"] == "AA")
|> filter(fn: (r) => r._field == "v")
|> window(every: 5m)
|> sum()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
|> drop(columns: ["_start", "_stop", "_measurement", "_field"])
LCV = join(tables: {LC: LC, VOLUME: VOLUME}, on: ["_time", "ticker"])
join(tables: {OH: OH, LCV: LCV}, on: ["_time", "ticker"])
|> group(columns:["ticker"])
|> rename(columns: {_value: "v", _value_c: "c", _value_h:"h", _value_l:"l", _value_o:"o"})
Second example:
data = from(bucket:"stock_data_collector_v2")
|> range(start: 2022-09-21T13:30:00.000Z, stop: 2022-09-21T20:00:00.000Z)
|> filter(fn: (r) => r["ticker"] == "F" or r["ticker"] == "AAPL" or r["ticker"] == "MSFT" or r["ticker"] == "AA")
open = data |> filter(fn: (r) => r._field == "o") |> aggregateWindow(every: 5m, fn: first) |> drop(columns: ["_start", "_stop", "_measurement", "_field"])
close = data |> filter(fn: (r) => r._field == "c") |> aggregateWindow(every: 5m, fn: last) |> drop(columns: ["_start", "_stop", "_measurement", "_field"])
low = data |> filter(fn: (r) => r._field == "l") |> aggregateWindow(every: 5m, fn: min) |> drop(columns: ["_start", "_stop", "_measurement", "_field"])
high = data |> filter(fn: (r) => r._field == "h") |> aggregateWindow(every: 5m, fn: max) |> drop(columns: ["_start", "_stop", "_measurement", "_field"])
volume = data |> filter(fn: (r) => r._field == "v") |> aggregateWindow(every: 5m, fn: sum) |> drop(columns: ["_start", "_stop", "_measurement", "_field"])
OC = join(tables: {o: open, c: close}, on: ["_time", "ticker"])
HL = join(tables: {h: high, l: low}, on: ["_time", "ticker"])
OCLH = join(tables: {OC: OC, HL: HL}, on: ["_time", "ticker"])
join(tables: {OCLH: OCLH, v: volume}, on: ["_time", "ticker"])
|> group(columns:["ticker"])
|> rename(columns: {_value: "v", _value_c: "c", _value_h:"h", _value_l:"l", _value_o:"o"})
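For comparison, here is a minimal sketch of how the repetition in the second example might be reduced. It assumes that union() followed by pivot() on _field can stand in for the chain of joins, and that the five fields share the same ticker tag and timestamps. The helper name resample is made up for illustration (it is not a built-in), and I have not benchmarked this against either example above.

```flux
// Shared source: same bucket, time range, and tickers as in the examples above.
data = from(bucket: "stock_data_collector_v2")
    |> range(start: 2022-09-21T13:30:00.000Z, stop: 2022-09-21T20:00:00.000Z)
    |> filter(fn: (r) => r["ticker"] == "F" or r["ticker"] == "AAPL" or r["ticker"] == "MSFT" or r["ticker"] == "AA")

// resample is a hypothetical helper: it keeps one field and aggregates it
// into 5-minute windows with the given function. aggregateWindow stamps each
// row with the window's _stop time by default, as the first example does by hand.
resample = (field, agg) => data
    |> filter(fn: (r) => r._field == field)
    |> aggregateWindow(every: 5m, fn: agg, createEmpty: false)

// Stack the five aggregated streams, then turn the field names into columns
// so each row holds one complete bar with columns o, h, l, c, v.
union(
    tables: [
        resample(field: "o", agg: first),
        resample(field: "h", agg: max),
        resample(field: "l", agg: min),
        resample(field: "c", agg: last),
        resample(field: "v", agg: sum),
    ],
)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_measurement"])
    |> group(columns: ["ticker"])
```

Because pivot() names the output columns after the _field values, the final rename() from the two examples should not be needed here. Changing every: 5m to every: 1d would give daily bars, though the window boundaries would then follow UTC days unless a location or offset is set.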