Show percentile data per cpu-host

I'm still a beginner in Flux and need help with the quantile() function:
I want a graph that shows only the data below a given quantile (dropping everything above it). For simplicity I use "cpu-total" here, but on real data I want to apply this to response duration.

data = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> filter(fn: (r) => r["host"] == "api")

q = float(v: v.percentiles)

r0 = data
  |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
  |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0)

data
  |> map(fn: (r) => ({ r with _value: r0._value }))
  |> yield(name: "p" + v.percentiles + " line")

data
  |> filter(fn: (r) => r["_value"] < r0._value)
  |> yield(name: "p" + v.percentiles + " data")

This works (though I'm not so sure the default quantile method is a good choice here).
But I collect "cpu-total" not from 1 host, but from many (about 200).
How do I show this percentile line & percentile data for each host?
(The percentile line is just for me to check the correctness of the result against the raw data; for that purpose I also want it per host.)

Additionally, I want to nicely aggregate this data in the graph for 1d, 1mo, and 1y periods - one direction is sketched below.
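For the aggregation part, the direction I have in mind is aggregateWindow() over the data defined above (a minimal sketch using the dashboard's v.windowPeriod variable; fn: max is just one choice):

// Downsample the raw series to one max value per dashboard window.
data
  |> aggregateWindow(every: v.windowPeriod, fn: max, createEmpty: false)
  |> yield(name: "downsampled")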

Thanks to the ideas of @codi640, I came up with the following solution (without the percentile line for testing).

Please take a look.
Is this solution OK, especially when applied to a long range like 1 month or 1 year?

data = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")

q = float(v:v.percentiles)

quantileValue_by_host = (tables=<-, host) => {
  result = tables
    |> group(columns: ["host"])
    |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
    |> filter(fn: (r) => r.host == host)
    // These two lines are just to return a value, not a stream (there must be a better solution for this)
    |> drop(columns: ["_start", "_stop", "_field", "cpu", "_measurement", "host"])
    |> findRecord(fn: (key) => true, idx: 0)

  return result
}
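An aside on the drop() + findRecord() workaround above: after group(columns: ["host"]), host is part of the group key, so findRecord() can match on the key directly and the drop() becomes unnecessary (a minimal sketch of the same function, untested; the _v2 name is just for illustration):

// host is in the group key after group(), so findRecord() can select
// the matching table directly instead of filter() + drop().
quantileValue_by_host_v2 = (tables=<-, host) => {
  result = tables
    |> group(columns: ["host"])
    |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
    |> findRecord(fn: (key) => key.host == host, idx: 0)

  return result
}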

data_in_quantile = data
  |> filter(fn: (r) => r._value < quantileValue_by_host(tables: data, host: r["host"])._value)
  |> yield(name: "p" + v.percentiles + " data")


@va2dim Thank you for sharing your solution with the community! We really appreciate it.

What are possible options to optimize this solution for a long-term range (6 months, 1 year)?
This query example with cpu-total just dies if I use a range like 6-12h.

It's not clear to me where to look for optimization:

  • pushdowns in filter - I can't understand this from the profile (the profiler can be enabled as shown below)
  • I think the problem is the data volume - but I'm not sure about downsampling the data before the quantile function
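For reference, the Flux profiler can be enabled with the snippet below (from the Flux docs) to see which operators dominate the execution time:

import "profiler"

// Adds extra result tables with per-query and per-operator
// execution statistics to the query output.
option profiler.enabledProfilers = ["query", "operator"]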

On the real data I will collect request-execution duration for all users (about 5,000,000/day):

  • routePattern - ~250 different route patterns
  • requestCacheStatus - "cachedOnCF" | "nonCached"
  • value - duration in ms
  • routePattern, requestCacheStatus - tags

I want a picture of anomalous routePattern durations (we don't have something like a candlestick chart in InfluxDB, so for simplicity I'm trying to do it on graphs with the quantile as a variable).

q = float(v:v.percentiles)

data = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "user_timing")
  |> filter(fn: (r) => r["_field"] == "value")
  // |> filter(fn: (r) => r["requestCacheStatus"] == "cachedOnCF") // cachedOnCF, nonCached

quantileValueByRoutePattern = (tables=<-, requestCacheStatus, routePattern) => {
  result = tables
    |> group(columns: ["requestCacheStatus", "routePattern"])
    |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
    |> filter(fn: (r) => r.requestCacheStatus == requestCacheStatus)
    |> filter(fn: (r) => r.routePattern == routePattern)
    |> findRecord(fn: (key) => true, idx: 0)

  return result
}

dataInQuantile = data
  // |> yield(name: "raw")
  |> filter(fn: (r) => r._value < quantileValueByRoutePattern(tables: data, requestCacheStatus: r["requestCacheStatus"], routePattern: r["routePattern"])._value)
  |> yield(name: "p" + v.percentiles + " data")

It might be better to store the function's result in a separate variable so you calculate the value just once, and then compare against it in the filter function. I'm not sure that will be more performant, but it might.

I can't catch your idea, because of the necessity to pass dynamic data: host (or requestCacheStatus/routePattern in the 2nd example). Could you please provide an example?

I'm trying to figure something out along the lines of aggregateWindow() before the quantile calculation (but that's not really what I need - I want the quantile calculated on raw data).

And since Flux can compute a quantile inside aggregateWindow(), maybe some way exists to pack this solution into aggregateWindow()? Could this direction give some performance benefit?
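For reference, the shape that "quantile under aggregateWindow" takes (a minimal sketch using the dashboard's v.windowPeriod and the q defined earlier):

data
  |> aggregateWindow(
      every: v.windowPeriod,
      // quantile() as a custom aggregate; column defaults to "_value".
      fn: (column, tables=<-) => tables |> quantile(q: q, column: column),
  )
  |> yield(name: "windowed quantile")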

I tried this dynamic aggregation with v.windowPeriod - same result, the query just dies:
data = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> aggregateWindow(every: v.windowPeriod, fn: max)

I also tried this aggregation, but it looks very bad:

timestart = uint(v: v.timeRangeStart)
timestop = uint(v: v.timeRangeStop)
duration = uint(v: timestop - timestart) // duration of the custom time range

windowPeriod =
  if int(v: duration) >= int(v: 1y) then 1mo
  else if int(v: duration) >= int(v: 1mo) then 1d
  else if int(v: duration) >= int(v: 7d) then 2h
  else if int(v: duration) >= int(v: 1d) then 1h
  else if int(v: duration) >= int(v: 12h) then 30m
  else if int(v: duration) >= int(v: 6h) then 15m
  else if int(v: duration) >= int(v: 3h) then 5m
  else if int(v: duration) >= int(v: 1h) then 1m
  else 1s

// ...and then append to the data pipeline:
data
  |> aggregateWindow(every: windowPeriod, fn: max)

Hello @va2dim,
Good point. I didn't read the Flux closely enough. It looks good to me. Sorry for the confusion.

@va2dim I think this may be a bit more performant, but you’ll have to test it to see. It updates quantileValueByRoutePattern() to return a stream of tables that contains all of the quantiles per requestCacheStatus/routePattern combination. I then added a getQuantile() function that takes requestCacheStatus and routePattern as parameters and returns the associated quantile. This way, quantileValueByRoutePattern() only runs once to return the set of quantiles rather than for every single row like it does in your solution above. getQuantile() runs for every row, but it now references a “static” stream of tables.

q = float(v:v.percentiles)

data = from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "user_timing")
    |> filter(fn: (r) => r["_field"] == "value")
    // |> filter(fn: (r) => r["requestCacheStatus"] == "cachedOnCF") #cachedOnCF,nonCached

quantileValueByRoutePattern = () => {
    result =
        data
            |> group(columns: ["requestCacheStatus", "routePattern"])
            |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)

    return result
}

getQuantile = (requestCacheStatus, routePattern) => {
    outputQuantile = quantileValueByRoutePattern()
        |> findRecord(fn: (key) => key.requestCacheStatus == requestCacheStatus and key.routePattern == routePattern, idx: 0)
    
    return outputQuantile._value
}

dataInQuantile = data
    |> filter(fn: (r) => r._value < getQuantile(requestCacheStatus: r["requestCacheStatus"], routePattern: r["routePattern"]))
    |> yield(name: "p" + v.percentiles + " data")

Thanks for sharing, Scott. Testing on cpu-total metrics (this is even less load than I expect from my real metrics) with the code below, this solution gives a 2.5 times worse result than the previous one, even on small time ranges (0.5h, 1h, 1.5h, 2h) :confused:. Did I adapt it in a wrong way?

q = float(v:v.percentiles)

 //TODO: Calculate duration for relative custom time range (-1h)
// timestart = uint(v: v.timeRangeStart)
// timestop = uint(v: v.timeRangeStop)
// duration = uint(v: timestop - timestart)
// 
// windowPeriod = 
//   if int(v:duration) >= int(v:1y) then 1mo 
//   else if int(v:duration) >= int(v:1mo) then 1d 
//   else if int(v:duration) >= int(v:7d) then 1h
//   else if int(v:duration) >= int(v:1d) then 30m
//   else if int(v:duration) >= int(v:12h) then 15m
//   else if int(v:duration) >= int(v:1h) then 1m 
//   else 1s

data = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> filter(fn: (r) => r["_field"] == "usage_system")
//   |> aggregateWindow(every: windowPeriod, fn:max)
//   |> aggregateWindow(every: v.windowPeriod, fn:max)

quantileValueByRoutePattern = () => {
    result = data
      |> group(columns: ["host"])
      |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)

    return result
}

getQuantile = (host) => {
    outputQuantile = quantileValueByRoutePattern()
       |> findRecord(fn: (key) => key.host == host, idx: 0)
    
    return outputQuantile._value
}

dataInQuantile = data
  |> filter(fn: (r) => r._value < getQuantile(host: r["host"]))
  |> yield(name: "p" + v.percentiles + " data")

I also checked the quantile methods, and "exact_selector" takes more than 2 times longer to compute.

aggregateWindow(every: v.windowPeriod, fn: max) on the input data also doesn't solve the situation.

Now I'm looking at tuning windowPeriod for aggregateWindow(every: windowPeriod, fn: max), but a big aggregation period is definitely not what I want to have :confused:

Trying to find the bottleneck.
@scott @Anaisdg I limited the data: I execute the query not per host, but for 1 host only. If the query uses only a variable (the commented-out var below), execution time is 0.25s; but when I do the same code in the custom function quantileValue, it takes 51s.
I'm really shocked. How can there be such a difference? Can custom functions just not be used this way, or am I using them wrong (maybe it's not allowed to use a global variable like data)?

Maybe there is some way to solve my task without a custom function?
Now I'm thinking of extracting all hosts and doing a loop (maybe through the map function - I can't find anything else resembling a loop operator).

q = 0.7
timeRangeStart = 2024-03-27T23:00:00Z
timeRangeStop = 2024-03-28T23:00:00Z

data = from(bucket: bucket)
  |> range(start: timeRangeStart, stop: timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> filter(fn: (r) => r["host"] == "api")

quantileValue = () => {
  result = data
    |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
    |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0)

  return result._value
}

data
  |> filter(fn: (r) => r["_value"] < quantileValue())
  |> yield(name: "p.data")

// var = data
//   |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
//   |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0)

// data
//   |> filter(fn: (r) => r["_value"] < var._value)
//   |> yield(name: "p.data")


@va2dim I’m curious if this will make a difference:

  • Define the data variable as a function
  • Define the quantileValue function as a static variable
q = 0.7
timeRangeStart = 2024-03-27T23:00:00Z
timeRangeStop = 2024-03-28T23:00:00Z

data = () =>
    from(bucket: bucket)
        |> range(start: timeRangeStart, stop: timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "cpu")
        |> filter(fn: (r) => r["_field"] == "usage_system")
        |> filter(fn: (r) => r["cpu"] == "cpu-total")
        |> filter(fn: (r) => r["host"] == "api")

quantileValue =
    (data()
        |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
        |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0))._value

data()
    |> filter(fn: (r) => r["_value"] < quantileValue)
    |> yield(name: "p.data")

This works better, thanks @scott.
Could you please explain why there is such a big difference in execution time when using a custom function (or give a reference where I can gain this understanding)?

But even this solution is much slower than using only variables (the difference grows with the window duration - for a 1mo duration, using only variables executes in 4s, while your last example takes 40s. And I need to work with a 1y duration).

Another point: if we calculate quantileValue not for 1 host but per host, it cannot be used as a static variable.

Another option I'm looking at, in case performance stays slow: how to reduce the amount of incoming data for the quantile - something like the sample() function, but I want n items spread widely across the timeline (not every nth item). Doesn't some standard function exist for this? One direction is sketched below.
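The direction I mean, sketched (untested for performance; n is a hypothetical target point count): split the time range into n windows and keep one point per window, which spreads the kept points evenly across the timeline.

import "date"

// n is a hypothetical target number of points to keep.
n = 500
spanNs = int(v: date.time(t: v.timeRangeStop)) - int(v: date.time(t: v.timeRangeStart))
every = duration(v: spanNs / n)

from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> aggregateWindow(every: every, fn: first, createEmpty: false)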

I came to this solution without a custom function - iteration over a hosts array + union().
With data = () => it works more or less fast - 9s for a 1mo duration (but I need around 3-5s for a 1y duration).
(But when data is stored as a static variable: 1 host - 4s, 2 hosts - 8s, 24 hosts - 85s for a 1mo duration.
85s for a query is too much, and I need to work comfortably not with 1mo but with a 1y duration. What is the difference?)

Does this solution have any options for optimization?
Maybe there is a way to extract tag values from the already-filtered data stream (the data variable)? One option is sketched after the query below.

import "influxdata/influxdb/schema"
import "experimental/array"

q = float(v:v.percentiles)
bucket = v.bucket
predicate = (r) => r["_measurement"] == "cpu" and r["_field"] == "usage_system" and r["cpu"] == "cpu-total"
tag = "host"

hosts = schema.tagValues(bucket: bucket, tag: tag, predicate: predicate, start: v.timeRangeStart, stop: v.timeRangeStop)
  |> findColumn(fn: (key) => true, column: "_value") 

data = () => from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: predicate)

getTagQuantileData = (tables=<-, tag) => tables
    |> filter(fn: (r) => r["host"] == tag)
    |> aggregateWindow(
            every: v.windowPeriod,
            // In this variant (maybe because quantile runs under aggregateWindow),
            // changing the quantile method or compression doesn't influence performance much.
            fn: (column, tables=<-) => tables |> quantile(q: q),
        )

quantileData = hosts |> array.map(fn: (x) => data() |> getTagQuantileData(tag: x))

union(tables: quantileData)
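On the tag-extraction question above: one possibility (a sketch, not necessarily faster, since it scans the filtered data instead of using the pushed-down schema.tagValues()) is to derive the host list from data() itself:

// Hypothetical alternative to schema.tagValues(): collect distinct
// host values straight from the already-filtered stream.
hostsFromData = data()
    |> keep(columns: ["host"])
    |> group()
    |> distinct(column: "host")
    |> findColumn(fn: (key) => true, column: "_value")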

Hello @va2dim
I’m not sure why the execution time is so different when using a custom function.
Here are some resources around Flux performance:

Although I feel like you've probably already seen those.

To extract scalar values from a query you can use findRecord() or findColumn(), for example (a minimal sketch; someStream is a placeholder for any stream of tables):
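// someStream is a placeholder for any stream of tables.
// findRecord() returns one record from the first table whose group key matches fn;
// findColumn() returns an array of values from the given column of that table.
rec = someStream |> findRecord(fn: (key) => true, idx: 0)
col = someStream |> findColumn(fn: (key) => true, column: "_value")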

@scott @Anaisdg
Returning to this problem, let me try one more time to ask the community, with its much greater experience, how to optimize this query.

This time with a real example - maybe this will be more effective:
I store statistics of API call duration for 1 year, and I need to visualize this data within this time range with the possibility of a simple search for anomalies.
I chose to have at least 2 graphs (nonCached & cachedOnCF). For each path (API method + route name) I need the possibility to apply a quantile (as a simple anomaly search).
I also need the possibility to filter by path (if possible in the same 2 graphs; if not, an additional graph for 1 path).

The main problem is that for all tags on a big duration like 7d/30d it becomes very slow.
I tried to optimize through a custom windowPeriod calculation, but it is still too slow - 40s per query.

Maybe another option exists to optimize this solution, or maybe a better solution exists than the one I chose. If so, please share.

For all cachedOnCF paths:

import "influxdata/influxdb/schema"
import "experimental/array"
import "date"

timestart = uint(v: date.time(t: v.timeRangeStart))
timestop = uint(v: date.time(t: v.timeRangeStop))
duration = uint(v: timestop - timestart)

windowPeriod = 
  if int(v:duration) >= int(v:1y) then 1mo 
  else if int(v:duration) >= int(v:30d) then 1d
  else if int(v:duration) >= int(v:7d) then 90m
  else if int(v:duration) >= int(v:2d) then 45m
  else if int(v:duration) >= int(v:24h) then 30m //30s
  else if int(v:duration) >= int(v:12h) then 15m //15s, 49
  else if int(v:duration) >= int(v:6h) then 6m //11s, 61
  else if int(v:duration) >= int(v:3h) then 3m //9s, 61
  else if int(v:duration) >= int(v:1h) then 1m //4s, 61
  else 1m 

q = float(v:v.percentiles)
env = "production"
createEmpty = true

bucket = v.bucket
measurement = "user_timing"
field = "value"
cacheStatus = "cachedOnCF"
tag = "path"
path = v.ut_path

predicate = (r) => 
  r["_measurement"] == measurement 
  and r["env"] == env
  and r["_field"] == field 
  and r["status"] == cacheStatus


routePatterns = schema.tagValues(bucket: bucket, tag: tag, predicate: predicate, start: v.timeRangeStart, stop: v.timeRangeStop)
  |> findColumn(fn: (key) => true, column: "_value") 

data = () => from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: predicate)

getTagQuantileData = (tables=<-, tag) => tables 
    |> filter(fn: (r) => r["path"] == tag) 
    |> aggregateWindow(
            column: "_value",
            every: windowPeriod,
            fn: (column, tables=<-) => tables |> quantile(q: q, column: column),
            createEmpty: createEmpty
        )

quantileData = routePatterns 
  |> array.map(fn: (x) => data() |> getTagQuantileData(tag: x))
union(tables:quantileData)

The variable v.ut_path collects all the unique tag values, but I need them for the whole period (even for 1 year), and this is also not very fast (maybe a faster way to collect them exists?):

import "influxdata/influxdb/schema"
import "experimental/array"

allPaths = schema.measurementTagValues(bucket: v.bucket, measurement: "user_timing", tag: "path")

allStream = array.from(rows: [{_value: "All"}])
pathsWithAll = union(tables: [allStream, allPaths])
pathsWithAll

We can have several queries for the same graph.
I wonder whether a way exists to mount them dynamically, something like a foreach over the tag's unique values, one per query (if I have 1000 values of the path tag, then 1000 separate but simpler queries) -
i.e., the possibility to process each tag value dynamically in a separate query?

import "influxdata/influxdb/schema"
import "experimental/array"
import "date"

timestart = uint(v: date.time(t: v.timeRangeStart))
timestop = uint(v: date.time(t: v.timeRangeStop))
duration = uint(v: timestop - timestart)


windowPeriod = 
  if int(v:duration) >= int(v:1y) then 1mo 
  else if int(v:duration) >= int(v:30d) then 1d
  else if int(v:duration) >= int(v:7d) then 90m
  else if int(v:duration) >= int(v:2d) then 45m
  else if int(v:duration) >= int(v:24h) then 30m
  else if int(v:duration) >= int(v:12h) then 15m
  else if int(v:duration) >= int(v:6h) then 6m
  else if int(v:duration) >= int(v:3h) then 3m
  else if int(v:duration) >= int(v:1h) then 1m
  else 1m 

q = float(v:v.percentiles)
env = "production"
createEmpty = true

bucket = v.bucket
measurement = "user_timing"
field = "value"
cacheStatus = "nonCached"
tag = "path"
path = "PUT:/users/:sessionId/cart/"

predicate = (r) => 
  r["_measurement"] == measurement 
  and r["env"] == env
  and r["_field"] == field 
  and r["status"] == cacheStatus
  and r["path"] == path


data = () => from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: predicate)

getTagQuantileData = (tables=<-, tag) => tables 
    |> filter(fn: (r) => r["path"] == tag) 
    |> aggregateWindow(
            column: "_value",
            every: windowPeriod,
            fn: (column, tables=<-) => tables |> quantile(q: q, column: column),
            createEmpty: createEmpty
        )

getQuantileData = (tag) => {
    return data() |> getTagQuantileData(tag: tag)
}

result = getQuantileData(tag: path)
result |> yield()