Telegraf kafka Output Producer: Does it support batching?


#1

My interpretation of the output plugin is it is making a sendmessage call for each individual metric, but I what I can be sure is if there is any way to govern how the underlying Kafka producer batches these. I am getting the impression we are getting an individual call per metric to our Kafka brokers.

func (k *Kafka) Write(metrics []telegraf.Metric) error {
    if len(metrics) == 0 {
        return nil
    }

    for _, metric := range metrics {
        buf, err := k.serializer.Serialize(metric)
        if err != nil {
            return err
        }

        topicName := k.GetTopicName(metric)

        m := &sarama.ProducerMessage{
            Topic: topicName,
            Value: sarama.ByteEncoder(buf),
        }
        if h, ok := metric.Tags()[k.RoutingTag]; ok {
            m.Key = sarama.StringEncoder(h)
        }

        _, _, err = k.producer.SendMessage(m)

        if err != nil {
            return fmt.Errorf("FAILED to send kafka message: %s\n", err)
        }
    }
    return nil
}```

#2

Seems like this could be re-written to use SendMessages to support batching.


#3

I think this could be helpful, could you test the performance with this change?

There appears to be a few avenues we can pursue for improving the performance of this output, but this would be one of the safest changes. A few other ideas have been batted around such as using the AsyncProducer #4150 or batching multiple metrics per message #4071.