Predefine fields type - How to avoid type mismatch when not using line protocol


For interoperability reasons, I’m transporting messages using MQTT in Json format, in the format of

  "name": "measurement",
  "fields": {
    "somefields": 1
  "tags": {
    "sometag": "somevalue"
  timestamp: 4123412341234

And then I use telegraf to ingest in influxDB, using starlake plugin for json parsing.
The problem I have is that at first measurement post, influxdb is creating the field and assigning a type by guessing based on the data received, hence if there is decimal it’s a float, if not it’s an integer.
So I the data source dealt with data without decimal for the first sending, but after that there is decimal and it adds something after the coma, it’s too late, influxdb has created the fields and type is integer, metrics import will fail.

My question, what is the recommendation to avoid this kind of issues ? Is it possible to send some templates to influxDB so it can creates measurements with field with wanted types (same way we do with standard RDB) ?


This would not be strictly necessary, as there is already a json parser in Telegraf.
There is the json parser and the newer json_v2 parser, which also allows an explicit type conversion.
Furthermore there is the possibility in Starlark to do an explicit type conversion:

# pseudo code:
metric.fields[key] = int(value)
metric.fields[key] = float(value)

If it does not work, please show us real json data, the Telegraf config and the Starlark script.

Thank you for your answer
Here is a real word json

  "name": "state",
  "fields": {
    "running": false,
    "shiftOperatingTime": 21629,
    "shiftDowntime": 7171,
    "OEEperformance": 0,
    "OEEavailability": 0.7510069444444445,
    "OEEquality": 1,
    "OEE": 0
  "tags": {
    "area": "MPTEAM",
    "site": "MEUDON",
    "equipment": "MX004"
  "timestamp": 1639655875836000000

This is just a sample there are many more different measurements

I didn’t find with the json standard parser how to identify what is a tag and what is a field.
Your solution is interesting but it means I need to setup the json parser with hundreds of different values

Here is the starlake

def parseStdMsg(metric):
    j = json.decode(metric.fields.get("value"))
    for field in j["fields"].keys():
        metric.fields[field] = j["fields"][field]
    for tag in j["tags"].keys():
        metric.tags[tag] = j["tags"][tag] = j["name"]
    metric.time = j["timestamp"]

    return metric

Just to prevent misunderstandings:
Are the keys (for example running, shiftOperatingTime etc.) below the fields and tags brackets always the same for each json payload or can they change arbitrarily?

This is the point, all measurement content can be predefined (ie. a state will contain running, OEE, …), but the messages can add the fields or not depending on what needs to be updated.

I don’t know how different your data actually is, but this might be a generic solution that might work. I have extended your Starlark script a bit to include explicit type conversions.

[[inputs.file]] # only for testing
  files = ["predefined.json"]
  data_format = "value"
  data_type = "string"

  source = '''
load("", "json")

def apply(metric):
    j = json.decode(metric.fields.get("value"))

    for field in j["fields"].keys():
        value = j["fields"][field]
        if type(value) == "bool":
            metric.fields[field] = bool(value)
        elif type(value) == "string":
            metric.fields[field] = str(value)
        elif type(value) == "int":
            if "time" in field.lower():
                # we convert only keys with time in its name to int:
                metric.fields[field] = int(value)
                # all other keys are treated as float:
                metric.fields[field] = float(value)
        elif type(value) == "float":
            metric.fields[field] = float(value)

    for tag in j["tags"].keys():
        metric.tags[tag] = j["tags"][tag] = j["name"]
    metric.time = j["timestamp"]

    return metric

[[outputs.file]] # only for debugging purposes
  files = ["predefined.out"]
  influx_sort_fields = true

Thks a lot this is very interesting. I was considering using json_v2 and declare all the tags manually…
I will try your solution. What are the drawbacks of having all numeric data as float in influxdb ?

This would also be a possibility, but if there are hundreds of keys, a rather tedious matter. :smirk:

I think this would have no particular disadvantages. But i don’t know the internals of InfluxDB.
Depends on what you want to do with the data, though. For example, if there are states behind the numbers, then an int would make more sense than a float?