I need some help with my telegraf configuration. Since I just caught the AI lying, I’m now turning to human experts:
In addition to the global configuration in /etc/telegraf/telegraf.conf, I currently have five individual configuration files in /etc/telegraf/telegraf.d/. Three of these are input plugins (inputs.mqtt_consumer), each handling a related group of MQTT messages. I also have two output plugins: one writes to a file (outputs.file) and one to an InfluxDB (outputs.influxdb_v2). Both outputs currently receive the full set of data produced by the three input plugins.
Now I want to create a new processing branch: another MQTT Consumer input plugin should receive an additional group of MQTT messages, process them in a slightly more complex manner (e.g., pivot operations), and the result should end up in a separate file and not in the first-mentioned file nor in the database!
As far as I thought I understood the operating principle, all output plugins process the data from all input/aggregator/processor plugins equally?! How can one filter or branch the data to different output plugins, or is that even possible? In any case, I couldn’t find any filter options in the documentation for the output plugins I use.
To route metrics through specific processors and to specific outputs, you need to use tagdrop and tagpass (see “metric filtering” in the configuration docs).
It is up to you to decide which one to use. tagpass requires at least one match of the configured tags for the processor or output to run; tagdrop requires one match to drop the entire metric.
Additional tip: if you don’t want the routing tags to end up in your outputs, give them a prefix like “internal_” or “routing_” and use tagexclude on the outputs to remove tags matching that prefix: tagexclude = ["internal_*"].
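A minimal sketch of that routing, assuming the input attaches the routing tag itself (server address, topics, file path and tag names here are all placeholders):

```toml
# Input tags its metrics with an internal routing tag
[[inputs.mqtt_consumer]]
  servers = ["tcp://broker.example:1883"]
  topics = ["solar/#"]
  data_format = "value"
  data_type = "float"
  [inputs.mqtt_consumer.tags]
    internal_route = "solarfile"

# This output only accepts metrics carrying that tag value
# and strips all internal_* tags before writing
[[outputs.file]]
  files = ["/var/log/telegraf/solar.out"]
  tagexclude = ["internal_*"]
  # tagpass is a TOML table, so it has to come last in this section
  [outputs.file.tagpass]
    internal_route = ["solarfile"]
```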
thanks a lot!
I’ve now used tagpass in the output plugin configuration files to select which data is actually output to the respective destination based on a tag that the MQTT consumer input plugins produce anyway. That works pretty well.
Referring to your extra tip: how would one insert an additional tag into the data stream? And if I want to remove it again in the output plugin with tagexclude, will tagpass still work at this point? I’ve learned that tagpass must be at the end of the configuration and that tagexclude would then come before it? That seems a little strange to me.
Based on my experience the order is the following:
Tagpass
Tagdrop
Taginclude
Tagexclude
Meaning it can still route based on provided tags and then exclude them.
The order in the config file is not relevant to the order of operations. But the documentation mentions that “maps” like tagpass and tagdrop need to be at the end of the plugin definition (taginclude and tagexclude are array-type options). This is because of TOML limitations.
I also saw that you are going to do some processing. Something I did not think about at first is planning ahead for the processor order. The numbers used for ordering don’t need to be consecutive, so first define some ranges to leave room for additions later on. For example: 0-9 for input-specific processors, 10-19 for global processing, 20-29 for metric-specific stuff.
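As an illustration of such ranges (the plugin choices, tag names and numbers below are arbitrary examples, not a recommendation for this specific setup):

```toml
# 0-9: input-specific fixes
[[processors.rename]]
  order = 5
  [[processors.rename.replace]]
    tag = "host"
    dest = "source_host"

# 10-19: global processing
[[processors.enum]]
  order = 15
  [[processors.enum.mapping]]
    tag = "status"
    [processors.enum.mapping.value_mappings]
      "0" = "ok"
      "1" = "error"

# 20-29: metric-specific reshaping
[[processors.pivot]]
  order = 25
  tag_key = "fieldname"
  value_key = "value"
```

The gaps between 5, 15 and 25 leave room to slot in additional processors later without renumbering the whole chain.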
I have a specific order for some processors to work best. For example an unpivot followed by a pivot (to split one metric with multiple fields to multiple metrics with a single field). Followed by a deadband function (that I want to do on a single field metrics).
Then I also have some that do bit extraction based on a template tag, these happen before the deadband processor (because I only want the changed bits).
Using metric filters and the “order” parameter of the processor plugins, I control the path of my data through the processing chain.
In my case, I do indeed still have a lot to do:
the data I’m looking at here, in addition to the data I’ve already processed, comes from a solar inverter. I receive exactly one value per MQTT message/topic. There is no timestamp in the MQTT payload, and all MQTT messages of a “transmission” come in quick succession as a burst (60 lines, if I counted correctly).
What I now need to do is:
1. Assign a uniform timestamp to all 60 related MQTT messages, i.e., all lines should be given the time at which telegraf receives the burst of MQTT messages.
2. All related metrics (values) should be packed into a single “line” (metric), which is then sent to InfluxDB2 and, if necessary, to an extra file.
The second task sounds somewhat like the Pivot plugin?! But unfortunately, I don’t yet know exactly how to accomplish it.
Since I get the impression that you have quite a lot of experience with complex processing chains with telegraf, may I ask you for a few tips on how I should proceed in principle?
1: Metrics are mostly timestamped on gathering unless the plugin has a special option for this (and even then the default will probably be the gather time). Looking at mqtt_consumer with the json_v2 parser, for example, there is an option to provide a JSON path to your timestamp.
2: Since all messages from a burst are separate metrics, you can’t use pivot. It is only possible to pivot on a single collected metric (with multiple fields). What you want to accomplish sounds like an aggregation to me. And you are in luck: this is exactly what the “merge” aggregator does.
You can define a period within which you are sure that all messages have been received (aggregators clear all their data once they publish, so the previous value will not be carried over into the next window). If you want to log the original burst messages somewhere, keep the originals in the config and filter between original and aggregated metrics based on a tag you add at the aggregator level.
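A sketch of such a merge configuration, assuming a 10 s window comfortably covers one full burst (the tag name is a placeholder; check the aggregator docs for the exact options available in your Telegraf version):

```toml
[[aggregators.merge]]
  period = "10s"         # must comfortably cover one complete burst
  drop_original = false  # keep the raw per-topic metrics as well
  # tag the merged metrics so outputs can tagpass/tagdrop on them
  [aggregators.merge.tags]
    internal_merged = "true"
```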
If this is a bit much, feel free to ask some additional questions. Aggregators are a thing you need to play around with to fully grasp what they are doing to your metrics.
PS: turn off processors after aggregators at agent level to avoid double processing, unless you actually need it.
unfortunately, it has taken me quite some time to get back to this issue. Thank you very much for your comments! However, I’m afraid I still have a few questions, as my attempts so far have not been successful.
Let’s start with your second paragraph:
The reference to the “merge” aggregator plugin is very valuable! I also think that this does exactly what I need. I timestamped the incoming MQTT messages and took a closer look at them: there are three different blocks of MQTT messages, identified by identical leading parts of the topic, which are transmitted as bursts. The blocks/bursts arrive approximately every 5 seconds, meaning each block repeats approximately every 15 seconds. The blocks have different numbers of lines/MQTT messages, so the time from the first to the last line of a block varies: approx. 20 ms / 50 ms / 180 ms. The challenge when using the “merge” plugin is therefore to configure the two time parameters correctly: I should probably set “period” slightly shorter than the repeat interval, i.e. 10-13 s, and then perhaps set “round_timestamp_to” to 1 s??? Can I really be sure that all related lines will always be processed together? The rounding of the timestamp is causing me some headaches.
But before I can try that, I first have to get my inputs.mqtt_consumer configurations for this data right. And that’s where I’m failing at the moment.
In fact, timestamps are implicitly generated, as I can see from my tests, which actually seem to originate from the time when Telegraf receives the MQTT messages. However, this post made me a little uncertain. I haven’t tried generating the timestamp with “starlark” as recommended there yet.
Unfortunately, I don’t quite understand your reference to providing the timestamp using JSONv2. With my previous data, I had a JSON payload with a timestamp field, and I was able to get the time:
I can deal with that. But now I don’t have a JSON payload, just a single value per MQTT message, which means I don’t have a timestamp in the payload either!
First, I tried to create an inputs.mqtt_consumer configuration for the first block, consisting of 7 MQTT messages.
I have two problems with this:
firstly, three of the MQTT messages deliver an integer value as payload and the other four deliver a float value. But since I can only specify either
data_format = "value"
data_type = "integer"
or
data_format = "value"
data_type = "float"
do I really have to create two complete, different configuration sections, one for each data type (integer, float)? That would be a lot of redundancy (broker address, username, password…) and doesn’t seem very elegant.
My second problem is that this way I cannot assign the last segment of the topic to the name of the value (field). Instead, the output is always “value=xxx”. Let’s assume that the input is:
In other words, a second value field with a pointless name.
Unfortunately, there are far too few examples of inputs.mqtt_consumer configurations on the internet. They are always the same ones, and some questions remain unanswered.
Also, I’m afraid I didn’t understand what you meant by your last sentence:
That was quite the read, hope I can give some quick answers.
I forgot to ask last time, but why do you actually need alignment of the bursts of data? Graphs can be visualized either way and aggregate windows will align them at query level.
But to continue, can you share the topics and payloads you want to merge? So I can have a better understanding of the data you are working with.
Telegraf uses the collection timestamp on metrics. Otherwise buffering multiple metrics of the same series until flushing to the output would not really make sense. Don’t worry about that post. You don’t need the external timestamping I think.
This I don’t understand, the mqtt consumer plugin page is pretty clear on both of these issues.
For the datatype problem, the mqtt_consumer provides [inputs.mqtt_consumer.topic_parsing.types], where you can specify how each field should be interpreted. Just configure the main data type as float, like in the example.
For the other topic parsing, I don’t get where you came up with the idea to use the + symbol (probably from topic wildcards). Please take a look at this. Topic levels you don’t need in topic parsing are marked with underscores.
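A sketch of what that topic parsing could look like in this case (the topic layout, names and the read-everything-as-float assumption are illustrative, not taken from the actual setup):

```toml
[[inputs.mqtt_consumer]]
  servers = ["tcp://broker.example:1883"]
  topics = ["solar/inverter/+"]
  data_format = "value"
  data_type = "float"   # parse every payload as float in one instance

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "solar/inverter/+"
    # the keyword marks which level becomes the measurement name,
    # "_" skips a level
    measurement = "measurement/_/_"
    # the last topic level is captured as a tag named "fieldname"
    tags = "_/_/fieldname"
```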
Then for the value issue, scroll a bit further down to pivoting. There it states that the output will by default include the fieldname as tag and the field itself as value. But using pivot you can solve this.
I don’t think the example is 100% correct, since pivot can only act on a single metric, and turning 3 separate metrics into a single 3-field metric is a job for the merge aggregator. But the pivot will rectify the field-name issue nevertheless.
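A minimal sketch of that pivot, assuming the last topic level was captured as a tag called “fieldname” (the tag name is a placeholder):

```toml
[[processors.pivot]]
  tag_key = "fieldname"   # tag holding the desired field name
  value_key = "value"     # generic field produced by data_format = "value"
```

In line protocol terms this would turn something like `inverter,fieldname=power value=230.5` into `inverter power=230.5`.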
Processors generally run after the inputs, before the metrics hit the aggregators. But if you want, you can also process the output metrics of the aggregators as if they were new inputs. Since the data was already processed before hitting the aggregator, doing it again afterwards might corrupt your values/structure.
thanks for your comments! To keep things clear, I’ll now answer the individual questions in separate posts.
When I think about it in detail, you are absolutely right: for normal processing such as plotting time series, it’s completely irrelevant whether the individual MQTT messages are stored with timestamps that differ by a few milliseconds. The windowing during data querying already does this correctly in a practical sense. But I’d still like to try to transform the data as if all values of a block were arriving from the sensor in a single line. Even if it sounds like an academic problem.
It never hurts to learn new things.
Let’s take a look at the shortest of the three different blocks sent, as an example: it consists of only seven MQTT messages/lines. These contain integer and float values. Later, in the other blocks, there may also be strings as values/fields.
This is the output of mosquitto_sub -v, preceded by the reception timestamp. I have only replaced the German terms in the topic. Otherwise, it is real data.
2x power, yieldtotal and irradiation are float values; the rest are integers.
If I have a JSON payload, then everything is clear to me: I set data_format = "json_v2" and can then use the JSON parser to specify a type for each individual data field and continue working with it. I understand that.
If, on the other hand, as in the present case, I always have a single value per MQTT message (data_format = "value"), which can have different data types (integer, float, string…) across the accepted topics, then I have to decide on one! I can’t specify different types with the data_type = "xxx" directive. The logical consequence seems to be that I have to create a separate [[inputs.mqtt_consumer]] section for each data type, i.e. a separate instance of the mqtt_consumer plugin, which in turn means repeating all the mandatory information in each of these sections (“servers”, “qos”, “username”, password…). I can’t process multiple fields with different data types in one step, as with a JSON payload?! And that is what I meant by “inelegant.”
Of course, I can read practically anything that is a “number” as a float, as you suggested. But I’d still like to explicitly specify the data type. Sorry for being so picky.
Yes, I came up with the idea because one can also “catch” the “measurement” name out of the topic string by placing a “+” at the specific position (measurement = “+/_/_/_”). I must have misunderstood something.
Okay, but that means I can only rename “value” to the desired field name derived from the topic using a downstream processors.pivot plugin, not directly in the inputs.mqtt_consumer plugin?!
Oh, I see. The logic behind the processing wasn’t entirely clear to me. Now I’m beginning to understand and can also see what consequences this can have.
In my case, I now need processors.pivot to set the correct name for the “value=” fields, then aggregators.merge to combine the related data by time. And after that, I might need another processor plugin. This could get complicated.
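Put together, the chain for this branch could look roughly like this (a sketch with placeholder names and timings, not a tested configuration):

```toml
# 1) give the generic "value" field its proper name
#    from the topic-derived tag
[[processors.pivot]]
  order = 10
  tag_key = "fieldname"
  value_key = "value"

# 2) pack all metrics of one burst into a single multi-field metric
[[aggregators.merge]]
  period = "10s"
  drop_original = true   # only the merged metric continues downstream
```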
By the way: Was I correct in my last post (last Sunday) regarding the selection of the two time parameters for the aggregators.merge plugin?
What was meant by “at agent level” in “…turn off processors after aggregators at agent level…”?