Kevin_T
December 29, 2023, 12:22pm
1
Hello,
my goal is to aggregate my data on a daily basis and compare every entry to the entry from a week ago.
I tried this with a self-join:
import "join"
// basis table
original = from(bucket: "iobroker")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "0_userdata.0.knx_logik.verbraucher._HK_Ver_WaschmaschineLäuftZähler") // filter sensor
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(every: 1d, fn: sum, createEmpty: true) // aggregare daily (sum), insert empty days
|> fill(value:0.0) // substitute null by 0
|> cumulativeSum(columns:["_value"])
|> drop(columns: ["_start", "_stop"])
|> yield(name: "sum")
join.tables(
method: "inner",
left: original,
right: original,
on: (l,r) => l._time == time(v: uint(v: r._time) + uint(v: 7*24*60*60*1000)),
as: (l,r) => ({_time:l._time, _measurement:l._measurement, _field:l._field, cumulated:l._value, cumulated_7d_ago:r._value})
)
|> map(fn: (r) => ({ r with difference: r.cumulated - r.cumulated_7d_ago }))`
Unfortunately I encountered an error:
error in building plan while starting program: invalid physical query plan: found logical operation "join.tables13"
I do not understand my mistake…
scott
January 4, 2024, 11:53pm
2
@Kevin_T Joins can be very… tricky. I’ve found that you get better performance by unioning two streams together and then pivoting fields into columns. In your case, you can create two streams: the current data and last week’s data. You can then use experimental.alignTime() to align last week’s timestamps to the current query range and then pivot on time.
import "date"
import "experimental"
lastWeekStart = date.sub(d: 1w, from: v.timeRangeStart)
lastWeekStop = date.sub(d: 1w, from: v.timeRangeStart)
dataset = (start, stop) => from(bucket: "iobroker")
|> range(start: start, stop: stop)
|> filter(fn: (r) => r["_measurement"] == "0_userdata.0.knx_logik.verbraucher._HK_Ver_WaschmaschineLäuftZähler")\
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(every: 1d, fn: sum, createEmpty: true)
|> fill(value:0.0)
|> cumulativeSum()
original = dataset(start: v.timeRangeStart, stop: v.timeRangeStop)
lastWeek = dataset(start: lastWeekStart, stop: lastWeekStop)
|> experimental.alignTime(alignTo: v.timeRangeStart)
|> set(key: "_field", value: "lastWeek_value")
union(tables: [original, lastWeek])
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({ r with difference: r.value - r.lastWeek_value }))
|> drop(columns: ["_start", "_stop"])
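
In case it helps to see what experimental.alignTime() is doing: it shifts each table's timestamps so they start at alignTo, using the table's _start column to compute the offset. Here's a minimal, self-contained sketch with made-up timestamps (not your data):

import "array"
import "experimental"

// Two made-up rows standing in for a "last week" range. alignTime() reads the
// _start column to compute the shift, so it is set explicitly here.
array.from(
    rows: [
        {_start: 2023-12-15T00:00:00Z, _time: 2023-12-15T00:00:00Z, _value: 1.0},
        {_start: 2023-12-15T00:00:00Z, _time: 2023-12-16T00:00:00Z, _value: 2.0},
    ],
)
    |> experimental.alignTime(alignTo: 2023-12-22T00:00:00Z)
// The output _time values land exactly one week later, at 2023-12-22T00:00:00Z
// and 2023-12-23T00:00:00Z; _value is unchanged.

That's why lastWeek lines up point-for-point with original before the union.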
1 Like

Kevin_T
Thanks for the response.
I think I get the general idea. Unfortunately it does not work, and my understanding is too low to troubleshoot it.
After deleting the additional backslash, it gives me:
error in building plan while starting program: cannot query an empty range
To get at least some response, I added a yield() call and ran the following code:
import "experimental"
lastWeekStart = date.sub(d: 1w, from: v.timeRangeStart)
lastWeekStop = date.sub(d: 1w, from: v.timeRangeStart)
dataset = (start, stop) => from(bucket: "iobroker")
|> range(start: start, stop: stop)
|> filter(fn: (r) => r["_measurement"] == "0_userdata.0.knx_logik.verbraucher._HK_Ver_WaschmaschineLäuftZähler")
|> filter(fn: (r) => r["_field"] == "value")
|> aggregateWindow(every: 1d, fn: sum, createEmpty: true)
|> fill(value:0.0)
|> cumulativeSum()
original = dataset(start: v.timeRangeStart, stop: v.timeRangeStop)
|> yield(name: "sum")
lastWeek = dataset(start: lastWeekStart, stop: lastWeekStop)
|> experimental.alignTime(alignTo: v.timeRangeStart)
|> set(key: "_field", value: "lastWeek_value")
//union(tables: [original, lastWeek])
// |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
//|> map(fn: (r) => ({ r with difference: r.value - r.lastWeek_value }))
// |> drop(columns: ["_start", "_stop"])
The result was: