Invalid physical query plan: attribute "collation", required by "join.tables18", is missing

Hello,

I am trying to join two tables, but I am getting this error message:

error in building plan while starting program: invalid physical query plan: attribute "collation", required by "join.tables18", is missing from predecessor "ReadWindowAggregateByTime25"

My query is relatively simple. I have an average and a max and just want to join them by time:

import "join"

avg = from(bucket: "x")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "abc")
  |> filter(fn: (r) => r["_field"] == "power")
  |> filter(fn: (r) => r["device_name"] == "xyz")
  |> aggregateWindow(every: 1h, fn:mean, createEmpty: false)

max = from(bucket: "x")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "abc")
  |> filter(fn: (r) => r["_field"] == "power")
  |> filter(fn: (r) => r["device_name"] == "xyz")
  |> aggregateWindow(every: 1h, fn:max, createEmpty: false)

join.full(
    left: max,
    right: avg,
    on: (l, r) => l._time == r._time,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        start = if exists l._start then l._start else r._start
        stop = if exists l._stop then l._stop else r._stop
        field = if exists l._field then l._field else r._field
        measurement = if exists l._measurement then l._measurement else r._measurement
        ain = if exists l.ain then l.ain else r.ain
        deviceName = if exists l.device_name then l.device_name else r.device_name

        return {_time: time, _start: start, _stop: stop, _field: field, _measurement: measurement, ain: ain, device_name: deviceName, max: l._value, avg: r._value}
    },
)

If I use fn: last instead of fn: mean, the query works fine, but it fails with mean.

Is this a bug or am I doing something wrong?

I am looking at your query and at first glance I did not notice anything wrong. However, why are you using a full join? It seems that the data comes from the same source, right?

So it is safe to assume that wherever there is a max value, an avg value exists too. In that case, just use the simpler version of join:

//import "join"

avg = from(bucket: "x")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "abc")
  |> filter(fn: (r) => r["_field"] == "power")
  |> filter(fn: (r) => r["device_name"] == "xyz")
  |> aggregateWindow(every: 1h, fn:mean, createEmpty: false)

max = from(bucket: "x")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "abc")
  |> filter(fn: (r) => r["_field"] == "power")
  |> filter(fn: (r) => r["device_name"] == "xyz")
  |> aggregateWindow(every: 1h, fn:max, createEmpty: false)

joinedData =
  join(tables: {key1: avg, key2: max}, on: ["_time", "_field"], method: "inner")
|> yield(name: "custom-name")

Or join by time:

import "join"

left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...

join.time(
    left: left,
    right: right,
    as: (l, r) => ({l with field1: l._value, field2: r._value}),
)

!!! I think I know what is going on!!!

The mean and last aggregate functions create a “group column” in the metadata, and the join package requires both streams to have the same grouping.

Add |> group() at the end of each table to remove all grouping (if you don't have more values with the same tag), or create a new group by tag, for example:

  |> group(columns: [ "device_name"], mode:"by")
 

I bet you this will work if you paste it as it is.

import "join"

avg = from(bucket: "x")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "abc")
  |> filter(fn: (r) => r["_field"] == "power")
  |> filter(fn: (r) => r["device_name"] == "xyz")
  |> aggregateWindow(every: 1h, fn:mean, createEmpty: false)
  |> group(columns: ["device_name"], mode:"by")

max = from(bucket: "x")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "abc")
  |> filter(fn: (r) => r["_field"] == "power")
  |> filter(fn: (r) => r["device_name"] == "xyz")
  |> aggregateWindow(every: 1h, fn:max, createEmpty: false)
  |> group(columns: ["device_name"], mode:"by")

joinedData=
join.time(
    left: avg ,
    right: max ,
    as: (l, r) => ({l with field1: l._value, field2: r._value}),
)

 |> yield(name: "joinedData")
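
The fix above regroups by device_name. For the other option mentioned earlier, a bare group() that drops all grouping, here is a minimal sketch. It assumes hypothetical sample rows built with array.from (they are not from the original data) and hypothetical names avgData and maxData. On real data the group() call would go right after aggregateWindow().

```flux
import "array"
import "join"

// Hypothetical rows standing in for the hourly mean result.
// array.from() already returns ungrouped data, so group() changes nothing
// here; it is shown where it would sit after aggregateWindow().
avgData =
    array.from(
        rows: [
            {_time: 2023-01-01T00:00:00Z, device_name: "xyz", _value: 10.0},
            {_time: 2023-01-01T01:00:00Z, device_name: "xyz", _value: 12.0},
        ],
    )
        |> group()

// Hypothetical rows standing in for the hourly max result.
maxData =
    array.from(
        rows: [
            {_time: 2023-01-01T00:00:00Z, device_name: "xyz", _value: 15.0},
            {_time: 2023-01-01T01:00:00Z, device_name: "xyz", _value: 18.0},
        ],
    )
        |> group()

// Both streams now share the same (empty) group key, so rows are
// matched on _time only.
join.time(
    left: avgData,
    right: maxData,
    as: (l, r) => ({_time: l._time, device_name: l.device_name, avg: l._value, max: r._value}),
)
    |> yield(name: "joinedUngrouped")
```

Ungrouping like this is probably only safe when each stream holds a single series, as with the single device_name filter in the question. With several series, rows from different series that share a timestamp would be matched with each other.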

It worked immediately! Thank you very much!

I used the full join because I copied and pasted the query from another one and simply forgot to change it / consider a better approach. Thank you for pointing that out! :blush: