Problems migrating InfluxQL query to Flux

Hi, we are in the midst of migrating a query that generates a data export.

Our data is made up of a measurement that represents the customer, a single tag that represents the device the data comes from, and a number of fields (in the tens) that hold sensor readings and cumulative values.
An example would be:

11EB768FB98C4270B6D16BAEE17CF0DD,deviceId=78767 EM1_Energy=124642,EM1_Energy_Tariff1=0,EM1_Energy_Tariff2=0,EM1_Flow=361,EM1_Flow_Temperature=64.18,EM1_Power=8.1,EM1_Return_Temperature=44.44,EM1_Serial=224212,EM1_Uptime=3192,EM1_Volume=4237,HC1_FlowTemperature=34.64,HC1_FlowTemperature_Ref=11,HC1_ReturnTemperature=30.96,HC1_ReturnTemperature_Extra=192,HC1_ReturnTemperature_Ref=40,HC1_RoomTemperature=192,HC1_RoomTemperature_Limit=18,HC2_FlowTemperature=49.3,HC2_FlowTemperature_Ref=52,HC2_ReturnTemperature=192,HC2_ReturnTemperature_Extra=0.01,HC2_ReturnTemperature_Ref=50,Outdoor_Temperature=27.64,Primary1_FlowTemperature=192,Primary1_ReturnTemperature=192 1625492641215000000

Over this series, we run a query that performs a variety of aggregations over several of the fields, for a subset of devices, within a time window. The aggregations can be min, max, first, last, average and delta (the value of a field at the end of the window minus its value at the beginning of the window).
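
To make the delta aggregation concrete, here is a minimal Python sketch of what we mean by it. It is only an illustration of the arithmetic, not part of our pipeline: the function name `window_delta` and the sample readings are made up, and it assumes nanosecond timestamps bucketed into 60-minute windows aligned to the epoch.

```python
from collections import defaultdict

WINDOW_NS = 60 * 60 * 10**9  # 60-minute window, timestamps in nanoseconds


def window_delta(points, window_ns=WINDOW_NS):
    """Return {window_start: last_value - first_value} for (timestamp, value) points."""
    buckets = defaultdict(list)
    # Sort by timestamp so the first and last entries of each bucket are
    # the earliest and latest readings in that window.
    for ts, value in sorted(points):
        buckets[ts - ts % window_ns].append(value)
    return {start: values[-1] - values[0] for start, values in buckets.items()}


# Hypothetical readings for one device and one field (not real data).
points = [
    (0, 10.0),
    (WINDOW_NS // 2, 12.5),
    (WINDOW_NS, 13.0),
    (WINDOW_NS + 1, 11.0),
]
deltas = window_delta(points)
```

This is the same per-window Last(...) - First(...) that the InfluxQL query below computes for each device.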

The InfluxQL query looks like:

select Min("Outdoor_Temperature") as "0", (Last("Outdoor_Temperature") - First("Outdoor_Temperature")) AS "1" from "11111111111111111111111111111111" where ("deviceId" = '179474' or "deviceId" = '237129' or "deviceId" = '237140' or "deviceId" = '254415' or "deviceId" = '323126' or "deviceId" = '323127' or "deviceId" = '5312' or "deviceId" = '53753' or "deviceId" = '999000037') and time >= 1617228000000ms and time <= 1619819999999ms group by "deviceId",time(60m) fill(previous)

We tried this Flux query:

doRename = (t=<-, name= "") => t
        |> map(fn: (r) => ({ r with _field: name }))

data = from(bucket: "development_readouts/high")
        |> range(start: time(v: "2021-03-31T22:00:00.000Z"), stop: time(v: "2021-04-30T21:59:59.999Z"))
        |> filter(fn: (r) => r._measurement == "11111111111111111111111111111111")
        |> filter(fn: (r) => r.deviceId == "179474" or r.deviceId == "237129" or r.deviceId == "237140" or r.deviceId == "254415" or r.deviceId == "323126" or r.deviceId == "323127" or r.deviceId == "5312" or r.deviceId == "53753" or r.deviceId == "999000037")

t0 = data
        |> filter(fn: (r) => r._field == "EM1_Power")
        |> aggregateWindow(every: duration(v: "60m"), fn: first, timeSrc: "_start")
        |> difference(nonNegative: false, columns: ["_value"])
        |> doRename(name: "0")

t1 = data
        |> filter(fn: (r) => r._field == "Outdoor_Temperature")
        |> aggregateWindow(every: duration(v: "60m"), fn: min)
        |> doRename(name: "1")


union(tables: [t0, t1])
        |> pivot(
                rowKey:["deviceId", "_time"],
                columnKey: ["_field"],
                valueColumn: "_value"
        )
        |> group(columns: ["deviceId"], mode:"by")
        |> drop(columns: ["_start", "_stop", "_measurement"])
        |> rename(columns: {_time: "t"})
        |> yield(name: "detailed")

But we have two problems:

  1. Whenever we select more than one field (we can have multiple aggregations over the same field), results come “out of order”. For example:
doRename = (t=<-, name= "") => t
        |> map(fn: (r) => ({ r with _field: name }))

data = from(bucket: "development_readouts/high")
        |> range(start: time(v: "2021-03-31T22:00:00.000Z"), stop: time(v: "2021-04-30T21:59:59.999Z"))
        |> filter(fn: (r) => r._measurement == "11111111111111111111111111111111")
        |> filter(fn: (r) => r.deviceId == "179474" or r.deviceId == "237129" or r.deviceId == "237140" or r.deviceId == "254415" or r.deviceId == "323126" or r.deviceId == "323127" or r.deviceId == "5312" or r.deviceId == "53753" or r.deviceId == "999000037")

t0 = data
        |> filter(fn: (r) => r._field == "EM1_Power")
        |> aggregateWindow(every: duration(v: "60m"), fn: first, timeSrc: "_start")
        |> difference(nonNegative: false, columns: ["_value"])
        |> doRename(name: "0")

t1 = data
        |> filter(fn: (r) => r._field == "EM1_Power")
        |> aggregateWindow(every: duration(v: "60m"), fn: last)
        |> doRename(name: "1")

t2 = data
        |> filter(fn: (r) => r._field == "EM1_Power")
        |> aggregateWindow(every: duration(v: "60m"), fn: first)
        |> doRename(name: "2")

t3 = data
        |> filter(fn: (r) => r._field == "Outdoor_Temperature")
        |> aggregateWindow(every: duration(v: "60m"), fn: min)
        |> doRename(name: "3")

union(tables: [t0, t1, t2, t3])
        |> pivot(
                rowKey:["deviceId", "_time"],
                columnKey: ["_field"],
                valueColumn: "_value"
        )
        |> group(columns: ["deviceId"], mode:"by")
        |> drop(columns: ["_start", "_stop", "_measurement"])
        |> rename(columns: {_time: "t"})
        |> yield(name: "detailed")
  2. Performance of the Flux query is much, much worse than its InfluxQL counterpart: InfluxQL takes 184ms with correct results, while Flux takes 2537s and the results come “out of order”.

Can we get some hints on what we are doing wrong with this Flux query?

Thanks in advance

Addressed here: