How to pivot multiple aggregate fields

Hello,
How can I get multiple aggregate values in one table as the result? This is my query; it returns hundreds of tables and is relatively slow (3-4 seconds):

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")
  |> window(every: v.windowPeriod, createEmpty: false)

data_min = data
  |> min()
  |> yield(name: "low")

data_max = data
  |> max()
  |> yield(name: "high")

data_first = data
  |> first()
  |> yield(name: "open")

data_last = data
  |> last()
  |> yield(name: "close")

Does the speed of the query improve if you change |> window() to |> aggregateWindow() and then put in an aggregation period of, say, 20s?

|> aggregateWindow(every: 20s, fn: mean)

Yes, it is a lot faster with aggregateWindow(). I have 4 different aggregations, resulting in 4 tables. Is this the proper way? (The {...} placeholders are values interpolated from my C# code.)

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: first, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""first"")

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: max, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""max"")

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: min, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""min"")

from(bucket:""ticks"")
    |> range(start: {from.ToUnixTimeSeconds()}, stop: {to.ToUnixTimeSeconds()})
    |> filter(fn: (r) => r[""_measurement""] == ""tick"")
    |> filter(fn: (r) => r[""_field""] == ""bid"")
    |> filter(fn: (r) => r[""symbol""] == ""{symbol}"")
    |> aggregateWindow(every: {duration}, fn: last, createEmpty: false, timeSrc: ""_start"")
    |> yield(name: ""last"")

@mgeorgiev1-zen To make this a little less verbose, you can do this:

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")

data_min = data
  |> aggregateWindow(every: v.windowPeriod, fn: min)
  |> yield(name: "low")

data_max = data
  |> aggregateWindow(every: v.windowPeriod, fn: max)
  |> yield(name: "high")

data_first = data
  |> aggregateWindow(every: v.windowPeriod, fn: first)
  |> yield(name: "open")

data_last = data
  |> aggregateWindow(every: v.windowPeriod, fn: last)
  |> yield(name: "close")

It depends on how exactly you need the data structured. If you’re visualizing it in an InfluxDB dashboard or Grafana, then this is the structure you’ll want. If you need to, you can pivot all the different values into individual columns. To do that, rename the field for each aggregation, union all the tables together, then pivot the fields into columns:

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")

data_min = data
  |> aggregateWindow(every: v.windowPeriod, fn: min)
  |> set(key: "_field", value: "low")

data_max = data
  |> aggregateWindow(every: v.windowPeriod, fn: max)
  |> set(key: "_field", value: "high")

data_first = data
  |> aggregateWindow(every: v.windowPeriod, fn: first)
  |> set(key: "_field", value: "open")

data_last = data
  |> aggregateWindow(every: v.windowPeriod, fn: last)
  |> set(key: "_field", value: "close")

union(tables: [data_min, data_max, data_first, data_last])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

Hi Scott, thanks for the reply. I need to return the results as objects with pivoted columns. I tried your second suggestion and the query takes 6 seconds to execute (range: the past 7 days). Would there be a way to speed it up?

Unfortunately, pivot() is required here, and it does involve a bit of heavy lifting. The denser your data, the longer the pivot will take. You could cut the density of your data in half by scaling the v.windowPeriod duration with date.scale():

import "date"

windowPeriod = date.scale(d: v.windowPeriod, n: 2)

data = from(bucket: "ticks")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "tick")
  |> filter(fn: (r) => r["_field"] == "bid")
  |> filter(fn: (r) => r["symbol"] == "EURUSD")

data_min = data
  |> aggregateWindow(every: windowPeriod, fn: min)
  |> set(key: "_field", value: "low")

data_max = data
  |> aggregateWindow(every: windowPeriod, fn: max)
  |> set(key: "_field", value: "high")

data_first = data
  |> aggregateWindow(every: windowPeriod, fn: first)
  |> set(key: "_field", value: "open")

data_last = data
  |> aggregateWindow(every: windowPeriod, fn: last)
  |> set(key: "_field", value: "close")

union(tables: [data_min, data_max, data_first, data_last])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

Also, what version of InfluxDB are you using?

I am using version 2.6.1. What I am trying to accomplish is storing financial tick history, and I was thinking of using separate buckets with aggregations. Is it possible to query a bucket with aggregated data (e.g. a 1-hour range) and also append the latest live data? Does InfluxDB support this (real-time buckets)?

P.S. I ran the query and the execution time is 5-6 seconds.

Oh, ok. If you’re going to write the aggregated data back to InfluxDB, you don’t need to pivot it. That should save you some compute time.
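For example, the union from above could feed straight into to() inside a downsampling task. This is just a minimal sketch; the task schedule, the 1m aggregation window, and the "ticks-1h" destination bucket are all placeholders:

option task = {name: "downsample-ticks", every: 1h}

data = from(bucket: "ticks")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "tick")
    |> filter(fn: (r) => r["_field"] == "bid")

data_min = data
    |> aggregateWindow(every: 1m, fn: min)
    |> set(key: "_field", value: "low")

data_max = data
    |> aggregateWindow(every: 1m, fn: max)
    |> set(key: "_field", value: "high")

data_first = data
    |> aggregateWindow(every: 1m, fn: first)
    |> set(key: "_field", value: "open")

data_last = data
    |> aggregateWindow(every: 1m, fn: last)
    |> set(key: "_field", value: "close")

// No pivot() needed; to() writes the unpivoted rows back to InfluxDB.
union(tables: [data_min, data_max, data_first, data_last])
    |> to(bucket: "ticks-1h")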

Yes, totally possible. There’s a thread about this here: Flux query over multiple downsampled buckets with different retention policies

Thanks, I went through the thread. I have a case where, when the selected time range is entirely in the past, the query should read only the aggregated bucket, and when the range extends up to now, it should read both buckets, the aggregated one and the “live data” bucket. But I don’t know how to implement the condition:

if rangeStop > latestAggrData.time then union with livedata(start: latestAggrData.time, stop: now())

@mgeorgiev1-zen This specific behavior can be tricky and I don’t know that it can be optimized all that much, but it’s worth a try.

import "date"

// Example bounds; in practice these would come from v.timeRangeStart,
// v.timeRangeStop, and a lookup of the latest aggregation time.
rangeStart = date.time(v: -24h)
rangeStop = now()
lastAggrTime = date.time(v: -1h)

getData = () => {
    // Aggregated data from the historical bucket.
    _historicalData = () => from(bucket: "historical-bucket") |> range(start: rangeStart, stop: rangeStop)
    // Raw data from the live bucket, picking up where the last aggregation left off.
    _liveData = () => from(bucket: "live-bucket") |> range(start: lastAggrTime, stop: rangeStop)
    // Only union in the live bucket when the requested range extends past the last aggregation.
    _output = if rangeStop >= lastAggrTime then union(tables: [_historicalData(), _liveData()]) else _historicalData()

    return _output
}

getData()
    |> filter(...)
    |> ...

One question here is how you get the time of the latest aggregation. Is that something you store somewhere that you can query? If you’re running the aggregations as a task, you could query the _tasks system bucket to get more information about the last task execution. Something like:

getLatestTaskStart = (taskID) => {
    // Look up the most recent run record for this task in the _tasks system bucket.
    _lastTaskRun = from(bucket: "_tasks")
        |> range(start: -24h)
        |> filter(fn: (r) => r["taskID"] == taskID)
        |> filter(fn: (r) => r["_field"] == "scheduledFor")
        |> last()
        |> findRecord(fn: (key) => true, idx: 0)

    return _lastTaskRun._time
}
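You’d then call it with the ID of your aggregation task, e.g.:

// Placeholder task ID; use the ID of your downsampling task.
lastAggrTime = getLatestTaskStart(taskID: "0123456789abcdef")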

I actually get the time of the last record, like this:

bucketLastTime = (data) => {
  // Return the timestamp of the most recent record in the input stream.
  lastTime = data
    |> last()
    |> keep(columns: ["_time"])
    |> tableFind(fn: (key) => true)
    |> getRecord(idx: 0)
  return lastTime._time
}
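Called against the aggregated bucket, that would look something like this (the bucket name and lookback range are placeholders):

// Placeholder bucket name and lookback range.
historical = from(bucket: "historical-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r["_measurement"] == "tick")

lastAggrTime = bucketLastTime(data: historical)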