Hello everyone, I’m new to the InfluxDB world and wasn’t able to find a solution to my problem, either in the forum or using GPT.
I receive my data as JSON over MQTT,
and I want to give it the following structure when sending it to InfluxDB:
- Device IDs are dynamic, and every device has the same inputs.
- Inputs arrive as 4-digit numbers that I have to divide by 100, e.g. 2947 => 29.47.
- In the future I may get dynamic topics, so the layout would become: topic as measurement, device_id as field key.
I want device_id to be the field key:
mybucket:
device_id:
├inputA
├inputB
├inputC
├inputD
├inputE
├inputF
├inputG
├inputH
├protocol
├date
device_id:
├inputA
├inputB
├inputC
├inputD
├inputE
├inputF
├inputG
├inputH
├protocol
├date
One JSON sample of what I receive:
msg send {"msg":"Hello","device_id":1964178551,"inputA":603,"inputB":-2200,"inputC":806,"inputD":226,"inputE":50,"inputF":556,"inputG":0,"inputH":-10,"protocol":"tcp","date":"2024-10-10 09:16:46"}
Finally, I need guidance on my telegraf.conf.
I’m not sure where the problem is.
I assume you are using the mqtt_consumer input plugin? There you can subscribe to a “sub-tree” via
topics = ["mybucket/#"]
which subscribes to all messages below mybucket,
so you don’t need to care about dynamic device IDs.
Then add parsing, which I would do with the XPath parser as it’s the easiest:
[[inputs.mqtt_consumer]]
topics = ["mybucket/#"]
...
data_format = "xpath_json"
xpath_native_types = true
fieldexclude = ["date", "device_id"]
[[inputs.mqtt_consumer.xpath]]
metric_name = "'mydevices'"
field_selection = "/*"
timestamp = "date"
timestamp_format = "2006-01-02 15:04:05"
[inputs.mqtt_consumer.xpath.tags]
device = "string(device_id)"
which results in
> mydevices,device=1964178551,host=Hugin inputA=603,inputB=-2200,inputC=806,inputD=226,inputE=50,inputF=556,inputG=0,inputH=-10,msg="Hello",protocol="tcp" 1728551806000000000
Is this what you want @alireza_j?
Hello, thank you so much for your time. This is getting close to what I want.
Let me give more detail for better understanding:
- First of all, is there any way to remove host=vps-somethingsomething.com, or to set a specific name for host instead?
This is the data that gets sent:
> mydevices,device=1964178551,host=vps-somethingsomething.com,topic=emqx/test inputA=603,inputB=-2200,inputC=806,inputD=226,inputE=50,inputF=556,inputG=0,inputH=-10,msg="Hello",protocol="tcp" 1728551806000000000
- This is the structure I’m getting right now:
And this is the way I want to see them in Data Explorer:
As a new member I can’t post two pictures in one post, so I’ll upload the second one separately.
If it helps, this is my config:
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = ""
hostname = ""
omit_hostname = false
[[inputs.mqtt_consumer]]
servers = ["tcp://*****:1883"]
topics = [
"emqx/#",
]
# qos = 1
username = "*****"
password = "*****"
client_id = "*****"
data_format = "xpath_json"
xpath_native_types = true
fieldexclude = ["date", "device_id"]
[[inputs.mqtt_consumer.xpath]]
metric_name = "'mydevices'"
field_selection = "/*"
timestamp = "date"
timestamp_format = "2006-01-02 15:04:05"
[inputs.mqtt_consumer.xpath.tags]
device = "string(device_id)"
[[outputs.influxdb_v2]]
urls = ["http://*****:8086"]
token = "*****"
organization = "****"
bucket = "test2"
timeout = "5s"
thank you again for your time❤
And this is the way I want to see them in Data Explorer (by the way, I can’t see any data when I filter like this):
Or let me put it this way:
I have some topics and select one of them,
then I get the list of devices (device IDs) that are publishing to that topic,
so I pick one, and now I should see multiple graphs for my inputA … inputH.
But I can’t do that in the InfluxDB Data Explorer.
Again, thanks for your time.
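On the host tag question above: as far as I know, the host tag is controlled by two settings in Telegraf’s [agent] section, hostname and omit_hostname. A minimal sketch, where "my-gateway" is only a placeholder name:

```toml
[agent]
  ## Option 1: drop the host tag from every metric.
  omit_hostname = true

  ## Option 2: keep the tag but override its value.
  ## "my-gateway" is a placeholder, not a name from this thread.
  # hostname = "my-gateway"
  # omit_hostname = false
```

The topic tag is added by the mqtt_consumer plugin itself and is not affected by these agent settings.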
I got it working like this (I have no clue what’s happening):
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 2500 #1000
metric_buffer_limit = 100000 #10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = ""
hostname = ""
omit_hostname = true
[[inputs.mqtt_consumer]]
servers = ["tcp://****:1883"]
topics = ["emqx/#"]
username = "****"
password = "****"
client_id = "*****"
data_format = "json"
tag_keys = ["device_id"]
qos = 0
[[processors.regex]]
[[processors.regex.tags]]
key = "topic"
pattern = "^emqx/(.*)$"
replacement = "${1}"
[[processors.converter]]
[processors.converter.fields]
float = ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]
[[processors.starlark]]
source = '''
def apply(metric):
    for field in ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]:
        if field in metric.fields:
            metric.fields[field] = float(metric.fields[field]) / 100  # Remove this line to keep original values
    metric.name = metric.tags["topic"]
    return metric
'''
[[outputs.influxdb_v2]]
urls = ["http://*******:8086"]
token = "*****"
organization = "****"
bucket = "test"
timeout = "5s"
But now I have a bigger problem :)) and I’ll post it in a new topic…
This part
[[processors.regex]]
[[processors.regex.tags]]
key = "topic"
pattern = "^emqx/(.*)$"
replacement = "${1}"
can be replaced by using topic parsing.
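A minimal sketch of that replacement, assuming every topic has exactly two segments (emqx/<name>); deeper topics would need their own topic_parsing entries. In the measurement pattern, "_" skips a segment and "measurement" marks the segment that becomes the metric name:

```toml
[[inputs.mqtt_consumer]]
  topics = ["emqx/#"]
  # servers, credentials and data_format stay as in the config above

  ## Assumption: topics look like "emqx/<name>".
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "emqx/+"
    measurement = "_/measurement"
```

With this in place the metric name is taken from the topic, so the regex processor and the metric.name assignment in the Starlark script should no longer be needed. The topic tag itself keeps its full value (for example emqx/test).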
This part
[[processors.converter]]
[processors.converter.fields]
float = ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]
is unnecessary when using the XPath parser as I suggested.
And this part
[[processors.starlark]]
source = '''
def apply(metric):
    for field in ["inputA", "inputB", "inputC", "inputD", "inputE", "inputF", "inputG", "inputH"]:
        if field in metric.fields:
            metric.fields[field] = float(metric.fields[field]) / 100  # Remove this line to keep original values
    metric.name = metric.tags["topic"]
    return metric
'''
can also be covered with the XPath parser, that’s why I suggested it in the first place.
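A sketch of how the scaling could look with the XPath parser, listing the fields explicitly instead of using field_selection. It assumes the parser’s XPath engine evaluates the XPath 1.0 div operator in the usual way; only three inputs are shown, the others follow the same pattern:

```toml
[[inputs.mqtt_consumer]]
  topics = ["emqx/#"]
  data_format = "xpath_json"
  xpath_native_types = true

  [[inputs.mqtt_consumer.xpath]]
    metric_name = "'mydevices'"
    timestamp = "date"
    timestamp_format = "2006-01-02 15:04:05"

    [inputs.mqtt_consumer.xpath.tags]
      device = "string(device_id)"

    ## Each numeric expression divides the raw value by 100, e.g. 603 -> 6.03.
    [inputs.mqtt_consumer.xpath.fields]
      inputA = "number(inputA) div 100"
      inputB = "number(inputB) div 100"
      inputC = "number(inputC) div 100"
      protocol = "string(protocol)"
```

Because the fields are listed explicitly, date and device_id are not picked up as fields, so fieldexclude is not needed in this variant.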
But anyway, glad you got it working