Hello everyone
Problem statement:
Telegraf receives a JSON message from the MQTT broker, but Influx does not show the measurement from one specific field called “Values”:
'Values': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]
The expanded form of the problem:
The MQTT broker forwards each received message to all subscribers of a topic.
The message contains a field called “Values” that holds an array of numbers [1,2,3,…].
I am unable to turn this array into individual measurements to be posted to Influx.
An MQTT published message from a sensor is in the following form:
{
"d": {
"Battery_Voltage": "2.90 V",
"Device_Type": "Advanced Vibration Meter",
"Internal_State": "False",
"Is_Current": "True",
"Radio_Signal_Strength": "-32 db",
"Sensor_ID": "730952",
"Sensor_Number": "730952",
"Time_of_Reading": "2021-10-10T17:47:47+00:00",
"Units": [
"mm/s",
"mm/s",
"mm/s",
"Hz",
"Hz",
"Hz",
"",
"deg. C",
"Percent",
"",
"",
""
],
"Values": [
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0,
26.0,
0,
1.41,
1.41,
1.41
],
"Vstamp": "45bbf7a389cf501a08d9540dcd7a8c1dbec5c80d3f9899e5c3829fd6aef94fd3",
"Vstring": "730952[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]False2021-10-10T17:47:47+00:00"
}
}
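To make the layout of the two arrays explicit, here is a minimal Python sketch that pairs each entry of “Values” with the entry of “Units” at the same index. That the two arrays line up index by index is an assumption; the payload itself does not state it. The helper name pair_values_with_units is hypothetical.

```python
import json

# Trimmed copy of the sensor message above (only the two arrays are kept).
MESSAGE = """{"d": {
  "Units": ["mm/s", "mm/s", "mm/s", "Hz", "Hz", "Hz", "", "deg. C", "Percent", "", "", ""],
  "Values": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]
}}"""


def pair_values_with_units(message):
    """Return (index, value, unit) tuples, assuming both arrays share indices."""
    d = json.loads(message)["d"]
    return [
        (i, float(value), unit)
        for i, (value, unit) in enumerate(zip(d["Values"], d["Units"]))
    ]


for index, value, unit in pair_values_with_units(MESSAGE):
    # Some slots carry an empty unit string in the original message.
    print(index, value, unit or "(no unit)")
```

Seen this way, the goal is twelve separate numeric fields rather than one field holding the whole array.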
Telegraf plugin prepared in the telegraf configuration file:
[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
topics = ["MQTTSensor/type/111/#"]
username = "user"
password = "password"
data_format = "json"
name_override = "mqttstuff"
json_query = "d"
json_string_fields = ["Sensor_ID", "Values", "Battery_Voltage", "Radio_Signal_Strength", "Device_Type", "Internal_State"]
tag_keys = ["Device_Type", "Sensor_ID"]
[[processors.enum]]
namepass = ["mqttstuff"]
[[processors.enum.mapping]]
field = "Internal_State"
[processors.enum.mapping.value_mappings]
"False" = 0
"True" = 1
[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Battery_Voltage"
cutset = " V"
[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Radio_Signal_Strength"
cutset = " db"
[[processors.converter]]
namepass = ["mqttstuff"]
[processors.converter.fields]
float = ["Values", "Battery_Voltage", "Radio_Signal_Strength", "Internal_State"]
[[outputs.file]]
namepass = ["mqttstff"]
files = ["stdout", "/tmp/metrics.out"]
data_format = "json"
json_timestamp_units = "1ns"
[[outputs.file]]
namepass = ["mqttstuff"]
files = ["/tmp/mqtt_telegraf.out"]
influx_sort_fields = true
[[outputs.influxdb_v2]]
urls = ["http://127.0.0.1:8086"]
token = "TOKEN"
organization = "myorg"
bucket = "mqttbucket"
namepass = ["mqttstuff"]
The corresponding measurement sent to influx:
Line protocol collected from the file output defined in the configuration above:
mqttstuff,Device_Type=Advanced\ Vibration\ Meter,Sensor_ID=730952,host=ip-IP_HERE,topic=MQTTSensor/type/111/id/730952 Battery_Voltage=2.9,Internal_State=0,Radio_Signal_Strength=-33,Values="0.0" 1634393488800763731
As can be seen, the Values field in the collected output holds only one value ("0.0") instead of the twelve in the array.
I tried several of the Telegraf processor plugins without success.
I looked into passing the MQTT JSON measurement through outputs.exec to a Python script, but the result can’t be passed back to the influx output plugin. Also, Telegraf does not feed the JSON measurement into sys.argv; the script only sees its own path there. Unless I misunderstood the documentation, I need help figuring out how to parse, pivot, or tag a “Values” field that holds an array of numbers.
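As far as the outputs.exec documentation describes it, the plugin writes the serialized metrics to the command's standard input rather than to its arguments, which would explain why sys.argv holds only the script path. Below is a minimal sketch of reading from a stream instead. The helper name read_payload is hypothetical, and the sample JSON shape is an assumption that has not been checked against a running Telegraf.

```python
import io
import json


def read_payload(stream):
    """Parse the JSON document found on a text stream; None if it is empty."""
    text = stream.read()
    return json.loads(text) if text.strip() else None


# Stand-in for sys.stdin so the sketch runs without Telegraf.
# ASSUMPTION: the sample below only approximates what a JSON serializer emits.
fake_stdin = io.StringIO('{"name": "mqttstuff", "fields": {"Battery_Voltage": 2.9}}')
payload = read_payload(fake_stdin)
print(payload["name"], payload["fields"])

# In a real script the call would be: payload = read_payload(sys.stdin)
```

This only covers getting data into the script; it does not solve sending the result back into Telegraf.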
'Values': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]
PS: I may need to add Field names manually in config later.
If there is a way to pass the JSON message from Telegraf to Python, I can use this simple formatting script I wrote; however, I wasn’t able to return the generated output to Telegraf so that it continues on to Influx:
import json
x = json.loads("""{"d":{"Sensor_ID":"730952","Time_of_Reading":"2021-10-10T17:47:47+00:00","Device_Type":"Advanced Vibration Meter","Sensor_Number":"730952","Values":[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41],"Units":["mm/s", "mm/s", "mm/s", "Hz", "Hz", "Hz", "", "deg. C", "Percent", "", "", ""],"Battery_Voltage":"2.90 V","Radio_Signal_Strength":"-32 db","Internal_State":"False","Vstring":"730952[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]False2021-10-10T17:47:47+00:00","Vstamp":"45bbf7a389cf501a08d9540dcd7a8c1dbec5c80d3f9899e5c3829fd6aef94fd3","Is_Current":"True"}}
""")
# Give each entry of "Values" a descriptive field name.
Values_Values = x["d"]["Values"]
Values_Names = ["VelX", "VelY", "VelZ", "FreqX", "FreqY", "FreqZ", "", "Temp", "Duty", "CrestFacX", "CrestFacY",
                "CrestFacZ"]
for i in range(len(Values_Values)):
    x['d'][Values_Names[i]] = Values_Values[i]
# Drop the original array and the unnamed slot (index 6).
del x['d']['Values']
del x['d']['']
del x['d']['Units'][6]
print(json.dumps(x, sort_keys=True, indent=4))
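If the renamed fields ever need to be written out as InfluxDB line protocol instead of JSON, the formatting step could look roughly like the sketch below. It is only an illustration: name_values, escape and to_line_protocol are hypothetical names, the measurement name and tag are taken from the configuration and output shown earlier, and how the resulting line would be handed back to Telegraf is left open.

```python
VALUE_NAMES = ["VelX", "VelY", "VelZ", "FreqX", "FreqY", "FreqZ", "",
               "Temp", "Duty", "CrestFacX", "CrestFacY", "CrestFacZ"]


def name_values(values, names=VALUE_NAMES):
    """Map each array element to its name as a float, skipping unnamed slots."""
    return {name: float(value) for name, value in zip(names, values) if name}


def escape(text, chars=",= "):
    """Backslash-escape the characters that are special in line protocol keys."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def to_line_protocol(measurement, tags, fields, timestamp_ns=None):
    """Format one metric with float fields; tags and fields are sorted by key."""
    tag_part = "".join(f",{escape(k)}={escape(v)}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{escape(k)}={v!r}" for k, v in sorted(fields.items()))
    # Measurement names only need commas and spaces escaped.
    line = f"{escape(measurement, ', ')}{tag_part} {field_part}"
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line


values = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 26.0, 0, 1.41, 1.41, 1.41]
print(to_line_protocol("mqttstuff", {"Sensor_ID": "730952"}, name_values(values)))
```

All values are cast to float so that integer-looking entries such as 0 do not end up with a different field type than their neighbours.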
Finale:
Before I commit self-immolation by attempting to use Starlark, can anyone please confirm there is a simpler way? Please and thank you.
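NOTE: metrics.out has nothing in it, possibly because the namepass on that outputs.file block is spelled "mqttstff" rather than "mqttstuff".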