Merging stream data using union does not cache points from multiple streams

kapacitor

#1

Setup:
Points from two sources are continuously logged, and I read them with a streamNode. They have identical tag and field keys. I am trying to send an alert with the most recent data from both s1 and s2. In other words, I want to cache the latest point from each stream and, on every new event from either stream, send an alert that combines the most recent point from each.
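To pin down the requested behavior, here is a minimal Python model of a "last-value cache" merge. It is not Kapacitor code: the event tuple shape and the `merge_latest` name are hypothetical, and the sketch only illustrates the intended semantics (one output record per incoming event, carrying the latest fields from every stream).

```python
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical event shape: (source stream, timestamp in seconds, field values).
Event = Tuple[str, float, Dict[str, float]]


def merge_latest(events: Iterable[Event],
                 streams: Tuple[str, ...] = ("s1", "s2")) -> Iterator[dict]:
    """Yield one combined record per incoming event.

    Each record carries the most recent fields seen so far from every
    stream, prefixed with the stream name (e.g. s1_f1, s2_f1). A stream
    that has not produced a point yet contributes no fields.
    """
    latest: Dict[str, Dict[str, float]] = {name: {} for name in streams}
    for source, ts, fields in events:
        latest[source] = dict(fields)  # cache: overwrite this stream's last point
        combined = {"time": ts, "trigger": source}
        for name in streams:
            for key, value in latest[name].items():
                combined[f"{name}_{key}"] = value
        yield combined


if __name__ == "__main__":
    events = [
        ("s1", 0.0, {"f1": 1.0, "f2": 2.0}),
        ("s2", 0.1, {"f1": 10.0, "f2": 20.0}),
        ("s1", 0.2, {"f1": 3.0, "f2": 4.0}),
    ]
    for record in merge_latest(events):
        print(record)
```

In this model the third record pairs the new s1 point (f1 = 3.0) with the cached s2 values (f1 = 10.0), and its timestamp is the triggering event's own time rather than a rounded one.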

Attempts:
I attempted several solutions using the joinNode, but I need to send the alert on every new update, and the joinNode rounds the timestamps to the specified tolerance interval. Most recently I tried the unionNode, but the alert output only shows the data from the most recent point. I've also tried using flatten and intermediary points, similar to another topic ([kapacitor] Collect and compare two streams with sparse data), but the result is the same.

```
// STREAM 1
var s1 = stream
    |from()
        .database(db_in)
        .retentionPolicy(rp_in)
        .measurement(ms_in)
        .where(f1)
    |eval(lambda: "f1", lambda: "f2")
        .as('s1_f1', 's1_f2')

// STREAM 2
var s2 = stream
    |from()
        .database(db_in)
        .retentionPolicy(rp_in)
        .measurement(ms_in)
        .where(f2)
    |eval(lambda: "f1", lambda: "f2")
        .as('s2_f1', 's2_f2')

// UNION
// s1 is already the parent of union(), so only s2 is passed as an argument.
// .crit(lambda: 2 > 1) is always true, so every point that reaches the alert triggers it.
s1
    |union(s2)
        .rename('sp')
    |alert()
        .crit(lambda: 2 > 1)
        .message('sp alert')
        .log('/tmp/alerts.log')
```
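The output above is consistent with how the unionNode is documented: it is a simple pass-through that forwards each point from its parents to its children individually, with `.rename()` changing only the name. The Python model below is not Kapacitor code (the arrival format and `union_pass_through` are hypothetical); it only illustrates why each alert message can see the fields of a single stream's point.

```python
from typing import Dict, List, Tuple

# Hypothetical arrival sequence: (source stream, fields after the eval node).
Arrival = Tuple[str, Dict[str, float]]


def union_pass_through(arrivals: List[Arrival],
                       rename: str = "sp") -> List[Tuple[str, Dict[str, float]]]:
    """Model a pass-through union: each point is forwarded on its own under
    the new name; nothing is cached or combined across streams."""
    return [(rename, dict(fields)) for _source, fields in arrivals]


if __name__ == "__main__":
    arrivals = [
        ("s1", {"s1_f1": 1.0, "s1_f2": 2.0}),
        ("s2", {"s2_f1": 10.0, "s2_f2": 20.0}),
    ]
    for name, fields in union_pass_through(arrivals):
        # The alert evaluates each point separately, so a message sees
        # either the s1_* fields or the s2_* fields, never both.
        print(name, sorted(fields))
```

Because nothing in this model keeps state between points, combining the streams would need an explicit cache of each stream's last point, which a pass-through union does not provide.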

Is there an alternative solution? Should I try to modify the timestamp of the last point from each stream node and then join them afterward? I'm open to any and all suggestions.


#2

Similar problem using the union node:


#3

Can you share the join node solution that didn't work and explain why it didn't work? A join node seems like the best fit for the behavior you're looking for, something like:

```
var s1 = stream
    |from()
    ...
    |barrier()
        .idle(1s)

var s2 = stream
    |from()
    ...
    |barrier()
        .idle(1s)

s1
    |join(s2)
        .as('s1', 's2')
        .tolerance(1s)
        .streamName('combined')
    |alert()
        ...
```

#4

Hi Gunnar,

The objective is to generate a new point on every stream update. Some streams update once every 10 minutes, whereas others update every 100 ms. When I set a tolerance, my output points were bucketed into increments of 1 s or more, and I could not get finer time resolution than the tolerance value. The join itself worked; it just did not satisfy the requirements. Is the union node not the right node to use in this situation?
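The loss of resolution described here matches the documented behavior of JoinNode's `.tolerance()`: the joined point's time is rounded to the nearest multiple of the tolerance. The Python sketch below is not Kapacitor code; `round_to_tolerance` is a hypothetical helper that models only this timestamp rounding (not the join's matching or buffering), and the tie-breaking rule is an assumption. It applies the rounding to a stream updating every 100 ms with a 1 s tolerance.

```python
def round_to_tolerance(ts_ms: int, tolerance_ms: int) -> int:
    """Round a timestamp in milliseconds to the nearest multiple of the
    tolerance. Ties round up (an assumption about tie-breaking)."""
    return (ts_ms + tolerance_ms // 2) // tolerance_ms * tolerance_ms


if __name__ == "__main__":
    # Hypothetical fast stream: one update every 100 ms for 2 s (20 points).
    fast = range(0, 2000, 100)
    rounded = sorted({round_to_tolerance(t, 1000) for t in fast})
    # With a 1 s tolerance, 20 distinct update times collapse onto 3 timestamps.
    print(len(fast), rounded)
```

A smaller tolerance makes the buckets finer, but by the same definition points from the two streams must then arrive closer together to count as simultaneous, which conflicts with a stream that only updates every 10 minutes.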