MQTT is publishing messages, but Telegraf cannot send them to InfluxDB


#1

Here, the Telegraf-to-InfluxDB connection works: when I restart Telegraf, it creates the database I specified in the config file. But the messages published over MQTT are not received by Telegraf. I even tried sending them to a file instead, but it stays empty.
So something is wrong.

MQTT consumer config:

[[inputs.mqtt_consumer]]
  ## MQTT broker URLs to be used. The format should be scheme://host:port,
  ## schema can be tcp, ssl, or ws.
  servers = ["tcp://localhost:1883"]

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  qos = 0

  ## Connection timeout for initial connection in seconds
  connection_timeout = "30s"

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Topics to subscribe to
  topics = [
    "telegraf/sensors/#",
  ]

  # if true, messages that can't be delivered while the subscriber is offline
  # will be delivered when it comes back (such as on service restart).
  # NOTE: if true, client_id MUST be set
  persistent_session = false
  # If empty, a random client ID will be generated.
  client_id = ""

  ## username and password to connect MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  data_format = "influx"



-------------------------------------------------------------------------------------------------------------------
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
  ## The full HTTP or UDP URL for your InfluxDB instance.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  # urls = ["unix:///var/run/influxdb.sock"]
  # urls = ["udp://127.0.0.1:8089"]
  # urls = ["http://127.0.0.1:8086"]

  ## The target database for metrics; will be created as needed.
  # database = "telegraf"

  ## If true, no CREATE DATABASE queries will be sent.  Set to true when using
  ## Telegraf with a user without permissions to create databases or when the
  ## database already exists.
  # skip_database_creation = false

  ## Name of existing retention policy to write to.  Empty string writes to
  ## the default retention policy.  Only takes effect when using HTTP.
  # retention_policy = ""

  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
  ## Only takes effect when using HTTP.
  # write_consistency = "any"

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## HTTP Basic Auth
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## UDP payload size is the maximum packet size to send.
  # udp_payload = "512B"

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "identity"

  ## When true, Telegraf will output unsigned integers as unsigned values,
  ## i.e.: "42u".  You will need a version of InfluxDB supporting unsigned
  ## integer values.  Enabling this option will result in field type errors if
  ## existing data has been written.
  # influx_uint_support = false

And this is the code for publishing:

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import  json

influxclient = InfluxDBClient(host='localhost', port=8086)
# This is the Publisher

dict_msg={"temperature":"20.5"}
msg = json.dumps(dict_msg)

MQTT_HOST = "iot.eclipse.org"
MQTT_PORT = 1883
MQTT_KEEPALIVE_INTERVAL = 45
MQTT_TOPIC = "sensors"

count = 0


# Define on_publish event function
def on_publish(client, userdata, mid):
    print("Message Published..")


# Initiate MQTT Client
mqttc = mqtt.Client()

# Register publish callback function
mqttc.on_publish = on_publish

# Connect with MQTT Broker
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)

x = 0
while x <= 100000:
    # Publish message to MQTT Broker
    mqttc.publish(MQTT_TOPIC,msg)
    # influx_line_protocol = ("published,counts" = count)
    # print(count)
    x += 1

mqttc.loop(30)
# Disconnect from MQTT_Broker
mqttc.disconnect()

I don’t know what is wrong. I’m new to this.
Please help me out.


#2

In your code, you’re publishing messages to iot.eclipse.org, not to localhost, which is the broker Telegraf is monitoring.
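A related thing worth checking, which this reply does not mention: the consumer config above subscribes to `telegraf/sensors/#`, while the publisher uses the topic `sensors`. Below is a minimal sketch of MQTT topic-filter matching to show the difference. The helper `topic_matches` is hypothetical and simplified (it ignores the special handling of topics that start with `$`).

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic name.

    Simplified: handles the '+' and '#' wildcards only.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' matches all remaining levels, including none at all.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


SUBSCRIBED = "telegraf/sensors/#"  # taken from the Telegraf config above

print(topic_matches(SUBSCRIBED, "sensors"))                       # False
print(topic_matches(SUBSCRIBED, "telegraf/sensors/temperature"))  # True
```

If paho-mqtt is installed, its `paho.mqtt.client.topic_matches_sub` function should give the same answers for these inputs.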


#3

I changed it to localhost and also tried 127.0.0.1.
No change.
The console log freezes at:

2019-02-07T10:23:39Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"shekhar-Inspiron-3441", Flush Interval:10s
2019-02-07T10:23:39Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]


#4

That’s good; it means Telegraf has connected to the MQTT broker.

You’ll need to run your code to publish messages.


#5

Yeah, that’s the thing: even after publishing, I’m not receiving anything,
not even in the file.
What might be the problem?


#6

Do I have to send messages in a specific format?
Or make changes to the templates?

Any help is appreciated.


#7

Hi @shekhar-joshi,

There were 2 things wrong:

  1. You didn’t configure your InfluxDB output in telegraf.conf
  2. Your temperature reading is a string, not a number type
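The second point can be sketched as follows. This is a minimal sketch, not the example referred to in this post. It assumes the broker runs on localhost:1883, and it uses an illustrative topic `telegraf/sensors/temperature` and measurement name `sensors` that do not come from the thread. Because the consumer config in post #1 sets `data_format = "influx"`, the payload is formatted as InfluxDB line protocol rather than JSON. The helper names are hypothetical, and escaping of special characters in keys is not handled.

```python
def to_line_protocol(measurement, reading):
    """Format a reading as one InfluxDB line-protocol line.

    Every value is converted to float so it is written as a number,
    not as a string.
    """
    fields = ",".join(
        "{}={}".format(key, float(value))
        for key, value in sorted(reading.items())
    )
    return "{} {}".format(measurement, fields)


def publish_reading(reading, host="localhost", port=1883,
                    topic="telegraf/sensors/temperature"):
    """Publish one reading to the broker (needs paho-mqtt and a running broker)."""
    # Imported lazily so the formatting helper works without paho-mqtt installed.
    import paho.mqtt.publish as publish

    publish.single(topic, to_line_protocol("sensors", reading),
                   hostname=host, port=port)


# The original reading used the string "20.5"; float() turns it into a number.
payload = to_line_protocol("sensors", {"temperature": "20.5"})
print(payload)  # sensors temperature=20.5
```

Calling `publish_reading({"temperature": "20.5"})` while Telegraf is running should then deliver a numeric `temperature` field, provided the topic matches the configured subscription.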

I’ve provided a working example of using the MQTT plugin for you here: