View in #flux on Slack
@Randall_Theobald: Hi all! I need some help optimizing a flux query to fit into less memory. My data variable resolves to about 11.4 million points (and the whole point of doing this in InfluxDB was to be able to do it with full 15s resolution). I need to run various moving average windows and exact quantile calculations on it. I'm not concerned with how long the query runs, but right now I am running out of memory (16 GiB!). I'm wondering if maybe it is trying to do all the calculations in memory concurrently. If so, is there a way I can force it to just run them serially? Current flux script below.
import "math"
data = from(bucket: "primary")
|> range(start: time(v: v.timeRangeStart), stop: time(v: v.timeRangeStop))
|> filter(fn: (r) => r["_measurement"] == "container")
|> filter(fn: (r) => r["_field"] == "cpu_usage_millicores")
|> filter(fn: (r) => r["kube_cluster_name"] == v.cluster)
|> filter(fn: (r) => r["kube_namespace"] == v.namespace)
|> filter(fn: (r) => r["app"] == v.app)
|> filter(fn: (r) => r["kube_container_name"] == v.container)
|> aggregateWindow(every: 15s, fn: mean, createEmpty: false)
|> sort(columns: ["_time"])
avg =
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> mean()
|> set(key: "function", value: "mean")
|> group(columns: ["function"])
|> map(fn: (r) => ({ r with _value: math.round(x: r._value) }))
min =
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> min()
|> set(key: "function", value: "min")
|> group(columns: ["function"])
max =
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> max()
|> set(key: "function", value: "max")
|> group(columns: ["function"])
quant = (q) =>
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> quantile(column: "_value", q: q, method: "exact_selector")
|> set(key: "function", value: "p${q*100.0}")
|> group(columns: ["function"])
mavg = (m) =>
data
|> movingAverage(n: m * 4) // dependent on 15s resolution
|> highestMax(n:1)
|> keep(columns: ["_value"]) // convert to single table with just values
|> set(key: "function", value: "mavg_${m}m")
|> group(columns: ["function"])
|> map(fn: (r) => ({ r with _value: math.round(x: r._value) }))
union(tables:[
avg,
min,
max,
quant(q:0.50),
quant(q:0.60),
quant(q:0.70),
quant(q:0.80),
quant(q:0.90),
quant(q:0.95),
quant(q:0.99),
mavg(m:60),
mavg(m:30),
mavg(m:10),
mavg(m:5),
mavg(m:2),
mavg(m:1),
])
|> toInt()
|> group()
|> sort()
I did try multiple yields instead of union and that just made it run out of memory even faster. I'm running InfluxDB in OpenShift with a memory limit of 16GiB. So the container is getting restarted. Do I need to limit the memory of the InfluxDB process somehow, and will that even allow it to complete the query? (maybe it's just caching prolifically?)
@Michael_Hall: Given the calculation intensity of what you're doing, it makes sense to convert this into a downsampling task that runs every minute or so
that way you'd only be calculating a handful of 15s intervals at a time, saving the transformed results into another bucket, and then querying the raw data out of that bucket in your dashboard
@Randall_Theobald: How can down-sampling work when I need full-resolution percentiles?
@Michael_Hall: Your query is already downsampling (aggregateWindow, mean, min, max, quantile, etc), you're just doing it on every dashboard page load
and over the timespan of the dashboard's view
I'm suggesting you do that downsampling in smaller chunks (1 minute instead of the dashboard's view) and save the results into a separate bucket
since your calculated values from an hour ago aren't going to change, there's no need to keep recalculating them
@Randall_Theobald: Right, but a 90th percentile calculation over 4 weeks can't be done from downsampled data. Unless I'm missing something.
@Michael_Hall: Your quant function is working on the 15s downsampled data, so you can save those into one bucket, called something like primary_15s, in one task, then have another task calculate your quants and mavgs and save them, along with avg, min and max, into a third bucket called something like primary_final, then you can query primary_final in your dashboard
so it'll be a multi-step workflow with three buckets (one for raw data, one for 15s aggregates, one for final transformed data) and two tasks (one for 15s aggregation, one for final transformation; the first task is sketched below)
but that means you can have a lower retention period on your raw data bucket, a medium-length retention period on your 15s data, and a long (or unlimited) retention on your final transformed data
which will save you space as well
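[Editor's note: a minimal sketch of the first task in that workflow, using Randall's filters and Michael's primary_15s bucket name; the task name, 1m interval, and 15s offset are assumptions.]
option task = {name: "downsample_15s", every: 1m, offset: 15s}

from(bucket: "primary")
    |> range(start: -task.every) // only aggregate the most recent interval each run
    |> filter(fn: (r) => r["_measurement"] == "container")
    |> filter(fn: (r) => r["_field"] == "cpu_usage_millicores")
    |> aggregateWindow(every: 15s, fn: mean, createEmpty: false)
    |> to(bucket: "primary_15s")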
@Randall_Theobald: Any suggestions that don't involve tasks like this? The time window is variable, so I'm trying to see if there is a way to get it to work without pre-calculating stats of time windows. Our only other option is to just use InfluxDB as a data store and perform all the calculations in a separate process, which would be a shame since Flux is perfect for most of what we want to do.
@Anais: Yes, I agree with Michael: you could try splitting this work up into several tasks, writing each value to a new bucket and then querying for all of the new values in your dashboard. You might also try https://docs.influxdata.com/flux/v0.x/stdlib/experimental/mean/ in case it has an effect on performance.
You could try querying for less data
@Randall_Theobald: Imagine this is not in a dashboard, and it is considered a bulk data calculation request. Is there a way I can get the server to handle it with a lower memory footprint? Is breaking it up into completely separate queries the only way?
@Anais: That I'm aware of
I'm currently working with your query to see if there are any optimizations I can make/think of
You can shave off the |> group(columns: ["function"]) for every calculation
When I ran with yields I was also able to shave off a lot of time.
But ideally if you're doing this on bulk data you want to use tasks to reduce the memory footprint
I use the flux profiler and look at the ExecuteDuration
import "math"
import "profiler"
option profiler.enabledProfilers = ["query", "operator"]
// 45373943 without groups
// 34330576 with yields instead of union
data = from(bucket: "noaa")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "average_temperature")
|> filter(fn: (r) => r["_field"] == "degrees")
|> filter(fn: (r) => r["location"] == "coyote_creek")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
avg =
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> mean()
|> set(key: "function", value: "mean")
// |> group(columns: ["function"])
|> yield(name: "avg")
min =
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> min()
|> set(key: "function", value: "min")
// |> group(columns: ["function"])
|> yield(name: "min")
max =
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> max()
|> set(key: "function", value: "max")
// |> group(columns: ["function"])
|> yield(name: "max")
quant = (q) =>
data
|> keep(columns: ["_value"]) // convert to single table with just values
|> quantile(column: "_value", q: q, method: "exact_selector")
|> set(key: "function", value: "p${q*100.0}")
// |> group(columns: ["function"])
mavg = (m) =>
data
|> movingAverage(n: m * 4) // dependent on 15s resolution
|> highestMax(n:1)
|> keep(columns: ["_value"]) // convert to single table with just values
|> set(key: "function", value: "mavg_${m}m")
// |> group(columns: ["function"])
|> map(fn: (r) => ({ r with _value: math.round(x: r._value) }))
quant(q:0.50)
|> yield(name: "50")
quant(q:0.60)
|> yield(name: "60")
quant(q:0.70)
|> yield(name: "70")
quant(q:0.80)
|> yield(name: "80")
quant(q:0.90)
|> yield(name: "90")
quant(q:0.95)
|> yield(name: "95")
quant(q:0.99)
|> yield(name: "99")
mavg(m:60)
|> yield(name: "mavg60")
mavg(m:30)
|> yield(name: "30")
mavg(m:10)
|> yield(name: "10")
mavg(m:5)
|> yield(name: "5")
mavg(m:2)
|> yield(name: "2")
mavg(m:1)
|> yield(name: "1")
If you have to perform this query over a large amount of data, can you increase the every parameter in the aggregateWindow() function to reduce the number of rows you have to work on?
@Randall_Theobald: I already tried using yields like this; it was faster… faster to run out of memory
I can't reduce the aggregation window. The only reason we are doing this is to work on 15s windows over larger time intervals.
@Anais: What interval are you collecting data at?
@Randall_Theobald: 15 s
@Michael_Hall: so you have one reading each 15 seconds?
@Anais: Then why are you performing an aggregateWindow at 15s?
How long are you querying data for (from your range function)?
@Randall_Theobald: "one" reading, yes, but with a large cardinality
we'd like to be able to do 30-60 days. so far, we have a use case that fails after 2 weeks
(it has 64 time series reporting every 15 s)
@Anais: Well it sounds like you don't need the aggregate window
@Michael_Hall: How much do you need to round your data? Using map so much for that is likely contributing a lot of overhead
@Anais: I'd also try pushing down any functions that change the shape of your data to the end of your query. In other words, try to keep the following pattern: from |> range |> mean… |> keep instead of from |> range |> keep |> mean
@Randall_Theobald: I don't care about round until the end. I think the quantile and moving average functions are just too heavy in the InfluxDB process.
@Anais: Yah it looks like you're talking about executing a lot over 172800-345600 points
@Nathaniel_Cook is Flux expected to be able to handle this type of workload/Flux script over 172800-345600 points?
@Randall_Theobald Why can't you perform downsampling tasks?
Or split each calculation into a separate query and write the value to a new bucket and then query for all of the values you want from that bucket?
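[Editor's note: a hedged sketch of one such per-calculation query, writing a single p90 into Michael's primary_final bucket; the container_stats measurement name and the use of now() as the timestamp are assumptions. to() needs _time, _measurement, _field, and _value columns, which the map supplies.]
from(bucket: "primary")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "container")
    |> filter(fn: (r) => r["_field"] == "cpu_usage_millicores")
    |> aggregateWindow(every: 15s, fn: mean, createEmpty: false)
    |> keep(columns: ["_value"]) // collapse to a single table of values
    |> quantile(column: "_value", q: 0.9, method: "exact_selector")
    |> map(fn: (r) => ({_time: now(), _measurement: "container_stats", _field: "p90", _value: r._value}))
    |> to(bucket: "primary_final")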
@Randall_Theobald: Is the sort(columns: ["_time"]) call unnecessary? Are results always returned sorted by time?
@Anais: it's not guaranteed, but since your calculations don't depend on that order being preserved 100% I'd say yes, it is unnecessary
@Randall_Theobald: The profiler showed that the quant and movingAverage functions were taking the longest time by far. But it doesn't seem to show me what contributes to high memory usage.
@Michael_Hall: They are default sorted by time. Some transformations can break that, but none of the things you're doing before that should
@Anais: oh nvm
@Randall_Theobald: My movingAverage calls depend on sorted time
@Anais: I think I was mistaken
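[Editor's note: if an order-dependent call ever does need a guarantee, the sort only has to sit immediately before it; a minimal sketch using the data variable from the script above.]
data
    |> sort(columns: ["_time"])
    |> movingAverage(n: 4) // 4 points = one 1m window at 15s resolution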
@Randall_Theobald: I did some experiments and it is definitely pulling in multiple copies of all the data. I can do a single successful quantile function call over 31 days in 30 sec and around 4GB. I can add a moving average call to that and it takes around 68 sec and 6 GB or so. Performing additional quantile or moving average calls on the same data shouldn't increase the memory utilization, but it does. Is there any way to get multiple of these functions to operate on the same data (without pulling duplicates into memory)?
The documentation implies that assigning it to a variable will accomplish that, but it doesn't appear to work.
@Anais: Yah if you're referencing the variable with the base data it shouldn't be.
Maybe try rewriting the function so you pipe forward the table into the function instead of calling the variable?
like
// Function definition
multByX = (tables=<-, x) => tables
|> map(fn: (r) => ({r with _value: r._value * x}))
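[Editor's note: called pipe-forward style, the piped stream binds to the tables=<- parameter; e.g. with the data variable from the script above.]
data
    |> multByX(x: 2.0)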
@Randall_Theobald: Is there a doc link that discusses that?
@Anais: Memory utilization? Or that syntax?
No to the former that I'm aware of; yes to the latter: https://docs.influxdata.com/influxdb/cloud/query-data/flux/custom-functions/
@Scott might have some information on memory utilization with variables.
@Scott: You could try encapsulating your data as a function instead of a variable. I believe this will cache the return value of the function. I'm also wondering if, instead of multiple explicit yields, you just union all the data together:
import "math"
import "profiler"
option profiler.enabledProfilers = ["query", "operator"]
data = () =>
from(bucket: "noaa")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "average_temperature")
|> filter(fn: (r) => r["_field"] == "degrees")
|> filter(fn: (r) => r["location"] == "coyote_creek")
aggregateData = (fn, name, tables=<-) =>
tables
|> fn()
|> duplicate(column: "_stop", as: "_time")
|> set(key: "function", value: "${name}")
|> keep(columns: ["_value", "function"])
quant = (q, tables=<-) =>
tables
|> quantile(column: "_value", q: q, method: "exact_selector")
|> duplicate(column: "_stop", as: "_time")
|> set(key: "function", value: "p${q * 100.0}")
|> keep(columns: ["_value", "function"])
mavg = (m, tables=<-) =>
tables
|> movingAverage(n: m * 4)
|> highestMax(n: 1)
|> map(fn: (r) => ({_time: r._stop, _value: math.round(x: r._value), function: "mavg_${m}m"}))
union(
tables: [
data() |> aggregateData(fn: mean, name: "avg"),
data() |> aggregateData(fn: min, name: "min"),
data() |> aggregateData(fn: max, name: "max"),
data() |> quant(q: 0.5),
data() |> quant(q: 0.6),
data() |> quant(q: 0.7),
data() |> quant(q: 0.8),
data() |> quant(q: 0.9),
data() |> quant(q: 0.95),
data() |> quant(q: 0.99),
data() |> mavg(m: 60),
data() |> mavg(m: 30),
data() |> mavg(m: 10),
data() |> mavg(m: 5),
data() |> mavg(m: 2),
data() |> mavg(m: 1),
],
)
|> group(columns: ["function"])
@Randall_Theobald: Memory requirements have essentially been the same with all incantations that I've tried (variables vs functions, union vs yields, functions that refer to a variable vs pipe forward functions). Looks like I'm out of options?
@Scott: As the profiler data suggests, all the quantile() and movingAverage() calls are taking the longest. Because these operations can't be pushed down to the InfluxDB storage engine, they have to pull all of the data returned from data() into memory and operate on it there. All of these operations run in parallel, so it's likely having to load the full dataset into memory for each permutation of quant() and mavg().
@Nathaniel_Cook: > All of these operations run in parallel, so it's likely having to load the full dataset into memory for each permutation of quant() and mavg().
This shouldn't be the case but maybe we have a bug?
@Jonathan_Sternberg I'd be curious if you have any thoughts on how to reduce memory usage of the above query?
@Jonathan_Sternberg: I'm going to make a note to take a look at this tomorrow. I'd like to be able to run the above query on some test data. do you know the cardinality of the data you're working with? I actually quite like the way you've written the original function and I think it should work the best if all things are working correctly. for quantile, I'd also probably suggest using the tdigest implementation. you're using exact selector so it will pull in everything.
moving average probably just needs some tuning to make it more efficient
the tuning is inside of the flux engine and it's not something you can help by rewriting the query
@Randall_Theobald: The cardinality of the data I'm pulling is 64 time series at 15-sec data point resolution for 30 days.
@Jonathan_Sternberg: so itās roughly 64 unique series (tag combinations) but a pretty decent range of data
so low cardinality high density is a good way of describing it?
@Randall_Theobald: for the matching time series, yes. The bucket itself has very high cardinality. This script only targets a small number of time series
@Jonathan_Sternberg: ok that should be the only thing that matters. if the bucket has high cardinality but you only select a measurement or some other condition that's low cardinality, flux's performance profile will be more about low cardinality.
btw is there a reason youāre using exact selector instead of tdigest for quantile?
@Randall_Theobald: I didn't know enough about tdigest to trust it. The initial test I did on it wasn't where I wanted to be with accuracy
but that was with shorter time ranges. with these higher ranges that I can't complete yet, it may be close enough. Do you have references that speak to its accuracy?
@Jonathan_Sternberg: it'll be really difficult to use exact selector with quantile given your current data. it's mostly because there's a lot of data that has to be in memory.
I haven't read this so can't vouch for its veracity, but maybe this article? https://medium.com/@mani./t-digest-an-interesting-datastructure-to-estimate-quantiles-accurately-b99a50eaf4f7
@Randall_Theobald: 11.4 million integers doesn't take much memory. so that speaks more to the size of points in InfluxDB
I'm currently working on a fall-back script to just extract the full data from Influx and process it. I don't expect to have any memory issue with this amount of data
@Jonathan_Sternberg: I'll take a look. we don't really look at the exact selector for quantile so there might just be some low hanging fruit.
you might want to also consider exact_mean. again, I haven't spent much time in this code for a while, but it's worth trying the different methods and seeing how they affect time spent.
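[Editor's note: for illustration, the three quantile methods side by side, reusing Scott's data() helper from above; compression only applies to the t-digest estimate, and 1000.0 is its documented default.]
data()
    |> keep(columns: ["_value"])
    |> quantile(column: "_value", q: 0.9, method: "estimate_tdigest", compression: 1000.0)
    |> yield(name: "p90_tdigest")

data()
    |> keep(columns: ["_value"])
    |> quantile(column: "_value", q: 0.9, method: "exact_mean")
    |> yield(name: "p90_exact_mean")

data()
    |> keep(columns: ["_value"])
    |> quantile(column: "_value", q: 0.9, method: "exact_selector")
    |> yield(name: "p90_exact_selector")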
@Randall_Theobald: extracting just the values into an array, sorting it, and then choosing the right indexes for various percentiles doesn't take much until you get much higher in size
the profile showed moving average was even longer than the quant function call.
@Jonathan_Sternberg: I mean 11.4 million integers is roughly 80 MB of data to sort. it's not going to cause an OOM, but allocating 80 MB of contiguous memory isn't the easiest.
@Randall_Theobald: I gave the process 16GiB to work with. It should be easy
@Jonathan_Sternberg: yea moving average just needs some love. there's transformations that were written at different times. ones that were written earliest generally are the most memory intensive b/c they were just written in a way to get them working with small amounts of data and don't really consider performance. the most frequent functions were rewritten to be more optimized.
moving average is in the first group. written once to just work and never rewritten to be more performant. I apologize for that and I acknowledge that it's not good. there's a lot of functions in flux though so it's difficult to get to all of them to rewrite.
it's helpful to have these conversations to help guide us about where the most urgent need is for this optimization work
for example, if a function isn't used very frequently or is only ever used when the data has already been heavily downsampled, it's not really worth us spending the time to optimize that function. but if something is used very frequently and is causing pain to the end user (such as yourself), it really helps us to learn about it so we can prioritize fixing that specific function.
I may have missed it. do you have a link to the profile you're referring to?
Also, in case I don't respond for a bit: I'm off to get lunch, but I'll be back.
@Randall_Theobald: No, I didn't save the profile, sorry about that. Also, some background/context. We are a Datadog customer, and are only experimenting with InfluxDB in this way because Datadog's query language is very inflexible and forces a single time aggregation before a single space aggregation. So our only draw to InfluxDB is for the flexibility to do these fine-grained calculations over long time periods.
Just FYI, I was able to do this for 2mo (almost 19 million points) pulling all the raw data to my VM and processing in python in about 20min with <3GiB memory to the process. I had to get around a weird limitation in the python client, though, by chunking the calls to one week at a time. I posted that problem in the "general" channel.