MQTT is publishing messages, but Telegraf cannot send them to InfluxDB

Here, the Telegraf to InfluxDB connection seems to be successful, because when I restart Telegraf it creates the database I specified in the config file. But the messages published over MQTT are not received by Telegraf. I even tried writing them to a file, but it’s empty.
So something is wrong.

Config of the MQTT input:

[[inputs.mqtt_consumer]]
  ## MQTT broker URLs to be used. The format should be scheme://host:port,
  ## schema can be tcp, ssl, or ws.
  servers = ["tcp://localhost:1883"]

  ## QoS policy for messages
  ##   0 = at most once
  ##   1 = at least once
  ##   2 = exactly once
  ##
  ## When using a QoS of 1 or 2, you should enable persistent_session to allow
  ## resuming unacknowledged messages.
  qos = 0

  ## Connection timeout for initial connection in seconds
  connection_timeout = "30s"

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Topics to subscribe to
  topics = [
    "telegraf/sensors/#",
  ]

  # if true, messages that can't be delivered while the subscriber is offline
  # will be delivered when it comes back (such as on service restart).
  # NOTE: if true, client_id MUST be set
  persistent_session = false
  # If empty, a random client ID will be generated.
  client_id = ""

  ## username and password to connect MQTT server.
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  data_format = "influx"



-------------------------------------------------------------------------------------------------------------------
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
  ## The full HTTP or UDP URL for your InfluxDB instance.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  # urls = ["unix:///var/run/influxdb.sock"]
  # urls = ["udp://127.0.0.1:8089"]
  # urls = ["http://127.0.0.1:8086"]

  ## The target database for metrics; will be created as needed.
  # database = "telegraf"

  ## If true, no CREATE DATABASE queries will be sent.  Set to true when using
  ## Telegraf with a user without permissions to create databases or when the
  ## database already exists.
  # skip_database_creation = false

  ## Name of existing retention policy to write to.  Empty string writes to
  ## the default retention policy.  Only takes effect when using HTTP.
  # retention_policy = ""

  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
  ## Only takes effect when using HTTP.
  # write_consistency = "any"

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## HTTP Basic Auth
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## UDP payload size is the maximum packet size to send.
  # udp_payload = "512B"

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "identity"

  ## When true, Telegraf will output unsigned integers as unsigned values,
  ## i.e.: "42u".  You will need a version of InfluxDB supporting unsigned
  ## integer values.  Enabling this option will result in field type errors if
  ## existing data has been written.
  # influx_uint_support = false

And this is the code for publishing:

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import json

influxclient = InfluxDBClient(host='localhost', port=8086)
# This is the Publisher

dict_msg={"temperature":"20.5"}
msg = json.dumps(dict_msg)

MQTT_HOST = "iot.eclipse.org"
MQTT_PORT = 1883
MQTT_KEEPALIVE_INTERVAL = 45
MQTT_TOPIC = "sensors"

count = 0


# Define on_publish event function
def on_publish(client, userdata, mid):
    print("Message Published..")


# Initiate MQTT Client
mqttc = mqtt.Client()

# Register publish callback function
mqttc.on_publish = on_publish

# Connect with MQTT Broker
mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)

x = 0
while x <= 100000:
    # Publish message to MQTT Broker
    mqttc.publish(MQTT_TOPIC,msg)
    # influx_line_protocol = ("published,counts" = count)
    # print(count)
    x += 1

mqttc.loop(30)
# Disconnect from MQTT_Broker
mqttc.disconnect()

I don’t know what is wrong. I’m new to this.
Please help me out.

In your code, you’re publishing messages to iot.eclipse.org, not to localhost, which is what Telegraf is monitoring.
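
One more thing that may be worth checking besides the host: the publisher sends to the topic "sensors", while the Telegraf config subscribes to "telegraf/sensors/#". A minimal sketch of how MQTT topic filters match, assuming standard wildcard rules (the helper name topic_matches is hypothetical and is not part of paho or Telegraf):

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a subscription filter.

    Simplified sketch: handles '+' and '#' wildcards, ignores the
    special rules for topics that start with '$'.
    """
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            # Multi-level wildcard: matches the parent level and everything below it
            return True
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)


subscription = "telegraf/sensors/#"  # from the Telegraf config in the question
for candidate in ["sensors", "telegraf/sensors", "telegraf/sensors/room1"]:
    print(candidate, topic_matches(subscription, candidate))
```

If this holds for your setup, the publisher would need a topic under telegraf/sensors/, or the topics list in the config would need to include "sensors".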

I changed it to localhost and also tried 127.0.0.1.
No change.
The console log freezes at:

2019-02-07T10:23:39Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"shekhar-Inspiron-3441", Flush Interval:10s
2019-02-07T10:23:39Z I! [inputs.mqtt_consumer] Connected [tcp://127.0.0.1:1883]

That’s good; it means Telegraf has connected to the MQTT broker.

You’ll need to run your code to publish messages.

Yeah. Even after publishing, I’m still not receiving anything,
not even in the file.
What might be the problem?

Do I have to send messages in a specific format,
or make changes to the templates?

Any help is appreciated.

Hi @shekhar-joshi,

There were 2 things wrong:

  1. You didn’t configure your InfluxDB output in telegraf.conf
  2. Your temperature reading is a string, not a number type

I’ve provided a working example of using the MQTT plugin for you here:
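
The linked example is not reproduced here. As a rough illustration of the second point, here is a minimal sketch that builds the payload with a numeric temperature. It assumes the data_format = "influx" setting from the question is kept, in which case Telegraf expects InfluxDB line protocol rather than JSON. The helper to_line_protocol and the measurement name "sensors" are hypothetical, and the sketch does not escape special characters in names or add a timestamp:

```python
def to_line_protocol(measurement, fields, tags=None):
    """Build one InfluxDB line-protocol line (simplified: no escaping of
    spaces or commas in names, no timestamp)."""
    def fmt(value):
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"       # integers carry an 'i' suffix
        if isinstance(value, float):
            return repr(value)       # floats are written as plain numbers
        return '"' + str(value).replace('"', '\\"') + '"'  # strings are quoted

    head = measurement
    if tags:
        head += "".join(f",{k}={v}" for k, v in sorted(tags.items()))
    body = ",".join(f"{k}={fmt(v)}" for k, v in fields.items())
    return f"{head} {body}"


# Convert the reading to a float instead of sending the string "20.5"
reading = {"temperature": float("20.5")}
msg = to_line_protocol("sensors", reading)
print(msg)  # sensors temperature=20.5
```

The resulting msg string could then be passed to mqttc.publish(MQTT_TOPIC, msg) in place of the JSON payload.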