Query performance

Hi all,
I have a Flux query on Influx 2.7 that turns out to be really slow on our reasonably small datasets. So slow, in fact, that the C# Influx client times out after 30s.
I know I can stream the results rather than wait for all of them to be returned at once.
But still I am trying to find a way to optimize the query itself.
I am open to using InfluxQL as well.

The implicit schema of the “message” measurement is as follows:
tags: source, equipment_number, production_line, machine, station, hint, severity
fields: state (bool), importance (long)

The query:
import "contrib/tomhollingworth/events"

from(bucket: "pv_deduplications")
|> range(start: 1704110390, stop: 1704110400)
|> filter(fn: (r) => exists r["source"])
|> filter(fn: (r) => r.equipment_number == "ma_1000")
|> filter(fn: (r) => r["_measurement"] == "message")
|> sort(columns: ["_time"])
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["production_line", "machine", "station", "hint"])
|> events.duration(unit: 1ms, stopColumn: "_time")

In practice there are many "equipment_number"s in the filter, say 20-100.
One equipment_number corresponds to exactly one tuple (station, machine, production_line).
There are on the order of 1000s of "hint"s.
There are 3 "severity"s.
There are 3 "source"s, they are not important for the query.

I found that events.duration, unsurprisingly, slows down the query.
Pivot also slows it down, but it is a prerequisite for using events.duration properly.

What can I do on the query side to accelerate things?
What can I do on the client side?

@shuebner Actually, I think the biggest culprit here is sort. Sorting is a heavy operation and data returned from from() is sorted by time by default, so you don’t even need to do it. You can completely remove sort.
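With sort() removed, the simplified query from your post would just be (a sketch, same bucket and filters as above):

```flux
import "contrib/tomhollingworth/events"

// Same query as above, minus the sort() step: from() already returns
// data sorted by time, so nothing here should disturb the ordering.
from(bucket: "pv_deduplications")
|> range(start: 1704110390, stop: 1704110400)
|> filter(fn: (r) => exists r["source"])
|> filter(fn: (r) => r.equipment_number == "ma_1000")
|> filter(fn: (r) => r["_measurement"] == "message")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["production_line", "machine", "station", "hint"])
|> events.duration(unit: 1ms, stopColumn: "_time")
```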

events.duration() should work with both pivoted and unpivoted data. Did you get an error when you tried it on unpivoted data? And when you say using it “properly,” what do you mean?

@scott Thanks for answering that quickly.

Sorry, you are right, the “sort” does not belong in the above query. The above query is a simplification of my original query.
The original query has three almost identical subqueries that are joined by “union”, which does not preserve time ordering. AFAIK there is (surprisingly) no way to concatenate multiple series in Influx while keeping time ordering.

For events.duration to work correctly, the stream after “union” needs to be sorted by time, so no way around that I suppose.
By “working properly” I meant that I need the data pivoted in the end anyway. Working unpivoted seems to be ~20% faster, but transfers twice the data, which also adds to the response time.

In the end I just want something very simple: a list of the time intervals where the “state” field was “true” (that last step is done on the client side, since there seems to be no way to do it in Influx at all).

Could I see your previous query? union() does preserve sort order, but group() does not guarantee sort order. I’m betting it was grouping your data that caused things to be out of time order.

Yes, data does need to be sorted by time, but it does not need to be pivoted for events.duration() to work.

There are ways to leverage Flux’s “pushdown” capabilities, which push computation down to the storage layer rather than pulling the data into memory and operating on it there. Pivot operations cannot be pushed down, so all the data has to be pulled into memory. You can still pivot the data, but I’d do it as close to the end of the query as possible.
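To sketch what that ordering could look like with your schema (a sketch only; the exists filter may not be pushdown-eligible, so the simple measurement/tag filters come first):

```flux
import "contrib/tomhollingworth/events"

// Pushdown-capable steps (range + simple measurement/tag filters) first,
// so they run in the storage layer; pivot() as late as possible, since it
// forces everything after it into memory.
from(bucket: "pv_deduplications")
|> range(start: 1704110390, stop: 1704110400)
|> filter(fn: (r) => r["_measurement"] == "message")
|> filter(fn: (r) => r.equipment_number == "ma_1000")
|> filter(fn: (r) => exists r["source"])
|> group(columns: ["production_line", "machine", "station", "hint"])
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> events.duration(unit: 1ms, stopColumn: "_time")
```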

Oh man, there’s one feature missing from Flux that would make identifying “cycles” like this so easy. It’s a feature similar to map() that lets you use data from previous rows while operating on the current row. Here’s the GitHub issue with the proposal: EPIC: scan function · Issue #4671 · influxdata/flux · GitHub. With this, you could uniquely identify each period where the state was true and then identify the time range of each period. As it stands, there isn’t really a way to do this with Flux. If you group by state, all the true states are grouped together and there’s no way to identify where false states existed between them.

There are functions that provide some functionality that may help (monitor.stateChanges and monitor.stateChangesOnly), but these are specifically designed for the internal InfluxDB checks and notification system and don’t provide much flexibility for using them outside of that system.
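For the narrower goal of finding where “state” is true, the built-in stateDuration() may also get partway there (a hedged sketch, assuming the schema from the original post): it appends a column that is -1 while the predicate is false and counts up from 0 while it is true, so rows where the column equals 0 mark the start of each “true” interval, which could shrink the work left on the client.

```flux
// Sketch only: stateDuration() adds a "stateDuration" column that is -1
// while state is false and the elapsed duration (starting at 0) while it
// is true. Rows with stateDuration == 0 mark interval starts.
from(bucket: "pv_deduplications")
|> range(start: 1704110390, stop: 1704110400)
|> filter(fn: (r) => r["_measurement"] == "message" and r["_field"] == "state")
|> filter(fn: (r) => r.equipment_number == "ma_1000")
|> stateDuration(fn: (r) => r._value == true, column: "stateDuration", unit: 1ms)
```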

This is the whole query:

import "date"
import "contrib/tomhollingworth/events"
before = from(bucket: "pv_deduplications")
 |> range(start:date.add(d: -601s, to: 2024-01-01T11:59:50.0000000+00:00), stop:1704110390)
 |> filter(fn: (r) => exists r["source"])
 |> filter(fn: (r) => r.equipment_number == "ma_1000")
 |> filter(fn: (r) => r["_measurement"] == "message")
 |> last()
after = from(bucket: "pv_deduplications")
 |> range(start:1704110400, stop:date.add(d: 601s, to: 2024-01-01T12:00:00.0000000+00:00))
 |> filter(fn: (r) => exists r["source"])
 |> filter(fn: (r) => r.equipment_number == "ma_1000")
 |> filter(fn: (r) => r["_measurement"] == "message")
 |> first()
range = from(bucket: "pv_deduplications")
 |> range(start:1704110390, stop:1704110400)
 |> filter(fn: (r) => exists r["source"])
 |> filter(fn: (r) => r.equipment_number == "ma_1000")
 |> filter(fn: (r) => r["_measurement"] == "message")
union(tables: [before, after, range])
 |> sort(columns: ["_time"])
 |> pivot(columnKey: ["_field"], rowKey: ["_time"], valueColumn: "_value")
 |> group(columns: ["production_line", "machine", "station", "hint"])
 |> events.duration(unit: 1ms, stopColumn: "_time")

union() does preserve sort order

That is surprising, given that the documentation explicitly states that it does not: union() function | Flux Documentation

You can still pivot the data, but I’d do it as close to the end of the query as possible

And even then, TTFB is on the order of 50 times slower than without pivoting.

In summary, I cannot do anything with Flux except filtering by measurement, field names and tags. Primitive things like concatenating, grouping by tags and pivoting are prohibitively expensive. Even time ordering is lost there, requiring another prohibitively expensive sort operation.

This makes InfluxDB a glorified buffer where almost everything needs to be done on the client side. Disappointing.