Telegraf deadband for polling plugin using starlark

Hello,

When using a polling-based input plugin, I would like to implement a deadband on all fields within a metric. In opcua_listener this is part of the protocol, but for other polling-based protocols like s7comm or modbus it is not.
Using the dedup processor you can already eliminate a lot of data, but this only works effectively on whole numbers, because there you essentially have a deadband of 1.
When polling floating-point values this does not work well, because these values are very unstable at high precision. I would like to implement the same functionality, preferably with the deadband stored in a tag of the metric. The best solution would be to add it to the dedup processor, so you can define a maximum gap in time and a minimum difference in value, with the time gap as the leading property.
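To make the idea concrete, here is the deadband behaviour I am after as a plain Python sketch (illustration only, not Telegraf code): a value only passes when it differs from the last emitted value by at least the configured threshold.

```python
def deadband_filter(values, deadband):
    """Yield only values that differ from the last emitted value
    by at least `deadband`; small jitter is suppressed."""
    last = None
    for v in values:
        if last is None or abs(v - last) >= deadband:
            last = v
            yield v

# Noisy float readings with a deadband of 0.5:
# the first value passes, jitter around it is dropped.
filtered = list(deadband_filter([20.01, 20.02, 19.99, 20.60, 20.55, 21.20], 0.5))
```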

The only solution I came across was rounding the value with a Starlark script, but this does not feel practical. I created the following script based on the compare_metrics example, but it only works when logging a single metric, because the variable state["last"] has no context of the different metrics passed through the function.

Does anybody have a solution for the described deadband feature, or a fix for the Starlark script?

state = {
  "last": None
}

def apply(metric):
    last = state["last"]
    
    if last != None:
        for k, v in metric.fields.items():
            if abs(last.fields[k] - metric.fields[k]) < int(metric.tags["deadband"]):
                metric.fields.pop(k)
            else:
                state["last"] = deepcopy(metric)
    else:
        state["last"] = deepcopy(metric)

    return metric

Which plugin are you using for the Starlark script? It seems to me that something like this should be possible with telegraf/plugins/aggregators/starlark at master · influxdata/telegraf · GitHub. The deadband logic can live in either the add or the push function.

At the moment I am using the starlark processor and not the aggregator you are suggesting.

Had a look at the min_max.star example and already picked up some useful Starlark manipulations, such as the groupID function (I had figured out something similar in my own code, but this one is better).

I still don’t quite understand when add/push/reset are being called, though.
With the processor it is fairly straightforward: it just gets called every time there is new data.
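For what it’s worth, here is how I understand the aggregator lifecycle, as a plain Python sketch (the add/push/reset names come from the aggregator examples such as min_max.star; the calls at the bottom only simulate what Telegraf would do): add() runs for every incoming metric, push() once per aggregation period to emit the aggregates, and reset() right after push() to clear the state.

```python
# Plain-Python sketch of the Starlark aggregator lifecycle.
state = {}

def add(metric_name, fields):
    # called for every metric that arrives during the period
    state.setdefault(metric_name, []).append(fields)

def push(emit):
    # called once per aggregation period: emit the aggregates
    for name, samples in state.items():
        emit(name, {"count": len(samples)})

def reset():
    # called right after push(): start the next period clean
    state.clear()

# Simulated period: three metrics arrive, then the period ends.
emitted = []
add("cpu", {"usage": 1.0})
add("cpu", {"usage": 2.0})
add("mem", {"used": 3.0})
push(lambda name, fields: emitted.append((name, fields)))
reset()
```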

I tried actually implementing it in the Starlark aggregator, which was far from successful. It will also not cover your case of multiple metrics, I think. To be clear, you distinguish between metrics by measurement name?


After some more trial and error I came up with this script.
It processes every metric that is collected and stores the value of its fields.
The next time a metric with the same hashed ID comes along, the saved value is compared with the current value and, depending on the deadband:

  • Exceeded: let the field pass and save the new value
  • Not exceeded: overwrite the field value with the saved value

This way there is still a contiguous stream of polled metrics, and because the value stays the same while the deadband is not exceeded, the dedup processor can filter these values out.

Telegraf config:

[[processors.starlark]]
  order = 1
  script = "./deadband.star"
   
[[processors.dedup]]
  order = 2
  dedup_interval = "60s"

deadband.star:

# State maps "groupID.fieldname" keys to the last stored value
# {
#   "groupID_1.Field1": 5,
#   "groupID_1.Field2": 5,
#   "groupID_2.Field1": ...,
# }
state = {}

def apply(metric):
  if metric.tags.get("deadband") == None: return metric
  
  gId = groupID(metric)
  
  for k, v in metric.fields.items():
    #check for existence of metric field combination
    if state.get(str(gId) + "." + k) == None:
      state[str(gId) + "." + k] = v
    else:
      if abs(state[str(gId) + "." + k] - metric.fields[k]) < float(metric.tags["deadband"]):
        #push out stored value to be captured by dedup
        metric.fields[k] = state[str(gId) + "." + k]
      else:
        #store value when deadband passed
        state[str(gId) + "." + k] = v
  return metric
  
def groupID(metric):
  key = metric.name
  for k, v in metric.tags.items():
    key = key + "|" + k + "-" + v
  return hash(key)

Can you verify my script to see if it fits your use case as well?
The only thing I might still need to add is a check for the datatype.
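On the datatype check: abs() and subtraction would fail on string or boolean fields, so a guard could filter those out before comparing. A plain Python sketch (in Starlark you would compare type(v) against the type-name strings such as "float" and "int" instead of using isinstance):

```python
def numeric_fields(fields):
    """Return only the fields whose values support abs()/subtraction.
    Booleans are excluded even though bool is an int subclass in Python."""
    return {
        k: v
        for k, v in fields.items()
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    }

# Only temp and count survive; status and alarm are skipped.
result = numeric_fields({"temp": 21.5, "count": 3, "status": "ok", "alarm": True})
```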


Nice work! I have some minor recommendations, see below.

You can skip metrics without the deadband tag with:

[[processors.starlark]]
  script = "./deadband.star"

  [processors.starlark.tagpass]
    deadband = ["*"]

You can avoid the dedup processor by adding a flag that controls whether the metric is returned. I’ve added some logging to make it clear what happens, feel free to remove this.

# State maps "groupID.fieldname" keys to the last stored value
# {
#   "groupID_1.Field1": 5,
#   "groupID_1.Field2": 5,
#   "groupID_2.Field1": ...,
# }

state = {}
load("logging.star", "log")


def apply(metric):
    log.debug("metric received")
    gId = groupID(metric)
    returnMetric = False

    for k, v in metric.fields.items():
        # check for existence of metric field combination
        if state.get(str(gId) + "." + k) == None:
            log.debug("new metric")
            state[str(gId) + "." + k] = v
            returnMetric = True
        else:
            if abs(state[str(gId) + "." + k] - metric.fields[k]) >= float(
                metric.tags["deadband"]
            ):
                log.debug("field value outside deadband")
                state[str(gId) + "." + k] = v
                returnMetric = True
                # no break: keep checking the remaining fields so their
                # stored values stay current
            else:
                log.debug("field value inside deadband")

    if returnMetric:
        return metric


def groupID(metric):
    key = metric.name
    for k, v in metric.tags.items():
        key = key + "|" + k + "-" + v
    return hash(key)


Thanks for taking a look at the script. I also discovered the log function after my previous post, very useful indeed.

I added the deadband tag handler just in case, so it is not left to chance in the main processor config.

As for the dedup: in my first draft I just popped the field when there was no change outside the deadband. But dedup does have the feature that it keeps a metric every x amount of time; you have to feed it something, though, which is why I keep sending the stored value.
This saves a lot of work writing code that has to keep track of the time.
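For illustration, the dedup decision I am relying on could be sketched in plain Python like this (assumed behaviour of processors.dedup: drop a value that equals the previous one, unless dedup_interval has elapsed since the last emitted copy):

```python
# Sketch of dedup's per-series decision: suppress unchanged values,
# but always let one through once the interval has elapsed.
def make_dedup(interval_s):
    last = {}  # series -> (value, timestamp of last emitted copy)

    def keep(series, value, now_s):
        prev = last.get(series)
        if prev is not None:
            prev_value, prev_ts = prev
            if value == prev_value and now_s - prev_ts < interval_s:
                return False  # unchanged and within the interval: drop
        last[series] = (value, now_s)
        return True

    return keep

keep = make_dedup(60)
# t=0: first value always passes; t=30 unchanged: dropped;
# t=90: still unchanged, but the interval has elapsed, so it passes.
results = [keep("plc.temp", 20.0, t) for t in (0, 30, 90)]
```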

Do you think it’s worthwhile to open an issue or pull request at the Telegraf repository to add this Starlark script to the telegraf/plugins/processors/starlark/testdata at master · influxdata/telegraf · GitHub folder?


Yes, this will be valuable to many people, looking at the time I have spent on this :smile:
Same goes for the replication topic.
Will do some more tweaking coming week and write up some documentation/comments.