Parse JSON from Flux Query

Hi all,

I have an InfluxDB bucket which stores JSON-formatted data in a field “Fehler”.

I read out the table by:

from(bucket: "Testdaten_1")
  |> range(start: -2d, stop: now() )
  |> filter(fn: (r) => r["Artikel"] == "Board-Typ 1")
  |> filter(fn: (r) => r["Status"] == "failed" or r["Status"] == "aborted")
  |> filter(fn: (r) => r["_field"] == "Fehler")
  |> keep(columns: ["_value"])

This returns the following result:

table _value
0	{"Results":[{"MesswertNr":4,"Titel":"Spannung","MessschrittNr":2,"Min":30,"Max":50,"Actual":13},{"MesswertNr":5,"Titel":"Strom","MessschrittNr":2,"Min":65,"Max":100,"Actual":57}]}

Inspired by example 2 from Anais’ blog entry, I have written a function that pipes in a table from a query, parses the JSON array, and returns a corresponding table:

//returns a table with the columns "MesswertNr", "MessschrittNr", "Titel", "Min", "Max" and "Actual"
//see: https://github.com/InfluxCommunity/JSON_Flux/blob/main/example2.Flux
import "array"
import "experimental/json"

getTableFromJSON = (jsonTable=<-) => jsonTable
  |> map(fn: (r) => {
    jsonData = json.parse(data: bytes(v: r._value)) 
    // extract the list that we want to map across with array.map 
    listData = jsonData.Results
    // map across each complex type in the array named "Results" 
    errors = array.map(
      arr: listData,
      fn: (x) => ({
        "MessschrittNr": x.MessschrittNr,
        "MesswertNr": x.MesswertNr,
        "Titel": x.Titel,
        "Min": x.Min,
        "Max": x.Max,
        "Actual": x.Actual,
      })
    )
    // finally convert that flattened list into a table with array.from 
    return array.from(rows: errors)
}
)

Piping the query into the function

from(bucket: "Testdaten_1")
  |> range(start: -2d, stop: now() )
  |> filter(fn: (r) => r["Artikel"] == "Board-Typ 1")
  |> filter(fn: (r) => r["Status"] == "failed" or r["Status"] == "aborted")
  |> filter(fn: (r) => r["_field"] == "Fehler")
  |> keep(columns: ["_value"])
  |> getTableFromJSON()

returns

 error calling function "getTableFromJSON" @38:6-38:24: error calling function "map" @10:6-29:2: name {"x" ""} does not exist in scope

The part 10:6-29:2 refers to the map() call inside my function getTableFromJSON():

  |> map(fn: (r) => {
    jsonData = json.parse(data: bytes(v: r._value)) 
    // extract the list that we want to map across with array.map 
    listData = jsonData.Results
    // map across each complex type in the array named "Results" 
    errors = array.map(
      arr: listData,
      fn: (x) => ({
        "MessschrittNr": x.MessschrittNr,
        "MesswertNr": x.MesswertNr,
        "Titel": x.Titel,
        "Min": x.Min,
        "Max": x.Max,
        "Actual": x.Actual,
      })
    )
    // finally convert that flattened list into a table with array.from 
    return array.from(rows: errors)
}
)

and 38:6-38:24 is the pipe into the function in the Flux query:

  |> getTableFromJSON()

What is going wrong here?

Is there another way to parse JSON from flux queries?

thx and regards

I don’t know offhand, let me try it too :slight_smile: Trying it right now.

Hello @domsch,
I see that the following works for me:

import "array"
import "experimental/json"

myJSON = "{\"Results\":[{\"MesswertNr\":4,\"Titel\":\"Spannung\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},{\"MesswertNr\":5,\"Titel\":\"Strom\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}]}"

jsonData = json.parse(data: bytes(v: myJSON))
listData = jsonData.Results

// array.from(rows: [{_value: display(v: listData)}])

errors = array.map(
    arr: listData,
    fn: (x) => ({
        "MessschrittNr": x.MessschrittNr,
        "MesswertNr": x.MesswertNr,
        "Titel": x.Titel,
        "Min": x.Min,
        "Max": x.Max,
        "Actual": x.Actual,
    }),
)

array.from(rows: errors)

I think the issue here is that there is no way to name a new table stream for each table produced by array.from() inside map(). The map function must return an object, not a stream.
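As a minimal sketch of that constraint (hypothetical: `data` here stands for any stream whose `_value` column holds the JSON string from above):

```flux
import "experimental/json"

// `data` is assumed to be a stream with the JSON string in _value
data
    |> map(fn: (r) => {
        jsonData = json.parse(data: bytes(v: r._value))
        first = jsonData.Results[0]

        // OK: map() must return a record, one output row per input row
        return {Titel: first.Titel, Actual: first.Actual}
        // Not OK: returning array.from(rows: ...) here would hand map()
        // a stream of tables instead of a record
    })
```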

Can you share two rows of your original data, before you apply your function, in annotated CSV please?

I’m not sure how to solve this.
@scott do you have any ideas?

Here’s an example script to work with:

import "array"
import "experimental/json"

array.from(rows: [{_value: "{\"Results\":[{\"MesswertNr\":4,\"Titel\":\"Spannung\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},{\"MesswertNr\":5,\"Titel\":\"Strom\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}]}"}, {_value: "{\"Results\":[{\"MesswertNr\":4,\"Titel\":\"Spannung\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},{\"MesswertNr\":5,\"Titel\":\"Strom\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}]}"}])


 |> map(fn: (r) => {
    jsonData = json.parse(data: bytes(v: r._value)) 
    // extract the list that we want to map across with array.map 
    listData = jsonData.Results
    // map across each complex type in the array named "Results" 
//     errors = array.map(
//       arr: listData,
//       fn: (x) => ({
//         "MessschrittNr": x.MessschrittNr,
//         "MesswertNr": x.MesswertNr,
//         "Titel": x.Titel,
//         "Min": x.Min,
//         "Max": x.Max,
//         "Actual": x.Actual,
//       })
//     )
// 
//     final =  array.from(rows: errors) |> group() 
    // finally convert that flattened list into a table with array.from 
    return array.from(rows:[{value: display(v: listData)}]) |> group() 
}
)

@domsch @Anaisdg

The map function must return an object, not a stream.

:point_up: This is the issue in your original attempt. I used the sample data you provided in your original post and updated the getTableFromJSON() definition. You actually don’t need to iterate over the array at all. You can just use dot and bracket notation (essentially JSONPath) to reference the JSON data:

import "array"
import "experimental/json"

data =
    array.from(
        rows:
            [
                {
                    _value:
                        "{\"Results\":[{\"MesswertNr\":4,\"Titel\":\"Spannung\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},{\"MesswertNr\":5,\"Titel\":\"Strom\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}]}",
                },
            ],
    )

getTableFromJSON = (jsonTable=<-) =>
    jsonTable
        |> map(
            fn: (r) => {
                jsonData = json.parse(data: bytes(v: r._value))
                listData = jsonData.Results[0]

                return {
                    "MessschrittNr": listData.MessschrittNr,
                    "MesswertNr": listData.MesswertNr,
                    "Titel": listData.Titel,
                    "Min": listData.Min,
                    "Max": listData.Max,
                    "Actual": listData.Actual,
                }
            },
        )

data |> getTableFromJSON()

This returns the following table:

Actual  Max  MessschrittNr  MesswertNr  Min  Titel
13      50   2              4           30   Spannung

Or you can also do the following but I like @scott’s solution better:

import "array"
import ejson "experimental/json"
import "json"


data = array.from(rows: [{_value: "{\"Results\":[{\"MesswertNr\":4,\"Titel\":\"Spannung\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},{\"MesswertNr\":5,\"Titel\":\"Strom\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}]}"}, {_value: "{\"Results\":[{\"MesswertNr\":4,\"Titel\":\"Spannung\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},{\"MesswertNr\":5,\"Titel\":\"Strom\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}]}"}])
// data |> yield(name: "raw")

items = data
   |> findColumn(
        fn: (key) => true,
        column: "_value",
    )
// array.from(rows:[{value: display(v: items)}]) |> yield(name: "items")
parsed = items |> array.map(fn: (x) => ejson.parse(data: bytes(v: x)))
// array.from(rows:[{value: string(v: json.encode(v: {"combined": parsed}))}]) |> yield(name:"parsed")
listData = parsed[0].Results
errors = array.map(
    arr: listData,
    fn: (x) => ({
        "MessschrittNr": x.MessschrittNr,
        "MesswertNr": x.MesswertNr,
        "Titel": x.Titel,
        "Min": x.Min,
        "Max": x.Max,
        "Actual": x.Actual,
    }),
)

array.from(rows: errors)
    |> yield(name: "final")

Hi @Anaisdg and @scott , many thanks for your support.

Why not?

This returns only the first element of the Results[] array, but I need all of its elements.

In @Anaisdg’s solution, both rows of the Results[] array are returned, but only from the first _value, and those twice:

import "array"
import ejson "experimental/json"
import "json"


data = array.from(rows: [
{_value: 
"{\"Results\":[
  {\"MesswertNr\":4,\"Titel\":\"Spannung4\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":5,\"Titel\":\"Strom5\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}
 ]}"
}, 

{_value: 
"{\"Results\":[
  {\"MesswertNr\":14,\"Titel\":\"Spannung14\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":15,\"Titel\":\"Strom15\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}
 ]}"
}
])
// data |> yield(name: "raw")

items = data
   |> findColumn(
        fn: (key) => true,
        column: "_value",
    )
// array.from(rows:[{value: display(v: items)}]) |> yield(name: "items")
parsed = items |> array.map(fn: (x) => ejson.parse(data: bytes(v: x)))
// array.from(rows:[{value: string(v: json.encode(v: {"combined": parsed}))}]) |> yield(name:"parsed")
listData = parsed[0].Results
errors = array.map(
    arr: listData,
    fn: (x) => ({
        "MessschrittNr": x.MessschrittNr,
        "MesswertNr": x.MesswertNr,
        "Titel": x.Titel,
        "Min": x.Min,
        "Max": x.Max,
        "Actual": x.Actual,
    }),
)

array.from(rows: errors)
    |> yield(name: "final")
table: final
Actual  Max  MessschrittNr  MesswertNr  Min  Titel
13      50   2              4           30   Spannung4
57      100  2              5           65   Strom5

table: _result
Actual  Max  MessschrittNr  MesswertNr  Min  Titel
13      50   2              4           30   Spannung4
57      100  2              5           65   Strom5

Hi @scott

Can you give me another tip on how to iterate over the Results array?

THX

@domsch This is a pretty hacky way to accomplish what you’re hoping for, but it works. You essentially need to merge all the JSON values into a single value. I do this with reduce() and a few other functions. You then extract the merged value and parse it to Flux types, then output a table:

import "array"
import "experimental/json"
import "regexp"
import "strings"

data =
    array.from(
        rows:
            [
                {
                    _value:
                        "{\"Results\":[
  {\"MesswertNr\":4,\"Titel\":\"Spannung4\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":5,\"Titel\":\"Strom5\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}
 ]}",
                },
                {
                    _value:
                        "{\"Results\":[
  {\"MesswertNr\":14,\"Titel\":\"Spannung14\",\"MessschrittNr\":2,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":15,\"Titel\":\"Strom15\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}
 ]}",
                },
            ],
    )

merged =
    data
        |> group()
        |> reduce(
            identity: {_value: ""},
            fn: (r, accumulator) => {
                trimmed = regexp.replaceAllString(v: r._value, r: /\{"Results":\[\s|\s]\}/, t: "")

                return {_value: accumulator._value + ",
" + trimmed}
            },
        )
        |> map(
            fn: (r) => {
                list = strings.trimPrefix(prefix: ",
", v: r._value)
                asArray = "[${list}]"

                return {r with _value: asArray}
            },
        )
        |> findColumn(column: "_value", fn: (key) => true)

array.from(rows: json.parse(data: bytes(v: merged[0])))

This returns:

MesswertNr  Titel       MessschrittNr  Min  Max  Actual
4           Spannung4   2              30   50   13
5           Strom5      2              65   100  57
14          Spannung14  2              30   50   13
15          Strom15     2              65   100  57

It’s not a very clean way of doing it and it will not scale well, but, with your data as it is, it should work.

Hello @scott

This works as expected! Perfect :+1:

Assuming that the input stream not only contains the JSON data in the _value column, but also further information like a product name and a task number:

data =
    array.from(
        rows:
            [
                { Name: "Product 1",
                  Task: 1,
                    _value:
                        "{\"Results\":[
  {\"MesswertNr\":4,\"Titel\":\"Spannung4\",\"MessschrittNr\":1,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":5,\"Titel\":\"Strom5\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}
 ]}",
                },
                { Name: "Product 2",
                  Task: 5,
                    _value:
                        "{\"Results\":[
  {\"MesswertNr\":14,\"Titel\":\"Spannung14\",\"MessschrittNr\":1,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":15,\"Titel\":\"Strom15\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57},
  {\"MesswertNr\":16,\"Titel\":\"Strom16\",\"MessschrittNr\":3,\"Min\":65,\"Max\":100,\"Actual\":40}
 ]}",
                },
            ],
    )

How can we include this in the resulting table?

Product    Task  MesswertNr  Titel       MessschrittNr  Min  Max  Actual
Product 1  1     4           Spannung4   1              30   50   13
Product 1  1     5           Strom5      2              65   100  57
Product 2  5     14          Spannung14  1              30   50   13
Product 2  5     15          Strom15     2              65   100  57
Product 2  5     16          Strom16     3              65   100  40

@domsch To do that, you’ll need to map those columns/values into the JSON. While totally possible, this begs the question: why not just write the data to InfluxDB differently? This seems like a pretty cumbersome workaround.

import "array"
import "experimental/json"
import "regexp"
import "strings"

data =
    array.from(
        rows:
            [
                {
                    Name: "Product 1",
                    Task: 1,
                    _value:
                        "{\"Results\":[
  {\"MesswertNr\":4,\"Titel\":\"Spannung4\",\"MessschrittNr\":1,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":5,\"Titel\":\"Strom5\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57}
 ]}",
                },
                {
                    Name: "Product 2",
                    Task: 5,
                    _value:
                        "{\"Results\":[
  {\"MesswertNr\":14,\"Titel\":\"Spannung14\",\"MessschrittNr\":1,\"Min\":30,\"Max\":50,\"Actual\":13},
  {\"MesswertNr\":15,\"Titel\":\"Strom15\",\"MessschrittNr\":2,\"Min\":65,\"Max\":100,\"Actual\":57},
  {\"MesswertNr\":16,\"Titel\":\"Strom16\",\"MessschrittNr\":3,\"Min\":65,\"Max\":100,\"Actual\":40}
 ]}",
                },
            ],
    )

merged =
    data
        |> group()
        |> map(
            fn: (r) =>
                ({r with _value:
                        regexp.replaceAllString(
                            v: r._value,
                            r: /\s\{/,
                            t: "{\"Name\": \"${r.Name}\",\"Task\": ${r.Task},",
                        ),
                }),
        )
        |> reduce(
            identity: {_value: ""},
            fn: (r, accumulator) => {
                trimmed = regexp.replaceAllString(v: r._value, r: /\{"Results":\[\s|\s]\}/, t: "")

                return {_value: accumulator._value + ",
" + trimmed}
            },
        )
        |> map(
            fn: (r) => {
                list = strings.trimPrefix(prefix: ",
", v: r._value)
                asArray = "[${list}]"

                return {r with _value: asArray}
            },
        )
        |> findColumn(column: "_value", fn: (key) => true)

array.from(rows: json.parse(data: bytes(v: merged[0])))

Hello @scott

I had to add some more steps to handle an empty Results array and to adapt the regex for JSON strings with and without whitespace. This is the final Flux script:

import "array"
import "experimental/json"
import "regexp"
import "strings"

getTableFromJSON2 = (jsonTable=<-) => {
  merged =
    jsonTable
        |> group()
        |> map( fn: (r) => {

          wert =
          if r._value == "{\"Results\":[]}" then
            r._value
          else
            regexp.replaceAllString( v: r._value, r: /\[\s*\{/, t: "[{\"Name\":\"${r.Name}\",\"Task\":${r.Task},")

          return { r with "_value": wert }
          }
        )
        |> map( fn: (r) => {
          
          wert = 
          if r._value == "{\"Results\":[]}" then
            r._value
          else
            regexp.replaceAllString( v: r._value, r: /,\s*\{/, t: ",{\"Name\":\"${r.Name}\",\"Task\":${r.Task},")

          return { r with "_value": wert }
          }
        )
        |> reduce(
            identity: {_value: ""},
            fn: (r, accumulator) => {
                trimmed = 
                  if r._value == "{\"Results\":[]}" 
                  then ""
                  else regexp.replaceAllString(v: r._value, r: /\{"Results":\[\s*|\s*]\}/, t: "")

                return {_value: accumulator._value + "," + trimmed}
            },
        )
        |> map(
            fn: (r) => {
                list = strings.trimPrefix(prefix: ",", v: r._value)
                asArray = "[${list}]"

                return {r with _value: asArray}
            },
        )
        |> map( fn: (r) => ({
            r with "_value": regexp.replaceAllString( v: r._value, r: /,{2,}/, t: ",")
          })
          )
        |> findColumn(column: "_value", fn: (key) => true)

  resultstable = array.from(rows: json.parse(data: bytes(v: merged[0])))

  return resultstable
}
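For completeness, the function can then be piped onto the original query. This is a sketch only, since it assumes the stream still carries the Name and Task columns that getTableFromJSON2() references:

```flux
from(bucket: "Testdaten_1")
    |> range(start: -2d, stop: now())
    |> filter(fn: (r) => r["Artikel"] == "Board-Typ 1")
    |> filter(fn: (r) => r["Status"] == "failed" or r["Status"] == "aborted")
    |> filter(fn: (r) => r["_field"] == "Fehler")
    |> keep(columns: ["_value", "Name", "Task"])
    |> getTableFromJSON2()
```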

Because the data can vary between different systems, the JSON approach was an easy way to keep the database independent of the incoming data types.

But it turns out that this script does not perform so well in Grafana: displaying the data takes a few seconds even for a small amount of data (<1000 entries). So your question is certainly justified: why not write the data differently and perhaps gain some performance from it as well?
So I’m going to start from scratch, but it’s working well enough for now.

Many thanks for your support!

@domsch I don’t know how you’re currently ingesting data, but you may want to give the Telegraf JSON parser plugin a look. It may allow you to take the JSON as you’re currently collecting it, process it into line protocol, and write it to InfluxDB.
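As a rough sketch (the file path, measurement name, and tag choices here are assumptions, not taken from your setup), a Telegraf config using the JSON data format could look like this:

```toml
[[inputs.file]]
  ## Hypothetical path; point this at wherever the JSON is collected
  files = ["/path/to/fehler.json"]
  data_format = "json"
  ## Parse each element of the "Results" array as its own metric
  json_query = "Results"
  ## Keep the string key "Titel" as a tag; numeric keys become fields
  tag_keys = ["Titel"]
  ## Hypothetical measurement name
  name_override = "Fehler"

[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  organization = "my-org"
  bucket = "Testdaten_1"
  token = "$INFLUX_TOKEN"
```

Each object in the Results array would then land as its own point in InfluxDB, with no Flux-side parsing needed at query time.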