How to optimize query to reach acceptable run time?

Hi all,

we are currently evaluating from Prometheus to InfluxDB and are currently on v2.7.

To see, how well Influx works, I made up the task to see whether any of our compute nodes were experiencing high iowait during the past days[1]. As we currently already have Prometheus node exporter running on these nodes, we simply use telegraph to ingest these data.

At the moment, I’m using this flux

from(bucket: "nodes")
  |> range(start:-12h, stop: 0s)
  |> filter(fn: (r) => r._measurement == "node_cpu_seconds_total" and r.url =~ /^http:\/\/172\.23\./ and r.mode == "iowait")
  |> aggregateWindow(every: 5m, fn: mean)
  |> derivative()
  |> group(columns: ["url", "_time"])
  |> sum()
  |> group(columns: ["url"])
  |> filter(fn: (r) => r._value > 2)

This does almost what I want - except that I still need to figure out how to normalize/scale the values based on the number of cpu cores, as we have differently sized machines.

However, the problem is the run time is terrible, usually between 4 and 5 minutes on quite decent hardware.

We tries to address the most obvious bits (removing the regex in favor of strings.prefix which I need to learn how to use as it never returned) or using the experimental aggregate.rate which took far longer than this.

Anyone with an idea where I use influx wrongly?



[1] Subsequently, I would use these results to correlate with compute jobs run by various users to see which are too hard on the local machines, but that needs to wait until this one is somehow solved.

As I did not make much progress, I moved over to work on a created data set instead of live data to hopefully get a better feel what is a good and what is a less good approach.

With a small script, I’m creating three CSV files which I influx write into their own buckets. “Schema” is like

# hostinfo.csv
#datatype measurement,tag,double,dateTime:RFC3339

# hostdata.csv
#datatype measurement,tag,double,dateTime:RFC3339

# allinone.csv
#datatype measurement,tag,tag,double,dateTime:RFC3339

The goal for this exercise is to find hosts which have a high percent relative to the number of cores a system has. The first two CSV files separate this information into two buckets, one for direct/fast measurements (all 15s) and one with almost static host information (3600s). The third CSV file would inject the total number of cores into each measurement.

The script generates one day of data for about 100 hosts and one measurement per core this host has at that time. This results in CSV files with 27011292 lines (hostinfo 2351 lines) each, with a total size of about 1.2 GByte (hostinfo 100kByte) each.

Ingesting this into Influxdb takes some time, then I ran these fluxes

import "interpolate"

start = 2023-01-01T00:00:00Z
stop  = 2023-01-02T00:00:00Z
every = 5m

total = from(bucket: "test_hostinfo")
  |> range(start: start, stop: stop)
  |> drop(columns: ["_field", "_measurement", "_start", "_stop"])
  |> interpolate.linear(every: every)

data = from(bucket: "test_hostdata")
  |> range(start: start, stop: stop)
  //|> drop(columns: ["_field"])
  |> aggregateWindow(every: every, fn:mean)
  |> group(columns: ["_time", "host"])
  |> sum()
  |> group(columns: ["host"])

join(tables: {t1:data, t2:total}, on: ["_time", "host"])
  |> map(fn: (r) => ({r with _value: r._value_t1/r._value_t2}))
  |> drop(columns: ["_value_t1", "_value_t2"])

This runs for about 56s (wall clock without profiler, profiler rundown below[1]) and due to using interpolate creates some funky results when the underlying hostinfo changes. The other flux

start = 2023-01-01T00:00:00Z
stop  = 2023-01-02T00:00:00Z
every = 5m

from(bucket: "test_allinone")
  |> range(start: start, stop: stop)
  |> aggregateWindow(every: every, fn:mean)
  |> drop(columns: ["_field"])
  |> group(columns: ["_time", "host", "totalcpus"])
  |> sum()
  |> filter(fn: (r) => exists r._value)
  |> group(columns: ["host"])
  |> map(fn: (r) => ({r with _value: r._value / float(v: r.totalcpus)}))

does not have the the spurious interpolation issues, which is good, but is also slow!

It finishes after more than 80s (profiles output see [2]) and I do not know whether it is a good idea to push a lot of static tag data into each measurement as this will increase bucket size and probably also negatively influence query speed.

Thus bottom line question, what do I need to do, to scale this up to still have “snappy” query times, i.e. less than a minute, with 30 times as many hosts and a time frame of a couple of days. I.e. the underlying data relevant just for this type of query would increase by a factor of about 100. In reality, a lot of extra stuff would be in the bucket alongside this, e.g. cpu-user, cpu-idle, cpu-iowait, …

Is there a way or is this a futile effort?

Thanks a bunch in advance and sorry for the post’s lenght.

[1] Profiler 1 (using awk script to convert nanoseconds to full seconds)

             *influxdb.readFilterSource        0.080916
              *influxdb.readWindowAgg...        40.105304
              *universe.groupTransfor...        36.026369
              *execute.simpleAggregat...        18.514951
              *universe.groupTransfor...        0.110498
              *universe.schemaMutatio...        0.001165
              *interpolate.interpolat...        0.030931
              *universe.mergeJoinTran...        1.934977
              *universe.mergeJoinTran...        0.006108
              *universe.mapTransforma...        0.005281
              *universe.schemaMutatio...        0.000652

[2] Profiler 2:

     *influxdb.readWindowAggregateSource        1.712701
     *universe.schemaMutationTransfor...        0.104278
     *universe.groupTransformationAda...        51.089083
     *execute.simpleAggregateTransfor...        26.305873
          *universe.filterTransformation        0.866736
     *universe.groupTransformationAda...        0.093152
             *universe.mapTransformation        1.394154
          *universe.filterTransformation        0.282817

aand another small update, found formerly overlooked fill() function | Flux 0.x Documentation which fixes the interpolation generated issues but does not decrease run time much.

Hello @CArsten,
Thank you for your detailed question and information.
So your best course of action is to create tasks to periodically perform this logic for you and write the new transformed data to a bucket so you can simply query it without having to perform the transformation work in your query each time.
Essentially you want to adopt this logic

Alternatively it is to hold tight until 3.0 OSS is released which is blazing fast by comparison.

Hi @Anaisdg

thank you for your reply. A task would be possibly only int hose cases we knew beforehand what we would look for, so yeah, it may be possible for anything which we can anticipate but not really for all potential cases (as that would probably flood our systems with too many additional metrics). Sure, we could offload those to an independent influx and we will certainly discuss that. Thus, in the end the classical monitoring vs. observability trap.

OTOH, we will then await 3.0 OSS eagerly (any rough ETA for that?) and hopefully, it will come with some kind of automatic parallelization in storage. We were able to speed up our queries by a factor of 5-8 quite easily if we manually partitioned them by url or host, i.e. something we knew would not interfere with parallel running queries, and finally used union to get everything back into one table stream.

The major downside there was, that even with writing a function to do the heavy lifting, we still had to manually partition the data, e.g. by IP range in url.

If that was possible in aggregateWindow, i.e. specifying which column(s) are independent of each other and let flux/the backend then partition these columns and run the query in parallel, that would be extremely awesome!