Show percentile data per CPU host

I'm still a beginner in Flux and need help with the quantile function:
I want a graph that shows only the data within a given quantile (dropping the data above the quantile). For simplicity I take "cpu-total", but in my real data I want to apply this to response durations.

data = from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "cpu")
    |> filter(fn: (r) => r["_field"] == "usage_system")
    |> filter(fn: (r) => r["cpu"] == "cpu-total")
    |> filter(fn: (r) => r["host"] == "api")

q = float(v: v.percentiles)

r0 = data
    |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
    |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0)

data
    |> map(fn: (r) => ({r with _value: r0._value}))
    |> yield(name: "p" + v.percentiles + " line")

data
    |> filter(fn: (r) => r["_value"] < r0._value)
    |> yield(name: "p" + v.percentiles + " data")

This works (though I'm not sure the default quantile method is a good choice here).
But I collect "cpu-total" not from one host, but from many (nearly 200).
How do I show this percentile line & percentile data for each host?
(The percentile line, for me, is just to check the correctness of the result against the raw data; for this purpose I also want it per host.)

Additionally, I want to aggregate this data nicely in a graph for 1d, 1mo, and 1y periods.

Thanks to ideas from @codi640, I came up with the following solution (without the percentile line for testing).

Please take a look: is this solution OK, especially when applied to a long range like 1mo or 1y?

data = from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "cpu")
    |> filter(fn: (r) => r["_field"] == "usage_system")
    |> filter(fn: (r) => r["cpu"] == "cpu-total")

q = float(v: v.percentiles)

quantileValue_by_host = (tables=<-, host) => {
    result = tables
        |> group(columns: ["host"])
        |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
        |> filter(fn: (r) => r.host == host)
        // these two lines just return a value instead of a stream
        // (there must be a better solution for this)
        |> drop(columns: ["_start", "_stop", "_field", "cpu", "_measurement", "host"])
        |> findRecord(fn: (key) => true, idx: 0)

    return result
}

data_in_quantile = data
    |> filter(fn: (r) => r._value < quantileValue_by_host(tables: data, host: r["host"])._value)
    |> yield(name: "p" + v.percentiles + " data")
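
For the per-host percentile line that is left out above, a minimal sketch reusing the same data and q (keeping _start/_stop in the group key is an assumption here, so that a plottable _time can be re-added after the aggregate):

data
    |> group(columns: ["host", "_start", "_stop"])
    |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
    // the aggregate drops _time, so re-add one point per host at the range end
    |> map(fn: (r) => ({r with _time: r._stop}))
    |> yield(name: "p" + v.percentiles + " line")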


@va2dim Thank you for sharing your solution with the community! We really appreciate it.

What are possible options to optimize this solution for a long-term range selection (6 months, 1 year)?
This example query with cpu_total just dies if I use a range like 6-12h.

It's not clear to me where to look for optimization:
  • pushdown in the filter - I can't understand this from the profile (see the profiler sketch below)
  • I think the problem is the data amount - but I'm not sure about downsampling the data before the quantile function.
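
As a side note on reading the profile: the Flux profiler can be enabled like this (a sketch; both profilers are part of the standard profiler package). Filters that are pushed down to storage show up folded into the storage read instead of as separate operators:

import "profiler"

// adds profiler/* tables to the query results with per-operator
// execution statistics
option profiler.enabledProfilers = ["query", "operator"]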

In my real data I will collect the duration of request execution for all users (around 5,000,000/day):
routePattern - ~250 different routePatterns
requestCacheStatus - "cachedOnCF" | "nonCached"
value - duration in ms
routePattern and requestCacheStatus are tags

I want a picture of anomalous routePattern durations (we don't have something like a candlestick chart in InfluxDB, so for simplicity I'm trying to do it on graphs with the quantile as a variable).

q = float(v: v.percentiles)

data = from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "user_timing")
    |> filter(fn: (r) => r["_field"] == "value")
    // |> filter(fn: (r) => r["requestCacheStatus"] == "cachedOnCF") // cachedOnCF, nonCached

quantileValueByRoutePattern = (tables=<-, requestCacheStatus, routePattern) => {
    result = tables
        |> group(columns: ["requestCacheStatus", "routePattern"])
        |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
        |> filter(fn: (r) => r.requestCacheStatus == requestCacheStatus)
        |> filter(fn: (r) => r.routePattern == routePattern)
        |> findRecord(fn: (key) => true, idx: 0)

    return result
}

dataInQuantile = data
    // |> yield(name: "raw")
    |> filter(fn: (r) => r._value < quantileValueByRoutePattern(tables: data, requestCacheStatus: r["requestCacheStatus"], routePattern: r["routePattern"])._value)
    |> yield(name: "p" + v.percentiles + " data")

It might be better to store the function's result as a separate variable so you calculate the value just once, and then compare against it in the filter function. I'm not sure that will be more performant, but it might.

I can't follow your idea, because of the need to pass in dynamic data: host (or requestCacheStatus/routePattern in the second example). Could you please provide an example?

I'm trying to figure something out in the direction of aggregateWindow before the quantile calculation (but that's not really what I need; I want the quantile calculated on raw data).

And since Flux can run quantile under aggregateWindow, maybe there is some way to pack this solution into aggregateWindow? Could that direction give any performance benefit?
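
For reference, a minimal sketch of quantile() under aggregateWindow(), using the data and q defined above. Note this computes a quantile per window rather than over the whole range, so the semantics change:

data
    |> aggregateWindow(
        every: v.windowPeriod,
        // quantile is not a plain named aggregate, so wrap it in a function
        fn: (column, tables=<-) => tables |> quantile(q: q, method: "estimate_tdigest", compression: 1000.0),
    )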

I tried this dynamic aggregation with v.windowPeriod; same result, the query just dies:
data = from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "cpu")
    |> filter(fn: (r) => r["_field"] == "usage_system")
    |> filter(fn: (r) => r["cpu"] == "cpu-total")
    |> aggregateWindow(every: v.windowPeriod, fn: max)

I tried this aggregation, but it looks very bad:

timestart = uint(v: v.timeRangeStart)
timestop = uint(v: v.timeRangeStop)
duration = timestop - timestart // duration of the custom time range, in nanoseconds

// note: int() cannot convert calendar durations (1mo, 1y), so fixed-length
// approximations (365d, 30d) are used in the comparisons
windowPeriod =
    if int(v: duration) >= int(v: 365d) then 1mo
    else if int(v: duration) >= int(v: 30d) then 1d
    else if int(v: duration) >= int(v: 7d) then 2h
    else if int(v: duration) >= int(v: 1d) then 1h
    else if int(v: duration) >= int(v: 12h) then 30m
    else if int(v: duration) >= int(v: 6h) then 15m
    else if int(v: duration) >= int(v: 3h) then 5m
    else if int(v: duration) >= int(v: 1h) then 1m
    else 1s

data
    |> aggregateWindow(every: windowPeriod, fn: max)

Hello @va2dim,
Good point. I didn't read the Flux closely enough. It looks good to me. Sorry for the confusion.

@va2dim I think this may be a bit more performant, but you’ll have to test it to see. It updates quantileValueByRoutePattern() to return a stream of tables that contains all of the quantiles per requestCacheStatus/routePattern combination. I then added a getQuantile() function that takes requestCacheStatus and routePattern as parameters and returns the associated quantile. This way, quantileValueByRoutePattern() only runs once to return the set of quantiles rather than for every single row like it does in your solution above. getQuantile() runs for every row, but it now references a “static” stream of tables.

q = float(v:v.percentiles)

data = from(bucket: v.bucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "user_timing")
    |> filter(fn: (r) => r["_field"] == "value")
    // |> filter(fn: (r) => r["requestCacheStatus"] == "cachedOnCF") #cachedOnCF,nonCached

quantileValueByRoutePattern = () => {
    result =
        data
            |> group(columns: ["requestCacheStatus", "routePattern"])
            |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)

    return result
}

getQuantile = (requestCacheStatus, routePattern) => {
    outputQuantile = quantileValueByRoutePattern()
        |> findRecord(fn: (key) => key.requestCacheStatus == requestCacheStatus and key.routePattern == routePattern, idx: 0)
    
    return outputQuantile._value
}

dataInQuantile = data
    |> filter(fn: (r) => r._value < getQuantile(requestCacheStatus: r["requestCacheStatus"], routePattern: r["routePattern"]))
    |> yield(name: "p" + v.percentiles + " data")

Thanks for sharing, Scott. Testing on cpu-total metrics (this load is even lighter than I expect for my real metrics), the code below adapting this solution gives a 2.5 times worse result than the previous one, even on small time ranges (0.5h, 1h, 1.5h, 2h) :confused:. Did I adapt it in a wrong way?

q = float(v:v.percentiles)

 //TODO: Calculate duration for relative custom time range (-1h)
// timestart = uint(v: v.timeRangeStart)
// timestop = uint(v: v.timeRangeStop)
// duration = uint(v: timestop - timestart)
// 
// windowPeriod = 
//   if int(v:duration) >= int(v:1y) then 1mo 
//   else if int(v:duration) >= int(v:1mo) then 1d 
//   else if int(v:duration) >= int(v:7d) then 1h
//   else if int(v:duration) >= int(v:1d) then 30m
//   else if int(v:duration) >= int(v:12h) then 15m
//   else if int(v:duration) >= int(v:1h) then 1m 
//   else 1s

data = from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> filter(fn: (r) => r["_field"] == "usage_system")
//   |> aggregateWindow(every: windowPeriod, fn:max)
//   |> aggregateWindow(every: v.windowPeriod, fn:max)

quantileValueByRoutePattern = () => {
    result = data
      |> group(columns: ["host"])
      |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)

    return result
}

getQuantile = (host) => {
    outputQuantile = quantileValueByRoutePattern()
       |> findRecord(fn: (key) => key.host == host, idx: 0)
    
    return outputQuantile._value
}

dataInQuantile = data
  |> filter(fn: (r) => r._value < getQuantile(host: r["host"]))
  |> yield(name: "p" + v.percentiles + " data")

I also checked the quantile methods, and "exact_selector" gives a more than 2x longer calculation.

aggregateWindow(every: v.windowPeriod, fn: max) on the input data also doesn't solve the situation.

Now I'm looking at tuning windowPeriod for aggregateWindow(every: windowPeriod, fn: max), but a big aggregation period is definitely not what I want :confused:

Trying to find the bottleneck.
I limited the data: executing the query not for each host, but for one host only. If the query uses only a variable (the commented-out var), execution time is 0.25s, but when I do the same code in the custom function quantileValue, it's 51s.
I'm really shocked. @scott @Anaisdg, please explain how there can be such a difference? Can a custom function just not be used here, or am I using it in the wrong way (maybe it's not allowed to reference a global variable like data)?

Maybe there is some way to solve my task without a custom function?
Now I'm thinking of extracting all hosts and looping over them (maybe through the map function; I can't find anything else resembling a loop operator).

q = 0.7
timeRangeStart = 2024-03-27T23:00:00Z
timeRangeStop = 2024-03-28T23:00:00Z

data = from(bucket: bucket)
    |> range(start: timeRangeStart, stop: timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "cpu")
    |> filter(fn: (r) => r["_field"] == "usage_system")
    |> filter(fn: (r) => r["cpu"] == "cpu-total")
    |> filter(fn: (r) => r["host"] == "api")

quantileValue = () => {
    result = data
        |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
        |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0)

    return result._value
}

data
    |> filter(fn: (r) => r["_value"] < quantileValue())
    |> yield(name: "p.data")

// var = data
//     |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
//     |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0)

// data
//     |> filter(fn: (r) => r["_value"] < var._value)
//     |> yield(name: "p.data")

@va2dim I’m curious if this will make a difference:

  • Define the data variable as a function
  • Define the quantileValue function as a static variable
q = 0.7
timeRangeStart = 2024-03-27T23:00:00Z
timeRangeStop = 2024-03-28T23:00:00Z

data = () =>
    from(bucket: bucket)
        |> range(start: timeRangeStart, stop: timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "cpu")
        |> filter(fn: (r) => r["_field"] == "usage_system")
        |> filter(fn: (r) => r["cpu"] == "cpu-total")
        |> filter(fn: (r) => r["host"] == "api")

quantileValue =
    (data()
        |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)
        |> findRecord(fn: (key) => key.cpu == "cpu-total", idx: 0))._value

data()
    |> filter(fn: (r) => r["_value"] < quantileValue)
    |> yield(name: "p.data")

This works better, thanks @scott.
Could you please explain why there is such a big difference in execution time when using a custom function (or give a reference where I can build this understanding)?

But even this solution is much slower than using only variables (the difference increases as the duration grows: for a 1mo duration, using only variables executes in 4s versus 40s for your last example. And I need to work with a 1y duration).

Another point: if we calculate quantileValue not for one host but per host, it cannot be used as a static variable.
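
One possible alternative for the per-host case, sketched under the assumption that a join is acceptable here: compute the per-host quantiles once as a stream and join them back onto the raw data, so neither a static scalar nor a per-row function call is needed. The _value_raw and _value_q column names come from join() suffixing conflicting columns with the table labels (raw and q are labels chosen for this sketch):

hostQuantiles = data()
    |> group(columns: ["host"])
    |> quantile(column: "_value", q: q, method: "estimate_tdigest", compression: 1000.0)

join(tables: {raw: data() |> group(columns: ["host"]), q: hostQuantiles}, on: ["host"])
    |> filter(fn: (r) => r._value_raw < r._value_q)
    |> yield(name: "p.data")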

Another option I'm looking at, if performance stays slow: how to reduce the amount of incoming data for the quantile. Something like the sample function, but I want n items spread widely across the timeline (not every nth item). Doesn't some standard function exist for this?
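
On reducing the input data: one hedged option (not a standard sampling function) is to window the range and keep one raw point per window with aggregateWindow and the first selector, which spreads the kept points evenly across the timeline. The 1m window is an assumption to tune:

sampled = data()
    // keep the first raw point of every 1m window; createEmpty: false
    // avoids emitting null rows for empty windows
    |> aggregateWindow(every: 1m, fn: first, createEmpty: false)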

I came to this solution without a custom function: iteration over a hosts array + union.
With data = () => it works more or less fast: 9s for a 1mo duration (but I need around 3-5s for a 1y duration).
(But when data is stored as a static variable: 4s for 1 host, 8s for 2 hosts, 85s for 24 hosts at a 1mo duration.
85s for a query is too much, and I need to work comfortably not with 1mo but with a 1y duration. What is the difference?)

Does this solution have some options for optimization?
Maybe there is an option to extract tagValues from the already filtered data stream (the data variable)? (See the sketch after the code below.)

import "influxdata/influxdb/schema"
import "experimental/array"

q = float(v:v.percentiles)
bucket = v.bucket
predicate = (r) => r["_measurement"] == "cpu" and r["_field"] == "usage_system" and r["cpu"] == "cpu-total"
tag = "host"

hosts = schema.tagValues(bucket: bucket, tag: tag, predicate: predicate, start: v.timeRangeStart, stop: v.timeRangeStop)
  |> findColumn(fn: (key) => true, column: "_value") 

data = () => from(bucket: v.bucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: predicate)

getTagQuantileData = (tables=<-, tag) => tables
    |> filter(fn: (r) => r["host"] == tag)
    |> aggregateWindow(
            every: v.windowPeriod,
            // in this implementation (maybe because quantile runs under aggregateWindow),
            // changing the quantile method or compression doesn't influence performance much
            fn: (column, tables=<-) => tables |> quantile(q: q),
        )

quantileData = hosts |> array.map(fn: (x) => data() |> getTagQuantileData(tag: x))

union(tables: quantileData)
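
On the tagValues question above: a hedged sketch for pulling the distinct tag values out of the already filtered stream with distinct() and findColumn(), instead of a second schema.tagValues() query. Note it still scans the filtered data, so it is not necessarily faster than schema.tagValues(), which may be able to answer from the index:

hostsFromData = data()
    |> keep(columns: ["host"])  // keep only the tag column
    |> group()                  // merge everything into one table
    |> distinct(column: "host") // unique values end up in _value
    |> findColumn(fn: (key) => true, column: "_value") // extract as an array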

Hello @va2dim
I’m not sure why the execution time is so different when using a custom function.
Here are some resources around Flux performance:

Although I feel like you've probably already seen those.

To extract values from a query you can use findRecord() or findColumn().
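
A minimal sketch of both, assuming the data() and q from the examples above:

// one row as a record, then one field of it
recordValue = (data()
    |> quantile(column: "_value", q: q)
    |> findRecord(fn: (key) => true, idx: 0))._value

// one column as an array of values; columnValues[0] is the first value
columnValues = data()
    |> findColumn(fn: (key) => true, column: "_value")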