Telegraf MQTT topic and string parsing

Hello everyone,

I am a newbie at Telegraf; as a matter of fact, this is the first time I am trying to use it. Please be gentle…
Background: I have a Blue Iris server which hosts several cameras. Each camera is configured to send an MQTT payload whenever an alert is triggered. The payload format is the same for each camera; the generic format is listed below:

Camera=&CAM,Alert_Type=&TYPE,AI_Finding=&MEMO,Server_Name=&SERVER,Timestamp=&ALERT_TIME

Here are a few examples of what that looks like as an MQTT payload:

Camera=Garaj,Alert_Type=,AI_Finding=,Server_Name=Blue Iris,Timestamp=2024-08-13T22:23:32.542Z
Camera=Gate,Alert_Type=Group,AI_Finding=car:61%,Server_Name=Blue Iris,Timestamp=2024-08-13T22:24:44.502Z
Camera=Roof,Alert_Type=Group,AI_Finding=,Server_Name=Blue Iris,Timestamp=2024-08-13T22:24:42.073Z
Camera=Street4K,Alert_Type=Motion_A,AI_Finding=car:86%,Server_Name=Blue Iris,Timestamp=2024-08-13T22:24:42.072Z		

As such:
&CAM = the camera name.
&TYPE = the alert type ('Motion_A', 'Group', 'Motion_AB', 'Group, Motion_A', etc.); this can also be an empty string.
&MEMO = the alert's AI findings, e.g. person:81%, car:92%; this can also be an empty string if the camera was triggered by a group trigger but didn't recognize any motion or object.
&SERVER = the server name. Blue Iris uses a variable to populate it. I don't intend to run more than one server, but I would like this parsed anyway, to make the setup applicable to other situations.
&ALERT_TIME = the alert record's creation time in ISO 8601 format.

Note: I have control over the payload formatting, so if there is a better way to present these strings to Telegraf for processing, please let me know.

Scope:
I would like to parse these payloads (values represented as strings) with Telegraf in such a way that they make sense in InfluxDB. Ultimately, they would be aggregated in a Grafana dashboard. Both InfluxDB and Grafana are installed and working on my main server. I tried MQTT as a data source in Grafana, but that data is transient by nature, so it doesn't help here.

Status:

After reading documentation and politely asking for help from LLM chatbots, I ended up with the following configuration file:

[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Log at debug level.
  debug = true
  ## Log only error level messages.
  quiet = false

[[processors.regex]]
  namepass = ["mqtt_consumer"]

  [[processors.regex.fields]]
    key = "_value"
    pattern = "Camera=(?P<camera>[^,]+),Alert_Type=(?P<alert_type>[^,]*),AI_Finding=(?P<ai_finding>[^,]*),Server_Name=(?P<server>[^,]+),Timestamp=(?P<alert_time>[^,]+)"
    result_key = "parsed_fields"

[[processors.regex]]
  namepass = ["mqtt_consumer"]

  [[processors.regex.fields]]
    key = "parsed_fields"
    pattern = "Alert_Type=(?P<alert_type>[^,]*),AI_Finding=(?P<ai_finding>[^,]*)"
    result_key = "fields"

[[processors.regex]]
  namepass = ["mqtt_consumer"]

  [[processors.regex.fields]]
    key = "fields"
    pattern = "Alert_Type=(?P<alert_type>[a-zA-Z]*),AI_Finding=(?P<object_type>[a-zA-Z]*)(?::(?P<confidence>\\d+))?%?"
    result_key = "final_fields"

[[processors.converter]]
  [processors.converter.fields]
    string = ["camera", "alert_type", "object_type", "server", "alert_time"]
    float = ["confidence"]

  [processors.converter.tags]
    tagpass = ["alert_type", "confidence"]

[[inputs.mqtt_consumer]]
  client_id = "telegraf"
  username = "__redacted__"
  password = "__redacted__"
  servers = ["tcp://192.168.2.11:1883"]
  topics = ["BlueIris/telegraf/#"]
  qos = 0
  connection_timeout = "30s"
  persistent_session = false
  
  ## Data format to use for messages
  data_format = "value"
  data_type = "string"
  
[[processors.parser]]
  parse_fields = ["_value"]
  data_format = "value"

[[outputs.influxdb_v2]]
  urls = ["http://192.168.2.10:8086"]
  token = "_redacted__"
  organization = "Home"
  bucket = "MQTT_DEV"
  
 [[outputs.file]]  # Log output to file for debugging
  files = ["stdout", "/tmp/telegraf_parsed_output.log"]
  data_format = "json"
  

InfluxDB is populated, but with the raw content of the MQTT payload; no regex processing occurs.

Checking the output log from Telegraf processing yields the same result.

# tail -4 telegraf_parsed_output.log
{"fields":{"value":"Camera=Garaj,Alert_Type=,AI_Finding=,Server_Name=Blue Iris,Timestamp=2024-08-13T22:23:32.542Z"},"name":"mqtt_consumer","tags":{"host":"Tower","topic":"BlueIris/telegraf/Garage"},"timestamp":1723587812}
{"fields":{"value":"Camera=Roof,Alert_Type=Group,AI_Finding=,Server_Name=Blue Iris,Timestamp=2024-08-13T22:24:42.073Z"},"name":"mqtt_consumer","tags":{"host":"Tower","topic":"BlueIris/telegraf/Roof"},"timestamp":1723587881}
{"fields":{"value":"Camera=Street4K,Alert_Type=Motion_A,AI_Finding=car:86%,Server_Name=Blue Iris,Timestamp=2024-08-13T22:24:42.072Z"},"name":"mqtt_consumer","tags":{"host":"Tower","topic":"BlueIris/telegraf/Street4K"},"timestamp":1723587884}
{"fields":{"value":"Camera=Gate,Alert_Type=Group,AI_Finding=car:61%,Server_Name=Blue Iris,Timestamp=2024-08-13T22:24:44.502Z"},"name":"mqtt_consumer","tags":{"host":"Tower","topic":"BlueIris/telegraf/Gate"},"timestamp":1723587886}

I know something's missing in my regex, most likely robust handling of possible empty strings, but I have not been able to fix it.
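One thing I do notice in the log above is that the field is literally named value, while my regex processors key on _value. If that is part of the problem, I guess each extracted value would need its own [[processors.regex.fields]] entry keyed on value, roughly like this (just a guess on my part, untested):

[[processors.regex]]
  namepass = ["mqtt_consumer"]

  ## one entry per value to pull out of the raw payload string
  [[processors.regex.fields]]
    key = "value"       # the "value" data format stores the payload in a field named "value"
    pattern = ".*Camera=([^,]*).*"
    replacement = "${1}"
    result_key = "camera"

  [[processors.regex.fields]]
    key = "value"
    pattern = ".*AI_Finding=([^,]*).*"
    replacement = "${1}"
    result_key = "ai_finding"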
Any help would be much appreciated!

Solved: I ended up doing the processing within InfluxDB instead.

In case anyone is interested, here’s how:

I changed the MQTT payload format so that it sends the values separated by pipes ("|"). The Blue Iris MQTT payload is now in this format:
&CAM|&TYPE|&MEMO|&SERVER|&ALERT_TIME

The output is a string that looks like this:

Court180|Motion_A|person:91%|Blue Iris|2024-08-14T08:51:04.286Z

Then, I processed the MQTT payload in InfluxDB directly:

// library needed for string splitting
import "strings"

from(bucket: "MQTT_DEV1")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "mqtt_consumer")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["host"] == "Tower")
  |> filter(fn: (r) => r["topic"] == "BlueIris/telegraf/Court180" or r["topic"] == "BlueIris/telegraf/Garage" or r["topic"] == "BlueIris/telegraf/Gate" or r["topic"] == "BlueIris/telegraf/Roof" or r["topic"] == "BlueIris/telegraf/Street4K")
  |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
  // table formatting: one column per variable after splitting the string
  |> map(fn: (r) => {
      parts = strings.split(v: r["_value"], t: "|")

      return {
        Camera: parts[0],
        Alert_Type: parts[1],
        AI_Finding: parts[2],
        Server_Name: parts[3],
        Timestamp: parts[4]
      }
    })
  |> yield(name: "last")

Todo:

  1. The "AI_Finding" field sometimes contains an empty string (the AI found nothing), sometimes contains one finding ("person:83%"), and sometimes contains multiple findings ("car:86%,person:77%"). I was able to use a Grafana transform to split it into columns named "person" and "car", but I'd rather do that at the source to avoid propagating garbage data.
  2. I will need to add a measure field to express counts (persons per hour, cars per hour, …).

Hi,

If you have control over the payload, then why not send proper JSON or another well-defined format that Telegraf can easily parse? See telegraf/docs/PARSING_DATA.md at master · influxdata/telegraf · GitHub

I don't have total control over how the data is generated, but I believe I can build a flat JSON payload. I'll have to check whether the Blue Iris MQTT engine is compatible with certain special characters.

I’ll come back with an update :slight_smile:


I seem to have hit a snag, and I think I know what causes it, but I’m not sure how to fix it.

The Blue Iris MQTT payload has been altered to this format:

{ "Camera": "&CAM", "Alert Type": "&TYPE", "AI Findings": "&MEMO", "Server": "&SERVER", "Timestamp": "&ALERT_TIME"}

The actual MQTT payload, once variables are resolved internally, looks like this:

{ "Camera": "Gate", "Alert Type": "Motion_A", "AI Findings": "car:64%", "Server": "Blue Iris", "Timestamp": "2024-08-14T18:59:10.766Z"}

MQTT Explorer shows new data going into "BlueIris/telegraf2".
This looks good, as far as I can tell. Of course, later on I will need some sort of processing for the "AI Findings" value, but one thing at a time.

My config also looks OK to me; however, there is no output to InfluxDB.

[[inputs.mqtt_consumer]]
  client_id = "telegraf"
  username = "mosquitto"
  password = "********"
  servers = ["tcp://192.168.2.11:1883"]
  topics = ["BlueIris/telegraf2/#"]
  qos = 0
  connection_timeout = "30s"
  persistent_session = false
  
  ## Data format to use for messages
  data_format = "json"
  data_type = "string"

  tag_keys = ["Camera"]
  json_time_key = "Timestamp"
  json_time_format = "2006-01-02T15:04:05Z07:00"
  
  
#[[processors.parser]]
#  parse_fields = ["_value"]
#  data_format = "value"

[[outputs.influxdb_v2]]
  urls = ["http://192.168.2.10:8086"]
  token = "********"
  organization = "Home"
  bucket = "MQTT_DEV2"

Maybe there is something wrong with the json_time_format value (it must match the ISO 8601 format), or something else that I just can't see? I'm not sure what to do at this point.

Telegraf log output:

2024-08-14T18:57:23Z I! Loading config: /etc/telegraf/telegraf.conf
2024-08-14T18:57:23Z I! Starting Telegraf 1.31.3 brought to you by InfluxData the makers of InfluxDB
2024-08-14T18:57:23Z I! Available plugins: 234 inputs, 9 aggregators, 32 processors, 26 parsers, 60 outputs, 6 secret-stores
2024-08-14T18:57:23Z I! Loaded inputs: mqtt_consumer
2024-08-14T18:57:23Z I! Loaded aggregators: 
2024-08-14T18:57:23Z I! Loaded processors: 
2024-08-14T18:57:23Z I! Loaded secretstores: 
2024-08-14T18:57:23Z I! Loaded outputs: influxdb_v2
2024-08-14T18:57:23Z I! Tags enabled: host=Tower
2024-08-14T18:57:23Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"Tower", Flush Interval:10s
2024-08-14T18:57:23Z D! [agent] Initializing plugins
2024-08-14T18:57:23Z D! [agent] Connecting outputs
2024-08-14T18:57:23Z D! [agent] Attempting connection to [outputs.influxdb_v2]
2024-08-14T18:57:23Z D! [agent] Successfully connected to outputs.influxdb_v2
2024-08-14T18:57:23Z D! [agent] Starting service inputs
2024-08-14T18:57:23Z I! [inputs.mqtt_consumer] Connected [tcp://192.168.2.11:1883]
2024-08-14T18:57:33Z D! [outputs.influxdb_v2]  Buffer fullness: 0 / 10000 metrics
2024-08-14T18:57:43Z D! [outputs.influxdb_v2]  Buffer fullness: 0 / 10000 metrics

And then it keeps repeating the Buffer Fullness line.

I have also enabled writing Telegraf output to a local log; no data is being processed there either.
This means that Telegraf, for some reason, isn't reading the MQTT topics.
Is this the culprit?

topics = ["BlueIris/telegraf2/#"]

The topics in MQTT Explorer look like this:

BlueIris/telegraf2/Roof_Test
BlueIris/telegraf2/Street4K_TEST
BlueIris/telegraf2/Gate_TEST
BlueIris/telegraf2/Court180_Test
BlueIris/telegraf2/Garage_TEST

It doesn't look like any data came in, as you said. You could always test this with the value data type to return the raw data. If the parser were not creating any metrics, you would get a warning about no metrics being created when the first messages came in.

One thing to note is that your JSON is all string data, so you will need to specify those fields using the json_string_fields = [] config option. See the JSON parser README for more details.

Also, if you enable client_trace = true in your mqtt_consumer config AND enable debug mode via the --debug CLI option or debug = true in the agent config, you will get a lot of debug information about the connection with the MQTT server. It is a nice way to see whether messages are coming in or not.
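
For example, something along these lines might be a starting point (untested, reusing the server, topics and field names from your own config, so adjust as needed):

[[inputs.mqtt_consumer]]
  servers = ["tcp://192.168.2.11:1883"]
  topics = ["BlueIris/telegraf2/#"]
  qos = 0
  ## verbose MQTT client logging; only shown when debug mode is also enabled
  client_trace = true

  data_format = "json"
  ## string values are ignored by the json parser unless listed here (or used as tags)
  json_string_fields = ["Alert Type", "AI Findings", "Server"]
  tag_keys = ["Camera"]
  json_time_key = "Timestamp"
  json_time_format = "2006-01-02T15:04:05Z07:00"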

Thank you for your help!

Debug was already enabled, but not the client_trace = true part.
I had somehow missed this part of the documentation:

JSON strings and booleans are ignored unless specified in the tag_key or json_string_fields options.

Duh.
Data is now coming in.
My next step is figuring out how to further split the “AI Findings” field, which may be blank, may contain one pair (“car:73%”) or more than one pair (“car:79%,person:73%”).

Data is now coming in.

Awesome

My next step is figuring out how to further split the “AI Findings” field, which may be blank, may contain one pair (“car:73%”) or more than one pair (“car:79%,person:73%”).

Is the AI finding field going to be valid JSON or the key:value format you have here:

“AI Findings”: “car:64%”,

If it is valid JSON, then this is when I would look to the xpath parser or the json_v2 parser. The original JSON parser is good for very basic data and data that is flat. If you have a full example, we can take a look.

Is the AI finding field going to be valid JSON or the key:value format you have here:

That part is a string which I have no control over. It comes from a single variable internal to Blue Iris called &MEMO.

And it is a string variable which can be:

  1. Blank (empty string)
  2. A single AI finding (“car:64%”)
  3. Multiple AI findings, separated by commas such as (“car:85%,person:93%”) or (“car:85%, person:93%,motorcycle:56%”)

If you have a full example we can take a look.

I can provide examples of MQTT payloads in JSON format, if that's what you need. Here are a couple; see at the end of this post how they look now and how they should end up.

If you only need examples of the actual string value of the “AI Findings” field, I have picked a few below:

""
"person:90%"
"car:96%"
"person:66%,bus:57%"
"car:73%,truck:53%,person:89%"
"car:94%,person:94%"
"person:66%,bus:57%"

I am looking for the following processing steps:

Step 1: In the case of an empty string, the empty string should ideally be replaced with the string "none:0%" - this is for data consistency, so alerts with no AI detection can still be filtered on later.
Step 2: If the AI findings field contains two or more findings, such as car:94%,person:94%, it should generate multiple rows, one containing car:94% and another containing person:94%; all other JSON values for those rows should remain identical.
Step 3: The processed entries, each now holding a single value pair, should be split into two columns named "AI Object Type" and "AI Confidence" respectively, with the "%" sign removed from the string. The final flat JSON replacing the examples above should therefore be transformed as follows:

Here are three relevant examples: the first with no AI findings, the second with one AI finding, and the third with two AI findings. They should be processed as follows.

From:

{ "Camera": "Roof", "Alert Type": "Motion_A", "AI Findings": "", "Server": "Blue Iris", "Timestamp": "2024-08-15T16:42:03.277Z"}
{ "Camera": "Roof", "Alert Type": "Group,Motion_A", "AI Findings": "car:83%", "Server": "Blue Iris", "Timestamp": "2024-08-15T16:18:03.540Z"}
{ "Camera": "Gate", "Alert Type": "Motion_A", "AI Findings": "person:66%,bus:57%", "Server": "Blue Iris", "Timestamp": "2024-08-15T16:19:25.973Z"}

To:

{ "Camera": "Roof", "Alert Type": "Motion_A", "AI Object Type": "none", "AI Confidence": 0, "Server": "Blue Iris", "Timestamp": "2024-08-15T16:42:03.277Z"}
{ "Camera": "Roof", "Alert Type": "Group,Motion_A", "AI Object Type": "car", "AI Confidence": 83, "Server": "Blue Iris", "Timestamp": "2024-08-15T16:18:03.540Z"}
{ "Camera": "Gate", "Alert Type": "Motion_A", "AI Object Type": "person", "AI Confidence": 66, "Server": "Blue Iris", "Timestamp": "2024-08-15T16:19:25.973Z"}
{ "Camera": "Gate", "Alert Type": "Motion_A", "AI Object Type": "bus", "AI Confidence":  57, "Server": "Blue Iris", "Timestamp": "2024-08-15T16:19:25.973Z"}

Note that the three JSON entries have become four, with the third original entry now split into two: one for "person" and one for "bus".

I would look at the starlark processor. You could pass the AI findings string to it and write some logic to parse out the various scenarios, even setting the default value you need. I would look at the split function and then create new fields based on the results.

The starlark processor uses a Python-like scripting language: telegraf/plugins/processors/starlark at master · influxdata/telegraf · GitHub

There are some examples in there as well under testdata, and here is an intro guide: How to Use Starlark with Telegraf | InfluxData
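
As a very rough, untested sketch of the idea (assuming the JSON parser leaves you with a string field named exactly "AI Findings", and that "AI Object Type" / "AI Confidence" are the field names you want), something like this could be a starting point:

[[processors.starlark]]
  namepass = ["mqtt_consumer"]
  source = '''
def apply(metric):
    # pull the raw findings string out of the metric, if present
    raw = ""
    if "AI Findings" in metric.fields:
        raw = metric.fields["AI Findings"]
        metric.fields.pop("AI Findings")

    # treat "no findings" as a well-defined default pair
    if raw == "":
        raw = "none:0%"

    # emit one metric per "object:confidence%" pair
    out = []
    for pair in raw.split(","):
        parts = pair.strip().split(":")
        m = deepcopy(metric)
        m.fields["AI Object Type"] = parts[0]
        if len(parts) > 1:
            m.fields["AI Confidence"] = int(parts[1].rstrip("%"))
        else:
            m.fields["AI Confidence"] = 0
        out.append(m)
    return out
'''

The deepcopy keeps the camera tag, the remaining fields and the timestamp on every emitted metric; you would just want to double-check the behaviour if a confidence value ever comes through malformed.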

The other option is to write your own code to parse these and use an exec processor.

Thank you.
I will now spend some time reading and learning it, so I might not be able to update this topic for a while.
But I think I am on the right track now.
Thanks again for being patient with me!
