I have two measurements: link_duration_s and traffic_sensors. In the link_duration_s measurement, _time indicates the onset of a link, i.e., when a vehicle begins traveling on a specific road segment, and _value is the time spent on that segment (in seconds). Other tags define the link ID, vehicle ID, etc. In the traffic_sensors measurement, _field indicates the sensor ID, and _value contains the amount of traffic at a particular _time.
Here is my Flux query to get a table of all the _time instances of a link named “3523”:
links = from(bucket: "bus-link")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "link_duration_s")
|> filter(fn: (r) => r["_field"] == "link_duration_s")
|> filter(fn: (r) => r["link"] == "3523")
For each row r of the links table, I want to get all the traffic_sensors data in range(start: r._time, stop: experimental.addDuration(d: r._value, to: r._time)), i.e., the interval between the onset and offset of a link. Then I want to aggregate the traffic_sensors rows for each such group.
I did try a join(on: ["_time"]), but ended up with only a few rows, since _time for link_duration_s is not periodic (it is rather sparse), whereas traffic_sensors has a periodicity of 1m.
Essentially, I want to capture the traffic state when a link is traversed by a vehicle.
I have been working on a query to investigate possible solutions, but with no luck, unfortunately. I am very new to Flux and have made several mistakes so far. After correcting some of them, I now get "An internal error has occurred". Here is the query:
import "experimental"
links = from(bucket: "ai4di-bus-link")
|> range(start: 2021-02-23T00:00:00Z, stop: 2021-02-23T23:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "link_duration_s")
|> filter(fn: (r) => r["_field"] == "link_duration_s")
|> filter(fn: (r) => r["direction"] == "0")
|> filter(fn: (r) => r["line"] == "6")
|> aggregateWindow(every: 1s, fn: mean, createEmpty: false)
sens = from(bucket: "ai4di-bus-link")
|> range(start: 2021-02-23T00:00:00Z, stop: 2021-02-23T23:00:00Z)
|> filter(fn: (r) => r["_measurement"] == "sensor_value")
|> filter(fn: (r) => r["_field"] == "tre084_a100")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
select = (onset, duration_s, tables) =>
tables
|> range(start: onset, stop: experimental.addDuration(d: duration_s, to: onset))
|> first()
links
|> map(fn: (r) => ({
_time: r._time,
_field: r._field,
duration_s: r._value,
table: select(onset:r._time, duration_s:r._value, tables:sens)
})
)
My idea was to take each row from the links table and use its _time and _value (i.e., the duration of a link) as input to the select function, which would use them to filter sens and return the results.
Hello @estraven,
I believe you need to use the getRecord() function (see “getRecord() function” in the Flux 0.x documentation).
Right now your select function returns a table stream, but you want it to return a single value so that you can pass that value into your map() and have it become the value of your column “table”.
Please note that you can’t pass tables into a map() function. map() only creates new columns.
Does that help?
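To illustrate the difference: an aggregate such as mean() still returns a stream of tables, while findRecord() (which combines tableFind() and getRecord() in a single call) extracts one row as a record whose fields are plain values. A minimal sketch, assuming a Flux version that includes findRecord(); the inline data from array.from() are made up for illustration:

```flux
import "array"

// Made-up sensor readings for illustration only
data = array.from(rows: [
    {_time: 2021-02-23T00:00:00Z, _value: 10.0},
    {_time: 2021-02-23T00:01:00Z, _value: 20.0}
])

// mean() still returns a stream of tables (here one table with one row)...
meanRow = data
    |> mean()
    // ...while findRecord() pulls out the first row of the matching table as a record
    |> findRecord(fn: (key) => true, idx: 0)

// A record field is a scalar, so it can be used as a column value
// (e.g. inside map()) or, as here, placed in a new table
array.from(rows: [{mean_value: meanRow._value}])
```

The same extraction works on a stream produced by from() and range(); the important part is that the value handed to map() is a record field, not a table stream.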
Hello @Anaisdg. Thank you for your advice. It helps a lot in understanding things better.
I have changed my select function and the final query that calls it as follows:
select = (onset, offset, tables) =>
tables
|> range(start: onset, stop: offset)
|> mean(column: "_value")
|> tableFind(fn: (key) => true)
|> getRecord(idx: 0)
links
|> map(fn: (r) => ({
l_start: r._time,
l_stop: time(v: uint(v: r._time) + uint(v: math.round(x: r._value) * 1000000000.0)),
_field: r._field,
duration_s: r._value,
table: select(onset: r._time, offset: time(v: uint(v: r._time) + uint(v: math.round(x: r._value) * 1000000000.0)), tables:sens)
})
)
After trying several things, I figured out that experimental.addDuration was causing errors when adding duration_s (in seconds) to the link start time r._time, since there was no way to express it in seconds. I know the current way of solving this range-definition problem looks super weird, but I couldn’t find another way to do it. On its own, the range now works fine: if I comment out the final row table: select(onset: r._time, offset: time(v: uint(v: r._time) + uint(v: math.round(x: r._value) * 1000000000.0)), tables:sens), I get the rest of the output fine. But if I leave it in, I get an internal error message.
Even if I use static date values in select, I still get the same problem:
select(onset: 2021-02-23T00:00:00Z, offset: 2021-02-23T17:57:16Z, tables:sens)
But if I use the above line alone, as a standalone call, it works fine (without tableFind and getRecord, though, just the mean()). So I think the problem occurs when I call select inside map().
Hello @estraven,
That’s odd about addDuration. That function works for me within a map().
import "experimental"
time = experimental.addDuration(
d: 3600s,
to: 2021-04-19T11:00:00Z
)
from(bucket: "Website Monitoring Bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "http_response")
|> filter(fn: (r) => r["_field"] == "content_length")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> map(fn: (r) => ({ r with _addduration: time }))
|> yield(name: "mean")
That’s how I added seconds with addDuration. How did you “figure out that experimental.addDuration was causing errors in adding duration_s (sec) to link start time r._time since there was no way to indicate it in seconds”?
Do you need to use the toTime() function?
This blog might be helpful as it contains a lot of functions you can use to manipulate timestamps:
Hi @Anaisdg
The problem with experimental.addDuration() is that, although it is straightforward to use with hard-coded values for d, I get the error undefined identifier s if I pass a variable to d:
import "experimental"
d_value = 3600
time = experimental.addDuration(
d: (d_value)s,
to: 2021-02-23T00:00:00Z
)
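A duration literal such as 3600s has to be written out literally, so the s after (d_value) is read as a separate identifier, which would explain the undefined identifier s error. Flux’s built-in duration() conversion can build the value from a variable instead: it accepts a duration string or an integer number of nanoseconds. A minimal sketch, reusing d_value from the example above and wrapping the results in array.from() only so that the query returns a table:

```flux
import "array"
import "experimental"

d_value = 3600

// Build a duration from a variable, either from a duration string ("3600s")...
d_from_string = duration(v: string(v: d_value) + "s")
// ...or from an integer count of nanoseconds (1s = 1000000000 ns)
d_from_nanos = duration(v: d_value * 1000000000)

// Either duration can be passed to experimental.addDuration()
array.from(rows: [{
    from_string: experimental.addDuration(d: d_from_string, to: 2021-02-23T00:00:00Z),
    from_nanos: experimental.addDuration(d: d_from_nanos, to: 2021-02-23T00:00:00Z)
}])
```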
However, thanks to the resources you gave me and some more experimentation, I have managed to get what I needed as follows:
select = (onset, offset) =>
sens
|> range(start: onset, stop: offset)
|> mean(column: "_value")
|> set(key:"agg_type", value:"mean")
|> findColumn(fn: (key) => key._measurement == "sensor_value", column: "_value")
add_time = (onset, duration) => {return time(v: uint(v: onset) + uint(v: (duration * 1000000000.0)))}
links
|> map(fn: (r) => ({ r with
onset: r._time,
offset: add_time(onset: r._time, duration: r._value),
mean_sens_value: select(onset: r._time, offset: add_time(onset: r._time, duration: r._value))[0]
}))
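For comparison, the offset could also be computed with experimental.addDuration() once the float number of seconds is converted to a duration, instead of doing nanosecond arithmetic on uint values as add_time does. A sketch under stated assumptions: the rows below are hypothetical stand-ins for the links table (_time is the link onset, _value the duration in seconds as a float), and addSeconds is a hypothetical helper name:

```flux
import "array"
import "experimental"

// Hypothetical sample rows standing in for the links table
links = array.from(rows: [
    {_time: 2021-02-23T08:00:00Z, _value: 95.0},
    {_time: 2021-02-23T09:30:00Z, _value: 42.5}
])

// Hypothetical helper: add a float number of seconds to a timestamp;
// int() truncates the product to whole nanoseconds before duration() converts it
addSeconds = (t, seconds) =>
    experimental.addDuration(d: duration(v: int(v: seconds * 1000000000.0)), to: t)

links
    |> map(fn: (r) => ({r with
        onset: r._time,
        offset: addSeconds(t: r._time, seconds: r._value)
    }))
```

The resulting offset could then be passed to select() in place of add_time(); both approaches should yield the same timestamps, so the choice is mainly about readability.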
Many thanks for your help!
Hello @estraven,
Sure thing! I’m glad that helped; nice solution. Are you running this query in a task, by chance?
Not really, just a UI query. I have not experimented with tasks yet, and currently I need to figure out many other things. I only run it for a single sensor, and it takes almost a minute. After watching your video (5 hurdles), I suspect my schema is not correct.
@estraven,
This could be useful to you as well: