How to pivot multiple aggregate fields

Hello,
How can I get multiple aggregate values in one table as a result? This is my query; it returns hundreds of tables and is relatively slow (3-4 seconds):

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")
  |> window(every: v.windowPeriod, createEmpty: false)

data_min = data
  |> min()
  |> yield(name: "low")

data_max = data
  |> max()
  |> yield(name: "high")

data_first = data
  |> first()
  |> yield(name: "open")

data_last = data
  |> last()
  |> yield(name: "close")

Does the query speed improve if you change |> window() to |> aggregateWindow() with an aggregation period of, say, 20s?

|> aggregateWindow(every: 20s, fn: mean)
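
aggregateWindow() windows the data, applies the aggregate to each window, and un-windows the result in a single step. As a sketch, your min branch would collapse to this (with the |> window() line dropped from data; 20s is just an example period):

data_min = data
  |> aggregateWindow(every: 20s, fn: min, createEmpty: false)
  |> yield(name: "low")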

Yes, it is a lot faster with aggregateWindow(). I have four different aggregations, resulting in four tables. Is this the proper way to do it?

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: first, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""first"")

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: max, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""max"")

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: min, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""min"")

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: last, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""last"")

@mgeorgiev1-zen To make this a little less verbose you can do this:

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")

data_min = data
  |> aggregateWindow(every: v.windowPeriod, fn: min)
  |> yield(name: "low")

data_max = data
  |> aggregateWindow(every: v.windowPeriod, fn: max)
  |> yield(name: "high")

data_first = data
  |> aggregateWindow(every: v.windowPeriod, fn: first)
  |> yield(name: "open")

data_last = data
  |> aggregateWindow(every: v.windowPeriod, fn: last)
  |> yield(name: "close")

It depends on how exactly you need the data structured. If you’re visualizing it in an InfluxDB dashboard or Grafana, this is the structure you’ll want. If you need to, you can pivot all the different values into individual columns: rename the field for each aggregation, union all the tables together, then pivot the fields into columns:

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")

data_min = data
  |> aggregateWindow(every: v.windowPeriod, fn: min)
  |> set(key: "_field", value: "low")

data_max = data
  |> aggregateWindow(every: v.windowPeriod, fn: max)
  |> set(key: "_field", value: "high")

data_first = data
  |> aggregateWindow(every: v.windowPeriod, fn: first)
  |> set(key: "_field", value: "open")

data_last = data
  |> aggregateWindow(every: v.windowPeriod, fn: last)
  |> set(key: "_field", value: "close")

union(tables: [data_min, data_max, data_first, data_last])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

Hi Scott, thanks for the reply. I need to return the results as objects with pivoted columns. I tried your second suggestion, and the query takes 6 seconds to execute over a 7-day range. Would there be a way to speed it up?

Unfortunately, pivot() is required here, and it does involve some heavy lifting. The denser your data, the longer the pivot will take. You could halve the density of your data by doubling the v.windowPeriod duration with date.scale():

import "date"

// Scale the dashboard window by n: 2, so each aggregation window is
// twice as long and the pivot processes roughly half as many rows
windowPeriod = date.scale(d: v.windowPeriod, n: 2)

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")

data_min = data
  |> aggregateWindow(every: windowPeriod, fn: min)
  |> set(key: "_field", value: "low")

data_max = data
  |> aggregateWindow(every: windowPeriod, fn: max)
  |> set(key: "_field", value: "high")

data_first = data
  |> aggregateWindow(every: windowPeriod, fn: first)
  |> set(key: "_field", value: "open")

data_last = data
  |> aggregateWindow(every: windowPeriod, fn: last)
  |> set(key: "_field", value: "close")

union(tables: [data_min, data_max, data_first, data_last])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

Also, what version of InfluxDB are you using?

I am using version 2.6.1. What I am trying to accomplish is storing financial tick history, and I was thinking of using separate buckets with aggregations. Is it possible to query a bucket with aggregated data (e.g. a 1-hour resolution) and also append the latest live data? Does InfluxDB support this (real-time buckets)?

P.S. I ran the query and the execution time is 5-6 seconds.

Oh, ok. If you’re going to write the aggregated data back to InfluxDB, you don’t need to pivot it. That should save you some compute time.
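
For example, you could send the un-pivoted union straight to a downsampled bucket with to(). A rough sketch, assuming a destination bucket named "ticks_1h" (hypothetical) and the data_min/data_max/data_first/data_last streams from above:

union(tables: [data_min, data_max, data_first, data_last])
  |> to(bucket: "ticks_1h")  // write back without pivoting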

Yes, totally possible. There’s a thread about this here: Flux query over multiple downsampled buckets with different retention policies
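
The general shape is to split the queried range at a cutoff, pull older data from the downsampled bucket and recent data from the raw bucket, then union the two. A sketch only; the ticks_1h bucket name, the 1h cutoff, and the 1h resolution are all assumptions:

import "date"

// Everything older than the cutoff comes from the downsampled bucket,
// everything newer from the raw "live" bucket.
cutoff = date.truncate(t: -1h, unit: 1h)

historical = from(bucket: "ticks_1h")  // hypothetical downsampled bucket
  |> range(start: v.timeRangeStart, stop: cutoff)
  |> filter(fn: (r) => r["_measurement"] == "tick" and r["symbol"] == "EURUSD")

live = from(bucket: "ticks")
  |> range(start: cutoff, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick" and r["symbol"] == "EURUSD")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false)  // match the downsampled resolution

union(tables: [historical, live])
  |> sort(columns: ["_time"])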