Simple and cheap sensor calibration in InfluxDB for Google Cloud?

We’re uploading sensor data to an InfluxDB bucket hosted via InfluxDB Cloud GCP. We’d like a way to “calibrate” this data by simply offsetting it for given time ranges (and also correcting incoming data as it arrives). Is there any out-of-the-box solution for this in InfluxDB that we could use for free with our account without additional compute resources? Or do we have to host something like Telegraf or Kapacitor? We really want the bare bones and minimal setup.

Hello @JLow,
Welcome!
Yes there definitely is with 2.x and Flux.
You can offset data or timeshift with the following Flux function:

And tasks:

If you want to give me more detail about what you’re trying to accomplish (input and expected output) then I can help you write this Flux task.

1 Like

Hi @Anaisdg apologies for the slow response and thank you for your reply. timeShift is not what I am looking for, we are not looking to offset the time of the data but rather to offset the value but for different amounts given the time range. For instance:

  • Temperature offset - 10 degrees for data between 1/1/2018 - 10/10/2019
  • Temperature offset - 25 degrees for data between 10/11/2019 - today

The solution we arrived at was to use map and a dictionary in the front-end of Grafana using Flux to offset the sensor readings. A better solution is probably to use an influxdb Task to process the data periodically.

Our code looks like:

import "dict"
import "array"
import "strings"

// specify which field we care about in this panel
field = "CO2 concentration"

nodes = ${Nodes:json}

// Must remain sorted
// Records are of type: [{ node: string, sensor: string, measurement: string, offset: float, start: time, }]
calibrationTable = []

// It is necessary to specify a "default" for the
// dictionary that has the same type as the values in the
// dictionary however empty arrays are not considered 
// equivalent in type and as afar as I can
// tell there are no type annotations available.
defaultRecords = array.from(rows: 
                  [{ node: "Dummy", 
                    sensor: "dummy", 
                    measurement: "dummy", 
                    offset: 0.0,
                    start: v.timeRangeStop, }])
                    

calibrateData = (field, node, relevantValues, data) => {
  sensors = relevantValues |> findColumn(fn: (key) => true, column: "sensor")
  getValues = (x) => {
    values = relevantValues |> filter(fn:(r) => r.sensor == x)
    return values
  }
  pairs = sensors |> array.map(fn: (x) => ({key: x, value: getValues(x: x)}))
  calibrationDict = dict.fromList(pairs: pairs) 
  // Find offset by "_measurement" (sensor) name and default to 0.0
  getOffset = (key) => {
    records = dict.get(dict: calibrationDict, key: key._measurement, default: defaultRecords)
    record = records |> filter(fn: (r) => r.start <= key._time) |> findRecord(fn:(key) => true, idx: 0)
    result = if exists record.offset and record.node != "Dummy" then record.offset else 0.0
    return result
  }
  result = data |> map(fn: (r) => ({ r with _value: float(v: r._value) + getOffset(key: r)}))
  return result
}

getData = (field, node) => {
  filteredCalibrationTable = calibrationTable |> array.filter(fn: (x) => x.measurement == field and x.node == node and x.start <= v.timeRangeStop)
  uncalibratedData = 
    from(bucket: "sensor-data")
      |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
      |> filter(fn: (r) => r["state"] == "ACTIVE" and r["_field"] == field and exists r._value)
      |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
      |> filter(fn: (r) => r["node"] == node)
  data = if length(arr: filteredCalibrationTable) == 0 then
      uncalibratedData |> yield(name: strings.joinStr(arr: [field, node], v:"-"))
  else
    calibrateData(field: field, node: node, relevantValues: array.from(rows: filteredCalibrationTable), data: uncalibratedData) |> yield(name: strings.joinStr(arr: [field, node], v:"-"))
  return data
}


nodes |> array.map(fn:(x) => getData(field: field, node: x))
1 Like

@JLow,
Thanks for clarifying and sharing your solution.
What are you using Flux for? That’s a pretty cool solution.

Thank you!

We are using Flux to query InfluxDB as a data source in Grafana.

1 Like

Hello @JLow,
Sorry I meant in general, what are you using InfluxDB for?

Ah sorry I don’t know how specific I can be but we are building out a network of IOT sensors. Dashboards are mostly used internally but may be exposed to clients at some point…it is not yet 100% clear what the requirements are.

1 Like