Telegraf: filtering MQTT message fields before writing to InfluxDB

How can I filter out the “odometer” and “angle” fields so they are not stored in InfluxDB?

{
    "updated": 1634607913.044073,
    "x": 566753.985858001,
    "y": 566753.9858582001,
    "odometer": [9.9, 9.1, 9.5],
    "angle": [0.35431448844127544, 0.09063859006636221]
}

Hi @loneWolf666,
If I understand you correctly, you would like to remove odometer and angle from the metrics being written into InfluxDB. Here is how to do it with the JSON_V2 parser:

###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################
[[inputs.mqtt_consumer]]

  servers = ["tcp://192.168.1.220:1883"]
  topics = [
    "iotgateway",
  ]
  data_format = "json_v2"


  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "<insert>"
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this"
      disable_prepend_keys = true
      excluded_keys = ["odometer", "angle"]
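A quick way to see what excluded_keys should leave behind is to apply the same idea to the sample message outside Telegraf. Below is a minimal Python sketch, assuming a flat top-level object; drop_keys is a hypothetical helper for illustration, not part of Telegraf or the json_v2 parser.

```python
import json

# Sample message from the question (copied from the post above).
PAYLOAD = """
{
  "updated": 1634607913.044073,
  "x": 566753.985858001,
  "y": 566753.9858582001,
  "odometer": [9.9, 9.1, 9.5],
  "angle": [0.35431448844127544, 0.09063859006636221]
}
"""


def drop_keys(message: str, excluded: list) -> dict:
    """Parse a JSON object and drop the top-level keys listed in `excluded`."""
    obj = json.loads(message)
    return {k: v for k, v in obj.items() if k not in excluded}


fields = drop_keys(PAYLOAD, ["odometer", "angle"])
print(sorted(fields))  # ['updated', 'x', 'y']
```

This only models the effect on the sample data; nested objects and Telegraf's own key handling are not covered.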


Thank you very much! That cleared up my confusion :100: :heart: :+1: :mechanical_arm: :muscle: But I have another question:

How can I write “odometer” into InfluxDB as an array or as a string? :pray:

Previously I wrote the array type into InfluxDB with the Python client, but how do I do this in the Telegraf MQTT plugin config? :pray:

Hahaha, no worries. An array is difficult to store like that because there is no timestamp value to associate with each point, so essentially each point would overwrite the previous one. You could do some post-processing to associate each point with a new timestamp or field, but I’m not sure that is what you are after. How did you process the array in Python before? Maybe we can work out an alternative in Telegraf.
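To illustrate the overwrite problem, here is a small Python model (a sketch under stated assumptions, not InfluxDB code). It treats one series as a mapping from (timestamp, field name) to value, since a point with the same measurement, tag set and timestamp replaces earlier values of the same field. The two alternatives mentioned above, a new field per element or a new timestamp per element, are shown next to the naive approach; the function names and the step_ns spacing are hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical model of one series: (timestamp_ns, field_name) -> value.
# Re-writing the same field at the same timestamp replaces the earlier
# value, which plain dict assignment mimics.
Series = Dict[Tuple[int, str], float]


def write_same_field(values: List[float], ts: int) -> Series:
    """Naive approach: every element is written to 'odometer' at one timestamp."""
    series: Series = {}
    for v in values:
        series[(ts, "odometer")] = v  # overwrites the previous element
    return series


def write_indexed_fields(values: List[float], ts: int) -> Series:
    """Option 1: one field per element (odometer_0, odometer_1, ...)."""
    return {(ts, f"odometer_{i}"): v for i, v in enumerate(values)}


def write_offset_timestamps(values: List[float], ts: int, step_ns: int) -> Series:
    """Option 2: one timestamp per element, spaced by an assumed step_ns."""
    return {(ts + i * step_ns, "odometer"): v for i, v in enumerate(values)}


odometer = [9.9, 9.1, 9.5]
ts = 1634607913044073000
print(write_same_field(odometer, ts))  # only 9.5 survives
print(write_indexed_fields(odometer, ts))
print(write_offset_timestamps(odometer, ts, step_ns=1_000_000))
```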


Thank you very much. I want “odometer” written into InfluxDB as a single _field :thinking: :pray:

I still want to use the Telegraf MQTT plugin for this :thinking:

Hi @Jay_Clifford, with the updated config the filtering works, but for the array field only the last element is written to InfluxDB.


:pray: :pray: :pray: :pray: :pray: :pray: :pray:

Could you try this?

###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################
[[inputs.mqtt_consumer]]

  servers = ["tcp://192.168.1.220:1883"]
  topics = [
    "iotgateway",
  ]
  data_format = "json_v2"


  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "<insert>"
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this"
      disable_prepend_keys = true
      excluded_keys = ["angle"]
      [[inputs.mqtt_consumer.json_v2.object.field]]
        path = "@this.odometer"
        type = "string"

I am hoping this will convert the array to a string, which would deliver the full array to InfluxDB.
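The goal of the string type is to keep the whole array in one field value. A minimal Python sketch of what such a value could look like, assuming JSON encoding of the array (this shows the target format only; it does not reproduce what json_v2 does internally). The helper names are hypothetical; the escaping follows the line protocol rule that backslashes and double quotes inside string field values must be escaped.

```python
import json


def array_as_string(values: list) -> str:
    """Serialize a whole array into one string so it fits in a single field."""
    return json.dumps(values)


def string_field(key: str, value: str) -> str:
    """Format a line-protocol string field, escaping backslashes and double quotes."""
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'{key}="{escaped}"'


encoded = array_as_string([9.9, 9.1, 9.5])
print(string_field("odometer", encoded))  # odometer="[9.9, 9.1, 9.5]"
# A consumer can decode the stored string back into a list.
print(json.loads(encoded))  # [9.9, 9.1, 9.5]
```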

“excluded_keys” takes effect, but I want the “odometer” field written to InfluxDB.

Hi @Jay_Clifford,
Thank you very much for the suggestion. I modified my Telegraf configuration:


###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################
[[inputs.mqtt_consumer]]

  servers = ["tcp://172.17.0.2:1883"]
  topics = [
    "iotgateway",
  ]
  data_format = "json_v2"

  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "gateway_insert"
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this"
      disable_prepend_keys = true
      excluded_keys = ["angle"]
      [[inputs.mqtt_consumer.json_v2.object.field]]
        path = "@this.odometer"
        type = "string"


  [[processors.printer]]

###############################################################################
#                            INPUT MQTT MESSAGES                              #
###############################################################################
{
        "y": 1111111.354243171,
        "x": 1111111.9858582001,
        "speed": [0.0, 1.0, 2.0, 4.0],
        "angle": [
            0.35431448844127544,
            0.09063859006636221,
            0.35431448844127544,
            0.09063859006636221,
            -0.08514534218357994, 0.35431448844127544, -0.2609292744335221, 0.1785305561913333
        ],
        "name": "A001",
        "left_behind_at": 1634607917.8445928,
        "odometer": [9.9,
                     9.1,
                     9.5,
                     6.6000000000000005,
                     9.9,
                     10.100000000000001,
                     9.5,
                     7.9],
        "distance": 0.0,
        "gy": 957004.5313795683,
        "current_throttle": 0.0,
        "points": [{"type": 0, "x": -27.771453857421875, "y": 7.960874557495117},
                   {"type": 0, "x": -26.048484802246094, "y": 7.958527565002441},
                   {"type": 16, "x": 14.540033340454102, "y": -25.705089569091797}
                  ],
        "time": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    }
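The message above is shown as Python source (the "time" value is a Python expression), so it is not valid JSON as printed; what mqtt_consumer receives has to be JSON text. Assuming the publisher is a Python script, a minimal sketch of building the payload with json.dumps and an RFC 3339 style UTC timestamp could look like this. Only a few fields are kept, build_payload is a hypothetical helper, and the actual MQTT publish call is left out.

```python
import datetime
import json


def build_payload(odometer, angle, now=None) -> str:
    """Return the MQTT message body as JSON text (trimmed to a few fields)."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    message = {
        "name": "A001",
        "odometer": odometer,
        "angle": angle,
        # RFC 3339 style UTC timestamp, e.g. 2021-10-19T01:45:13Z
        "time": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
    return json.dumps(message)


fixed = datetime.datetime(2021, 10, 19, 1, 45, 13, tzinfo=datetime.timezone.utc)
print(build_payload([9.9, 9.1], [0.35], now=fixed))
```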

InfluxDB UI view of the written data:

![image|690x340](upload://4o7RXJlOKjEaN8j0rQLiv8emglt.png)


Only the last element of the “odometer” array is written to InfluxDB :weary: :pray:
Is there a problem with my Telegraf configuration format, or is it a pivot syntax problem? :pray: :bowing_woman: Please help me.



Hi @loneWolf666,
I have left a comment on your GitHub issue. I agree that this feature should be added to retain array structure when required.


Hi @Jay_Clifford, good morning!
Thank you for your support!
I hope this doesn’t add too much to your workload :hear_no_evil:

Telegraf json parser options:

  ## When strict is true and a JSON array is being parsed, all objects within the
  ## array must be valid
  json_strict = true

  ## Query is a GJSON path that specifies a specific chunk of JSON to be
  ## parsed, if not specified the whole document will be parsed.
  ##
  ## GJSON query paths are described here:
  ##   https://github.com/tidwall/gjson/tree/v1.3.0#path-syntax
  json_query = ""

###############  my own json config  ########################
  data_format = "json"

  ## When strict is true and a JSON array is being parsed, all objects within the array must be valid
  json_strict = false

  json_query = "@this.points"

  ## Tag keys is an array of keys that should be added as tags.  Matching keys
  ## are no longer saved as fields. Supports wildcard glob matching.
  tag_keys = ["name"]

  ## Array of glob pattern strings or booleans keys that should be added as string fields.
  json_string_fields = ["name"]

The array-related switch (json_strict) has no effect either :weary:

Hi @loneWolf666,
It doesn’t burden me at all :slight_smile:. These conversations allow us to make the product better and support more people’s use cases. You certainly won’t be the last to ask for this. I will raise this issue at our Telegraf meeting today and see what they think.

Will the new Telegraf features for filtering and storing arrays in nested structures also be included in an updated Docker image? :thinking:

So, these new features have not been looked at by the team yet. Someone will have to look into the issue on GitHub first. Once this happens and the changes are agreed, they will make it into a designated release. With each new release, the Docker images are updated alongside the main binary.

Hi @Jay_Clifford,
Does that mean it will take a long cycle before this feature is confirmed and developed?
On my side there is an urgent requirement to filter nested structures and store them in their original format. Are there any plans to integrate with the protobuf protocol :thinking:?
I understand that an input plugin should be generic, but complex nested structures such as JSON are bound to come up. Is there really no way to filter nested data and store it in its original structure?

Hi @loneWolf666,
One of our main Telegraf contributors took a look at your problem and came up with the solution below. It stores each array as a string within a single field. If you would rather have each array item as its own field, simply remove the Starlark processor.

Well, if you just set data_format = "json" without any other options, you get the following (for the data in the thread linked above):

file,host=Hugin speed_0=0,odometer_2=9.5,points_2_y=-25.705089569091797,points_1_type=0,points_1_y=7.958527565002441,angle_1=0.09063859006636221,angle_2=0.35431448844127544,angle_5=0.35431448844127544,odometer_7=7.9,y=1111111.354243171,left_behind_at=1634607917.8445928,odometer_3=6.6000000000000005,odometer_4=9.9,odometer_5=10.100000000000001,points_2_type=16,odometer_1=9.1,distance=0,points_1_x=-26.048484802246094,speed_2=2,angle_6=-0.2609292744335221,angle_7=0.1785305561913333,odometer_6=9.5,speed_1=1,speed_3=4,angle_0=0.35431448844127544,angle_3=0.09063859006636221,angle_4=-0.08514534218357994,current_throttle=0,points_0_type=0,x=1111111.9858582001,odometer_0=9.9,gy=957004.5313795683,points_0_x=-27.771453857421875,points_0_y=7.960874557495117,points_2_x=14.540033340454102 1638445095000000000

As you can see, it contains odometer_0 to odometer_7. You can now use that metric in a Starlark processor to concatenate those points into a string.
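The field names in that output come from joining nested keys and array indices with underscores. A minimal Python sketch of that naming scheme, assuming it matches the json parser output shown above (it is not Telegraf's implementation; flatten and numeric_only are hypothetical helpers). The json parser ignores string values unless they are listed in json_string_fields, which is why name is missing from the output; numeric_only roughly approximates that.

```python
def flatten(obj, prefix=""):
    """Flatten nested objects/arrays into names like odometer_0 or points_1_x."""
    if isinstance(obj, dict):
        items = [(str(k), v) for k, v in obj.items()]
    elif isinstance(obj, list):
        items = [(str(i), v) for i, v in enumerate(obj)]
    else:
        return {prefix: obj}
    out = {}
    for k, v in items:
        # Join the parent name and the current key/index with an underscore.
        out.update(flatten(v, f"{prefix}_{k}" if prefix else k))
    return out


def numeric_only(fields):
    """Keep numbers only; the output above has no 'name' field (a string)."""
    return {k: v for k, v in fields.items()
            if isinstance(v, (int, float)) and not isinstance(v, bool)}


message = {
    "name": "A001",
    "odometer": [9.9, 9.1],
    "points": [{"type": 0, "x": -27.77, "y": 7.96}],
}
print(sorted(numeric_only(flatten(message))))
# ['odometer_0', 'odometer_1', 'points_0_type', 'points_0_x', 'points_0_y']
```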

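[[processors.starlark]]
  source = '''
def apply(metric):
  # Arrays whose flattened fields (e.g. odometer_0, odometer_1, ...) should be combined
  fields = ["angle", "odometer", "speed"]
  for f in fields:
    key_prefix = f+"_"
    key_len = len(key_prefix)
    # Collect the indexed keys and sort them numerically (so _10 comes after _9)
    keys = [x for x in metric.fields.keys() if x.startswith(key_prefix)]
    keys = sorted(keys, key=lambda x: int(x[key_len:]))
    # Remove the individual fields and join their values into one string field
    values = [str(metric.fields.pop(k)) for k in keys]
    new_value = "["+', '.join(values)+"]"
    metric.fields[f] = new_value
  return metric
'''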
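To try the Starlark logic outside Telegraf, here is a plain-Python port of the same apply() function, with an ordinary dict standing in for metric.fields (an assumption for testing; combine_arrays is a hypothetical name). Sorting with int() on the suffix keeps speed_10 after speed_9, which a plain string sort would not.

```python
def combine_arrays(fields: dict, names: list) -> dict:
    """Plain-Python port of the Starlark apply(); `fields` stands in for metric.fields."""
    out = dict(fields)
    for f in names:
        key_prefix = f + "_"
        key_len = len(key_prefix)
        keys = [x for x in out if x.startswith(key_prefix)]
        keys = sorted(keys, key=lambda x: int(x[key_len:]))  # numeric, not lexical
        values = [str(out.pop(k)) for k in keys]
        out[f] = "[" + ", ".join(values) + "]"
    return out


flat = {"speed_0": 0.0, "speed_1": 1.0, "speed_2": 2.0, "speed_3": 4.0, "x": 1111111.9858582001}
print(combine_arrays(flat, ["speed"])["speed"])  # [0.0, 1.0, 2.0, 4.0]
```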

resulting in

file,host=Hugin points_0_y=7.960874557495117,points_0_x=-27.771453857421875,points_1_type=0,points_1_x=-26.048484802246094,points_2_type=16,points_2_y=-25.705089569091797,x=1111111.9858582001,distance=0,points_0_type=0,points_1_y=7.958527565002441,y=1111111.354243171,left_behind_at=1634607917.8445928,gy=957004.5313795683,points_2_x=14.540033340454102,current_throttle=0,angle="[0.35431448844127544, 0.09063859006636221, 0.35431448844127544, 0.09063859006636221, -0.08514534218357994, 0.35431448844127544, -0.2609292744335221, 0.1785305561913333]",odometer="[9.9, 9.1, 9.5, 6.6000000000000005, 9.9, 10.100000000000001, 9.5, 7.9]",speed="[0.0, 1.0, 2.0, 4.0]" 1638446377000000000

Hi @Jay_Clifford,
Could I see the complete configuration? It doesn’t seem to take effect for me:

###################  telegraf config ###############################
  data_format = "json"

[[processors.starlark]]
  source = '''
def apply(metric):
  fields = ["points", "odometer"]
  for f in fields:
    key_prefix = f+"_"
    key_len = len(key_prefix)
    keys = [x for x in metric.fields.keys() if x.startswith(key_prefix)]
    keys = sorted(keys, key=lambda x: int(x[key_len:]))
    values = [str(metric.fields.pop(k)) for k in keys]
    new_value = "["+', '.join(values)+"]"
    metric.fields[f] = new_value
  return metric
'''

  [[processors.printer]]


########### error ####################
2021-12-03T07:16:49Z E! [processors.starlark] Traceback (most recent call last):
2021-12-03T07:16:49Z E! [processors.starlark]   processor.starlark:7:18: in apply
2021-12-03T07:16:49Z E! [processors.starlark]   <builtin>: in sorted
2021-12-03T07:16:49Z E! [processors.starlark]   processor.starlark:7:42: in lambda
2021-12-03T07:16:49Z E! [processors.starlark] Error in int: int: invalid literal with base 10: 0_type
2021-12-03T07:16:49Z E! [processors.starlark] Error in plugin: int: invalid literal with base 10: 0_type
panic: negative refcount

goroutine 77 [running]:
github.com/influxdata/telegraf/metric.(*trackingMetric).decr(0xc0008fff50)
	/go/src/github.com/influxdata/telegraf/metric/tracking.go:157 +0x90
github.com/influxdata/telegraf/metric.(*trackingMetric).Drop(0xc0008fff80)
	/go/src/github.com/influxdata/telegraf/metric/tracking.go:151 +0x19
github.com/influxdata/telegraf/agent.(*Agent).runProcessors.func1(0xc0008c9a40)
	/go/src/github.com/influxdata/telegraf/agent/agent.go:546 +0x16a
created by github.com/influxdata/telegraf/agent.(*Agent).runProcessors
	/go/src/github.com/influxdata/telegraf/agent/agent.go:539 +0x45
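The traceback points at the sort key: with "points" in the list, the prefix points_ also matches nested keys such as points_0_type, so int() is called on "0_type" and fails. A minimal sketch of a stricter match, again in plain Python with a dict standing in for metric.fields: only keys whose suffix is all digits are combined, and arrays with no matching keys are skipped. It uses str.isdigit(), which Starlark strings also provide, but this exact version has not been verified inside Telegraf. Note that points holds objects rather than numbers, so its fields (points_0_type, points_0_x, ...) are left as they are.

```python
def combine_arrays(fields: dict, names: list) -> dict:
    """Combine <name>_<digits> fields into one string field per name."""
    out = dict(fields)
    for f in names:
        prefix = f + "_"
        # Keep only keys of the form "<name>_<digits>"; nested keys such as
        # points_0_type have a non-numeric suffix ("0_type") and are skipped.
        keys = [k for k in out if k.startswith(prefix) and k[len(prefix):].isdigit()]
        if not keys:
            continue  # nothing to combine for this name
        keys = sorted(keys, key=lambda k: int(k[len(prefix):]))
        out[f] = "[" + ", ".join(str(out.pop(k)) for k in keys) + "]"
    return out


fields = {"points_0_type": 0.0, "points_0_x": -27.77, "odometer_1": 9.1, "odometer_0": 9.9}
print(combine_arrays(fields, ["points", "odometer"]))
```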

@loneWolf666
I have made a working example, which you can find here: telegraf-combine-array-to-string.conf (master branch) in the InfluxCommunity/Telegraf-Community-Configs repository on GitHub
:slight_smile: