Telegraf reads the metric.out file, filters out the UUID, and then uses Telegraf's count mechanism to automatically identify the duplicated data, merge it, and finally output the merged data to InfluxDB.
How can this be realized?
Hi @bay,
I think we were talking in the Community Slack.
The thing that I noticed is that your current order is:
- Read data
- Update the count as part of that plugin
- Remove the UUID - this causes duplicate metrics
- Send metrics to InfluxDB, where duplicate metrics get dropped.
Are you reading in from a file or is this the statsd plugin that you referenced in the Slack? My only thought right now is that you need to remove the UUID earlier or somehow use a starlark processor to combine metrics.
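To illustrate the "remove the UUID earlier" option: Telegraf's `tagexclude` metric modifier can drop a tag at input time, before any counting or output happens. A minimal sketch, assuming a statsd listener on UDP port 8126:

```toml
# Sketch only: drop the uuid tag as metrics enter Telegraf,
# so everything downstream sees the already-merged series.
[[inputs.statsd]]
  protocol = "udp"
  service_address = ":8126"
  tagexclude = ["uuid"]
```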
Thanks for your reply.
I think that setting the statsd plugin's time precision will do it. How do we set the precision to ns?
We have tried setting 'precision' = 'ns', but it didn't take effect.
You can override precision on a specific input plugin by adding:
precision = "1ns"
However, because statsd is a service input, this setting will not affect it. Instead it is expected that your incoming data use that level of precision.
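For a non-service input, that per-plugin override would look like the following sketch (the file input and file name are hypothetical; `precision` is a plugin-level option, not part of the data format settings):

```toml
# precision rounds the timestamps of collected metrics;
# valid units include "1ns", "1us", "1ms", and "1s".
[[inputs.file]]
  files = ["data.json"]
  data_format = "json"
  precision = "1ns"
```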
I have tried setting "precision" = "1ms" and confirmed the incoming data uses ms-level precision, but it failed. The data output to metric.out and InfluxDB is all at 'm' level.
How can I set the statsd plugin's precision, besides setting "precision" = "1ms", since that didn't take effect?
Right, this should not work, since statsd is a service plugin. The timestamps are based on the data you are sending Telegraf.
What you might be able to do is:
- Read data
- Update the count as part of that plugin
- Update the timestamp of the metric
- Remove the UUID
- Send the metrics
You could first use a processor to update the timestamp of all the metrics to the current time using nanosecond precision, then remove the UUID.
Hi jpowers,
can you provide a demo for us? I am using the statsd plugin as input.
I am hoping for more detailed examples to reference that can realize the precision change, or the 'counter' auto-increase after dropping the uuid tag.
'counter' auto-increased
This would require a much more custom starlark script that would require you to keep track of every metric you come across and compare it to all the previous ones. This is not the route I would go down.
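For completeness, a rough sketch of what such a custom script might look like. This is an assumption-laden illustration, not a recommendation: it assumes the starlark processor's shared `state` dictionary is available for keeping data between invocations, and it builds a hypothetical series key from the metric name and tags:

```toml
[[processors.starlark]]
  source = '''
# Hypothetical sketch: count how many times the same series key
# has been seen and overwrite each metric's count field with it.
state = {}

def apply(metric):
    key = metric.name + str(sorted(metric.tags.items()))
    seen = state.get(key, 0) + 1
    state[key] = seen
    metric.fields["count"] = seen
    return metric
'''
```

Keeping and comparing every series seen this way grows memory without bound, which is one reason this route is not recommended.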
On this configuration, @jpowers:
statsd.buffer = test,apiName=testApi,uri=/api/testApi, time|"ms"
statsd.flush("udp", "127.0.0.1", 8126)
my telegraf.conf
[[outputs.influxdb]]
  urls = ["http://127.0.0.1:8086"]
[[outputs.file]]
  files = ["/log/metrics.log"]
  data_format = "json"
[[inputs.statsd]]
  protocol = "udp"
  service_address = ":8126"
metrics.log
{"fields":{"count":1,"lower":0.05622322323234,"mean":0.05622322323234,"stddev":0,"sum":0.05622322323234,"upper":0.05622322323234},"name":"test","tags":{"apiName":"testApi","uri":"/api/testApi","uuid":"shj-dsa-dsad"},"timestamp":1658214837}
Do you have an example for this case?
@jpowers, the config is as @smallbirdwy posted.
We are on the same team and met the same issue.
Hoping for your reply, thanks.
Let's pretend the below is my data and your goal is to have both entries get recorded without the uuid tag.
test,apiName=testApi,uri=/api/testApi,uuid=shj-dsa-dsad count=1
test,apiName=testApi,uri=/api/testApi,uuid=shj-dsa-dddd count=1
The below will remove the uuid tag and run a starlark processor which updates each metric's timestamp to the current time using nanoseconds.
[[inputs.file]]
  files = ["data.json"]
  data_format = "influx_upstream"
  tagexclude = ["uuid"]

[[processors.starlark]]
  order = 1
  source = '''
load('time.star', 'time')

def apply(metric):
    metric.time = time.now().unix_nano
    return metric
'''
This will produce two valid metrics that can be pushed to InfluxDB:
test,apiName=testApi,uri=/api/testApi count=1 1658849357523545498
test,apiName=testApi,uri=/api/testApi count=1 1658849357523552978
my config
[[inputs.file]]
  files = ["metrics.log"]
  data_format = "json"
  json_name_key = "name"
  tagexclude = ["uuid"]
  tag_keys = ["apiName","uri","uuid"]
  json_time_key = "timestamp"
my metrics.log
{"fields":{"count":1,"lower":0.05622322323234,"mean":0.05622322323234,"stddev":0,"sum":0.05622322323234,"upper":0.05622322323234},"name":"test","tags":{"apiName":"testApi","uri":"/api/testApi","uuid":"shj-dsa-dsad1"},"timestamp":1658214837}
{"fields":{"count":1,"lower":0.05622322323234,"mean":0.05622322323234,"stddev":0,"sum":0.05622322323234,"upper":0.05622322323234},"name":"test","tags":{"apiName":"testApi","uri":"/api/testApi","uuid":"shj-dsa-dsad4"},"timestamp":1658214837}
{"fields":{"count":1,"lower":0.05622322323234,"mean":0.05622322323234,"stddev":0,"sum":0.05622322323234,"upper":0.05622322323234},"name":"test","tags":{"apiName":"testApi","uri":"/api/testApi","uuid":"shj-dsa-dsad9"},"timestamp":1658214837}
Error in plugin: could not parse "metrics.log": invalid character '{' after top-level value
Ah, if your metrics are actually in JSON like that, then it is invalid JSON. Try running your file through an online JSON validator.
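If external tools are not an option, a few lines of Python can do the same check line by line (a sketch; `metrics.log` is the file name used above):

```python
import json
import os

def find_invalid_lines(path):
    """Return (line number, error message) pairs for every line
    in a line-delimited log file that is not valid JSON."""
    errors = []
    with open(path) as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue  # skip blank lines
            try:
                json.loads(line)
            except json.JSONDecodeError as err:
                errors.append((lineno, str(err)))
    return errors

if os.path.exists("metrics.log"):
    for lineno, msg in find_invalid_lines("metrics.log"):
        print(f"line {lineno}: {msg}")
```

Note this validates each line as a separate JSON object, which matches the log format shown above rather than a single JSON document.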
I can't use external tools, only code. This JSON comes from the plugin:
[[outputs.file]]
  files = ["/log/metrics.log"]
  data_format = "json"
Why the parse error?
The original log format read from the file is like this.
Now it needs to be written to InfluxDB through the plugin, while making sure the same data cannot be overwritten (by timestamp or an auto-increased count).
How can this be realized, and which plugin can do it? I hope for detailed examples.
Why the parse error?
This is not a valid JSON file. What your data should look like is actually an array of objects:
[
  {"fields":{"count":1,"lower":0.05622322323234,"mean":0.05622322323234,"stddev":0,"sum":0.05622322323234,"upper":0.05622322323234},"name":"test","tags":{"apiName":"testApi","uri":"/api/testApi","uuid":"shj-dsa-dsad1"},"timestamp":1658214837},
  ...
]
If you cannot change that formatting, my suggestion is to write a script in your favorite programming language (python, go, java, etc.) and read these in line by line and send them to InfluxDB using the client libraries.
Here is an example using python:
import json
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

BUCKET = "testing"

with open("data.json") as file:
    data_file = file.readlines()

with InfluxDBClient.from_config_file("config.toml") as client:
    with client.write_api(write_options=SYNCHRONOUS) as writer:
        for line in data_file:
            data = json.loads(line)
            # drop the timestamp to get unique measurement points
            data.pop("timestamp")
            # remove the uuid tag
            data["tags"].pop("uuid")
            # rename name -> measurement
            data["measurement"] = data.pop("name")
            result = writer.write(bucket=BUCKET, record=data)
            print(result)

    querier = client.query_api()
    tables = querier.query(f'from(bucket: "{BUCKET}") |> range(start: -1h)')
    for record in tables[0].records:
        print(f'{record.get_measurement()},apiName={record.values.get("apiName")},uri={record.values.get("uri")} {record.get_field()}={record.get_value()} {record.get_time()}')
Thanks a lot.
I think you have understood our issue, so do you have any suggestions to realize it?
The gateway sends data to Telegraf by UDP, and Telegraf records it in metric.out while also writing it into InfluxDB. Because the uuid is a tag, it would make the series cardinality too big and result in errors, so I want to drop the uuid and have 'count' auto-increase, but metric.out should still print the uuid.
Hoping for a demo, thanks.