I think I'm using Flux pivot wrong

I am migrating from a MySQL-based IoT system to InfluxDB. All is good except for one dashboard item.

The application is logging and displaying data in a solar energy system.

I used a complex SQL statement to access 8 datapoints to display a status statement, such as “Generator will start in about 12 minutes” based on current state of charge and consumption of energy.

When I switched to InfluxDB, I distributed the datapoints to separate measurements based on the devices in the system (inverter, generator, solar panel, battery monitor, etc.).

I’ve got it working, but it takes many seconds to generate the statement. Most other queries and dashboard graphs complete quickly.
My question has two parts:

  1. Am I doing this all wrong? If so, please point me down the right path, or…
  2. What can I do to speed this up?

As a new user I cannot attach the script, so here it is inline:

import "math"
// https://docs.influxdata.com/flux/v0.65/stdlib/math/
starttime = dashboardTime
// starttime = -2m
aggregatetime = 1m

from(bucket: "cottage_2020/autogen")
  |> range(start: starttime, stop: now())
  |> filter(fn: (r) => r._measurement =~ /REMOTE|BMK|AGS/ 
      and r._field =~ /socstart|socstop|amph|adc|soc|running|status|status_text/)
  |> fill(usePrevious: true)
  |> aggregateWindow(every: aggregatetime, fn: last)
  // removing last gives me a lot of results, using last restricts it to the most recent
  |> last()
  |> drop(fn: (column) => (column =~ /_measure*/))
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) =>
       ({_time: r._time, 
        socstart: float(v: r.socstart)  / 100.0, 
        socstop: float(v: r.socstop) / 100.0,
        amph: float(v: r.amph),
        adc: float(v: r.adc),
        soc: float(v: r.soc) / 100.0,
        usedsoc: 1.0 - (float(v: r.soc) / 100.0),
        running: r.running,
        status: r.status,
        status_text: r.status_text,
        }))
  |> map(fn: (r) =>
      ({_time: r._time, 
        description: 
          if r.running == true then 
              if r.status == 13 then 
                  "Generator will stop in about " + string(v: int(v: math.abs(x: r.amph) / r.usedsoc * (r.socstop - r.soc)  * 60.0)) + " minutes"
              else 
                  "Generator " +  r.status_text
          else if r.soc < r.socstart then "Generator should have started!"
          else if r.soc >= 1.0 or r.amph >= 0.0 then "All is good"
          else if r.adc < 0.0 then
              if (math.abs(x: r.amph) / r.usedsoc * (r.soc - r.socstart) / math.abs(x: r.adc)) > 2 then 
                  "Discharging " + string( v: 0.0 - r.adc)
              else 
                  "Generator starting in about " + string(v: int(v:  math.ceil(x: math.abs(x: r.amph)  / r.usedsoc * (r.soc - r.socstart) / math.abs(x: r.adc) * 60.0 ))) + " minute"
          else if r.adc > 0.0 then 
              if (math.abs(x: r.amph) / r.adc) > 2.0 then 
                  "Charging " + string( v: r.adc)
              else
                  "Fully charged in about " + string(v: int(v: math.abs(x: r.amph) / r.adc * 60.0)) + " minutes"
          else
              "Treading water"}
  ))

+1 for providing such a detailed description.
I don’t have much time right now and will look further later, but two things stand out:

Selecting column names by regular expression into a list first, and then passing that list in as an “in list” check, may yield some speedup.

You might want to look at refactoring this into a Flux task. If I’m understanding it correctly, the query recalculates the whole thing every time it executes.

Thanks for your interest.
I’m not sure what you mean by “Selecting column names by regular expression into a list first”. I am using regular expressions in my initial filter. Should I use a different function or construct?

If I made this a task how would I exploit it in a dashboard, particularly a Grafana dashboard? Yes it is calculated each time but only when the dashboard is active. I don’t want to fill the database with these status items, just present the status in a dashboard. But… I’m still learning so I could be failing to grasp the concept of Tasks.
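
For readers with the same question, here is a minimal sketch of what the task approach could look like. It assumes InfluxDB 2.x (tasks are not part of InfluxDB 1.x), and the task name, the destination bucket "status" and the organization "my-org" are hypothetical. The description logic is reduced to a single field to keep the example short; it is not the full status calculation from the query above.

```flux
// Hypothetical task: runs once a minute and stores one status row.
option task = {name: "generator_status", every: 1m}

from(bucket: "cottage_2020/autogen")
  |> range(start: -2m)
  |> filter(fn: (r) => r._measurement == "BMK" and r._field == "soc")
  |> last()
  // to() needs _time, _measurement, _field and _value columns.
  |> map(fn: (r) => ({
      _time: r._time,
      _measurement: "status",
      _field: "description",
      _value: if float(v: r._value) >= 100.0 then "All is good" else "Discharging",
  }))
  |> to(bucket: "status", org: "my-org")

// A dashboard panel would then only read the stored value, for example:
//   from(bucket: "status")
//     |> range(start: -5m)
//     |> filter(fn: (r) => r._measurement == "status" and r._field == "description")
//     |> last()
```

The trade-off is the one raised above: the dashboard query becomes trivial, but a status row is written every minute whether or not anyone is looking at the dashboard.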

@CharlesGodwin I took a stab at it and you should see some performance gains. Here’s what I did:

  • Use the contains() function to see if the field key exists in a set of predefined field keys instead of evaluating by regex. I don’t know if you’ll get any performance gains from this, but I think this is what @FixTestRepeat was trying to say.
  • It didn’t look like you needed the aggregateWindow() function at all. You were using it to select the last point from each 1-minute window, but then you ran last() after that, which just returns the last reported value for each table. aggregateWindow() is notoriously slow (it has been optimized in InfluxDB 2.0 and newer versions of Flux), and removing it here does not change the output of the last() call that follows it.
  • Merge the map() calls into a single call. Each time you call map(), it has to iterate over each row in the input data. Combine that with some pretty heavy conditional logic and two map() executions, and it can add up. The query isn’t as readable with one map() call as it is with two, but it doesn’t have to iterate over the data twice.

Let me know if you see any improvements.

import "math"

starttime = dashboardTime
fields = ["socstart","socstop","amph","adc","soc","running","status","status_text"]

from(bucket: "cottage_2020/autogen")
  |> range(start: starttime)
  |> filter(fn: (r) =>
    r._measurement =~ /REMOTE|BMK|AGS/ and contains(set: fields, value: r._field)
  )
  |> fill(usePrevious: true)
  |> last()
  |> drop(columns: ["_measurement"] )
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) =>
      ({_time: r._time, 
        description: 
          if r.running == true then 
              if r.status == 13 then 
                  "Generator will stop in about " + string(v: int(v: math.abs(x: float(v: r.amph)) / (1.0 - (float(v: r.soc) / 100.0)) * ((float(v: r.socstop) / 100.0) - (float(v: r.soc) / 100.0) )  * 60.0)) + " minutes"
              else 
                  "Generator " +  r.status_text
          else if (float(v: r.soc) / 100.0) < (float(v: r.socstart) / 100.0) then "Generator should have started!"
          else if (float(v: r.soc) / 100.0) >= 1.0 or (float(v: r.amph)) >= 0.0 then "All is good"
          else if (float(v: r.adc)) < 0.0 then
              if (math.abs(x: (float(v: r.amph))) / (1.0 - (float(v: r.soc) / 100.0)) * ((float(v: r.soc) / 100.0) - (float(v: r.socstart) / 100.0)) / math.abs(x: (float(v: r.adc)))) > 2 then 
                  "Discharging " + string( v: 0.0 - (float(v: r.adc)))
              else 
                  "Generator starting in about " + string(v: int(v:  math.ceil(x: math.abs(x: (float(v: r.amph)))  / (1.0 - (float(v: r.soc) / 100.0)) * ((float(v: r.soc) / 100.0) - (float(v: r.socstart) / 100.0)) / math.abs(x: (float(v: r.adc))) * 60.0 ))) + " minute"
          else if (float(v: r.adc)) > 0.0 then 
              if (math.abs(x: (float(v: r.amph))) / (float(v: r.adc))) > 2.0 then 
                  "Charging " + string( v: (float(v: r.adc)))
              else
                  "Fully charged in about " + string(v: int(v: math.abs(x: (float(v: r.amph))) / (float(v: r.adc)) * 60.0)) + " minutes"
          else
              "Treading water"}
  ))

Thanks for taking the time to consider my problem.

  • I love the contains() function, thanks. It helps performance. I was looking for something like that but failed to notice it.
  • I used aggregateWindow() because the timestamp for each measurement is slightly different. How else can I normalize the time so that the pivot works? I also use last() to, hopefully, reduce the output to a single row, which should be the most recent.
  • The interval for all measurements is currently 30 seconds, so all timestamps should be within 30 seconds of each other, but I would like the code to survive a 60-second interval.
  • Using two map() steps allows me to precalculate and normalize some values. Is there a way to generate AND reference a variable in the map routine? I accept that two map() calls will iterate over all rows twice, but at this point in the code there should be only one row.
  • I tested with the second map() deleted and performance was significantly faster. That second map(), which I admit is complex, seems to be the performance problem.
import "math"
// https://docs.influxdata.com/flux/v0.65/stdlib/math/

starttime = -1m
aggregatetime = 30m
measurements =  ["REMOTE", "BMK", "AGS"]
fields = ["socstart", "socstop", "amph", "adc", "soc", "running", "status", "status_text"]
from(bucket: "cottage_2020/autogen")
  |> range(start: starttime, stop: now())
  |> filter(fn: (r) =>
      contains(value: r._measurement, set: measurements) and  contains(value: r._field, set: fields)
  )
  |> aggregateWindow(every: aggregatetime, fn: last)
  // removing last gives me a lot of results, using last restricts it to the most recent
  |> last()
  |> drop(fn: (column) => (column =~ /_measure*/))
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) =>
       ({_time: r._time, 
        socstart: float(v: r.socstart)  / 100.0, 
        socstop: float(v: r.socstop) / 100.0,
        amph: float(v: r.amph),
        adc: float(v: r.adc),
        soc: float(v: r.soc) / 100.0,
        usedsoc: 1.0 - (float(v: r.soc) / 100.0),
        running: r.running,
        status: r.status,
        status_text: r.status_text,
        }))
  |> map(fn: (r) =>
      ({_time: r._time,  
        description: 
          if r.running == true then 
              if r.status == 13 then 
                  "Generator will stop in about " + string(v: int(v: math.abs(x: r.amph) / r.usedsoc * (r.socstop - r.soc)  * 60.0)) + " minutes"
              else 
                  "Generator " +  r.status_text
          else if r.soc < r.socstart then "Generator should have started!"
          else if r.soc >= 1.0 or r.amph >= 0.0 then "All is good"
          else if r.adc < 0.0 then
              if (math.abs(x: r.amph) / r.usedsoc * (r.soc - r.socstart) / math.abs(x: r.adc)) > 2 then 
                  "Discharging @ " + string( v: 0.0 - r.adc)
              else 
                  "Generator starting in about " + string(v: int(v:  math.ceil(x: math.abs(x: r.amph)  / r.usedsoc * (r.soc - r.socstart) / math.abs(x: r.adc) * 60.0 ))) + " minute"
          else if r.adc > 0.0 then 
              if (math.abs(x: r.amph) / r.adc) > 2.0 then 
                  "Charging @ " + string( v: r.adc)
              else
                  "Fully charged in about " + string(v: int(v: math.abs(x: r.amph) / r.adc * 60.0)) + " minutes"
          else
              "Treading water"}
  ))
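
On the question of generating and referencing a variable inside map(): Flux functions can use a block body, where local values are assigned first and a record is returned with an explicit return. The sketch below assumes the same bucket, measurements and fields as the query above, and it deliberately shortens the description logic to two branches; the name hoursToStart is introduced here for illustration only.

```flux
import "math"

measurements = ["REMOTE", "BMK", "AGS"]
fields = ["socstart", "socstop", "amph", "adc", "soc", "running", "status", "status_text"]

from(bucket: "cottage_2020/autogen")
  |> range(start: -2m)
  |> filter(fn: (r) =>
      contains(value: r._measurement, set: measurements) and contains(value: r._field, set: fields)
  )
  |> aggregateWindow(every: 1m, fn: last)
  |> last()
  |> drop(columns: ["_measurement"])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => {
      // Local values: converted and normalized once, then reused below.
      soc = float(v: r.soc) / 100.0
      socstart = float(v: r.socstart) / 100.0
      amph = float(v: r.amph)
      adc = float(v: r.adc)
      usedsoc = 1.0 - soc
      // Estimated hours until the start threshold is reached (same formula as above).
      hoursToStart = math.abs(x: amph) / usedsoc * (soc - socstart) / math.abs(x: adc)

      return {
          _time: r._time,
          description:
              if soc < socstart then "Generator should have started!"
              else if adc < 0.0 and hoursToStart <= 2.0 then
                  "Generator starting in about " + string(v: int(v: math.ceil(x: hoursToStart * 60.0))) + " minutes"
              else "All is good",
      }
  })
```

This keeps the readability of the two-map version (each conversion is written once) while iterating over the rows a single time.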

Rather than using aggregateWindow() to normalize timestamps (it can be a very expensive operation), use truncateTimeColumn() to normalize all timestamps to a particular precision.
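
A minimal sketch of that substitution, assuming the same bucket, measurements and fields as the queries above and a one-minute precision:

```flux
measurements = ["REMOTE", "BMK", "AGS"]
fields = ["socstart", "socstop", "amph", "adc", "soc", "running", "status", "status_text"]

from(bucket: "cottage_2020/autogen")
  |> range(start: -2m)
  |> filter(fn: (r) =>
      contains(value: r._measurement, set: measurements) and contains(value: r._field, set: fields)
  )
  // Keep only the most recent point of each series.
  |> last()
  // Round every timestamp down to the start of its minute so the
  // points from different devices share one _time value.
  |> truncateTimeColumn(unit: 1m)
  |> drop(columns: ["_measurement"])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

One caveat: truncation rounds down, so two readings taken a few seconds apart on either side of a minute boundary still end up with different _time values and therefore in different pivot rows. A coarser unit makes that less likely but does not rule it out.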

I suspected as much. The conditional logic does take some time to evaluate, especially with the mathematical calculations in there. I don’t know of an easy way to optimize this.

@scott truncateTimeColumn() helped with a few things, and it is also a more straightforward answer to the normalizing problem.

Thanks to your help I am satisfied that my two questions have been answered.

  • Yes, it’s a reasonable solution.
  • I got excellent advice on optimizing the solution.