Query summary data of 1 minute ohlcv candlesticks in other intervals

Hi guys,
I am trying to store ohlcv data obtained from binance api.
I am new to fluxdb and would appreciate some help.
The method I use sends me, already calculated the candles with the following information (timestamp, open, high, low, close, volume) in the interval of 1 minute.
Once this information is stored, I would like to be able to query this information but in the context of a larger interval, for example 5 minutes, 1 hour, 4 hours, etc.
I have been able to see in other messages here how that query would be constructed if what we had stored were stock trades, but I have precalculated 1 minute candlesticks.
My configuration is as follows:

The measurement is called “ohlcv”
I have a “pair” tag that identifies the currency pair.
In that bucket I have the 1 minute candlesticks of all currency pairs.
Each row has the fields:

  • open
  • high
  • low
  • close
  • volume

Suppose I want to query the candlesticks of an interval of 5m.
Conceptually I know that to obtain the equivalent candlestick opening in an interval of 5m I have to:

  • In the case of the “open” field I have to select candlestick within that interval (the first candlestick of the five).
  • In the case of the “close” field I have to select the “closing” field of the last candlestick (the last candlestick of the 5).
  • In the case of the “high” field I have to select the “high” field of the candlestick in which the high field reached a higher price.
  • In the case of the “low” field, I have to select the “low” field of the candlestick in which the “low” field reached a lower price.
  • Finally, in the case of the “volume” field I have to calculate the sum of all the values ​​of the “volume” field of all the candlesticks in the interval.
    Thank you so much.

The fact that you get pre-aggregate data does not change anything syntactically.

You are probably after a flux query, but I’m not quick with those so I’ll use InfluxQL instead, here is what you described:

SELECT
	 sum(volume) as volume
	,min(low) as low
	,max(high) as high
	,first(open) as open
	,last(close) as close
FROM db.rp.measurement
WHERE time = now() -6h
GROUP BY
	time(5m)
	,pair

what allows you to define the time range for the data to query is WHERE time = ... while for the time windows you have the GROUP BY time(5m), which translate to flux with the functions |> range(start: -6h) and |> window(every: 5m)

by using those you will get exactly what you are asking for, first/last/min/etc of the given data for the defined time window. (ie: the min value of field “low” for each time window of 5 minutes)

if it’s your first time with flux you can have a look at the docs here

Thanks @Giovanni_Luisotto. But…
What would be the equivalent query in flux?

I have tried this and came very close to getting it. But strangely it doesn’t work in the case of volume (sum operation) and I don’t know why.

close=from(bucket: "bucket_prueba_123")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> group(columns:["pair"])
 |> filter(fn: (r) => r["_field"] == "close")
    |> aggregateWindow(
  every: 5m,
  fn: last,
  column: "_value",
  timeSrc: "_stop",
  timeDst: "_time",
  createEmpty: true
)

open=from(bucket: "bucket_prueba_123")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> group(columns:["pair"])
 |> filter(fn: (r) => r["_field"] == "open")
    |> aggregateWindow(
  every: 5m,
  fn: first,
  column: "_value",
  timeSrc: "_stop",
  timeDst: "_time",
  createEmpty: true
)

high=from(bucket: "bucket_prueba_123")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> group(columns:["pair"])
 |> filter(fn: (r) => r["_field"] == "high")
    |> aggregateWindow(
  every: 5m,
  fn: max,
  column: "_value",
  timeSrc: "_stop",
  timeDst: "_time",
  createEmpty: true
)

low=from(bucket: "bucket_prueba_123")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> group(columns:["pair"])
 |> filter(fn: (r) => r["_field"] == "low")
    |> aggregateWindow(
  every: 5m,
  fn: min,
  column: "_value",
  timeSrc: "_stop",
  timeDst: "_time",
  createEmpty: true
)


volume=from(bucket: "bucket_prueba_123")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> group(columns:["pair"])
 |> filter(fn: (r) => r["_field"] == "volume")
    |> aggregateWindow(
  every: 5m,
 column: "_value",
  fn: sum, // fails
 
  timeSrc: "_stop",
  timeDst: "_time",
  createEmpty: true
)

final=union(tables: [open, close,high,low,volume])
 |> pivot(
    rowKey:["_time"],
    columnKey: ["_field"],
    valueColumn: "_value")
    |> yield(name: "final")

I have tried this and came very close to getting it. But strangely it doesn’t work in the case of volume (sum operation) and I don’t know why.
This code doesn’t work but if I delete the volume part it works perfectly and returns what I expected it to return

I have also tried the following with no success.

close=from(bucket: "bucket_prueba_123")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> group(columns:["pair"])
  |> filter(fn: (r) => r["_measurement"] == "ohlc")
  |> window(every: 5m)
  |> reduce(fn: (r, accumulator) => ({
  
  
      indexOpen: 
      if (r._field=="open") then 
       accumulator.indexOpen+1 
       else 
       accumulator.indexOpen,
        open: 
      if (r._field=="open") then 
        if (accumulator.indexOpen==0) then 
          r._value 
        else 
          accumulator.open
      else
        accumulator.open  
    ,
    
    
      high:
       if (r._field=="high") then  
          if(r._value>accumulator.high ) then
            r._value
          else
            accumulator.high 
      else 
        accumulator.high
   ,
    low: 
      if (r._field=="low") then  
          if(r._value<accumulator.low ) then
            r._value
          else
           accumulator.low 
      else 
        accumulator.low,

             close: 
       if (r._field=="close") then 
          r._value 
      else 
        accumulator.close,
             volume: 
        if (r._field=="volume") then
          r._value+accumulator.volume 
          else
           accumulator.volume
        
  
    }),
    identity: {indexOpen:-1,open: 0.0,high: 0.0,low: 0.0,close: 0.0,volume: 0.0})
    |> yield(name: "final")

I got it! Thank You. If it could be helpful to someone here is the code.
If there is a more efficient way to do the same I would appreciate a comment.

close=from(bucket: "bucket_prueba_123")
  |> range(start: -30d)
  |> group(columns:["pair"])
  |> filter(fn: (r) => r["_measurement"] == "ohlc")
  |> window(every: 5m)
  |> reduce(fn: (r, accumulator) => ({
  
      indexLow:
        if (r._field=="low") then 
          accumulator.indexLow+1 
        else
        accumulator.indexLow,
      indexOpen: 
      if (r._field=="open") then 
       accumulator.indexOpen+1 
       else 
       accumulator.indexOpen,
        open: 
      if (r._field=="open") then 
        if (accumulator.indexOpen==0) then 
          r._value 
        else 
          accumulator.open
      else
        accumulator.open  
    ,
    
    
      high:
       if (r._field=="high") then  
          if(r._value>accumulator.high ) then
            r._value
          else
            accumulator.high 
      else 
        accumulator.high
   ,
    low: 
      if (r._field=="low") then

          if(r._value<accumulator.low or accumulator.indexLow==0.0) then
            r._value
          else
           accumulator.low 
      else 
        accumulator.low,

             close: 
       if (r._field=="close") then 
          r._value 
      else 
        accumulator.close,
             volume: 
        if (r._field=="volume") then
          r._value+accumulator.volume 
          else
           accumulator.volume
        
  
    }),
    identity: {indexLow:0,indexOpen:0,open: 0.0,high: 0.0,low: 0.0,close: 0.0,volume: 0.0})
    |> drop(columns: ["indexOpen","indexLow"])
  |> group(columns:["pair"])
    |> yield(name: "candle")
2 Likes

I was able to get resampling to work without any issues this way. It is much faster than reduce route as well…

Hello,
I don’t know if it would help someone but here’s a query sample (without needs of “Index”):
|> aggregateWindow(
every: duration(v: params.interval),
fn: (column, tables=<-) => tables
|> reduce(
fn: (r, accumulator) => ({
open:
if (accumulator.open==0.0) then
r[“open”]
else accumulator.open,
close:
r[“close”],
high:
if (accumulator.high<r[“high”]) then
r[“high”]
else accumulator.high,
low:
if (accumulator.low==0.0) then
r[“low”]
else if (accumulator.low>r[“low”]) then
r[“low”]
else accumulator.low,
}),
identity: {open: 0.0, high: 0.0, low: 0.0, close: 0.0}
),
)
|> filter(fn: (r) => r[“open”] != 0 or r[“high”] != 0 or r[“low”] != 0 or r[“close”] != 0)

Regards