Using increase() with noisy data

I am collecting data from various sensors which may occasionally report values with small fluctuations, e.g. 17.1°C, 17.2°C, 17.05°C, 17.1°C where the temperature doesn’t really change.

Some of these sensors are counters which may wrap around, so I am doing statistics on them using increase() and derivative(nonNegative: true). Combined with noisy data, however, this leads to huge errors in my reports: when tiny constant fluctuations occur, increase() only ever counts the positive part of these fluctuations. This way, e.g. a water meter will report 50 cubic meters of water used in one day instead of the (correct) 500 liters.
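A rough arithmetic sketch of the effect (in Python rather than Flux, using the two alternating readings from the dataset excerpt below): counting only the positive halves of flat noise inflates the total with every upward flip.

```python
# Sketch: why summing only positive differences inflates a flat, noisy counter.
# The two alternating readings are taken from the water-meter excerpt below;
# the meter barely moves, yet the positive-only sum keeps growing.

def positive_only_sum(values):
    """Sum only the positive sample-to-sample differences."""
    return sum(max(b - a, 0.0) for a, b in zip(values, values[1:]))

readings = [756.5649, 756.5659] * 10  # noise flipping between two values

phantom = positive_only_sum(readings)        # ~0.01: ten upward flips of 0.001
true_usage = readings[-1] - readings[0]      # ~0.001: the meter barely moved

print(round(phantom, 4), round(true_usage, 4))
```

With 144 samples per day, even millimeter-scale noise accumulates into a substantial phantom consumption, which matches the scale of the error described above.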

This is an excerpt of the dataset:

,,0,2021-08-13T23:30:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T23:40:10Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T23:50:10Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T00:00:07Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T00:10:10Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T00:20:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T00:30:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T00:40:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T00:50:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T01:00:09Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T01:10:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T01:20:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T01:30:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T01:40:07Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T01:50:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T02:00:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T02:10:07Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T02:20:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T02:30:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T02:40:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T02:50:07Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T03:00:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T03:10:09Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T03:20:07Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T03:30:10Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T03:40:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T03:50:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T04:00:09Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T04:10:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T04:20:08Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T04:30:11Z,756.5649,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T04:40:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T04:50:10Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T05:00:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T05:10:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T05:20:12Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T05:30:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T05:40:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T05:50:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T06:00:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T06:10:10Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T06:20:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T06:30:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T06:40:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T06:50:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T07:00:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T07:10:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-14T07:20:11Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0

This is my query:

from(bucket: "Home")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "mqtt.0.wasserzaehler.main.value" and r._field == "value")
  |> aggregateWindow(every: 5m, fn: last, timeSrc: "_start", createEmpty: false)
  |> increase()
  |> aggregateWindow(every: 24h, fn: last, timeSrc: "_start", createEmpty: false)
  |> derivative(unit: 24h, nonNegative: true)
  |> keep(columns: ["_measurement", "_value", "_time"])
  |> set(key: "_measurement", value: "Verbrauch alle 24h")
  |> sort(columns: ["_time"])

I need increase() with a small enough aggregation window to catch counter wraps without losing (too much) data. (Using it without an aggregation window is far too slow.)
I need derivative() with a larger aggregation window to then calculate the change over time.

How can I denoise this data in a way that loses neither counter wraps nor real increases? movingAverage() will not help because it destroys the counter wrap detection of increase().

Thank you!

Hello @Jens,
I’m not sure I fully understand your issue, so let me ask some questions. Can you first filter your data for false jumps before applying increase()? Is that the goal? When you say denoise, do you mean removing false spikes in temperature? Why are you losing counter wraps?
To denoise false spikes, could you take your data and first filter out large jumps with a threshold?

Maybe after applying derivative first?

Could you use cumulativeSum() instead of a counter if you want to include negative fluctuations?

Thanks

Hello,
thank you for replying! How would you write a filter that ignores (only) small negative jumps (let’s say, < 0.2) in the dataset and simply uses the previous value? This may solve my issue (I’d have to test it).

I’ll do different queries and then a join.

Temperature is noisy and can fluctuate, so it makes sense to use the aggregate function with mean:

|> aggregateWindow(every: 5m, fn: mean, timeSrc: "_start", createEmpty: false)

For counters, on the other hand, it makes sense to keep the last value:

|> aggregateWindow(every: 5m, fn: last, timeSrc: "_start", createEmpty: false)

You can filter by value at any point, e.g.:

 |> filter(fn: (r) => r["_value"] >= 0.3)

Hello,
I dug a little deeper here and I may actually have found a bug in the increase() function.

Consider this data:

,,0,2021-08-13T05:50:07Z,756.6925,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T06:00:07Z,756.6925,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T06:10:07Z,756.4683,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T06:20:09Z,756.4782,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T06:30:07Z,756.48,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T06:40:08Z,756.4894,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T06:50:08Z,756.4978,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T07:00:09Z,756.5014,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T07:10:07Z,756.5042,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T07:20:07Z,756.5057,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T07:30:08Z,756.5061,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T07:40:07Z,756.512,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T07:50:07Z,756.5163,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T07:58:00Z,756.4646,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T08:00:10Z,756.529,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T08:10:07Z,756.5357,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T08:20:08Z,756.5383,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T08:30:07Z,756.5429,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T08:40:12Z,756.5645,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T08:50:07Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0
,,0,2021-08-13T09:00:08Z,756.5659,value,mqtt.0.wasserzaehler.main.value,true,system.adapter.mqtt.0,0

Plotting it looks like this:

When plotting this data with increase() applied, I would expect it to look similar, except for the negative deltas at 7:58 and at 9:58 (the latter only visible in the plot). However, what I actually get is absolute values once a negative delta occurs:

This is not what increase() is supposed to do, right? Either you get absolute values all the time, or you never get them, but here it is mixed up, and it’s the reason for the sudden extreme spikes in my Grafana plots.

I tried experimenting with filtering out negative values and then summing up, but that counts positive deltas twice and leads to incorrect totals.

Any other ideas?

Thank you :slight_smile:

Yes, your graph looks odd. increase() returns the cumulative sum of non-negative differences. Is the only difference between those two queries the increase() at the end?

I believe that maybe you are right.

I tried everything but I am getting the same results, and I don’t think this is correct:

In this picture I thought that maybe it was something with the decimals, so I truncated the values to keep just the digits after the decimal point, and the result doesn’t make sense to me either:

I tried to do the same as increase() using Excel:

returns a cumulative sum of non-negative differences between rows in a table.

I’ve used increase() before and I swear that it worked as described, but in this case I am really confused too. :confused:

Yes. This is confusing me.

I’ve got a similar issue, and I’m quite sure it came with an update at some point.
I have one system running on a server with Grafana (InfluxDB 1.8) and a second running locally with just the InfluxDB UI (2.7).
On the old system my query works perfectly and as expected, but on the new one it does not.
I want to count rising edges in a column consisting only of ones and twos. The old version starts at zero and increases by one on every change from one to two; on changes from two to one, it does nothing.
With InfluxDB 2.7, the same query on the same dataset produces weird results. As the first number in my column is always a 1, it starts counting at 1 and then increases on every difference, no matter whether positive or negative.

It even seems this behaviour is documented, but I don’t understand why one would change it without so much as a notice in the new documentation.

This piece of documentation describes the old behaviour (and is wrong, as far as I can see):

This piece describes the new (weird) behaviour:

@Anaisdg Could you take a look at this please?

For my specific problem, I just found a solution: by reducing my column values by 1, I am left with zeros and ones, and now I can count my rising edges without also counting falling edges.
The documentation issue still exists though.

I think you could do a workaround using a custom function and reduce() to sum the previous differences: if the difference is negative, keep the last result; if it is positive, add it. But that will only give you the single last value, not the entire time series.


OK, thanks!
… do you have an example of how to do this, and would performance suffer?
I have millions of datapoints …

I need to figure it out myself. But first I need to track down all my queries and dashboards to see if I am still using increase() :man_facepalming: Like @Philip_Reutzel, I started with InfluxDB 1.8 and I run important stuff (with queries I wrote back then) on my production server.

Right now, for me it is a potential problem and a headache to fix :weary:.


Good news: I don’t think I have an issue, because it seems my use case works and, according to the documentation, it’s OK.

The documentation about this is still a little confusing, but I get it now. (My Excel spreadsheet example was wrong, by the way; the formula I wrote is not the same as the increase() description.) Bad news for you @Jens: increase() won’t work for your use case.

So increase() assumes that before the “decrease” from 756.5163 to 756.4646 at 2021-08-13T07:58:00Z the counter was restarted, so the output is the previously accumulated value plus 756.4646. The assumption is that in the time between those two data points the counter reached its maximum value and started counting again up to 756.4646, so the output will be near 1500.
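That explanation can be checked with a small simulation. This is a Python sketch of increase()’s documented behaviour, not Flux itself: non-negative differences are accumulated, and on a negative difference the current value itself is added, as if the counter had restarted from zero.

```python
# Sketch of increase()'s documented semantics: add non-negative differences;
# on a drop, assume a counter reset and add the current value itself.

def flux_increase(values):
    out = [0.0]
    for prev, cur in zip(values, values[1:]):
        d = cur - prev
        out.append(out[-1] + (d if d >= 0 else cur))
    return out

# Water-meter excerpt from above (values only)
readings = [
    756.6925, 756.6925, 756.4683, 756.4782, 756.48, 756.4894, 756.4978,
    756.5014, 756.5042, 756.5057, 756.5061, 756.512, 756.5163, 756.4646,
    756.529, 756.5357, 756.5383, 756.5429, 756.5645, 756.5659, 756.5659,
]

result = flux_increase(readings)
# The two tiny drops (06:10 and 07:58) are each treated as a full reset,
# so the cumulative output jumps by roughly 756 both times and ends near 1513.
print(round(result[-1], 4))
```

This reproduces exactly the “absolute values once a negative delta occurs” effect shown in the plots above.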

However, as I mentioned before, there is a way to write a custom function that ignores the “drop” in your data.

@Jens, does this work for you?

import "strings"
import "csv"

//table,time,_value,mqtt.0.wasserzaehler.main.value,system.adapter.mqtt.0

data =

"
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,boolean,long
#group,false,false,false,false,false,false,false,false
#default,,,,,,,,
,result,table,_start,_stop,_time,_value,mqtt.0.wasserzaehler.main.value,system.adapter.mqtt.0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T05:50:07Z,756.6925,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T06:00:07Z,756.6925,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T06:10:07Z,756.4683,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T06:20:09Z,756.4782,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T06:30:07Z,756.48,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T06:40:08Z,756.4894,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T06:50:08Z,756.4978,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T07:00:09Z,756.5014,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T07:10:07Z,756.5042,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T07:20:07Z,756.5057,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T07:30:08Z,756.5061,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T07:40:07Z,756.512,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T07:50:07Z,756.5163,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T07:58:00Z,756.4646,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T08:00:10Z,756.529,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T08:10:07Z,756.5357,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T08:20:08Z,756.5383,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T08:30:07Z,756.5429,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T08:40:12Z,756.5645,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T08:50:07Z,756.5659,TRUE,0
,mean,0,2021-08-13T05:50:07Z,2021-08-13T09:10:08Z,2021-08-13T09:00:08Z,756.5659,TRUE,0
"

csv.from(csv: data)
  |> duplicate(column: "_value", as: "previous")
  |> yield(name: "custom-name1")
  |> difference(nonNegative: true, columns: ["previous"], keepFirst: true)
  |> filter(fn: (r) => exists r.previous)
  |> yield(name: "custom-name2")

Since there is a drop in value at the beginning, I’m not sure if just filtering the non-negative differences will work for you.
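To make the limitation concrete, here is a Python sketch of what the difference(nonNegative: true) approach computes on the excerpt above (not the Flux pipeline itself): every drop is silently discarded, so any consumption hidden inside a genuine counter wrap would be lost the same way as the noise.

```python
# Sketch: summing only non-negative sample-to-sample differences, as
# difference(nonNegative: true) effectively does. Drops are discarded, so a
# genuine counter reset contributes nothing to the total.

def nonnegative_diff_sum(values):
    return sum(b - a for a, b in zip(values, values[1:]) if b >= a)

# Water-meter excerpt from above (values only)
readings = [
    756.6925, 756.6925, 756.4683, 756.4782, 756.48, 756.4894, 756.4978,
    756.5014, 756.5042, 756.5057, 756.5061, 756.512, 756.5163, 756.4646,
    756.529, 756.5357, 756.5383, 756.5429, 756.5645, 756.5659, 756.5659,
]

# Only the growth between the two drops survives (~0.048 + ~0.1013 = ~0.1493);
# nothing is counted for the drops themselves, real wrap or not.
print(round(nonnegative_diff_sum(readings), 4))
```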

I understand. Then increase() by itself doesn’t work for me because every fluctuation is considered a reset. But your solution unfortunately doesn’t work either, because it now (obviously) doesn’t consider counter resets at all.

I wonder if we could build an increase(v: minResetDelta) function that accepts a number: if the downward shift is smaller than this number, it’s ignored (the previous value is used); if not, it’s considered a reset. Or maybe better, an increase(v: minResetValue) function with an absolute value. This would solve my problem, since we can assume that counter wraps always cause the counter to restart with a value < 1.
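A minimal sketch of that idea (Python rather than Flux, and minResetDelta is the hypothetical parameter from the post, not an existing increase() option): drops smaller than the threshold are treated as noise and the sample is skipped entirely, while larger drops are treated as resets the way increase() already handles them. Keeping the previous reference value when a dip is skipped also avoids counting the rebound twice.

```python
# Sketch of the proposed increase(v: minResetDelta):
#  - positive differences are accumulated as usual,
#  - drops of at least min_reset_delta are treated as counter resets
#    (the current value is added, as increase() does),
#  - smaller drops are treated as noise: the sample is skipped and the
#    previous reference value is kept, so the rebound is not double-counted.

def increase_with_threshold(values, min_reset_delta):
    total = 0.0
    out = [total]
    prev = values[0]
    for cur in values[1:]:
        d = cur - prev
        if d >= 0:
            total += d
            prev = cur
        elif -d >= min_reset_delta:
            total += cur  # genuine reset: counter restarted below prev
            prev = cur
        # else: small dip, ignore this sample entirely
        out.append(total)
    return out

# Noisy but flat counter: no phantom consumption accumulates (~0.001 total).
flat = [756.5649, 756.5659] * 10
print(round(increase_with_threshold(flat, 0.1)[-1], 4))

# Real reset: the drop from 5.0 to 0.2 exceeds the threshold and is
# counted as a wrap (~1.7 total).
wrapped = [4.0, 5.0, 0.2, 0.7]
print(round(increase_with_threshold(wrapped, 0.5)[-1], 4))
```

Porting this to Flux would presumably be done with reduce() per the custom-function documentation linked below; whether performance holds up on millions of points would need testing.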

Define custom Flux functions | Flux 0.x Documentation (influxdata.com)
Create custom aggregate functions | InfluxDB OSS 2.7 Documentation (influxdata.com)
You can take a look at these and start tinkering with them to create your own.
