Hi guys, I’m new to Influx…
I’m trying to build a kind of candlestick pattern with Influx.
I have written some code and it looks nice: I get “high”, “low”, “open”, and “close”, and the data is aggregated via the window function with time and period. Let’s say 3-minute candles.
But I only need to query the “last fully closed candle”. How can I do this? “last()” includes the currently active candle, which is not finished because its 3-minute window is not complete yet.
What can I do?
Here’s the code. Does anyone have an idea? (I hardcoded start to make sure the windows always start at a round clock time.)
open = from(bucket: "trading")
    |> range(start: 2022-12-03T20:00:00Z)
    |> filter(fn: (r) => r["_measurement"] == "quote")
    |> filter(fn: (r) => r["_field"] == "entryPriceBUY")
    |> window(every: 3m, period: 3m)
    |> first()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)
    |> set(key: "tag", value: "open")
    |> group(columns: ["tag"])

close = from(bucket: "trading")
    |> range(start: 2022-12-03T20:00:00Z)
    |> filter(fn: (r) => r["_measurement"] == "quote")
    |> filter(fn: (r) => r["_field"] == "entryPriceBUY")
    |> window(every: 3m, period: 3m)
    |> last()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)
    |> set(key: "tag", value: "close")
    |> group(columns: ["tag"])

low = from(bucket: "trading")
    |> range(start: 2022-12-03T20:00:00Z)
    |> filter(fn: (r) => r["_measurement"] == "quote")
    |> filter(fn: (r) => r["_field"] == "entryPriceBUY")
    |> window(every: 3m, period: 3m)
    |> min()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)
    |> set(key: "tag", value: "low")
    |> group(columns: ["tag"])

high = from(bucket: "trading")
    |> range(start: 2022-12-03T20:00:00Z)
    |> filter(fn: (r) => r["_measurement"] == "quote")
    |> filter(fn: (r) => r["_field"] == "entryPriceBUY")
    |> window(every: 3m, period: 3m)
    |> max()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)
    |> set(key: "tag", value: "high")
    |> group(columns: ["tag"])

union(tables: [high, low, open, close])
    |> last()
Without “last()” I get good data (see screenshot)
Edit: it seems like this does the trick, but I’m not sure whether it’s correct or not… I’m just observing the values…
...
union(tables: [high, low, open, close])
    |> range(start: -6m)
    |> last()
@Richtation,
Thanks for sharing your solution!
For future reference, for anyone else inquiring, here are two examples of candlestick functions:
Issue opened 06:50PM - 20 Aug 21 UTC, closed 01:34AM - 26 Aug 23 UTC (labels: new function, no-issue-activity)
An open-high-low-close chart (also OHLC) is a type of chart typically used to illustrate movements in the price of a financial instrument over time. Each vertical line on the chart shows the price range (the highest and lowest prices) over one unit of time, e.g., one day or one hour.
Create a function within Flux to produce this output.
Example:
```
ohlc = (tables=<-) =>
    tables
        |> reduce(
            identity: {
                total: 0.0,
                high: 0.0,
                low: 0.0,
                close: 0.0
            },
            fn: (r, accumulator) => ({
                total: accumulator.total + r._value,
                high: if accumulator.total + r._value > accumulator.high then accumulator.total + r._value else accumulator.high,
                low: if accumulator.total + r._value < accumulator.low then accumulator.total + r._value else accumulator.low,
                close: accumulator.close + r._value,
            })
        )
```
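Note that the quoted example tracks a running total rather than the raw prices. For comparison, here is a minimal sketch of a `reduce()`-based helper that tracks open, high, low, and close directly. The name `ohlcSketch`, the `count` field, and the sample rows are made up for illustration; it assumes each input table is sorted by time and that `_value` is a float.

```flux
import "array"

// Hypothetical helper: folds each input table into a single OHLC row.
// Assumes rows arrive sorted by time and _value is a float.
ohlcSketch = (tables=<-) =>
    tables
        |> reduce(
            identity: {count: 0, open: 0.0, high: 0.0, low: 0.0, close: 0.0},
            fn: (r, accumulator) =>
                ({
                    count: accumulator.count + 1,
                    // The first row seeds open, high, and low.
                    open: if accumulator.count == 0 then r._value else accumulator.open,
                    high: if accumulator.count == 0 or r._value > accumulator.high then r._value else accumulator.high,
                    low: if accumulator.count == 0 or r._value < accumulator.low then r._value else accumulator.low,
                    // The most recent row always becomes close.
                    close: r._value,
                }),
        )

// Made-up sample prices for one candle.
array.from(
    rows: [
        {_time: 2022-12-03T20:00:10Z, _value: 10.0},
        {_time: 2022-12-03T20:01:00Z, _value: 12.5},
        {_time: 2022-12-03T20:02:00Z, _value: 9.5},
        {_time: 2022-12-03T20:02:50Z, _value: 11.0},
    ],
)
    |> ohlcSketch()
```

With these sample rows the result should be a single row with open 10.0, high 12.5, low 9.5, and close 11.0. To get one row per candle, the helper would be applied after `window(every: 3m)`, since `reduce()` works on each table separately.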
@lexchou You have a few options. You can only join two streams at a time, but you can do multiple joins:
data = from(bucket: "dcoin")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "btcusdt" and (r._field == "price"))
open = data |> aggregateWindow(every: v.windowPeriod, fn: first, createEmpty: false)
high = data |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
low = data |> aggregateWindow(every: v.windowPeriod, fn:…
scott, December 9, 2022, 8:26pm
@Richtation It’s a little hacky, but it should work. You can use `date.truncate()` to truncate your `range()`’s `stop` value to the same duration as your window. So you just won’t query that last window. Try this:
import "date"

windowInterval = 3m

data = () =>
    from(bucket: "trading")
        |> range(
            start: 2022-12-03T20:00:00Z,
            stop: date.truncate(unit: windowInterval, t: now())
        )
        |> filter(fn: (r) => r["_measurement"] == "quote")
        |> filter(fn: (r) => r["_field"] == "entryPriceBUY")

open = data()
    |> aggregateWindow(every: windowInterval, fn: first)
    |> set(key: "tag", value: "open")

close = data()
    |> aggregateWindow(every: windowInterval, fn: last)
    |> set(key: "tag", value: "close")

low = data()
    |> aggregateWindow(every: windowInterval, fn: min)
    |> set(key: "tag", value: "low")

high = data()
    |> aggregateWindow(every: windowInterval, fn: max)
    |> set(key: "tag", value: "high")

union(tables: [high, low, open, close])
    |> group(columns: ["tag"])
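
If only the last fully closed candle is needed, the same truncation idea can be applied to both ends of the range, so that exactly one window is queried. The following is a rough sketch rather than a tested solution. It assumes `date.sub()` is available in your Flux version, that the filter matches a single series, and that the window contains at least one point. The names `lastStart` and `lastStop` are made up for illustration.

```flux
import "date"

windowInterval = 3m

// End of the most recent fully closed window (aligned to the 3m grid).
lastStop = date.truncate(t: now(), unit: windowInterval)

// Start of that same window. Assumes date.sub() exists in your Flux version.
lastStart = date.sub(from: lastStop, d: windowInterval)

data = () =>
    from(bucket: "trading")
        |> range(start: lastStart, stop: lastStop)
        |> filter(fn: (r) => r["_measurement"] == "quote")
        |> filter(fn: (r) => r["_field"] == "entryPriceBUY")

// The range covers exactly one window, so no windowing is needed here.
open = data() |> first() |> set(key: "tag", value: "open")
close = data() |> last() |> set(key: "tag", value: "close")
low = data() |> min() |> set(key: "tag", value: "low")
high = data() |> max() |> set(key: "tag", value: "high")

// Merge the four values and pivot them into one row keyed by the window end.
union(tables: [open, high, low, close])
    |> group()
    |> pivot(rowKey: ["_stop"], columnKey: ["tag"], valueColumn: "_value")
```

The result should be a single row with `_stop`, `open`, `high`, `low`, and `close` columns. If no points fall into the last closed window, the query returns nothing instead of an older candle.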