Aggregating an arbitrary number of columns over time after pivoting and filtering results

We’re currently struggling with converting a relatively straightforward query from InfluxQL to Flux on InfluxDB 2.x. It mostly revolves around how to aggregate multiple columns after filtering on a pivoted table. Note that this can be a few dozen columns, and that these might also have NaN values.

A small sample has been added below.

This sample assumes an org called test_org, a bucket called test_bucket, and that the following command has been run to allow for InfluxQL queries:

influx v1 dbrp create \
  --db example-db \
  --rp example-rp \
  --bucket-id <id for test_bucket> \
  --default

Inserting dummy data:

import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb import DataFrameClient

token = "<token>"
measurement = "test_measurement"

client = InfluxDBClient(
    url="http://localhost:8086",
    token=token,
    org="test_org",
)
# Synchronous writes, so the data is flushed before the script exits
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

df = pd.DataFrame(
    index=pd.date_range('2022-08-01T10:00Z', freq='0.5H', periods=6, tz="UTC"),
    data={
        "tag_1": ["A", "A", "A", "A", "A", "A"],
        "field_1": [100, 50, 100, 50, 100, 50],
        "field_2": [900, 1000, 900, 1, 1, 5],
        "field_3": [True, True, True, False, False, False],
    }
)

write_api.write(
    bucket="test_bucket",
    record=df,
    data_frame_measurement_name=measurement,
    data_frame_tag_columns=["tag_1"],
)

This inserts the following dataframe:

                           tag_1  field_1  field_2  field_3
2022-08-01 10:00:00+00:00      A      100      900     True
2022-08-01 10:30:00+00:00      A       50     1000     True
2022-08-01 11:00:00+00:00      A      100      900     True
2022-08-01 11:30:00+00:00      A       50        1    False
2022-08-01 12:00:00+00:00      A      100        1    False
2022-08-01 12:30:00+00:00      A       50        5    False

What we want to do in this case is get the average of field_1 and field_2 between two timestamps, for the rows where tag_1 is A and where field_2 is larger than 500. Note that in this sample we only average two columns; in the real-world scenario we might be talking about dozens of fields.
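For reference, the intended result can be sketched in plain pandas on the sample frame above (just an illustration of the semantics we want, not the Flux answer we’re after). Rows failing the filter are blanked out rather than dropped, so hourly windows with no surviving rows still show up as NaN:

```python
import pandas as pd

# Rebuild the sample frame from above
df = pd.DataFrame(
    index=pd.date_range("2022-08-01T10:00Z", freq="30min", periods=6, tz="UTC"),
    data={
        "tag_1": ["A"] * 6,
        "field_1": [100, 50, 100, 50, 100, 50],
        "field_2": [900, 1000, 900, 1, 1, 5],
    },
)

# Keep every timestamp, but blank out rows that fail the filter,
# so hourly windows with no surviving rows come out as NaN
fields = df[["field_1", "field_2"]].astype(float)
fields[~((df["tag_1"] == "A") & (df["field_2"] > 500))] = float("nan")

# Works for any number of field columns; mean() skips NaNs per column
hourly = fields.resample("1h").mean()
print(hourly)
```

This produces 75/950 for the 10:00 window, 100/900 for 11:00, and NaN/NaN for 12:00 — exactly the InfluxQL result further down.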

In InfluxQL we’d do this with the following query, where we can replace mean(*) with basically any number of columns:

dfclient = DataFrameClient(
    username=None,
    password=None,
    database="example-db",
    headers={"Authorization": f'Token {token}'},
)

# Or alternatively: 'SELECT mean("field_1") as "field_1", mean("field_2") as "field_2"', etc
res = dfclient.query(f"""
    SELECT mean(*)
    FROM "{measurement}" 
    WHERE
      tag_1 = 'A'
      AND field_2 > 500 
      AND time >= '2022-08-01T10:00:00Z'
      AND time < '2022-08-01T13:00:00Z'
    GROUP BY time(1h)
""")[measurement]

This produces the following dataframe:

                           mean_field_1  mean_field_2
2022-08-01 10:00:00+00:00            75           950
2022-08-01 11:00:00+00:00           100           900
2022-08-01 12:00:00+00:00           NaN           NaN

For Flux we currently have the following:

query_api.query_data_frame("""
from(bucket: "test_bucket")
  |> range(start: 2022-08-01T10:00:00Z, stop: 2022-08-01T13:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "test_measurement")
  |> filter(fn: (r) => r["_field"] == "field_1" or r["_field"] == "field_2")
  |> filter(fn: (r) => r["tag_1"] == "A")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r["field_2"] > 500)
""")

This produces the following:

result table _start _stop _time _measurement tag_1 field_1 field_2
0 _result 0 2022-08-01 10:00:00+00:00 2022-08-01 13:00:00+00:00 2022-08-01 10:00:00+00:00 test_measurement A 100 900
1 _result 0 2022-08-01 10:00:00+00:00 2022-08-01 13:00:00+00:00 2022-08-01 10:30:00+00:00 test_measurement A 50 1000
2 _result 0 2022-08-01 10:00:00+00:00 2022-08-01 13:00:00+00:00 2022-08-01 11:00:00+00:00 test_measurement A 100 900

Which is where we’re stuck. We need to pivot before we filter, because entire timestamps have to be dropped. But on the pivoted table we can no longer use aggregateWindow(every: 1h, fn: mean, createEmpty: false), because the data is in the wrong shape (runtime error @8:6-8:62: aggregateWindow: column "_value" does not exist).
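As a stopgap (and not an answer to the Flux question), the pivoted result above can be finished client-side in pandas; the column names below are assumed to match the output shown:

```python
import pandas as pd

# Frame mimicking the pivoted Flux result shown above (assumed shape)
res = pd.DataFrame({
    "_time": pd.to_datetime(
        ["2022-08-01T10:00:00Z", "2022-08-01T10:30:00Z", "2022-08-01T11:00:00Z"]
    ),
    "field_1": [100.0, 50.0, 100.0],
    "field_2": [900.0, 1000.0, 900.0],
})

# Hourly mean over whatever field columns came back; mean() skips NaNs
hourly = res.set_index("_time")[["field_1", "field_2"]].resample("1h").mean()
print(hourly)
```

Note the mismatch with the InfluxQL output: because the filtered rows were dropped server-side, the empty 12:00 window disappears entirely instead of coming back as NaN.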

So what’s the right way to aggregate data over time on a pivoted table, in a way that doesn’t lock you to a fixed number of fields or a specific aggregation window, and that supports NaNs?

Is the only option really just a window() followed by a custom reduce function, which would have to be written out for possibly a few dozen sensors and also handle NaN values itself?

At least, based on the few other topics I’ve found so far, that seems to be the case:

  1. Multiple Aggregation Projections in fluxql
  2. https://community.influxdata.com/t/flux-multiple-aggregates/10221
  3. Create custom aggregate functions | InfluxDB OSS 2.0 Documentation

Hi @fdorssers,

I guess you have to do it like this:

query_api.query_data_frame("""
base_query = from(bucket: "test_bucket")
  |> range(start: 2022-08-01T10:00:00Z, stop: 2022-08-01T13:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "test_measurement")
  |> filter(fn: (r) => r["tag_1"] == "A")
  |> filter(fn: (r) => r["_field"] == "field_1" or r["_field"] == "field_2")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r["field_2"] > 500)

field_1_query = base_query
  |> duplicate(column: "field_1", as: "_value")
  |> aggregateWindow(every: 1h, fn: mean)

field_2_query = base_query
  |> duplicate(column: "field_2", as: "_value")
  |> aggregateWindow(every: 1h, fn: mean)

union(tables: [field_1_query, field_2_query])
""")

At least this should work; I’m not sure about the performance, and maybe there are better ways to do it.

Hope this helps.

Cheers,
Fabian

Hi @funfried ,

Thanks! That’s indeed the method we ended up using, though we had to do some chained joins to line up the per-field queries.
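In pandas terms, the chained joins look roughly like this (a sketch, assuming each per-field query came back as its own frame on a shared time index):

```python
import pandas as pd
from functools import reduce

idx = pd.date_range("2022-08-01T10:00Z", freq="1h", periods=2)

# One aggregated frame per field, as returned by the per-field queries (assumed shapes)
per_field = [
    pd.DataFrame({"field_1": [75.0, 100.0]}, index=idx),
    pd.DataFrame({"field_2": [950.0, 900.0]}, index=idx),
]

# Chain outer joins so any number of fields line up on the time index
combined = reduce(lambda left, right: left.join(right, how="outer"), per_field)
print(combined)
```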

We also tried creating a custom aggregate function, but that just ended up being a lot slower. Your approach was about as fast as we managed to get it.

Cheers!

Frank