Influxdb Kafka monitoring dashboard

Hi everyone. I am using a Kafka broker as an input plugin, and I want to monitor that broker: if no data comes in for the past 15 minutes, I want an alert to be raised. For monitoring I found this template - Kafka Monitoring Template | InfluxData. I am having trouble understanding the template and how to use it. The Telegraf config for this template is:

# Kafka broker metrics, read from the Jolokia endpoint(s) listed in `urls`.
[[inputs.jolokia2_agent]]
  # Empty prefixes: nothing extra is prepended to tag or field names.
  default_tag_prefix = ""
  default_field_prefix = ""
  # Separator used when a field name is built from several parts.
  default_field_separator = "_"
  # $KAFKA_JOLOKIA_HOSTS is a placeholder for the broker's Jolokia URL.
  # NOTE(review): presumably filled in from the environment -- confirm how it is set.
  urls = ["$KAFKA_JOLOKIA_HOSTS"]

# TODO carried over from the original author: type=DelayedOperationPurgatory
# was still to be added. NOTE(review): other metrics in this file already query
# that type -- confirm whether this TODO is still open.

# "Count" of every ReplicaManager bean; "$1" names the field after the value
# matched by the name=* wildcard.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_replica_manager"
  mbean = "kafka.server:type=ReplicaManager,name=*"
  paths = ["Count"]
  field_name = "$1"

# Individual ReplicaManager beans, each stored under a fixed field name.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_replica_manager"
  mbean = "kafka.server:type=ReplicaManager,name=PartitionCount"
  field_name = "PartitionCount"

[[inputs.jolokia2_agent.metric]]
  name = "kafka_replica_manager"
  mbean = "kafka.server:type=ReplicaManager,name=LeaderCount"
  field_name = "LeaderCount"

[[inputs.jolokia2_agent.metric]]
  name = "kafka_replica_manager"
  mbean = "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions"
  field_name = "UnderReplicatedPartitions"

# Broker-wide metrics, written to the "kafka_broker" measurement.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_broker"
  mbean = "kafka.server:type=kafka-metrics-count"
  field_name = "kafka_metrics_count"

# All-topic BrokerTopicMetrics. Only "Count" is read and the field is named
# after the matched bean ("$1"). A fixed field_name here would funnel every
# attribute of every matched bean into the single field "kafka_metrics_count",
# overwriting itself and the value collected by the metric above.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_broker"
  mbean = "kafka.server:type=BrokerTopicMetrics,name=*"
  paths = ["Count"]
  field_name = "$1"

# One field per delayedOperation value ("$1" is the wildcard match).
[[inputs.jolokia2_agent.metric]]
  name = "kafka_broker"
  mbean = "kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=*"
  field_name = "$1"

[[inputs.jolokia2_agent.metric]]
  name = "kafka_broker"
  mbean = "kafka.server:type=Produce"
  field_name = "ProduceQueueSize"

# Per-topic BrokerTopicMetrics: "Count" of each bean, tagged with the topic.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_topics"
  mbean = "kafka.server:type=BrokerTopicMetrics,name=*,topic=*"
  paths = ["Count"]
  field_name = "$1"
  tag_keys = ["topic"]
# Purgatory size, one series per delayedOperation value.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_purgatory"
  mbean = "kafka.server:name=PurgatorySize,type=DelayedOperationPurgatory,delayedOperation=*"
  tag_keys = ["delayedOperation"]

# Every KafkaController bean; "$1" names the field after the matched bean name.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_controller"
  mbean = "kafka.controller:type=KafkaController,name=*"
  field_name = "$1"

# ControllerStats: "Count" only, with the bean name kept as a tag.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_controller"
  mbean = "kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs"
  paths = ["Count"]
  tag_keys = ["name"]

[[inputs.jolokia2_agent.metric]]
  name = "kafka_controller"
  mbean = "kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"
  paths = ["Count"]
  tag_keys = ["name"]

# Request error counts. "$2" refers to the second wildcard in the mbean.
# NOTE(review): presumably that is the error=* value, giving one field per
# error -- confirm against the plugin's substitution order.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_network"
  mbean = "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=*,error=*"
  paths = ["Count"]
  field_name = "$2"
  tag_keys = ["name", "request"]

# "Count" of TotalTimeMs, one series per request type.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_network"
  mbean = "kafka.network:type=RequestMetrics,name=TotalTimeMs,request=*"
  paths = ["Count"]
  tag_keys = ["name", "request"]

# from kafka.network
# Request size attributes, fields prefixed with "Bytes", tagged by request type.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_requests"
  mbean = "kafka.network:type=RequestMetrics,name=RequestBytes,request=*"
  tag_keys = ["request"]
  field_prefix = "Bytes"

# Request queue time attributes, fields prefixed with "QueueTime".
[[inputs.jolokia2_agent.metric]]
  name = "kafka_requests"
  mbean = "kafka.network:type=RequestMetrics,name=RequestQueueTimeMs,request=*"
  tag_keys = ["request"]
  field_prefix = "QueueTime"

# The mbean pattern matches one bean per request/version pair, so "version"
# is tagged as well. With only "request" as a tag, all versions of a request
# share one series and their "Count" values overwrite each other.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_requests"
  mbean = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=*,version=*"
  paths = ["Count"]
  tag_keys = ["request", "version"]
  field_name = "Count"

# Per-partition log metrics; "$1" names the field after the matched bean name.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_partition"
  mbean = "kafka.log:name=*,partition=*,topic=*,type=Log"
  field_name = "$1"
  tag_keys = ["topic", "partition"]

# Per-partition under-replication flag. "partition" is tagged in addition to
# "topic" (matching the metric above); the pattern matches one bean per
# partition, and without the tag the partitions of a topic would collapse
# into a single series and overwrite each other.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_partition"
  mbean = "kafka.cluster:name=UnderReplicated,partition=*,topic=*,type=Partition"
  field_name = "UnderReplicatedPartitions"
  tag_keys = ["topic", "partition"]

# JVM metrics of the broker process, read from the standard java.lang beans.
[[inputs.jolokia2_agent.metric]]
  name = "jvm_runtime"
  mbean = "java.lang:type=Runtime"
  paths = ["Uptime", "StartTime", "VmName", "SpecVersion"]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_os"
  mbean = "java.lang:type=OperatingSystem"
  paths = [
    "AvailableProcessors",
    "SystemLoadAverage",
    "FreePhysicalMemorySize",
    "TotalPhysicalMemorySize",
    "CommittedVirtualMemorySize",
    "Name",
    "Version",
    "ProcessCpuLoad",
    "SystemCpuLoad",
  ]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_mem"
  mbean = "java.lang:type=Memory"
  paths = ["HeapMemoryUsage", "NonHeapMemoryUsage"]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_threading"
  mbean = "java.lang:type=Threading"
  paths = [
    "TotalStartedThreadCount",
    "PeakThreadCount",
    "ThreadCount",
    "DaemonThreadCount",
  ]

# One series per garbage collector (tagged by collector name).
[[inputs.jolokia2_agent.metric]]
  name = "java_gc"
  mbean = "java.lang:name=*,type=GarbageCollector"
  paths = ["CollectionTime", "CollectionCount"]
  tag_keys = ["name"]

[[inputs.jolokia2_agent.metric]]
  name = "java_classes"
  mbean = "java.lang:type=ClassLoading"
  paths = ["LoadedClassCount", "UnloadedClassCount", "TotalLoadedClassCount"]

# One series per memory pool (tagged by pool name).
[[inputs.jolokia2_agent.metric]]
  name = "java_mem_pool"
  mbean = "java.lang:name=*,type=MemoryPool"
  paths = ["Usage"]
  tag_keys = ["name"]

#
# KAFKA PRODUCERS
#
[[inputs.jolokia2_agent]]
  # Empty prefixes: nothing extra is prepended to tag or field names.
  default_tag_prefix = ""
  default_field_prefix = ""
  default_field_separator = "_"
  # $KAFKA_PRODUCERS_JOLOKIA_HOSTS is a placeholder for the producers' Jolokia URL.
  # NOTE(review): presumably filled in from the environment -- confirm how it is set.
  urls = ["$KAFKA_PRODUCERS_JOLOKIA_HOSTS"]

# Producer client metrics, one series per client-id.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_producer"
  mbean = "kafka.producer:type=producer-metrics,client-id=*"
  paths = [
    "compression-rate-avg",
    "response-rate",
    "request-rate",
    "request-latency-avg",
    "outgoing-byte-rate",
    "io-wait-ratio",
    "batch-size-avg",
  ]
  tag_keys = ["client-id"]

# JVM metrics of the producer process, read from the standard java.lang beans.
[[inputs.jolokia2_agent.metric]]
  name = "jvm_runtime"
  mbean = "java.lang:type=Runtime"
  paths = ["Uptime", "StartTime", "VmName", "SpecVersion"]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_os"
  mbean = "java.lang:type=OperatingSystem"
  paths = [
    "AvailableProcessors",
    "SystemLoadAverage",
    "FreePhysicalMemorySize",
    "TotalPhysicalMemorySize",
    "CommittedVirtualMemorySize",
    "Name",
    "Version",
    "ProcessCpuLoad",
    "SystemCpuLoad",
  ]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_mem"
  mbean = "java.lang:type=Memory"
  paths = ["HeapMemoryUsage", "NonHeapMemoryUsage"]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_threading"
  mbean = "java.lang:type=Threading"
  paths = [
    "TotalStartedThreadCount",
    "PeakThreadCount",
    "ThreadCount",
    "DaemonThreadCount",
  ]

# One series per garbage collector (tagged by collector name).
[[inputs.jolokia2_agent.metric]]
  name = "java_gc"
  mbean = "java.lang:name=*,type=GarbageCollector"
  paths = ["CollectionTime", "CollectionCount"]
  tag_keys = ["name"]

[[inputs.jolokia2_agent.metric]]
  name = "java_classes"
  mbean = "java.lang:type=ClassLoading"
  paths = ["LoadedClassCount", "UnloadedClassCount", "TotalLoadedClassCount"]

# One series per memory pool (tagged by pool name).
[[inputs.jolokia2_agent.metric]]
  name = "java_mem_pool"
  mbean = "java.lang:name=*,type=MemoryPool"
  paths = ["Usage"]
  tag_keys = ["name"]

#
# KAFKA CONSUMERS
#
[[inputs.jolokia2_agent]]
  # Empty prefixes: nothing extra is prepended to tag or field names.
  default_tag_prefix = ""
  default_field_prefix = ""
  default_field_separator = "_"
  # $KAFKA_CONSUMERS_JOLOKIA_HOSTS is a placeholder for the consumers' Jolokia URL.
  # NOTE(review): presumably filled in from the environment -- confirm how it is set.
  urls = ["$KAFKA_CONSUMERS_JOLOKIA_HOSTS"]

# Consumption rates per client-id and topic.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_consumer"
  mbean = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*"
  paths = ["bytes-consumed-rate", "records-consumed-rate"]
  tag_keys = ["client-id", "topic"]

# Average record lag per client-id, topic and partition.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_consumer"
  mbean = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,partition=*,topic=*"
  paths = ["records-lag-avg"]
  tag_keys = ["client-id", "partition", "topic"]

# Fetch rate per client-id.
[[inputs.jolokia2_agent.metric]]
  name = "kafka_consumer"
  mbean = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*"
  paths = ["fetch-rate"]
  tag_keys = ["client-id"]

# JVM metrics of the consumer process, read from the standard java.lang beans.
[[inputs.jolokia2_agent.metric]]
  name = "jvm_runtime"
  mbean = "java.lang:type=Runtime"
  paths = ["Uptime", "StartTime", "VmName", "SpecVersion"]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_os"
  mbean = "java.lang:type=OperatingSystem"
  paths = [
    "AvailableProcessors",
    "SystemLoadAverage",
    "FreePhysicalMemorySize",
    "TotalPhysicalMemorySize",
    "CommittedVirtualMemorySize",
    "Name",
    "Version",
    "ProcessCpuLoad",
    "SystemCpuLoad",
  ]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_mem"
  mbean = "java.lang:type=Memory"
  paths = ["HeapMemoryUsage", "NonHeapMemoryUsage"]

[[inputs.jolokia2_agent.metric]]
  name = "jvm_threading"
  mbean = "java.lang:type=Threading"
  paths = [
    "TotalStartedThreadCount",
    "PeakThreadCount",
    "ThreadCount",
    "DaemonThreadCount",
  ]

# One series per garbage collector (tagged by collector name).
[[inputs.jolokia2_agent.metric]]
  name = "java_gc"
  mbean = "java.lang:name=*,type=GarbageCollector"
  paths = ["CollectionTime", "CollectionCount"]
  tag_keys = ["name"]

[[inputs.jolokia2_agent.metric]]
  name = "java_classes"
  mbean = "java.lang:type=ClassLoading"
  paths = ["LoadedClassCount", "UnloadedClassCount", "TotalLoadedClassCount"]

# One series per memory pool (tagged by pool name).
[[inputs.jolokia2_agent.metric]]
  name = "java_mem_pool"
  mbean = "java.lang:name=*,type=MemoryPool"
  paths = ["Usage"]
  tag_keys = ["name"]


I am confused about which plugin to choose and what values I should put there. I have the Kafka broker and topic name, and I only want to see whether data has stopped arriving (i.e. whether there has been data loss) in the past 15 minutes. Can anybody help me with this config?

Hello @Pratik_Das_Baghel,
What are you having trouble understanding specifically?
How to run telegraf?
How to edit the config?
Or how to create a deadman alert?
The easiest way to create this type of alert is through the UI. You’ll have to configure a deadman check (to check if the data hasn’t been reporting for 15 min), a notification endpoint (where you configure the endpoint where you want to receive your alert) and finally a notification rule (the configuration details for what you want the notification to look like). The Alerts page will walk you through this.
For example, here’s a detailed blog post on how to configure a Slack notification endpoint: