I have a personal project that gathers data from an earthquake-monitoring API, publishes it in real time to a Kafka topic, and then visualizes it in Grafana.
For that I use Telegraf with its kafka_consumer input plugin, which listens to the topic and forwards the data to an InfluxDB bucket; that bucket is then used as the data source for Grafana.
I did some tests and I know for sure that the data is arriving on the topic. Here is a sample message:
{
  "type": "FeatureCollection",
  "metadata": {
    "generated": "1662815108000",
    "url": "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime=2022-09-10T12:05:07",
    "title": "USGS Earthquakes",
    "api": "1.13.6",
    "count": "4",
    "status": "200"
  },
  "bbox": [
    -155.46483,
    19.169,
    0.4,
    -139.4593,
    62.2374,
    75.9
  ],
  "features": [
    {
      "type": "Feature",
      "properties": {
        "mag": "1.4",
        "place": "29 km NNE of Skwentna, Alaska",
        "time": "1662814254355",
        "updated": "1662814370191",
        "tz": "0",
        "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ak022bmmeu6z",
        "detail": "https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak022bmmeu6z&format=geojson",
        "felt": "0",
        "cdi": "0.0",
        "mmi": "0.0",
        "alert": "null",
        "status": "automatic",
        "tsunami": "0",
        "sig": "30",
        "net": "ak",
        "code": "022bmmeu6z",
        "ids": ",ak022bmmeu6z,",
        "sources": ",ak,",
        "types": ",origin,phase-data,",
        "nst": "0",
        "dmin": "0.0",
        "rms": "0.25",
        "gap": "0.0",
        "magType": "ml",
        "type": "earthquake"
      },
      "geometry": {
        "type": "Point",
        "coordinates": [
          -151.2157,
          62.2374,
          75.9
        ]
      },
      "id": "ak022bmmeu6z"
    },
    {
      "type": "Feature",
      "properties": {
        "mag": "2.1",
        "place": "34 km NNE of Yakutat, Alaska",
        "time": "1662812661453",
        "updated": "1662812817687",
        "tz": "0",
        "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ak022bmm971d",
        "detail": "https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak022bmm971d&format=geojson",
        "felt": "0",
        "cdi": "0.0",
        "mmi": "0.0",
        "alert": "null",
        "status": "automatic",
        "tsunami": "0",
        "sig": "68",
        "net": "ak",
        "code": "022bmm971d",
        "ids": ",ak022bmm971d,",
        "sources": ",ak,",
        "types": ",origin,phase-data,",
        "nst": "0",
        "dmin": "0.0",
        "rms": "1.12",
        "gap": "0.0",
        "magType": "ml",
        "type": "earthquake"
      },
      "geometry": {
        "type": "Point",
        "coordinates": [
          -139.469,
          59.8254,
          47.2
        ]
      },
      "id": "ak022bmm971d"
    },
    {
      "type": "Feature",
      "properties": {
        "mag": "2.2",
        "place": "63 km NNE of Yakutat, Alaska",
        "time": "1662812499261",
        "updated": "1662812657494",
        "tz": "0",
        "url": "https://earthquake.usgs.gov/earthquakes/eventpage/ak022bmm8l9i",
        "detail": "https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak022bmm8l9i&format=geojson",
        "felt": "0",
        "cdi": "0.0",
        "mmi": "0.0",
        "alert": "null",
        "status": "automatic",
        "tsunami": "0",
        "sig": "74",
        "net": "ak",
        "code": "022bmm8l9i",
        "ids": ",ak022bmm8l9i,",
        "sources": ",ak,",
        "types": ",origin,phase-data,",
        "nst": "0",
        "dmin": "0.0",
        "rms": "0.49",
        "gap": "0.0",
        "magType": "ml",
        "type": "earthquake"
      },
      "geometry": {
        "type": "Point",
        "coordinates": [
          -139.4593,
          60.1014,
          0.4
        ]
      },
      "id": "ak022bmm8l9i"
    },
    {
      "type": "Feature",
      "properties": {
        "mag": "2.15",
        "place": "4 km SSE of Pāhala, Hawaii",
        "time": "1662812008820",
        "updated": "1662812205480",
        "tz": "0",
        "url": "https://earthquake.usgs.gov/earthquakes/eventpage/hv73136307",
        "detail": "https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=hv73136307&format=geojson",
        "felt": "0",
        "cdi": "0.0",
        "mmi": "0.0",
        "alert": "null",
        "status": "automatic",
        "tsunami": "0",
        "sig": "71",
        "net": "hv",
        "code": "73136307",
        "ids": ",hv73136307,",
        "sources": ",hv,",
        "types": ",origin,phase-data,",
        "nst": "44",
        "dmin": "0.0",
        "rms": "0.12",
        "gap": "100.0",
        "magType": "md",
        "type": "earthquake"
      },
      "geometry": {
        "type": "Point",
        "coordinates": [
          -155.46483,
          19.169,
          31.82
        ]
      },
      "id": "hv73136307"
    }
  ]
}
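(I checked this by reading the topic directly with Kafka's bundled console consumer, e.g.:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic general-events --from-beginning

The broker address and topic name match the kafka_consumer section of the config below; depending on the installation the script may be called kafka-console-consumer without the .sh suffix.)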
Here is the telegraf.conf that I use to feed the InfluxDB data source:
# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})
# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true
  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000
  ## Maximum number of unwritten metrics per output. Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000
  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"
  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"
  ## Collected metrics are rounded to the precision specified. Precision is
  ## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  ##
  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s:
  ## ie, when interval = "10s", precision will be "1s"
  ## when interval = "250ms", precision will be "1ms"
  ##
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  precision = "0s"
  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = false
  ## Method of translating SNMP objects. Can be "netsnmp" which
  ## translates by calling external programs snmptranslate and snmptable,
  ## or "gosmi" which translates using the built-in gosmi library.
  # snmp_translator = "netsnmp"
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
# Configuration for sending metrics to InfluxDB 2.0
[[outputs.influxdb_v2]]
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ## ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
  urls = ["http://127.0.0.1:8086"]
  ## Token for authentication. Strings read from the environment must be
  ## quoted (see the note at the top of this file).
  token = "$INFLUX_TOKEN"
  ## Organization is the name of the organization you wish to write to.
  organization = "earthWatch"
  ## Destination bucket to write into.
  bucket = "telegraf"
[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["localhost:9092"]
  ## Topics to consume.
  topics = ["general-events"]
  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 0
  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  tag_keys = ["place", "url", "detail", "alert", "status", "net", "code", "ids", "sources", "types", "magType", "type"]
  ## json_query = "features.#.properties"
  ## json_string_fields = ["place", "url", "detail", "alert", "status", "net", "code", "ids", "sources", "types", "magType", "type"]
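Based on the JSON parser documentation, the variant of the parser section I keep coming back to (the two options commented out above) would look roughly like this; the GJSON query, the string-field list, and the time options are my best guesses from the docs, not a verified setup:

  data_format = "json"
  ## Parse only the properties object of each feature (GJSON syntax).
  json_query = "features.#.properties"
  tag_keys = ["net", "code", "magType", "type", "status", "alert"]
  ## Keep string values as fields; the numeric-looking strings are listed
  ## too so they can be converted afterwards (see the converter sketch below).
  json_string_fields = ["place", "url", "detail", "ids", "sources", "types", "mag", "cdi", "mmi", "dmin", "rms", "gap", "felt", "tsunami", "sig", "nst"]
  ## Map the epoch-millisecond "time" property onto the metric timestamp.
  json_time_key = "time"
  json_time_format = "unix_ms"

The idea is that json_query narrows parsing to each feature's properties object, json_string_fields keeps string values as fields (by default the parser ignores strings and booleans unless they appear in tag_keys or json_string_fields), and json_time_key/json_time_format set the metric timestamp.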
After multiple attempts and modifications of the telegraf.conf file I was able to retrieve some of the fields, but others are still missing, and I don't know which configuration change produced that partial result.
Even when I query the bucket right after the topic receives the data (less than a minute later), it is still impossible to display all the fields, as if they were not parsed properly. What is strange is that some floats show up while others don't, and no string fields appear at all. Looking at the payload above, every value inside properties is serialized as a JSON string (e.g. "mag": "1.4"), while the bbox and coordinates arrays contain real numbers, which may be related.
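For what it's worth, the next thing I plan to try is converting those string values into numeric fields after parsing, based on the processors.converter plugin docs. This sketch assumes the strings were first captured via json_string_fields, and that json_query is active so the field names are not prefixed (without it the parser flattens them to names like features_0_properties_mag):

[[processors.converter]]
  ## Convert string fields produced by the JSON parser into proper types.
  [processors.converter.fields]
    float = ["mag", "cdi", "mmi", "dmin", "rms", "gap"]
    integer = ["felt", "tsunami", "sig", "nst"]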
Any idea what might be missing?
Thank you