I would use the join.inner() function. You define your two streams as a “left” and a “right” side, and pass a predicate function as the `on` parameter to define how their rows should be matched up. Something like this:
import "join"
// Current series, downsampled onto a 1-minute grid.
data1 = from(bucket: "testBucket")
    |> range(start: -3h)
    |> filter(fn: (r) => r._measurement == "example")
    // Ensure data is aligned to the same time grid
    |> aggregateWindow(every: 1m, fn: last, createEmpty: false)
    // _start/_stop are part of the group key. Drop them so that both streams
    // end up with identical group keys (join only pairs tables whose group
    // key instances match).
    |> drop(columns: ["_start", "_stop"])

// Same series shifted forward by one hour, so each row lines up with the
// data1 row that occurs one hour later.
data2 = from(bucket: "testBucket")
    |> range(start: -3h)
    |> filter(fn: (r) => r._measurement == "example")
    // By default timeShift() moves _start and _stop as well as _time, which
    // makes this stream's range bounds differ from data1's.
    |> timeShift(duration: 1h)
    |> aggregateWindow(every: 1m, fn: last, createEmpty: false)
    // Remove the shifted bounds from the group key (see data1 above in this
    // same script) so the join can pair the tables.
    |> drop(columns: ["_start", "_stop"])

// Pair rows that share a timestamp and compute current-minus-previous.
join.inner(
    left: data1,
    right: data2,
    on: (l, r) => l._time == r._time,
    as: (l, r) => ({
        _time: l._time,
        // We pull the values from the left and right records
        val1: l._value,
        val2: r._value,
        difference: l._value - r._value,
    }),
)