Hi guys,
I am trying to store OHLCV data obtained from the Binance API.
I am new to InfluxDB and would appreciate some help.
The method I use returns the candles already calculated, with the following information (timestamp, open, high, low, close, volume) at a 1-minute interval.
Once this information is stored, I would like to query it in the context of a larger interval, for example 5 minutes, 1 hour, 4 hours, etc.
I have seen in other posts here how that query would be built if what we had stored were individual trades, but what I have are precalculated 1-minute candlesticks.
My configuration is as follows:
The measurement is called “ohlcv”
I have a “pair” tag that identifies the currency pair.
In that bucket I have the 1 minute candlesticks of all currency pairs.
Each row has the following fields (a sample point is shown right after the list):
- open
- high
- low
- close
- volume
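For reference, one stored 1-minute candle looks roughly like this in line protocol (the pair and the values are just made-up examples; the timestamp is in nanoseconds):
ohlcv,pair=BTCUSDT open=42000.1,high=42010.5,low=41995.0,close=42005.3,volume=12.47 1700000000000000000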
Suppose I want to query the candlesticks of an interval of 5m.
Conceptually I know that to obtain the equivalent candlestick for a 5m interval I have to:
- For the “open” field, select the “open” of the first candlestick within that interval (the first candlestick of the five).
- For the “close” field, select the “close” of the last candlestick (the last candlestick of the five).
- For the “high” field, select the highest “high” among the candlesticks in the interval.
- For the “low” field, select the lowest “low” among the candlesticks in the interval.
- Finally, for the “volume” field, calculate the sum of the “volume” values of all the candlesticks in the interval.
Thank you so much.
The fact that you get pre-aggregated data does not change anything syntactically.
You are probably after a Flux query, but I’m not quick with those, so I’ll use InfluxQL instead. Here is what you described:
SELECT
sum(volume) as volume
,min(low) as low
,max(high) as high
,first(open) as open
,last(close) as close
FROM db.rp.measurement
WHERE time > now() - 6h
GROUP BY
time(5m)
,pair
What allows you to define the time range of the data to query is WHERE time > ...,
while the time windows come from the GROUP BY time(5m).
These translate to Flux as the functions |> range(start: -6h)
and |> window(every: 5m).
By using those you will get exactly what you are asking for: first/last/min/etc. of the given data for each defined time window (i.e. the min value of the field “low” for each 5-minute window).
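For example, something along these lines should give you the min of “low” per 5-minute window (take it as a rough, untested sketch; the bucket name is a placeholder):
from(bucket: "my_bucket")
    |> range(start: -6h)
    |> filter(fn: (r) => r._measurement == "ohlcv" and r._field == "low")
    |> window(every: 5m)
    |> min()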
If it’s your first time with Flux you can have a look at the docs here.
Thanks @Giovanni_Luisotto. But…
What would be the equivalent query in flux?
I have tried this and came very close to getting it. But strangely it doesn’t work in the case of volume (sum operation) and I don’t know why.
close=from(bucket: "bucket_prueba_123")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> group(columns:["pair"])
|> filter(fn: (r) => r["_field"] == "close")
|> aggregateWindow(
every: 5m,
fn: last,
column: "_value",
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
open=from(bucket: "bucket_prueba_123")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> group(columns:["pair"])
|> filter(fn: (r) => r["_field"] == "open")
|> aggregateWindow(
every: 5m,
fn: first,
column: "_value",
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
high=from(bucket: "bucket_prueba_123")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> group(columns:["pair"])
|> filter(fn: (r) => r["_field"] == "high")
|> aggregateWindow(
every: 5m,
fn: max,
column: "_value",
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
low=from(bucket: "bucket_prueba_123")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> group(columns:["pair"])
|> filter(fn: (r) => r["_field"] == "low")
|> aggregateWindow(
every: 5m,
fn: min,
column: "_value",
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
volume=from(bucket: "bucket_prueba_123")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> group(columns:["pair"])
|> filter(fn: (r) => r["_field"] == "volume")
|> aggregateWindow(
every: 5m,
column: "_value",
fn: sum, // fails
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
final=union(tables: [open, close,high,low,volume])
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value")
|> yield(name: "final")
This code doesn’t work as posted, but if I delete the volume part it works perfectly and returns exactly what I expected.
I have also tried the following with no success.
close=from(bucket: "bucket_prueba_123")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> group(columns:["pair"])
|> filter(fn: (r) => r["_measurement"] == "ohlc")
|> window(every: 5m)
|> reduce(fn: (r, accumulator) => ({
indexOpen:
if (r._field=="open") then
accumulator.indexOpen+1
else
accumulator.indexOpen,
open:
if (r._field=="open") then
if (accumulator.indexOpen==0) then
r._value
else
accumulator.open
else
accumulator.open
,
high:
if (r._field=="high") then
if(r._value>accumulator.high ) then
r._value
else
accumulator.high
else
accumulator.high
,
low:
if (r._field=="low") then
if(r._value<accumulator.low ) then
r._value
else
accumulator.low
else
accumulator.low,
close:
if (r._field=="close") then
r._value
else
accumulator.close,
volume:
if (r._field=="volume") then
r._value+accumulator.volume
else
accumulator.volume
}),
identity: {indexOpen:-1,open: 0.0,high: 0.0,low: 0.0,close: 0.0,volume: 0.0})
|> yield(name: "final")
I got it! Thank you. In case it is helpful to someone, here is the code.
If there is a more efficient way to do the same thing, I would appreciate a comment.
close = from(bucket: "bucket_prueba_123")
    |> range(start: -30d)
    |> group(columns: ["pair"])
    |> filter(fn: (r) => r["_measurement"] == "ohlc")
    |> window(every: 5m)
    // One reduce() pass per 5-minute window builds a single OHLCV record.
    // The index* counters track whether an "open"/"low" row has been seen yet,
    // because the identity record starts every price at 0.0 and a real "low"
    // would otherwise never beat it.
    |> reduce(
        fn: (r, accumulator) => ({
            indexLow: if r._field == "low" then accumulator.indexLow + 1 else accumulator.indexLow,
            indexOpen: if r._field == "open" then accumulator.indexOpen + 1 else accumulator.indexOpen,
            // open: keep only the first "open" value of the window
            open: if r._field == "open" and accumulator.indexOpen == 0 then r._value else accumulator.open,
            // high: running maximum of the "high" values
            high: if r._field == "high" and r._value > accumulator.high then r._value else accumulator.high,
            // low: running minimum of the "low" values (the first one always wins)
            low: if r._field == "low" and (r._value < accumulator.low or accumulator.indexLow == 0) then r._value else accumulator.low,
            // close: keep overwriting so the last "close" value of the window survives
            close: if r._field == "close" then r._value else accumulator.close,
            // volume: running sum of the "volume" values
            volume: if r._field == "volume" then r._value + accumulator.volume else accumulator.volume
        }),
        identity: {indexLow: 0, indexOpen: 0, open: 0.0, high: 0.0, low: 0.0, close: 0.0, volume: 0.0}
    )
    |> drop(columns: ["indexOpen", "indexLow"])
    |> group(columns: ["pair"])
    |> yield(name: "candle")
I was able to get resampling to work without any issues this way. It is much faster than the reduce route as well…
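In case it helps, the general shape was along these lines (bucket and measurement names as earlier in the thread; treat it as a sketch rather than a copy-paste of my exact query):
open = from(bucket: "bucket_prueba_123")
    |> range(start: -30d)
    |> group(columns: ["pair"])
    |> filter(fn: (r) => r["_measurement"] == "ohlc" and r["_field"] == "open")
    |> aggregateWindow(every: 5m, fn: first, createEmpty: false)

high = from(bucket: "bucket_prueba_123")
    |> range(start: -30d)
    |> group(columns: ["pair"])
    |> filter(fn: (r) => r["_measurement"] == "ohlc" and r["_field"] == "high")
    |> aggregateWindow(every: 5m, fn: max, createEmpty: false)

low = from(bucket: "bucket_prueba_123")
    |> range(start: -30d)
    |> group(columns: ["pair"])
    |> filter(fn: (r) => r["_measurement"] == "ohlc" and r["_field"] == "low")
    |> aggregateWindow(every: 5m, fn: min, createEmpty: false)

close = from(bucket: "bucket_prueba_123")
    |> range(start: -30d)
    |> group(columns: ["pair"])
    |> filter(fn: (r) => r["_measurement"] == "ohlc" and r["_field"] == "close")
    |> aggregateWindow(every: 5m, fn: last, createEmpty: false)

volume = from(bucket: "bucket_prueba_123")
    |> range(start: -30d)
    |> group(columns: ["pair"])
    |> filter(fn: (r) => r["_measurement"] == "ohlc" and r["_field"] == "volume")
    |> aggregateWindow(every: 5m, fn: sum, createEmpty: false)

union(tables: [open, high, low, close, volume])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> yield(name: "candle_5m")
Pivoting on _field at the end turns the five separate field streams back into one row per 5-minute candle, which matches the shape of the original 1-minute data.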