How to parse complex json in telegraf using external code?

Hi, I’m new to telegraf.
I’m trying to use telegraf to get mqtt infos to store to a postgres database. I managed to do it with some simple comercial sensors (simple mqtt message format) . Now i have to do the same with another comercial tool (i can not modify message format)…
The problem is that keys and values are in distinct arrays. I managed to reformat the message with jq but now i’m stuck with telegraf. Is it possible to call [[inputs.mqtt_consumer]] to get raw json message (i.e. without calling a data format and call external jq code to parse json ? Maybe it’s not the way to do it …
json from product
{“type”:“Feature”,“geometry”:{“type”:“Point”,“coordinates”:[-0.530000,44.830000,50.000000]},“properties”:{“loggerID”:“CR1000X_37295”,“observationNames”:[“VWC_5cm_Avg”,“T_5cm_Avg”,“VWC_10cm_Avg”,“T_10cm_Avg”,“VWC_20cm_Avg”,“T_20cm_Avg”,“VWC_30cm_Avg”,“T_30cm_Avg”,“VWC_40cm_Avg”,“T_40cm_Avg”,“VWC_50cm_Avg”,“T_50cm_Avg”,“VWC_60cm_Avg”,“T_60cm_Avg”,“VWC_75cm_Avg”,“T_75cm_Avg”,“VWC_100cm_Avg”,“T_100cm_Avg”,“VWC_5cm_2_Avg”,“T_5cm_2_Avg”,“VWC_10cm_2_Avg”,“T_10cm_2_Avg”,“VWC_20cm_2_Avg”,“T_20cm_2_Avg”,“VWC_30cm_2_Avg”,“T_30cm_2_Avg”,“VWC_40cm_2_Avg”,“T_40cm_2_Avg”,“VWC_50cm_2_Avg”,“T_50cm_2_Avg”,“VWC_60cm_2_Avg”,“T_60cm_2_Avg”,“VWC_75cm_2_Avg”,“T_75cm_2_Avg”,“VWC_100cm_2_Avg”,“T_100cm_2_Avg”,“VWC_5cm_4_Avg”,“T_5cm_4_Avg”,“VWC_10cm_4_Avg”,“T_10cm_4_Avg”,“VWC_20cm_4_Avg”,“T_20cm_4_Avg”,“VWC_30cm_4_Avg”,“T_30cm_4_Avg”,“VWC_40cm_4_Avg”,“T_40cm_4_Avg”,“VWC_50cm_4_Avg”,“T_50cm_4_Avg”,“VWC_60cm_4_Avg”,“T_60cm_4_Avg”,“VWC_75cm_4_Avg”,“T_75cm_4_Avg”,“VWC_100cm_4_Avg”,“T_100cm_4_Avg”,“AirTC_1_Avg”,“AirTC_1_Std”,“RH_1_Min”,“RH_1_Max”,“AirTC_2_Avg”,“AirTC_2_Std”,“RH_2_Max”,“RH_2_Min”,“AirTC_3_Avg”,“AirTC_3_Std”,“RH_3_Max”,“RH_3_Min”,“LWmV_Avg”,“LWMDry_Tot”,“LWMCon_Tot”,“LWMWet_Tot”,“LWmV_2_Avg”,“LWMDry_2_Tot”,“LWMCon_2_Tot”,“LWMWet_2_Tot”,“LWmV_3_Avg”,“LWMDry_3_Tot”,“LWMCon_3_Tot”,“LWMWet_3_Tot”,“FuelM_Avg”,“PA_uS_Avg”,“T_CS506_C_Avg”],“observations”:{“2024-04-24T10:00:00Z”:[null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,-425.9,0.983,0,0,-555.9,0.983,0,0,113.6,0.983,0,0,null,null,null]}}}

my parsing in jq
jq ‘[ (.properties | [.observationNames, .observations] | transpose | map({“variable”: .[0], “value”: .[1]}) | .) + {“time”: (.properties.observations | keys)} + {“gateway”: (.properties.loggerID)} ] | del(. | select(.value ==null))’

parsed message
[
{
“variable”: “LWmV_2_Avg”,
“value”: -545.7,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
},
{
“variable”: “LWMDry_2_Tot”,
“value”: 0.983,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
},
{
“variable”: “LWMCon_2_Tot”,
“value”: 0,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
},
{
“variable”: “LWMWet_2_Tot”,
“value”: 0,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
},
{
“variable”: “LWmV_3_Avg”,
“value”: 88,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
},
{
“variable”: “LWMDry_3_Tot”,
“value”: 0.983,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
},
{
“variable”: “LWMCon_3_Tot”,
“value”: 0,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
},
{
“variable”: “LWMWet_3_Tot”,
“value”: 0,
“time”: “2024-04-05T13:30:00Z”,
“gateway”: “CR1000X_37295”
}
]

telegraf.conf doesn’t output anything …
[global_tags]
[agent]
interval = “10s”
round_interval = true
metric_batch_size = 10
metric_buffer_limit = 10000
collection_jitter = “0s”
flush_interval = “10s”
flush_jitter = “0s”
precision = “0s”
debug = true
quiet = false
hostname = “”
omit_hostname = false
[[outputs.file]]
files = [“stdout”, “/tmp/metrics.out”]
[[inputs.mqtt_consumer]]
servers = [“tcp://xxx.xxx.XXX.XXX:1883”]
topics = [
“cs/v1/data/cr1000x”,
]
name_override = “environnement”
topic_tag = “”
qos = 2
persistent_session = true
client_id = “mosq_ggggg”
username = “yyyyy”
password = “xxxxxx”

[[inputs.mqtt_consumer.topic_parsing]]
    topic = "cs/v1/data/cr1000x/+"
    tags = "_/_/_/_/centrale"

I would appreciate any help on how to approach the problem!

Yes, you can use the value data_type and store the original message as a string value to store the original message. Then you can use some combination of exec and parser processors to manipulate the original string.

Thanks, i will explore this solution. I’m testing another solution to call execd to launch a command calling mosquitto_sub xxx | jq ‘xxx’ as an input instead of using mqtt_consumer.

That could also work! Let us know what you decide on.