Hi,
I have a scenario where a shell script prints data, and this data is written into CloudWatch. Once the data has been written into CloudWatch, I don't want to keep ingesting the same data over and over again, since that would fill up the storage in CloudWatch.
How can I achieve this, please? Or is there a preprocessor that can be used to control the output data going into the datasource?
thanks
You could potentially use a starlark processor to compare the current metric to the previous metric. If there is no difference, then drop it.
Here is an example that stores the previous metric and then uses it in a comparison.
Thanks for your input. I will try this example.
@jpowers I have tried the example below, but data is still being written into CloudWatch.
load("logging.star", "log")

state = {
    "last": None
}

def apply(metric):
    last = state["last"]
    state["last"] = deepcopy(metric)
    for k, v in metric.fields.items():
        if k == "file_storage_size":
            log.debug("debug: key is {} and the value {}".format(k, v))
    if last.fields["file_storage_size"] != None:
        log.debug("debug: value is {}".format(last.fields["file_storage_size"]))
        if last.fields["file_storage_size"] == metric.fields["file_storage_size"]:
            log.debug("values are matched")
            pass
Can you please help with this?
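Two things stand out in that script: on the very first metric, `state["last"]` is still `None`, so `last.fields[...]` has nothing to read; and when the values match, the script runs `pass`, which still keeps the metric — to drop it, `apply` must `return None`. Here is a minimal plain-Python simulation of the corrected flow (Python standing in for Starlark, with dicts in place of Telegraf metric objects and `copy.deepcopy` in place of Telegraf's builtin):

```python
from copy import deepcopy

# Plain-Python stand-in for a Telegraf metric: a dict with a "fields" dict.
state = {"last": None}

def apply(metric):
    last = state["last"]
    state["last"] = deepcopy(metric)
    if last is None:
        # First metric ever seen: nothing to compare against, pass it through.
        return metric
    if last["fields"].get("file_storage_size") == metric["fields"].get("file_storage_size"):
        # Same value as last collection: drop the metric by returning None.
        return None
    return metric

# Feed three collections; the repeat is dropped, the changed value is kept.
results = [apply({"fields": {"file_storage_size": v}}) for v in (100, 100, 250)]
```

The same structure, translated back to Starlark inside the processor, is what the example below shows.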
Let’s pretend I have the following metric in a file:
example temp=2
And I only want to send the metric if the new value is different. I can use the starlark processor to compare the temp field with the previous value; if they are the same, return None, otherwise return the current metric:
[[processors.starlark]]
source = '''
state = {
    "last": None
}

def apply(metric):
    last = state["last"]
    state["last"] = deepcopy(metric)
    if last == None:
        return metric

    # if the temp fields are identical, drop the metric;
    # otherwise, return the new value
    if last.fields["temp"] == metric.fields["temp"]:
        return None
    return metric
'''
example temp=2 1660580580000000000
2022-08-15T16:23:03Z D! [outputs.file] Wrote batch of 1 metrics in 33.66µs
2022-08-15T16:23:03Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2022-08-15T16:23:13Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
# the value has not changed for a cycle, now I update it to 3
example temp=3 1660580600000000000
2022-08-15T16:23:23Z D! [outputs.file] Wrote batch of 1 metrics in 23.77µs
2022-08-15T16:23:23Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
# now I update it again to 4, and leave it alone
example temp=4 1660580610000000000
2022-08-15T16:23:33Z D! [outputs.file] Wrote batch of 1 metrics in 23.08µs
2022-08-15T16:23:33Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2022-08-15T16:23:43Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2022-08-15T16:23:53Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2022-08-15T16:24:03Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
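To double-check the dedup logic outside of Telegraf, the same flow can be simulated in plain Python (dicts stand in for metric objects, and `copy.deepcopy` replaces the Starlark builtin). Feeding it the sequence of collections above, only the first and changed values come out:

```python
from copy import deepcopy

state = {"last": None}

def apply(metric):
    # Mirror of the Starlark script: remember the metric, drop exact repeats.
    last = state["last"]
    state["last"] = deepcopy(metric)
    if last is None:
        return metric
    if last["fields"]["temp"] == metric["fields"]["temp"]:
        return None
    return metric

# temp stays at 2 for two intervals, changes to 3, then to 4 and stays there.
collected = [2, 2, 3, 4, 4, 4]
emitted = [m["fields"]["temp"] for t in collected
           if (m := apply({"fields": {"temp": t}})) is not None]
# emitted is [2, 3, 4]: repeats were dropped, matching the batches in the logs.
```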
Thank you very much @jpowers.
Hi @jpowers.
Looks like I have hit the problem again with the starlark plugin.
I have a scenario where Telegraf contains multiple inputs.exec plugins calling different Python programs, and each one of them sends its metrics to CloudWatch.
How can I get the last metric per inputs.exec plugin, so that the last metric is always compared against metrics from the same [[inputs.exec]] entry, in order to avoid duplicate entries in the CloudWatch datasource?
[[inputs.exec]]
commands = [
"/usr/bin/python3 /etc/telegraf/sample.py"
]
data_format = "influx"
[[inputs.exec]]
commands = [
"/usr/bin/python3 /etc/telegraf/sample2.py"
]
data_format = "influx"
[[processors.starlark]]
script = "/etc/telegraf/dont_update.star"
Any thoughts on this problem, please?
thanks
I’m not sure I understand the situation.
Can you share the contents of your starlark and what it is trying to do?
@jpowers
Sorry about this.
The contents of the Starlark script are below. I have 2 [[inputs.exec]] entries, both pointing at the same Starlark file to avoid duplicate entries in CloudWatch.
If both inputs.exec plugins poll at the same interval, how does Starlark know whether the last metric came from the first or the second inputs.exec entry? Thanks.
I hope I have explained it better this time.
How can this problem be solved?
load("logging.star", "log")

state = {
    "last": None
}

def apply(metric):
    last = state["last"]
    state["last"] = deepcopy(metric)
    for k, v in metric.fields.items():
        if k == "file_storage_size":
            log.debug("debug: key is {} and the value {}".format(k, v))
    if last.fields["file_storage_size"] != None:
        log.debug("debug: value is {}".format(last.fields["file_storage_size"]))
        if last.fields["file_storage_size"] == metric.fields["file_storage_size"]:
            return None
    return metric
Thanks for the background.
If both inputs.exec plugins poll at the same interval, how does Starlark know whether the last metric came from the first or the second inputs.exec entry? Thanks.
By default, all the metrics collected will go through the starlark processor together. If you want to differentiate between collections, you can set each metric's name via the name_override option on the input to give it a custom name, and then use the namepass option on each starlark processor to control which metrics go through that processor.
Do you want the starlark processor to run against all the metrics captured from both sample.py and sample2.py:
exec for sample.py --\
--> starlark --> output
exec for sample2.py --/
This is the current behavior and all metrics will go in together and get parsed by that logic.
Or do you want each exec plugin to use the starlark processor independently, like:
exec for sample.py --> starlark --\
--> output
exec for sample2.py --> starlark --/
Let me know, and I hope that helps lay out the current flow.
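As an aside (not something required for either flow above, just a sketch): a single shared script could also key its state by metric name, so each input's last value is tracked independently even when all metrics go through one processor. In plain Python, with dicts standing in for metric objects and `copy.deepcopy` for the Starlark builtin:

```python
from copy import deepcopy

# State keyed by metric name, so "sample" and "sample2" don't clobber each other.
state = {}

def apply(metric):
    name = metric["name"]
    last = state.get(name)
    state[name] = deepcopy(metric)
    if last is None:
        # First metric seen under this name: pass it through.
        return metric
    if last["fields"] == metric["fields"]:
        # Unchanged since the last collection from this same input: drop it.
        return None
    return metric

# Interleaved collections from two inputs; each is deduplicated on its own.
stream = [
    {"name": "sample",  "fields": {"file_storage_size": 100}},
    {"name": "sample2", "fields": {"file_storage_size": 100}},
    {"name": "sample",  "fields": {"file_storage_size": 100}},  # repeat -> dropped
    {"name": "sample2", "fields": {"file_storage_size": 300}},  # changed -> kept
]
kept = [m["name"] for m in stream if apply(m) is not None]
```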
@jpowers
Thanks a lot for your quick response
I want each exec plugin to use the starlark processor independently.
Can you please provide me an example of name_override and namepass? This will be useful to me.
I will also try out the name_override and namepass options tomorrow, and I will keep you posted with any issues.
Can you please provide me an example of name_override and namepass? This will be useful to me.
Something like the following:
[[inputs.exec]]
name_override = "sample"
commands = [
"/usr/bin/python3 /etc/telegraf/sample.py"
]
data_format = "influx"
[[processors.starlark]]
namepass = ["sample"]
script = "/etc/telegraf/dont_update.star"
[[inputs.exec]]
name_override = "sample2"
commands = [
"/usr/bin/python3 /etc/telegraf/sample2.py"
]
data_format = "influx"
[[processors.starlark]]
namepass = ["sample2"]
script = "/etc/telegraf/dont_update.star"
@jpowers the solution you recommended above works like a charm. Thanks a lot again.
Thanks for following up! Glad it worked.