How to aggregate & rename fields of an input

Hello,

I am trying to figure out a way to run an aggregation & a processor on a specific input.

Here’s an example of the current output

> cpu,cpu=cpu1,host=somehost,index=telegraf,sourcetype=telegraf-hec usage_active=0,usage_guest=0
> cpu,cpu=cpu0,host=somehost,index=telegraf,sourcetype=telegraf-hec usage_active=0,usage_guest=0

Desired output
> cpu,host=somehost,index=telegraf,sourcetype=telegraf-hec cpu0-usage_active=0,cpu0-usage_guest=0,cpu1-usage_active=0,cpu1-usage_guest=0

I have seen examples that allow me to create tags out of fields, but not the other way around. Any pointers are appreciated.

Thank you!
b

Hello @b1scu1t,
Welcome!
I think for this type of work your best bet is the execd processor plugin, which makes Telegraf extensible in any language.
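For a rough sketch of the shape of such a processor (assuming the default influx data format on stdin and stdout): it is just a long-running script that reads line protocol from stdin and writes line protocol back to stdout, flushing after every line so Telegraf sees the output immediately.

    import sys

    # Minimal pass-through execd processor sketch; a real processor would
    # transform each metric before writing it back.
    for line in sys.stdin:
        sys.stdout.write(line)
        sys.stdout.flush()  # flush per line: Telegraf reads output as a stream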

Hello @Anaisdg,

Thank you very much for that suggestion. I did give it a whirl, but I am stumbling somewhere. The gory details are below; I would appreciate any pointers on where I am messing up.

Also, the way I am calling processors.execd, will it be executed only for inputs.cpu or for every input?
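From skimming the docs I am guessing metric filtering would scope it, something like the following (an untested guess on my part, not something I have verified):

    [[processors.execd]]
      ## untested guess: namepass should restrict this processor to
      ## metrics whose measurement name is "cpu"
      namepass = ["cpu"]
      command = ["python3", "cpu.py"]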

b1scu1t

ps: for anybody else reading this who plans to use Python, please look into the line-protocol-parser package on PyPI; once I figure out my issues with execd I plan to use that too.
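For a taste of it (a sketch based on my reading of the package's docs, not yet wired into my config), it turns a line into a dict so you can skip the hand-rolled string splitting:

    from line_protocol_parser import parse_line

    # Parse one line of influx line protocol into its parts.
    data = parse_line('cpu,cpu=cpu1,host=somehost usage_active=0 1619277877000000000')
    print(data['measurement'])  # 'cpu'
    print(data['tags'])         # {'cpu': 'cpu1', 'host': 'somehost'}
    print(data['fields'])       # field name -> value mapping
    print(data['time'])         # 1619277877000000000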

I tested my config with --test and the output looks good.

telegraf --config telegraf.conf  --test
    2021-04-24T15:24:36Z I! Starting Telegraf 1.18.1
    2021-04-24T15:24:36Z I! [processors.execd] Starting process: python3 [cpu.py]
    > cpu,host=storage cpu0_usage_active=0,cpu0_usage_guest=0,cpu0_usage_guest_nice=0,cpu0_usage_idle=100,cpu0_usage_iowait=0,cpu0_usage_irq=0,cpu0_usage_nice=0,cpu0_usage_softirq=0,cpu0_usage_steal=0,cpu0_usage_system=0,cpu0_usage_user=0,cpu1_usage_active=0,cpu1_usage_guest=0,cpu1_usage_guest_nice=0,cpu1_usage_idle=100,cpu1_usage_iowait=0,cpu1_usage_irq=0,cpu1_usage_nice=0,cpu1_usage_softirq=0,cpu1_usage_steal=0,cpu1_usage_system=0,cpu1_usage_user=0,cpu2_usage_active=1.960784293136852,cpu2_usage_guest=0,cpu2_usage_guest_nice=0,cpu2_usage_idle=98.03921570686315,cpu2_usage_iowait=0,cpu2_usage_irq=0,cpu2_usage_nice=0,cpu2_usage_softirq=0,cpu2_usage_steal=0,cpu2_usage_system=0,cpu2_usage_user=1.9607843141800627,cpu3_usage_active=4.00000000372529,cpu3_usage_guest=0,cpu3_usage_guest_nice=0,cpu3_usage_idle=95.99999999627471,cpu3_usage_iowait=0,cpu3_usage_irq=0,cpu3_usage_nice=0,cpu3_usage_softirq=0,cpu3_usage_steal=0,cpu3_usage_system=0,cpu3_usage_user=4.0000000000873115 1619277877000000000
    2021-04-24T15:24:37Z I! [processors.execd] Process /usr/bin/python3 shut down

And now when I actually run it, it just sits there.

telegraf --config telegraf.conf  
2021-04-24T15:25:06Z I! Starting Telegraf 1.18.1
2021-04-24T15:25:06Z I! Loaded inputs: cpu
2021-04-24T15:25:06Z I! Loaded aggregators: 
2021-04-24T15:25:06Z I! Loaded processors: execd
2021-04-24T15:25:06Z I! Loaded outputs: file
2021-04-24T15:25:06Z I! Tags enabled: host=storage
2021-04-24T15:25:06Z I! [agent] Config: Interval:2s, Quiet:false, Hostname:"storage", Flush Interval:1s
2021-04-24T15:25:06Z I! [processors.execd] Starting process: python3 [cpu.py]

telegraf.conf

[global_tags]
[agent]
  interval = "2s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 1000
  collection_jitter = "0s"
  flush_interval = "1s"
  flush_jitter = "0s"
  precision = ""
  debug = false
  hostname = ""
  omit_hostname = false

##[[inputs.system]]

# Read metrics about cpu usage
[[inputs.cpu]]
  ## Whether to report per-cpu stats or not
  percpu = true
  ## Whether to report total system cpu stats or not
  totalcpu = false
  ## If true, collect raw CPU time metrics
  collect_cpu_time = false
  ## If true, compute and report the sum of all non-idle CPU states
  report_active = true

[[processors.execd]]
  command = ["python3", "cpu.py"]

[[outputs.file]]
  ## note: the splunkmetric_* options below configure the splunkmetric
  ## serializer and have no effect while data_format = "influx"
  splunkmetric_multimetric = true
  splunkmetric_hec_routing = true
  data_format = "influx"
  files = ["stdout"]

cpu.py

import sys

def main():

    final_metrics_dict = dict()

    # Read metrics in influx line protocol from stdin, one per line.
    for line in sys.stdin:
        header_list = line.rstrip().split(" ")[0].split(",")
        metrics_list = line.rstrip().split(" ")[1].split(",")
        footer = line.rstrip().split(" ")[2]
        metric_name = header_list[0]
        dimensions_dict = dict(s.split("=") for s in header_list[1:])
        metrics_dict = dict(s.split("=") for s in metrics_list)
        cpuid = dimensions_dict.pop('cpu')
        # Prefix each field name with the cpu tag value, e.g.
        # usage_active -> cpu0_usage_active. Build a new dict rather than
        # popping keys while iterating, which raises RuntimeError.
        metrics_dict = {cpuid + "_" + k: v for k, v in metrics_dict.items()}

        final_metrics_dict.update(metrics_dict)

    # NOTE: everything below runs only once stdin reaches EOF, i.e. once
    # Telegraf closes the pipe; metric_name, dimensions_dict, and footer
    # keep the values from the last line read.
    results = ''
    results = results + metric_name + ','

    for k, v in dimensions_dict.items():
        results = results + k + '=' + v + ','

    results = results + ' '
    for k, v in final_metrics_dict.items():
        results = results + k + '=' + v + ','

    results = results + ' ' + footer
    results = results.replace(', ', ' ')
    print(results)
    sys.stdout.flush()

if __name__ == '__main__':
    main()

Hello @b1scu1t,
Thanks for sharing the line-protocol-parser package as well as your code. Indeed, it's super useful. I'm not sure what the problem is here; I'll share your question with the Telegraf team.
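One guess worth checking, though I can't confirm it's the cause: your script only prints after the for loop over sys.stdin finishes, i.e. after stdin reaches EOF. With --test Telegraf closes the pipe once the single collection is done, which is why you see output there, but in normal operation the pipe stays open, so the print is never reached and Telegraf appears to hang. Here is a sketch of a per-line variant; note it emits one renamed line per input metric immediately, and does not merge all CPUs into a single line the way your desired output does (that would need buffering per timestamp):

    import sys

    # Sketch only: transform and emit each metric as it arrives instead of
    # buffering until EOF, so Telegraf receives output while running.
    for line in sys.stdin:
        header, fields, ts = line.rstrip().split(" ")
        parts = header.split(",")
        name, tags = parts[0], dict(t.split("=") for t in parts[1:])
        cpuid = tags.pop("cpu", None)
        if cpuid is not None:
            # prefix each field name with the cpu tag value
            fields = ",".join(cpuid + "_" + f for f in fields.split(","))
        tagstr = ",".join(k + "=" + v for k, v in tags.items())
        print(name + ("," + tagstr if tagstr else "") + " " + fields + " " + ts)
        sys.stdout.flush()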

I hate to suggest a completely new solution since you've invested time in the execd processor plugin, but I might also recommend writing your data to InfluxDB as-is and then using a task to transform it.
For example, this query would do what you want:

import "strings"
from(bucket: "telegraf")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_system")
  |> filter(fn: (r) => r["cpu"] == "cpu-total" or r["cpu"] == "cpu1" or r["cpu"] == "cpu2")
  |> map(fn: (r) => ({ r with _field: strings.joinStr(arr: [r._field, r.cpu], v: "_") }))
  |> yield(name: "field rename")

You could write data to InfluxDB to a bucket with a very small retention policy, use a task to transform it, and then keep the processed data in a new bucket with a longer retention policy.
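To sketch what that task could look like (the task name, schedule, and destination bucket here are placeholders, not from your setup):

    import "strings"

    option task = {name: "cpu-field-rename", every: 1m}

    from(bucket: "telegraf")
      |> range(start: -task.every)
      |> filter(fn: (r) => r["_measurement"] == "cpu")
      |> map(fn: (r) => ({ r with _field: strings.joinStr(arr: [r._field, r.cpu], v: "_") }))
      // drop the cpu tag now that its value lives in the field name
      |> drop(columns: ["cpu"])
      |> to(bucket: "telegraf_processed")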