Security between Kafka and Telegraf

Hi Guys,

I have a question about security between Kafka and Telegraf.

From what I have found, SSL can be set up for Kafka producers and consumers, but the Telegraf kafka_consumer plugin currently doesn’t support that kind of security.

I’d like to modify the source code of this plugin (telegraf/kafka_consumer.go at master · influxdata/telegraf · GitHub) to support transport security (SSL/TLS, etc.). I’m new to Kafka and Telegraf. Is the “config” in the code below the right part to modify? I ask because the Kafka producer and consumer accept a config parameter for security:

--broker-list localhost:9093 --topic test --producer.config
--bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config

If config is not the right part, how can I implement security between Kafka and Telegraf?

func (k *Kafka) Start(acc telegraf.Accumulator) error {
	k.Lock()
	defer k.Unlock()
	var consumerErr error

	k.acc = acc

	config := consumergroup.NewConfig()
	config.Zookeeper.Chroot = k.ZookeeperChroot
	switch strings.ToLower(k.Offset) {
	case "oldest", "":
		config.Offsets.Initial = sarama.OffsetOldest
	case "newest":
		config.Offsets.Initial = sarama.OffsetNewest
	default:
		// Unknown offset setting: warn and fall back to the oldest offset
		log.Printf("I! WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n",
			k.Offset)
		config.Offsets.Initial = sarama.OffsetOldest
	}

	if k.Consumer == nil || k.Consumer.Closed() {
		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
			k.ConsumerGroup,
			k.Topics,
			k.ZookeeperPeers,
			config,
		)
		if consumerErr != nil {
			return consumerErr
		}

		// Setup message and error channels
		k.in = k.Consumer.Messages()
		k.errs = k.Consumer.Errors()
	}

	k.done = make(chan struct{})

	// Start the kafka message reader
	go k.receiver()
	log.Printf("I! Started the kafka consumer service, peers: %v, topics: %v\n",
		k.ZookeeperPeers, k.Topics)
	return nil
}


I think there is a pull request open for this feature. If it works for you, it would be great if you could test it out and report back on the pull request.