We’re currently struggling with converting a relatively straightforward query from InfluxQL to Flux on InfluxDB 2.x. It mostly revolves around how to aggregate multiple columns after filtering on a pivoted table. Note that this can be a few dozen columns, and that these may also contain NaN values.
A small sample has been added below.
This sample assumes an org called `test_org`, a bucket called `test_bucket`, and that the following command has been run to allow for InfluxQL queries:
influx v1 dbrp create \
--db example-db \
--rp example-rp \
--bucket-id <id for test_bucket> \
--default
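If you need the bucket ID for that command, one way to look it up (assuming the v2 `influx` CLI is already configured for the right org) is:

```shell
influx bucket list --name test_bucket
```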
Inserting dummy data:
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb import DataFrameClient

token = "<token>"
measurement = "test_measurement"

client = InfluxDBClient(
    url="http://localhost:8086",
    token=token,
    org="test_org",
)
# Synchronous writes, so the points are flushed before we query them back
# (the default write_api() is an asynchronous batching writer)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

df = pd.DataFrame(
    index=pd.date_range('2022-08-01T10:00Z', freq='0.5H', periods=6, tz="UTC"),
    data={
        "tag_1": ["A", "A", "A", "A", "A", "A"],
        "field_1": [100, 50, 100, 50, 100, 50],
        "field_2": [900, 1000, 900, 1, 1, 5],
        "field_3": [True, True, True, False, False, False],
    }
)
write_api.write(
    bucket="test_bucket",
    record=df,
    data_frame_measurement_name=measurement,
    data_frame_tag_columns=["tag_1"],
)
This inserts the following dataframe:
|                           | tag_1 | field_1 | field_2 | field_3 |
|---------------------------|-------|---------|---------|---------|
| 2022-08-01 10:00:00+00:00 | A     | 100     | 900     | True    |
| 2022-08-01 10:30:00+00:00 | A     | 50      | 1000    | True    |
| 2022-08-01 11:00:00+00:00 | A     | 100     | 900     | True    |
| 2022-08-01 11:30:00+00:00 | A     | 50      | 1       | False   |
| 2022-08-01 12:00:00+00:00 | A     | 100     | 1       | False   |
| 2022-08-01 12:30:00+00:00 | A     | 50      | 5       | False   |
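For reference, the first row ends up as a line-protocol point roughly like this, with `tag_1` as a tag and the rest as fields (the `i` suffixes assume the integer pandas columns are written as InfluxDB integers):

```
test_measurement,tag_1=A field_1=100i,field_2=900i,field_3=true 1659348000000000000
```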
What we want to do in this case is get the average of `field_1` and `field_2` between two timestamps, for the cases where `tag_1` is `A` and where `field_2` is larger than 500. In this example that is only two columns, but in the real-world scenario we might be talking about dozens of fields.
In InfluxQL we’d do this with the following query, where `mean(*)` can be replaced with basically any selection of columns:
dfclient = DataFrameClient(
    username=None,
    password=None,
    database="example-db",
    headers={"Authorization": f'Token {token}'},
)

# Or alternatively: 'SELECT mean("field_1") AS "field_1", mean("field_2") AS "field_2"', etc.
res = dfclient.query(f"""
    SELECT mean(*)
    FROM "{measurement}"
    WHERE
        tag_1 = 'A'
        AND field_2 > 500
        AND time >= '2022-08-01T10:00:00Z'
        AND time < '2022-08-01T13:00:00Z'
    GROUP BY time(1h)
""")[measurement]
This produces the following dataframe, where, e.g., the 10:00 bucket averages the 10:00 and 10:30 rows ((100 + 50) / 2 = 75 and (900 + 1000) / 2 = 950), and the 12:00 bucket is all-NaN because every point in it fails the `field_2 > 500` filter:

|                           | mean_field_1 | mean_field_2 |
|---------------------------|--------------|--------------|
| 2022-08-01 10:00:00+00:00 | 75           | 950          |
| 2022-08-01 11:00:00+00:00 | 100          | 900          |
| 2022-08-01 12:00:00+00:00 | nan          | nan          |
For Flux we currently have the following:
query_api.query_data_frame("""
from(bucket: "test_bucket")
  |> range(start: 2022-08-01T10:00:00Z, stop: 2022-08-01T13:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "test_measurement")
  |> filter(fn: (r) => r["_field"] == "field_1" or r["_field"] == "field_2")
  |> filter(fn: (r) => r["tag_1"] == "A")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => r["field_2"] > 500)
""")
This produces the following:
|   | result  | table | _start                    | _stop                     | _time                     | _measurement     | tag_1 | field_1 | field_2 |
|---|---------|-------|---------------------------|---------------------------|---------------------------|------------------|-------|---------|---------|
| 0 | _result | 0     | 2022-08-01 10:00:00+00:00 | 2022-08-01 13:00:00+00:00 | 2022-08-01 10:00:00+00:00 | test_measurement | A     | 100     | 900     |
| 1 | _result | 0     | 2022-08-01 10:00:00+00:00 | 2022-08-01 13:00:00+00:00 | 2022-08-01 10:30:00+00:00 | test_measurement | A     | 50      | 1000    |
| 2 | _result | 0     | 2022-08-01 10:00:00+00:00 | 2022-08-01 13:00:00+00:00 | 2022-08-01 11:00:00+00:00 | test_measurement | A     | 100     | 900     |
Which is where we’re stuck. We need to pivot before we filter, because we need to drop those entire timestamps. But now we can’t use `aggregateWindow(every: 1h, fn: mean, createEmpty: false)`, because the data is in the wrong shape (runtime error: `@8:6-8:62: aggregateWindow: column "_value" does not exist`).
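That error makes sense in hindsight: `aggregateWindow` defaults to operating on the `_value` column, which no longer exists after the pivot (its contents have been spread out over `field_1` and `field_2`). It does accept a `column` parameter, but as far as we can tell that only lets you aggregate a single column per call. For reference, the failing call is simply appended to the pipeline above:

```flux
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
```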
So what’s the right way to aggregate data over time on a pivoted table, in a way that doesn’t lock you into a fixed number of sensors or a specific aggregation window, and that supports NaNs?
Is the only option really just to do a `window` followed by a custom `reduce` function, which would have to be written out for possibly up to a few dozen sensors and would also have to handle NaN values? (A sketch of what we think that would look like is included at the bottom of this post.)
At least based on the few other topics I’ve found so far, that seems to be the case:
- Multiple Aggregation Projections in fluxql
- https://community.influxdata.com/t/flux-multiple-aggregates/10221
- Create custom aggregate functions | InfluxDB OSS 2.0 Documentation
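For completeness, here is a minimal sketch of that `window` + `reduce` approach, written out for just the two fields of the sample. The `exists` guard, the `float()` casts, and the final reshaping are assumptions on our part about how the real data would need to be handled; the main point is that every additional sensor needs its own accumulator entries:

```flux
from(bucket: "test_bucket")
  |> range(start: 2022-08-01T10:00:00Z, stop: 2022-08-01T13:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "test_measurement")
  |> filter(fn: (r) => r["_field"] == "field_1" or r["_field"] == "field_2")
  |> filter(fn: (r) => r["tag_1"] == "A")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  // exists guards against rows where a field is missing after the pivot
  |> filter(fn: (r) => exists r["field_2"] and r["field_2"] > 500)
  |> window(every: 1h)
  // One accumulator per field: this is the part that has to be written out
  // (and NaN-guarded) by hand for every sensor
  |> reduce(
      identity: {count: 0.0, sum_1: 0.0, sum_2: 0.0},
      fn: (r, accumulator) => ({
          count: accumulator.count + 1.0,
          sum_1: accumulator.sum_1 + float(v: r["field_1"]),
          sum_2: accumulator.sum_2 + float(v: r["field_2"]),
      }),
  )
  |> map(fn: (r) => ({r with
      mean_field_1: r.sum_1 / r.count,
      mean_field_2: r.sum_2 / r.count,
  }))
  |> drop(columns: ["count", "sum_1", "sum_2"])
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
```

Multiplied out to a few dozen fields, this gets unwieldy fast, hence the question.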