Error with calculating median: expected stream[A] but found (<-: stream[B]) => stream[{C with abs: D}] (function)

I am trying to compute the modified z-score for anomaly detection. When I take the median of the absolute deviations to compute the MAD (see the code below), I get this error:

expected stream[A] but found (<-: stream[B]) => stream[{C with abs: D}] (function)

I am unsure how to solve this error, since median() works perfectly fine when I compute the median variable. It fails where I take the median of abs (marked with a comment in the code below).

import "math"

data = from(bucket: "timeseries")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["device"] == "XXXXXXXX")
    |> keep(columns: ["_time", "_value", "_field"])

median = data
    |> median()
    |> map(fn: (r) => ({ r with median: r._value }))
    |> keep(columns: ["median", "_field"])

abs = join(tables: {data: data, median: median}, on: ["_field"])
    |> map(fn: (r) => ({ r with abs: math.abs(x: r._value - r.median) }))
    |> keep(columns: ["_field", "abs"])

mad = abs
    |> median() // CODE FAILS HERE: expected stream[A] but found (<-: stream[B]) => stream[{C with abs: D}] (function)
    |> map(fn: (r) => ({ r with mad: r.abs }))
    |> keep(columns: ["mad", "_field"])

modifiedZScore = join(tables: {data: data, mad: mad}, on: ["_field"])
    |> map(fn: (r) => ({ r with modified_z_score: 0.6745 * (r._value - r.median) / r.mad }))
    |> keep(columns: ["_time", "_value", "_field", "modified_z_score"])
    |> yield()

Any help is greatly appreciated.

@NathanMLu I’m not 100% sure if it’s a stream typing issue. I think you may just need to tell median to operate on the abs column. By default, it operates on the _value column, which no longer exists once keep() retains only _field and abs.

import "math"

data = from(bucket: "timeseries")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["device"] == "XXXXXXXX")
    |> keep(columns: ["_time", "_value", "_field"])

median = data
    |> median()
    |> map(fn: (r) => ({ r with median: r._value }))
    |> keep(columns: ["median", "_field"])

abs = join(tables: {data: data, median: median}, on: ["_field"])
    |> map(fn: (r) => ({ r with abs: math.abs(x: r._value - r.median) }))
    |> keep(columns: ["_field", "abs"])

mad = abs
    |> median(column: "abs") // operate on the abs column; _value was dropped by keep() above
    |> map(fn: (r) => ({ r with mad: r.abs }))
    |> keep(columns: ["mad", "_field"])
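
One more thing that may be worth ruling out: the script assigns a stream to a variable named median, so a later call to median() could resolve to that variable instead of the built-in function. The error's shape (a stream where a function is expected) is consistent with that, though this has not been confirmed against the original data. The sketch below is one way the whole pipeline might look with that possibility removed. The names fieldMedian and deviations are hypothetical, _value is assumed to be a float, and the median column is carried forward because a join of only data and mad would not contain r.median for the final step.

```flux
import "math"

data = from(bucket: "timeseries")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["device"] == "XXXXXXXX")
    |> keep(columns: ["_time", "_value", "_field"])

// Hypothetical name: avoids rebinding `median`, so the built-in
// median() function is still what gets called further down.
fieldMedian = data
    |> median()
    |> map(fn: (r) => ({ r with median: r._value }))
    |> keep(columns: ["median", "_field"])

// Keep _time, _value and median so the final step can still read them.
deviations = join(tables: {data: data, med: fieldMedian}, on: ["_field"])
    |> map(fn: (r) => ({ r with abs: math.abs(x: r._value - r.median) }))
    |> keep(columns: ["_time", "_value", "_field", "median", "abs"])

// Median of the absolute deviations, one row per _field.
mad = deviations
    |> median(column: "abs")
    |> map(fn: (r) => ({ r with mad: r.abs }))
    |> keep(columns: ["mad", "_field"])

// Join against deviations (not data) so r.median is available here.
modifiedZScore = join(tables: {dev: deviations, mad: mad}, on: ["_field"])
    |> map(fn: (r) => ({ r with modified_z_score: 0.6745 * (r._value - r.median) / r.mad }))
    |> keep(columns: ["_time", "_value", "_field", "modified_z_score"])
    |> yield()
```

If the MAD for a field comes out as 0 (for example, when more than half the values are identical), the division in the last step will not give a usable score, so that case may need separate handling.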

As an alternative, you may consider using the anomalydetection.mad() function.
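
A minimal sketch of that alternative, reusing the bucket and filter from the question. It assumes the function lives in the contrib/anaisdg/anomalydetection package, that the input has _time and _value columns, and that 3.0 is a reasonable threshold for this data (none of which is confirmed in this thread).

```flux
import "contrib/anaisdg/anomalydetection"

from(bucket: "timeseries")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["device"] == "XXXXXXXX")
    // threshold: deviation cutoff used to flag a point as an anomaly (assumed value)
    |> anomalydetection.mad(threshold: 3.0)
```

As far as I understand the documentation, the output is grouped by _time and includes a level column set to "anomaly" or "normal". Grouping by _time suggests deviations are measured across series at each timestamp rather than along one series over time, so it is worth checking that this matches what you want before relying on it.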