Hello,
i am trying to join 2 tables but i am getting this error message:
error in building plan while starting program: invalid physical query plan: attribute "collation", required by join.tables18", is missing from predecessor "ReadWindowAggregateByTime25"
My query is relative simple, i have an average and a max and just want to join them by time:
import "join"
avg = from(bucket: "x")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "abc")
|> filter(fn: (r) => r["_field"] == "power")
|> filter(fn: (r) => r["device_name"] == "xyz")
|> aggregateWindow(every: 1h, fn:mean, createEmpty: false)
max = from(bucket: "x")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "abc")
|> filter(fn: (r) => r["_field"] == "power")
|> filter(fn: (r) => r["device_name"] == "xyz")
|> aggregateWindow(every: 1h, fn:max, createEmpty: false)
join.full(
left: max,
right: avg,
on: (l, r) => l._time == r._time,
as: (l, r) => {
time = if exists l._time then l._time else r._time
start = if exists l._start then l._start else r._start
stop = if exists l._stop then l._stop else r._stop
field = if exists l._field then l._field else r._field
measurement = if exists l._measurement then l._measurement else r._measurement
ain = if exists l.ain then l.ain else r.ain
deviceName = if exists l.device_name then l.device_name else r.device_name
return {_time: time, _start: start, _stop: stop, _field: field, _measurement: measurement, ain: ain, device_name: deviceName, max: l._value, avg: r._value}
},
)
If i use fn:last
instead of fn:mean
, the query works fine but not with mean
?
Is this a bug or am i doing something wrong?
I am looking at your query and at first glance I did not notice anything wrong, however. why are you using a full join? it seems that the data comes from the same source right?
so it is safe to assume that if there is a value max and avg exists, so just use the simpler version of join:
//import "join"
avg = from(bucket: "x")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "abc")
|> filter(fn: (r) => r["_field"] == "power")
|> filter(fn: (r) => r["device_name"] == "xyz")
|> aggregateWindow(every: 1h, fn:mean, createEmpty: false)
max = from(bucket: "x")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "abc")
|> filter(fn: (r) => r["_field"] == "power")
|> filter(fn: (r) => r["device_name"] == "xyz")
|> aggregateWindow(every: 1h, fn:max, createEmpty: false)
joinedData =
join(tables: {key1: avg, key2: max}, on: ["_time", "_field"], method: "inner")
|> yield(name: "custom-name")
or join by time:
import "join"
left = from(bucket: "example-bucket-1") |> //...
right = from(bucket: "example-bucket-2") |> //...
join.time(
left: left,
right: right,
as: (l, r) => ({l with field1: l._value, field2: r._value_}),
)
!!! I think I know what is going on!!!
mean and last aggregate functions create a “group column” in the metadata, join() package requires both streams have the same grouping:
adding |> group()
at the end of each table to remove all grouping (if you dont have more values with the same tag), or create a new group by tag for example:
|> group(columns: [ "device_name"], mode:"by")
I bet you this will work if you paste it as it is.
import "join"
avg = from(bucket: "x")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "abc")
|> filter(fn: (r) => r["_field"] == "power")
|> filter(fn: (r) => r["device_name"] == "xyz")
|> aggregateWindow(every: 1h, fn:mean, createEmpty: false)
|> group(columns: ["device_name"], mode:"by")
max = from(bucket: "x")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "abc")
|> filter(fn: (r) => r["_field"] == "power")
|> filter(fn: (r) => r["device_name"] == "xyz")
|> aggregateWindow(every: 1h, fn:max, createEmpty: false)
|> group(columns: ["device_name"], mode:"by")
joinedData=
join.time(
left: avg ,
right: max ,
as: (l, r) => ({l with field1: l._value, field2: r._value}),
)
|> yield(name: "joinedData")
1 Like
It worked immediately! Thank you very much!
I used the full join because I copied and pasted the query from another one and simply forgot to change it / consider a better approach. Thank you for pointing that out! 