Telegraf Dynamic Tag Parsing - Batch Style

Via Kafka, I receive an incoming JSON payload that looks very similar to this:

{
  "measurement": "weather",
  "batches": [
    {
      "time": 1735854321,
      "tags": {
        "tag1": "tagValueA",
        "tag2": "tagValueB"
      },
      "fields": {
        "field1": 1,
        "field2": 2
      }
    },
    {
      "time": 1735854322,
      "tags": {
        "tag1": "tagValueC",
        "tag2": "tagValueD"
      },
      "fields": {
        "field1": 3,
        "field2": 4
      }
    }
  ]
}

I’d like to parse this with Telegraf using json_v2, xpath_json, or another parser. The catch is that we don’t know the tag names in advance.

The expected line protocol (LP) result should be:

weather,tag1=tagValueA,tag2=tagValueB field1=1,field2=2 1735854321
weather,tag1=tagValueC,tag2=tagValueD field1=3,field2=4 1735854322

Q) How can I accomplish this?

I’m aware of both of the following, but they don’t seem to cover my use case of dynamic tags AND batching in an array:

Thanks so much in advance! 🙂

REF: https://github.com/influxdata/telegraf/issues/16977


KEYWORDS:

  • Dynamic Tag Set Parsing
  • Parsing Dynamic Tags in Batched Array

A solution with xpath_json that achieves both goals: dynamic tags AND batching in an array.

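[[inputs.kafka_consumer]]
  kafka_version = "3.5.1"
  topics = [ "TOPICNAME" ]
  consumer_group = "CONSUMERGROUPNAME"
  enable_tls = true
  sasl_username = "${MSK_USERNAME}"
  sasl_password = "${MSK_PASSWORD}"
  sasl_mechanism = "SCRAM-SHA-512"
  data_format = "xpath_json"
  ## Keep JSON types (numbers, booleans) instead of converting everything to strings
  xpath_native_types = true

  [[inputs.kafka_consumer.xpath_json]]
    ## Measurement name comes from the top-level "measurement" key
    metric_name = "/measurement"
    ## Each element of the "batches" array becomes one metric
    metric_selection = "/batches/*"
    ## The paths below are relative to each selected batch element
    timestamp = "child::time"
    timestamp_format = "unix"
    ## Every child of "fields" / "tags" is taken, whatever its name
    field_selection = "child::fields/*"
    tag_selection = "child::tags/*"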

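Also a good solution from Sven Rebhan, and the preferable one, because the `xpath_json` section name is deprecated in favor of `xpath` (the `data_format = "xpath_json"` setting itself stays the same):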

[[inputs.kafka_consumer]]
  ...

  data_format = "xpath_json"
  xpath_native_types = true
  [[inputs.kafka_consumer.xpath]]
    metric_name = "/measurement"
    metric_selection = "/batches/*"
    field_selection = "fields/*"
    tag_selection = "tags/*"
    timestamp = "time"
    timestamp_format = "unix"
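To check the parsing without a Kafka broker, one option is to feed the sample payload through the `file` input and print the result to the console. This is a sketch under assumptions: the payload from the question is saved locally as `payload.json` and this config as `test.conf` (both hypothetical file names). `omit_hostname` is set because Telegraf otherwise adds a `host` tag, which is absent from the expected line protocol.

```toml
## Hypothetical local test config (e.g. test.conf); file names are placeholders.
[agent]
  ## Telegraf adds a "host" tag by default; drop it so the output
  ## can be compared directly with the expected line protocol.
  omit_hostname = true

[[inputs.file]]
  ## Assumed: the sample JSON payload from the question saved as payload.json
  files = ["payload.json"]
  data_format = "xpath_json"
  xpath_native_types = true

  ## Same parsing section as the Kafka example above
  [[inputs.file.xpath]]
    metric_name = "/measurement"
    metric_selection = "/batches/*"
    field_selection = "fields/*"
    tag_selection = "tags/*"
    timestamp = "time"
    timestamp_format = "unix"

[[outputs.file]]
  ## Print the resulting metrics as line protocol on the console
  files = ["stdout"]
  data_format = "influx"
```

Running `telegraf --config test.conf --once` should print one line per element of `batches`. Telegraf writes timestamps in nanoseconds, so expect `1735854321000000000` rather than the seconds value shown in the expected result. Once the output looks right, the same `xpath` section can be moved back under `[[inputs.kafka_consumer]]`.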