Telegraf Kafka Consumer - needs to be restarted

I am using the Kafka Consumer as an input. The issue I am facing: when the source stops writing data into Kafka for a few hours and then starts again, Telegraf does not pick the data back up dynamically. I have to restart Telegraf to get data again; otherwise nothing comes in until the restart. I am using version 1.23.0 with the following configuration:

  [[inputs.kafka_consumer]]
    brokers = ["provided"]
    topics = ["provided"]
    offset = "oldest"
    max_processing_time = "100s"
    # balance_strategy = "roundrobin"
    # max_message_len = 1000000
    # max_undelivered_messages = 2000
    consumer_group = "consumer_old_cloud"
    ## Data format to consume.
    data_format = "json"
    json_time_key = "beginTime"
    json_time_format = "unix"
    tag_keys = [
      "provided"
    ]
    json_string_fields = ["counters_*"]
    name_override = "gnb_om_oldest"
    interval = "1s"

Is there anything I need to change to make this work?

A couple thoughts:

  1. There was a known issue in versions v1.22.1 through v1.23.0 where connections were lost if something happened to the Kafka server. It was fixed in v1.23.1.
  2. I would suggest running in debug mode to see what the sarama library is reporting, and posting that output here if you can (see the sketch after this list).
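
For reference, here is a minimal sketch of how debug logging can be turned on in the agent section of the Telegraf configuration; with debug enabled, the kafka_consumer input also surfaces the sarama client's messages (the D! [sarama] lines shown further down this thread). The logfile line is only an example:

  [agent]
    ## Emit debug-level log lines, including the sarama client output
    ## used by the kafka_consumer input.
    debug = true
    quiet = false
    # logfile = ""   ## an empty string keeps logging on stderr

The same can be achieved for a single run with the --debug command-line flag.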

Hope that helps!

Hi, I have encountered some strange behaviour with Telegraf. I am also using a bunch of processor plugins, which have worked fine for the other Kafka brokers I have access to; I have never faced an issue like this before.
When I use offset = "oldest", both with and without processor plugins, data comes in for some time and then stops again. However, when I use offset = "newest" without any processor plugin, data comes in continuously, but with processor plugins only 5-6 data points arrive each time and then it stops until I restart the container, and the same cycle repeats.
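
For clarity, these are the two offset settings being compared; only the relevant line is shown, everything else matches the configuration posted above:

  [[inputs.kafka_consumer]]
    ## run A: start from the beginning of the topics
    offset = "oldest"

  [[inputs.kafka_consumer]]
    ## run B: consume only messages produced after startup
    offset = "newest"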

Any idea why this is happening? These are the sarama logs from just after the last data point was inserted, before everything stopped:

2022-09-21T06:20:12Z D! [sarama] client/metadata fetching metadata for all topics from broker 10.15.244.100:31509
2022-09-21T06:20:12Z D! [outputs.influxdb_v2] Buffer fullness: 0 / 20000 metrics
2022-09-21T06:20:12Z D! [sarama] client/metadata got error from broker -1 while fetching metadata: EOF
2022-09-21T06:20:12Z D! [sarama] Closed connection to broker 10.15.244.100:31509
2022-09-21T06:20:12Z D! [sarama] client/metadata fetching metadata for all topics from broker 10.15.244.100:31510
2022-09-21T06:20:12Z D! [sarama] Connected to broker at 10.15.244.100:31510 (unregistered)
2022-09-21T06:20:13Z D! [sarama] client/brokers replaced registered broker #1 with 10.15.244.3:31510
2022-09-21T06:20:13Z D! [sarama] Connected to broker at 10.15.244.3:31510 (registered as #1)
2022-09-21T06:20:13Z D! [sarama] Closed connection to broker 10.15.244.2:31509
2022-09-21T06:20:13Z D! [sarama] consumer/broker/1 disconnecting due to error processing FetchRequest: kafka: broker not connected
2022-09-21T06:20:13Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming topic-name-1/0: kafka: broker not connected
2022-09-21T06:20:13Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming topic-name-2/0: kafka: broker not connected
2022-09-21T06:20:14Z D! [sarama] consumergroup/session/Telegraf-6580bc57-aa64-406d-97ae-5aab96693c89/7 heartbeat loop stopped
2022-09-21T06:20:14Z D! [sarama] consumergroup/consumer_new_cloud loop check partition number coroutine will exit, topics [topic-name-1 topic-name-2]
2022-09-21T06:20:14Z D! [sarama] consumergroup/session/Telegraf-6580bc57-aa64-406d-97ae-5aab96693c89/7 released
2022-09-21T06:20:14Z D! [sarama] client/metadata fetching metadata for [topic-name-1 topic-name-2] from broker 10.15.244.100:31510
2022-09-21T06:20:14Z D! [sarama] client/coordinator requesting coordinator for consumergroup consumer_new_cloud from 10.15.244.100:31510
2022-09-21T06:20:15Z D! [sarama] client/coordinator coordinator for consumergroup consumer_new_cloud is #1 (10.15.244.3:31510)
2022-09-21T06:20:15Z D! [sarama] consumer/broker/1 accumulated 2 new subscriptions
2022-09-21T06:20:15Z D! [sarama] consumer/broker/1 added subscription to topic-name-1/0
2022-09-21T06:20:15Z D! [sarama] consumer/broker/1 added subscription to topic-name-2/0

These are the processor plugins I am using, in this order (the source data arrives every 5 minutes); a configuration sketch follows the list:

  1. unpivot
  2. starlark
  3. strings (string replace)
  4. regex
  5. converter
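
For reference, a minimal sketch of how such a chain can be pinned to an explicit order with the processor `order` option. All tag, field, and pattern values below are placeholders for illustration and are not taken from the real configuration:

  [[processors.unpivot]]
    order = 1
    tag_key = "name"              ## placeholder: tag that receives the field name
    value_key = "value"           ## placeholder: field that receives the value

  [[processors.starlark]]
    order = 2
    source = '''
def apply(metric):
    return metric                 # placeholder: pass metrics through unchanged
'''

  [[processors.strings]]
    order = 3
    [[processors.strings.replace]]
      tag = "name"                ## placeholder: rewrite the "name" tag
      old = "-"
      new = "_"

  [[processors.regex]]
    order = 4
    [[processors.regex.tags]]
      key = "name"                ## placeholder: regex on the "name" tag
      pattern = "^counters_"
      replacement = ""

  [[processors.converter]]
    order = 5
    [processors.converter.fields]
      float = ["value"]           ## placeholder: convert "value" to float

Without an explicit `order`, Telegraf does not guarantee that processors run in the order they appear in the file, so pinning the order can help rule out ordering effects while debugging.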