Advice on aggregating to OHLC data

I have a problem that I think is quite common, and probably very simple, but I'm new to Influx, so I'd like your input.
I have data that looks like this:

_time,               _value, _field, _measurement, asset
2020-03-04T20:54:12, 300,    price,  tick_data,    AAPL

I have many points of data every minute, so I'd like to aggregate to 1min bars (for example) and insert this into a new bucket called "minuteData". I think this would be executed once a minute through a "task".

A few questions:

  1. Is that a reasonable thing to do, or is it better to just keep the tick data as is and do the aggregating in the query when reading the data?
  2. How would you suggest I make the insert to a new bucket? I have been messing around with join, joining the four aggregates (first, max, min, last), but I'm not sure that's the best way. Something like the following (min and max removed from the example):

open = from(bucket: "myBucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "tick_data")
    |> aggregateWindow(every: 1m, fn: first)
    |> yield(name: "open")

close = from(bucket: "myBucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "tick_data")
    |> aggregateWindow(every: 1m, fn: last)
    |> yield(name: "close")

join(
    tables: {open: open, close: close},
    on: ["_time", "_stop", "_start", "_measurement", "_field", "asset"]
)
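One thing to note if you stay with the join: the joined result carries `_value_open` and `_value_close` columns rather than a `_field`/`_value` pair, so a plain `to()` won't know what to write. A sketch of one way to finish the query, using `to()`'s explicit `fieldFn` mapping (the "open"/"close" field names and the `minuteData` bucket are assumptions from this thread):

```flux
// Sketch: write the joined result out with an explicit field mapping.
// Assumes the `open` and `close` streams defined in the query above.
join(
    tables: {open: open, close: close},
    on: ["_time", "_stop", "_start", "_measurement", "_field", "asset"]
)
    |> to(
        bucket: "minuteData",
        org: "myOrg",
        // Map the two joined value columns to two output fields.
        fieldFn: (r) => ({"open": r._value_open, "close": r._value_close})
    )
```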

How would you handle this task? Am I on the right track?

Thanks!

An update: I got it working as a task, but instead of joining the tables I unioned them. So instead of creating the new columns "_value_open" and "_value_close", which the join would create, I set the _field to "open" or "close" instead:

option task = {name: "taskName", every: 1m}

open = from(bucket: "tickData")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "tick_data")
    |> aggregateWindow(every: 1m, fn: first)
    |> map(fn: (r) => ({r with _field: "open"}))
    |> yield(name: "first")

close = from(bucket: "tickData")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "tick_data")
    |> aggregateWindow(every: 1m, fn: last)
    |> map(fn: (r) => ({r with _field: "close"}))
    |> yield(name: "last")

union(tables: [open, close])
    |> to(bucket: "minuteData", org: "myOrg")
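For full OHLC bars, the same union pattern extends naturally with `max` and `min` for the high and low. A sketch that reuses one shared source stream instead of repeating the `from()`/`range()`/`filter()` boilerplate four times (the "high"/"low" field names and task name are my assumptions):

```flux
option task = {name: "ohlcTask", every: 1m}

// Shared source stream: read the raw ticks once, pipe from it four times.
src = from(bucket: "tickData")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "tick_data")

// One aggregate per OHLC component, each renamed to its own field.
open = src |> aggregateWindow(every: 1m, fn: first) |> map(fn: (r) => ({r with _field: "open"}))
high = src |> aggregateWindow(every: 1m, fn: max) |> map(fn: (r) => ({r with _field: "high"}))
low = src |> aggregateWindow(every: 1m, fn: min) |> map(fn: (r) => ({r with _field: "low"}))
close = src |> aggregateWindow(every: 1m, fn: last) |> map(fn: (r) => ({r with _field: "close"}))

union(tables: [open, high, low, close])
    |> to(bucket: "minuteData", org: "myOrg")
```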

What do you think about that approach?

Hey, I think using custom aggregation functions as described could work:
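For instance, a single `reduce()` per window can compute all four OHLC values in one pass over the ticks, instead of reading the source four times. A sketch under the bucket and measurement names from this thread (the column names and the 0.0 identities are assumptions; this relies on ticks arriving in time order within each window):

```flux
// Sketch: one-pass OHLC per 1m window via a custom reduce() aggregate.
from(bucket: "tickData")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "tick_data")
    |> window(every: 1m)
    |> reduce(
        identity: {open: 0.0, high: 0.0, low: 0.0, close: 0.0, count: 0},
        fn: (r, accumulator) => ({
            // First row in the window seeds open/high/low; close tracks the latest row.
            open: if accumulator.count == 0 then r._value else accumulator.open,
            high: if accumulator.count == 0 or r._value > accumulator.high then r._value else accumulator.high,
            low: if accumulator.count == 0 or r._value < accumulator.low then r._value else accumulator.low,
            close: r._value,
            count: accumulator.count + 1
        })
    )
    // reduce() drops _time, so promote the window stop time before unwindowing.
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)
```

Writing this shape to a bucket would still need a field mapping (for example `to()` with `fieldFn`), since the OHLC values live in separate columns rather than a `_field`/`_value` pair.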

Hi there.

Were you able to do it another way, or did you keep it that way? IMHO, a union is really expensive, and it only covers two tables here.