Query optimization. Possible to get rid of array.map?

Hi all,

In order to analyze anomalies in API route calls, my approach is to apply aggregateWindow with quantile to each dataset, split by the "path" tag value (or to a single dataset filtered by the selected "path" tag value), and to do it over a long time range (7d, 14d, 30d, 1y), in real time if possible.

I'm trying to find a way to reduce the query execution time. I implemented it through array.map, which seemed to me the simplest way, but with a long duration/time range (7d and more) performance is very bad: first all CPU cores are 100% occupied while the query executes, and after a while the query takes more and more RAM. Tweaks such as a custom windowPeriod or the sample() function don't fully help.

Cardinality is not very big (261), and the measurement lives in a separate bucket with 365d retention. Write precision is seconds; measurements are written in batches every 5s per user. InfluxDB v2.7 runs on a dedicated server with 8 CPUs (single-threaded), 16 GB RAM. Interestingly, the server previously had 4 CPUs, and upgrading to 8 CPUs changed nothing for this query on long time ranges.

Data schema:
tags: env, status, source, path (cardinality for now - 261)
fields: value (duration in ms)
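
(For reference, the 261 figure is the count of unique "path" values; a quick sketch to check it, with a placeholder bucket name:)

import "influxdata/influxdb/schema"

// Count distinct values of the "path" tag over the bucket's 365d retention
// ("user_timing_bucket" is a placeholder name).
schema.tagValues(bucket: "user_timing_bucket", tag: "path", start: -365d)
  |> count()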

To optimize the query, I want to try to get rid of array.map (it is not a pushdown function, although it's not entirely clear to me how pushdown functions really behave in this query) in the part that applies aggregateWindow with quantile to the dataset of each unique "path" tag value. The routePatterns variable is a table containing all unique values of the "path" tag within the time range selected in the dashboard. Is there some way to apply aggregateWindow with quantile to each value of this table separately, without array.map/map?

If other optimization possibilities exist, please share.
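
(Side note: to understand the pushdown behavior, I assume the Flux profiler can be enabled at the top of the query; a minimal sketch, with the profile returned as extra tables in the result:)

import "profiler"

// Emit profiling tables alongside the query result; the "operator" profiler
// shows per-operation time and memory, making pushed-down reads and expensive
// non-pushdown steps (array.map, quantile) visible.
option profiler.enabledProfilers = ["query", "operator"]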

// Variable v.ut_path definition

import "influxdata/influxdb/schema"
import "experimental/array"

measurement = "user_timing"
tag = "path"
env = "production"
cacheStatus = "nonCached"

// All unique "path" values seen in the selected time range
filteredRoutePatterns = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == measurement and r["env"] == env and r["status"] == cacheStatus)
  |> keep(columns: [tag])
  |> distinct(column: tag)

// Prepend a synthetic "All" option for the dashboard dropdown
allStream = array.from(rows: [{_value: "All"}])
filteredRoutePatternsWithAll = union(tables: [allStream, filteredRoutePatterns])
filteredRoutePatternsWithAll
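
(In the dashboard query below, the value selected from this variable is what ut_path refers to; "All" is the synthetic option that turns off the per-path filter.)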

// Dashboard query

import "experimental/array"
import "date"

timeRangeStart = 2024-09-06T10:40:00Z
timeRangeStop = 2024-09-13T10:40:00Z
timestart = uint(v: date.time(t: timeRangeStart))
timestop = uint(v: date.time(t: timeRangeStop))
duration = uint(v: timestop - timestart)

windowPeriod =
  if int(v: duration) >= int(v: 1y) then 1mo
  else if int(v: duration) >= int(v: 30d) then 1d
  else if int(v: duration) >= int(v: 14d) then 1d
  else if int(v: duration) >= int(v: 7d) then 1d
  else if int(v: duration) >= int(v: 2d) then 45m
  else if int(v: duration) >= int(v: 24h) then 30m
  else if int(v: duration) >= int(v: 12h) then 15m
  else if int(v: duration) >= int(v: 6h) then 6m
  else if int(v: duration) >= int(v: 3h) then 3m
  else if int(v: duration) >= int(v: 1h) then 1m
  else 1m

q = 0.7
env = "production"
createEmpty = true
measurement = "user_timing"
cacheStatus = "nonCached"

predicate = if ut_path == "All" then
    (r) => r["_measurement"] == measurement
      and r["env"] == env
      and r["status"] == cacheStatus
    else
    (r) => r["_measurement"] == measurement
      and r["env"] == env
      and r["status"] == cacheStatus
      and r["path"] == ut_path

filteredDataInRange = () => from(bucket: v.bucket)
  |> range(start: timeRangeStart, stop: timeRangeStop)
  |> filter(fn: predicate)

// TODO: same as v.ut_path; if possible, extract the values from it instead of duplicating logic & execution
routePatterns = filteredDataInRange()
  |> keep(columns: ["path"]) // Keep only the tag column
  |> group() // Group the data to eliminate duplicates
  |> distinct(column: "path") // Get unique tag values


// Extract the unique "path" values as a plain array of strings
filteredRoutePatterns = routePatterns
  |> findColumn(fn: (key) => true, column: "_value")


tagSampledData = (tag) => filteredDataInRange()
  |> filter(fn: (r) => r["path"] == tag)
  |> sample(n: 4) // keep every 4th record (~25% of the data) to reduce processing time


// Apply the windowed quantile to the piped-in stream; yield one named result per tag
tagQuantileData = (tables=<-, tag) => tables
    |> aggregateWindow(
        column: "_value",
        every: windowPeriod,
        fn: (column, tables=<-) => tables |> quantile(q: q, column: column),
        createEmpty: createEmpty
      )
    |> yield(name: tag)

// Run the windowed quantile for each unique path (and "All") via array.map
filteredRoutePatterns |> array.map(fn: (x) => tagSampledData(tag: x) |> tagQuantileData(tag: x))

This thread originated from "Show percentile data per cpu-host", but I made a separate topic for this specific question.

Hello @va2dim,
Is there any chance you can share some example input data and expected output?
I don’t think there’s any other way to work with arrays than array.map.

So to summarize, you are:

  1. querying for some data and finding the distinct tag values, filteredRoutePatterns:

     tag   _value
     t1    t1

     tag   _value
     t2    t2

  2. unioning that output with:

     _value
     All

     to get:

     tag    _value
     t1     t1
     null   All

     tag    _value
     t2     t2
     null   All

  3. filtering conditionally with predicate in filteredDataInRange (I'm not sure what ut_path is, which is where you've lost me a little bit),

  4. then going back to filteredRoutePatterns:

     tag   _value
     t1    t1

     tag   _value
     t2    t2

     and extracting the _value column as an array:

     [t1, t2, All]

  5. then creating a function, tagSampledData, to pass that conditional filtering into and to reduce the data set size,

  6. defining another function, tagQuantileData, which is the aggregate window,

  7. and finally taking the array [t1, t2, All] and filtering based off it with the predicate, then windowing.

Hmmm. There miight? be a way to do this with special grouping and just maps? I’m not sure. But I dont know if it would be any more performant.
@scott anything jump out to you?

Input data example:

_start _stop _time _value _field _measurement env path source status
2024-09-18T07:20:31.858658607Z 2024-09-18T08:20:31.858658607Z 2024-09-18T07:21:18Z 176 value user_timing production GET:/visitors/:idUser/favorites/ web nonCached
2024-09-18T07:20:31.858658607Z 2024-09-18T08:20:31.858658607Z 2024-09-18T07:21:28Z 273 value user_timing production GET:/visitors/:idUser/favorites/ web nonCached
2024-09-18T07:20:31.858658607Z 2024-09-18T08:20:31.858658607Z 2024-09-18T07:21:39Z 268 value user_timing production GET:/visitors/:idUser/favorites/ web nonCached
2024-09-18T07:20:31.858658607Z 2024-09-18T08:20:31.858658607Z 2024-09-18T07:21:41Z 397 value user_timing production GET:/visitors/:idUser/favorites/ web nonCached
2024-09-18T07:20:31.858658607Z 2024-09-18T08:20:31.858658607Z 2024-09-18T07:21:47Z 163 value user_timing production GET:/visitors/:idUser/favorites/ web nonCached
2024-09-18T07:20:31.858658607Z 2024-09-18T08:20:31.858658607Z 2024-09-18T07:21:54Z 194 value user_timing production GET:/visitors/:idUser/favorites/ web nonCached
2024-09-18T07:22:52.799861225Z 2024-09-18T08:22:52.799861225Z 2024-09-18T07:23:16Z 930 value user_timing production POST:/discountCard/ web nonCached
2024-09-18T07:22:52.799861225Z 2024-09-18T08:22:52.799861225Z 2024-09-18T07:24:10Z 1225 value user_timing production POST:/discountCard/ web nonCached
2024-09-18T07:22:52.799861225Z 2024-09-18T08:22:52.799861225Z 2024-09-18T07:28:44Z 900 value user_timing production POST:/discountCard/ web nonCached
2024-09-18T07:22:52.799861225Z 2024-09-18T08:22:52.799861225Z 2024-09-18T07:29:28Z 963 value user_timing production POST:/discountCard/ web nonCached

As I said, I was interested in applying aggregateWindow with quantile to each dataset split by the "path" tag value (split the stream into tables by tag and apply aggregateWindow with quantile to each table separately) over a long time range (14d, 30d, ...).
I tried the following solution, which uses group() instead of array.map, but it has nearly the same bad performance as the array.map example.
I don't see a way to calculate quantiles in real time over a long time range with more or less acceptable performance.
If I haven't exhausted the options for optimizing this logic, please share your ideas.
Since I don't see a way forward with this approach, I've started looking at downsampling (pre-calculating quantiles from the input data); I have some questions about that, but it's better to ask them separately.

import "date"

timeRangeStart = 2024-09-06T10:40:00Z
timeRangeStop = 2024-09-13T10:40:00Z
timestart = uint(v: date.time(t: timeRangeStart))
timestop = uint(v: date.time(t: timeRangeStop))
duration = uint(v: timestop - timestart)

windowPeriod =
  if int(v: duration) >= int(v: 1y) then 1mo
  else if int(v: duration) >= int(v: 30d) then 1d
  else if int(v: duration) >= int(v: 14d) then 1d
  else if int(v: duration) >= int(v: 7d) then 1d
  else if int(v: duration) >= int(v: 2d) then 45m
  else if int(v: duration) >= int(v: 24h) then 30m
  else if int(v: duration) >= int(v: 12h) then 15m
  else if int(v: duration) >= int(v: 6h) then 6m
  else if int(v: duration) >= int(v: 3h) then 3m
  else if int(v: duration) >= int(v: 1h) then 1m
  else 1m

q = 0.7
env = "production"
createEmpty = true
measurement = "user_timing"
cacheStatus = "nonCached"

predicate = if ut_path == "All" then
    (r) => r["_measurement"] == measurement
      and r["env"] == env
      and r["status"] == cacheStatus
    else
    (r) => r["_measurement"] == measurement
      and r["env"] == env
      and r["status"] == cacheStatus
      and r["path"] == ut_path

from(bucket: v.bucket)
  |> range(start: timeRangeStart, stop: timeRangeStop)
  |> filter(fn: predicate)
  |> group(columns: ["path"]) // one table per "path" value instead of iterating with array.map
  //|> sample(n: 4)
  |> aggregateWindow(
      column: "_value",
      every: windowPeriod,
      fn: (column, tables=<-) => tables |> quantile(q: q, column: column),
      createEmpty: createEmpty
    )
  |> yield()
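
As an aside, the downsampling direction mentioned above would be roughly a task like the sketch below (bucket names, the schedule, and the 15m window are placeholders; the real questions about it I'll ask separately):

option task = {name: "user_timing_quantile_downsample", every: 1h}

// Pre-compute the windowed 0.7 quantile per "path" into a downsampled bucket,
// so that long-range dashboard queries read far fewer raw points.
from(bucket: "user_timing") // placeholder source bucket
  |> range(start: -task.every)
  |> filter(fn: (r) => r["_measurement"] == "user_timing" and r["env"] == "production" and r["status"] == "nonCached")
  |> group(columns: ["_measurement", "_field", "env", "status", "path"])
  |> aggregateWindow(
      every: 15m,
      fn: (column, tables=<-) => tables |> quantile(q: 0.7, column: column)
    )
  |> to(bucket: "user_timing_downsampled") // placeholder target bucket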