Telegraf config - getting substrings and converting hexadecimal to int

Hello,

I have incoming bluetooth beacon data as per the below (where the obj array will contain one or more entries depending on the number of beacons that are proximity to the gateway):

{"msg":"advData","gmac":"94A408B02508","obj":
  [
    {"type":32,"dmac":"BC57290123B5","data1":"0201060DFF530ABC57290123B51A5A0C63","rssi":-44,"time":"2022-10-17 05:48:05"}
  ]
}

I need to extract the gmac attribute and associate it with the dmac and rssi attributes for each entry in the obj array. The last part of my telegraf config is below which is correctly ingesting the data into a bucket in InfluxDBCloud:

## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json_v2"
  tagexclude = ["topic"]
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "a"
    timestamp_path = "obj.#.time"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "gmac"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "obj.#.dmac"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.rssi"
        data_type = "integer"

I also need to extract some values from the data1 attribute, where I need to get characters 13 and 14, and 15-16 and then convert them from hexadecimal to integer values and then store these as two separate fields in influxDB.

I started by trying to extract the data1 attribute and store this in influxDB, but I noticed the incoming data was now being split across two tables (I want each entry to have gmac, dmac, rssi, and data1 together):

## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json_v2"
  tagexclude = ["topic"]
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "a"
    timestamp_path = "obj.#.time"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "gmac"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "obj.#.dmac"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.rssi"
        data_type = "integer"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.data1"
        data_type = "string"

I also tried using the regex processor but found this wasn’t working as I couldn’t see the new batt field in influxDB:

## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json_v2"
  tagexclude = ["topic"]
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "a"
    timestamp_path = "obj.#.time"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "gmac"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "obj.#.dmac"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.rssi"
        data_type = "integer"
    [[processors.regex]]
        [[processors.regex.fields]]
        key = "obj.#.data1"
        pattern = ".{2}$"
        result_key = "batt"

Can anyone help me as I’m not sure how to properly combine multiple plugins in my telegraf config or why my first attempt saw data split across two tables?

Many thanks.

Processors work on all plugins unless you specify a tag or metric name to pass into them. They also run after an input plugin is run, so passing them the gjson syntax is not something they will recognize.

If I run only your one input I get a single metric:

a,dmac=BC57290123B5,gmac=94A408B02508,host=ryzen rssi=-44,data1="0201060DFF530ABC57290123B51A5A0C63" 1666101895000000000

I believe your next step is to parse data1 to get chars 13-14 and 15-16 and then convert them from hexadecimal to integer values?

I would do something like the following:

[[processors.starlark]]
  namepass = ["a"]

  source = '''
def apply(metric):
  data1 = metric.fields.pop('data1')
  metric.fields["data_13"] = int("0x" + data1[13:15], 0)
  metric.fields["data_15"] = int("0x" + data1[15:17], 0)
  return metric
'''

This is a starlark processor that grabs only metrics called “a”, rips out the data1 field, and converts some strings parts from hex to an int and saves them to the metric.

Give that a try!

Hi jpowers,

Many thanks for your help with this, I’m still getting to grips with Telegraf and understanding how to utilise the plugins correctly to achieve the desired result. I added what you suggested to my existing telegraf config which now looks as follows:

data_format = "json_v2"
  tagexclude = ["topic"]
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "a"
    timestamp_path = "obj.#.time"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "gmac"
        # g is gateway MAC address
        rename = "g"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "obj.#.dmac"
        # d is beacon MAC address
        rename = "d"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.rssi"
        type = "int"
        # r is RSSI of beacon
        rename = "r"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.data1"
        data_type = "string"

  [[processors.starlark]]
    namepass = ["a"]
    source = '''
def apply(metric):
    data1 = metric.fields.pop("data1")
    tempWhole = int("0x" + data1[26:28], 0)
    tempDecimal = int("0x" + data1[28:30], 0)
    tempDecimal = tempDecimal / 100
    # t is temperature of chip to two decimal points precision
    metric.fields["t"] = tempWhole + tempDecimal
    # b is battery level in mV
    metric.fields["b"] = int("0x" + data1[30:34], 0)

    return metric

'''

This is working as I need it to which is great :slight_smile: (I forgot to mention in my first post that in the data1 attribute, two bytes are used to represent the whole number and decimal component of the temperature respectively, with the last two bytes representing battery level in mV (e.g. 3000 mV).

One further question - is it possible to round my final value for t to two decimal places? From what I have read, starlark provides access to a math package but the round function returns the nearest integer.

Thanks again!

I actually don’t. As you said, round provides the nearest int.

@srebhan do you know of a way via a processor or starlark to return round to 2 decimal places?

Thanks jpowers, one other thing I noticed is that every so often the telegraf service will stop working with the following output:

2022-10-20T01:45:05Z E! [processors.starlark] Error in int: int: invalid literal with base 0: 0x
2022-10-20T01:45:05Z E! [processors.starlark] Error in plugin: int: invalid literal with base 0: 0x
2022-10-20T01:45:05Z E! [processors.starlark] Traceback (most recent call last):
2022-10-20T01:45:05Z E! [processors.starlark] :3:20: in apply

panic: negative refcount

goroutine 2457 [running]:
github.com/influxdata/telegraf/metric.(*trackingMetric).decr(0x14000e236c8?)
	github.com/influxdata/telegraf/metric/tracking.go:157 +0xd0
github.com/influxdata/telegraf/metric.(*trackingMetric).Accept(0x10c8a3660?)
	github.com/influxdata/telegraf/metric/tracking.go:142 +0x5c
github.com/influxdata/telegraf/models.(*Buffer).metricWritten(0x14000040c80, {0x109d0d490, 0x140004af5a8})
	github.com/influxdata/telegraf/models/buffer.go:98 +0x6c
github.com/influxdata/telegraf/models.(*Buffer).Accept(0x14000040c80, {0x14000e2e580?, 0x58, 0x58?})
	github.com/influxdata/telegraf/models/buffer.go:183 +0x12c
github.com/influxdata/telegraf/models.(*RunningOutput).Write(0x140000d2480)
	github.com/influxdata/telegraf/models/running_output.go:206 +0x194
github.com/influxdata/telegraf/agent.(*Agent).flushOnce.func1()
	github.com/influxdata/telegraf/agent/agent.go:837 +0x30
created by github.com/influxdata/telegraf/agent.(*Agent).flushOnce
	github.com/influxdata/telegraf/agent/agent.go:836 +0x98

I’m assuming that int() is receiving an inappropriate argument type but I’m not sure how this is possible given that the incoming data1 fields is always in the same format?

Oh! That string is in hex isn’t it? If so it is base 16, not 10!

Regarding rounding, the starlark math library provides a round() function, so two decimal places would be

out = round(in*100)/100.0

Does that help?

1 Like

Many thanks @jpowers , I’ve now amended my telegraf config as follows:

## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json_v2"
  tagexclude = ["topic"]
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "c"
    timestamp_path = "obj.#.time"
    timestamp_format = "unix"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "gmac"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "msg"
        data_type = "string"
    [[inputs.mqtt_consumer.json_v2.tag]]
        path = "obj.#.dmac"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.rssi"
        type = "int"
    [[inputs.mqtt_consumer.json_v2.field]]
        path = "obj.#.data1"
        data_type = "string"

  [[processors.starlark]]
    namepass = ["c"]
    source = '''
def apply(metric):
    # check
    dataType = metric.fields.pop("msg")

    # tags
    gateway = metric.tags.pop("gmac")
    beacon = metric.tags.pop("dmac")

    # fields
    rssi = metric.fields.pop("rssi")
    data1 = metric.fields.pop("data1")
    if dataType == "advData":

      # truncate gateway field (last 4 characters to save on space)
      tempgw = str(gateway[8:12])
      metric.tags["g"] = tempgw

      # truncate beacon field (last 4 characters to save on space)
      tempb = str(beacon[8:12])
      metric.tags["b"] = tempb

      # rssi field
      metric.fields["r"] = rssi

      # temperature field
      tempWhole = int(data1[26:28], 16)
      tempDecimal = int(data1[28:30], 16)
      tempDecimal = tempDecimal / 100
      # t is temperature of chip to two decimal points precision
      metric.fields["t"] = tempWhole + tempDecimal

      # battery field
      # b is battery level in mV
      metric.fields["b"] = int(data1[30:34], 16)

    return metric

'''

However, after leaving telegraf running for some time, I still sometimes get the following error and telegraf stops:

2022-10-21T06:30:01Z E! [processors.starlark] Error in plugin: int: invalid literal with base 16: 
2022-10-21T06:30:03Z E! [processors.starlark] Traceback (most recent call last):
2022-10-21T06:30:03Z E! [processors.starlark]   :26:22: in apply
2022-10-21T06:30:03Z E! [processors.starlark] Error in int: int: invalid literal with base 16: 
2022-10-21T06:30:03Z E! [processors.starlark] Error in plugin: int: invalid literal with base 16: 
panic: negative refcount

goroutine 32 [running]:
github.com/influxdata/telegraf/metric.(*trackingMetric).decr(0x109d200f8?)
	github.com/influxdata/telegraf/metric/tracking.go:157 +0xd0
github.com/influxdata/telegraf/metric.(*trackingMetric).Drop(0x14000cffa70?)
	github.com/influxdata/telegraf/metric/tracking.go:151 +0x1c
github.com/influxdata/telegraf/agent.(*Agent).runProcessors.func1(0x14000c8cd20)
	github.com/influxdata/telegraf/agent/agent.go:560 +0x138
created by github.com/influxdata/telegraf/agent.(*Agent).runProcessors
	github.com/influxdata/telegraf/agent/agent.go:553 +0x3c

Do you know why this might be happening?

Also, @srebhan how do I import the math package correctly for starlark within my telegraf config (I tried usual import statements as you would use in python but this didn’t seem to work).

Thanks again!

I think this can mean that you have other non hex characters or a space/punctuation in the string.

Thanks @jpowers, is there an established best practice for dealing with unexpected inputs within a telegraf config file?

In my case, all the incoming data will be from BLE beacons and expected to be highly standardised but is there something I can implement to deal with unexpected cases so that it doesn’t stop telegraf unexpectedly?

best practice for dealing with unexpected inputs within a telegraf config file?

What I would do is try to catch the error in the starlark processor. I would have it print out what I got to see what unexpected data you were getting. Then try to ignore that type of data when you get it.

I think you are using KG02 gateway ? Could we connect on the same ,I am experiencing the same issue when parsing the raw data adv