Problems with json transformation in Telegraf - outputs.mqtt

Hello,

I am having issues with json_transformation in the outputs.mqtt plugin, using Telegraf 1.27. I get this message: [outputs.mqtt] Could not serialize metric batch for topic "###": argument must be an object or an array of objects

I am using the following transformation, which I first tested at https://try.jsonata.org/ with JSONata 1.5.4. It works there, so the expression itself appears to be correct: https://try.jsonata.org/5aN2vrMY3

Input data:

{
  "metrics": [
    {
      "fields": {
        "FeederOverride": 1
      },
      "name": "data_1",
      "tags": {
        "id": "ns=2;s=unit/OPCUA.FeederOverride"
      },
      "timestamp": 1696586985
    },
    {
      "fields": {
        "PressSPM__value": 10
      },
      "name": "data_1",
      "tags": {
        "id": "ns=2;s=unit/OPCUA.PressSPM"
      },
      "timestamp": 1696586985
    },
    {
      "fields": {
        "PressSPM__target": 2
      },
      "name": "data_1",
      "tags": {
        "id": "ns=2;s=unit/OPCUA.PressSPM_target"
      },
      "timestamp": 1696586985
    }
  ]
}

Expected output data:

{
  "FeederOverride": 1,
  "PressSPM": {
    "value": 10,
    "target": 2
  }
}

JSON transformation:

$each($merge([metrics.fields]), function($value, $key) {{
    "key": $key,
    "value": $value
  }}) ~> $reduce(function($acc, $entry) {(
    $contains($entry.key, "__")
      ? (
        $outerLevelKey := $split($entry.key, "__")[0];
        $innerLevelKey := $split($entry.key, "__")[1];        
        $merge([$acc, { 
          $outerLevelKey: $merge([
            $lookup($acc, $outerLevelKey), 
            { $innerLevelKey: $entry.value }
          ])
        }])
      )
      : $merge([$acc, { $entry.key: $entry.value }])
  )}, {})
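
For anyone who wants to check the intended logic outside of JSONata, the reshaping above can be sketched in Python. This is only an illustration of what the transformation is meant to produce, not something Telegraf runs; the function name reshape and the trimmed-down sample batch (fields only) are assumptions made for the example.

```python
import json


def reshape(batch, sep="__"):
    """Merge the fields of all metrics into one object.

    Keys containing `sep` are split into an outer and an inner key,
    mirroring $split($entry.key, "__")[0] and [1] in the JSONata expression.
    """
    out = {}
    for metric in batch["metrics"]:
        for key, value in metric["fields"].items():
            if sep in key:
                parts = key.split(sep)
                outer, inner = parts[0], parts[1]
                # Create the nested object on first use, then add the inner key.
                out.setdefault(outer, {})[inner] = value
            else:
                out[key] = value
    return out


# Trimmed-down copy of the input data from the post (fields only).
batch = {
    "metrics": [
        {"fields": {"FeederOverride": 1}},
        {"fields": {"PressSPM__value": 10}},
        {"fields": {"PressSPM__target": 2}},
    ]
}

print(json.dumps(reshape(batch), indent=2))
```

Here setdefault plays the role of the $lookup/$merge pair: it fetches the existing nested object for the outer key, or starts an empty one if none exists yet.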

telegraf.conf

[[outputs.mqtt]]
  fieldpass = ["FeederOverride", "PressSPM__value", "PressSPM__target"]
  topic = "###"
  json_transformation = '''
$each($merge([metrics.fields]), function($value, $key) {{
    "key": $key,
    "value": $value
  }}) ~> $reduce(function($acc, $entry) {(
    $contains($entry.key, "__")
      ? (
        $outerLevelKey := $split($entry.key, "__")[0];
        $innerLevelKey := $split($entry.key, "__")[1];        
        $merge([$acc, { 
          $outerLevelKey: $merge([
            $lookup($acc, $outerLevelKey), 
            { $innerLevelKey:$entry.value } 
          ])
        }])
      )
      : $merge([$acc, { $entry.key: $entry.value }])
  )}, {})
'''
  batch = true
  data_format = "json"

But it seems Telegraf has a problem with the line $lookup($acc, $outerLevelKey): when I remove it, Telegraf transforms the JSON (not the way I want, but it doesn't complain). Moreover, if I change that line to $acc.PressSPM, it also works.
So the only function not used anywhere else in the transformation is $lookup(). Do you know if there is a problem with using this function in Telegraf, or is there another one I could use instead?

Thanks a lot!

Hello @iserranoe,
Ooof, I’m not sure. @Jay_Clifford or @jpowers, are you familiar with this?
Thank you!

The user filed an issue with Telegraf: "problems with json_transformation in telegraf-outputs.mqtt" (Issue #14069, influxdata/telegraf on GitHub)

Sven has been looking into this, and we have narrowed it down to an issue with the upstream library that Telegraf uses.