Trying To Subtract First Point From Time Series In Fluxlang

Hello everyone.

I am trying to combine a few joins via the code below. In short, I have a time series graph that I would like to start at value 0 for a given time range, instead of at its initial dynamic value.

For example, if the time series starts at value 123, then 123 would have to be subtracted from every displayed value to create a graph that starts at 0.

InfluxDB: v1.8.10
Chronograf: v1.8.10

Apologies for the image, I am only allowed to post one embed. Please open the image in a new window.

Why do I want this?
In Grafana you can only alert on Time Series panels. Take a look at the purple and green Stat panels shown in the image below. These panels have a delta transformation applied, so the data starts at 0 for the given time range.
For example, the time range selected here is 3 hours. Over this 3-hour period, 1.09 GB of data was used in total, per the Stat panel. I could set an alert at 100 GB relative to the start of the time period; however, alerts can only be set on Time Series panels.

Now if we look at the Time Series panel, it shows 2.74 TB at the start! Way past the 100 GB limit. Although the total usage increases by the correct amount over the time period, the panel needs to start at 0 if you want to set a proper alert.

Code
Flux query language code.

rx = from(bucket: "unifidreammachinepro/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "usg_wan_ports" and r._field == "rx_bytes" )

tx = from(bucket: "unifidreammachinepro/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "usg_wan_ports" and r._field == "tx_bytes" )

rxtx = join(tables: {rx: rx, tx: tx}, on: ["_time"])
  |> map(fn: (r) => ({
      _time: r._time,
      _value: r._value_rx + r._value_tx
      })
    )

firstrxtx = join(tables: {rx: rx, tx: tx}, on: ["_time"])
  |> map(fn: (r) => ({
      _time: r._time,
      _value: r._value_rx + r._value_tx
      })
    )
  |> first()
  |> fill(usePrevious: true)

join(tables: {rxtx: rxtx, firstrxtx: firstrxtx}, on: ["_time"])
  |> map(fn: (r) => ({
      _time: r._time,
      _value: r._value_rxtx - r._value_firstrxtx
      })
    )

Hello @ThePie,
Thanks for your detailed explanation.

rx = from(bucket: "unifidreammachinepro/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "usg_wan_ports" and r._field == "rx_bytes" )

tx = from(bucket: "unifidreammachinepro/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "usg_wan_ports" and r._field == "tx_bytes" )

Since your data comes from the same measurement, there isn’t really a need to join it. What you really want is to pivot your data (see pivot() function | Flux 0.x Documentation). Specifically, use the schema.fieldsAsCols() function, which is a special application of the pivot function.
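To illustrate, here’s a minimal sketch of that pivot on your data (assuming the bucket, measurement, and field names from your query):

import "influxdata/influxdb/schema"

// each _field becomes its own column, so rx_bytes and tx_bytes
// end up on the same row, keyed by _time
from(bucket: "unifidreammachinepro/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "usg_wan_ports")
  |> schema.fieldsAsCols()
  // equivalent to the underlying pivot:
  // |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")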

rxtx = join(tables: {rx: rx, tx: tx}, on: ["_time"])
  |> map(fn: (r) => ({
      _time: r._time,
      _value: r._value_rx + r._value_tx
      })
    )

firstrxtx = join(tables: {rx: rx, tx: tx}, on: ["_time"])
  |> map(fn: (r) => ({
      _time: r._time,
      _value: r._value_rx + r._value_tx
      })
    )
  |> first()
  |> fill(usePrevious: true)

Now you don’t need to fill previous with the first value; instead, you can extract it as a scalar with findRecord(). This way you also avoid needing to perform another join.
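For example, a sketch reusing your rx variable (firstRx is just an illustrative name):

// findRecord() returns a single row as a record, so firstRx._value
// is a plain scalar you can reference inside map()
firstRx = rx
  |> first()
  |> findRecord(fn: (key) => true, idx: 0)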

join(tables: {rxtx: rxtx, firstrxtx: firstrxtx}, on: ["_time"])
  |> map(fn: (r) => ({
      _time: r._time,
      _value: r._value_rxtx - r._value_firstrxtx
      })
    )

Joins can be computationally expensive and increase your query duration, especially over large ranges of data. It’s better to try pivoting or extracting a scalar with findRecord.

I might try:

import "influxdata/influxdb/schema"

rxtx = from(bucket: "unifidreammachinepro/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "usg_wan_ports" and (r._field == "rx_bytes" or r._field == "tx_bytes"))
  |> schema.fieldsAsCols()
  |> map(fn: (r) => ({ _time: r._time, _value: r.rx_bytes + r.tx_bytes }))
// |> yield(name: "sum")

firstrxtx = rxtx
  |> first()
  // |> yield(name: "first value of sum")
  |> findRecord(fn: (key) => true, idx: 0)

// To verify that you have extracted the correct value with findRecord you can do:
// import "array"  <-- add this to the top of the script

// array.from(rows: [
//   {_time: now(), test: firstrxtx._value},
// ])

final = rxtx
  |> map(fn: (r) => ({ r with _value: r._value - firstrxtx._value }))
  |> yield(name: "final")

Use multiple yield() statements to verify that you’re getting what you expect along the way.
You can comment/uncomment code with Cmd + / on a Mac in the UI.
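For instance, the multiple-yield pattern looks like this (a sketch, not specific to your final query):

data = from(bucket: "unifidreammachinepro/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => r._measurement == "usg_wan_ports")

data
  |> yield(name: "raw")

data
  |> first()
  |> yield(name: "first point")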

For combining fields within a measurement, using pivot() is certainly the way to go.
@Anaisdg wrote: Joins can be computationally expensive.
That’s what I found in a case where one, AFAIK, really needs a join().
See the posting Poor performance for join(): cpu and memory grows quadratically with row count.
So, questions for @ThePie:

  • Did your original query with a join() have decent performance?
  • Did you also see a quadratic dependence on row count?