Aggregate by count?

Hi,

I can't find a way to aggregate data in chunks of equal size; you can only do it by time. Is there an easy way to do it, other than reading the whole time series in Python and splitting it there (which takes forever!)?

For example, given the series [x0, x1, …, xn-1] and chunks of size 10, I would like to get [x0 … x9], [x10 … x19], …, [xn-10 … xn-1].

Maybe it does not make sense in InfluxDB, because that would mean indexing by count and not by time.

Hello @Nicolas_Carrara,
Welcome!!
You could create a dummy table with a time index and then group based off of that time index.
I would expect this to work, but it doesn't. I'm looking into it, but I'm placing it here in the meantime:

data
  |> map(fn: (r) => ({ r with _time_index: 10000000000 }))               // 10 s (in nanoseconds) per point
  |> cumulativeSum(columns: ["_time_index"])                             // running sum -> artificial timestamps 10 s apart
  |> map(fn: (r) => ({ r with _time_index: time(v: r._time_index) }))    // convert the integer to a time value
  |> window(every: 100s, timeColumn: "_time_index")                      // 100 s windows = 10 points per window
  |> yield(name: "grouped by count")

@Nicolas_Carrara
This will do it:

import "experimental"
pointsPerGroup = 10

data = from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "average_temperature")
  |> filter(fn: (r) => r["_field"] == "degrees")
  |> filter(fn: (r) => r["location"] == "coyote_creek")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)


data
  |> map(fn: (r) => ({ r with i: 1 }))
  |> cumulativeSum(columns: ["i"]) 
  |> map(fn: (r) => ({ r with i: r.i / pointsPerGroup }))
  |> experimental.group(mode: "extend", columns: ["i"])

Thanks for the help!

The solution works, but the problem is that my browser crashes when I ask it to process too large a time interval. When querying from Python, the socket ends up timing out.

When doing the same with aggregation by time, it works like a charm.

Maybe the problem is that “cumulativeSum” is a sequential operation and cannot be processed in parallel?

I don't see why aggregating by count would be a big deal if we assume the series is stored as an array internally, but maybe it is not.

Note that I need to aggregate 100 million data points at once.

@Nicolas_Carrara,
Yes, most likely, and also because from |> range |> filter |> aggregateWindow is a pushdown pattern.
Flux is able to query data efficiently because some functions push down the data transformation workload to storage rather than performing the transformations in memory. Combinations of functions that do this work are called pushdown patterns.
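
As an illustration (the bucket, measurement, and field names below are placeholders, not from this thread), the first query stays entirely inside that pushdown pattern, while the second breaks it as soon as map() and cumulativeSum() appear, so every raw point has to be pulled into memory before the count-based grouping can happen:

// Pushdown pattern: filtering and aggregation happen in the storage engine
from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "ticks" and r._field == "price")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  |> yield(name: "pushed down")

// Not a pushdown pattern: map() and cumulativeSum() run in the Flux engine in memory
from(bucket: "example-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "ticks" and r._field == "price")
  |> map(fn: (r) => ({ r with i: 1 }))
  |> cumulativeSum(columns: ["i"])
  |> yield(name: "in memory")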

Do you have to perform an aggregation at that scale continuously? Or just once, after which you can do it incrementally/on fewer points?

Thanks

@Anaisdg Thanks for your reply.

Ideally, I would like to call it at will, with the calculations done on the spot, because the parameters can change each time. In less than 1 s for 100 million data points.

As a workaround, I do the split with pre-defined parameters (like the size of the interval), save it to InfluxDB in a new measurement (where the tags are the parameters), and then I can query the intervals instantly. It is not perfect, but it does the job for now. I know it won't scale, though.
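
A minimal sketch of that kind of workaround, reusing the count-grouping pattern from above and writing the labelled points to a new measurement with to(). The bucket names, the "ticks"/"price" series, and the "ticks_chunked" measurement are hypothetical:

import "experimental"

pointsPerGroup = 10

from(bucket: "source-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "ticks" and r._field == "price")
  |> map(fn: (r) => ({ r with i: 1 }))                      // number the points
  |> cumulativeSum(columns: ["i"])
  |> map(fn: (r) => ({ r with
        chunk: string(v: r.i / pointsPerGroup),             // chunk id as a string tag
        chunk_size: string(v: pointsPerGroup)               // the parameter, stored as a tag
      }))
  |> drop(columns: ["i"])
  |> experimental.group(mode: "extend", columns: ["chunk", "chunk_size"])
  |> set(key: "_measurement", value: "ticks_chunked")       // write under a new measurement
  |> to(bucket: "grouped-bucket")

Later queries can then filter on chunk_size and chunk directly instead of re-running cumulativeSum over the raw points.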


@Nicolas_Carrara,
Out of curiosity, what’s your use case? You’ve got me intrigued.

@Anaisdg I am using high-frequency market data for machine learning x)