Telegraf, JSON and transformations

I have system that provides status information through an HTTP call returning JSON code. One section of the JSON looks like:

    "link_info": {
    "10.88.42.36": {
        "neighborLinkQuality": 0.71699999999999997,
        "noise": -95,
        "linkQuality": 1,
        "linkType": "RF",
        "hostname": "KJ6WEG-OAK-Griz-SectorM5.local.mesh",
        "tx_rate": 19.5,
        "olsrInterface": "wlan0",
        "rx_rate": 19.5,
        "signal": -81
    },
    "10.111.157.186": {
        "neighborLinkQuality": 1,
        "linkQuality": 1,
        "hostname": "KK6RUH-Oakland-Nanobeam-M5.local.mesh",
        "olsrInterface": "eth0.2",
        "linkType": "DTD"
    }
    },

This results in fields in Influx like link_info_10.100.98.190_linkQuality and link_info_10.88.42.36_rx_rate. I would like to see these fields be link_info_linkQuality and link_info_rx_rate with a tag with the IP address but cannot figure out how to configure Telegraf to do this.

Ideally, the JSON code would originally look like:

    "link_info": [
        {"IPAddress": "10.88.42.36",
         "neighborLinkQuality": 0.71699999999999997,
         "noise": -95,
         "linkQuality": 1,
         "linkType": "RF",
         "hostname": "KJ6WEG-OAK-Griz-SectorM5.local.mesh",
         "tx_rate": 19.5,
         "olsrInterface": "wlan0",
         "rx_rate": 19.5,
         "signal": -81
        },
        {"IPAddress": "10.111.157.186",
         "neighborLinkQuality": 1,
         "linkQuality": 1,
         "hostname": "KK6RUH-Oakland-Nanobeam-M5.local.mesh",
         "olsrInterface": "eth0.2",
         "linkType": "DTD"
        }
    },

but alas I’m currently not able to change the code to make that happen. It’s written in Lua on an embedded system and that’s just not something I can work with right now.

My current thought is to skip Telegraf and write a Python program that runs periodically to read the JSON, adjust to the desired format and then write to the Influx database.

Any other suggestions on how I can do the transformations I’m looking for?

Thanks!

I think your data source is not producing valid json code.
The standard json parser in Telegraf will not cope with this.

I would fix the problem at the source.

But if that is not possible, there is the possibility of a custom inputs.execd or processors.execd plugin.

I fiddled around with your json data and found a solution that could serve as a starting point for you.

I put your json snippet on jsonblob for testing:

https://jsonblob.com/4da2f61e-8a89-11eb-a428-a5fa7295f33b

I wrote a custom processors.execd plugin in Python 3 to do data cleaning.

Here the part of the telegraf.conf file:

[[inputs.http]]
  urls = ["https://jsonblob.com/api/jsonBlob/4da2f61e-8a89-11eb-a428-a5fa7295f33b"]
  method = "GET"
  headers = {"Content-Type"="application/json", "Accept"="application/json"}
  name_override = "http"
  data_format = "json"
  json_query = "link_info"
  json_string_fields = ["*hostname", "*linkType", "*olsrInterface"]

[[processors.execd]]
  namepass = ["http"]
  command = ["python", "pyjsonparser.py"]

[[outputs.file]]  # only for debugging
  files = ["link_info.out"]
  influx_sort_fields = true

That is the Python 3 processors plugin, stored in pyjsonparser.py

from influxdb_client import Point
from line_protocol_parser import parse_line

while True:
    try:
        input_line = input()  # read from stdin
    except EOFError:  # catch EOF error
        break
    else:
        data = parse_line(input_line)  # parse input line
        fields = data['fields']
        ips = dict()
        for key, value in fields.items():
            ipaddress, substring = key.split('_', 1)
            if ipaddress not in ips:
                ips[ipaddress] = dict()
            ips[ipaddress][substring] = value
        for key, value in ips.items():
            datapoint = dict()
            datapoint['measurement'] = data['measurement']
            datapoint['fields'] = value
            datapoint['tags'] = {'ip' : key}
            datapoint['time'] = data['time']
            point = Point.from_dict(datapoint)  # new metric object
            print(point.to_line_protocol())  # write to stdout

You will need two external python 3 libraries:

pip install influxdb-client
pip install line-protocol-parser

According to various JSON validators, this is valid JSON. I appreciate the verification that I’m not missing something obvious and I will try to get this resolved at the source. But in the mean time, I still want to be able to process the data from these devices.

You code is exactly the direction that I started heading. Never having written for the execd plugin, this helps in getting my problem resolved. I’ll add my final solution to the thread once I’ve had it running for a bit.

Thanks!

1 Like

Ok, the json snippet led me on the wrong track.

:+1: