Why are my fields from MQTT topics not merged?

I have a Shelly H&T that publishes MQTT messages like this:

shelly/htg3-bathroom/status/devicepower:0 {"id": 0,"battery":{"V":6.22, "percent":100},"external":{"present":false}}
shelly/htg3-bathroom/status/humidity:0 {"id": 0,"rh":42.6}
shelly/htg3-bathroom/status/temperature:0 {"id": 0,"tC":18.0, "tF":64.3}

I would like to store those in InfluxDB, with one row containing all those values. Below is the config I’m currently using. However, when sending it to a file output I get separate rows for each value:

gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom battery_voltage=6.22 1706054192325945773
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom battery_percent=100 1706054192325945773
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom humidity=42.6 1706054192338387778
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom temperature=18 1706054192346695739

But I’d expect something like this:

gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom battery_voltage=6.22,battery_percent=100,humidity=42.6,temperature=18 1706054192325945773

Am I doing something wrong? Here’s my config:

[[inputs.mqtt_consumer]]
  alias = 'shelly-gen3-ht'
  name_override = 'gen3_ht'
  servers = ['tcp://127.0.0.1:1883']
  topics = [
    'shelly/htg3-bathroom/status/devicepower:0',
    'shelly/htg3-bathroom/status/temperature:0',
    'shelly/htg3-bathroom/status/humidity:0',
  ]
  topic_tag = ''

  username = 'telegraf'
  password = 'XXX'

  data_format = 'json_v2'

  [[inputs.mqtt_consumer.json_v2]]
    [[inputs.mqtt_consumer.json_v2.field]]
      path = 'tC'
      rename = 'temperature'
      optional = true
    [[inputs.mqtt_consumer.json_v2.field]]
      path = 'rh'
      rename = 'humidity'
      optional = true
    [[inputs.mqtt_consumer.json_v2.field]]
      path = 'battery.V'
      rename = 'battery_voltage'
      optional = true
    [[inputs.mqtt_consumer.json_v2.field]]
      path = 'battery.percent'
      rename = 'battery_percent'
      optional = true

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = 'shelly/+/status/+'
    tags = '_/shelly_name/_/_'

  [inputs.mqtt_consumer.tags]
    dbname = 'shellies'


[[processors.unpivot]]
  namepass = ['gen3_ht']
  order = 1

[[processors.pivot]]
  namepass = ['gen3_ht']
  order = 2
  tag_key = 'name'
  value_key = 'value'

I don’t think you’re doing anything wrong. If you want to merge points in a series that have the same timestamp, you can use the Merge aggregator plugin or just write them to InfluxDB and let the database handle it. To aggregate points that have the same series and timestamps within a specific time window, you might try the Final aggregator plugin.

Isn’t it super ugly to have 3 rows instead of 1 each time the shelly sends its data, and each of those rows having no data for 2 of the 3 data fields?

The final aggregator doesn’t seem to produce anything useful:

[[aggregators.final]]
  namepass = ['gen3_ht']
  period = '2s'
  series_timeout = '5s'
  output_strategy = 'timeout'

gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom battery_voltage=6.22 1706122143548288869
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom battery_percent=100 1706122143548288869
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom humidity=42.6 1706122143556423107
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom temperature=18 1706122143567104106
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom temperature_final=18 1706122143567104106

Isn’t it super ugly to have 3 rows instead of 1

They have different timestamps, so they’re distinct points even though they share the same measurement and tags. I won’t comment on the aesthetics of your data :slight_smile: If you could assign a new tag for each “run”, then you could use it as a key to merge on.

final aggregator doesn’t seem to produce anything useful

Sorry, I’ll have to look into that. What about setting grace for the Merge plugin: Optimize writes to InfluxDB | InfluxDB Cloud Dedicated Documentation

Sorry again. grace doesn’t help there; Merge only combines points that have the same series and the same timestamp.

I managed to solve it by simply ditching sub-second precision (which I don’t need anyway)! :slight_smile:

[[processors.starlark]]
  namepass = ['gen3_ht']
  order = 3
  source = '''
def apply(metric):
    # Truncate the nanosecond timestamp to whole seconds (integer division avoids floats).
    metric.time = metric.time // 1000000000 * 1000000000
    return metric
  '''

I think what really confused me was the fact that the file output shows individual lines:

gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom battery_voltage=6.22 1706124756000000000
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom battery_percent=100 1706124756000000000
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom humidity=42.6 1706124756000000000
gen3_ht,dbname=shellies,host=piefke,shelly_name=htg3-bathroom temperature=18 1706124756000000000

but when I send it to my actual influxdb, it’s a single row:

> select * from gen3_ht
name: gen3_ht
time                 battery_percent battery_voltage host   humidity shelly_name   temperature
----                 --------------- --------------- ----   -------- -----------   -----------
2024-01-24T19:32:36Z 100             6.22            piefke 42.6     htg3-bathroom 18

Perfect when you don’t need that precision - it will make your queries faster, too!
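
One caveat with truncating to whole seconds: a burst of messages that straddles a second boundary would still end up with two different timestamps. A possible variation is to floor timestamps to a wider bucket. The sketch below assumes a 5-second bucket, which is an arbitrary value not taken from this thread, and it only makes a split less likely rather than impossible.

```toml
[[processors.starlark]]
  namepass = ['gen3_ht']
  order = 3
  source = '''
def apply(metric):
    # Assumed bucket width: 5 seconds, expressed in nanoseconds.
    bucket_ns = 5 * 1000000000
    # Floor the timestamp to the start of its bucket.
    metric.time = metric.time - metric.time % bucket_ns
    return metric
'''
```

The bucket should stay well below the interval at which the device reports, otherwise consecutive readings would overwrite each other in the database.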

In the Telegraf output, they’re still unique points, but now they all share the same series and timestamp. When you write them to InfluxDB, the database unions the fields of points that have the same series and timestamp, which optimizes for storage and query performance.
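
If the file output should also show a single line per reading, one option is to add the Merge aggregator on top of the timestamp truncation. This is a minimal sketch that has not been tested against this setup; the `period` value is an assumption, and `drop_original` is the general aggregator option that suppresses the unmerged metrics.

```toml
# Sketch: combine metrics that share measurement, tags, and timestamp
# into one metric with multiple fields.
[[aggregators.merge]]
  namepass = ['gen3_ht']
  # Assumed flush interval; merged metrics are emitted once per period.
  period = '10s'
  # Emit only the merged metric, not the individual originals.
  drop_original = true
```

Processors run before aggregators, so the truncated timestamps are what Merge sees. The trade-off is that output is delayed by up to one period.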

See: