Is there a way a task could iterate over measurements and aggregate data into another bucket?

Hey there!

I’m using a bucket to collect tick data for multiple symbols on Binance (e.g. ETH/BTC and BNB/BTC), storing each symbol in its own measurement (binance_ethbtc and binance_bnbbtc respectively), and that’s working fine. On top of that, I’d like to aggregate the ticks into OHLC data in another bucket, just like this guy here. I’ve already managed to write Flux code that aggregates the data for a single measurement, but it got me wondering: do I need to write a task for EVERY measurement I have? Isn’t there a way of iterating over the measurements in a bucket and aggregating the data into another one?

What does your aggregation flux query look like currently?

The data in InfluxDB should already be grouped by measurement, so I wonder if your aggregation code is getting in the way of that. Anyway, if you can post your code in a code </> block, either I or someone else can probably help point you in the right direction.

Yeah I completely forgot about that. I got the base code from the guy I’ve linked previously, so it looks just like his:

from(bucket: "algotrader")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "binance_ethbtc")
  |> filter(fn: (r) => r["_field"] == "price")
  |> aggregateWindow(every: 1m, fn: first, createEmpty: false)
  |> yield(name: "open")

This is actually only for open (O in OHLC). I’m using similar ones for high (max), low (min), close (last) and volume (sum on _field == "amount").

You’re telling me that if I remove that filter for measurement, I’ll get what I want?

Try adding _measurement into the yield, so you can still tie that open column back to the source in your new bucket.

And yeah, try removing the filter. If the results still don’t look right, try changing the grouping.

The idea is that instead of returning a single series, you’ll get back multiple series, one per measurement. So instead of 50+ tasks per stat per measurement, you should end up with one task per stat, and it calculates that stat for all the measurements found in that time period.
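
For reference, here is a minimal sketch of the regrouping mentioned above. It assumes the points carry extra tags that would otherwise split each measurement into several series; if there are no such tags, the default grouping should already give one series per measurement. The -1h range is an arbitrary placeholder, not something from this thread.

```flux
from(bucket: "algotrader")
  |> range(start: -1h) // placeholder range, adjust as needed
  |> filter(fn: (r) => r["_field"] == "price")
  // One table per measurement and field, regardless of any other tags
  |> group(columns: ["_measurement", "_field"])
  // group() does not guarantee row order, and first depends on time order
  |> sort(columns: ["_time"])
  |> aggregateWindow(every: 1m, fn: first, createEmpty: false)
  |> yield(name: "open")
```

Each output table keeps _measurement in its group key, so every "open" value can still be traced back to its source symbol.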


I’ve tried the removal of the measurement filter, and it just worked! This is awesome, thank you very much! For anyone that ends up here, my query looks like this now:

from(bucket: "algotrader")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_field"] == "price")
  |> aggregateWindow(every: 1m, fn: first, createEmpty: false)
  |> yield(name: "open")

It’s a very easy solution, but I’m getting started with InfluxDB today and I’m struggling with basic things. Thanks again!
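
A note for anyone turning the query above into a task: v.timeRangeStart and v.timeRangeStop are variables supplied by the dashboard and Data Explorer UI, so they are not defined inside a task, and the range has to be given explicitly there. Below is a rough sketch of what the "open" task might look like. The task name and the destination bucket "algotrader_ohlc" are made-up names for illustration, and the set() step is just one possible way to store each stat under its own field name.

```flux
// Hypothetical task name; runs once per minute
option task = {name: "ohlc_open_1m", every: 1m}

from(bucket: "algotrader")
  // Only look at the data since the previous run
  |> range(start: -task.every)
  |> filter(fn: (r) => r["_field"] == "price")
  |> aggregateWindow(every: 1m, fn: first, createEmpty: false)
  // Store the result as field "open" instead of "price"
  |> set(key: "_field", value: "open")
  // Hypothetical destination bucket; _measurement is carried over unchanged
  |> to(bucket: "algotrader_ohlc")
```

The same shape should work for high, low, close and volume by swapping the aggregate function (max, min, last, sum), the field filter and the field name.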
