Group rows of a measurement based on the _time and _value of another

I have two measurements: link_duration_s and traffic_sensors. In the link_duration measurement, _time indicates the onset of a link, i.e., when a vehicle begins traveling on a specific road segment, and _value is the amount of time spent on that segment (in secs). Some other tags define the link-id, vehicle-id etc. For traffic_sensors measurements, _field indicates the sensor-id, and _value contains the amount of traffic at a particular _time.

Here is my flux query to get a table of all the _time instances of a link named ‘3523’ :

links = from(bucket: "bus-link")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "link_duration_s")
  |> filter(fn: (r) => r["_field"] == "link_duration_s")
  |> filter(fn: (r) => r["link"] == "3523")

For each row lr of the links table, I want to get all the traffic_sensors data in
range(start: r._time, stop: experimental.addDuration(d: r._value, to: r._time), i.e., the interval between the onset and offset of a link. Subsequently, I want to aggregate the traffic_sensors rows for each such group.

I did try a join(on:["_time"]) but ended up with only a few rows since_time for link_duration_s is not periodic (rather sparse), but for traffic_sensors has a periodicity of 1m.

Essentially, I want to capture the traffic state when a link is traversed by a vehicle.

I work on a query to investigate possible solutions. But no luck unfortunately. I am very new in Flux and made several mistakes so far. After correcting some of them, now I get An internal error has occured. Here is the query:

import "experimental" 

links = from(bucket: "ai4di-bus-link")
  |> range(start: 2021-02-23T00:00:00Z, stop: 2021-02-23T23:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "link_duration_s")
  |> filter(fn: (r) => r["_field"] == "link_duration_s")
  |> filter(fn: (r) => r["direction"] == "0")
  |> filter(fn: (r) => r["line"] == "6")
  |> aggregateWindow(every: 1s, fn: mean, createEmpty: false)

sens = from(bucket: "ai4di-bus-link")
  |> range(start: 2021-02-23T00:00:00Z, stop: 2021-02-23T23:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "sensor_value")
  |> filter(fn: (r) => r["_field"] == "tre084_a100")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)

select = (onset, duration_s, tables) => 
tables
  |> range(start: onset, stop: experimental.addDuration(d: duration_s, to: onset))
  |> first()


links
  |> map(fn: (r) => ({
  _time: r._time,
  _field: r._field,
  duration_s: r._value,
  table: select(onset:r._time, duration_s:r._value, tables:sens)
  })
  )

My idea was to take each row from links table, use its _time and _value (i.e. the duration of a link) as input to the select function which will use them to filter sens and return the results.

Hello @estraven,
I believe you need to use a getRecord() function. getRecord() function | InfluxDB Cloud Documentation

Right now your select function is returning a table stream, but you want to return a single value so that you can pass that value into your map and have it be the value of your column “table”.

Please note that you can’t pass tables into a map() function. The map function creates new columns only.

Does that help?