Flux group by time

Hi,

I am trying to downsample data to produce candlesticks in a Flux script. The data is simple: just _time and price. I would like the output to be _time, open, high, low, close, sampled by minute.
It seems odd to me that there doesn't appear to be any group-by-time functionality in Flux. Does anyone know if this is possible, or if there is a workaround? If I could downsample the time column and join it with the data, even that would work.

Thanks,

Mark

@Mark_Best This is what window() is for. It creates windows of time at a specified duration. So if you want to group points into 1 minute windows, you’d do:

data
  |> window(every: 1m)

I’d recommend using aggregateWindow() to both window and aggregate your data.
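For example, windowing and aggregating in one step with a built-in aggregate looks like this (a sketch; the bucket, measurement, and field names are placeholders):

```flux
from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "price")
  // Window into 1m groups, apply mean to each window,
  // then restore the original grouping.
  |> aggregateWindow(every: 1m, fn: mean)
```

aggregateWindow() handles the window(), aggregate, and un-window steps for you, which is why it's usually preferable to calling window() directly.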

Lucky for you, I actually wrote a custom candlestick() function a while back which looks like it’ll be perfect for you:

candlestick = (tables=<-) => tables
  |> reduce(
    fn: (r, accumulator) => ({
      index: accumulator.index + 1,
      open: if accumulator.index == 0 then r._value else accumulator.open,
      close: r._value,
      high: if accumulator.index == 0 then r._value else if r._value > accumulator.high then r._value else accumulator.high,
      low: if accumulator.index == 0 then r._value else if r._value < accumulator.low then r._value else accumulator.low
    }),
    identity: { index: 0, open: 0.0, close: 0.0, high: 0.0, low: 0.0 }
  )
  |> drop(columns: ["index"])

// --- Adjust below as necessary ---
from(bucket: "bucketName")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "measurementName" and r._field == "price")
  |> aggregateWindow(
    every: 1m,
    fn: (tables=<-, column) => tables |> candlestick()
  )

Yes, this is exactly what I was looking for. You also anticipated what I was trying to do next. I am really new to Flux, so I'm going to have to look a bit more into the syntax, but your reply has pointed me in the right direction.

@Mark_Best This takes you right into the deep end of intermediate-to-advanced scripting, but essentially I create a custom aggregate function that outputs the open, close, high, and low for each input table. The Custom aggregate functions doc walks through how reduce() works. I then use the custom function to aggregate each of the time-based groups that window() generates inside of aggregateWindow().
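As a minimal illustration of how reduce() folds rows into an accumulator (the column names here are hypothetical), a running sum and count looks like:

```flux
data
  |> reduce(
    // fn is called once per row; the record it returns
    // becomes the accumulator for the next row.
    fn: (r, accumulator) => ({
      sum: accumulator.sum + r._value,
      count: accumulator.count + 1
    }),
    // identity sets the accumulator's initial values and,
    // implicitly, the types of the output columns.
    identity: {sum: 0.0, count: 0}
  )
```

The candlestick() function works the same way; its index column exists only to detect the first row of each table so that open, high, and low can be seeded from the first value rather than from the identity record.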

That’s a brief synopsis. If you have specific questions about what it’s doing, let me know. I’m happy to answer them.

@Scott Thanks for your script!

Right now I need to use it with this Grafana plugin:

Your script is wonderful, but I don't understand how I can separate the generated fields and extract them in the form this plugin needs:

open : SELECT first("open") FROM "ticker" WHERE ("market" =~ /^$market$/ AND "pair" =~ /^$pair$/) AND $timeFilter GROUP BY time($__interval) fill(linear)

close : SELECT last("close") FROM "ticker" WHERE ("market" =~ /^$market$/ AND "pair" =~ /^$pair$/) AND $timeFilter GROUP BY time($__interval) fill(linear)

low : SELECT min("low") FROM "ticker" WHERE ("market" =~ /^$market$/ AND "pair" =~ /^$pair$/) AND $timeFilter GROUP BY time($__interval) fill(linear)

high : SELECT max("high") FROM "ticker" WHERE ("market" =~ /^$market$/ AND "pair" =~ /^$pair$/) AND $timeFilter GROUP BY time($__interval) fill(linear)

volume : SELECT max("volume") FROM "ticker" WHERE ("market" =~ /^$market$/ AND "pair" =~ /^$pair$/) AND $timeFilter GROUP BY time($__interval) fill(linear)

Can you please help me?

Thanks,

Davide

@DavideCampagna I believe this will get you what you need. My only unknown is how to properly load Grafana variables into a Flux query.

As a brief explanation, I use the Flux regexp.compile() to compile a string containing your Grafana variable into a usable regular expression. I then define a base data set which is then used in other variables specific to each field and aggregation method. I then union all of those streams of tables back into a single stream of tables (without pivoting). This should give you the data structure Grafana is expecting.

import "regexp"
import "interpolate"

marketRegex = regexp.compile(v: "^${market}")
pairRegex = regexp.compile(v: "^${pair}")

data = from(bucket: "example-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "ticker")
  |> filter(fn: (r) => r.market =~ marketRegex)
  |> filter(fn: (r) => r.pair =~ pairRegex)

open = data |> filter(fn: (r) => r._field == "open") |> aggregateWindow(every: v.windowPeriod, fn: first) |> interpolate.linear(every: v.windowPeriod)
close = data |> filter(fn: (r) => r._field == "close") |> aggregateWindow(every: v.windowPeriod, fn: last) |> interpolate.linear(every: v.windowPeriod)
low = data |> filter(fn: (r) => r._field == "low") |> aggregateWindow(every: v.windowPeriod, fn: min) |> interpolate.linear(every: v.windowPeriod)
high = data |> filter(fn: (r) => r._field == "high") |> aggregateWindow(every: v.windowPeriod, fn: max) |> interpolate.linear(every: v.windowPeriod)
volume = data |> filter(fn: (r) => r._field == "volume") |> aggregateWindow(every: v.windowPeriod, fn: max) |> interpolate.linear(every: v.windowPeriod)

union(tables: [open, close, low, high, volume])

(Edit: I meant to reply to the candlestick comment)

Hey @scott,
First off, thanks a ton! I found this really useful!
It’s been a couple of years since you posted this. I wanted to reach out and see if anything had changed since and if this is still the most efficient approach.
Thanks!!

@dmill I believe this is still the best approach to this problem.