Processors.parser: embedded CSV in JSON

Hi all,

I’m trying to parse CSV data embedded in a JSON payload. Here is a sample:

{
    "T" : 1234,
    "csv" : "20200708010001,100\n20200708010002,99\n20200708010003,98\n20200708010004,97\n20200708010005,96"
}

with the following Telegraf configuration:

[[outputs.file]]
   files = ["stdout"]
   data_format = "json"
   tagexclude = ["topic"]
   fielddrop = ["csv", "T"]

[[inputs.mqtt_consumer]]
   servers = ["${SERVER_MQTT}"]

   name_override = "elec"
   data_format = "json"

   topics = ["elec"]

   tag_keys = ["T"]

   json_string_fields = ["csv"]

[[processors.parser]]
   parse_fields = ["csv"]
   data_format = "csv"
   merge = "override"

   csv_column_names = ["time","value"]
   csv_column_types = ["string","int"]

   csv_timestamp_column = "time"
   csv_timestamp_format = "20060102150405"

Here is the output JSON string:

{"fields":{"value":96},"name":"elec","tags":{"T":"1234"},"timestamp":1598623798}

And the issues:

1/ It seems that only the last line of CSV data is emitted.
2/ The timestamp is the current timestamp, not the CSV “time” field.

BTW, I previously used the CSV parser (with raw CSV data, no JSON involved) directly in the inputs.mqtt_consumer plugin without any problem.

Thanks for your help,
BR

@Bob_Morane - Welcome to the community forum!

You’ve got an interesting question here.

For your CSV parser processor configuration, merge = "override" will actually merge across all the separate lines of the CSV input. This is why you are only getting the very last value of 96 and why the CSV timestamps are being ignored. So if you remove that merge policy, you should get the result you want (hopefully).
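For reference, this is roughly what the processor block would look like with that line removed (everything else is exactly what you already posted):

[[processors.parser]]
   parse_fields = ["csv"]
   data_format = "csv"
   # merge = "override" removed: each CSV line now becomes its own metric

   csv_column_names = ["time","value"]
   csv_column_types = ["string","int"]

   csv_timestamp_column = "time"
   csv_timestamp_format = "20060102150405"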

Thanks philjb. That was it!
I thought I had already tested that…

Now I’m losing all the tags and fields from the original JSON (drop_original is true). Any clue?

telegraf_1        | {"fields":{"value":100},"name":"elec","tags":{},"timestamp":1594170001}
telegraf_1        | {"fields":{"value":99},"name":"elec","tags":{},"timestamp":1594170002}
telegraf_1        | {"fields":{"value":98},"name":"elec","tags":{},"timestamp":1594170003}
telegraf_1        | {"fields":{"value":97},"name":"elec","tags":{},"timestamp":1594170004}
telegraf_1        | {"fields":{"value":96},"name":"elec","tags":{},"timestamp":1594170005}

I really don’t know what is going on. It seems that [[processors.parser]] is “eating” all the tags and fields. Adding namepass = ["*"] does not change anything.

Even adding:

[[processors.template]]
  tag = "T"
  template = '{{ .Tag "T" }}'

gives:

telegraf_1        | {"fields":{"value":100},"name":"elec","tags":{"T":""},"timestamp":1594170001}
telegraf_1        | {"fields":{"value":99},"name":"elec","tags":{"T":""},"timestamp":1594170002}
telegraf_1        | {"fields":{"value":98},"name":"elec","tags":{"T":""},"timestamp":1594170003}
telegraf_1        | {"fields":{"value":97},"name":"elec","tags":{"T":""},"timestamp":1594170004}
telegraf_1        | {"fields":{"value":96},"name":"elec","tags":{"T":""},"timestamp":1594170005}

Am I missing something, or is it a bug?

Thanks

Unfortunately, I’m not sure you will be able to do what you want this way. Tags are associated with a specific row of data (a “metric”), and since you are not merging, you won’t be able to get the tags from the JSON metric into the CSV-parsed metrics. You are also explicitly dropping the JSON fields with fielddrop = ["csv", "T"] in the outputs.file section.
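To illustrate (hand-written examples in the same JSON shape as your file output, not actual Telegraf output): the single metric coming out of inputs.mqtt_consumer looks roughly like

{"fields":{"csv":"20200708010001,100\n20200708010002,99\n..."},"name":"elec","tags":{"T":"1234"},"timestamp":1598623798}

and the parser processor then creates five brand-new metrics from that csv field, e.g.

{"fields":{"value":100},"name":"elec","tags":{},"timestamp":1594170001}

Those new metrics never had the T tag, and with drop_original = true the original metric that carried it is discarded.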

Any idea how I should proceed?

I’m not familiar with MQTT, but I would look at changing the publisher’s message format so it is not embedding CSV data within JSON. Try to publish the equivalent of one CSV line per MQTT message.
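Sketching what that could look like, using the CSV options you already have (the publisher side and the exact topic layout are assumptions on my part): each MQTT message would carry a single bare CSV line as its payload, e.g.

20200708010001,100

and the consumer would parse it directly, with no parser processor in between:

[[inputs.mqtt_consumer]]
   servers = ["${SERVER_MQTT}"]
   topics = ["elec"]
   name_override = "elec"

   # parse the payload with the CSV parser directly in the input plugin
   data_format = "csv"
   csv_column_names = ["time","value"]
   csv_column_types = ["string","int"]
   csv_timestamp_column = "time"
   csv_timestamp_format = "20060102150405"

The T value would then have to arrive some other way (for example as part of the topic name), since it would no longer be in the payload.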