Fill usePrevious with threshold

To create a graph for event-based data, it is useful to create empty entries with the aggregateWindow function and backfill them with the fill function.

However, by doing this you lose the ability to “not connect null values” (in Grafana) at a certain threshold, because there aren’t any null values left. This makes critical data loss invisible at first glance.

I had the idea to add a column, based on the existence of a value, to group by:

  |> map(fn: (r) => ({ r with data: if exists r._value then 1 else 0 }))

and then calculate the elapsed time, but now I am stuck.

  |> group(columns: ["data"], mode:"by")  
  |> elapsed(unit: 1s)

I think I would need a column with cumulative elapsed time that resets every time the value changes from existing to non-existing.

Does anyone have an idea how to filter out null values after a threshold time and then fill the remaining ones?

Can you please help me understand this better? It feels contradictory. If you want null values, then don’t fill? Or fill with a conditional map statement at that certain threshold?

Why are you trying to calculate the elapsed time? Perhaps you want to use this?

And make it ok/crit instead of 1/0?

You are correct in that it is a bit of a contradiction. I will try to elaborate some more.

When data points are irregularly spaced, it is nice to fill windows that don’t have any data.
This way the tooltip of the crosshair, or the Excel export, has a value for each timestamp (window).

But when you know there must be a data point at least every minute, multiple consecutive empty windows spanning more than that one-minute period should show up as a gap of null values in the data.

If it were an addition to the fill function, I would define it like this:

|> fill(usePrevious: true, threshold: 2m)

This would bridge gaps for a maximum of 2 minutes. Without it, fill would happily fill the previous value for days on end.

Grafana needs the intermediate null values to identify data gaps; if the timestamps are simply not there, Grafana draws a straight line to the next data point.

Ah, I see now, thanks for clarifying. How about trying:

import "array"
import "internal/debug"

data = array.from(rows: [
  {_time: time(v: "2024-03-20T00:00:00Z"), _field: "temperature", _value: 10.0},
  {_time: time(v: "2024-03-20T00:01:00Z"), _field: "temperature", _value: 12.0},

  // 1.5-minute gap (should be filled)
  {_time: time(v: "2024-03-20T00:02:30Z"), _field: "temperature", _value: debug.null(type: "float")},
  {_time: time(v: "2024-03-20T00:03:00Z"), _field: "temperature", _value: 15.0},

  // 8-minute gap (should NOT be filled)
  {_time: time(v: "2024-03-20T00:11:00Z"), _field: "temperature", _value: debug.null(type: "float")},
  {_time: time(v: "2024-03-20T00:12:00Z"), _field: "temperature", _value: 20.0},

  // 1-minute gap (should be filled)
  {_time: time(v: "2024-03-20T00:13:30Z"), _field: "temperature", _value: debug.null(type: "float")},
  {_time: time(v: "2024-03-20T00:14:00Z"), _field: "temperature", _value: 22.0},

  // 8-minute gap (should NOT be filled)
  {_time: time(v: "2024-03-20T00:22:00Z"), _field: "temperature", _value: debug.null(type: "float")},
  {_time: time(v: "2024-03-20T00:23:00Z"), _field: "temperature", _value: 25.0}
])

// Apply range function to filter data within the desired time window
filtered_data = data
  |> range(start: time(v: "2024-03-20T00:00:00Z"), stop: time(v: "2024-03-20T00:30:00Z"))

// Compute elapsed time
elapsed_data = filtered_data
  |> elapsed(unit: 1m)  // Calculate elapsed time in minutes

// First, fill null values with previous values (so we have something to check)
filled_data = elapsed_data
  |> fill(usePrevious: true)

// Then, only allow filling if the gap is ≤ 2 minutes
final_data = filled_data
  |> map(fn: (r) => ({
      r with _value: if r.elapsed < 2 then r._value else debug.null(type: "float")
  }))

final_data

I thought you might be heading in the right direction, but you didn’t use the aggregateWindow function, which is needed to create the null values that Grafana uses to determine a gap.
[screenshot: resulting table]
When there are no null values in between, Grafana draws a straight line (as does influxdb explorer).
And when you create the empty windows, the result of the elapsed function is always the same.
We need to do something like:

  |> aggregateWindow(every: 1m, fn: mean, createEmpty: true)
  |> duplicate(column: "_value", as: "_original")
  |> fill(column: "_value", usePrevious: true)
  |> elapsed(unit: 1m)
  |> cumulativeSum(columns: ["elapsed"]) // Make this reset each time the original value is not null
  // Don't filter the data but replace the values with null.
  |> map(fn: (r) => ({ r with _value: if r.elapsed > 5 then debug.null(type: "float") else r._value }))

If there is a function to get the elapsed value of the previous row you can perform your own cumulative sum in a map function that only counts when the original value was null.
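For what it’s worth, Flux’s built-in stateDuration() behaves much like the reset-on-change cumulative sum described above: it accumulates the time that consecutive rows match a predicate and resets as soon as a row doesn’t. A sketch only, assuming the duplicated raw value from the snippet above lives in the "_original" column and that import "internal/debug" is in scope:

```flux
// stateDuration() accumulates, in the given unit, how long consecutive rows
// have matched the predicate; it resets to -1 when a row does not match.
  |> stateDuration(fn: (r) => not exists r._original, column: "gap", unit: 1m)
  // Null out values that have been filled for longer than 5 minutes
  |> map(fn: (r) => ({ r with _value: if r.gap > 5 then debug.null(type: "float") else r._value }))
```

This sidesteps the need to read the previous row yourself, since the running "gap" column carries that state from row to row.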

I created null values.
You don’t need aggregateWindow to create null values, there are multiple ways to create them. You can add an aggregate window function after data as well if you want.

I don’t think we are on the same page about the source data yet.

The source does not contain any null values; it does, however, have variable intervals because of the subscription nature of the data source. As discussed in the replication topic, my data is replicated at (for example) 1-minute intervals if communication is still good. But if communication goes bad or Telegraf stops working, larger data gaps occur. These are the ones I do not want to fill with the previous value.

All queries start with the usual:

from(bucket: "ProcessLog")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] =~ /PT|PDT/)
  |> filter(fn: (r) => r["_field"] == "Val_ActPV")
  |> keep(columns: ["_time", "_measurement","_field","_value"])
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: true)
  |> yield(name: "mean")

Resulting in this kind of table:

Adding the fill makes the output much more usable because each row has all the values. *Setting createEmpty to false creates the same kind of table, only eliminating the completely empty windows.

I need the null values after the allowed fill period to make the gap in the data visible to the user. There is no reliable data after the threshold period so the fill function should not fill windows with stale values. Otherwise end users will make assumptions based on wrong data.

So my question is how do I add a fill function to my example query without filling the windows with stale data. Mind you that the example query with a regular fill works in all situations where there are no errors in the data pipeline.

Hope this makes the issue a bit more clear.


Thank you, this is helpful. But I’m sorry, I’m still confused.
The goal with my query above wasn’t to show the source data but to show a table like the screenshot you just shared, once you have null values. The Flux I shared above then lets you fill the values if the gap is small, and not fill them if the gap is large or stale (i.e. there is no reliable data after the threshold period, so the fill function should not fill windows with stale values).

Can you please help me by using array.from() to share some dummy input data (maybe at the point of the screenshot you shared) and some example output data?

I’m going to loop in @scott because I’m feeling dense. I’m not sure what I’m missing here.

@JeroenVH I think I understand what you’re trying to do:

  • You have a non-homogeneous schema (not all rows have values for every field) and want to use fill with usePrevious to homogenize the schema.
  • You want to be able to detect gaps in reporting and visualize those gaps with null values.

Is this correct? Also, some other questions:

  • What is the schema of your data as it’s stored in InfluxDB? Tags, fields, etc.?
  • Do you query multiple fields at once, or just one field per query?
  • Are you trying to visualize gaps when reporting per field, or in the data overall (no data for Val_ActPV vs no data for any fields)?

@scott Yes, you are correct. Fill should indeed homogenize my schema, but not for abnormally long periods, as that might cover up a problem with the data collection.

  • What is the schema of your data as it’s stored in InfluxDB? Tags, fields, etc.?

Here is an example of the line protocol that gets stored (a single field at a time, using OPCUA_Listener):

06PIT01030,bucket=ProcessLog,engunit=mBar,unit=F06010 Val_ActSP=4 1742774180000000000

Each measurement has multiple fields, and fields might have a different tag like engunit but are mostly the same.

  • Do you query multiple fields at once, or just one field per query?

I do a single query to retrieve the Val_ActPV field of all measurements containing PT|PIT for the unit selected on the dashboard page.

  • Are you trying to visualize gaps when reporting per field, or in the data overall (no data for Val_ActPV vs no data for any fields)?

The timeseries data of each field should report the gaps.

*The screenshot I included in reply 7 is a pivoted view of the multiple returned fields that Grafana aligns by time. There is only one field per time series.

Guys, it just dawned on me this morning. If the only way to get context from the row above is fill, and you want an increasing delta while it is filling, you need to fill a column with the timestamp of the value you are filling! :smiley:

Fully worked out formula:

  • Create null values with aggregateWindow
  • Duplicate the original value to observe the filling later (just for show)
  • Create a new time column to store the original value timestamp
  • Fill both the value and the original time column
  • Replace the filled values with null when the difference between the timestamp and the original time is greater than the threshold.

import "internal/debug"
THRESHOLD = 30m

from(bucket: "ProcessLog")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(<your filter here>)
  |> keep(columns: ["_time", "_measurement","_field","_value"])
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: true)
  |> duplicate(column: "_value", as: "original_value")
  |> map(fn: (r) => ({ r with original_time: if exists r._value then r._time else debug.null(type: "time") }))
  |> fill(column: "_value", usePrevious: true)
  |> fill(column: "original_time", usePrevious: true)
  |> map(fn: (r) => ({ r with delta: int(v: r._time) - int(v: r.original_time) }))  
  |> map(fn: (r) => ({ r with timeout: r.delta > int(v: THRESHOLD) }))
  |> map(fn: (r) => ({ r with _value: if r.timeout then debug.null(type: "float") else r._value }))
  |> keep(columns: ["_time", "_measurement","_field","_value"])
  |> yield(name: "mean")

If there is any future development on Flux, this whole ordeal should be wrapped up into the built-in fill function.

|> fill(usePrevious: true, threshold: 30m)

While I am at it, I might as well make an example function:

import "internal/debug"
fillWithThreshold = (tables=<-, threshold) => tables
  |> map(fn: (r) => ({ r with original_time: if exists r._value then r._time else debug.null(type: "time") }))  
  |> fill(column: "_value", usePrevious: true)
  |> fill(column: "original_time", usePrevious: true)
  |> map(fn: (r) => ({ r with _value: if int(v: r._time) - int(v: r.original_time) > int(v: threshold) then debug.null(type: "float") else r._value }))
  |> drop(columns: ["original_time"])

Usage:

  |> fillWithThreshold(threshold: 5m)
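To sanity-check the helper, here is a self-contained sketch reusing the array.from dummy-data idea from earlier in the thread. The timestamps and values are made up, and it assumes the first row is non-null so original_time can be back-filled:

```flux
import "array"
import "internal/debug"

fillWithThreshold = (tables=<-, threshold) => tables
    |> map(fn: (r) => ({ r with original_time: if exists r._value then r._time else debug.null(type: "time") }))
    |> fill(column: "_value", usePrevious: true)
    |> fill(column: "original_time", usePrevious: true)
    |> map(fn: (r) => ({ r with _value: if int(v: r._time) - int(v: r.original_time) > int(v: threshold) then debug.null(type: "float") else r._value }))
    |> drop(columns: ["original_time"])

array.from(rows: [
    {_time: time(v: "2024-03-20T00:00:00Z"), _value: 10.0},
    {_time: time(v: "2024-03-20T00:01:00Z"), _value: debug.null(type: "float")}, // 1m since last value: filled
    {_time: time(v: "2024-03-20T00:02:00Z"), _value: 12.0},
    {_time: time(v: "2024-03-20T00:03:00Z"), _value: debug.null(type: "float")}, // 1m: filled
    {_time: time(v: "2024-03-20T00:04:00Z"), _value: debug.null(type: "float")}, // 2m: filled (at the limit)
    {_time: time(v: "2024-03-20T00:05:00Z"), _value: debug.null(type: "float")}, // 3m: stays null
    {_time: time(v: "2024-03-20T00:06:00Z"), _value: 15.0}
])
    |> fillWithThreshold(threshold: 2m)
```

Note that the comparison is delta > threshold, so a gap of exactly the threshold length is still filled.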

Can I store this function on my server so I can reuse it in each query?

Many thanks @Anaisdg and @scott for thinking along with me on this topic!