Customization of output data in Telegraf

Hello people, I’m new to the Influx world and wasn’t able to find a solution to my problem, either in the forum or using GPT.
I get my data as JSON over MQTT.

I want to build the following structure for my data when sending it to InfluxDB:

  • Device IDs are dynamic, and all devices have the same inputs.

  • Inputs are 4-digit numbers that I have to divide by 100, e.g. 2947 => 29.47.

  • In the future I may get dynamic topics, so the structure would then be: topic as measurement, device_id as field key.

I want device_id to be the field key.

mybucket:
    device_id:
        ├inputA
        ├inputB
        ├inputC
        ├inputD
        ├inputE
        ├inputF
        ├inputG
        ├inputH
        ├protocol
        ├date     
    device_id:
        ├inputA
        ├inputB
        ├inputC
        ├inputD
        ├inputE
        ├inputF
        ├inputG
        ├inputH
        ├protocol
        ├date

One sample of the JSON I receive:

msg send {"msg":"Hello","device_id":1964178551,"inputA":603,"inputB":-2200,"inputC":806,"inputD":226,"inputE":50,"inputF":556,"inputG":0,"inputH":-10,"protocol":"tcp","date":"2024-10-10 09:16:46"}

Finally, I need guidance on my telegraf.conf.

I’m not sure where the problem is. :wink:

I assume you are using the mqtt_consumer input plugin? There you can subscribe to a “sub-tree” via

topics = ["mybucket/#"]

which will subscribe to all messages below mybucket and you don’t need to care about dynamic device IDs.
Then add parsing which I would do using the XPath parser as it’s easiest:

[[inputs.mqtt_consumer]]
  topics = ["mybucket/#"]

  ...

  data_format = "xpath_json"
  xpath_native_types = true
  fieldexclude = ["date", "device_id"]

  [[inputs.mqtt_consumer.xpath]]
    metric_name = "'mydevices'"
    field_selection = "/*"
    timestamp = "date"
    timestamp_format = "2006-01-02 15:04:05"

    [inputs.mqtt_consumer.xpath.tags]
      device = "string(device_id)"

which results in

> mydevices,device=1964178551,host=Hugin inputA=603,inputB=-2200,inputC=806,inputD=226,inputE=50,inputF=556,inputG=0,inputH=-10,msg="Hello",protocol="tcp" 1728551806000000000

Is this what you want @alireza_j?


Hello, thank you so much for your time. It’s getting close to what I want.
Let me give more detail for better understanding:

  1. First of all, is there any way to remove host=vps-somethingsomething.com, or to set a specific name for the host instead?
    This is the data that gets sent:
> mydevices,device=1964178551,host=vps-somethingsomething.com,topic=emqx/test inputA=603,inputB=-2200,inputC=806,inputD=226,inputE=50,inputF=556,inputG=0,inputH=-10,msg="Hello",protocol="tcp" 1728551806000000000
  2. This is the structure I’m currently getting:

And this is how I want to get the data in the Data Explorer:
As a new member I can’t post two pictures, so I’ll upload the second one separately.

If it helps, this is my config:

[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = false

[[inputs.mqtt_consumer]]
  servers = ["tcp://*****:1883"]
  topics = [
    "emqx/#",
  ]
  # qos = 1
  username = "*****"
  password = "*****"
  client_id = "*****"

  data_format = "xpath_json"
  xpath_native_types = true
  fieldexclude = ["date", "device_id"]
  [[inputs.mqtt_consumer.xpath]]
    metric_name = "'mydevices'"
    field_selection = "/*"
    timestamp = "date"
    timestamp_format = "2006-01-02 15:04:05"

    [inputs.mqtt_consumer.xpath.tags]
      device = "string(device_id)"

[[outputs.influxdb_v2]]
  urls = ["http://*****:8086"]
  token = "*****"
  organization = "****"
  bucket = "test2"
  timeout = "5s"

thank you again for your time❤

And this is how I want to get the data in the Data Explorer (by the way, I can’t see any data when I filter it like this):

Or let me put it this way:

I have some topics and select one of them.
Then I get a list of the devices (device IDs) that are publishing to that topic.
I pick one, and now I should see multiple graphs for inputA … inputH.
But I can’t do that in the InfluxDB Data Explorer.

again, thanks for your time.

I got it working like this (I have no clue what’s happening):

[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 2500 #1000
  metric_buffer_limit = 100000 #10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = true
 
[[inputs.mqtt_consumer]]
  servers = ["tcp://****:1883"]
  topics = ["emqx/#"]
  username = "****"
  password = "****"
  client_id = "*****"
  data_format = "json"
  tag_keys = ["device_id"]
  qos = 0
[[processors.regex]]
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^emqx/(.*)$"
    replacement = "${1}"
[[processors.converter]]
  [processors.converter.fields]
    float = ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]
[[processors.starlark]]
  source = '''
def apply(metric):
    for field in ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]:
        if field in metric.fields:
            metric.fields[field] = float(metric.fields[field]) / 100  # Remove this line to keep original values
    metric.name = metric.tags["topic"]
    return metric
'''
[[outputs.influxdb_v2]]
  urls = ["http://*******:8086"]
  token = "*****"
  organization = "****"
  bucket = "test"
  timeout = "5s"

But now I have a bigger problem :)) and I’ll post it in a new topic…

This part

[[processors.regex]]
  [[processors.regex.tags]]
    key = "topic"
    pattern = "^emqx/(.*)$"
    replacement = "${1}"

can be replaced by using topic parsing.
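A rough, untested sketch of what topic parsing could look like here. It assumes every topic has exactly two levels (such as emqx/test); deeper topics would not match the emqx/+ pattern and would need their own topic_parsing entry.

```toml
[[inputs.mqtt_consumer]]
  # servers, credentials and client_id as in the config above
  topics = ["emqx/#"]
  data_format = "json"
  tag_keys = ["device_id"]

  # Assumption: two-level topics only, e.g. "emqx/test".
  # "_" ignores a topic segment, "measurement" uses that segment
  # as the metric name, so "emqx/test" becomes measurement "test".
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "emqx/+"
    measurement = "_/measurement"
```

With this in place the metric name comes straight from the topic, so neither the regex processor nor the metric.name assignment in Starlark should be needed for that step.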

This part

[[processors.converter]]
  [processors.converter.fields]
    float = ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]

is unnecessary when using the XPath parser as I suggested.

And this part

[[processors.starlark]]
  source = '''
def apply(metric):
    for field in ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]:
        if field in metric.fields:
            metric.fields[field] = float(metric.fields[field]) / 100  # Remove this line to keep original values
    metric.name = metric.tags["topic"]
    return metric
'''

can also be covered with the XPath parser, that’s why I suggested it in the first place. :wink:
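To illustrate, here is a minimal, untested sketch of how the division by 100 might be done inside the XPath parser. It assumes explicit field definitions in place of field_selection, and that the parser's XPath engine accepts the standard div operator.

```toml
[[inputs.mqtt_consumer]]
  # servers, credentials and client_id as in the config above
  topics = ["emqx/#"]

  data_format = "xpath_json"
  xpath_native_types = true

  [[inputs.mqtt_consumer.xpath]]
    metric_name = "'mydevices'"
    timestamp = "date"
    timestamp_format = "2006-01-02 15:04:05"

    [inputs.mqtt_consumer.xpath.tags]
      device = "string(device_id)"

    # Explicit fields (instead of field_selection) so each input
    # can be scaled while parsing, e.g. 603 becomes 6.03.
    [inputs.mqtt_consumer.xpath.fields]
      inputA = "number(inputA) div 100"
      inputB = "number(inputB) div 100"
      inputC = "number(inputC) div 100"
      inputD = "number(inputD) div 100"
      inputE = "number(inputE) div 100"
      inputF = "number(inputF) div 100"
      inputG = "number(inputG) div 100"
      inputH = "number(inputH) div 100"
      protocol = "string(protocol)"
```

Because the fields are listed explicitly, date and device_id never become fields, so fieldexclude is not needed in this variant.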

But anyway, glad you got it working
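On the earlier question about the host tag: it comes from the agent settings, so it can either be dropped or given a fixed value there. A small sketch, where "my-telegraf" is only a placeholder name:

```toml
[agent]
  # Option 1: drop the host tag from all metrics
  omit_hostname = true

  # Option 2: keep the tag but set a fixed value (placeholder name)
  # hostname = "my-telegraf"
  # omit_hostname = false
```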
