Telegraf kafka_consumer input

Hi all. I am using Kafka brokers as an input for Telegraf. I have two setups with two brokers each, and I want to differentiate the data by setup, so I am using two kafka_consumer plugin instances. Even though logs are coming in, Telegraf doesn't push data into InfluxDB; but when I restart the Telegraf container, it pushes the metrics. I have the following configuration:

[[inputs.kafka_consumer]]
  brokers = ["${IIT_BROKER_ONE}","${IIT_BROKER_TWO}"]
  topics = ["${BROKER_TOPIC}"]
  offset = "newest"
  balance_strategy = "roundrobin"
  max_message_len = 1000000
  max_undelivered_messages = 1000
  consumer_group = "iit_metrics_consumers"
  ## Data format to consume.
  data_format = "json"
  json_time_key = "beginTime"
  json_time_format = "unix"
  tag_keys = [
   "swVersion",
   "senderType",
   "VNFID",
   "RUMAC",
   "GNBNAME",
   "cluster_id"
  ]
  json_string_fields = ["counters_*"]
  name_override = "${Measurement}"
  interval = "3s"
  [inputs.kafka_consumer.tags]
    setup = "${SETUP_TAG_ONE}"

[[inputs.kafka_consumer]]
  brokers = ["${SVT_BROKER_ONE}","${SVT_BROKER_TWO}"]
  topics = ["Onecell_PM_Data_Stream"]
  offset = "newest"
  balance_strategy = "roundrobin"
  consumer_group = "svt_metrics_consumers"
  max_message_len = 1000000
  max_undelivered_messages = 1000
  ## Data format to consume.
  data_format = "json"
  json_time_key = "beginTime"
  json_time_format = "unix"
  tag_keys = [
   "swVersion",
   "senderType",
   "VNFID",
   "RUMAC",
   "GNBNAME",
   "cluster_id"
  ]
  json_string_fields = ["counters_*"]
  name_override = "${Measurement}"
  interval = "6s"
  [inputs.kafka_consumer.tags]
    setup = "${SETUP_TAG_TWO}"
  

What changes should I make here so that data keeps flowing without needing to restart the container? I am running Telegraf and InfluxDB as Docker containers on an EC2 instance. I am confused about offset and balance_strategy: can someone explain, for my current configuration, how data flows to InfluxDB and what I need to change to solve my problem?

Hello @Pratik_Das_Baghel,
What errors are you getting when Telegraf doesn't push data?

It doesn't show any errors. Can you explain how offset = "oldest" and offset = "newest" behave?

Hello @Pratik_Das_Baghel,
I’m not sure. @popey can you help here please?

Hi. I was getting this error:

abandoned subscription to topic because consuming was taking too long.

Any solution for this?

@Pratik_Das_Baghel maybe something here could help?

Does it help if you set metric_batch_size = 1000 in the agent section and max_undelivered_messages = 2000 in the kafka_consumer input?
Maybe try commenting there as well?
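
For reference, a minimal sketch of those suggested settings (option names are from the Telegraf documentation; the values are just the ones proposed above, and max_processing_time is an extra option in recent Telegraf versions that may be relevant to the "consuming was taking too long" error, not something confirmed to fix this case):

```toml
# Agent-level batching: write metrics to InfluxDB in batches of 1000.
[agent]
  metric_batch_size = 1000

[[inputs.kafka_consumer]]
  ## Allow more in-flight (undelivered) messages than one output batch,
  ## so the consumer is not blocked waiting for a flush.
  max_undelivered_messages = 2000
  ## Recent Telegraf versions also expose max_processing_time; raising it
  ## from the default gives slow message handling more headroom before
  ## the subscription is abandoned. (Hypothetical value for illustration.)
  # max_processing_time = "500ms"
```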