Parse field names from payload

Hi,

I need some advice for the MQTT payload I want to ingest in Influx.

  • My Data is coming from multiple instruments fitted with multiple sensors

  • Each instrument is identified with a Serial Number

  • Number of sensors can be different for different instruments

  • Each sensor can be given different name by the end user

  • The payload is like this:
    {
    ‘timestamp’: ‘2024-03-19 13:09:00’,
    ‘SERNUM’: ‘INSTRUMENT-SERIAL#’,

    ‘Sensor_1_Name_Conc’: 1,
    ‘Sensor_1_Name_Unit’: ‘OU’,
    ‘Sensor_1_Name_Alarm_Level’: ‘NONE’,

    ‘Sensor_2_Name_Conc’: 101.1,
    ‘Sensor_2_Name_Unit’: ‘PPB’,
    ‘Sensor_2_Name_Alarm_Level’: ‘NONE’,

    ‘Sensor_3_Name_Conc’: 36.0,
    ‘Sensor_3_Name_Unit’: ‘PPB’,
    ‘Sensor_3_Name_Alarm_Level’: ‘NONE’,



    }

My aim is to get the data in Influx and then configure a Grafana dashboard to view the data.

So far what I have done is:

  • Setup Influx cloud with Telegraf running in a separate container
  • I am able to read the data from my MQTT broker via Telegraf
  • Telegraf is configured with xpath_json parser.
  • Telegraf is able to parse the tags, fields and ingest data into Influx.

The advice I am looking for is:

  1. How should I handle the variable property names? Is it possible that there is some plugin which can parse the property name and create new fields accordingly?
  2. Should I use different measurements for different instruments? (or even different buckets?)
  3. In the Grafana dashboard, I think I can provide a selector for the instrument serial number and then based on the serial number it can populate the required panels (I am yet to study Grafana in detail for that purpose) but if I have to query all serial numbers, is that possible? I am using the serial number as tag and I hope that is correct.
  4. Do I need to go in a different direction? Use Kapacitor? Or write my own code running separately?

I am using Influx cloud version and new to all this so open to use any query language (InfluxQL or Flux or SQL)

It will be good to have some opinions about it please.

Also, if this is not the right place to ask such advice, please let me know. Thanks!

Hello @ga29,
Thank you for your detailed question. It makes our jobs easier :slight_smile:

  1. As far as I know (@jpowers am I mistake here?) Telegraf plugin specifically designed to automatically parse property names and create fields accordingly without some form of scripting or preprocessing. You might need to write a small script that subscribes to the MQTT topics, restructures the payload into a more consistent format, and then forwards it to InfluxDB. Or use a processor plugin. Check out these telegraf processors for parsing:
    https://github.com/influxdata/telegraf/blob/master/plugins/processors/execd/README.md
    How to Use Starlark with Telegraf | InfluxData

  2. I would make different insturments different tags especially if you’re polling them at a similar frequency. If the instruments vary significantly like 1h frequency vs 1ns frequency then I would put them into different measurements.

  3. Yes you should be able to query all serial numbers easily if your data is in one measurement. You’ll have to perform multiple queries if you want to query multiple buckets or measurments.

  4. If you’re using the new version of Influx Cloud you’ll have access to SQL and InfluxQL.

As an aside you might want to take a look at Quix for stream processing you might find it easy to use too. Take a look at these projects with InfluxDB:

Event detection and alerting featuring InfluxDB and PagerDuty: In this tutorial you learn how to create a CPU overload alerting pipeline with Quix Cloud, Quix Streams, InfluxDB, and PagerDuty.
Predictive maintenance: This project template contains the full source code for a data pipeline and dashboard that illustrates how predictive maintenance can work in practice. It simulates data generated by a fleet of 3D printers and predicts which ones are going to fail before the print is finished using a time series forecasting algorithm.
Quix Saving the holidays: This project provides an example of how to use Quix and InfluxDB 3.0 to build a machine anomaly detection data pipeline. This repository contains the full data pipeline as a project but does not include the data simulator (See getting started for more details).

I’d stay away from kapacitor. Very few people know about the Kapacitor language and its only in maitenance mode. You definitely can use it but getting assistance will nearly impossible from Influx employees.

1 Like

You haven’t shared your config or what you want your metrics to look like so I feel like I am guessing, but if you are using the xpath processor it takes wildcards to be able to pull in random or unknown field names.

If you are instead trying to rename these fields, then like Anais said, the starlark processor or even the regex processor can be used to rename the fields.

1 Like

Thank you @Anaisdg @jpowers
I think Starlark is the key to my problem. I will experiment with it and provide an update. Thanks for a quick response, appreciate it.

1 Like

Just to update - Starlark processor was the solution I was looking for.

I included it and wrote some code to parse the fields dynamically, update the key names etc. as required. In Grafana now I can use these names to populate the dashboard panels dynamically based on the number of sensors in incoming payload and also I am able to separate the Grafana panels depending on the type of sensors.

I was able to do all this without requesting the original developers for any payload changes. Though payload format changes will make few things easier for me, at the moment starlark has helped significantly.

Thanks again for your support!

1 Like