Group rows of a measurement based on the _time and _value of another

I have two measurements: link_duration_s and traffic_sensors. In the link_duration_s measurement, _time marks the onset of a link, i.e., when a vehicle begins traveling on a specific road segment, and _value is the time spent on that segment (in seconds). Some other tags define the link ID, vehicle ID, etc. For the traffic_sensors measurement, _field indicates the sensor ID, and _value contains the amount of traffic at a particular _time.

Here is my Flux query to get a table of all the _time instances of a link named "3523":

links = from(bucket: "bus-link")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "link_duration_s")
  |> filter(fn: (r) => r["_field"] == "link_duration_s")
  |> filter(fn: (r) => r["link"] == "3523")

For each row r of the links table, I want to get all the traffic_sensors data in
range(start: r._time, stop: experimental.addDuration(d: r._value, to: r._time)), i.e., the interval between the onset and offset of a link. Subsequently, I want to aggregate the traffic_sensors rows for each such group.

I did try a join(on: ["_time"]), but ended up with only a few rows, since _time for link_duration_s is not periodic (rather sparse), while traffic_sensors has a periodicity of 1m.
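For reference, the join I tried looked roughly like this (a sketch only; `traffic` stands for my traffic_sensors query, not shown here):

```flux
// Sketch: an inner join keeps only rows whose _time matches exactly in
// both streams, so the sparse link onsets almost never line up with the
// 1m sensor timestamps, and most rows are dropped.
joined = join(tables: {l: links, t: traffic}, on: ["_time"])
```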

Essentially, I want to capture the traffic state when a link is traversed by a vehicle.

I worked on a query to investigate possible solutions, but no luck unfortunately. I am very new to Flux and have made several mistakes so far. After correcting some of them, I now get An internal error has occurred. Here is the query:

import "experimental" 

links = from(bucket: "ai4di-bus-link")
  |> range(start: 2021-02-23T00:00:00Z, stop: 2021-02-23T23:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "link_duration_s")
  |> filter(fn: (r) => r["_field"] == "link_duration_s")
  |> filter(fn: (r) => r["direction"] == "0")
  |> filter(fn: (r) => r["line"] == "6")
  |> aggregateWindow(every: 1s, fn: mean, createEmpty: false)

sens = from(bucket: "ai4di-bus-link")
  |> range(start: 2021-02-23T00:00:00Z, stop: 2021-02-23T23:00:00Z)
  |> filter(fn: (r) => r["_measurement"] == "sensor_value")
  |> filter(fn: (r) => r["_field"] == "tre084_a100")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)

select = (onset, duration_s, tables) =>
  tables
    |> range(start: onset, stop: experimental.addDuration(d: duration_s, to: onset))
    |> first()


links
  |> map(fn: (r) => ({
    _time: r._time,
    _field: r._field,
    duration_s: r._value,
    table: select(onset: r._time, duration_s: r._value, tables: sens),
  }))

My idea was to take each row from the links table and use its _time and _value (i.e., the duration of a link) as inputs to the select function, which would use them to filter sens and return the results.

Hello @estraven,
I believe you need to use the getRecord() function: getRecord() function | Flux 0.x Documentation

Right now your select function returns a stream of tables, but you want it to return a single value, so that you can pass that value into your map() and have it be the value of your column "table".
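As a minimal sketch of that pattern (using your `sens` stream; tableFind() picks one table out of the stream, and getRecord() extracts one row from it as a record):

```flux
// Reduce a stream of tables down to a single record.
meanRecord = sens
  |> mean()
  |> tableFind(fn: (key) => true)
  |> getRecord(idx: 0)

// meanRecord._value is now a plain scalar that can be used inside map().
```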

Please note that you can’t pass tables into a map() function. The map function creates new columns only.

Does that help?

Hello @Anaisdg. Thank you for your advice; it helps a lot in understanding things better.

I have changed my select function and the final query that calls it as follows:

select = (onset, offset, tables) =>
  tables
    |> range(start: onset, stop: offset)
    |> mean(column: "_value")
    |> tableFind(fn: (key) => true)
    |> getRecord(idx: 0)


import "math"

links
  |> map(fn: (r) => ({
    l_start: r._time,
    l_stop: time(v: uint(v: r._time) + uint(v: math.round(x: r._value) * 1000000000.0)),
    _field: r._field,
    duration_s: r._value,
    table: select(onset: r._time, offset: time(v: uint(v: r._time) + uint(v: math.round(x: r._value) * 1000000000.0)), tables: sens),
  }))

After several attempts, I figured out that experimental.addDuration was causing errors when adding duration_s (seconds) to the link start time r._time, since there was no way to indicate the duration in seconds from a variable. I know the current way I solve this range-definition problem looks super weird, but I couldn't find another way to do it. On its own, the range now works fine: if I comment out the final row table: select(onset: r._time, offset: time(v: uint(v: r._time) + uint(v: math.round(x: r._value) * 1000000000.0)), tables: sens), I get the rest of the output fine. But if I leave it in, I get an internal error message.

Even if I use static date values in select, I still get the same problem:
select(onset: 2021-02-23T00:00:00Z, offset: 2021-02-23T17:57:16Z, tables: sens)
But if I use the above line alone, as a standalone call, it works fine (without tableFind and getRecord though, just the mean()). So it seems the problem occurs when I call select inside map().

Hello @estraven,
That’s odd about addDuration. That function works for me within a map():

import "experimental"

// Note: avoid naming this variable `time`, since that would shadow the built-in time() function.
shifted_time = experimental.addDuration(
  d: 3600s,
  to: 2021-04-19T11:00:00Z
)
from(bucket: "Website Monitoring Bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "http_response")
  |> filter(fn: (r) => r["_field"] == "content_length")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> map(fn: (r) => ({ r with _addduration: shifted_time }))
  |> yield(name: "mean")

That’s how I indicated seconds with addDuration. How did you

"figure out that experimental.addDuration was causing errors in adding duration_s (sec) to link start time r._time since there was no way to indicate it in seconds"
?

Do you need to use toTime()?

This blog might be helpful, as it contains a lot of functions you can use to manipulate timestamps:

Hi @Anaisdg
The problem with experimental.addDuration() is that, although it is straightforward to use with hard-coded values for d, I get the error undefined identifier s if I try to build d from a variable:

import "experimental"

d_value = 3600
time = experimental.addDuration(
  d: (d_value)s,
  to: 2021-02-23T00:00:00Z
)
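(In hindsight, I suspect the duration() conversion function could have handled this, since it converts an integer count of nanoseconds into a duration; untested sketch:)

```flux
import "experimental"

d_value = 3600

// duration() interprets an integer as nanoseconds, so scale seconds first.
shifted = experimental.addDuration(
  d: duration(v: d_value * 1000000000),
  to: 2021-02-23T00:00:00Z,
)
```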

However thanks to the resources you gave me and some more experimentation, I have managed to get what I needed as follows:

select = (onset, offset) =>
  sens
    |> range(start: onset, stop: offset)
    |> mean(column: "_value")
    |> set(key: "agg_type", value: "mean")
    |> findColumn(fn: (key) => key._measurement == "sensor_value", column: "_value")


add_time = (onset, duration) => time(v: uint(v: onset) + uint(v: duration * 1000000000.0))
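As a quick sanity check of add_time (hypothetical values; uint() on a time value yields nanoseconds since the epoch, so the duration in seconds is scaled to nanoseconds before adding):

```flux
// Hypothetical check: shift a timestamp by a 90-second link duration.
shifted = add_time(onset: 2021-02-23T10:00:00Z, duration: 90.0)
// shifted should be 2021-02-23T10:01:30Z
```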

links
  |> map(fn: (r) => ({ r with
    onset: r._time,
    offset: add_time(onset: r._time, duration: r._value),
    mean_sens_value: select(onset: r._time, offset: add_time(onset: r._time, duration: r._value))[0],
  }))

Many thanks for your help!


Hello @estraven,
Sure thing! I’m glad that helped; nice solution. Are you running this query in a task, by chance?

Not really, just a UI query. I have not experimented with tasks yet, and currently I need to figure out so many other things. I only run it for a single sensor, and it takes almost a minute. After watching your video (5 hurdles), I suspect my schema is not correct.

@estraven,
This could be useful to you as well:
