Stream inner join on a non time column with Kapacitor

Hi,

I am trying to recreate an inner join with Kapacitor on a non-time column, but so far I have not managed to do it.

  • I have two measurement tables T1 and T2, each with 3 columns: time, seq_no, and value.
  • The sequence number (seq_no) is unique within each table.
  • New samples arrive at about 50 per second per table.
  • The maximum expected time gap between the rows in T1 and T2 that share a seq_no is a few seconds.

I want to produce a third table T3 in the following way:

SELECT T1.value, T2.value
FROM T1
INNER JOIN T2 ON T1.seq_no = T2.seq_no;
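
To make the intended semantics concrete, here is a minimal Python sketch of the keyed join I am after. It is not Kapacitor code: the class `KeyedWindowJoin` and the parameter `max_gap` are hypothetical names used only for illustration, and it assumes rows arrive in roughly increasing time order. Each unmatched row is buffered by seq_no and dropped once it is older than `max_gap` seconds.

```python
from collections import OrderedDict


class KeyedWindowJoin:
    """Inner-join two streams on seq_no, buffering unmatched rows briefly."""

    def __init__(self, max_gap):
        # Assumption: max_gap is the longest time (seconds) to wait for a partner row.
        self.max_gap = max_gap
        # One buffer per table: seq_no -> (time, value), kept in arrival order.
        self.pending = {"T1": OrderedDict(), "T2": OrderedDict()}

    def push(self, table, time, seq_no, value):
        """Feed one row; return the joined row if its partner is buffered, else None."""
        other = "T2" if table == "T1" else "T1"
        self._evict(time)
        match = self.pending[other].pop(seq_no, None)
        if match is None:
            # No partner yet: remember this row until it is matched or expires.
            self.pending[table][seq_no] = (time, value)
            return None
        other_time, other_value = match
        if table == "T1":
            t1_value, t2_value = value, other_value
        else:
            t1_value, t2_value = other_value, value
        return {
            "time": max(time, other_time),
            "seq_no": seq_no,
            "T1.value": t1_value,
            "T2.value": t2_value,
        }

    def _evict(self, now):
        # Drop buffered rows older than max_gap, oldest first.
        for buf in self.pending.values():
            while buf:
                seq_no, (t, _) = next(iter(buf.items()))
                if now - t <= self.max_gap:
                    break
                buf.popitem(last=False)
```

With about 50 rows per second per table and a gap of a few seconds, each buffer would hold only a few hundred rows at a time.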

There is a similar post here, but it didn’t answer my question.

This is what I have now:

var S1 = stream
    |from()
        .measurement('T1')
        .groupBy('seq_no')

var S2 = stream
    |from()
        .measurement('T2')
        .groupBy('seq_no')

S1
    |join(S2)
        .as('S1', 'S2')
        .tolerance(10s)
        .fill('null')
    |where(lambda: "S1.seq_no" == "S2.seq_no")
    |eval(lambda: "S1.seq_no", lambda: "S2.seq_no")
        .as('s1', 's2')
    |influxDBOut()
        .database('dbname')
        .buffer(2)
        .measurement('join3')

Below is the output that I get. The problem is that I see only a single sample per minute or so, which means the majority of samples in T1 and T2 are never matched!

time                s1      s2
----                --      --
1522941900000000000 3424253 3424253
1522941990000000000 3427773 3427773
1522942050000000000 3430717 3430717
1522942110000000000 3433461 3433461
1522942230000000000 3439337 3439337
1522942290000000000 3442237 3442237
1522942330000000000 3444125 3444125
1522942450000000000 3449709 3449709

Is there some way I can increase the “matching” buffer between the two tables, or should I redesign my query?
Thank you.