MQTT consumer connection timeout setting not working

Hi,

Currently I’m testing an MQTT setup with mosquitto as the MQTT broker and Arduinos as clients.
I’m simulating connection drops (e.g. WiFi or internet unavailability) because I want to make sure that data buffered on the client side still makes its way into the InfluxDB database, even if the MQTT broker is unavailable for some time (maintenance).
Just reading messages via mosquitto_sub -L mqtt://user:pw@host:1883/topic never times out, and once the client is able to reconnect to the broker, it receives all the buffered messages.

Telegraf, however, always gives me TCP connection timeouts, e.g.

[inputs.mqtt_consumer] Error in plugin: connection lost: EOF
[inputs.mqtt_consumer] Error in plugin: network Error : dial tcp 127.0.0.1:1883: connect: connection refused
[inputs.mqtt_consumer] Connected [mqtt://127.0.0.1:1883]

I want to understand the connection_timeout setting in the [[inputs.mqtt_consumer]] section, as I thought it would give me some time before the TCP connection eventually times out.

This is my current config:

[[inputs.mqtt_consumer]]
  servers = ["mqtt://127.0.0.1:1883"]
  username = "user"
  password = "pw"
  topics = ["topic/#"]
  data_format = "json_v2"
  qos = 2
  connection_timeout = "55s"
  max_undelivered_messages = 1000
  persistent_session = true
  client_id = "influx-telegraf"

Can somebody give me some hints or tell me what I’m missing?
Thanks a lot!

I would not say those log messages are timeouts. The client notices that the network has gone down and the connection has been severed, hence the “connection lost” message.

I want to understand the setting connection_timeout within the

The connection_timeout setting is only used when making the initial connection, see here.
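
As a rough annotated sketch of how the settings from the config above relate to this (the comments are an interpretation, not authoritative documentation, and whether the broker keeps a session across its own restart depends on its persistence settings, which are not shown in this thread):

```toml
[[inputs.mqtt_consumer]]
  servers = ["mqtt://127.0.0.1:1883"]
  topics = ["topic/#"]
  data_format = "json_v2"

  # Per the reply above: only used while establishing a connection.
  # It does not keep an already established connection open when the broker goes away.
  connection_timeout = "55s"

  # Ask the broker to keep the session for this client across disconnects.
  # A fixed client_id is needed so the broker can recognise the client again.
  persistent_session = true
  client_id = "influx-telegraf"

  # QoS 1 or 2 is needed for the broker to queue messages for an offline session.
  qos = 2
```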

If you want to know even more about the client itself, add client_trace = true to your mqtt config, assuming you are on a new-enough version. It will show you every debug message from the client library that Telegraf uses and should provide more detail on what the client is seeing and reacting to.

Thanks for taking your time to reply.

Unfortunately setting client_trace = true does not give me more insights, just more Error in plugin: network Error : dial tcp 127.0.0.1:1883: connect: connection refused messages.
Do I also need to change overall verbosity levels / logging settings in order to see more verbose log output?

This error is just due to the unavailability of the MQTT broker, which was shut down for a couple of seconds for testing connection reliability.

Once the MQTT broker is back up, Telegraf of course reconnects automatically; however, it misses all the messages that the client sent in the meantime.
I would have assumed that I can get the same behaviour from Telegraf as from mosquitto_sub -L mqtt://user:pw@host:1883/topic, which just blocks and never times out, so it is always able to receive all messages the client buffered until the MQTT broker is back up.

Do you have any idea how I could achieve the same behaviour with Telegraf, by blocking on the network socket so that it does not miss any messages sent immediately after the MQTT broker is back up?

Apparently I had to enable more verbose logging by setting debug = true to get more verbose output.
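
For reference, a minimal sketch of the two settings together, assuming debug sits in the [agent] section as usual and the remaining plugin options stay as in the config posted earlier:

```toml
[agent]
  # Agent-wide debug logging; the client trace lines only showed up with this enabled.
  debug = true

[[inputs.mqtt_consumer]]
  servers = ["mqtt://127.0.0.1:1883"]
  topics = ["topic/#"]
  data_format = "json_v2"
  # Log the debug messages of the underlying MQTT client library.
  client_trace = true
```
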
This is the output once the MQTT broker is shut down:

[inputs.mqtt_consumer] [client]  Connect comms goroutine - error triggeredEOF
[inputs.mqtt_consumer] [client]  internalConnLost called
[inputs.mqtt_consumer] [client]  stopCommsWorkers called
[inputs.mqtt_consumer] [router]  matchAndDispatch exiting
[inputs.mqtt_consumer] [pinger]  keepalive stopped
[inputs.mqtt_consumer] [client]  startCommsWorkers output redirector finished
[inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
[inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
[inputs.mqtt_consumer] [net]     outgoing comms stopping
[inputs.mqtt_consumer] [net]     startComms closing outError
[inputs.mqtt_consumer] [client]  incoming comms goroutine done
[inputs.mqtt_consumer] [client]  stopCommsWorkers waiting for workers
[inputs.mqtt_consumer] [client]  stopCommsWorkers waiting for comms
[inputs.mqtt_consumer] [client]  stopCommsWorkers done
[inputs.mqtt_consumer] [client]  internalConnLost waiting on workers
[inputs.mqtt_consumer] [client]  internalConnLost workers stopped
[inputs.mqtt_consumer] [client]  BUG BUG BUG reconnection function is nil<nil>
[inputs.mqtt_consumer] [msgids]  cleaned up subs
[inputs.mqtt_consumer] [client]  internalConnLost complete
[inputs.mqtt_consumer] [client]  status is already disconnected
[inputs.mqtt_consumer] Error in plugin: connection lost: EOF
[inputs.mqtt_consumer] Disconnected [mqtt://127.0.0.1:1883]
[outputs.influxdb_v2] Wrote batch of 4 metrics in 7.077679ms
[outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
[inputs.mqtt_consumer] Connecting [mqtt://127.0.0.1:1883]
[inputs.mqtt_consumer] [client]  Connect()
[inputs.mqtt_consumer] [store]   memorystore initialized
[inputs.mqtt_consumer] [client]  about to write new connect msg
[inputs.mqtt_consumer] [client]  dial tcp 127.0.0.1:1883: connect: connection refused
[inputs.mqtt_consumer] [client]  failed to connect to broker, trying next
[inputs.mqtt_consumer] [client]  Failed to connect to a broker
[inputs.mqtt_consumer] [store]   memorystore closed
[inputs.mqtt_consumer] Error in plugin: network Error : dial tcp 127.0.0.1:1883: connect: connection refused
[outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics
[inputs.mqtt_consumer] Connecting [mqtt://127.0.0.1:1883]
[inputs.mqtt_consumer] [client]  Connect()
[inputs.mqtt_consumer] [store]   memorystore initialized
[inputs.mqtt_consumer] [client]  about to write new connect msg
[inputs.mqtt_consumer] [client]  dial tcp 127.0.0.1:1883: connect: connection refused
[inputs.mqtt_consumer] [client]  failed to connect to broker, trying next
[inputs.mqtt_consumer] [client]  Failed to connect to a broker
[inputs.mqtt_consumer] [store]   memorystore closed
[inputs.mqtt_consumer] Error in plugin: network Error : dial tcp 127.0.0.1:1883: connect: connection refused
[outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics

I started experimenting on my MQTT client by delaying message transmission by 7s after a connection was lost and re-established, to give all subscribers enough time to reconnect and resume / re-subscribe.
This is the Telegraf log output I get once it is reconnecting:

[inputs.mqtt_consumer] Connecting [mqtt://127.0.0.1:1883]
[inputs.mqtt_consumer] [client]  Connect()
[inputs.mqtt_consumer] [store]   memorystore initialized
[inputs.mqtt_consumer] [client]  about to write new connect msg
[inputs.mqtt_consumer] [client]  socket connected to broker
[inputs.mqtt_consumer] [client]  Using MQTT 3.1.1 protocol
[inputs.mqtt_consumer] [net]     connect started
[inputs.mqtt_consumer] [net]     received connack
[inputs.mqtt_consumer] [client]  startCommsWorkers called
[inputs.mqtt_consumer] [client]  client is connected/reconnected
[inputs.mqtt_consumer] [net]     incoming started
[inputs.mqtt_consumer] [net]     startIncomingComms started
[inputs.mqtt_consumer] [net]     outgoing started
[inputs.mqtt_consumer] [net]     startComms started
[inputs.mqtt_consumer] [client]  startCommsWorkers done
[inputs.mqtt_consumer] [store]   enter Resume
[inputs.mqtt_consumer] [store]   exit resume
[inputs.mqtt_consumer] [client]  exit startClient
[inputs.mqtt_consumer] Connected [mqtt://127.0.0.1:1883]
[inputs.mqtt_consumer] Session found [mqtt://127.0.0.1:1883]
[inputs.mqtt_consumer] [pinger]  keepalive starting
[inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
[inputs.mqtt_consumer] [net]     startIncomingComms: inboundFromStore complete
[inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
[inputs.mqtt_consumer] [net]     outgoing waiting for an outbound message
[inputs.mqtt_consumer] [net]     startIncoming Received Message
[inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
[inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:0
[inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
[inputs.mqtt_consumer] [net]     startIncoming Received Message
[inputs.mqtt_consumer] [net]     startIncoming Received Message
[inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
[inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:0
[inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
[inputs.mqtt_consumer] [net]     startIncomingComms: got msg on ibound
[inputs.mqtt_consumer] [net]     startIncomingComms: received publish, msgId:0
[inputs.mqtt_consumer] [net]     logic waiting for msg on ibound
[inputs.mqtt_consumer] [net]     startIncoming Received Message
...
[inputs.mqtt_consumer] [pinger]  ping check5.007868486
[outputs.influxdb_v2] Wrote batch of 54 metrics in 29.922363ms
[outputs.influxdb_v2] Buffer fullness: 0 / 10000 metrics

Seems to work now, as all the 54 queued messages were read and not a single message was lost.

However, artificially delaying message transmission by 7s just to make sure Telegraf has reconnected successfully is quite a lot of time IMHO, while other clients reconnect immediately and don’t lose a single message once the MQTT broker is back up.
Also, this BUG BUG BUG reconnection function is nil<nil> log message seems a little odd to me :sweat_smile:

Update: maybe it helps to state the Telegraf version I’m using: Telegraf 1.30.3.

@jpowers what’s your take on this log message? AFAIK this is code which should not be reachable, right?

That log message is from the mqtt client library we use.

It does appear that some connection/reconnection issues existed and were resolved in pull request #15486 on influxdata/telegraf, “feat(inputs.mqtt_consumer): Implement startup error behaviors” by srebhan.

Feel free to grab the artifacts from that PR and give it another try. It does also expose a new keep_alive = "60s" option, but I’d be curious how it goes without changing that first.
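
A sketch of what that could look like in the plugin section, assuming a build made from the PR artifacts (the option is not expected to be recognised by 1.30.3, and "60s" is simply the value mentioned above, not a tuning recommendation):

```toml
[[inputs.mqtt_consumer]]
  servers = ["mqtt://127.0.0.1:1883"]
  topics = ["topic/#"]
  data_format = "json_v2"
  persistent_session = true
  client_id = "influx-telegraf"
  # New option exposed by the PR build; leave it commented out for the first test run.
  # keep_alive = "60s"
```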

Oh, that’s great, will give it a try and will comment in the PR directly.