I have set up a Kafka Broker and a producer correctly, I can send and receive info from the topic I have created. I’ve serialized my JSON data correctly as well. I’m trying to set up InfluxDB as a Kafka consumer via Telegraf. The issue I’m having is in Telegraf trying to receive the JSON Data.
My data is consisted of the following:
from random import randint
def get_data():
# Create random values
id_m= 1
tp = randint(70, 85) * 0.01
tc = randint(60,76) * 0.01
td = randint(78, 90) * 0.01
trs= tptctd100
return {
“id”: id_m,
“Tp”: tp100,
“Td”: tc100,
“Tc”: td100,
“Trs”: round(trs,2)
} #Returns an array of information.
My Producer is the following:
from kafka import KafkaProducer
import json
from data import get_data
import time
def json_serializer(data):
return json.dumps(data).encode(“utf-8”)
producer = KafkaProducer(bootstrap_servers=[‘192.168.137.1:9092’],
value_serializer=json_serializer)
if name == “main”:
while 1 == 1:
# Serializing json
demo_topic = json.dumps(get_data())
print(demo_topic)
producer.send(“DemoDay”, demo_topic)
time.sleep(0.2)
My data is correctly sent and received.
my Telegraf.conf file has the following parameters for the Kafka_consumer
[[inputs.kafka_consumer]]
brokers = [“192.168.137.1:9092”]
topics = [“DemoDay”] ##name of your topic
max_message_len = 1000000
data_format = “json_v2”
[[inputs.kafka_consumer.json_v2]]
[[inputs.kafka_consumer.json_v2.field]]
path = “id”
optional= true
[[inputs.kafka_consumer.json_v2]]
[[inputs.kafka_consumer.json_v2.field]]
path = “Tp”
optional= true
[[inputs.kafka_consumer.json_v2]]
[[inputs.kafka_consumer.json_v2.field]]
path = “Td”
optional= true
[[inputs.kafka_consumer.json_v2]]
[[inputs.kafka_consumer.json_v2.field]]
path = “Tc”
optional= true
[[inputs.kafka_consumer.json_v2]]
[[inputs.kafka_consumer.json_v2.field]]
path = “Trs”
optional= true
When i try to start my containers using docker i receive the following error :
2022-06-13T11:08:07Z I! Using config file: /etc/telegraf/telegraf.conf
2022-06-13T11:08:07Z I! Starting Telegraf 1.22.4
2022-06-13T11:08:07Z I! Loaded inputs: kafka_consumer
2022-06-13T11:08:07Z I! Loaded aggregators:
2022-06-13T11:08:07Z I! Loaded processors:
2022-06-13T11:08:07Z I! Loaded outputs: influxdb_v2
2022-06-13T11:08:07Z I! Tags enabled: host=13aedb3302c6
2022-06-13T11:08:07Z I! [agent] Config: Interval:1s, Quiet:false, Hostname:“13aedb3302c6”, Flush Interval:1s
2022-06-13T11:08:09Z E! [inputs.kafka_consumer] Error in plugin: the path id doesn’t exist
2022-06-13T11:08:09Z E! [inputs.kafka_consumer] Error in plugin: the path id doesn’t exist
2022-06-13T11:08:09Z E! [inputs.kafka_consumer] Error in plugin: the path id doesn’t exist
2022-06-13T11:08:10Z E! [inputs.kafka_consumer] Error in plugin: the path id doesn’t exist