Flux: aggregating without windowing

Hello Influx community, happy to be joining you today.

I’ve been playing with InfluxDB as a way of manipulating some daily timeseries data I work with (high dimensionality, but new data points get added only every 24h).
I’ve been able to write a few queries that give me timeseries aggregated across various dimensions (tags), using aggregateWindow(every: 24h, fn: sum), but they’re not performing as well as I’d like.
I’m thinking part of the cost must be going into the windowing, especially as my data has some gaps in places. Might it be possible to aggregate without windowing, i.e. so that all values with the exact same timestamp get aggregated together?
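For illustration, a simplified version of what I’m doing now (bucket and measurement names made up):

```flux
from(bucket: "weather")
  |> range(start: -10y)
  |> filter(fn: (r) => r._measurement == "precipitations")
  |> aggregateWindow(every: 24h, fn: sum)
```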

What granularity of data are you working with?

Here’s a good overview of working with the aggregateWindow() function.

If you can’t change the fact that new data only comes in every 24 hours, the only other suggestion is to process the history once, then only process the new data, storing the combined results in a new measurement or bucket.
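A minimal sketch of that approach, assuming a source bucket "weather" and a destination bucket "weather_daily" (both names made up):

```flux
// One-off backfill: aggregate the full history and store the result
from(bucket: "weather")
  |> range(start: -10y)
  |> aggregateWindow(every: 24h, fn: sum)
  |> to(bucket: "weather_daily")
```

A scheduled task could then run the same aggregation over just the latest window (e.g. range(start: -24h)) and write into the same destination bucket, so the expensive historical pass only ever happens once.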

date.truncate() might be helpful for what you want to do as well:

https://docs.influxdata.com/flux/v0.65/stdlib/date/truncate/#sidebar
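For example, to snap every point’s timestamp to the start of its day (bucket name illustrative):

```flux
import "date"

from(bucket: "weather")
  |> range(start: -7d)
  // Truncate each _time to day granularity so points share exact timestamps
  |> map(fn: (r) => ({r with _time: date.truncate(t: r._time, unit: 1d)}))
```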

Also, it always helps if you can post a specific problem with data you’re able to share, though I realize that’s not always possible.

Hey @FixTestRepeat, thanks for your response.
I don’t think I explained my problem very well, my bad. The data granularity is daily. To illustrate, let’s imagine we’re talking about daily precipitation records for the past 10 years, by country, type of terrain (e.g. city, mountain, countryside), precipitation type (e.g. rain, hail, snow), and whether it was forecast or not (yes / no).

What I’m trying to extract in a single query is, for example, “the daily total precipitation in the world in mountainous terrain, per precipitation type, as well as the count of countries in each case”.

My confusion was that I didn’t see how to perform aggregations without re-windowing, which in my case is unnecessary as my source data is already at the right granularity… But as tends to happen, I found the answer not long after posting the question: it is perfectly fine to invoke a reduction function without using aggregateWindow.

I was then able to improve my query to this:

sumCount = (tables=<-) =>
  tables
    |> reduce(
      identity: {sum: 0.0, count: 0},
      fn: (r, accumulator) => ({
        sum: if r._value > 0.0 then r._value + accumulator.sum else accumulator.sum,
        count: if r._value > 0.0 then accumulator.count + 1 else accumulator.count
      })
    )

from(bucket: "weather")
  |> range(start: -10y)
  |> filter(fn: (r) => r._measurement == "precipitations" and r.terrain_type == "mountain")
  |> drop(columns: ["was_forecast"])
  // Sum per precipitation type, country and day
  |> group(columns: ["precipitation_type", "country", "_time"])
  |> sum()
  // Then, per precipitation type and day: total across countries,
  // plus the count of countries with non-zero precipitation
  |> group(columns: ["precipitation_type", "_time"])
  |> sumCount()
  |> group(columns: ["precipitation_type"])

This performs better than using aggregateWindow, as I’m not re-windowing at every step. Nothing ground-breaking here, but if someone else got confused by this, maybe it will help.

I have found that querying/filtering is quick, but the aggregations can become a bit slow when there is too much data (my real data has a few more dimensions than the example above). If you have any tips on how to further improve the above, I’m interested! (I can raise a separate post for that.)

Many thanks!
