Jump start needed: InfluxDB 2.7.1 + Telegraf 1.28.1 + MQTT Consumer Input Plugin + MQTT messages with JSON payload

Hi All,

Unfortunately, I’m still a total newbie in the field of “logging MQTT messages to InfluxDB using Telegraf” and could really use some help with configuring the MQTT Consumer Input plugin. I’ve read quite a bit about it on the Internet, but I feel I’m still a long way from being able to use it confidently.

Based on this article, I’m starting to understand how to generate tag set and field set for InfluxDB’s line protocol from the components of the topic and the fields of the JSON payload.

Let’s say my application receives MQTT messages with the topic:
my_smarthome/rooms/bedroom/sensors/sensor0x3E26A
and a JSON payload:

{
    "timestamp": 1695409130000000000,
    "type": "BME280",
    "last_calibration": "2022-12-12",
    "temperature_degC": 22.35,
    "relative_humidity": 66.28
}

What would be a useful configuration file? Are there any matching examples to this application?

What is the minimum information that must be included?
(I guess: MQTT broker address, port, user name, password, list of subscribed topics, QoS, payload data format…topic-/payload-parsing statements…what else?)

How would you read the “timestamp” field (here practically already in the form InfluxDB expects, in ns since 1970-01-01) and use it as the timestamp of the line protocol (LP) point instead of as a regular field (like a measured value)? None of the examples I found on the Internet address this. They always use the reception time of the MQTT message as the LP timestamp, i.e. the time is not taken from the message itself.

What if the timestamp was included in the payload as an ISO8601 string and had to be converted first?

Any tips and links to easy to understand tutorials are greatly appreciated.

Many thanks!

Best,
conne914

Helloooooo,
First let me share some example repos that could be helpful:

For parsing JSON there are great examples here:

I haven’t tested it but I think:

 [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "my measurement"
      [[inputs.mqtt_consumer.json_v2.object]]
          path = "@this"
          disable_prepend_keys = true
          tags = ["type"]
          timestamp_key = "timestamp"
          timestamp_format = "unix_ns"
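
Combined with topic parsing for the topic and payload from the question, a fuller sketch might look like the following. I haven’t run it against a broker. The broker address, the credentials, the measurement name "climate", and the tag names "room" and "sensor" are assumptions for illustration only. As far as I know, the object section of json_v2 takes the timestamp via timestamp_key, and "unix_ns" is the format for a nanosecond epoch value.

```toml
[[inputs.mqtt_consumer]]
  # Assumed broker address and placeholder credentials; adjust to your setup.
  servers = ["tcp://127.0.0.1:1883"]
  # "+" matches exactly one topic level (room name, sensor name).
  topics = ["my_smarthome/rooms/+/sensors/+"]
  # Empty string: do not add the full topic as a tag.
  topic_tag = ""
  qos = 0
  username = "my_username"
  password = "my_password"
  data_format = "json_v2"

  # Turn topic levels 3 and 5 into tags; "_" skips a level.
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "my_smarthome/rooms/+/sensors/+"
    tags = "_/_/room/_/sensor"

  [[inputs.mqtt_consumer.json_v2]]
    # Hypothetical measurement name.
    measurement_name = "climate"
    [[inputs.mqtt_consumer.json_v2.object]]
      # Parse the whole payload object.
      path = "@this"
      disable_prepend_keys = true
      # "type" becomes a tag; the remaining keys become fields.
      tags = ["type"]
      # Use the payload value as the point timestamp (ns since epoch).
      timestamp_key = "timestamp"
      timestamp_format = "unix_ns"
```

With this layout, "last_calibration" ends up as a string field. Adding it to the tags list would make it a tag instead, which is a design choice that depends on how you want to query the data.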

Let me know how it goes.
P.S. You might want to use the --debug and --test flags.
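
On the ISO8601 part of the question: to my knowledge, timestamp_format also accepts a Go reference-time layout, so a string timestamp can be parsed without a separate conversion step. A minimal sketch, assuming the payload carried something like "2023-09-22T18:58:50Z" in the "timestamp" key (the measurement name "climate" is again hypothetical); this fragment would replace the json_v2 section of the mqtt_consumer input:

```toml
[[inputs.mqtt_consumer.json_v2]]
  # Hypothetical measurement name.
  measurement_name = "climate"
  [[inputs.mqtt_consumer.json_v2.object]]
    path = "@this"
    disable_prepend_keys = true
    tags = ["type"]
    timestamp_key = "timestamp"
    # Go reference-time layout matching RFC 3339 / ISO8601 strings
    # with a "Z" suffix or a numeric offset such as "+02:00".
    timestamp_format = "2006-01-02T15:04:05Z07:00"
```

If the strings carry no offset at all, the layout would need to drop the "Z07:00" part, and the timestamp_timezone option could then state which zone they are in.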

Hello Anaisdg,
thanks for all the links and examples! :grinning: :+1:

I had some time again today and tried my hand at a similar example. First I wanted to try a sample configuration with Telegraf alone, i.e. without an InfluxDB database.

The topic:

my_system/measurements/temperature/sensor1266/V2.0/087654/E2012KF534

The payload:

{"timestamp_ns": 1696426317451510784, "ch_layout": "A0123", "channel": "Ch_C4", "friendly_name": "line B water out", "sensor_id": "03-2016", "sensor_type": "B1/10,50mm", "calibration_label": "2022-12-07_F", "value_K": 295.356}

So I created the config file (all comment lines removed):

[agent]

  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = false

[[outputs.file]]
  files = ["stdout"]

# Read metrics from MQTT topic(s)

[[inputs.mqtt_consumer]]

  servers = ["tcp://127.0.0.1:1883"]
  topics = [
    "my_system/measurements/temperature/#"
  ]
  topic_tag = ""
  qos = 0

  username = "[broker_user_name]"
  password = "[broker_password]"

  data_format = "json_v2"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "+/+/+/+/+/+/+"
    tags = "_/_/_/instrument-type/instrument-version/serial/inventory"

  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "temperatures"
      [[inputs.mqtt_consumer.json_v2.object]]
        path = "@this"
        disable_prepend_keys = true
        tags = [
          "ch_layout",
          "channel",
          "friendly_name",
          "sensor_id",
          "sensor_type",
          "calibration_label",
        ]
        timestamp_key = "timestamp_ns"
        timestamp_format = "unix_ns"
        [[inputs.mqtt_consumer.json_v2.object.field]]
          path = "value_K"
          type = "float"

Now I called Telegraf with the parameter --test and hoped that the line-protocol lines generated by Telegraf would then appear on the standard output:

:~/$ telegraf --config mqtt_input_2023-10-04.conf --test
2023-10-04T16:03:24Z I! Loading config: mqtt_input_2023-10-04.conf
2023-10-04T16:03:24Z I! Starting Telegraf 1.28.2 brought to you by InfluxData the makers of InfluxDB
2023-10-04T16:03:24Z I! Available plugins: 240 inputs, 9 aggregators, 29 processors, 24 parsers, 59 outputs, 5 secret-stores
2023-10-04T16:03:24Z I! Loaded inputs: mqtt_consumer
2023-10-04T16:03:24Z I! Loaded aggregators: 
2023-10-04T16:03:24Z I! Loaded processors: 
2023-10-04T16:03:24Z I! Loaded secretstores: 
2023-10-04T16:03:24Z W! Outputs are not used in testing mode!
2023-10-04T16:03:24Z I! Tags enabled: host=MYHOST
2023-10-04T16:03:24Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]

The MQTT source was publishing the whole time (I checked with mosquitto_sub), and Telegraf reported “Connected”, yet no output lines arrived on the console. What is going wrong? The configuration file seems to be at least syntactically correct.

:man_shrugging:

Best,
conne914

I have now tried a much simpler approach:

Topic:
testsystem/measurements/temperature/Thinkpad/L540/0123456/x86_pkg_temp
Payload:
35.0

mqtt_input_2023-10-04.simple.conf :

[[inputs.mqtt_consumer]]

  servers = ["tcp://127.0.0.1:1883"]
  topics = [
    "testsystem/measurements/temperature/#"
  ]
  topic_tag = ""
  qos = 0
  username = "[my_username]"
  password = "[my_password]"
  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "+/+/+/+/+/+/+"
    tags = "_/_/_/instrument/instrument-version/serial/source"

If I call
telegraf --config ./mqtt_input_2023-10-04.simple.conf --test --debug
I get:

:~/$ telegraf --config ./mqtt_input_2023-10-04.simple.conf --test --debug
2023-10-06T20:46:46Z I! Loading config: ./mqtt_input_2023-10-04.simple.conf
2023-10-06T20:46:46Z I! Starting Telegraf 1.28.2 brought to you by InfluxData the makers of InfluxDB
2023-10-06T20:46:46Z I! Available plugins: 240 inputs, 9 aggregators, 29 processors, 24 parsers, 59 outputs, 5 secret-stores
2023-10-06T20:46:46Z I! Loaded inputs: mqtt_consumer
2023-10-06T20:46:46Z I! Loaded aggregators: 
2023-10-06T20:46:46Z I! Loaded processors: 
2023-10-06T20:46:46Z I! Loaded secretstores: 
2023-10-06T20:46:46Z W! Outputs are not used in testing mode!
2023-10-06T20:46:46Z I! Tags enabled: host=[my_host]
2023-10-06T20:46:46Z D! [agent] Initializing plugins
2023-10-06T20:46:46Z D! [agent] Starting service inputs
2023-10-06T20:46:46Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]
2023-10-06T20:46:46Z D! [agent] Stopping service inputs
2023-10-06T20:46:46Z D! [inputs.mqtt_consumer] Disconnecting [tcp://127.0.0.1:1883]
2023-10-06T20:46:46Z D! [inputs.mqtt_consumer] Disconnected [tcp://127.0.0.1:1883]
2023-10-06T20:46:46Z D! [agent] Input channel closed
2023-10-06T20:46:46Z D! [agent] Stopped Successfully

nothing!!! No output on stdout at all! WTF!

Of course, the MQTT messages are available at the broker all the time, every 5 seconds.

Why does the [agent] stop the “service inputs” immediately:

[inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]
[agent] Stopping service inputs
[inputs.mqtt_consumer] Disconnecting [tcp://127.0.0.1:1883]
...

:question: :question: :question:

Any help is greatly appreciated!

conne914

Now I have overcome this hurdle:

I started telegraf without the parameters --test and --debug and immediately I got output lines on stdout! :smile: :+1:

The reason was probably this: because I first wanted to check without a database whether Telegraf does its work correctly, I had commented out the [[outputs.influxdb_v2]] section completely and used

[[outputs.file]]
  files = ["stdout"]

instead. With --test, Telegraf did warn that “Outputs are not used in testing mode!”, but I had not expected that to be a problem.
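
A likely explanation, as far as I understand Telegraf’s test mode: with --test, Telegraf gathers once, prints the metrics itself, and exits. mqtt_consumer is a service input that only produces metrics when a message arrives, so it is stopped again before the next message (one every 5 seconds here) can come in. That would match the “Connected” line being followed immediately by “Stopping service inputs” in the debug log. Telegraf also has a --test-wait flag that keeps service inputs running for a number of seconds in test mode. The sketch below sums this up; the 10-second wait is an arbitrary assumption.

```toml
# Debug setup: print every metric to stdout as line protocol instead of
# writing it to InfluxDB.
#
# Normal run: the mqtt_consumer service input stays connected and the
# output below is active.
#   telegraf --config ./mqtt_input_2023-10-04.simple.conf
#
# Test mode: outputs are ignored, and service inputs need time to receive
# messages before Telegraf exits (10 seconds is an arbitrary choice).
#   telegraf --config ./mqtt_input_2023-10-04.simple.conf --test --test-wait 10
[[outputs.file]]
  files = ["stdout"]
  # Line protocol, which is also the default serializer.
  data_format = "influx"
```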

Best,
conne914