Filtering incoming data with JSON Schema

Hello,

I am a bit new to Telegraf. Until now I have used it mostly for collecting system metrics.

Currently I am facing the following problem: I am collecting measurements from several devices in the form of JSON data. The data is produced by sensors and sent as JSON documents over MQTT, like this:

{
   "battery":100,
   "ts":1706550016,
   "temperature":24.4
}

Because the data is generated by students, I need to validate it, ideally with JSON Schema. So what I need is:

  • to write my own validator in Python (there will be several different measurements, each with its own JSON document)
  • if a JSON document fails validation, it should not be passed on
  • if the input document is valid, I would like to convert it to InfluxDB line protocol myself (the result will be composed from the MQTT topic and the MQTT message) and write it to InfluxDB with an output plugin.

So the flow looks like:

inputs:mqtt_consumer → my python validator → outputs:influxdb_v2

Is it possible to do something like this with Telegraf, or do I need to write my own script? And what would it be then: an input plugin? A processor? Can you point me to how to do that?

Thanks for any help.

mirek

Hello @bletvaska,
Yes, absolutely. You can use the execd processor plugin to do that.
Here’s an example:

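# A simple processor. It reads stdin and writes it to stdout.
import sys

def main():
    for line in sys.stdin:
        # Pass each line through unchanged.
        print(line.rstrip())
        # Flush so the line is written out immediately instead of being buffered.
        sys.stdout.flush()

if __name__ == '__main__':
    main()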
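
Building on the pass-through script above, the validation step could look roughly like this. It is only a sketch: the `REQUIRED_FIELDS` table and the `validate_payload` name are made up for illustration, and the hand-rolled type check from the standard library is a stand-in for real JSON Schema validation (the third-party `jsonschema` package offers `jsonschema.validate(instance, schema)` for that). It also assumes the raw JSON text has already been pulled out of the metric; parsing the line protocol that Telegraf sends on stdin is left out.

```python
import json

# Hypothetical description of the sample document from the question:
# each required key maps to the Python types accepted for its value.
REQUIRED_FIELDS = {
    "battery": (int, float),
    "ts": (int,),
    "temperature": (int, float),
}

def validate_payload(raw):
    """Return the parsed document if it looks valid, otherwise None."""
    try:
        doc = json.loads(raw)
    except ValueError:
        # Not JSON at all (json.JSONDecodeError is a ValueError subclass).
        return None
    if not isinstance(doc, dict):
        return None
    for key, allowed_types in REQUIRED_FIELDS.items():
        value = doc.get(key)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, allowed_types):
            return None
    return doc
```

As far as I know, a processor built on this would drop an invalid reading simply by not printing anything for it when `validate_payload` returns None.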

And the Telegraf config would look like:

[[processors.execd]]
  command = ["python", "./processors/myscript.py"]

Let me know if that helps.

Hi,

Thanks for the pointer. I found it a few minutes ago in the Telegraf basics training, so I am fully on board :)

mirek