the message from mqtt like this
{
“fId”:1001,
“time”: 1725386399,
“items”: [
{
“n”: “NACLO_BJL_NBJL_JYF2_GDW_DI”,
“v”: 1.0
},
{
“n”: “NACLO_LH_BLH_BYCYF1_GDW_DI”,
“v”: 1.0
},
{
“n”: “NACLO_LQJL_JYF2_GDW_DI”,
“v”: 0.0
}
]
}
add a new field “source” with the prefix “1001_” ,then hope kafka get the message is
{
“fId”: 1001
“source”: “1001_1001”,
“time”: 1725386399,
“items”: [
{
“n”: “NACLO_BJL_NBJL_JYF2_GDW_DI”,
“v”: 1.0
},
{
“n”: “NACLO_LH_BLH_BYCYF1_GDW_DI”,
“v”: 1.0
},
{
“n”: “NACLO_LQJL_JYF2_GDW_DI”,
“v”: 0.0
}
]
}
my conf is like this
[agent]
interval = “3s”
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = “0s”
flush_interval = “1s”
flush_jitter = “0s”
precision = “”
debug = true
quiet = false
logfile = “D:/go/telegraf-log/telegraf.log”
logfile_rotation_max_size=104857600
hostname = “”
omit_hostname = false
[[outputs.file]]
files = [“stdout”]
data_format = “json”
json_transformation = ‘’’
fields.kafka_payload
‘’’
[[outputs.kafka]]
URLs of kafka brokers
brokers = [“39.108.50.42:39092”]
topic = “bigdata_2025”
routing_tag = “host”
data_format = “json”
json_transformation = ‘’’
fields.kafka_payload
‘’’
[[inputs.mqtt_consumer]]
servers = [“tcp://10.1.86.168:1883”]
topics = [“factory/data/#”
]
qos = 0
connection_timeout = “60s”
client_id = “big_factory123”
username = “thirduser”
password = “swkj_123#”
data_format = “value”
data_type = “string”
[[processors.starlark]]
source = ‘’’
load(“json.star”, “json”)
def apply(metric):
if ‘value’ in metric.fields :
data = json.decode(metric.fields[‘value’])
fId = data.get(‘fId’, 0)
fid = (“0000” + str(fId))[-4:]
time = int(data.get(‘time’, 0))
items = data.get(‘items’, )
serialized_items =
for item in items:
if type(item) == ‘dict’:
# 确保每个元素的 ‘v’ 是数值类型
value = item.get(‘v’, ‘’)
if type(value) == ‘string’:
value = float(value)
serialized_items.append({‘n’: item.get(‘n’, ‘’),‘v’: value})
output = {“fId”: fId,“source”: “1001_” + fid,“time”: time,“items”: items }
metric.fields.clear()
metric.fields[“kafka_payload”] = json.encode(output)
return metric
‘’’
I hope get the message is json ,not string