Patterns over time

I would like to perform an operation on two different streams using values from different times. For example, I’d like to search for cases where event A occurs in stream X and event B has occurred in stream Y an hour prior. My thought is to select data from the two series, timeShift() one of them, and then join them on the timestamps. I may want more complicated logic, though, like conditionally analyzing patterns in time windows. In this case, I perceive having a cascading set of data stream variables, storing the result of each conditional analysis. What are the best tools to do this kind of analysis?

Hello @simon38,
Welcome!

What are the best tools to do this kind of analysis?

I think you can do this with Flux, but of course you can always use a client library and perform the logic in the language of your choice.

Without specific example data and expected output it’s hard for me to help you in detail. What does your schema look like? How do you query for stream X and Y?

However here are some tips and doc links that might be useful to you:

For example you could do something like:

from(bucket: "X")
  |> range(start: -1h, stop: now())
  |> filter(fn: (r) => r["_field"] == "A" and exists r["_value"] )
  from(bucket: "Y")
  |> range(start: -2h, stop: -1h)
  |> filter(fn: (r) => r["_field"] == "B" )
  |> yield(name: "B")

You could also expand upon this logic and window data, conditionally map(), and use getRecord() to extract meaningful data that could be used in your second query/filter.

 |> map(fn: (r) => ({
      r with
      human_readable:
        if exists r._value then "${r._field} is ${string(v:r._value)}."
        else "${r._field} has no value."
  }))

If you could give me some more detail around “more complicated logic” it would be easier for me to help you.

Sorry it took so long to reply. I’ve been directed to explore the capabilities and performance of Flux without details of what we’ll be doing with it. I’m trying to understand when to process data with Flux and when to do it with our own code. I decided to first study basic queries and document my results before coming back to this question.

My question is about how much can be “remembered” by a query, so to speak. I was given a general notion of regular expressions. More simply, how would you find patterns in the data? One example is finding local minima and maxima, which we already discussed in a different thread. I had to go through quite a few steps to accomplish that. Other people have asked how to find runs in the data, which is something I’ve also been trying to do. For example, I want to get the histogram of the length of all periods where some value is above a threshold. I have a solution and will pose that as a separate question, but let’s say I then want to find bursts of long periods, or short periods. It gets complicated without an accumulator.

The best I found in flux is the map function, which only maps the current row, and reduce, which only produces an accumulated value. A combination of the two, let’s call it map-reduce, would be very helpful. Even a function to create in index would help, because then I can join and pivot on the indices.

I haven’t studied database systems, so maybe this is more than what can be expected of one. I don’t know if people use stored procedures for this kind of logic in relational databases, or just retrieve the data and manipulate it. Either way, I find that retrieving a full set of data takes much longer than just reading from a flat file, so I would want to manipulate and reduce it as much as possible before retrieving.

I’ll point out that we’re only using OSS right now. The cloud version may be quicker with more optimizations, but it will only be an option for someone who will make commercial use of our work.

Hello @simon38,
You can use the following to create an index:

  |> map(fn: (r) => ({ r with index: 1.0 }))
  |> cumulativeSum(columns: ["index"])

Why can’t you use map first() and then apply reduce()? I’m having trouble imagining an instance where a combination of the two would be different than applying them together.
To find bursts of long or short periods where a value exceeded a threshold I would:

  1. create a check to evaluate whether your data meets that condition. This check will assign a level to your data that will describe whether or not your value exceeded the threshold.
  2. Use the stateDuration function() stateDuration() function | InfluxDB OSS 2.0 Documentation
  3. filter for instances where the duration is meets the “long” or “short” period conditions.

I don’t mean to say that there should be a function that does map followed by reduce, but rather that it maps the points in a series based on previous values in the series, similar to the way “reduce” has an accumulator to remember the results of an operation on previous points in the series. A similar question was asked on the Slack site, here:

I only recently signed up so I’m still not posting questions there.

More generally, it would be useful to define static memory locations that would have persistent value over successive calls to the defined function as rows are processed. Using the example again of finding the local minima, if we wanted to mark each row following a local minimum, we might have a function like this:

data |> mapreduce(
map: (r, memory) => ({_value: memory.x_2 >= memory.x_1 && r._value > memory.x_1}),
update: (r, memory) => ({x_1: r._value, x_2: memory.x_1}),
identity: {x_1: 0, x_2: 0})

where the anonymous functions “map” and “update” are called on each row, except that map outputs a new table just as the existing “map” function does and “update” updates “memory” just as the existing “reduce” function updates “accumulator”.

This is just an example of many conditions (patterns) that one might want to use in a query, which are quite simple if static variables are available to “remember” some of the “history” of previous rows but potentially quite complex in Flux without this capability. I suggested before creating an index on each table of the data to be used in the calculation, which itself is complex, then joining the tables on the indices, and finally calling map. Overall this would be complex, confusing, and very slow. Having the type of function I described would be much simpler to write and run much faster.

Am I supposed to make a feature request? I don’t know anything about that.

1 Like

Ah yes I understand now. Thank you for explaining!
That’s a great idea. I too have needed that functionality.
Can you please create a new feature request here:

and let me know when you have so I can tag the appropriate people and comment?
Thank you for this feedback!