MQTT data parsing with Telegraf

Hello everyone,

Please bear with me as I’m still new to Telegraf and figuring things out!

I’m working on an IoT system where each controller sends a binary-encoded MQTT message on a specific topic (temperature, humidity, pressure, etc.). Each message contains:

  • The controller ID (8 bytes)
  • 8 bytes of omitted data
  • A 4-byte float representing the sensor value

I’m using Telegraf with the mqtt_consumer input plugin and the binary parser to extract this data and write it to InfluxDB. My Telegraf conf file looks like this:

[[inputs.mqtt_consumer]]
  servers = ["tcp://myip"]
  topics = [
    "dioxygen",
    "humidity",
    "luminosity",
    "pressure",
    "temperature"
  ]
  qos = 1
  connection_timeout = "30s"

  ## binary_encoding
  data_format = "binary"
  binary_encoding = "none"
  endianness = "le"

  [[inputs.mqtt_consumer.binary]]
    entries = [
      {name = "controller_id", type = "int32", bits = 64, assignment = "tag"},
      {bits = 64, omit = true},
      {name = "value", type = "float32"}
    ]

The issue is that each topic is parsed independently and generates its own measurement with its own timestamp, even when multiple messages arrive at the exact same time. This results in redundant timestamps and prevents me from having a single row with multiple fields per timestamp.

What I’d like is a way to have:

  • A single timestamped row
  • The sensor type and value as a key=value pair (tag) in line protocol / 2 columns in InfluxDB

Can this kind of structure be achieved purely through the binary parser in Telegraf? Or is there a better approach to handle this cleanly within Telegraf without restructuring the MQTT payload format?

Thanks in advance!

@Laurence, welcome to the InfluxData Community!

Looking at your setup, you’re facing a common challenge when working with separate MQTT topics for different sensor types. There are several ways to get a single timestamped row with multiple sensor fields. I would recommend the following:

Topic Parsing with Pivot Processor
This approach uses Telegraf’s topic parsing feature combined with the pivot processor to restructure your data:

[[inputs.mqtt_consumer]]
  servers = ["tcp://myip"]
  topics = [
    "dioxygen",
    "humidity",
    "luminosity",
    "pressure",
    "temperature"
  ]
  qos = 1
  connection_timeout = "30s"

  # Use a generic measurement name for all topics
  name_override = "sensors"
  # Drop the default "topic" tag; the topic is stored as sensor_type below
  topic_tag = ""

  data_format = "binary"
  binary_encoding = "none"
  endianness = "le"

  [[inputs.mqtt_consumer.binary]]
    entries = [
      # type/bits mismatch on controller_id: see the note below
      {name = "controller_id", type = "int32", bits = 64, assignment = "tag"},
      {bits = 64, omit = true},
      {name = "value", type = "float32"}
    ]

  # Parse the topic name as a tag
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "dioxygen"
    tags = "sensor_type"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "humidity"
    tags = "sensor_type"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "luminosity"
    tags = "sensor_type"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "pressure"
    tags = "sensor_type"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "temperature"
    tags = "sensor_type"

# Pivot processor: name each "value" field after its sensor_type tag
[[processors.pivot]]
  tag_key = "sensor_type"
  value_key = "value"

This will transform your data from:

sensors,controller_id=123,sensor_type=temperature value=25.5
sensors,controller_id=123,sensor_type=humidity value=60.2

To:

sensors,controller_id=123 temperature=25.5
sensors,controller_id=123 humidity=60.2

The pivot processor is designed for this kind of transformation: it turns the value of a tag into the name of the field. It works on each metric separately, so the two lines above only end up as a single row (temperature=25.5,humidity=60.2) when they share the same measurement, tag set, and timestamp.
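
If each message still shows up as its own row after the pivot step, the merge aggregator may help: it combines metrics that share the same measurement name, tag set, and timestamp into one metric with several fields. Below is a minimal sketch, not a tested setup. The period value is an assumption, and round_timestamp_to should only be enabled after checking the merge aggregator README for your Telegraf version, since it may not be available in older releases. Separate MQTT messages rarely get identical arrival timestamps, so without some form of timestamp alignment the metrics may not merge. Any tag that differs between topics (for example the default topic tag added by mqtt_consumer) will also keep them apart.

```toml
# Sketch: combine pivoted metrics that share measurement name,
# tag set, and timestamp into a single multi-field metric.
[[aggregators.merge]]
  # Emit only the merged metric, not the individual inputs.
  drop_original = true

  # Assumed aggregation window; choose one that matches how often
  # your controllers publish.
  period = "10s"

  # Assumption: only present in newer Telegraf releases. Verify in the
  # README for your version before uncommenting.
  # round_timestamp_to = "1s"
```

Aggregators run after processors, so this section can simply be added to the same config file as the pivot processor.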

Note:
{name = "controller_id", type = "int32", bits = 64, assignment = "tag"}
There’s likely a mismatch here: you’re specifying type = "int32" but bits = 64. For an 8-byte controller ID, you probably want type = "int64". If the ID is actually a string or raw byte array, pick a type that fits it instead (a fixed-length "string" entry may work; check the binary parser documentation for the supported types).
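
A minimal sketch of the entries with the width mismatch resolved, assuming the controller ID is a numeric value that fits a signed 64-bit integer (that is an assumption about your payload, not something the thread confirms):

```toml
[[inputs.mqtt_consumer.binary]]
  entries = [
    # 8-byte controller ID read as a signed 64-bit integer
    # (assumes the ID is numeric; the length follows from the type)
    {name = "controller_id", type = "int64", assignment = "tag"},
    # 8 bytes that are skipped
    {bits = 64, omit = true},
    # 4-byte float carrying the sensor value
    {name = "value", type = "float32"}
  ]
```

If the IDs are unsigned on the controller side, "uint64" would be the matching type.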
