Patterns over time

I would like to perform an operation on two different streams using values from different times. For example, I’d like to search for cases where event A occurs in stream X and event B has occurred in stream Y an hour prior. My thought is to select data from the two series, timeShift() one of them, and then join them on the timestamps. I may want more complicated logic, though, like conditionally analyzing patterns in time windows. In this case, I perceive having a cascading set of data stream variables, storing the result of each conditional analysis. What are the best tools to do this kind of analysis?

Hello @simon38,
Welcome!

What are the best tools to do this kind of analysis?

I think you can do this with Flux, but of course you can always use a client library and perform the logic in the language of your choice.

Without specific example data and expected output it’s hard for me to help you in detail. What does your schema look like? How do you query for stream X and Y?

However here are some tips and doc links that might be useful to you:

For example you could do something like:

from(bucket: "X")
  |> range(start: -1h, stop: now())
  |> filter(fn: (r) => r["_field"] == "A" and exists r["_value"] )
  from(bucket: "Y")
  |> range(start: -2h, stop: -1h)
  |> filter(fn: (r) => r["_field"] == "B" )
  |> yield(name: "B")

You could also expand upon this logic and window data, conditionally map(), and use getRecord() to extract meaningful data that could be used in your second query/filter.

 |> map(fn: (r) => ({
      r with
      human_readable:
        if exists r._value then "${r._field} is ${string(v:r._value)}."
        else "${r._field} has no value."
  }))

If you could give me some more detail around “more complicated logic” it would be easier for me to help you.