Parsing string values in a confusing JSON payload

I’ve run into a challenging data-parsing problem and would appreciate help from more experienced users here. How can I use telegraf to parse the only string value in a JSON payload and split it into two float data points?

Background
To monitor the status of a vehicle I keep across the country for family adventures to the National Parks, I’m using a Teltonika FMM00A OBD-GPS device. It measures time and location from GPS, reads statistics from the car’s OBD port, and transmits them to a user-specified MQTT server.

I’m attempting to use telegraf to pull this data from the MQTT server and store it in an influxdb2 server I run, so it can be visualized in grafana. I already run telegraf instances for other data that’s stored and visualized this way, but the format of this OBD sensor’s payload is causing an issue I’ve been unable to resolve.

Details
The data from the OBD sensor looks like this (line breaks added for readability):

MQTT Topic: device_imei / data

{"state":{"reported":{"ts":1720354973000,
"pr":0,"latlng":"XX.905603,-YY.399940",
"alt":132,"ang":189,"sat":7,"sp":0,"evt":240,
"239":0,"240":0,"21":5,"200":0,"69":1,"181":17,
"182":14,"66":13434,"67":4047,"68":0,"241":310260,
"16":408}}}

(XX and YY are used to obscure actual device location.)
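
To make the goal concrete, here is a minimal Python sketch of the transformation being asked for, outside of telegraf. It is only an illustration: the payload is shortened, and the coordinates are made-up placeholders, since the real values are obscured above.

```python
import json

# Shortened sample payload; the coordinates are made-up placeholders
# because the real values were obscured with XX and YY.
payload = (
    '{"state":{"reported":{"ts":1720354973000,"pr":0,'
    '"latlng":"12.905603,-34.399940","alt":132,"ang":189,"sat":7,"sp":0}}}'
)

# Every value under state.reported is numeric except latlng.
reported = json.loads(payload)["state"]["reported"]

# Split the single string into two parts and convert each to a float.
lat_text, lng_text = reported["latlng"].split(",")
latitude = float(lat_text)
longitude = float(lng_text)

print(latitude, longitude)
```

The rest of the thread is about getting telegraf to perform this same split before writing to influxdb.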

Telegraf Config
My telegraf config reads all but one of these values appropriately, and of course it’s the one I’m most interested in: latlng. There’s no configuration option on the device to change the format of this JSON payload, nor to have it transmit latlng as separate floats rather than a single string.

# Consume messages from Teltonika GPS OBD via MQTT
#

# Global tags can be specified here in key="value" format.
[global_tags]

# Configuration for telegraf agent
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  # debug = false

  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = true

[[outputs.influxdb_v2]]
  urls = ["https://influx_server"]
  token = "token_here"
  organization = "org"
  bucket = "teltonika_gps_mqtt"

# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  servers = ["ssl://mqtt_server:8883"]
  topics = [
    "device_imei/#",
  ]
  client_id = "telegraf_teltonika"
  username = "user_name"
  password = "password"
  data_format = "json"

For reasons I don’t understand, I can only use data_format = "json". When I attempt to use json_v2, telegraf never posts the data to the influxdb2 bucket.
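
A possible explanation for the missing field is how the `json` data format is documented to behave: nested keys are flattened into underscore-joined names, and string values are dropped unless they are listed in the parser's `json_string_fields` option. The Python sketch below only approximates that behaviour for illustration and is not telegraf's actual code; `flatten` and `kept_fields` are hypothetical helper names, and the coordinates are made-up placeholders.

```python
def flatten(obj, prefix=""):
    """Flatten nested dicts into underscore-joined key names."""
    flat = {}
    for key, value in obj.items():
        name = prefix + "_" + key if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


def kept_fields(flat, string_fields=()):
    """Keep numeric values, plus any names explicitly allowed as strings."""
    kept = {}
    for name, value in flat.items():
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if is_number or name in string_fields:
            kept[name] = value
    return kept


sample = {"state": {"reported": {"ts": 1720354973000,
                                 "latlng": "12.905603,-34.399940",
                                 "alt": 132}}}
flat = flatten(sample)

# Default behaviour: the latlng string is silently dropped.
default_fields = kept_fields(flat)

# Analogue of listing the flattened name in json_string_fields.
with_strings = kept_fields(flat, string_fields=("state_reported_latlng",))

print(sorted(default_fields))
print(sorted(with_strings))
```

Under that assumption, adding `json_string_fields = ["state_reported_latlng"]` to the input would keep the string with the `json` format, though it would still need to be split into floats afterwards.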

Problem
I think the solution is to parse the string into two floats (lat and lng), but I’m stuck on how to do that. This post on splitting mqtt json seems to depend on using the json_v2 data format, as does this guide on payload parsing.

Any suggestions or advice? Thank you in advance.

Hello @johnstonjs,
Yes, you’re right, you’ll want to use json_v2, and I think potentially the starlark processor as well:

# Configuration for telegraf agent
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  omit_hostname = true

[[outputs.influxdb_v2]]
  urls = ["https://influx_server"]
  token = "token_here"
  organization = "org"
  bucket = "teltonika_gps_mqtt"

# Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
  servers = ["ssl://mqtt_server:8883"]
  topics = ["device_imei/#"]
  client_id = "telegraf_teltonika"
  username = "user_name"
  password = "password"
  data_format = "json_v2"

  [[inputs.mqtt_consumer.json_v2]]
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "state.reported"
      tags = true

# Use starlark to process the latlng field
[[processors.starlark]]
  source = '''
def apply(metric):
    if "latlng" in metric.fields:
        latlng = metric.fields["latlng"]
        lat, lng = latlng.split(",")
        metric.fields["latitude"] = float(lat)
        metric.fields["longitude"] = float(lng)
        del metric.fields["latlng"]
    return metric
  '''

Unless @jpowers can think of something simpler?

@Anaisdg

Thank you! I think that’s a big step in the right direction.

Without the starlark processor, I switched over to using json_v2 and all the data is ingesting into influxdb, with the latlng data as a string. Also, my data fields no longer start with state_reported_, so thanks for helping fix that.

When I add the Starlark processor to split the string and convert it to floats, I get an error that other forum threads attribute to a spacing/indent issue. Unfortunately, I’ve tried countless variations on the spacing of your suggested configuration and can’t get it to run. Any recommendations on this error?

[telegraf] Error running agent: could not initialize processor starlark: :7:12: got illegal token, want primary expression
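
A possible cause, assuming the `:7:12` position refers to line 7 of the suggested `source` script, which is the `del metric.fields["latlng"]` line: Starlark reserves the word `del` but does not implement the statement, so the parser would reject it regardless of indentation. The dict-style `pop` method, which the Starlark processor documents for `metric.fields`, is the usual replacement. The sketch below shows the same steps in plain Python (Starlark is a Python dialect), with an ordinary dict standing in for `metric.fields` and made-up coordinates.

```python
# Stand-in for metric.fields; the coordinates are made-up placeholders.
fields = {"latlng": "12.905603,-34.399940", "alt": 132}

# `del fields["latlng"]` is valid Python but not valid Starlark.
# pop() removes the key and returns its value in both languages.
raw = fields.pop("latlng")

# Split once and convert each half to a float.
lat, lng = raw.split(",")
fields["latitude"] = float(lat)
fields["longitude"] = float(lng)

print(fields)
```

Inside the processor, the equivalent change would be replacing the `del` line with `metric.fields.pop("latlng")`, or simply omitting it if keeping the original string field is acceptable.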

@Anaisdg,

Thanks to your help, and the post from @simonsmart9 I figured this out after a few hours of focus.

  [[inputs.mqtt_consumer.json_v2]]
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "state.reported"
      tags = ["true"]

# Use starlark to process the latlng field from comma separated string
# into two float values
[[processors.starlark]]
source = '''
def apply(metric):
    latlng = metric.fields['latlng']
    metric.fields['latitude'] = float(latlng.split(",")[0])
    metric.fields['longitude'] = float(latlng.split(",")[1])
    return metric
'''
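
One caveat with the working script: it assumes every metric carries a `latlng` field, and the `device_imei/#` subscription could in principle deliver messages without one. A more defensive variant might look like the sketch below. It is written in the subset of Python that Starlark also accepts (no `try`/`except`, no `del`). `FakeMetric` is a hypothetical stand-in for telegraf's metric object so the function can be exercised outside telegraf; only the `apply` function would go into `source`. The coordinates are made-up placeholders.

```python
class FakeMetric:
    """Hypothetical stand-in for a telegraf metric; only `fields` is modeled."""

    def __init__(self, fields):
        self.fields = dict(fields)


def apply(metric):
    # Pass through metrics that carry no latlng field.
    if "latlng" not in metric.fields:
        return metric
    parts = metric.fields["latlng"].split(",")
    # Only convert when the string has exactly two comma-separated parts.
    if len(parts) == 2:
        metric.fields["latitude"] = float(parts[0])
        metric.fields["longitude"] = float(parts[1])
        # Optional: drop the original string once it has been split.
        metric.fields.pop("latlng")
    return metric


with_position = apply(FakeMetric({"latlng": "12.905603,-34.399940", "sp": 0}))
without_position = apply(FakeMetric({"sp": 0}))

print(with_position.fields)
print(without_position.fields)
```

Note that `float()` still aborts on non-numeric text, so this does not cover every malformed payload; it only handles a missing field or an unexpected number of parts.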