MQTT JSON Output has a field with an array of numeric values

Hello everyone

Problem statement:
Telegraf receives a JSON message from the broker, but influx is unable to show the measurement from one specific field called “Values”:
'Values': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]

The expanded form of the problem:
The MQTT broker forwards each received message to all subscribers of a topic.
The message contains a field called “Values” that holds an array of numbers [1,2,3,…].
I am unable to turn this array into measurements that can be posted to influx.

An MQTT published message from a sensor is in the following form:

{
"d": {
"Battery_Voltage": "2.90 V",
"Device_Type": "Advanced Vibration Meter",
"Internal_State": "False",
"Is_Current": "True",
"Radio_Signal_Strength": "-32 db",
"Sensor_ID": "730952",
"Sensor_Number": "730952",
"Time_of_Reading": "2021-10-10T17:47:47+00:00",
"Units": [
"mm/s",
"mm/s",
"mm/s",
"Hz",
"Hz",
"Hz",
"",
"deg. C",
"Percent",
"",
"",
""
],
"Values": [
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0,
26.0,
0,
1.41,
1.41,
1.41
],
"Vstamp": "45bbf7a389cf501a08d9540dcd7a8c1dbec5c80d3f9899e5c3829fd6aef94fd3",
"Vstring": "730952[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]False2021-10-10T17:47:47+00:00"
}
}

Telegraf plugin prepared in the telegraf configuration file:
[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
topics = ["MQTTSensor/type/111/#"]
username = "user"
password = "password"
data_format = "json"
name_override = "mqttstuff"
json_query = "d"
json_string_fields = ["Sensor_ID", "Values", "Battery_Voltage", "Radio_Signal_Strength", "Device_Type", "Internal_State"]
tag_keys = ["Device_Type", "Sensor_ID"]

[[processors.enum]]
namepass = ["mqttstuff"]
[[processors.enum.mapping]]
field = "Internal_State"
[processors.enum.mapping.value_mappings]
"False" = 0
"True" = 1

[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Battery_Voltage"
cutset = " V"

[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Radio_Signal_Strength"
cutset = " db"

[[processors.converter]]
namepass = ["mqttstuff"]
[processors.converter.fields]
float = ["Values", "Battery_Voltage", "Radio_Signal_Strength", "Internal_State"]

[[outputs.file]]
namepass = ["mqttstff"]  # spelled "mqttstff", not "mqttstuff" as in name_override, so no metric matches this output
files = ["stdout", "/tmp/metrics.out"]
data_format = "json"
json_timestamp_units = "1ns"

[[outputs.file]]
namepass = ["mqttstuff"]
files = ["/tmp/mqtt_telegraf.out"]
influx_sort_fields = true

[[outputs.influxdb_v2]]
urls = ["http://127.0.0.1:8086"]
token = "TOKEN"
organization = "myorg"
bucket = "mqttbucket"
namepass = ["mqttstuff"]
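
For readers less familiar with the strings and converter processors, here is a rough Python analogue of what the trim and float-conversion steps above are meant to do to "Battery_Voltage" and "Radio_Signal_Strength". It assumes that trim_right strips any trailing characters found in the cutset, which is how Python's str.rstrip behaves; the helper name trim_and_convert is made up for this illustration and is not part of Telegraf.

```python
# Rough analogue of processors.strings trim_right followed by
# processors.converter float. Assumption: the cutset is treated as a set of
# characters to strip from the right, like Python's str.rstrip(chars).

def trim_and_convert(raw: str, cutset: str) -> float:
    """Strip trailing cutset characters, then parse what is left as a float."""
    return float(raw.rstrip(cutset))

# Sample values taken from the JSON message shown earlier in this post.
battery = trim_and_convert("2.90 V", " V")
signal = trim_and_convert("-32 db", " db")

print(battery, signal)  # 2.9 -32.0
```

This only works for fields holding a single value; the "Values" array cannot be handled by a trim-then-convert step like this.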

The corresponding measurement sent to influx:
This is the line-protocol equivalent, collected from the file output defined in the configuration above:
mqttstuff,Device_Type=Advanced\ Vibration\ Meter,Sensor_ID=730952,host=ip-IP_HERE,topic=MQTTSensor/type/111/id/730952 Battery_Voltage=2.9,Internal_State=0,Radio_Signal_Strength=-33,Values="0.0" 1634393488800763731

As can be seen, the Values field in the collected output shows only 1 numeric value.
I tried many of the Telegraf processor plugins without success.
I looked into passing the MQTT JSON measurement through outputs.exec to a Python script, but the result can’t be passed back to the influx output plugin. Also, Telegraf does not feed the JSON measurement into sys.argv, so the script only sees its own file name as an argument. Unless I misunderstood the documentation, I need help figuring out how to parse, pivot, or tag (or whatever it takes) a “Values” field that holds an array of numbers.
'Values': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]

PS: I may need to add Field names manually in config later.

If there is a way to pass the JSON message from Telegraf to Python, I can use this simple formatting script I wrote; however, I wasn’t able to return the generated output to Telegraf so it could continue on to influx:

import json

x = json.loads("""{"d":{"Sensor_ID":"730952","Time_of_Reading":"2021-10-10T17:47:47+00:00","Device_Type":"Advanced Vibration Meter","Sensor_Number":"730952","Values":[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41],"Units":["mm/s", "mm/s", "mm/s", "Hz", "Hz", "Hz", "", "deg. C", "Percent", "", "", ""],"Battery_Voltage":"2.90 V","Radio_Signal_Strength":"-32 db","Internal_State":"False","Vstring":"730952[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]False2021-10-10T17:47:47+00:00","Vstamp":"45bbf7a389cf501a08d9540dcd7a8c1dbec5c80d3f9899e5c3829fd6aef94fd3","Is_Current":"True"}}
""")

Values_Values = x["d"]["Values"]
Values_Names = ["VelX", "VelY", "VelZ", "FreqX", "FreqY", "FreqZ", "", "Temp", "Duty", "CrestFacX", "CrestFacY",
"CrestFacZ"]
for i in range(len(x['d']['Values'])):
    x['d'][Values_Names[i]] = Values_Values[i]

del x['d']['Values']
del x['d']['']
del x['d']['Units'][6]
print(json.dumps(x, sort_keys=True, indent=4))
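
The script above works on a hard-coded message. As a sketch of the same idea in reusable form, the version below wraps the renaming in a function and processes one JSON message per line from any iterable of lines. Everything here is an assumption for illustration: the names flatten_values and process_lines are made up, the field names are the ones from the script above, and how Telegraf would feed messages in or read results back is not shown.

```python
import json

# Field names copied from the script above; the empty string marks the
# unnamed slot at index 6, which is skipped.
VALUE_NAMES = ["VelX", "VelY", "VelZ", "FreqX", "FreqY", "FreqZ", "",
               "Temp", "Duty", "CrestFacX", "CrestFacY", "CrestFacZ"]


def flatten_values(message, names=VALUE_NAMES):
    """Return a copy of message["d"] with "Values" spread into named fields."""
    data = dict(message["d"])
    values = data.pop("Values")
    data.pop("Units", None)  # dropped here; the field names imply the units
    for name, value in zip(names, values):
        if name:  # skip unnamed slots
            data[name] = float(value)
    return data


def process_lines(lines):
    """Flatten each non-empty line (one JSON message per line) to a JSON string."""
    return [json.dumps(flatten_values(json.loads(line)), sort_keys=True)
            for line in lines if line.strip()]


# Demo with a shortened copy of the sample message from this post.
sample = ('{"d": {"Sensor_ID": "730952", "Values": [0.0, 0.0, 0.0, 0.0, 0.0, '
          '0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]}}')
for out in process_lines([sample]):
    print(out)
```

Passing sys.stdin to process_lines instead of a list would let a pipe supply the messages, which avoids relying on sys.argv.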

Finale:
Before I commit self-immolation by attempting to use Starlark, can anyone please confirm there is a simpler way? Please and thank you. :stuck_out_tongue:

NOTE: metrics.out has nothing in it.

Hi @Shayban_Sawan ,
Many thanks for your detailed query. It always helps us when trying to get to the bottom of an issue. Have you tried using version 2 of the JSON parser (json_v2)? It should be able to process your array. The main issue I see, however, is that there is no easy way to give each item in the array a unique name.

Since json_v2 will write each value under the same measurement, the values will overwrite each other when they reach InfluxDB. Is the array consistently the same size? If so, you could explicitly define the values like so:

[[inputs.mqtt_consumer.json_v2.object]]
    path = "newEvent"
    disable_prepend_keys = true
    excluded_keys = ["Values"]
    timestamp_format = "unix"
    timestamp_timezone = ""

The above will make sure all your other fields are added, except Values, which we will process manually.
This next part goes straight after the above:

[[inputs.mqtt_consumer.json_v2.object]]
    path = "d.Values"
    [[inputs.mqtt_consumer.json_v2.object.field]]
        path = "1"
        rename = "array_item_1"
        type = "float"

You would need to repeat this for each item in the array.
Not a great method but hopefully that gives you a workaround for now.
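
Since the field block has to be repeated once per array item, a small generator can save some typing. The sketch below prints one block per named slot, following the shape of the snippet above. It rests on several assumptions: the field names are the ones from the Python script in the original post, array paths are taken to be zero-based, the helper name field_blocks is made up, and the output has not been checked against a running Telegraf.

```python
# Names for each array slot, taken from the Python script in the original
# post; an empty string marks a slot to skip.
VALUE_NAMES = ["VelX", "VelY", "VelZ", "FreqX", "FreqY", "FreqZ", "",
               "Temp", "Duty", "CrestFacX", "CrestFacY", "CrestFacZ"]


def field_blocks(names, plugin="inputs.mqtt_consumer"):
    """Build one field block per named array index (assumed zero-based)."""
    blocks = []
    for index, name in enumerate(names):
        if not name:
            continue  # unnamed slot, nothing to emit
        blocks.append(
            f"[[{plugin}.json_v2.object.field]]\n"
            f'    path = "{index}"\n'
            f'    rename = "{name}"\n'
            f'    type = "float"\n'
        )
    return "\n".join(blocks)


print(field_blocks(VALUE_NAMES))
```

The printed text would be pasted under the object block whose path is "d.Values".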

Hi @Jay_Clifford

Hope you’re doing well.
I found out about json_v2 by accident today, and honestly I found it a bit difficult.
Still, I took your example and adjusted my config as such:

[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
topics = ["MQTTSensor/type/111/#"]
username = "user"
password = "password"
data_format = "json_v2"
name_override = "mqttstuff2"

#######################################

[[inputs.mqtt_consumer.json_v2]]
namepass = ["mqttstuff2"]

[[inputs.mqtt_consumer.json_v2.object]]
path = "newEvent"
disable_prepend_keys = true
excluded_keys = ["Values"]
timestamp_format = "unix"
timestamp_timezone = ""
namepass = ["mqttstuff2"]

[[inputs.mqtt_consumer.json_v2.object]]
namepass = ["mqttstuff2"]
path = "d.Values"
[[inputs.mqtt_consumer.json_v2.object.field]]
path = "8"
rename = "DutyCycle"
type = "float"
[inputs.mqtt_consumer.json_v2.object.fields]
key = "int"

#######################################

I wasn’t able to get any output: not in syslog, not in telegraf.log, and not in mqtt_telegraf2.out from [[outputs.file]] or on stdout.

Note: the last line in the snippet above was added after

I may have rushed to conclusions here, so I will try to limit inputs to just a simple [[inputs.file]] and try again later on by passing --test to the CLI.

I welcome any suggestions you may have meanwhile.

On another note, I tried Starlark; I will post the findings in another thread. I ran into issues with “Values” when decoding the JSON string from the opening of this thread: instead of an array I get a string, so referencing an index returns the individual characters of “Values” = “[1,2,3,5]”. Index 0 returns “[”, etc.
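
Getting “[” back from index 0 is what indexing a string looks like, which suggests the field reached the script as text rather than as a decoded array. Here is a minimal Python illustration of the difference, using the same example value. Whether and how the Starlark processor offers an equivalent decode step is not confirmed here.

```python
import json

raw = "[1,2,3,5]"          # the field as a string, as described above

print(raw[0])              # [  (first character of the string)

parsed = json.loads(raw)   # decode the text into a real list first
print(parsed[0])           # 1  (first element of the array)
```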