Can Telegraf be used to aggregate max IOPS?

Hello and thanks for Telegraf v1.17.3,

I am trying to capture the peak/max IOPS seen over a minute rather than the mean, since my use case is bursty: I want the highest 1s (or 2s) rate recorded over the last minute.

I am using Telegraf on Linux with the diskio input to collect reads and writes at a 1s interval:

[[inputs.diskio]]
  interval = "1s"
  fieldpass = ["reads", "writes"]

Then I calculate the rate over a 2s period (1s did not work) using:

[[aggregators.basicstats]]
  period = "2s"
  drop_original = true
  stats = ["rate"]

When combined with the file output plugin and files = ["stdout"], this produces:

diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=17 1614101894000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=82 1614101896000000000
diskio,host=X,name=nvme1n1p1 reads_rate=64,writes_rate=502 1614101898000000000
diskio,host=X,name=nvme1n1p1 writes_rate=138,reads_rate=1 1614101900000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=1 1614101902000000000

From these 2s read and write rates I would then like to find the max over 60s. I do not think it is possible to feed the output of one aggregator into another; I tried the minmax aggregator, but it did not operate on the aggregated fields.

I would appreciate any feedback on whether this is possible without writing a new Telegraf aggregator or enhancing an existing one.

Thanks.

I would also be interested to know if and how you can chain processor or aggregator plugins in series!?


If that is not possible, the following two options come to mind:

  1. Postpone the maximum search over the 60 seconds until later and do it in InfluxDB with a Flux query (a rough sketch follows after the config below).

  2. First use a processors.starlark plugin to calculate the reads/writes per second, then the aggregators.basicstats plugin for the maximum values. Roughly as outlined here, but I have only written it theoretically; it is not tested, no guarantee that it works… :wink:

[[inputs.diskio]]
  interval = "1s"
  fieldpass = ["reads", "writes"]

[[processors.starlark]]
  interval = "1s"
  drop_original = true
  source = '''
state = {
  "last": None
}
def apply(metric):
    # Load from the shared state the metric assigned to the key "last"
    last = state["last"]
    # Store the deepcopy of the new metric into the shared state and assign it to the key "last"
    # NB: To store a metric into the shared state you have to deep copy it
    state["last"] = deepcopy(metric)
    if last != None:
        # Create the new metrics
        readssec = Metric("readssec")
        writessec = Metric("writessec")
        # Calculate reads/writes per second
        readssec.fields["readssec"] = metric.fields["reads"] - last.fields["reads"]
        writessec.fields["writessec"] = metric.fields["writes"] - last.fields["writes"]
        return [readssec, writessec]
'''

[[aggregators.basicstats]]
  period = "60s"
  drop_original = true
  stats = ["max"]
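
For option 1, the Flux query could look roughly like this (also untested; the bucket name "telegraf" and the time range are just assumptions):

from(bucket: "telegraf")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "diskio")
  |> filter(fn: (r) => r._field == "reads_rate" or r._field == "writes_rate")
  |> aggregateWindow(every: 1m, fn: max, createEmpty: false)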

Thank you very much, Franky1.
After keying off the disk name and adding it and the host as tags, I got something working:

[[inputs.diskio]]
  interval = "1s"
  fieldpass = ["reads", "writes"]

[[processors.starlark]]
  interval = "1s"
  drop_original = true
  source = '''
state = { }
def apply(metric):
    # Needed to stop processing attempt on output of aggregators.basicstats
    if metric.fields.get("reads") == None:
        return [metric]
    name = metric.tags["name"]
    # Load from the shared state the metric assigned to the last key for the disk name
    last = state.get(name)
    # Store the deepcopy of the new metric into the shared state and assign it to the "last" state for the disk name
    # NB: To store a metric into the shared state you have to deep copy it
    state[name] = deepcopy(metric)
    if last != None:
        # Create the new metrics
        diskiops = Metric("diskiops")
        # Calculate reads/writes per second
        diskiops.fields["readssec"] = metric.fields["reads"] - last.fields["reads"]
        diskiops.fields["writessec"] = metric.fields["writes"] - last.fields["writes"]
        diskiops.tags["name"] = name
        diskiops.tags["host"] = metric.tags["host"]
        return [diskiops]
'''

[[aggregators.basicstats]]
  period = "60s"
  drop_original = true
  stats = ["max"]

[[outputs.file]]
  files = ["stdout"]

produces:

diskiops,host=X,name=nvme1n1p1 readssec_max=141,writessec_max=409 1614124500000000000
diskiops,host=X,name=nvme1n1p1 readssec_max=338,writessec_max=436 1614124560000000000
diskiops,host=X,name=nvme1n1p1 readssec_max=80,writessec_max=406 1614124620000000000
diskiops,host=X,name=nvme1n1p1 writessec_max=400,readssec_max=161 1614124680000000000
diskiops,host=X,name=nvme1n1p1 readssec_max=80,writessec_max=553 1614124740000000000

For my bursty workload this is going to be helpful for viewing and provisioning IOPS, compared to the 60s mean IO rates per second reported by iostat:

X> iostat -dmx 60 999
Device            r/s     w/s
nvme1n1          3.03   72.92

Updated example:

[agent]
  interval = "60s"
  flush_interval = "60s"

[[inputs.diskio]]
  # default configuration

[[inputs.diskio]]
  alias = "diskio1s"
  interval = "1s"
  fieldpass = ["reads", "writes"]
  name_suffix = "1s"

[[processors.starlark]]
  namepass = ["diskio1s"]
  interval = "1s"
  drop_original = true
  source = '''
state = { }
def apply(metric):
    name = metric.tags["name"]
    # Load from the shared state the metric stored for this disk name
    last = state.get(name)
    # Store a deepcopy of the new metric into the shared state under the disk name
    # NB: To store a metric into the shared state you have to deep copy it
    state[name] = deepcopy(metric)
    if last != None:
        # Create the new metrics
        diskiops = Metric("diskiops")
        # Calculate reads/writes per second
        reads_delta = metric.fields["reads"] - last.fields["reads"]
        writes_delta = metric.fields["writes"] - last.fields["writes"]
        io_delta = reads_delta + writes_delta
        delta_seconds = ( metric.time - last.time ) / 1000000000
        diskiops.fields["readsps"] = ( reads_delta / delta_seconds )
        diskiops.fields["writesps"] = ( writes_delta / delta_seconds )
        diskiops.fields["iops"] = ( io_delta / delta_seconds )
        diskiops.tags["name"] = name
        diskiops.tags["host"] = metric.tags["host"]
        return [diskiops]
'''

[[aggregators.basicstats]]
  namepass = ["diskiops"]
  period = "60s"
  drop_original = true
  stats = ["max"]

[[outputs.file]]
  files = ["stdout"]

Produces:

diskiops,host=X,name=nvme1n1 readsps_max=16,writesps_max=429,iops_max=429 1614178320000000000
diskio,host=X,name=nvme1n1 merged_writes=0i,writes=249720653i,read_bytes=4837247875072i,write_bytes=59582546882560i,read_time=67412298i,write_time=620906884i,io_time=112965696i,weighted_io_time=550023159i,reads=18873944i,merged_reads=0i,iops_in_progress=0i 1614178320000000000

diskiops,host=X,name=nvme1n1 readsps_max=16,writesps_max=416,iops_max=416 1614178380000000000
diskio,host=X,name=nvme1n1 merged_reads=0i,writes=249730575i,read_time=67412445i,weighted_io_time=550044662i,iops_in_progress=0i,io_time=112969467i,merged_writes=0i,reads=18873999i,read_bytes=4837260531712i,write_bytes=59585046716416i,write_time=620933253i 1614178380000000000

I created:
Add a starlark example showing how to obtain IOPS (to aggregate, to produce max_iops). · Issue #8903 · influxdata/telegraf · GitHub
to request adding this as an example.


See telegraf/AGGREGATORS_AND_PROCESSORS.md at master · influxdata/telegraf · GitHub

You can chain multiple processors (by using order) and then chain multiple aggregators after them, but you cannot mix the two (see the processor-chaining sketch after the aggregator example below).

So aggregator1 → aggregator2 should be possible. Starlark is a good and powerful tool, but should only be used as a last resort.

And even if you want the same plugin to run twice, you just duplicate its config section:

[[aggregators.basicstats]]
  order = 1
  period = "2s"
  drop_original = true
  stats = ["rate"]

[[aggregators.basicstats]]
  order = 2
  interval = "60s"
  drop_original = true
  stats = ["max"]
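
For the processor side, chaining via order would look something like this (just a sketch; the rename and strings plugins and their fields are arbitrary examples to show the ordering):

[[processors.rename]]
  order = 1
  [[processors.rename.replace]]
    measurement = "diskio"
    dest = "disk_io"

[[processors.strings]]
  order = 2
  [[processors.strings.uppercase]]
    tag = "name"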

Thank you for the reply, Hipska.

I spent some time attempting to use order (and alias) to “daisy chain” a _rate metric into a basicstats max aggregator, but I could not get it to work. I tried the same and different intervals, and also delay, but every time the basicstats max aggregator did operate, it operated on the original metric and not on the one produced by the previous basicstats rate aggregator, so I never get reads_rate_max. If you have a working example I would be interested in it.

When I run:

[[inputs.diskio]]
  interval = "1s"
  fieldpass = ["reads", "writes"]

[[aggregators.basicstats]]
  order = 1
  period = "2s"
  drop_original = true
  stats = ["rate"]

[[aggregators.basicstats]]
  order = 2
  interval = "10s"
  drop_original = true
  stats = ["max"]

[[outputs.file]]
  files = ["stdout"]

I get:

diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=17 1614166828000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=53 1614166830000000000
diskio,host=X,name=nvme1n1p1 reads_max=18857956,writes_max=248039759 1614166830000000000
diskio,host=X,name=nvme1n1p1 writes_rate=53,reads_rate=0 1614166832000000000
diskio,host=X,name=nvme1n1p1 writes_rate=9,reads_rate=0 1614166834000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=4 1614166836000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=18 1614166838000000000
diskio,host=X,name=nvme1n1p1 writes_rate=104,reads_rate=0 1614166840000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=389 1614166842000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=389 1614166844000000000
diskio,host=X,name=nvme1n1p1 writes_rate=349,reads_rate=0 1614166846000000000
diskio,host=X,name=nvme1n1p1 writes_rate=333,reads_rate=0 1614166848000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=296 1614166850000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=444 1614166852000000000
diskio,host=X,name=nvme1n1p1 writes_rate=353,reads_rate=0 1614166854000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=364 1614166856000000000
diskio,host=X,name=nvme1n1p1 reads_rate=1,writes_rate=189 1614166858000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=0 1614166860000000000
diskio,host=X,name=nvme1n1p1 writes_max=248046577,reads_max=18857958 1614166860000000000
diskio,host=X,name=nvme1n1p1 writes_rate=23,reads_rate=0 1614166862000000000
diskio,host=X,name=nvme1n1p1 reads_rate=0,writes_rate=89 1614166864000000000
diskio,host=X,name=nvme1n1p1 writes_rate=3,reads_rate=0 1614166866000000000

Looking at:

Support ordering of processor plugins · influxdata/telegraf@b03c1d9 · GitHub
telegraf/config.go at 71a3a3cf20182c4537ce832bc7ca212870e7254f · influxdata/telegraf · GitHub (lines 1137 and 1181)

it looks like order is only used by processors and not by aggregators; the docs also only mention it for processor plugins:

telegraf/blob/master/docs/CONFIGURATION.md#processor-plugins
order: The order in which the processor(s) are executed. If this is not specified then processor execution order will be random.

Yes indeed, I see that now as well… I think it will not work like this for now then :wink: Sorry.

I think that once this is solved, it can work that way: aggregators should not re-run processors · Issue #7993 · influxdata/telegraf · GitHub
