Here,
if telegraf to influxdb connection is successful because when i i restart the telegraf it creates the db i mentioned in the config file. But mqtt publihsed messages are not recieved by the telegraf I even tried to put it into file but it’s empty.
so something is wrong.
config of mqtt
[[inputs.mqtt_consumer]]
## MQTT broker URLs to be used. The format should be scheme://host:port,
## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
## QoS policy for messages
## 0 = at most once
## 1 = at least once
## 2 = exactly once
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
qos = 0
## Connection timeout for initial connection in seconds
connection_timeout = "30s"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Topics to subscribe to
topics = [
"telegraf/sensors/#",
]
# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = ""
## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
data_format = "influx"
-------------------------------------------------------------------------------------------------------------------
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = ["unix:///var/run/influxdb.sock"]
# urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
## The target database for metrics; will be created as needed.
# database = "telegraf"
## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the
## database already exists.
# skip_database_creation = false
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy. Only takes effect when using HTTP.
# retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
## Only takes effect when using HTTP.
# write_consistency = "any"
## Timeout for HTTP messages.
# timeout = "5s"
## HTTP Basic Auth
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## HTTP User-Agent
# user_agent = "telegraf"
## UDP payload size is the maximum packet size to send.
# udp_payload = "512B"
## Optional TLS Config for use on HTTP connections.
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
and this is the code for publishing
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import json
influxclient = InfluxDBClient(host='localhost', port=8086)
# This is the Publisher
dict_msg={"temperature":"20.5"}
msg = json.dumps(dict_msg)
MQTT_HOST = "iot.eclipse.org"
MQTT_PORT = 1883
MQTT_KEEPALIVE_INTERVAL = 45
MQTT_TOPIC = "sensors"
count = 0
# Define on_publish event function
def on_publish(client, userdata, mid):
print("Message Published..")
# Initiate MQTT Client
mqttc = mqtt.Client()
# Register publish callback function
mqttc.on_publish = on_publish
# Connect with MQTT Broker
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)
x = 0
while x <= 100000:
# Publish message to MQTT Broker
mqttc.publish(MQTT_TOPIC,msg)
# influx_line_protocol = ("published,counts" = count)
# print(count)
x += 1
mqttc.loop(30)
# Disconnect from MQTT_Broker
mqttc.disconnect()
I don’t know what is wrong. i’m new in this.
help me out.