Use data from one stream in another stream

Hi,

I like to get a device id from one database in one stream and use this for the id filter in another stream. This is the code that I use but it doesn´t seem to work and gives an error: undefined identifier inputID. I’m still new to using influxdb and was wondering if what I’m trying to do is possible and how I could accomplish this.

stream1 = from(bucket: “Bucket1”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: ® => r["_measurement"] == “measurement1”)
|> filter(fn: ® => r["_field"] == “name”)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: “_value”)
|> filter(fn: ® => r.name== v.name)
|> keep(columns: [“id”])
|> rename(columns: {id: “inputID”})
|> yield(name: “inputID”)

stream2 = from(bucket: “Bucket2”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: ® => r["_measurement"] == “measurement2”)
|> filter(fn: ® => r["_field"] == “name”)
|> filter(fn: ® => r[“id”] == r.inputID)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: “_value”)
|> yield(name: “output”)

@Michiel, how do you associate inputIDs in stream1 to data points in stream2? What logic associates the two?

My dashboard uses the variable name for most cells but this cell needs an ID that is linked to that name. I try to get the integer of the ID that is coupled to the name in stream1 and use that integer for the ID in stream2. Is it possible to create or fill a local variable in the code?

Ok, so you can query the ID using the name of the record and return it as a scalar value. The example below creates a custom function for returning the inputID. It then uses that function with the name variable as input to store the inputID as a variable. That variable is used to filter the 2nd dataset:

getInputID = (name) => {
  _idRecord = from(bucket: “Bucket1”)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == “measurement1”)
    |> filter(fn: (r) => r._field == “name”)
    |> filter(fn: (r) => r._value == name)
    |> findRecord(idx: 0, fn: (key) => true)
  return _idRecord.id
}

inputID = getInputID(name: v.name)

from(bucket: “Bucket2”)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == “measurement2”)
  |> filter(fn: (r) => r._field == “name”)
  |> filter(fn: (r) => r.id == inputID)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: “_value”)
  |> yield(name: “output”)

Thank you! This works like a charm. It is exactly what I needed.