Poor performance of join(): CPU and memory grow quadratically with row count

I’m using InfluxDB 2.1.1 OSS.
The problem I want to solve is:

  • data is organized by a tag host and has a field offabs
  • one node is declared as the reference
  • for all other nodes, the difference between their offabs and the reference node's offabs is calculated

A clear case for two queries: one for the reference node, one for all other nodes, join() them on _time, and calculate the difference with map().
Should be straightforward Flux.
The query time was much higher than expected.
So I used the Python influxdb_client and looked into the profile data, with puzzling results.

The query I investigated was:

v = {timeRangeStart: -14d, timeRangeStop: -0d, windowPeriod: 10m}
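// off_mst: the reference node (host cbmin00y)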
offmst = from(bucket: "dca")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "TfcMonitor")
  |> filter(fn: (r) => r["host"] == "cbmin00y")
  |> filter(fn: (r) => r["_field"] == "offabs")
  |> drop(columns: ["_measurement","_field","_start","_stop","host","oid"])
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: true)
  |> rename(columns: {_value: "off_mst"})
offend = from(bucket: "dca")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "TfcMonitor")
  |> filter(fn: (r) => r["host"] != "cbmin00y")
  |> filter(fn: (r) => r["_field"] == "offabs")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: true)
  |> drop(columns: ["_measurement","_field","_start","_stop"])
  |> rename(columns: {_value: "off_end"})
join(tables: {mst:offmst, end:offend}, on: ["_time"])
  |> map(fn: (r) => ({ r with _value: r.off_end - r.off_mst}))
  |> drop(columns: ["off_mst","off_end"])
  |> yield()

This selects 2017 rows for offmst and 6 tables of 2017 rows each for offend.
I’ve also looked at windowPeriods of 20m and 60m, which give 1009 and 337 rows per table, respectively.

From the profiling data I extracted:

  • total CPU time (cpu_tot)
  • maximum allocated memory (mem_tot)
  • CPU time of *universe.mergeJoinTransformation (cpu_join)

Window  #rows  cpu_tot (s)  mem_tot (bytes)  cpu_join (s)
10m      2017        3.746        665990080         3.514
20m      1009        1.148        167680832         1.000
60m       337        0.279         19480896         0.188

Both the CPU time and the memory grow with the square of the row count: going from the 60m to the 10m window increases the row count by a factor of about 6, while the peak memory grows by a factor of about 34, close to 6².

I’m really puzzled.
Why does a join of two tables on a unique key (here _time) have O(N*N) complexity?
I’d expect both tables to be sorted first (O(N ln N)) and then merged (O(N+N)).
The _time columns are already sorted, and even if they weren’t, it would be trivial to build a sorted index.
Even the name of the operator, mergeJoinTransformation, suggests a merge join.

So I see no reason for an O(N*N) complexity here. But that’s what I get.

What am I overlooking here?
Should the query be written differently?
Any help or hint is highly welcome.

I’ve modified the query and

  • collapsed the filter stages and used keep() instead of drop() (mostly cosmetics)
  • used group() to collapse the second table set into a single table, re-grouping after the join
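
// v (time range and window) as defined in the first query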
offmst = from(bucket: "dca")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "TfcMonitor" and
                       r["host"] == "cbmin00y" and
                       r["_field"] == "offabs")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: true)
  |> keep(columns: ["_time","_value"])
  |> rename(columns: {_value: "off_mst"})
offend = from(bucket: "dca")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "TfcMonitor" and
                       r["host"] != "cbmin00y" and
                       r["_field"] == "offabs")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: true)
  |> keep(columns: ["_time","_value", "host", "oid"])
  |> group()
  |> sort(columns: ["_time"])
  |> rename(columns: {_value: "off_end"})
join(tables: {mst:offmst, end:offend}, on: ["_time"])
  |> map(fn: (r) => ({ r with _value: r.off_end - r.off_mst}))
  |> drop(columns: ["off_mst","off_end"])
  |> group(columns: ["host", "oid"], mode:"by")
  |> sort(columns: ["_time"])
  |> yield()

The performance issue is still the same:

Window  #rows  cpu_tot (s)  mem_tot (bytes)  cpu_join (s)
10m      2017        2.983        598445952         2.720
20m      1009        0.974        150529920         0.759
60m       337        0.227         17273856         0.100

I’ve tried this query directly in the InfluxDB Data Explorer, and see a query time that grows roughly with the square of the row count.

I’ve attached the profile logs generated with influxdb_client for detailed inspection by experts.
profile_10m.txt (9.3 KB)
profile_20m.txt (9.3 KB)
profile_60m.txt (9.3 KB)

You can analyze your query with the Profiler (http://wiki.webperfect.ch/index.php?title=InfluxDB:Flux-Analyze_Query(Profiler)); maybe that will help you find the root cause of the slow performance. The following guideline can help you optimize your query: Optimize Flux queries | InfluxDB Cloud Documentation.
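
For example, a minimal sketch of enabling the profilers directly in Flux:

import "profiler"

// enable the query-level and per-operator profilers; their results are
// returned as additional tables alongside the normal query result
option profiler.enabledProfilers = ["query", "operator"]

// ... the query to analyze follows here ...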

Hi @fluxator,
I did use the profiler; I wrote:

So I used the Python influxdb_client and looked into the profile data

The influxdb_client allows enabling profiling, and that’s what I did.
The profile logs were attached to the 2nd posting.

And I did determine the root cause: it’s the CPU time of *universe.mergeJoinTransformation.
That grows with the square of the row count; it should grow linearly, or at worst with N ln(N).

And I did look at the Optimize Flux queries guide.
But none of the hints given there addresses the current case.
When I correlate data from different series, a join is unavoidable.
The join is imho the single most important improvement of Flux over InfluxQL,
and join has a performance problem.

I read the issue quickly, but have you tried the experimental.join function? If I remember correctly, for joins on time like yours it improves performance quite a lot.

@MzazM,
yes, I’ve tried experimental.join() and ran into a stability issue.

experimental.join() joins on the group key and _time. That forced me to flatten the table stream with group() and re-group later. Because it panics when the input starts with empty rows, I have to take care of that too. After all of this it does indeed work, and performs with query times linear in the row count.
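
For illustration only, the general shape is roughly the following (assuming offmst and offend are defined as in the modified query above and both end up with an empty group key; the handling of the empty-table panic is omitted):

import "experimental"

// both inputs are assumed to be flattened with group() beforehand, so the
// join effectively matches rows on _time only
experimental.join(
  left: offmst,
  right: offend,
  fn: (left, right) => ({right with _value: right.off_end - left.off_mst})
)
  |> keep(columns: ["_time", "_value", "host", "oid"])
  |> group(columns: ["host", "oid"], mode: "by")
  |> sort(columns: ["_time"])
  |> yield()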

So we have join() with a performance issue and experimental.join() with a stability issue.
And I repeat what I stated earlier:

Why does a join of two tables on a unique key (here _time) have O(N*N) complexity?

Is experimental.join() an attempt to bypass this problem (instead of solving it)?
Maybe one of the developers can comment on this and on the strategy to resolve it.


I have a very similar problem posted here: Panic: arrow/array: index out of range
Did you find any solution for your problem?