Telegraf MQTT Consumer unable to post data to Influxdb

Hello all

Been having trouble with MQTT > Telegraf > Influxdb.
Based on a Telegraf script, I managed to generate the influx line from the incoming MQTT Subs through Telegraf, then send to an output file and influxdb_v2.

The generated Influx line, retrieved from the output file in order to identify what influx has received:

mqttstuff,Device_Type=Button,Sensor_ID=837164,host=ip-myip,topic=MQTTSensor/type/11/id/837164 Battery_Voltage=3.03,Radio_Signal_Strength=-32 1622887357373242646

However, checking if influx Data Explorer has anything new, but nothing was there, no new data.

So, I grabbed the above influx line, and sent it back to influx directly through curl:

curl --request POST "http://localhost:8086/api/v2/write?org=myorg&bucket=mqttbucket&precision=ns" --header "Authorization: Token MY_TOKEN==" --data-raw 'mqttstuff,Device_Type=Button,Sensor_ID=837164,host=ip-myip,topic=MQTTSensor/type/11/id/837164 Battery_Voltage=3.03,Radio_Signal_Strength=-32 1622887357373242646'

And the post was successful through curl.
My questions are:
1- How do I fix direct output to influx?
2- The enumerated mapping processor with Yes/No is being eliminated from the influx line. Need to understand where I have failed and how to restore as boolean into the influx line?

The Telegraph plugin config/script:

######################################
MQTT ==> Telegraf Consumer
######################################

[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
topics = ["MQTTSensor/type/11/#", "MQTTSensor/type/105/#"]
username = "the user name"
password = "the user password"
data_format = "json"
name_override = "mqttstuff"
json_query = "d"
json_string_fields = ["Sensor_ID", "Values", "Battery_Voltage", "Radio_Signal_Strength", "Device_Type"]
tag_keys = ["Device_Type", "Sensor_ID"]

[[processors.enum]]
namepass = ["mqttstuff"]
[[processors.enum.mapping]]
field = "Values"

dest = "Values"

[processors.enum.mapping.value_mappings]
  "No" = 0
  "Yes" = 1

[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Battery_Voltage"
cutset = " V"

[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Radio_Signal_Strength"
cutset = " db"

[[processors.converter]]
namepass = ["mqttstuff"]
[processors.converter.fields]
float = ["Values", "Battery_Voltage", "Radio_Signal_Strength"]

[[outputs.file]] # only for debugging
namepass = ["mqttstuff"]
files = ["/tmp/mqtt_telegraf.out"]
influx_sort_fields = true

[[outputs.influxdb_v2]]
namepass = ["mgttstuff"]
urls = ["http://127.0.0.1:8086"]
token = "mytoken=="
organization = "myorg"
bucket = "mqttbucket"
data_format = "influx"

Hello @Shayban_Sawan,
That’s quite odd. Can you please try commenting out the outputs.file section and running telegraf with --test flag and write lines to stdout?

Do you see the lines as expected?
Thank you for trying the cURL to verify that the line protocol is correct.
Can you also set debug=true in the agent portion of your config and share the telegraf logs?
I’ll share this with the telegraf team too.

Hello @Anaisdg

Took some time off to recuperate.

1- In agent, I have already had debug = true
2- After stopping the telegraf service, muting file output in the config. ran telegraf from the terminal:

admin@ip-172-31-6-236:~$ telegraf --config /etc/telegraf/telegraf.conf --test --config-directory /etc/telegraf/telegraf.d $TELEGRAF_OPTS
2021-06-12T14:04:09Z I! Starting Telegraf 1.18.3
2021-06-12T14:04:09Z E! Unable to open /var/log/telegraf/telegraf.log (open /var/log/telegraf/telegraf.log: permission denied), using stderr
2021-06-12T14:04:09Z D! [agent] Initializing plugins
2021-06-12T14:04:09Z D! [agent] Starting service inputs
2021-06-12T14:04:09Z I! [inputs.http_listener_v2] Listening on [::]:8080
2021-06-12T14:04:09Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]
net,host=ip-172-31-6-236,interface=eth0 bytes_recv=99101926i,bytes_sent=187602895i,drop_in=0i,drop_out=0i,err_in=0i,err_out=0i,packets_recv=742059i,packets_sent=673536i 1623506650000000000
net,host=ip-172-31-6-236,interface=all icmp_inaddrmaskreps=0i,icmp_inaddrmasks=0i,icmp_incsumerrors=0i,icmp_indestunreachs=332i,icmp_inechoreps=2i,icmp_inechos=10856i,icmp_inerrors=102i,icmp_inmsgs=11211i,icmp_inparmprobs=0i,icmp_inredirects=0i,icmp_insrcquenchs=0i,icmp_intimeexcds=21i,icmp_intimestampreps=0i,icmp_intimestamps=0i,icmp_outaddrmaskreps=0i,icmp_outaddrmasks=0i,icmp_outdestunreachs=6379i,icmp_outechoreps=10856i,icmp_outechos=0i,icmp_outerrors=0i,icmp_outmsgs=17235i,icmp_outparmprobs=0i,icmp_outredirects=0i,icmp_outsrcquenchs=0i,icmp_outtimeexcds=0i,icmp_outtimestampreps=0i,icmp_outtimestamps=0i,icmpmsg_intype0=2i,icmpmsg_intype11=21i,icmpmsg_intype3=332i,icmpmsg_intype8=10856i,icmpmsg_outtype0=10856i,icmpmsg_outtype3=6379i,ip_defaultttl=64i,ip_forwarding=2i,ip_forwdatagrams=0i,ip_fragcreates=0i,ip_fragfails=0i,ip_fragoks=0i,ip_inaddrerrors=8i,ip_indelivers=8053921i,ip_indiscards=0i,ip_inhdrerrors=0i,ip_inreceives=8054647i,ip_inunknownprotos=718i,ip_outdiscards=0i,ip_outnoroutes=0i,ip_outrequests=7989722i,ip_reasmfails=0i,ip_reasmoks=0i,ip_reasmreqds=0i,ip_reasmtimeout=0i,tcp_activeopens=4785i,tcp_attemptfails=2512i,tcp_currestab=23i,tcp_estabresets=1679i,tcp_incsumerrors=5669i,tcp_inerrs=5703i,tcp_insegs=8023756i,tcp_maxconn=-1i,tcp_outrsts=176622i,tcp_outsegs=7973621i,tcp_passiveopens=15189i,tcp_retranssegs=26673i,tcp_rtoalgorithm=1i,tcp_rtomax=120000i,tcp_rtomin=200i,udp_ignoredmulti=0i,udp_incsumerrors=0i,udp_indatagrams=13229i,udp_inerrors=0i,udp_noports=5725i,udp_outdatagrams=13238i,udp_rcvbuferrors=0i,udp_sndbuferrors=0i,udplite_ignoredmulti=0i,udplite_incsumerrors=0i,udplite_indatagrams=0i,udplite_inerrors=0i,udplite_noports=0i,udplite_outdatagrams=0i,udplite_rcvbuferrors=0i,udplite_sndbuferrors=0i 1623506650000000000
diskio,host=ip-172-31-6-236,name=xvda io_time=966933508i,iops_in_progress=0i,merged_reads=2i,merged_writes=920156i,read_bytes=1548520960i,read_time=56903i,reads=59285i,weighted_io_time=967008740i,write_bytes=10015389184i,write_time=854109i,writes=1016286i 1623506650000000000
diskio,host=ip-172-31-6-236,name=xvda1 io_time=333668i,iops_in_progress=0i,merged_reads=2i,merged_writes=920156i,read_bytes=1543212032i,read_time=56556i,reads=58763i,weighted_io_time=405980i,write_bytes=10015388672i,write_time=854108i,writes=1016285i 1623506650000000000
diskio,host=ip-172-31-6-236,name=xvda14 io_time=20i,iops_in_progress=0i,merged_reads=0i,merged_writes=0i,read_bytes=245760i,read_time=22i,reads=60i,weighted_io_time=20i,write_bytes=0i,write_time=0i,writes=0i 1623506650000000000
diskio,host=ip-172-31-6-236,name=xvda15 io_time=68i,iops_in_progress=0i,merged_reads=0i,merged_writes=0i,read_bytes=3351040i,read_time=271i,reads=383i,weighted_io_time=272i,write_bytes=512i,write_time=0i,writes=1i 1623506650000000000
disk,device=xvda1,fstype=ext4,host=ip-172-31-6-236,mode=rw,path=/ free=4760027136i,inodes_free=459982i,inodes_total=516096i,inodes_used=56114i,total=8255877120i,used=3056291840i,used_percent=39.101421645973524 1623506650000000000
disk,device=xvda15,fstype=vfat,host=ip-172-31-6-236,mode=rw,path=/boot/efi free=129466368i,inodes_free=0i,inodes_total=0i,inodes_used=0i,total=129751040i,used=284672i,used_percent=0.21939862678557334 1623506650000000000
processes,host=ip-172-31-6-236 blocked=0i,dead=0i,idle=24i,paging=0i,running=0i,sleeping=70i,stopped=0i,total=94i,total_threads=120i,unknown=0i,zombies=0i 1623506650000000000
swap,host=ip-172-31-6-236 free=0i,total=0i,used=0i,used_percent=0 1623506650000000000
swap,host=ip-172-31-6-236 in=0i,out=0i 1623506650000000000
system,host=ip-172-31-6-236 load1=0,load15=0,load5=0,n_cpus=1i,n_users=5i 1623506650000000000
system,host=ip-172-31-6-236 uptime=1252856i 1623506650000000000
system,host=ip-172-31-6-236 uptime_format="14 days, 12:00" 1623506650000000000
mem,host=ip-172-31-6-236 active=605364224i,available=590024704i,available_percent=57.21224878862499,buffered=79196160i,cached=456581120i,commit_limit=515645440i,committed_as=1065472000i,dirty=8192i,free=130248704i,high_free=0i,high_total=0i,huge_page_size=2097152i,huge_pages_free=0i,huge_pages_total=0i,inactive=185778176i,low_free=0i,low_total=0i,mapped=180248576i,page_tables=3805184i,shared=10747904i,slab=84529152i,sreclaimable=53829632i,sunreclaim=30699520i,swap_cached=0i,swap_free=0i,swap_total=0i,total=1031290880i,used=365264896i,used_percent=35.41822225752641,vmalloc_chunk=0i,vmalloc_total=35184372087808i,vmalloc_used=0i,write_back=0i,write_back_tmp=0i 1623506650000000000
mqttstuff,Device_Type=Button,Sensor_ID=837164,host=ip-172-31-6-236,topic=MQTTSensor/type/11/id/837164 Battery_Voltage=3.03,Radio_Signal_Strength=-34 1623506649777292267
mqttstuff,Device_Type=Ultrasonic\ Ranger,Sensor_ID=730968,host=ip-172-31-6-236,topic=MQTTSensor/type/105/id/730968 Battery_Voltage=3.5,Radio_Signal_Strength=-35,Values_0=138 1623506649777349355
2021-06-12T14:04:10Z D! [agent] Stopping service inputs
2021-06-12T14:04:10Z D! [inputs.mqtt_consumer] Disconnecting [tcp://127.0.0.1:1883]
cpu,cpu=cpu0,host=ip-172-31-6-236 usage_guest=0,usage_guest_nice=0,usage_idle=100,usage_iowait=0,usage_irq=0,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0 1623506650000000000
cpu,cpu=cpu-total,host=ip-172-31-6-236 usage_guest=0,usage_guest_nice=0,usage_idle=100,usage_iowait=0,usage_irq=0,usage_nice=0,usage_softirq=0,usage_steal=0,usage_system=0,usage_user=0 1623506650000000000
2021-06-12T14:04:10Z D! [inputs.mqtt_consumer] Disconnected [tcp://127.0.0.1:1883]
2021-06-12T14:04:10Z D! [agent] Input channel closed
2021-06-12T14:04:10Z D! [agent] Processor channel closed
2021-06-12T14:04:10Z D! [agent] Processor channel closed
2021-06-12T14:04:10Z D! [agent] Processor channel closed
2021-06-12T14:04:10Z D! [agent] Processor channel closed
2021-06-12T14:04:10Z D! [agent] Processor channel closed
2021-06-12T14:04:10Z D! [agent] Stopped Successfully

However, needed to run that multiple times after it exited, to capture something related to that push button sometimes. This caught my eye:
> mqttstuff,Device_Type=Button,Sensor_ID=837164,host=ip-172-31-6-236,topic=MQTTSensor/type/11/id/837164 Battery_Voltage=3.03,Radio_Signal_Strength=-36 1623506481573246668

Still, doesn’t seem to coincide with the data from MQTT, the enum processor output for the Yes/No states are still missing, here is a copy of the MQTT topic subs:
{"d":{"Sensor_ID":"837164","Device_Type":"Button","Sensor_Number":"837164","Values":["No"],"Units":["Button pressed"],"Battery_Voltage":"3.03 V","Radio_Signal_Strength":"-24 db","Internal_State":"False"}}

In conclusion, stdout output of the terminal-run telegraf, has shown the same results from the logs collected. Something is stuck at the level between telegraf and influx where in regards of MQTT parsing and formatting before the output level is running fine; however, while the output generates generally well at file output level (minus the status of the button) but not follow through to influx.

Hello @Anaisdg again, forgive me this is a facepalm moment hope you can bear with me.

I got something fixed regarding the Yes/No status, you know how these things are, always incremental in terms of fixes.

{"d":{"Sensor_ID":"837164","Device_Type":"Button","Sensor_Number":"837164","Values":["No"],"Units":["Button pressed"],"Battery_Voltage":"3.03 V","Radio_Signal_Strength":"-24 db","Internal_State":"False"}}

Notice how the Yes/No values are encapsulated by brackets? I found a way to force the omission of the brackets from the source directly, but sadly still stuck with Yes/No rather than true/false or 1/0 etc…

So now after the omission of the brackets:
mqttstuff,Device_Type=Button,Sensor_ID=837164,host=ip-172-31-6-236,topic=MQTTSensor/type/11/id/837164 Battery_Voltage=3.03,Radio_Signal_Strength=-35,Values=1 1623525367448333093

However, still the final step of the telegraf config isn’t pushing the influx line. I can’t see any physical evidence that the line is issued except through the --test flag. Which now appears as:

> mqttstuff,Device_Type=Button,Sensor_ID=837164,host=ip-172-31-6-236,topic=MQTTSensor/type/11/id/837164 Battery_Voltage=3.03,Radio_Signal_Strength=-35,Values=1 1623525367448333093

I can’t understand how the same influx line can be sent through curl but not through telegraf outputs.influxdb_v2 plugin. I would really appreciate your feedback.

Facepalm moment #2, I couldn’t see that I replaced a q with g when writing namepass = “mqttstuff”
I only thought to look back here on the snippet i pasted earlier to see if it looked different here.
Lo and Behold. terminal q and g are almost identical. How and what solution should I terminate this lead?

Solution for MQTT consumer in Telegraf:

  1. Assign Topic to filter
  2. Assign Tags, Fields and Timestamp in the JSON MQTT sub message to match Influx Line protocol. The tags are like names to the measurements in the Fields portion of the message. The timestamp is setup by influx automatically. Make sure to set the right timescale in Agent part of the config.
  3. Make sure to Identify parts of the JSON that aren’t considered conventional, for those use processors.
  4. Watch out for namepass.
2 Likes

Thank you for the post, @Shayban_Sawan . Could you post your telegraf config file. I still can’t get the MQTT plugin to read or write data from a HiveMQ broker.

Hi there, it’s been a while since I visited this tool. However, for the respective JSON string in the earlier comments, kindly note that the telegraf config was written to solve the problems regarding the JSON input / conversion to Influx Line / pass to influx accordingly. As such, please find the below script used to solve conversion problems for the above mentioned JSON string:

### Configuration for telegraf agent
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = "ns"
debug = true
quiet = false
logfile = "/var/log/telegraf/telegraf.log"
hostname = ""

######################################
### system health
######################################
[[inputs.cpu]]
percpu = true
totalcpu = true
collect_cpu_time = false
report_active = false
[[inputs.disk]]
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.mem]]
[[inputs.net]]
[[inputs.processes]]
[[inputs.swap]]
[[inputs.system]]

[[outputs.influxdb_v2]]
urls = ["http://127.0.0.1:8086"]
token = "INFLUX TOKEN HERE #1" #### KEEP THE QUOTES
organization = "YOUR ORG HERE"
bucket = "YOUR BUCKET NAME HERE #1"
namedrop = ["other possible name you override your input","mqttstuff"]

######################################
### MQTT ==> Telegraf Consumer
######################################

[[inputs.mqtt_consumer]]
servers = ["tcp://127.0.0.1:1883"]
topics = ["MQTTSensor/type/11/#", "MQTTSensor/type/105/#", "MQTTSensor/type/5/#"]
username = "mqtt username"
password = "mqtt password"
data_format = "json"
name_override = "mqttstuff"
json_query = "d"
json_string_fields = ["Sensor_ID", "Values", "Battery_Voltage", "Radio_Signal_Strength", "Device_Type", "Internal_State"]
tag_keys = ["Device_Type", "Sensor_ID"]

[[processors.enum]]
namepass = ["mqttstuff"]
[[processors.enum.mapping]]
field = "Values"
[processors.enum.mapping.value_mappings]
"No" = 0
"Yes" = 1

[[processors.enum]]
namepass = ["mqttstuff"]
[[processors.enum.mapping]]
field = "Internal_State"
[processors.enum.mapping.value_mappings]
"False" = 0
"True" = 1

[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Battery_Voltage"
cutset = " V"

[[processors.strings]]
namepass = ["mqttstuff"]
[[processors.strings.trim_right]]
field = "Radio_Signal_Strength"
cutset = " db"

[[processors.converter]]
namepass = ["mqttstuff"]
[processors.converter.fields]
float = ["Values", "Battery_Voltage", "Radio_Signal_Strength", "Internal_State"]

[[outputs.file]]
namepass = ["mqttstuff"]
files = ["/tmp/mqtt_telegraf.out"]
influx_sort_fields = true

[[outputs.influxdb_v2]]
urls = ["http://127.0.0.1:8086"]
token = "INFLUX TOKEN HERE #2"
organization = "YOUR ORG HERE"
bucket = "YOUR BUCKET NAME HERE #2"
namepass = ["mqttstuff"]

### END OF TELEGRAF CONFIG FILE

For your reference, I will include my respective JSON string that was sent through MQTT publish to the broker, it was a single line, I just cleaned it up to make sense for you:

{
"d":{
"Sensor_ID":"837164",
"Device_Type":"Button",
"Sensor_Number":"837164",
"Values":["No"],
"Units":["Button pressed"],
"Battery_Voltage":"3.03 V",
"Radio_Signal_Strength":"-24 db",
"Internal_State":"False"
}
}

Thank you so much @Shayban_Sawan . It is a tremendous help.