Merge not working with pivot

I’m using the Telegraf 1.27.2-alpine Docker container to connect to a Juniper network device. I was initially receiving data like this:

{
  "fields": {
    "component/properties/property/state/value": 56030424
  },
  "name": "memory",
  "tags": {
    "/components/component/properties/property/name": "mem-util-packet-dma-bytes-allocated",
    "host": "telegraf-agent",
    "source": "device.mgt.net",
    "tag-name": "FPC0:CPU0"
  },
  "timestamp": 1689125760
}
{
  "fields": {
    "component/properties/property/state/value": 49
  },
  "name": "memory",
  "tags": {
    "/components/component/properties/property/name": "mem-util-packet-dma-utilization",
    "host": "telegraf-agent",
    "source": "device.mgt.net",
    "tag-name": "FPC0:CPU0"
  },
  "timestamp": 1689125760
}

I then applied the pivot processor to map the “…/property/name” tag onto the “…/property/state/value” field, which gives me data like this:

{
  "fields": {
    "mem-util-kernel-fpb-bytes-allocated": 56
  },
  "name": "memory",
  "tags": {
    "host": "telegraf-agent",
    "source": "device.mgt.net",
    "tag-name": "FPC0:CPU0"
  },
  "timestamp": 1689125850
}
{
  "fields": {
    "mem-util-kernel-fpb-allocations": 4
  },
  "name": "memory",
  "tags": {
    "host": "telegraf-agent",
    "source": "device.mgt.net",
    "tag-name": "FPC0:CPU0"
  },
  "timestamp": 1689125850
}

Now I would like to use the merge aggregator to merge the fields together, so the output should look like this:

{
  "fields": {
    "mem-util-kernel-fpb-bytes-allocated": 56,
    "mem-util-kernel-fpb-allocations": 4
  },
  "name": "memory",
  "tags": {
    "host": "telegraf-agent",
    "source": "device.mgt.net",
    "tag-name": "FPC0:CPU0"
  },
  "timestamp": 1689125850
}

However, merge is not working.
This is what my Telegraf conf looks like:

[global_tags]
[agent]
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 150000
  flush_interval = "10s"
  debug = true
  quiet = false
  hostname = "$containerName-telegraf-agent"
  omit_hostname = false

[[inputs.gnmi]]
  addresses = ["device.mgt.net:50051"]
  username = "$user"
  password = "$password"
  redial = "10s"
  enable_tls = true
  tls_ca = "/etc/telegraf/router_ca.pem"
  insecure_skip_verify = true

  tagexclude = ["path"]

  [inputs.gnmi.aliases]
     memory = "/components"

  [[inputs.gnmi.subscription]]
    name = "memory"
    origin = "openconfig"
    path = "/junos/system/linecard/cpu/memory"
    subscription_mode = "sample"
    sample_interval = "10s"
  

# Rotate a single valued metric into a multi field metric
[[processors.pivot]]
  namepass = ["memory"]
  ## Tag to use for naming the new field.
  tag_key = "/components/component/properties/property/name"
  ## Field to use as the value of the new field.
  value_key = "component/properties/property/state/value"

# Perform some field and tag name changes  
[[processors.rename]]
  [[processors.rename.replace]]
    field = "name"
    dest = "field-name"
  [[processors.rename.replace]]
    tag = "name"
    dest = "tag-name"


# Set the timestamp to the current time in nanoseconds so the merge aggregator doesn't throw errors.
[[processors.starlark]]
namepass = ["qmon", "routingEngine", "memory"]
  source = '''
load('time.star', 'time')
def apply(metric):
  metric.time = time.now().unix_nano
  return metric
'''


# Aggregate fields for the following sensors
[[aggregators.merge]]
  namepass = ["memory"]
  drop_original = true
  grace = "10s"


[[outputs.file]]
  ## Files to write to, "stdout" is a specially handled file.
  files = ["stdout"]
  data_format = "json"

Not sure why merge isn’t merging the events together.

Hello @mohsin106,
What are you getting instead?
@jpowers does anything stick out to you? Thank you.

When you say it’s not working, can you elaborate a bit? As in, not merging the items at all? No metrics showing up at all?

Given the somewhat strange behavior of aggregators, you should get additional output via --debug or debug = true in your config. Do you see metrics in the ranges it lists?

Thanks!

My apologies. I am getting metrics returned, but they are not being aggregated into one event. With the merge aggregator enabled as listed above I still get documents like this:

{
  "fields": {
    "mem-util-kernel-fpb-bytes-allocated": 56
  },
  "name": "memory",
  "tags": {
    "host": "telegraf-agent",
    "source": "device.mgt.net",
    "tag-name": "FPC0:CPU0"
  },
  "timestamp": 1689125850
}
{
  "fields": {
    "mem-util-kernel-fpb-allocations": 4
  },
  "name": "memory",
  "tags": {
    "host": "telegraf-agent",
    "source": "device.mgt.net",
    "tag-name": "FPC0:CPU0"
  },
  "timestamp": 1689125850
}

I tried the following config:

[agent]
  debug = true

[[outputs.file]]
 data_format = "influx"

[[inputs.exec]]
  commands = ["echo memory,host=telegraf-agent,source=device.mgt.net,tag-name=FPC0:CPU0 mem-util-kernel-fpb-allocated=56"]
  data_format = "influx"

[[inputs.exec]]
  commands = ["echo memory,host=telegraf-agent,source=device.mgt.net,tag-name=FPC0:CPU0 mem-util-kernel-fpb-allocations=4"]
  data_format = "influx"

[[aggregators.merge]]
  drop_original = true

And as expected, I got three metrics every 30 seconds (collected every 10 seconds, aggregated over 30):

2023-07-12T21:37:50Z D! [outputs.file] Buffer fullness: 0 / 10000 metrics
2023-07-12T21:38:00Z D! [aggregators.merge] Updated aggregation range [2023-07-12 15:38:00 -0600 MDT, 2023-07-12 15:38:30 -0600 MDT]
memory,host=telegraf-agent,source=device.mgt.net,tag-name=FPC0:CPU0 mem-util-kernel-fpb-allocations=4,mem-util-kernel-fpb-allocated=56 1689197850000000000
memory,host=telegraf-agent,source=device.mgt.net,tag-name=FPC0:CPU0 mem-util-kernel-fpb-allocations=4,mem-util-kernel-fpb-allocated=56 1689197860000000000
memory,host=telegraf-agent,source=device.mgt.net,tag-name=FPC0:CPU0 mem-util-kernel-fpb-allocations=4,mem-util-kernel-fpb-allocated=56 1689197870000000000
2023-07-12T21:38:00Z D! [outputs.file] Wrote batch of 3 metrics in 36.55µs

I wondered if you were hitting an issue with processors, because they are run again after the aggregators. There was a good intention in doing so, but it causes a lot of issues. As soon as I add a starlark processor like yours, I stop seeing the combined metrics.
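
For context, Telegraf’s metric pipeline runs roughly as:

inputs -> processors -> aggregators -> processors (again) -> outputs

so anything the merge aggregator emits passes back through your processors before reaching the outputs.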

The reason I added the Starlark processor was that I was getting this error message from the merge aggregator:

2023-07-13T11:21:20Z D! [aggregators.merge] Metric is outside aggregation window; discarding. 2023-07-13 10:51:49.19 +0000 UTC: m: 2023-07-13 11:21:00 +0000 UTC e: 2023-07-13 11:21:30 +0000 UTC g: 10s
2023-07-13T11:21:20Z D! [aggregators.merge] Metric is outside aggregation window; discarding. 2023-07-13 10:51:49.19 +0000 UTC: m: 2023-07-13 11:21:00 +0000 UTC e: 2023-07-13 11:21:30 +0000 UTC g: 10s

When I disabled merge and checked the timestamps, I discovered I was receiving data from 30 minutes ago. To overcome that, I added the Starlark processor to update the timestamp to the current time. But when I enabled Starlark, I got into the position I’m currently in.

Any way around this? Is it possible to tell merge to accept metrics with timestamps more than 30 minutes old?

The aggregators work on recent time ranges while Telegraf is running; otherwise you could get into some very unexpected behavior mixing recent and not-so-recent data.
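
If metrics are only slightly late, the standard aggregator options let you widen the window somewhat; a sketch with placeholder values:

[[aggregators.merge]]
  drop_original = true
  ## Width of each aggregation window (30s is the default).
  period = "30s"
  ## How late a metric may arrive past the window and still be aggregated.
  grace = "1m"

That won’t rescue data that is a full 30 minutes old, though.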

You could modify the starlark processor to only update the timestamp when neither field is present in the metric, for example:

  if "mem-util-kernel-fpb-allocations" not in metric.fields and "mem-util-kernel-fpb-allocated" not in metric.fields:
    metric.time = time.now().unix_nano
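
For completeness, a minimal sketch of that guard dropped into the earlier [[processors.starlark]] block (the field names are the ones from this test; swap in your own):

[[processors.starlark]]
  namepass = ["memory"]
  source = '''
load('time.star', 'time')

def apply(metric):
  # Re-stamp only metrics that contain neither field; the merged metric
  # contains both and keeps its original timestamp.
  if "mem-util-kernel-fpb-allocations" not in metric.fields and "mem-util-kernel-fpb-allocated" not in metric.fields:
    metric.time = time.now().unix_nano
  return metric
'''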

That seemed to work. However, I don’t know whether the metric you are actually using is much larger; if so, that check would become unwieldy.

I was able to confirm that the time on the network device was off by 30 minutes. I had it corrected, and now the data is coming back in real time. I no longer see those merge aggregator errors; however, the data is still not being merged.

Debug output does not show any errors. If I run your config where you echo the data set, that works for me as well.

Not sure why merge is not working when the data is coming from the device.

Here is what the line protocol for the data set looks like in the container output:

memory,device=device.mgt.net,host=telegraf-agent,interface-name=FPC0:CPU0 mem-util-kernel-utilization=28i 1689257788768000000
memory,device=device.mgt.net,host=telegraf-agent,interface-name=FPC0:CPU0 mem-util-dma-utilization=6i 1689257788768000000
memory,device=device.mgt.net,host=telegraf-agent,interface-name=FPC0:CPU0 mem-util-pkt-dma-desc-utilization=81i 1689257788768000000
memory,device=device.mgt.net,host=telegraf-agent,interface-name=FPC0:CPU0 mem-util-bcm-sdk-utilization=58i 1689257788768000000
memory,device=device.mgt.net,host=telegraf-agent,interface-name=FPC0:CPU0 mem-util-packet-dma-utilization=49i 1689257788768000000

I think I got it. I just added order = 1:

[[processors.pivot]]
  order = 1
  namepass = ["memory"]
  ## Tag to use for naming the new field.
  tag_key = "property-name"
  ## Field to use as the value of the new field.
  value_key = "property-value"

Can I use order = 1 for multiple processors that I want to be executed first?

Can I use order = 1 for multiple processors that I want to be executed first?

I would highly suggest setting order on all of your processors, with a unique value for each.
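
For example, with the processors from your config (other options abridged):

[[processors.pivot]]
  order = 1
  tag_key = "property-name"
  value_key = "property-value"

[[processors.rename]]
  order = 2
  [[processors.rename.replace]]
    tag = "name"
    dest = "tag-name"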

@jpowers I’m testing the routing-engine sensor and trying to test merge as you did above, but getting a different error.

This is what my Telegraf conf looks like:

[agent]
  debug = true

[[outputs.file]]
 data_format = "influx"

[[inputs.exec]]
    commands = ["echo routingEngine,device=device.mgt.net,host=lab-qfx6gnmi-deployment-5598c576bf-j9q4v-telegraf-agent,routing-engine=RoutingEngine0 state=OnlineMaster"]
    data_format = "influx"

[[inputs.exec]]
    commands = ["echo routingEngine,device=device.mgt.net,host=lab-qfx6gnmi-deployment-5598c576bf-j9q4v-telegraf-agent,routing-engine=RoutingEngine0 mastership-state=Master"]
    data_format = "influx"

[[aggregators.merge]]
  drop_original = true

This is the error I’m getting:

2023-07-17T18:40:00Z E! [inputs.exec] Error in plugin: metric parse error: expected field at 1:153: "routingEngine,device=nep6rpaj01.mgt.cox.net,host=lab-qfx6gnmi-deployment-5598c576bf-j9q4v-telegraf-agent,routing-engine=RoutingEngine0 mastership-state=Master"
2023-07-17T18:40:00Z E! [inputs.exec] Error in plugin: metric parse error: expected field at 1:142: "routingEngine,device=nep6rpaj01.mgt.cox.net,host=lab-qfx6gnmi-deployment-5598c576bf-j9q4v-telegraf-agent,routing-engine=RoutingEngine0 state=OnlineMaster"

Is there something in the data set that is causing this error?

You can have string fields, but you need to quote them. For example:

commands = ['echo routingEngine state=\"OnlineMaster\"']

Will produce:

routingEngine,host=ryzen state="OnlineMaster" 1689620266000000000
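
Applied to your earlier test commands, that would look something like this (tags trimmed for brevity):

[[inputs.exec]]
  ## Escaped quotes make the string value valid line protocol
  commands = ['echo routingEngine,routing-engine=RoutingEngine0 state=\"OnlineMaster\"']
  data_format = "influx"

[[inputs.exec]]
  commands = ['echo routingEngine,routing-engine=RoutingEngine0 mastership-state=\"Master\"']
  data_format = "influx"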

I just noticed that if I replace the values of the state and mastership-state fields with integers, the merge works.

This works:

commands = ["echo routingEngine,device=device.mgt.net,host=lab-qfx6gnmi-deployment-5598c576bf-j9q4v-telegraf-agent,routing-engine=RoutingEngine0 state=100"]
commands = ["echo routingEngine,device=device.mgt.net,host=lab-qfx6gnmi-deployment-5598c576bf-j9q4v-telegraf-agent,routing-engine=RoutingEngine0 mastership-state=200"]

Output:

routingEngine,device=nep6rpaj01.mgt.cox.net,host=lab-qfx6gnmi-deployment-5598c576bf-j9q4v-telegraf-agent,routing-engine=RoutingEngine0 state=100,mastershipState=200 1689620300000000000

@jpowers So, I’m not sure why I’m not able to get merge to work when I receive data from the device, but it works when I use your config and pass the data via shell commands.

This is the data coming back from the device when I implement the pivot processor:

routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 state="Online Master" 1689789216186397796
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 mastership-state="Master" 1689789216186527155
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 temperature="36" 1689789216186692113
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 temperature-cpu="36" 1689789216186800824
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 cpu-utilization-user="0" 1689789216186923927
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 cpu-utilization-background="0" 1689789216187029978
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 cpu-utilization-kernel="1" 1689789216187135302
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 cpu-utilization-interrupt="0" 1689789216187240533
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 cpu-utilization-idle="98" 1689789216187368385
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 memory-dram-used="655" 1689789216187477680
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 memory-dram-installed="4096" 1689789216187582731
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 memory-utilization-buffer="16" 1689789216187706428
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 uptime="8979797" 1689789216187812676
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 reboot-reason="0x4000:VJUNOS reboot" 1689789216187918438
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 fru-model-number="QFX5110-48S-AFO" 1689789216188027664
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 firmware-rev="rev:0.0.0" 1689789216188155119
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 manufacture-date="date:2017-04-19" 1689789216188272325
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 temperature-back-to-normal="TRUE" 1689789216188400006
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 over-temperature="FALSE" 1689789216188506719
routingEngine,host=lab-qfx6gnmi-deployment-5598c576bf-x6p2l,path=/components/component/properties/property,source=device1.mgt.net,tag-name=Routing\ Engine0 fru-failed="FALSE" 1689789216188610992

This is my Telegraf conf:

[global_tags]
[agent]
  debug = true

[[inputs.gnmi]]
  addresses = ["device1.mgt.net:50051"]
  username = "$user"
  password = "$password"
  encoding = "proto"
  redial = "10s"
  enable_tls = true
  tls_ca = "/etc/telegraf/router_ca.pem"
  insecure_skip_verify = false
  fielddrop = ["property/state/configurable"]

  [[inputs.gnmi.subscription]]
    name = "routingEngine"
    origin = "openconfig-interfaces"
    path = "/components/component[name=Routing Engine0]/properties/"
    subscription_mode = "sample"
    sample_interval = "60s"
    

# Rotate a single valued metric into a multi field metric
[[processors.pivot]]
  namepass = ["routingEngine"]
  ## Tag to use for naming the new field.
  tag_key = "/components/component/properties/property/name"
  ## Field to use as the value of the new field.
  value_key = "property/state/value"

# Rename the "name" field to something else as well as the "name" tag to something else
[[processors.rename]]
  [[processors.rename.replace]]
    field = "name"
    dest = "field-name"
  [[processors.rename.replace]]
    tag = "name"
    dest = "tag-name"

# Aggregate
[[aggregators.merge]]
  drop_original = true

[[outputs.file]]
  files = ["stdout"]
  data_format = "influx"

However, the data is not being merged. I don’t understand why it’s not working when retrieving data directly from the device.

From the merge aggregator’s documentation: “Use this plugin when fields are split over multiple metrics, with the same measurement, tag set and timestamp.”

Your data does not have the same timestamp.

Hmm… if I convert the timestamps from the first two log entries (1689789216186397796 and 1689789216186527155), they both come out to Wednesday, July 19, 2023 1:53:36.186 PM. Even the last timestamp (1689789216188610992) converts to the same time using an online epoch converter.

At the millisecond level they are, but at the microsecond and nanosecond level they are not.
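
Breaking the first two timestamps apart shows it:

1689789216 . 186 397 796   (seconds . ms µs ns)
1689789216 . 186 527 155

Both share the second (1689789216) and the millisecond (186), but diverge from the microseconds on (397796 vs 527155), so merge sees two different timestamps.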

Is it possible to have Telegraf log everything at the millisecond level?

There is a flow to how timestamps get set on metrics. In this case, the gnmi input sets timestamps based on the gNMI response itself.

The input-level precision option may cut off the excess values, so I would try adding precision = "1s" to your [[inputs.gnmi]] and see if that produces metrics with the same timestamp.
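
Something like this, keeping the rest of your gnmi options as they are:

[[inputs.gnmi]]
  ## Round collected timestamps to whole seconds so metrics from the
  ## same sample share an identical timestamp and can be merged.
  precision = "1s"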