Parse XML data using telegraf

<root>  
<measEntity localDn="VNFID=bangloreiit-5005cucp1; gNBID=5005; gNBName=;" swVersion="6.0.00.49.6"/>
  <measData>
    <measInfo measInfoId="OR.end">
      <granPeriod duration="PT300S" endTime="2021-07-02T17:10:00+00:00"/>
      <measTypes>SgnbAddAttemptCell.enb.0 SgnbAddAttemptCell.enb.1 SgnbAddAttemptCell.enb.2 SgnbAddAttemptCell.enb.3</measTypes>
        <measResults>0 0 0 0</measResults>
</measInfo>
</measData>
</root>

Above is my XML data format. I want to parse it using Telegraf. I am able to get the measurement, but I am currently facing two challenges:

  1. Tag names - The localDn attribute of measEntity should provide the tags, e.g. a tag VNFID with the value bangloreiit-5005cucp1, and so on. But I am not able to get the tags.

  2. Field names - Using measTypes, I want to get the field names, which are separated by spaces (e.g. SgnbAddAttemptCell.enb.0, SgnbAddAttemptCell.enb.1 and so on), with their respective values taken from the measResults element.

Can someone help me out here? I am using the following article as a reference: How to Parse Your XML Data with Telegraf | InfluxData

I think that example is not valid xml because some closing tags are missing?

I pasted a small portion of a large XML file and missed the closing part. I have updated it now.

In my opinion, the XML data is very poorly structured. I don’t know where the XML data comes from, but I would first try to solve the problem at the source and make sure that better structured XML data is delivered.

Otherwise it will be a difficult task to parse the XML data. For measTypes and measResults you will need a custom processors.execd script to split them up. I don’t think this is possible with the XML parser alone.

This could be a starting point. However, in my opinion, the fields cannot be parsed with the xml parser alone in the way you want it.

# Read the sample XML file and parse it with the XML parser.
[[inputs.file]]
  # Name used for the resulting metric instead of the plugin default.
  name_override = "xml"
  files = ["xmlparser.xml"]
  data_format = "xml"
  [[inputs.file.xml]]
    [inputs.file.xml.tags]
      # Takes the localDn attribute up to the first ';' and keeps what follows
      # the first '=' - i.e. the value of the first key=value pair
      # ("bangloreiit-5005cucp1" for the sample document).
      name = "substring-after(substring-before(/root/measEntity/@localDn, ';'), '=')"
    [inputs.file.xml.fields]
      # Both fields are kept as whole strings; the space-separated entries
      # inside them are not split by this configuration.
      measTypes = "string(/root/measData/measInfo/measTypes)"
      measResults = "string(/root/measData/measInfo/measResults)"

[[outputs.file]]  # only for debugging
  files = ["xmlparser.out"]
  influx_sort_fields = true

This is a solution with a processors.execd script in Python.
See below, first the Telegraf config and then the python script.

# Read the sample XML file and parse it with the XML parser.
[[inputs.file]]
  # Name used for the resulting metric; the processor below selects on it.
  name_override = "xml"
  files = ["xmlparser.xml"]
  data_format = "xml"
  [[inputs.file.xml]]
    [inputs.file.xml.tags]
      # Takes the localDn attribute up to the first ';' and keeps what follows
      # the first '=' - i.e. the value of the first key=value pair.
      name = "substring-after(substring-before(/root/measEntity/@localDn, ';'), '=')"
    [inputs.file.xml.fields]
      # Both fields are kept as whole space-separated strings here; the
      # external script splits them into individual fields.
      measTypes = "string(/root/measData/measInfo/measTypes)"
      measResults = "string(/root/measData/measInfo/measResults)"

# Hand metrics named "xml" to the Python script for splitting.
[[processors.execd]]
  namepass = ["xml"]
  command = ["python", "xmlparser.py"]

[[outputs.file]]  # only for debugging
  files = ["xmlparser.out"]
  influx_sort_fields = true

from influxdb_client import Point
from line_protocol_parser import parse_line

import sys  # stdlib; only used for diagnostics on stderr

# processors.execd filter: reads one line-protocol metric per line from stdin,
# replaces the space-separated 'measTypes' / 'measResults' string fields with
# one integer field per counter, and writes the metric back to stdout.
# Metrics that cannot be split (fields missing, counts differ, non-integer
# value) are passed through unchanged instead of being dropped, with the
# reason reported on stderr.
while True:
    try:
        input_line = input()  # read from stdin
    except (EOFError, KeyboardInterrupt):
        break  # stdin closed or the process was interrupted

    if not input_line.strip():
        continue  # nothing to parse on a blank line

    output_line = input_line  # default: forward the metric untouched
    data = parse_line(input_line)  # parse input line
    fields = data['fields']
    measTypes = fields.get('measTypes')
    measResults = fields.get('measResults')

    if isinstance(measTypes, str) and isinstance(measResults, str):
        names = measTypes.split()
        raw_values = measResults.split()
        try:
            if not names or len(names) != len(raw_values):
                raise ValueError(
                    '%d counter names but %d values' % (len(names), len(raw_values)))
            # Convert everything first so the metric is never half-updated.
            values = [int(raw) for raw in raw_values]
        except ValueError as exc:
            print('xmlparser: metric passed through unchanged: %s' % exc,
                  file=sys.stderr, flush=True)
        else:
            # drop original fields, then add one field per counter
            fields.pop('measTypes')
            fields.pop('measResults')
            fields.update(zip(names, values))
            output_line = Point.from_dict(data).to_line_protocol()  # metric from dict

    # Flush on every metric: stdout is block-buffered when attached to a pipe,
    # so without it the output would be held back until the buffer fills.
    print(output_line, flush=True)  # write to stdout

And here is a solution with a processors.execd plugin in Go.
See below, first the Telegraf config and then the Go program.

# Read the sample XML file and parse it with the XML parser.
[[inputs.file]]
  # Name used for the resulting metric; the processor below selects on it.
  name_override = "xml"
  files = ["xmlparser.xml"]
  data_format = "xml"
  [[inputs.file.xml]]
    [inputs.file.xml.tags]
      # Takes the localDn attribute up to the first ';' and keeps what follows
      # the first '=' - i.e. the value of the first key=value pair.
      name = "substring-after(substring-before(/root/measEntity/@localDn, ';'), '=')"
    [inputs.file.xml.fields]
      # Both fields are kept as whole space-separated strings here; the
      # external program splits them into individual fields.
      measTypes = "string(/root/measData/measInfo/measTypes)"
      measResults = "string(/root/measData/measInfo/measResults)"

# Hand metrics named "xml" to the compiled Go program for splitting.
[[processors.execd]]
  namepass = ["xml"]
  command = ["xmlparser.exe"]  # compiled go file

[[outputs.file]]  # only for debugging
  files = ["xmlparser.out"]
  influx_sort_fields = true

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"github.com/influxdata/telegraf/plugins/parsers/influx"
	"github.com/influxdata/telegraf/plugins/serializers"
)

// Names of the string fields that carry the space-separated counter names and
// their values.
const (
	measTypesField   = "measTypes"
	measResultsField = "measResults"
)

// main reads line-protocol metrics from stdin, replaces the measTypes and
// measResults string fields with one integer field per counter, and writes
// every metric back to stdout.
//
// Metrics that cannot be split (fields missing or not strings, differing
// counts, non-integer value) are written back unchanged and the reason is
// reported on stderr, so nothing is silently dropped. Errors from the parser,
// the serializer or stdout terminate the process with exit status 1.
func main() {
	parser := influx.NewStreamParser(os.Stdin)
	serializer, err := serializers.NewInfluxSerializer()
	if err != nil {
		fmt.Fprintf(os.Stderr, "ERR %v\n", err)
		os.Exit(1)
	}

	for {
		metric, err := parser.Next()
		if err != nil {
			if err == influx.EOF {
				return // stream ended
			}
			if parseErr, isParseError := err.(*influx.ParseError); isParseError {
				fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
				os.Exit(1)
			}
			fmt.Fprintf(os.Stderr, "ERR %v\n", err)
			os.Exit(1)
		}

		rawTypes, hasTypes := metric.GetField(measTypesField)
		rawResults, hasResults := metric.GetField(measResultsField)
		// Assert instead of formatting with %s, which would turn a non-string
		// value into "%!s(...)" text.
		types, typesIsString := rawTypes.(string)
		results, resultsIsString := rawResults.(string)

		if hasTypes && hasResults && typesIsString && resultsIsString {
			names, values, err := splitMeasurements(types, results)
			if err != nil {
				fmt.Fprintf(os.Stderr, "metric passed through unchanged: %v\n", err)
			} else {
				for i, name := range names {
					metric.AddField(name, values[i])
				}
				metric.RemoveField(measTypesField)   // drop original fields
				metric.RemoveField(measResultsField) // drop original fields
			}
		}

		out, err := serializer.Serialize(metric)
		if err != nil {
			fmt.Fprintf(os.Stderr, "ERR %v\n", err)
			os.Exit(1)
		}
		if _, err := os.Stdout.Write(out); err != nil {
			fmt.Fprintf(os.Stderr, "ERR %v\n", err)
			os.Exit(1)
		}
	}
}

// splitMeasurements pairs the whitespace-separated counter names in types
// with the whitespace-separated integers in results. It returns an error when
// there are no names, when the two lists differ in length, or when a value is
// not an integer; all values are converted before anything is returned so the
// caller never applies a partial result.
func splitMeasurements(types, results string) ([]string, []int, error) {
	names := strings.Fields(types)
	rawValues := strings.Fields(results)
	if len(names) == 0 {
		return nil, nil, fmt.Errorf("%s contains no counter names", measTypesField)
	}
	if len(names) != len(rawValues) {
		return nil, nil, fmt.Errorf("%d counter names but %d values", len(names), len(rawValues))
	}

	values := make([]int, len(rawValues))
	for i, raw := range rawValues {
		v, err := strconv.Atoi(raw)
		if err != nil {
			return nil, nil, fmt.Errorf("value for %q: %w", names[i], err)
		}
		values[i] = v
	}
	return names, values, nil
}