Query Data between Two Tags

Hello everyone,

I’m using InfluxDB 2.0.7 and I would like to query the data between two tags, but I don’t know how to write the Flux query. Attached is an image of the case. I have a point tagged "event" = "start" and a point tagged "event" = "end", and I would like to query the data between them. Assume the bucket is "myBucket", the measurement is "myMeasurement", and the field to query is "myField". Could anyone help me?

Thanks a lot!

Hello @mfrice,
Welcome! You bet:

start = from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")
  |> filter(fn: (r) => r["event"] == "start")
  |> findRecord(fn: (key) => true, idx: 0)

stop = from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")
  |> filter(fn: (r) => r["event"] == "end")
  |> findRecord(fn: (key) => true, idx: 0)

from(bucket: "myBucket")
  |> range(start: start._time, stop: stop._time)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")
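Two details of the query above are worth noting: `v.timeRangeStart` and `v.timeRangeStop` only exist in the InfluxDB UI, and `range()` excludes its `stop` time, so the "end" point itself is not returned. A minimal variant for running the query from the CLI or API, assuming a one-day lookback (`-1d` is an arbitrary choice) and the bucket, measurement, field, and tag names from the question:

```flux
import "experimental"

// Explicit lookback instead of the UI-only v.timeRangeStart/v.timeRangeStop (-1d is an assumption)
events = from(bucket: "myBucket")
  |> range(start: -1d)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")

// First point tagged event="start"
startRecord = events
  |> filter(fn: (r) => r["event"] == "start")
  |> findRecord(fn: (key) => true, idx: 0)

// First point tagged event="end"
endRecord = events
  |> filter(fn: (r) => r["event"] == "end")
  |> findRecord(fn: (key) => true, idx: 0)

// range() excludes its stop time; adding 1ns keeps the "end" point in the result
from(bucket: "myBucket")
  |> range(start: startRecord._time, stop: experimental.addDuration(d: 1ns, to: endRecord._time))
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")
```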

Hello @Anaisdg ,

Thank you very much for the prompt reply. This is really helpful! I have a related question: if I have multiple start tags, how can I query the 10 seconds of data after each one? See the picture below. Within a period of time, several events (not a fixed number) occur, each marked with a start tag. Each start tag marks the beginning of a 10-second event, and the interval between two tags is always longer than 10 seconds. I would like to query all of them (the green parts).


Thanks a lot!

Hello @mfrice,
I don’t think this is possible without outer joins. I’m assuming you want to query all green sections simultaneously. This is what I would do if outer joins were supported:

import "experimental"

start_stop = from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "average_temperature")
  |> filter(fn: (r) => r["_field"] == "degrees")
  |> filter(fn: (r) => r["location"] == "coyote_creek")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> limit(n: 5)
  |> map(fn: (r) => ({ r with my_start: r._time }))
  |> map(fn: (r) => ({ r with my_stop: experimental.addDuration(d: 10s, to: r._time) }))
  |> keep(columns: ["my_stop","my_start", "_time"])
//   |> yield(name: "start_stop") 

all = from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "average_temperature")
  |> filter(fn: (r) => r["_field"] == "degrees")
  |> filter(fn: (r) => r["location"] == "coyote_creek")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
//   |> yield(name: "all") 

// if we had outer joins:
// join(tables: {all: all, start_stop: start_stop}, on: ["_time"], method: "outer")
//   |> fill(column: "my_start", usePrevious: true)
//   |> fill(column: "my_stop", usePrevious: true)
//   |> filter(fn: (r) => r._time >= r.my_start and r._time <= r.my_stop)

Let me forward your question to the Flux team to see if anyone can think of anything.
Additionally, @scott do you have any suggestions here?

Hello @Anaisdg, are there any updates from the Flux team? Alternatively, could we solve the problem by creating a noncontinuous range (a range made of several parts) covering the green sections, and then filter the data by that range? Filtering by range should perform better than comparing each timestamp with “my_start” and “my_stop” (assuming outer joins were available).

Hello @mfrice,
I submitted an issue with your question. The Flux team will get back to you asap.

Yes, absolutely, you can create a noncontinuous range, but you’ll have to:

  1. query the entire range and store it in a variable,
  2. reference that variable and limit the query output to the first green section's time range with the range() function,
  3. repeat step 2 for every additional time range.

It would be a manual process.
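The manual process above could look roughly like this in Flux. It is a sketch: `t1` and `t2` are hypothetical start times copied by hand from the `start` events, and every additional section needs one more `range()` line inside `union()`. Each section comes back as its own table, because `range()` sets different `_start`/`_stop` values for each one.

```flux
import "experimental"

// Step 1: query the whole period once and keep it in a variable
all = from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")

// Start times of the green sections, copied by hand (hypothetical values)
t1 = 2022-01-01T00:00:00Z
t2 = 2022-01-01T00:00:15Z

// Steps 2-3: narrow the stored result to each 10 s section, then merge the sections
union(tables: [
  all |> range(start: t1, stop: experimental.addDuration(d: 10s, to: t1)),
  all |> range(start: t2, stop: experimental.addDuration(d: 10s, to: t2)),
])
```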

Hi @Anaisdg, thanks for your explanation! I’ll wait for a response from the Flux team to see if they have a better idea.


Hi @Anaisdg ,

I have another question very similar to the original one. If I have only one start tag and I want to create a 10-second range starting at the tag, how do I do it? I tried range(start: start._time, stop: start._time + 10s), which doesn’t work: it returns an "expected time but found duration" error. Do you know how to write a correct query? Also, is it possible to create a range like range(start: start._time - 5s, stop: start._time + 5s), with the tag in the middle? Thanks!

Hello @mfrice,
I’m sorry no one has responded. I’ll go poke them again.
You’ll want to use the addDuration() function
https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/experimental/addduration/
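Because Flux has no `time + duration` operator, both bounds can be computed with `experimental.addDuration()` and its counterpart `experimental.subDuration()`. A sketch for the centered ±5 s window, reusing the names from the question and assuming the first `start` event in the range is the one of interest:

```flux
import "experimental"

// First point tagged event="start"
startRecord = from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")
  |> filter(fn: (r) => r["event"] == "start")
  |> findRecord(fn: (key) => true, idx: 0)

// 5 s before and 5 s after the tag
windowStart = experimental.subDuration(d: 5s, from: startRecord._time)
windowStop = experimental.addDuration(d: 5s, to: startRecord._time)

// For a window that begins at the tag instead, use
// range(start: startRecord._time, stop: experimental.addDuration(d: 10s, to: startRecord._time))
from(bucket: "myBucket")
  |> range(start: windowStart, stop: windowStop)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")
```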


I don’t believe there’s a way to do this at the moment but I’ll try to think through some ideas. It seems that the runtime will need to get a bit smarter and we’ll have to add some functions to operate on arrays directly in order to get this functionality.

The general process I would use for this is the following:

  1. Write a query that selects all the points where event == "start".
  2. Retrieve the time column from this.
  3. Retrieve the data for a contiguous time period.
  4. Run a filter function that looks roughly like: |> filter(fn: (r) => any(items: start_times, fn: (v) => v <= r._time and r._time < v + 10s))

There’s no such thing as v + 10s in Flux; that would be replaced with experimental.addDuration(d: 10s, to: v).

The problem with this is that we would need that any() function which would take an array and a predicate and return a boolean. While we can probably just implement that and this would be fine, I don’t want to add a one-off function like that without having a more thorough plan for how it works.
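Steps 1 to 3 can already be written in Flux; only step 4 needs the hypothetical `any()`. A sketch, assuming at least one `start` event in the queried range and reusing the names from the question. The commented-out last filter is not runnable, because `any()` does not exist.

```flux
import "experimental"

// Steps 1-2: times of all points tagged event="start", as an array
startTimes = from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")
  |> filter(fn: (r) => r["event"] == "start")
  |> group()                  // single table, so findColumn() sees every start event
  |> sort(columns: ["_time"])
  |> findColumn(fn: (key) => true, column: "_time")

// Step 3: one contiguous range from the first start to 10 s after the last start
lastStart = startTimes[length(arr: startTimes) - 1]

data = from(bucket: "myBucket")
  |> range(start: startTimes[0], stop: experimental.addDuration(d: 10s, to: lastStart))
  |> filter(fn: (r) => r["_measurement"] == "myMeasurement")
  |> filter(fn: (r) => r["_field"] == "myField")

// Step 4 (hypothetical, any() does not exist):
// data |> filter(fn: (r) => any(items: startTimes, fn: (v) => v <= r._time and r._time < experimental.addDuration(d: 10s, to: v)))
data
```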


Thank you @jonathan!


Hi @jonathan,

Thank you for your explanation!

The use case I mentioned is actually pretty common, so I think it would be a great feature to add. When dealing with high-frequency data, adding a tag to every data point is an overhead that can’t be ignored, so it’s more practical to tag only the first point of each event and then query all events at once. Although I can query each segment one by one, I can’t use the InfluxDB GUI as conveniently as before.

I agree with your idea of adding an any() function. I assume Flux uses lazy evaluation (I don’t know for sure), so it would be great to start with a continuous range containing all events and then filter out the data of all events in one go with any(). If Flux doesn’t use lazy evaluation, maybe it could just support discontinuous ranges?

Thank you again! It would be really helpful if this feature is implemented!

import "array"
import "join"
import "experimental"

// our event data (startData): one row per "start" event
startData =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:00Z, _value: 2, event: "start"},
            {_time: 2022-01-01T00:00:15Z, _value: 2, event: "start"},
        ],
    )
        // add a start column and a stop column 10 seconds after each event timestamp
        |> map(fn: (r) => ({r with myStart: r._time}))
        |> map(fn: (r) => ({r with myStop: experimental.addDuration(d: 10s, to: r._time)}))

// the data we want to filter based on the startData start and stop times. In this example we expect
// to keep only the timestamps inside 0-10s and 15-25s; the points outside those windows are the ones with _value 0.
data =
    array.from(
        rows: [
            {_time: 2022-01-01T00:00:03Z, _value: 1},
            {_time: 2022-01-01T00:00:05Z, _value: 1},
            {_time: 2022-01-01T00:00:08Z, _value: 1},
            {_time: 2022-01-01T00:00:12Z, _value: 0},
            {_time: 2022-01-01T00:00:17Z, _value: 1},
            {_time: 2022-01-01T00:00:18Z, _value: 1},
            {_time: 2022-01-01T00:00:20Z, _value: 1},
            {_time: 2022-01-01T00:00:23Z, _value: 1},
            {_time: 2022-01-01T00:00:27Z, _value: 0},
            {_time: 2022-01-01T00:00:30Z, _value: 0},
        ],
    )

// perform a full join so that every event row and every data row is kept
join.full(
    left: startData,
    right: data,
    on: (l, r) => l._time == r._time,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        value = if exists l._value then l._value else r._value

        return {_time: time, _value: value, myStart: l.myStart, myStop: l.myStop}
    },
)
    // fill(usePrevious) works in row order, so sort by time first
    |> sort(columns: ["_time"])
    // copy each event's start and stop times down to the data rows that follow it
    |> fill(column: "myStart", usePrevious: true)
    |> fill(column: "myStop", usePrevious: true)
    // finally filter for data that’s within 10 seconds after each event
    |> filter(fn: (r) => r._time >= r.myStart and r._time <= r.myStop)
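The `join` package used above (`join.full()`) is only available in newer Flux releases, so it may not run on InfluxDB 2.0.7 from the original question. A minimal sketch of the same idea without a join, assuming `union()` is an acceptable stand-in here: like the full join, it keeps every row from both inputs. With this sample data the filter should keep the two start events plus the points at 3, 5, 8, 17, 18, 20 and 23 seconds.

```flux
import "array"
import "experimental"

// one row per "start" event, with the 10 s window it opens
startData = array.from(
    rows: [
        {_time: 2022-01-01T00:00:00Z, _value: 2, event: "start"},
        {_time: 2022-01-01T00:00:15Z, _value: 2, event: "start"},
    ],
)
    |> map(fn: (r) => ({r with myStart: r._time, myStop: experimental.addDuration(d: 10s, to: r._time)}))

data = array.from(
    rows: [
        {_time: 2022-01-01T00:00:03Z, _value: 1},
        {_time: 2022-01-01T00:00:05Z, _value: 1},
        {_time: 2022-01-01T00:00:08Z, _value: 1},
        {_time: 2022-01-01T00:00:12Z, _value: 0},
        {_time: 2022-01-01T00:00:17Z, _value: 1},
        {_time: 2022-01-01T00:00:18Z, _value: 1},
        {_time: 2022-01-01T00:00:20Z, _value: 1},
        {_time: 2022-01-01T00:00:23Z, _value: 1},
        {_time: 2022-01-01T00:00:27Z, _value: 0},
        {_time: 2022-01-01T00:00:30Z, _value: 0},
    ],
)

union(tables: [startData, data])
    |> group()                   // merge everything into a single table
    |> sort(columns: ["_time"])  // fill(usePrevious) depends on time order
    |> fill(column: "myStart", usePrevious: true)
    |> fill(column: "myStop", usePrevious: true)
    // rows before the first event have no myStart and are dropped
    |> filter(fn: (r) => exists r.myStart and r._time >= r.myStart and r._time <= r.myStop)
```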

I'd do something like that. However, please note:

Thanks!

Hello @mfrice.
Maybe use group:

import "array"

array.from(
        rows: [
              {_time: 2022-01-01T00:00:03Z, _value: 1, event: "on"},
              {_time: 2022-01-01T00:00:05Z, _value: 2, event: "on"},
              {_time: 2022-01-01T00:00:08Z, _value: 3, event: "on"},
              {_time: 2022-01-01T00:00:12Z, _value: 4, event: "on"},
              {_time: 2022-01-01T00:00:17Z, _value: 3, event: "on"},
              {_time: 2022-01-01T00:00:18Z, _value: 1, event: "off"},
              {_time: 2022-01-01T00:00:20Z, _value: 1, event: "off"},
              {_time: 2022-01-01T00:00:23Z, _value: 1, event: "off"},
              {_time: 2022-01-01T00:00:27Z, _value: 6, event: "on"},
              {_time: 2022-01-01T00:00:29Z, _value: 5, event: "on"},
              {_time: 2022-01-01T00:00:36Z, _value: 9, event: "on"},
              {_time: 2022-01-01T00:00:40Z, _value: 1, event: "off"},
              {_time: 2022-01-01T00:00:43Z, _value: 7, event: "on"},
              {_time: 2022-01-01T00:00:48Z, _value: 3, event: "on"},
        ],
    )
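    // region = 1 for "on" rows, 0 for "off" rows
    |> map(fn: (r) => ({r with region: if r.event == "on" then 1 else 0}))
    // 1 at every off-to-on transition, 0 elsewhere (nonNegative: true keeps on-to-off
    // transitions from going negative); by default difference() also drops the first row
    |> difference(columns: ["region"], nonNegative: true)
    // the running total gives each "on" segment its own region number
    |> cumulativeSum(columns: ["region"])
    |> group(columns: ["region"])
    // seconds since the first row of each region (stateDuration's default unit is 1s)
    |> stateDuration(fn: (r) => true)
    // keep only the first 10 seconds of each region
    |> filter(fn: (r) => r.stateDuration <= 10)
    // sum _value per region
    |> reduce(
        fn: (r, accumulator) => ({_time: r._time, _value: r._value + accumulator._value}),
        identity: {_time: now(), _value: 0},
    )
    |> group()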

Hello @Anaisdg,
It would be much easier if we had a stateWindow() function.