How to track changes in columns and create new column with binary values?

I have x and y fields in influx. I am trying to write a flux query which defines a threshold value 2 or more and creates new column binary starting with value 0 and then next values depend on the changes in x, y or both. lets say x changes with threshold 2 or more then my second value in binary field(column) becomes 1(opposite to previous value zero). so value of binary column would toggle between 0 and 1 based on changes in x or y or both.below is my query but throwing error - Can I please get a help on this?
threshold = 2.0

from(bucket: “fluxbucket”)
|> range(start: -1000h) // Adjust the time range as needed
|> filter(fn: (r) => r._measurement == “random_coordinates”)
|> pivot(rowKey:[“_time”], columnKey: [“_field”], valueColumn: “_value”)
|> sort(columns: [“_time”])
|> difference(columns: [“x”, “y”], nonNegative: false)
|> map(fn: (r) => ({
_time: r._time,
x: r.x,
y: r.y,
x_change: if abs(r.x) >= threshold then 1 else 0,
y_change: if abs(r.y) >= threshold then 1 else 0
}))
|> map(fn: (r) => ({
_time: r._time,
x: r.x,
y: r.y,
x_change: r.x_change,
y_change: r.y_change,
change_detected: if r.x_change == 1 or r.y_change == 1 then 1 else 0
}))
|> aggregateWindow(every: inf, fn: (tables=<-) => tables
|> reduce(
fn: (r, accumulator) => ({
_time: r._time,
x: r.x,
y: r.y,
x_change: r.x_change,
y_change: r.y_change,
change_detected: r.change_detected,
binary_state: if r.change_detected == 1 then (accumulator.binary_state + 1) % 2 else accumulator.binary_state
}),
identity: {binary_state: 0}
)
)
|> yield(name: “result”)

Hello @SSG,
what error are you getting?

I tried:

import "math"
import "array"

data = array.from(rows: [
    {_time: 2020-01-01T00:00:00Z, y: 3.0, x: 2.0},
    {_time: 2020-01-02T00:00:00Z, y: 3.0, x: 1.0},
    {_time: 2020-01-01T00:00:00Z, y: 6.0, x: 9.0}
])

threshold = 2.0



data
  |> difference(columns: ["x", "y"], nonNegative: false)
  |> map(fn: (r) => ({
      _time: r._time,
      x: r.x,
      y: r.y,
      change_detected: if math.abs(x:r.x) >= threshold or math.abs(x:r.y) >= threshold then 1 else 0
  }))
  |> group() // Group the data if needed
  |> reduce(fn: (r, accumulator) => ({binary_state: if r.change_detected == 1 then 1 - accumulator.binary_state
        else accumulator.binary_state }), identity: {binary_state: 0})
  |> yield(name: "result") // Uncomment this line to yield the result

I’m not sure why you need aggregatewindown with inf if you can jsut group all your data together?