Telegraf merge plugin

I’m trying to use the “merge” aggregator plugin to combine stats, since each value arrives in a separate message.

From the following:

ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=test tps_5xx=0 1604066460811000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=test tps_total=0 1604066460811000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=test kbps=0 1604066460811000000

I’m trying to get this result:
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=test tps_5xx=0,tps_total=0,kbps=0 1604066460811000000

I’m using the following in my configuration:

[[aggregators.merge]]
period = "10s"

If I add “drop_original = true” then I never get any outputs.

Any feedback would be great.

Thanks, Steve

From the docs it looks like “period” is not even supported by this aggregator; in fact, it does not merge points over a time range, but point by point by series key (tags + timestamp).
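A rough Python sketch of that grouping rule (illustrative only, not Telegraf’s actual code): points are combined only when measurement name, tag set, and timestamp all match.

```python
from collections import defaultdict

def merge_points(points):
    # Key = (measurement, tag set, timestamp); frozenset makes the
    # tag set hashable so it can be part of a dict key.
    merged = defaultdict(dict)
    for name, tags, fields, ts in points:
        merged[(name, frozenset(tags.items()), ts)].update(fields)
    return merged

points = [
    ("ds_stats", {"cdn": "cdn1", "cachegroup": "total"}, {"tps_5xx": 0}, 1604066460811000000),
    ("ds_stats", {"cdn": "cdn1", "cachegroup": "total"}, {"tps_total": 0}, 1604066460811000000),
    ("ds_stats", {"cdn": "cdn1", "cachegroup": "total"}, {"kbps": 0}, 1604066460811000000),
]
out = merge_points(points)
# one series key, carrying all three fields
```

That is why your three ds_stats lines can collapse into one: they share exactly the same tags and timestamp.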

Do you have other aggregators or filters in your configuration? Can you share the whole config file?

I haven’t seen any examples or bug reports about this aggregator.

Here is the full configuration:

[agent]
interval = "10s"
debug = false
round_interval = true
collection_jitter = "2s"
flush_jitter = "5s"
metric_buffer_limit = 15000

[[inputs.influxdb_listener]]
service_address = ":8186"
read_timeout = "10s"
max_body_size = 0
database_tag = "db"

[[processors.starlark]]
source = '''
def apply(metric):
    metric.fields[str(metric.name)] = metric.fields['value']
    metric.name = 'ds_stats'
    metric.fields.pop('value', None)
    metric.tags.pop('host', None)
    return metric
'''
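For reference, here is a small Python simulation of what that Starlark function does to each point (the `Metric` class below is a toy stand-in, not Telegraf’s actual API):

```python
# Toy stand-in for a Telegraf metric, to illustrate the Starlark
# transform above (this class is not Telegraf's real API).
class Metric:
    def __init__(self, name, tags, fields):
        self.name, self.tags, self.fields = name, tags, fields

def apply(metric):
    # Move the generic 'value' field into a field named after the
    # original measurement, then collapse everything under 'ds_stats'.
    metric.fields[str(metric.name)] = metric.fields['value']
    metric.name = 'ds_stats'
    metric.fields.pop('value', None)
    metric.tags.pop('host', None)
    return metric

m = apply(Metric("tps_5xx", {"cdn": "cdn1", "host": "edge1"}, {"value": 0}))
# m now corresponds to: ds_stats,cdn=cdn1 tps_5xx=0
```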

[[aggregators.merge]]
period = "10s"

[[outputs.file]]
files = ["stdout"]

Try to add/edit the following in your config:

[agent]
  {...}
  precision = "1s"

{...}

## Clone existing data
[[processors.clone]]
  namepass = ["ds_stats"]
  name_override = "ds_stats_merge"

## aggregate only the cloned data
[[aggregators.merge]]
  namepass = ["ds_stats_merge"]
  drop_original = false

Then have a look at the output file and look for the “ds_stats_merge” measurement… the data should be there.

Not sure this worked as intended. It created the following:

ds_stats,cachegroup=us-cdn1-edge-docker,cdn=cdn1,deliveryservice=cox-dev-csx-a ds_stats_merge=217 1604501129704000000
ds_stats,cachegroup=cdn1-atlanta,cdn=cdn1,deliveryservice=cox-dev-csx-a ds_stats_merge=0 1604501129704000000
ds_stats,cachegroup=all,cdn=cdn1,deliveryservice=cox-dev-csx-a,type=EDGE ds_stats_merge=22 1604501129704000000
ds_stats,cachegroup=all,cdn=cdn1,deliveryservice=cox-dev-csx-a,type=EDGE ds_stats_merge=0 1604501129704000000

I expected something entirely different, to be honest: a full clone of the measurement, plus the merged lines.

If you remove the merge aggregator does it return a full copy of the measurement?

The current result of clone + merge does not make any sense; it is not even valid line protocol. If the clone works properly, then maybe there is a problem with the merge, and it might be useful to open an issue on GitHub and/or summon a developer here in this thread.

I’ll try something with static data tomorrow and see if I can replicate the issue.

I’m getting back to this. Have you had time to look at it?

Steve

I replicated it now and it actually works for me.
Here is my sample configuration:

[agent]
  interval = "5s"
  flush_interval = "10s"

[[inputs.file]]
  files = ['C:\temp\SampleMetrics.txt']
  data_format = "influx"

[[outputs.file]]
  files = ['C:\temp\Output.txt']
  data_format = "influx"

## aggregate only the cloned data
[[aggregators.merge]]
  namepass = ["ds_stats_merge"]
  drop_original = true

  [aggregators.merge.tags]
    CustomTag = "aggregated"

Here are all the files I’ve used: config (renamed to .txt due to upload limitations), input, and output.

Output.txt (5.6 KB) SampleMetrics.txt (598 Bytes)
telegrafConfig.txt (393 Bytes)

Thanks for the example, although I didn’t quite understand the namepass, since it didn’t match (ds_stats vs ds_stats_merge).

After adding the “time” at the end and changing the plugin from file to tail, I couldn’t get any output from the merge.

ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=prd tps_5xx=10 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=prd tps_total=12 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=prd kbps=15 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=tst tps_5xx=22 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=tst tps_total=24 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=tst kbps=28 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=dev tps_5xx=42 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=dev tps_total=46 1610483460000000000
ds_stats,cachegroup=total,cdn=cdn1,deliveryservice=dev kbps=44 1610483460000000000

I tried the same file and added more metrics and used the “tail” plugin instead. That didn’t work either.

Would there be something related to time this “merge” doesn’t like?

Steve

I’ve tried to add the time as well and there is no output; I have no idea why.
I’ve also tried to use a static timestamp in the future and waited until that moment but still nothing.

You could consider opening an issue on GitHub.

I think I’m about to figure this out. The merge aggregator needs timestamps close to real time (like other aggregators): there is a grace window, and I wasn’t “patient” enough to wait for it.
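A rough sketch of that windowing behaviour (function and parameter names are illustrative, not Telegraf internals):

```python
# Hypothetical sketch of how an aggregator decides whether a metric
# falls into the current aggregation window (names are illustrative).
def accepts(metric_time, window_start, period, grace):
    # A metric is kept if it lands inside the current window, extended
    # backwards by `grace` to admit late-arriving points.
    return window_start - grace <= metric_time < window_start + period

# A point stamped 60s before the window opens is dropped by default,
# but admitted once grace is raised to 60s.
old_point_default = accepts(metric_time=0, window_start=60, period=30, grace=0)
old_point_graced = accepts(metric_time=0, window_start=60, period=30, grace=60)
```

This would explain why static test data with old timestamps produced no merged output until the grace window covered them.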

I’ve got other errors to work through, because the “starlark” processor runs twice and fails after the merge, which is causing me some pain.

Here you go… That merges all the fields into a single document.

Output:

deliveryservice_stats,cachegroup=total,cdn=cdn1,deliveryservice=hls tps_5xx=0,tps_4xx=0,tps_3xx=0,tps_total=0,status_5xx=0,status_4xx=0,kbps=0,status_2xx=0,tps_2xx=0 1610494319189000000

Configuration:

[agent]
interval = "5s"
flush_jitter = "10s"
precision = "1s"
debug = true

[[inputs.influxdb_listener]]
service_address = ":8186"
read_timeout = "10s"
max_body_size = 0
database_tag = "db"

[[processors.starlark]]
namedrop = ['deliveryservice_stats']
source = '''
def apply(metric):
    metric.fields[str(metric.name)] = metric.fields['value']
    metric.name = metric.tags.pop('db', None)
    metric.fields.pop('value', None)
    metric.tags.pop('host', None)
    return metric
'''
[processors.starlark.tagpass]
deliveryservice = ["*"]

[[aggregators.merge]]
namepass = ['deliveryservice_stats']
drop_original = true
grace = "60s"

[[outputs.file]]
namepass = ['deliveryservice_stats']
files = ["stdout"]