Multiple JSONs ingestion from Kinesis Data Stream

Hi there,

Measurement data is delivered to me through a Kinesis Data Stream. The payload of each record in this stream is composed of several concatenated JSON objects. An example (a real payload contains hundreds of these):

{"tech_source": "S1", "project": "IoT_735_PL029", "asset_id": "IoT_735_PL029_ECP001_S1_TUR002", "WALM1.Alm.par2": "653", "WALM1.Alm.q": "0000000000000", "ts": "2020-09-29 10:17:58.458"}
{"tech_source": "S1", "project": "IoT_735_PL029", "asset_id": "IoT_735_PL029_ECP001_S1_TUR003", "WMET1.EnvTmp.mag.f": "-1.6938934326171875", "WMET1.EnvTmp.q": "0000000000000", "ts": "2020-09-29 10:17:58.461"}
{"tech_source": "S1", "project": "IoT_735_PL016", "asset_id": "IoT_735_PL016_ECP001_S1_TUR004", "WCNV1.GriHz.mag.f": "49.20040512084961", "WCNV1.GriHz.q": "0000000000000", "ts": "2020-09-29 10:17:58.461"}

Here is the relevant part of my telegraf.conf file:

[[inputs.kinesis_consumer]]
  region = "eu-west-1"
  streamname = "mystreamname"
  data_format = "json"
  tag_keys = [
    "project",
    "asset_id",
    "tech_source"
  ]
  json_time_key = "ts"

I get this error when launching Telegraf:

E! [inputs.kinesis_consumer] Scan encountered an error: shard shardId-000000000052 error: invalid character '{' after top-level value

Any idea how I can work around this?

Thanks,
madamak

The workaround was to use a Lambda function to convert the incoming messages and write them to another Kinesis Data Stream in the Influx data format (InfluxDB line protocol). Telegraf then consumes that second stream and sends the data to InfluxDB.
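
For anyone landing here with the same problem, this is a rough sketch of what such a Lambda could look like in Python. It is not the exact code used here, only an illustration under a few assumptions: the measurement name "iot_measurement" and the output stream name "influx-lines-stream" are made-up placeholders, the "ts" values are assumed to be UTC, and every field value that parses as a number is written as a float (so zero-padded quality codes like "0000000000000" become 0.0). The splitting relies on json.JSONDecoder.raw_decode, which reads one JSON value and reports where it ended.

```python
import base64
import json
import math
from datetime import datetime, timezone

TAG_KEYS = ("asset_id", "project", "tech_source")  # same keys as tag_keys in telegraf.conf
TIME_KEY = "ts"
OUTPUT_STREAM = "influx-lines-stream"  # hypothetical name of the second stream
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def split_concatenated_json(text):
    """Return every top-level JSON value found in text, in order."""
    decoder = json.JSONDecoder()
    values, pos = [], 0
    while pos < len(text):
        if text[pos].isspace():  # raw_decode does not skip leading whitespace
            pos += 1
            continue
        value, pos = decoder.raw_decode(text, pos)
        values.append(value)
    return values


def _escape(text):
    # Tag keys, tag values and field keys: escape commas, equals signs and spaces.
    return text.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def _field_value(raw):
    # Assumption: anything that parses as a finite number is stored as a float.
    try:
        number = float(raw)
        if math.isfinite(number):
            return repr(number)
    except (TypeError, ValueError):
        pass
    text = str(raw).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{text}"'


def to_line_protocol(record, measurement="iot_measurement"):
    """Convert one decoded JSON object into one line of InfluxDB line protocol."""
    tags = "".join(f",{k}={_escape(str(record[k]))}" for k in TAG_KEYS if k in record)
    fields = ",".join(
        f"{_escape(k)}={_field_value(v)}"
        for k, v in record.items()
        if k not in TAG_KEYS and k != TIME_KEY
    )
    # Assumption: timestamps look like "2020-09-29 10:17:58.458" and are in UTC.
    stamp = datetime.strptime(record[TIME_KEY], "%Y-%m-%d %H:%M:%S.%f")
    delta = stamp.replace(tzinfo=timezone.utc) - EPOCH
    nanos = (delta.days * 86400 + delta.seconds) * 10**9 + delta.microseconds * 1000
    return f"{measurement}{tags} {fields} {nanos}"


def handler(event, context):
    """Lambda entry point for a Kinesis trigger (sketch, no retry handling)."""
    import boto3  # provided by the AWS Lambda Python runtime

    kinesis = boto3.client("kinesis")
    out = []
    for rec in event["Records"]:
        payload = base64.b64decode(rec["kinesis"]["data"]).decode("utf-8")
        lines = [to_line_protocol(r) for r in split_concatenated_json(payload)]
        if lines:
            out.append({
                "Data": ("\n".join(lines) + "\n").encode("utf-8"),
                "PartitionKey": rec["kinesis"]["partitionKey"],
            })
    for i in range(0, len(out), 500):  # PutRecords accepts at most 500 records per call
        kinesis.put_records(StreamName=OUTPUT_STREAM, Records=out[i:i + 500])
```

The [[inputs.kinesis_consumer]] section that reads the second stream would then use data_format = "influx" instead of "json". A real deployment would also need to check the PutRecords response for failed records and keep each output record under the Kinesis size limits, which this sketch does not do.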
