Continuous Query that rejects Outliers with count

I currently have a continuous query that aggregates my raw data (arriving at about six samples per second) into a 1-minute mean. What I’d like to do, though, is compute not just a simple mean of all the measurements within the window, but something like:

  1. Calculate the three-sigma bounds of all the measurements
  2. Reject outliers outside the three-sigma bounds
  3. Count the number of outliers removed
  4. Recalculate the mean and stddev using only the remaining (un-rejected) measurements

Is this possible with a continuous Flux query, and can you point me to resources that cover this kind of more complex aggregation?

Hi @Dave_Sprague

That’s a brain teaser, and my Flux knowledge is still a work in progress, but borrowing heavily from this well-explained blog post, I think we can restate your objectives 1 & 2 as “filter out any entries whose Z-score has an absolute value greater than 3.0”, since the Z-score tells you how many standard deviations from the mean an entry sits. For example, with a mean of 50 and a standard deviation of 2, a reading of 57 has a Z-score of (57 − 50) / 2 = 3.5, so it would be rejected.

I am sure your objective #4 is possible, but I need to mull it over. Meanwhile, I think this works for #1, 2, and 3 (I tested it on some sample temperature data that I had access to).

sdev = from(bucket: "HyperEncabulator")
  |> range(start: -1m)
  |> filter(fn: (r) => r["_measurement"] == "TemperatureData")
  |> filter(fn: (r) => r["_field"] == "Temperature")
  |> stddev()
  |> findColumn(fn: (key) => key._measurement == "TemperatureData", column: "_value")

avg = from(bucket: "HyperEncabulator")
  |> range(start: -1m)
  |> filter(fn: (r) => r["_measurement"] == "TemperatureData")
  |> filter(fn: (r) => r["_field"] == "Temperature")
  |> mean()
  |> findColumn(fn: (key) => key._measurement == "TemperatureData", column: "_value")

from(bucket: "HyperEncabulator")
  |> range(start: -1m)
  |> filter(fn: (r) => r["_measurement"] == "TemperatureData")
  |> filter(fn: (r) => r["_field"] == "Temperature")
  |> map(fn: (r) => ({ r with StandardDev: sdev[0] }))
  |> map(fn: (r) => ({ r with Average: avg[0] }))
  |> map(fn: (r) => ({ r with ZScore: (r._value - avg[0]) / sdev[0] }))
  // outliers can fall on either side of the mean, so test both tails
  |> filter(fn: (r) => r["ZScore"] > 3.0 or r["ZScore"] < -3.0)
  |> count()
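Outside of Flux, the same 3-sigma rejection can be sanity-checked in plain Python. This is only an illustration of objectives 1–3 on made-up numbers, not InfluxDB code:

```python
# Sanity check of the 3-sigma outlier rejection on synthetic data.
# The readings list is made up: sixty "normal" samples plus one spike.
import statistics

readings = [53.0] * 30 + [54.0] * 30 + [80.0]

mu = statistics.mean(readings)
sigma = statistics.stdev(readings)  # sample stddev, like Flux's stddev()

# Objectives 1 & 2: compute Z-scores and reject |z| > 3
outliers = [x for x in readings if abs((x - mu) / sigma) > 3.0]
kept = [x for x in readings if abs((x - mu) / sigma) <= 3.0]

# Objective 3: how many readings were rejected
print(len(outliers))  # the single 80.0 spike is rejected
```

Note that with only a handful of points per window a single outlier may never exceed three sigma (it inflates the stddev it is measured against), so the 3.0 threshold works best when each window holds many samples, as yours does at ~360 per minute.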

Hi Grant, thank you very much for your response and the blog link. This is extremely helpful. So perhaps for the fourth objective, I would need to create a new column that holds the Z-score for each measurement and then do another “pass” where I recompute avg and stddev using the Z-score column as a filter? I’ll work on it some and let you know how it goes.

Hi @Dave_Sprague

Maybe get rid of the count() function (unless you really want to know how many values were rejected), then replace the last filter function with all of this. We keep only the “good” measurements (ZScore between -3.0 and 3.0), write them to a new measurement, and take their mean:

  |> filter(fn: (r) => r["ZScore"] < 3.0 and r["ZScore"] > -3.0)
  |> map(fn: (r) => ({
      _value: r._value,
      _time: r._time,
      _measurement: "unrejected_dataset",
  }))
  |> to(bucket: "HyperEncabulator", fieldFn: (r) => ({"_value": r._value}))
  |> mean()
  |> yield(name: "Mean_of_unrejected_dataset")

Running that against my sample temperature data, the mean of the unrejected dataset comes out to 53.375.

By contrast, the original unfiltered dataset had a mean of 54.125, but you’ll want to double-check against your own data.
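For what it’s worth, the whole reject-then-recompute pipeline (objective 4 included) can be cross-checked outside Flux in a few lines of Python, again on synthetic numbers rather than real InfluxDB data:

```python
# Reject 3-sigma outliers, then recompute mean and stddev on the
# survivors (objective 4). Synthetic data: sixty normal samples, one spike.
import statistics

readings = [53.0] * 30 + [54.0] * 30 + [80.0]

mu = statistics.mean(readings)
sigma = statistics.stdev(readings)

kept = [x for x in readings if abs((x - mu) / sigma) <= 3.0]
rejected_count = len(readings) - len(kept)

clean_mean = statistics.mean(kept)    # mean without the spike: 53.5
clean_stdev = statistics.stdev(kept)  # much smaller than the raw stddev

print(rejected_count, clean_mean, round(clean_stdev, 3))
```

The recomputed stddev shrinks dramatically once the spike is gone, which is exactly why recomputing both statistics on the survivors (rather than reusing the originals) matters.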

Thanks, I’ll give this a try.

Dave