Combining two streams


#1

I have a situation where I have to calculate a sum across two different tag values in the same measurement. There are 2 fields (if-_inOctets and if-_outOctets) that I have to combine for particular matching tags.

I currently have the following script:

// intf1: points from telegraf.interface_snmp for host 'akl-glf-rtr01',
// interface 'ae4', logged raw ('intf1-in') and again after windowing ('intf1').
var intf1 = stream
        |from()
                .database('telegraf')
                .measurement('interface_snmp')
                .where(lambda: ("host" == 'akl-glf-rtr01' AND "ifDescr" == 'ae4'))
        |log().prefix('intf1-in')
        // Window with a 330s period, emitted every 1s.
        |window()
                .period(330s)
                .every(1s)
                .align()
        |log().prefix('intf1')

// intf1in: mean of 'if-_inOctets' over each intf1 window, emitted as 'if-_inOctets-m'.
// NOTE(review): the 'intf1' log output shows each field arriving in its own point,
// so the batch also contains points that lack 'if-_inOctets' -- presumably the cause
// of the "field ... missing from point" error; confirm.
var intf1in = intf1
        |mean('if-_inOctets')
            .as('if-_inOctets-m')
        |log().prefix('intf1in-m')

// intf1out: mean of 'if-_outOctets' over each intf1 window, emitted as 'if-_outOctets-m'.
// NOTE(review): the windowed batch mixes points that carry other fields only, so this
// mean presumably sees points without 'if-_outOctets' as well; confirm.
var intf1out = intf1
        |mean('if-_outOctets')
            .as('if-_outOctets-m')
        |log().prefix('intf1out-m')

// intf2: points from telegraf.interface_snmp for host 'akl-mdr-rtr01',
// interface 'ae3', logged raw ('intf2-in') and again after windowing ('intf2').
var intf2 = stream
        |from()
                .database('telegraf')
                .measurement('interface_snmp')
                .where(lambda: ("host" == 'akl-mdr-rtr01' AND "ifDescr" == 'ae3'))
        |log().prefix('intf2-in')
        // Same window settings as intf1: 330s period, emitted every 1s.
        |window()
                .period(330s)
                .every(1s)
                .align()
        |log().prefix('intf2')

// intf2in: mean of 'if-_inOctets' over each intf2 window, emitted as 'if-_inOctets-m'.
var intf2in = intf2
        |mean('if-_inOctets')
            .as('if-_inOctets-m')
        |log().prefix('intf2in-m')

// intf2out: mean of 'if-_outOctets' over each intf2 window, emitted as 'if-_outOctets-m'.
var intf2out = intf2
        |mean('if-_outOctets')
            .as('if-_outOctets-m')
        |log().prefix('intf2out-m')


// Outbound total: join the two per-interface outbound means, add them, and write
// the sum as field 'if-_outOctets' to telegraf.summar with tag type=international.
intf1out
        |join(intf2out)
                // Joined fields are prefixed 'intf1.' / 'intf2.'.
                .as('intf1', 'intf2')
                .tolerance(300s)
                .streamName('summar')
        // Uses the same 'join' / 'post-lambda' log prefixes as the inbound pipeline.
        |log().prefix('join')
        // Double-quoted names are field references, not string literals.
        |eval(lambda: "intf1.if-_outOctets-m" + "intf2.if-_outOctets-m")
                .as('if-_outOctets')
        |log().prefix('post-lambda')
        |influxDBOut()
                .create()
                .database('telegraf')
                .measurement('summar')
                .tag('type','international')

// Inbound total: join the two per-interface inbound means, add them, and write
// the sum as field 'if-_inOctets' to telegraf.summar with tag type=international.
intf1in
        |join(intf2in)
                // Joined fields are prefixed 'intf1.' / 'intf2.'.
                .as('intf1', 'intf2')
                .tolerance(300s)
                .streamName('summar')
        // Uses the same 'join' / 'post-lambda' log prefixes as the outbound pipeline.
        |log().prefix('join')
        // Double-quoted names are field references, not string literals.
        |eval(lambda: "intf1.if-_inOctets-m" + "intf2.if-_inOctets-m")
                .as('if-_inOctets')
        |log().prefix('post-lambda')
        |influxDBOut()
                .create()
                .database('telegraf')
                .measurement('summar')
                .tag('type','international')

But it complains in the ‘mean’ function about not being able to find the correct field:

[international_stream:mean5] 2017/09/13 05:51:47 E! failed to aggregate batch: field if-_inOctets missing from point cannot aggregate

But I can clearly see the field in the log lines:

[international_stream:log4] 2017/09/13 05:51:47 I! intf1 {"name":"interface_snmp","tmax":"2017-09-13T05:46:48Z","points":[{"time":"2017-09-13T05:46:47Z","fields":{"if-_inOctets":7.2907403994319e+08},"tags":{"host":"akl-glf-rtr01","ifDescr":"ae4"}},{"time":"2017-09-13T05:46:47Z","fields":{"if-_outOctets":1.79077551245549e+08},"tags":{"host":"akl-glf-rtr01","ifDescr":"ae4"}},{"time":"2017-09-13T05:46:47Z","fields":{"if-_inDiscards":0},"tags":{"host":"akl-glf-rtr01","ifDescr":"ae4"}},{"time":"2017-09-13T05:46:47Z","fields":{"if-_outDiscards":0},"tags":{"host":"akl-glf-rtr01","ifDescr":"ae4"}},{"time":"2017-09-13T05:46:47Z","fields":{"if-_inErrors":0},"tags":{"host":"akl-glf-rtr01","ifDescr":"ae4"}},{"time":"2017-09-13T05:46:47Z","fields":{"if-_outErrors":0},"tags":{"host":"akl-glf-rtr01","ifDescr":"ae4"}}]}

If I remove the ‘mean’ function (and adjust the join accordingly) it works better, but then join gets called also on other fields (which throws an error: ‘E! invalid math operator + for type missing’).

What am I missing here?