Output to Kafka

Hello, we are trying to configure Telegraf to connect to Confluent Cloud using OAUTHBEARER. I see this is supported in the plugins/outputs/kafka README, but we have been unsuccessful so far.

I see these configs here but why is there a token configuration? telegraf/plugins/outputs/kafka/README.md at master · influxdata/telegraf (github.com)

Is this connection expected to be manually updated with a new token each time it expires, or is there a way to handle this automatically through additional config options?

Thanks,

@grades212 as the documentation says, you can use secret-stores with the sasl_access_token option. This allows you, for example, to refresh OAuth2 tokens automatically without manual interaction…

How do you get your token? Which issues do you see with using secret-stores in this use-case?
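As a rough sketch of what I mean (endpoint, credentials, scope, and broker are placeholders for your Confluent Cloud values; this assumes the oauth2 secret-store plugin is available in your Telegraf version):

```toml
# Secret-store that fetches and refreshes the OAuth2 token for you
[[secretstores.oauth2]]
  id = "oauthtokens"
  token_endpoint = "https://<TOKEN_URL>"

  [[secretstores.oauth2.token]]
    key = "confluent"
    client_id = "<USERNAME>"
    client_secret = "<PASSWORD>"
    scopes = ["api://<TOKEN SCOPE>/.default"]

# Kafka output referencing the secret via the @{store:key} syntax
[[outputs.kafka]]
  brokers = ["<BROKER>:9092"]
  topic = "telegraf"
  enable_tls = true
  sasl_mechanism = "OAUTHBEARER"
  sasl_access_token = "@{oauthtokens:confluent}"
```

The point is that the plugin only ever reads the secret; the secret-store is responsible for keeping it fresh.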

@srebhan, secret-stores do not do much more than hide the secret's value from the config. Typically, Kafka clients can handle the token refresh on their own. In this case, what I believe you are suggesting is an external application/process that updates a secret's value on some interval.

Here is an example config for a Kafka producer connecting to Kafka using OAUTH. This does not seem to be supported in Telegraf, since it does not offer the required config parameters.

bootstrap.servers=<BROKER LIST>
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=<TOKEN_URL>
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='<USERNAME>' \
    scope='api://<TOKEN SCOPE>/.default' \
    clientSecret='<PASSWORD>' \
    extension_logicalCluster='<CLUSTER_ID>' \
    extension_identityPoolId='<POOL_ID>';

@srebhan, secret-stores do not do much more than hide the secret's value from the config.

Ok, then obviously I don't have any idea about secret-stores, despite having written that stuff, and the documentation says otherwise. Anyway, thanks for letting me know!

Typically, Kafka clients can handle the token refresh on their own. In this case, what I believe you are suggesting is an external application/process that updates a secret's value on some interval.

So does Telegraf, through secret-stores, because we do not want to scatter token retrieval and refresh across 200+ plugins.

Sorry for the sarcasm, but you could have just read the documents I linked to or asked how to use it…

The original comment and documentation links do not contain the information needed to configure this. We will try to implement it using the information in your latest link.

@srebhan, we did get this to work with OAUTH. Thank you for sharing the additional documentation.

One thing we did find is a small typo here: telegraf/plugins/outputs/kafka/README.md at master · influxdata/telegraf (github.com). Note that this may be specific to connecting to Confluent Cloud.

To successfully connect to Confluent Cloud using OAUTH, the sasl_extensions config needs to contain 'identityPoolId' instead of just 'poolId' as the documentation mentions.
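For example, a sketch of the relevant part of our kafka output section (broker, cluster ID, and pool ID are placeholders for your own values):

```toml
[[outputs.kafka]]
  brokers = ["<BROKER>:9092"]
  topic = "telegraf"
  enable_tls = true
  sasl_mechanism = "OAUTHBEARER"

  # Confluent Cloud expects the extension key "identityPoolId",
  # not "poolId" as the README currently shows
  sasl_extensions = {logicalCluster = "<CLUSTER_ID>", identityPoolId = "<POOL_ID>"}
```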

Thanks again for your help; we got this working.

Cool. For the typo, would you be willing to put up a pull-request to correct it? That would give you the credit you deserve. :slight_smile: