TICKscript: Calculate difference between two streams but join is failing

#1

I’m just learning the basics at the moment. TICK seems like an excellent IoT back end and I’m hoping I can use it for many future projects.

In this specific instance, I have an embedded device, part of a heating system, that publishes two temperature values to two MQTT topics every 5 seconds via a mosquitto MQTT broker. “mydevice/sensor1” is the pre-heating temperature, and “mydevice/sensor2” is the post-heating temperature. The values are published at almost the same time, so there’s rarely more than half a second between the two messages, but they aren’t synchronised exactly.

Telegraf is subscribed to the same broker and is writing these values into an InfluxDB database called “telegraf” with the “autogen” retention policy. Both values appear under a single measurement called “mqtt_consumer” with a field called “value”. In InfluxDB I can tell them apart by filtering on the “topic” tag:

SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)

This all seems to be working correctly.

What I want to do is calculate the difference between these two topic values for each pair of incoming values. That gives me the temperature differential, from which I eventually want to calculate the energy being transferred by the heating system (the flow rate is constant and known). I tried to do this with InfluxDB queries in Grafana but found it quite difficult (I failed), so I thought I’d try to use TICKscript to break the process down into small steps.

I have been putting together a TICKscript to calculate the difference based on this example:

However, in my case I don’t have two separate measurements. Instead, I create two separate streams from the single “mqtt_consumer” measurement, using the topic tag as a filter, and then attempt to join them with a 1s tolerance (the values are always close in time). I’m using httpOut to generate a view for debugging. That view only updates every 10 seconds, missing every second value, even though my stream operates at 5-second intervals. Why is that? I can see in the new database that all the values are present.

Once I have them joined, I would evaluate the difference in values, and store this in a new database under a measurement called “diff”.

Here’s my script so far:

    var sensor1 = stream
        |from()
            .database('telegraf')
            .retentionPolicy('autogen')
            .measurement('mqtt_consumer')
            .where(lambda: "topic" == 'mydevice/sensor1')
            .groupBy(*)
        |httpOut('sensor1')

    var sensor2 = stream
        |from()
            .database('telegraf')
            .retentionPolicy('autogen')
            .measurement('mqtt_consumer')
            .where(lambda: "topic" == 'mydevice/sensor2')
            .groupBy(*)
        |httpOut('sensor2')

    sensor1
        |join(sensor2)
            .as('value1', 'value2')
            .tolerance(1s)
        |httpOut('join')
        // After the join, fields are prefixed with the names given to .as()
        |eval(lambda: "value1.value" - "value2.value")
            .as('diff')
        |httpOut('diff')
        |influxDBOut()
            .create()
            .database('mydb')
            .retentionPolicy('myrp')
            .measurement('diff')

Unfortunately, my script is failing to pass any items through the join node. The output of kapacitor show indicates that both httpOut nodes are passing items to the join node, but the join node isn’t passing any on. The Kapacitor logs don’t show anything obvious either. An HTTP GET for httpOut('join') returns:

{"series":null}

So I have two questions:

  1. Is this approach, using Kapacitor with a TICKscript for calculating energy based on the difference between two values in a single measurement, valid? Or is there a better/simpler way to do this?
  2. Why isn’t the join node producing any output? What can I do to debug this further?
#2

According to Subqueries Example 2 here:

The difference between two fields in the same measurement can be calculated as:

SELECT "cats" - "dogs" AS "difference" FROM "pet_daycare"

However, in my case I have two series in the same measurement but only one field (“value”), which means I’d need something like this to work (which it doesn’t):

SELECT "temp1" - "temp2" FROM (
    SELECT "value" AS "temp1" FROM "mqtt_consumer" WHERE "topic" = 'mydevice/sensor1'; 
    SELECT "value" AS "temp2" FROM "mqtt_consumer" WHERE "topic" = 'mydevice/sensor2'
)

This makes me wonder if perhaps I might be better off running all the incoming Telegraf data (through Kapacitor?) and rewriting each series into separate fields called “temp1” and “temp2” rather than a single field with a “topic” tag? Then I could do:

SELECT "temp1" - "temp2" FROM "my_data"
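
A rough sketch of what that rewriting step might look like in TICKscript, assuming the join works when no groupBy is applied. The join names 's1' and 's2' are placeholders picked for illustration, and 'mydb' / 'myrp' are simply reused from the script in the first post. This is untested and only meant to show the shape of the idea:

    // Sketch only: combine the two topic-filtered streams into single
    // points carrying both readings as separate fields.
    var sensor1 = stream
        |from()
            .database('telegraf')
            .retentionPolicy('autogen')
            .measurement('mqtt_consumer')
            .where(lambda: "topic" == 'mydevice/sensor1')

    var sensor2 = stream
        |from()
            .database('telegraf')
            .retentionPolicy('autogen')
            .measurement('mqtt_consumer')
            .where(lambda: "topic" == 'mydevice/sensor2')

    sensor1
        |join(sensor2)
            // 's1' and 's2' are placeholder names; they become field prefixes
            .as('s1', 's2')
            .tolerance(1s)
        // Rename the prefixed fields to temp1 and temp2
        |eval(lambda: "s1.value", lambda: "s2.value")
            .as('temp1', 'temp2')
        |influxDBOut()
            .database('mydb')
            .retentionPolicy('myrp')
            .measurement('my_data')

If this behaves as hoped, the simple "temp1" - "temp2" query above could then run against "my_data" directly.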
#3

I think I worked out why my first example isn’t working: the |from().groupBy(*) directive results in slightly different JSON than when it’s not present. The Group and Dimensions elements are different.

With no .groupBy(*):

{
    "Name": "mqtt_consumer",
    "Database": "telegraf",
    "RetentionPolicy": "autogen",
    "Group": "",
    "Dimensions": {
        "ByName": false,
        "TagNames": null
    },
    "Tags": {
        "host": "telegraf",
        "topic": "mydevice/sensor1"
    },
    "Fields": {
        "value": 17.3
    },
    "Time": "2017-08-26T08:02:59.563686175Z"
}

Then with groupBy(*) added (slightly later sample):

{
    "Name": "mqtt_consumer",
    "Database": "telegraf",
    "RetentionPolicy": "autogen",
    "Group": "host=telegraf,topic=mydevice/sensor1",
    "Dimensions": {
        "ByName": false,
        "TagNames": [
            "host",
            "topic"
        ]
    },
    "Tags": {
        "host": "telegraf",
        "topic": "mydevice/sensor1"
    },
    "Fields": {
        "value": 17.2
    },
    "Time": "2017-08-26T08:03:34.611898831Z"
}

It appears that this difference is enough to cause the join(sensor2) to fail when trying to combine these streams.

However I’m not really sure why - the second case seems to carry more information (the tag names are preserved whereas the first loses them), and the Group doesn’t look like it would cause a problem. Does anyone know why this difference prevents join() from working?
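
One possible explanation, based on my reading of the Kapacitor docs and not confirmed in this thread: join pairs points only within the same group. With .groupBy(*) the group key includes the topic tag, so the two streams end up in different groups ("...topic=mydevice/sensor1" versus "...topic=mydevice/sensor2") and never match. A minimal sketch to test that idea groups both sides by 'host' only, a tag both streams share according to the JSON above:

    // Sketch only: group by 'host' rather than '*', so that the
    // 'topic' tag is not part of the group key on either side.
    var sensor1 = stream
        |from()
            .database('telegraf')
            .retentionPolicy('autogen')
            .measurement('mqtt_consumer')
            .where(lambda: "topic" == 'mydevice/sensor1')
            .groupBy('host')

    var sensor2 = stream
        |from()
            .database('telegraf')
            .retentionPolicy('autogen')
            .measurement('mqtt_consumer')
            .where(lambda: "topic" == 'mydevice/sensor2')
            .groupBy('host')

    sensor1
        |join(sensor2)
            .as('value1', 'value2')
            .tolerance(1s)
        // Joined fields carry the .as() names as prefixes
        |eval(lambda: "value1.value" - "value2.value")
            .as('diff')
        |httpOut('diff')

If httpOut('diff') starts returning series with this change, that would support the group-mismatch theory. If it still returns null, the cause lies elsewhere.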

#4

I think this might be related to the problem reported here:

#5

Can anyone confirm whether this is intended behaviour? I’m concerned I may simply be suffering from a lack of understanding. If it looks wrong then I’m happy to file an issue, but I need to know first whether this is just operator error.