Telegraf MQTT JSON into InfluxDB: array fields stored with unexpected type

######### input JSON message ######################
{
  "y": 1111111.354243171,
  "x": 1111111.9858582001,
  "speed": [0.0, 1.0, 2.0, 4.0],
  "angle": [
    0.35431448844127544, 0.09063859006636221,
    0.35431448844127544, 0.09063859006636221,
    -0.08514534218357994, 0.35431448844127544,
    -0.2609292744335221, 0.1785305561913333
  ],
  "name": "A001",
  "left_behind_at": 1634607917.8445928,
  "odometer": [9.9, 9.1, 9.5, 6.6000000000000005, 9.9, 10.100000000000001, 9.5, 7.9],
  "distance": 0.0,
  "gy": 957004.5313795683,
  "current_throttle": 0.0,
  "points": [{"type": 0, "x": -27.771453857421875, "y": 7.960874557495117},
    {"type": 0, "x": -26.048484802246094, "y": 7.958527565002441},
    {"type": 16, "x": 14.540033340454102, "y": -25.705089569091797}
  ],
  "time": str(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))
}

############### telegraf config #########################
[[inputs.mqtt_consumer]]
  data_format = "json_v2"
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "vehicle_status"
    [[inputs.mqtt_consumer.json_v2.tag]]
      path = "name"
      rename = "vehicle"
    [[inputs.mqtt_consumer.json_v2.object]]
      path = "@this"
      disable_prepend_keys = false
      excluded_keys = ["points"]
      [[inputs.mqtt_consumer.json_v2.object.field]]
        path = "speed.0"
        rename = "speed_1"
        type = "float"
############# objective ############
Filter out the "points" field.
Store "odometer" in InfluxDB as an array-type field.
Parse the "speed" field into separate fields:
speed_1, speed_2, speed_3, speed_4

But the stored "speed" and "odometer" array fields contain only the last value.

After the pivot:

:pray: :pray: :pray: :pray: :thinking: :thinking: :thinking: :thinking:

Hello @loneWolf666,
Can you please share your results before the pivot, along with your full Flux query? Thank you.

from(bucket: "vehicle_status")
  |> range(start: {start}, stop: {end})
  |> filter(fn: (r) => r["_measurement"] == "*****")
  |> pivot(
    rowKey: ["_time"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )

Hi @Anaisdg,
Could you answer a question for me?
If multiple servers use the same configuration, the same metrics collected from each server will end up in the same measurement, and the data will overlap.

Thanks @loneWolf666,
Can you please share your results before the pivot?
It appears that you’re only querying for one timestamp which is why you get one row.
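
To make the one-row result concrete, here is a minimal Python sketch of what `pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")` does conceptually. The `pivot_rows` helper and the sample records are made up for illustration; this models the idea and is not Flux itself.

```python
from collections import defaultdict

def pivot_rows(records):
    """Group (_time, _field, _value) records into one row per _time."""
    rows = defaultdict(dict)
    for rec in records:
        rows[rec["_time"]][rec["_field"]] = rec["_value"]
    return [{"_time": t, **fields} for t, fields in sorted(rows.items())]

# Hypothetical unpivoted result: three fields that all share one timestamp.
records = [
    {"_time": "2021-10-19T01:45:17Z", "_field": "x", "_value": 1111111.9858582001},
    {"_time": "2021-10-19T01:45:17Z", "_field": "y", "_value": 1111111.354243171},
    {"_time": "2021-10-19T01:45:17Z", "_field": "distance", "_value": 0.0},
]

pivoted = pivot_rows(records)
print(len(pivoted))  # one distinct _time, so one row
```

Because the row key is `_time`, the number of output rows equals the number of distinct timestamps in the input, however many fields there are.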

I don’t see any question about multiple servers in your original post. You can include the host so that you avoid overlap.
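
As a rough illustration of why a host tag avoids the overlap: InfluxDB identifies a point by its measurement, tag set, and timestamp, and a later write with the same identity replaces the matching fields. The sketch below models that with a plain dictionary. The `series_key` and `write` helpers, the `cpu` measurement, and the server names are all hypothetical. Note that Telegraf adds a `host` tag by default unless `omit_hostname = true` is set in the `[agent]` section.

```python
def series_key(measurement, tags):
    """Model of a series identity: measurement plus the sorted tag set."""
    tag_part = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    return f"{measurement},{tag_part}" if tag_part else measurement

def write(store, measurement, tags, fields, timestamp):
    """Writes to the same series key and timestamp replace matching fields."""
    store.setdefault((series_key(measurement, tags), timestamp), {}).update(fields)

# Two hypothetical servers report the same metric at the same timestamp.
without_host, with_host = {}, {}
for host, value in [("server-a", 10.0), ("server-b", 20.0)]:
    write(without_host, "cpu", {}, {"usage": value}, 1634607917)
    write(with_host, "cpu", {"host": host}, {"usage": value}, 1634607917)

print(len(without_host), len(with_host))
```

Without the tag, the second server's value replaces the first; with it, each server keeps its own series.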

I don't quite understand what you mean. Could you describe it again?

The following configuration:

It’s no bother at all. I was wondering if you can include a screenshot of what your data looks like before you apply the pivot function.

Thank you for your reply. Before the pivot operation there is only regular column processing. I hope that answers your question!

Hi,

“odometer” into influxdb array type

Telegraf produces output in InfluxDB line protocol, which does not have an array type. As a result, you are limited to the data types that line protocol supports: float, integer, unsigned integer, string, and boolean.

There is only the last value

Yes, because of the above reason.
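
One possible workaround, assuming you control the code that publishes the MQTT message, is to expand each array into indexed scalar keys before sending, so that every value becomes its own field. The sketch below is only an illustration of that idea; `flatten_arrays` is a hypothetical helper, not something Telegraf provides, and the 1-based index is chosen to match the `speed_1` naming in the question.

```python
def flatten_arrays(message, start_index=1):
    """Replace each list of scalars with indexed keys, e.g. speed -> speed_1, speed_2."""
    flat = {}
    for key, value in message.items():
        is_scalar_list = isinstance(value, list) and all(
            not isinstance(item, (list, dict)) for item in value
        )
        if is_scalar_list:
            for i, item in enumerate(value, start=start_index):
                flat[f"{key}_{i}"] = item
        else:
            # Scalars and nested structures are passed through unchanged.
            flat[key] = value
    return flat

# Trimmed-down version of the message from the question.
message = {"name": "A001", "distance": 0.0, "speed": [0.0, 1.0, 2.0, 4.0]}
flat = flatten_arrays(message)
print(flat)
```

The trade-off is that the number of fields then depends on the array length, so arrays of varying size produce a varying schema.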

Next, looking at the JSON, the time field isn't valid JSON from what I can see. However, I ignored that and looked at the data you are passing in:

{
  "y": 1111111.354243171,
  "x": 1111111.9858582001,
  "speed": [
    0,
    1,
    2,
    4
  ],
  "angle": [
    0.35431448844127544,
    0.09063859006636221,
    0.35431448844127544,
    0.09063859006636221,
    -0.08514534218357994,
    0.35431448844127544,
    -0.2609292744335221,
    0.1785305561913333
  ],
  "name": "A001",
  "left_behind_at": 1634607917.8445928,
  "odometer": [
    9.9,
    9.1,
    9.5,
    6.6000000000000005,
    9.9,
    10.100000000000001,
    9.5,
    7.9
  ],
  "distance": 0,
  "gy": 957004.5313795683,
  "current_throttle": 0,
  "points": [
    {
      "type": 0,
      "x": -27.771453857421875,
      "y": 7.960874557495117
    },
    {
      "type": 0,
      "x": -26.048484802246094,
      "y": 7.958527565002441
    },
    {
      "type": 16,
      "x": 14.540033340454102,
      "y": -25.705089569091797
    }
  ]
}

Based on this what do you expect your data in InfluxDB to look like? Which fields are important to capture? Please do take a look at the Line Protocol doc above as well.
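
To help picture the target shape, here is a small Python sketch that renders one point as a line-protocol string: a measurement, optional tags, scalar fields, and a timestamp. The `to_line_protocol` helper is hypothetical and deliberately simplified (it assumes keys and tag values need no escaping), and the measurement name, tag, and timestamp are taken or adapted from the question purely for illustration.

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Render one point; assumes keys and tag values need no escaping."""
    def fmt(value):
        if isinstance(value, bool):  # check bool before int: bool is an int subclass
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"
        if isinstance(value, float):
            return repr(value)
        return '"' + str(value).replace('"', '\\"') + '"'

    tag_part = "".join(f",{k}={v}" for k, v in sorted(tags.items()))
    field_part = ",".join(f"{k}={fmt(v)}" for k, v in fields.items())
    return f"{measurement}{tag_part} {field_part} {timestamp_ns}"

line = to_line_protocol(
    "vehicle_status",
    {"vehicle": "A001"},
    {"distance": 0.0, "current_throttle": 0.0},
    1634607917000000000,
)
print(line)
```

Every field value here is a single scalar; there is no way in this format to write a list or a nested object as one field value.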

Hi @jpowers,
Thank you for your support, and Merry Christmas to you! :santa:

############### MQTT JSON message #####################
{
  "y": 1111111.354243171,
  "x": 1111111.9858582001,
  "speed": [
    0,
    1,
    2,
    4
  ],
  "angle": [
    0.35431448844127544,
    0.09063859006636221,
    0.35431448844127544,
    0.09063859006636221,
    -0.08514534218357994,
    0.35431448844127544,
    -0.2609292744335221,
    0.1785305561913333
  ],
  "name": "A001",
  "left_behind_at": 1634607917.8445928,
  "odometer": [
    9.9,
    9.1,
    9.5,
    6.6000000000000005,
    9.9,
    10.100000000000001,
    9.5,
    7.9
  ],
  "distance": 0,
  "gy": 957004.5313795683,
  "current_throttle": 0,
  "points": [
    {
      "type": "line",
      "x": -27.771453857421875,
      "y": 7.960874557495117
    },
    {
      "type": "line",
      "x": -26.048484802246094,
      "y": 7.958527565002441
    },
    {
      "type": "curve",
      "x": 14.540033340454102,
      "y": -25.705089569091797
    }
  ]
}

################## update ###################

I changed the "type" of each "points" element from int to string.

################## update ###################

############# target ###################
The data above should be stored in a single InfluxDB bucket, with the odometer field filtered out. The angle field should be stored in InfluxDB as an array-type field, and each element under the points field should be stored as an object (hash) structure. Can this be implemented?
################### telegraf.conf -- json  #############
[[outputs.influxdb_v2]]
  namepass = ["vehicle_state"]
  ## The URLs of the InfluxDB cluster nodes.
  urls = ["http://172.60.10.9:8086"]

  ## Token for authentication.
  token = "ASzi0c38b0_3H-mwsO8tlE_Wwovtlxu1Mvw39tyjKKj2zvsR611VolXfsxfoc4FzCKu-55cSSJ2G75w4ltLyKA=="

  ## Organization is the name of the organization you wish to write to; must exist.
  organization = "trunkport"

  ## Destination bucket to write into.
  bucket = "vehicle_cmd"


###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  ## servers = ["tcp://172.29.60.10:1883"]
  servers = ["tcp://172.29.60.9:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "vdcs/vehicle-cmd/+"
  ]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"

  tag_keys = ["name"]

  json_string_fields = ["points_type"] 
 
  name_override = "vehicle_cmd"

######################## Problem ################
json_string_fields: the setting does not take effect.

No string-type field ends up stored in InfluxDB :weary: :weary: :weary: :weary: :pray:

So I switched to the json_v2 parser.
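
A possible explanation for `json_string_fields = ["points_type"]` not matching, sketched under the assumption that the `json` parser flattens nested objects and arrays by joining keys and array indexes with underscores (so the first element's type would be named `points_0_type`), and that `json_string_fields` accepts glob patterns. The `flatten` function below is an illustrative Python model of that naming, not Telegraf's code, and `fnmatch` merely stands in for the glob matching.

```python
from fnmatch import fnmatch

def flatten(value, prefix=""):
    """Flatten nested dicts/lists into underscore-joined keys (illustrative model)."""
    if isinstance(value, dict):
        items = value.items()
    elif isinstance(value, list):
        items = enumerate(value)
    else:
        return {prefix: value}
    flat = {}
    for key, child in items:
        name = f"{prefix}_{key}" if prefix else str(key)
        flat.update(flatten(child, name))
    return flat

# Trimmed-down message with string "type" values, as in the updated payload.
message = {
    "name": "A001",
    "points": [{"type": "line", "x": -27.77}, {"type": "curve", "x": 14.54}],
}
keys = flatten(message)
exact = [k for k in keys if fnmatch(k, "points_type")]
globbed = [k for k in keys if fnmatch(k, "points_*_type")]
print(sorted(keys))
print(exact, globbed)
```

Under those assumptions, a pattern such as `points_*_type` would be the one to try in `json_string_fields`, since no flattened key is literally named `points_type`.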

######################## telegraf.conf -- json_v2 ################

###############################################################################
#                            INPUT PLUGINS                                    #
###############################################################################
[[inputs.mqtt_consumer]]
  ## Broker URLs for the MQTT server or cluster.  To connect to multiple
  ## clusters or standalone servers, use a separate plugin instance.
  ##   example: servers = ["tcp://localhost:1883"]
  ##            servers = ["ssl://localhost:1883"]
  ##            servers = ["ws://localhost:1883"]
  ## servers = ["tcp://172.29.60.10:1883"]
  servers = ["tcp://172.29.60.9:1883"]

  ## Topics that will be subscribed to.
  topics = [
    "vdcs/vehicle-cmd/+"
  ]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json_v2"
    [[inputs.mqtt_consumer.json_v2]]
        measurement_name = "vehicle_cmd"
        [[inputs.mqtt_consumer.json_v2.tag]]
            path = "name"
        [[inputs.mqtt_consumer.json_v2.field]]
            path = "@this.points"
            rename = "points_list"
        [[inputs.mqtt_consumer.json_v2.field]]
            path = "@this.points.type"
            type = "string"

############################################################################

But this still doesn't give the result I want :weary: :weary: :weary: :weary: