Hi all,
To analyze anomalies in API route calls, I chose the following approach: apply aggregateWindow with quantile to each dataset, split by the "path" tag value (or to one dataset filtered by a selected "path" value), over a long time range (7d, 14d, 30d, 1y), in real time if possible.
I'm trying to find a way to optimize the query execution time.
I implemented it with array.map, which I consider the simplest way, but with a long duration/time range (7d and more) performance is very bad: first all CPU cores are 100% occupied while the query executes, and after a while the query takes more and more RAM.
Tweaks such as a custom windowPeriod or the sample() method don't fully help.
Cardinality is not very big (261), and the measurement lives in a separate bucket with 365d retention.
Write precision is s. Measurements are written in batches every 5s per user.
InfluxDB v2.7 runs on a dedicated server with 8 CPU cores (single thread), 16GB RAM.
Interestingly, the server previously had 4 CPUs, and upgrading to 8 CPUs changed nothing in the execution of this query over a long time range.
Data schema:
tags: env, status, source, path (cardinality for now - 261)
fields: value (duration in ms)
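For illustration, one written point in this schema looks roughly like this (the tag and field values are made-up examples; the timestamp is in seconds because of the s precision):

```
user_timing,env=production,status=nonCached,source=web,path=/api/orders value=137 1694601600
```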
To optimize the query, I want to try to get rid of array.map (it is not a pushdown function, though it is not entirely clear to me how pushdown functions really work in this query) when applying aggregateWindow with quantile to the dataset of each unique "path" tag value.
Given that the routePatterns variable is a table containing all unique values of the "path" tag within the time range selected in the dashboard:
Is there some option to apply aggregateWindow with quantile to each value of this table separately, without array.map/map?
If other optimization possibilities exist, please share.
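What I imagine (an untested sketch, reusing the bucket/measurement names from my query below; I am not sure it is equivalent to the per-path yields I get from array.map) is keeping the data grouped by "path", so that a single aggregateWindow processes every path series at once instead of mapping over the tag values:

```flux
// Untested sketch: rely on the group key instead of array.map.
// Grouping by "path" should give one table per unique path value,
// and a single aggregateWindow + quantile then runs over all of them.
from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) =>
        r["_measurement"] == "user_timing" and r["env"] == "production" and r["status"] == "nonCached",
    )
    |> group(columns: ["path"])
    |> aggregateWindow(
        every: v.windowPeriod,
        fn: (column, tables=<-) => tables |> quantile(q: 0.7, column: column),
        createEmpty: true,
    )
```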
// Variable v.ut_path definition
import "experimental/array"
measurement = "user_timing"
tag = "path"
env = "production"
cacheStatus = "nonCached"
filteredRoutePatterns = from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == measurement and r["env"] == env and r["status"] == cacheStatus)
    |> keep(columns: [tag])
    |> distinct(column: tag)
allStream = array.from(rows: [{_value: "All"}])
filteredRoutePatternsWithAll = union(tables: [allStream, filteredRoutePatterns])
filteredRoutePatternsWithAll
// Dashboard query
import "experimental/array"
import "date"
timeRangeStart = 2024-09-06T10:40:00Z
timeRangeStop = 2024-09-13T10:40:00Z
timestart = uint(v: date.time(t: timeRangeStart))
timestop = uint(v: date.time(t: timeRangeStop))
duration = uint(v: timestop - timestart)
windowPeriod =
    if int(v: duration) >= int(v: 1y) then 1mo
    else if int(v: duration) >= int(v: 7d) then 1d // 7d, 14d, and 30d ranges all map to 1d
    else if int(v: duration) >= int(v: 2d) then 45m
    else if int(v: duration) >= int(v: 24h) then 30m
    else if int(v: duration) >= int(v: 12h) then 15m
    else if int(v: duration) >= int(v: 6h) then 6m
    else if int(v: duration) >= int(v: 3h) then 3m
    else 1m
q = 0.7
env = "production"
createEmpty = true
measurement = "user_timing"
cacheStatus = "nonCached"
predicate =
    if v.ut_path == "All" then
        (r) => r["_measurement"] == measurement
            and r["env"] == env
            and r["status"] == cacheStatus
    else
        (r) => r["_measurement"] == measurement
            and r["env"] == env
            and r["status"] == cacheStatus
            and r["path"] == v.ut_path
filteredDataInRange = () => from(bucket: v.bucket)
    |> range(start: timeRangeStart, stop: timeRangeStop)
    |> filter(fn: predicate)

//TODO: same as v.ut_path - if possible, extract the values from it (and not duplicate logic & execution)
routePatterns = filteredDataInRange()
    |> keep(columns: ["path"]) // keep only the tag column
    |> group() // regroup the data to eliminate duplicates
    |> distinct(column: "path") // get unique tag values

filteredRoutePatterns = routePatterns
    |> findColumn(fn: (key) => true, column: "_value")
tagSampledData = (tag) => filteredDataInRange()
    |> filter(fn: (r) => r["path"] == tag)
    |> sample(n: 4) // reduce processing time by aggregating only 25% of the data

tagQuantileData = (tables=<-, tag) => tables
    |> aggregateWindow(
        column: "_value",
        every: windowPeriod,
        fn: (column, tables=<-) => tables |> quantile(q: q, column: column),
        createEmpty: createEmpty,
    )
    |> yield(name: tag)

filteredRoutePatterns
    |> array.map(fn: (x) => tagSampledData(tag: x) |> tagQuantileData(tag: x))
This thread originated from "Show percentile data per cpu-host", but I made a separate topic for this specific question.