Kapacitor Stream timestamp-confusion when using JoinNode

Hey there! I’m using Kapacitor to perform some operations on two time series, and I want to combine them into one new series.

First I took a look at this tutorial: Calculate rates across joined series + backfill | Kapacitor 1.5 Documentation

After using that to create a simple series that adds the two, I went through the documentation and tried to understand the different nodes and the possible ways to do this. After a while I got confused about the behavior of the nodes and what the timestamps have to do with it. I hope someone can help me understand this.

Background: I have two series, each emitting a new point roughly every 15 minutes. So two points that should be considered the same are at most 7.5 minutes apart.

I got it to work with the example using a window node like this:

...
|window()
    .period(15m)
    .every(1m)
    .align()
|last('myfield')

series1|join(series2)
    .as('s1', 's2')
...

As I understand it, the window is configured to span e.g. 10:15-10:30 due to the .align() property. The window spans 15 minutes and is emitted every minute. After that, the last point in this 15-minute window is selected. If I understand the join node correctly, it should merge two incoming points if they are considered the same by timestamp. Since this already works, I’m wondering how the join node knows when to consider the points the same. In my example the points are generally 3 seconds apart, but it could be 3 minutes. I’m getting the idea that the last() node might be altering the timestamps. Or is it the window changing the timestamps, so that both points have the same timestamp when they arrive at the join node? But how do the two streams synchronize on that?

Because of this confusion, I tried to build a joined stream the way I thought would make sense based on the documentation.

...
|from()
    ...

stream1|join(stream2)
    .tolerance(8m)
...

So in this approach I just used two plain streams and joined them with a tolerance. Since the maximum distance between two matching points is 7.5 minutes (no point should be computed if one source provides no point), I thought this should be equivalent, so that every 8 minutes a point in the new series is computed from the incoming points of the two originating series. Unfortunately this works for several minutes, hours or even days but eventually stops emitting points, while the first approach has been working for several weeks now.

I’m asking myself why the first approach works when, based on the documentation, no timestamp should be modified and hence the join node should never consider two points the same, while the second approach does not always work even though the tolerance() property seems to be exactly the right tool for this job.

Hi, I am sure the following link will clarify a bit :slight_smile:

Point times versus batch times

Also, the window version will have a “tolerance” of almost 15 minutes.
A point that arrives at 18:01 will have the time 18:15, and a point that arrives in the other window at 18:14 will also have 18:15 as its time, so they will be joined even with a difference of 13 minutes.
Are you sure the distance between two points will never exceed 8 minutes?
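
To make the 18:01 / 18:14 example concrete, here is a rough Python simulation (not Kapacitor code). It assumes aligned, non-overlapping 15-minute windows, that last() stamps its result with the window end (the batch time), and that the join only matches identical timestamps. The helper names window_end, last_per_window and join are made up for this sketch, and the date is arbitrary.

```python
from datetime import datetime, timedelta

PERIOD = timedelta(minutes=15)  # assumed window period (and emit interval)

def window_end(ts, period=PERIOD):
    """End of the aligned window containing ts (assumed to be the batch time)."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    index = (ts - midnight) // period  # number of full windows since midnight
    return midnight + (index + 1) * period

def last_per_window(points):
    """Mimic window + last(): newest value per window, stamped with the window end."""
    out = {}
    for ts, value in sorted(points):
        out[window_end(ts)] = value  # later points overwrite earlier ones
    return out

def join(a, b):
    """Join two series on exactly equal timestamps."""
    return {t: (a[t], b[t]) for t in sorted(a.keys() & b.keys())}

day = datetime(2019, 3, 19)  # arbitrary example date
series1 = [(day.replace(hour=18, minute=1), 1.0)]
series2 = [(day.replace(hour=18, minute=14), 2.0)]

joined = join(last_per_window(series1), last_per_window(series2))
print(joined)  # both points carry the 18:15 timestamp, so they are joined
```

Under these assumptions the two points are joined although they arrived 13 minutes apart, because both are re-stamped with the same window end.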

I think it is also better to set every to 15 minutes, the same as the period in your window nodes.

I have done some tests and I have the impression that you need at least two points in a window before last() returns a point, but I am not sure about that yet.

Thank you for that clarification. So the window node actually adds a batch time to the stream, and the aggregation function last() then uses this by default instead of the point’s timestamp. Got that :wink:

In both originating streams, points are written roughly 15 minutes apart. Imagine the streams gradually shifting away from each other, from 0s distance to 10s distance and eventually much further: at some stage a point will be closer to the next point of the other stream than to the previous one. After 15/2 minutes, or 450s, a point of stream A is equally distant from two points in stream B. I’d like the join node to combine the two points that are closer to each other. So instead of joining two points that are 14 minutes apart, I’d rather drop that point and combine the pair that is now 1 minute apart.

[Image: Screenshot from 2019-03-19 11-15-21]

The image shows that with the tolerance set to 15/2 minutes, if there is a bad window first (containing two points further apart than 15/2 minutes), one point should be dropped and the following points should be closer to each other.
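
The pairing I have in mind could be sketched roughly like this in Python. This is only an illustration of the desired behavior, not something Kapacitor provides as far as I know. The function pair_nearest and the sample timestamps (in seconds) are hypothetical.

```python
def pair_nearest(a, b, tolerance):
    """Pair sorted timestamps of a and b; drop points with no close enough partner."""
    pairs, i, j = [], 0, 0
    while i < len(a) and j < len(b):
        gap = abs(a[i] - b[j])
        # Would the next point of the older stream be a closer partner?
        if a[i] <= b[j]:
            closer_next = i + 1 < len(a) and abs(a[i + 1] - b[j]) < gap
        else:
            closer_next = j + 1 < len(b) and abs(b[j + 1] - a[i]) < gap
        if gap <= tolerance and not closer_next:
            pairs.append((a[i], b[j]))
            i += 1
            j += 1
        elif a[i] <= b[j]:
            i += 1  # drop the older, unmatched point of a
        else:
            j += 1  # drop the older, unmatched point of b
    return pairs

# Both streams have a 15-minute period (900 s); B runs 14 minutes behind A's
# first point, which puts it 1 minute ahead of A's second point.
stream_a = [0, 900, 1800]
stream_b = [840, 1740, 2640]

pairs = pair_nearest(stream_a, stream_b, tolerance=450)
print(pairs)  # [(900, 840), (1800, 1740)]
```

Here the first point of stream A is dropped because its only candidate is 14 minutes away, and every following pair is 1 minute apart. The last point of stream B stays unmatched until stream A delivers its next point.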

Actually I’m not really sure if this property of the combined series is even necessary :smiley:

I also noticed that when using join.tolerance, the combined series only emits points at multiples of the tolerance value. I think I also read about that in the documentation. I’m wondering if it’s possible to have the tolerance applied only as a kind of timeout for when no two points arrive within that time, and otherwise to complete the join as soon as two points are present. I’d like to join the stream “right at the tip” instead of caching anything more than the last point. When using the window, I have to pass and think about parameters like window size and emit period, although the only thing I’m interested in is combining the two most recent points of two series with unknown properties like shift, period etc. So basically I’d like to generalize this join concept and have the properties of the new outgoing stream be derived automatically from the two or more incoming streams :smiley:
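
One possible consequence of the “multiples of the tolerance” behavior, sketched in Python. This assumes the join rounds each incoming timestamp to the nearest multiple of the tolerance and then matches on the rounded value. I have not verified that against the Kapacitor source, so treat it as a guess. The helper names bucket and would_join are made up.

```python
TOLERANCE = 8 * 60  # 8 minutes in seconds, as in .tolerance(8m)

def bucket(ts, tolerance=TOLERANCE):
    """Round ts (seconds) to the nearest multiple of tolerance (assumed join behavior)."""
    return ((ts + tolerance // 2) // tolerance) * tolerance

def would_join(t1, t2, tolerance=TOLERANCE):
    """Two points match only if they round to the same multiple of tolerance."""
    return bucket(t1, tolerance) == bucket(t2, tolerance)

# Two points 3 seconds apart, well inside one bucket.
print(would_join(10, 13))    # True: both round to 0

# Two points 3 seconds apart, straddling the rounding boundary at 240 s.
print(would_join(239, 242))  # False: 239 rounds to 0, 242 rounds to 480
```

If the join really works like this, two points only 3 seconds apart can still end up with different rounded timestamps when they fall on either side of a rounding boundary, so a tolerance of 8m would not guarantee that every pair within 8 minutes is joined.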