Use data from one query to aggregate data from a second query

Hi,

I have two time series that look like this:

TimeSeriesA
_time | _timestamp
t1 |  ts1
t2 |  ts1
t3 |  ts1
t4 |  ts2
t5 |  ts2
t6 |  ts2
...
TimeSeriesB
_time | _temperature
t1  |  temp1
t2  |  temp2
t3  |  temp3
t4  |  temp4
t5  |  temp5
t6  |  temp6
...

What I'm trying to achieve is the following:

TimeSeriesTarget
_timestamp | _temperature(avg)
ts1 | avg(temp1, temp2, temp3)
ts2 | avg(temp4, temp5, temp6)

Notes:
_time: is identical in TimeSeriesA and TimeSeriesB
_timestamp: each value repeats m times in a row (3 times in the example above)

The query should:
a) select the set of distinct _timestamp values from TimeSeriesA
b) for each _timestamp in that set, select all _time values from TimeSeriesA that carry this _timestamp
c) use the _time values from b) to select the matching _temperature values from TimeSeriesB and calculate their average
d) combine the _timestamp values (from a) and the averaged _temperature values (from c) into a new time series
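
Steps a) to d) can probably be expressed as a join followed by a grouped mean, without an explicit loop. Below is a minimal sketch in Flux. It uses made-up in-memory rows built with array.from() instead of real buckets; the names seriesA and seriesB and all sample values are assumptions for illustration only, and it assumes the _time values match exactly in both series.

```flux
import "array"

// Hypothetical stand-in for TimeSeriesA: _value holds the _timestamp.
seriesA = array.from(rows: [
    {_time: 2021-04-07T00:00:00Z, _value: 100},
    {_time: 2021-04-07T00:01:00Z, _value: 100},
    {_time: 2021-04-07T00:02:00Z, _value: 100},
    {_time: 2021-04-07T00:03:00Z, _value: 200},
    {_time: 2021-04-07T00:04:00Z, _value: 200},
    {_time: 2021-04-07T00:05:00Z, _value: 200},
])

// Hypothetical stand-in for TimeSeriesB: _value holds the _temperature.
seriesB = array.from(rows: [
    {_time: 2021-04-07T00:00:00Z, _value: 10.0},
    {_time: 2021-04-07T00:01:00Z, _value: 11.0},
    {_time: 2021-04-07T00:02:00Z, _value: 12.0},
    {_time: 2021-04-07T00:03:00Z, _value: 20.0},
    {_time: 2021-04-07T00:04:00Z, _value: 22.0},
    {_time: 2021-04-07T00:05:00Z, _value: 24.0},
])

// Pair each temperature with its timestamp via the shared _time column.
// join() suffixes the conflicting columns: _value_ts and _value_temp.
join(tables: {ts: seriesA, temp: seriesB}, on: ["_time"])
    // One table per distinct timestamp value (steps a and b).
    |> group(columns: ["_value_ts"])
    // Average the temperatures inside each table (steps c and d).
    |> mean(column: "_value_temp")
```

The result should contain one row per distinct timestamp value, with the averaged temperature in _value_temp.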

I would highly appreciate any help.

Bernhard


Update 2021-04-07T22:00:00Z
Having installed Chronograf in the meantime, I'm trying to figure out how to join the two tables on _time. The current output shows No Result, even though I have checked that the _time columns in both tables are equal.

fcBucket = "openhab/autogen"
fcTimestamp = "Localweatherandforecast_ForecastHours03_Timestamp"
fcTemperature = "Localweatherandforecast_ForecastHours03_Temperature"

temp1 = from(bucket: fcBucket)
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == fcTimestamp and (r._field == "value"))
    |> window(every: autoInterval)
    //|> yield(name: "temp1")

temp2 = from(bucket: fcBucket)
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == fcTemperature and (r._field == "value"))
    |> window(every: autoInterval)
    //|> yield(name: "temp2")

tempjoin = join(tables: {key1: temp1, key2: temp2}, on: ["_time"])
    |> yield(name: "tempjoin")


Update 2021-04-08T15:30:00Z
Playing with the parameters helped to solve the first issue (No Result).

fcBucket = "openhab/autogen"
fcTimestamp = "Localweatherandforecast_ForecastHours03_Timestamp"
fcTemperature = "Localweatherandforecast_ForecastHours03_Temperature"

fcTemp = from(bucket: fcBucket)
    |> range(start: -12h, stop: now())
    |> filter(fn: (r) => r._measurement == fcTemperature)
    |> drop(columns: ["item", "_measurement"])

fcTime = from(bucket: fcBucket)
    |> range(start: -12h, stop: now())
    |> filter(fn: (r) => r._measurement == fcTimestamp)
    |> drop(columns: ["item", "_measurement"])

join(tables: {fcTemp: fcTemp, fcTime: fcTime}, on: ["_start", "_stop"], method: "inner")

Root cause: the _time values of the two series differed at the millisecond level (by at least 1 ms), so the join on _time found no matching rows.
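
One way to make such timestamps comparable is to round them down to a common unit before joining. The sketch below uses truncateTimeColumn() on two made-up rows whose _time values differ by 1 ms; the sample values are assumptions, and the 1m unit only makes sense if both series are written at most once per minute.

```flux
import "array"

// Two hypothetical points written 1 ms apart.
array.from(rows: [
    {_time: 2021-04-08T15:30:00.123Z, _value: 1.0},
    {_time: 2021-04-08T15:30:00.124Z, _value: 2.0},
])
    // Truncate _time to the start of its minute, so both rows
    // end up with the same _time value.
    |> truncateTimeColumn(unit: 1m)
```

Applied to both input streams, this should give a join on _time identical keys to match on.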


Update 2021-04-08T17:55:00Z
Here is the latest version of the script.

fcBucket = "openhab/autogen"
fcTimestamp = "Localweatherandforecast_ForecastHours03_Timestamp"
fcData = "Localweatherandforecast_ForecastHours03_Temperature"

fcValue = from(bucket: fcBucket)
    |> range(start: dashboardTime, stop: now())
    |> filter(fn: (r) => r._measurement == fcData)
    |> drop(columns: ["item", "_measurement"])
    |> truncateTimeColumn(unit: 1m)

fcTime = from(bucket: fcBucket)
    |> range(start: dashboardTime, stop: now())
    |> filter(fn: (r) => r._measurement == fcTimestamp)
    |> drop(columns: ["item", "_measurement"])
    |> truncateTimeColumn(unit: 1m)

join(tables: {fcValue: fcValue, fcTime: fcTime}, on: ["_time"], method: "inner")
    |> drop(columns: ["_start_fcValue", "_start_fcTime", "_stop_fcValue", "_stop_fcTime"])
    |> group(columns: ["_value_fcTime"], mode: "by")
    |> duplicate(column: "_value_fcTime", as: "_time_fcTime")
    |> map(fn: (r) => ({r with _time_fcTime: time(v: r._time_fcTime * 1000000)}))
    //|> duplicate(column: "_time_fcTime", as: "_stop")
    //|> mean(column: "_value_fcValue")
    //|> duplicate(column: "_stop", as: "_time")

This is what I get.

Calculating the mean of the column _value_fcValue now fails with the message: panic: runtime error: invalid memory address or nil pointer dereference
Any suggestions?

This is now my solution, after reading the InfluxData tutorial "TL;DR InfluxDB Tech Tips – How to Extract Values, Visualize Scalars, and Perform Custom Aggregations with Flux and InfluxDB":

fcBucket = "openhab/autogen"
fcTimestamp = "Localweatherandforecast_ForecastHours03_Timestamp"
fcData = "Localweatherandforecast_ForecastHours03_Temperature"

fcValue = from(bucket: fcBucket)
    |> range(start: dashboardTime, stop: now())
    |> filter(fn: (r) => r._measurement == fcData)
    |> drop(columns: ["item", "_measurement"])
    |> truncateTimeColumn(unit: 1m)

fcTime = from(bucket: fcBucket)
    |> range(start: dashboardTime, stop: now())
    |> filter(fn: (r) => r._measurement == fcTimestamp)
    |> drop(columns: ["item", "_measurement"])
    |> truncateTimeColumn(unit: 1m)

fcTable = join(tables: {fcValue: fcValue, fcTime: fcTime}, on: ["_time"], method: "inner")
    |> drop(columns: ["_start_fcValue", "_start_fcTime", "_stop_fcValue", "_stop_fcTime"])
    |> group(columns: ["_value_fcTime"], mode: "by")
    |> duplicate(column: "_value_fcTime", as: "_time_fcTime")
    |> map(fn: (r) => ({r with _time_fcTime: time(v: r._time_fcTime * 1000000)}))

fcSum = fcTable
    |> reduce(fn: (r, accumulator) => ({_sum: r._value_fcValue + accumulator._sum}), identity: {_sum: 0.0})

fcCnt = fcTable
    |> reduce(fn: (r, accumulator) => ({_cnt: 1.0 + accumulator._cnt}), identity: {_cnt: 0.0})

join(tables: {fcSum: fcSum, fcCnt: fcCnt}, on: ["_value_fcTime"])
    |> map(fn: (r) => ({r with _avg: r._sum / r._cnt}))
    |> map(fn: (r) => ({r with _time_fcTime: time(v: r._value_fcTime * 1000000)}))
    |> rename(columns: {_time_fcTime: "_time"})
    |> drop(columns: ["_sum", "_cnt"])

This is the result.
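
As a possible simplification of the script above, the two reduce() passes and the second join could be folded into a single reduce() that tracks the sum and the count in one accumulator. This is an untested sketch: the rows built with array.from() are made-up stand-ins for the joined fcTable, and it assumes _value_fcTime holds epoch milliseconds as integers and _value_fcValue holds floats.

```flux
import "array"

// Hypothetical stand-in for the joined table, grouped by forecast timestamp.
fcTable = array.from(rows: [
    {_value_fcTime: 1617912000000, _value_fcValue: 10.0},
    {_value_fcTime: 1617912000000, _value_fcValue: 11.0},
    {_value_fcTime: 1617912000000, _value_fcValue: 12.0},
    {_value_fcTime: 1617922800000, _value_fcValue: 20.0},
    {_value_fcTime: 1617922800000, _value_fcValue: 22.0},
    {_value_fcTime: 1617922800000, _value_fcValue: 24.0},
])
    |> group(columns: ["_value_fcTime"])

fcTable
    // Accumulate sum and count together, once per group.
    |> reduce(
        fn: (r, accumulator) => ({
            _sum: r._value_fcValue + accumulator._sum,
            _cnt: accumulator._cnt + 1.0,
        }),
        identity: {_sum: 0.0, _cnt: 0.0},
    )
    // Derive the average and convert milliseconds to a time value (nanoseconds).
    |> map(fn: (r) => ({r with _avg: r._sum / r._cnt, _time: time(v: r._value_fcTime * 1000000)}))
    |> drop(columns: ["_sum", "_cnt"])
```

Because reduce() keeps the group key columns in its output, _value_fcTime is still available in the final map() without a second join.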