Telegraf from MQTT to Kafka: keep JSON message

The message from MQTT looks like this:
{
  "fId": 1001,
  "time": 1725386399,
  "items": [
    {
      "n": "NACLO_BJL_NBJL_JYF2_GDW_DI",
      "v": 1.0
    },
    {
      "n": "NACLO_LH_BLH_BYCYF1_GDW_DI",
      "v": 1.0
    },
    {
      "n": "NACLO_LQJL_JYF2_GDW_DI",
      "v": 0.0
    }
  ]
}
I add a new field "source" with the prefix "1001_", and I hope the message Kafka gets is:
{
  "fId": 1001,
  "source": "1001_1001",
  "time": 1725386399,
  "items": [
    {
      "n": "NACLO_BJL_NBJL_JYF2_GDW_DI",
      "v": 1.0
    },
    {
      "n": "NACLO_LH_BLH_BYCYF1_GDW_DI",
      "v": 1.0
    },
    {
      "n": "NACLO_LQJL_JYF2_GDW_DI",
      "v": 0.0
    }
  ]
}
My conf looks like this:

[agent]
  interval = "3s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "1s"
  flush_jitter = "0s"
  precision = ""
  debug = true
  quiet = false
  logfile = "D:/go/telegraf-log/telegraf.log"
  logfile_rotation_max_size = 104857600
  hostname = ""
  omit_hostname = false

[[outputs.file]]
  files = ["stdout"]
  data_format = "json"
  json_transformation = '''
fields.kafka_payload
'''

[[outputs.kafka]]
  # URLs of kafka brokers
  brokers = ["39.108.50.42:39092"]
  topic = "bigdata_2025"
  routing_tag = "host"
  data_format = "json"
  json_transformation = '''
fields.kafka_payload
'''

[[inputs.mqtt_consumer]]
  servers = ["tcp://10.1.86.168:1883"]
  topics = ["factory/data/#"]
  qos = 0
  connection_timeout = "60s"
  client_id = "big_factory123"
  username = "thirduser"
  password = "swkj_123#"
  data_format = "value"
  data_type = "string"

[[processors.starlark]]
  source = '''
load("json.star", "json")

def apply(metric):
    if 'value' in metric.fields:
        data = json.decode(metric.fields['value'])
        fId = data.get('fId', 0)
        fid = ("0000" + str(fId))[-4:]
        time = int(data.get('time', 0))
        items = data.get('items', [])
        serialized_items = []
        for item in items:
            if type(item) == 'dict':
                # Make sure each item's 'v' is a numeric type
                value = item.get('v', '')
                if type(value) == 'string':
                    value = float(value)
                serialized_items.append({'n': item.get('n', ''), 'v': value})

        output = {"fId": fId, "source": "1001_" + fid, "time": time, "items": items}

        metric.fields.clear()
        metric.fields["kafka_payload"] = json.encode(output)
    return metric
'''

I hope the message Kafka gets is JSON, not a string.

The main issue is in your Starlark processor script. Let me provide a corrected configuration:

[[processors.starlark]]
source = '''
load("json.star", "json")

def apply(metric):
    if 'value' in metric.fields:
        # Decode the incoming JSON string
        data = json.decode(metric.fields['value'])
        
        # Get the fId as a string for the source prefix
        fId = data.get('fId', 0)
        fid_str = str(fId)
        
        # Add the new source field
        data["source"] = "1001_" + fid_str
        
        # Convert the modified data back to JSON
        metric.fields.clear()
        metric.fields["kafka_payload"] = json.encode(data)
    
    return metric
'''
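
With this processor in place, kafka_payload carries the entire re-encoded document as a single string field. For the sample message above it should contain roughly the following (shown pretty-printed; on the wire it is one line, and key order may differ):

{
  "fId": 1001,
  "source": "1001_1001",
  "time": 1725386399,
  "items": [
    {"n": "NACLO_BJL_NBJL_JYF2_GDW_DI", "v": 1.0},
    {"n": "NACLO_LH_BLH_BYCYF1_GDW_DI", "v": 1.0},
    {"n": "NACLO_LQJL_JYF2_GDW_DI", "v": 0.0}
  ]
}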

The key changes I made:

  1. Removed the incomplete serialized_items code that wasn't doing anything
  2. Used the original fId value directly for the source field instead of the padded version
  3. Fixed the JSON handling to ensure the entire structure remains intact
  4. Removed unnecessary type conversions for the items array

Make sure your Kafka output is properly configured to use the kafka_payload field, which your configuration already includes with the json_transformation setting.
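
For context on what json_transformation sees: the JSON serializer first wraps each metric in its standard envelope (name, tags, fields, timestamp), and the JSONata expression runs against that document. So fields.kafka_payload selects the string field out of something like the following (name and tag values are illustrative, payload truncated):

{
  "fields": {
    "kafka_payload": "{\"fId\":1001,\"source\":\"1001_1001\",\"time\":1725386399,\"items\":[...]}"
  },
  "name": "mqtt_consumer",
  "tags": {
    "host": "example-host",
    "topic": "factory/data/1001"
  },
  "timestamp": 1725386399
}

Note that because the selected value is itself a string, the body written to Kafka is that string rather than a nested JSON object.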

Thanks for your reply.
In fact, the type of "kafka_payload" in the metric's fields from the Starlark result is also a string, so the json_transformation setting does not solve the problem of the string becoming JSON. What should I do to get real JSON? I tried to use eval and parseJson in json_transformation, but got "cannot call non-function".

If you want Telegraf to send real JSON objects instead of strings that look like JSON, you'll need to make sure the data format and serialization settings are properly configured. Here's a corrected example of how your config should look:

[agent]
  interval = "3s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "1s"
  flush_jitter = "0s"
  precision = ""
  debug = true
  quiet = false
  logfile = "D:/go/telegraf-log/telegraf.log"
  logfile_rotation_max_size=104857600
  hostname = ""
  omit_hostname = false

[[outputs.file]]
  files = ["stdout"]
  data_format = "json"

[[outputs.kafka]]
  # URLs of kafka brokers
  brokers = ["39.108.50.42:39092"]
  topic = "bigdata_2025"
  routing_tag = "host"
  data_format = "json"

[[inputs.mqtt_consumer]]
  servers = ["tcp://10.1.86.168:1883"]
  topics = ["factory/data/#"]
  qos = 0
  connection_timeout = "60s"
  client_id = "big_factory123"
  username = "thirduser"
  password = "swkj_123#"
  data_format = "json_v2"
  
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "mqtt_data"
    timestamp_path = "time"
    timestamp_format = "unix"
    
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "fId"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "items"

[[processors.starlark]]
  namepass = ["mqtt_data"]
  source = '''
load("json.star", "json")

def apply(metric):
    # Get the original data
    fId = metric.fields.get("fId", 0)
    items = metric.fields.get("items", [])
    timestamp = metric.time
    
    # Create the new metric with the desired structure
    new_metric = Metric("kafka_message")
    new_metric.time = timestamp
    
    # Add fields that will become the JSON structure
    new_metric.fields["fId"] = fId
    new_metric.fields["source"] = "1001_" + str(fId)
    new_metric.fields["time"] = int(timestamp.unix_nano / 1000000000)
    new_metric.fields["items"] = items
    
    return new_metric
'''

This approach handles your JSON data natively throughout the pipeline, ensuring that when it's sent to Kafka, it remains a proper JSON object with the structure you specified.

If you encounter any issues with this configuration, you might need to adjust the inputs.mqtt_consumer.json_v2 section based on the exact structure of your incoming data.
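
One caveat: with data_format = "json" on the Kafka output, the serializer still wraps each metric in that same name/tags/fields/timestamp envelope. If you need exactly the flat structure from your first message on the Kafka side, selecting just the fields object with a JSONata transformation should get close — a minimal sketch, assuming the default serializer layout:

[[outputs.kafka]]
  brokers = ["39.108.50.42:39092"]
  topic = "bigdata_2025"
  routing_tag = "host"
  data_format = "json"
  # JSONata expression: keep only the fields object from the serializer envelope
  json_transformation = '''
fields
'''

Echoing the same serializer settings on [[outputs.file]] with files = ["stdout"] is a cheap way to confirm the exact payload before it reaches Kafka.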