There there an equivalent to the logstash translate filter in telegraf?


#1

Hello,

I am looking to migrate from logstash to telegraf. I have a logstash configuration that uses a translate filter. The translate filter is a general search and replace tool that uses a configured hash and/or a file to determine replacement values. Each dictionary item is a key value pair.

Does anyone know if there a way to implement this capability in telegraf?

Thank you,


#2

I looked over the Logstash docs briefly and it looks like these types of tasks are handled in Telegraf with a processor plugin. The enum processor in particular seems to have quite a bit of overlap with the Logstash translate filter.


#3

Thanks for the feedback @daniel

I looked into the enum processor and I was not sure if enum could be used to replace a string with another string. I need to then use the result of the enum as the measurement name.

Let me explain the use case. At the moment, I use the logstash http input. I have a script that sends a csv file with timestamp and value using CURL. I also send three http headers with the csv file that are key-value pairs with metadata for the device UID, device channel and name of a gateway device. I want to form the measurement name preferably from a lookup of the UID or the UID itself. I want the field key to be same as the device channel and I want to tag the field values with the gateway id. Finally, the database name where I send the data is a lookup of the gateway ID.

I’ve been experimenting to find a way to implement something similar with telegraf and have not found my path yet. I think I am trying to avoid static name_overrides so that I do not have a pile of [[iinputs]] in the configuration file. My thinking about having multiple [[inputs]] is that their presence would adversely affect performance of telegraf. Perhaps that line of thought is not correct … can you please comment on that as well? If I have many [[inputs]], does that affect performance in any way (memory, cpu usage, etc)? Ultimately, there could be many thousands of devices (UID’s).


#4

One way I thought I might be able to approach this was to use the value of the path tag that is emitted from the tail input plugin. That path contains the gateway ID and a filename and the filename contains the UID and channel number metadata. If I could extract parts of the path tag value and use those for the measurement name and field key that would help me achieve a solution for this. Here is an example of the path captured by telegraf into influx:

time applicationName battery host path


1524449392000000000 telegraf-test-csv 3.53516 ip-172-30-3-170 /motescan/scannet03/plotmote/l00010000167A2E17_154.csv_channel_batt.csv

In this example, gateway id = “scannet03”, the UID = “00010000167A2E17” and the device channel = “batt”.


#5

I’ve had the best luck treating the measurement as a namespace instead of an exact identifier, so I would recommend using a general measurement name that applies to all the data with the same shape. The metadata will make good tags and you can look up the exact device using these.

We could use the regex processor to do some processing on the path field, but it isn’t powerful enough to split the data into multiple tags. Is all the data coming in through the tail plugin with one file per device? Is there any flexibility in the how the input data looks?

Additional plugins would be more expensive, mostly in terms of memory use. Each plugin type has different performance characteristics but usually each one is fairly cheap, still I doubt you will want to define thousands if its avoidable.


#6

The csv files have one file per device and channel. So, each device has up to 9 files, one for each channel of data (temperature, battery, x, y, z, a, b, c, d).

I ran a test yesterday using multiple instances of [[inputs.tail]]. Along the lines you suggested, I “generalized” the measurement name. Each [[inputs.tail]] used a unique name_override. I included an example of the config file below (note there are multiple [[inputs.tail]] sections, for each channel type, not shown to keep the size of this post shorter).

I have two processors, a regex processor to parse the path and extract the uid, channel and gateway and create a tag with the metadata. I also have an enum processor to convert the channel.

Finally a simple output to influx.

I ran a test with about a dozen csv files. Nine of the csv files were about 230MB each (about 10 millions points each). I had top running (top -h) and I noticed that when telegraf was done most of the memory was used and did not free. It appears to me that telegraf consumed about 2GB of memory. When I restarted telegraf, the memory freed up. Could this be due to a memory leak somewhere?

I have a question about the file and tail plugins. Is there a way to tell when it has finished processing a file? I would like to get rid of the csv file after telegraf has processed the data.

I considered the http_listener_v2 plugin but there is a 500MB limit to what you can send. It is rare but possible that a file I send could exceed that size.

Regarding the regex processor, my regex has three named subgroups. At the moment I use three tags processors so the regex runs three times. I have not tried this yet, can you have multiple “replacement” and “result_key” parameters in the same regex processor? This would avoid having to run the regex three times. For example:

[[processors.regex.tags]]
key = “path”
pattern = “^/motescan/(?P<gateway>scannet[0-9]+)/csv/l(?P<uid>[0-9A-Fa-f]+)[0-9]+\.csv_channel(?P<channel>[A-Za-z]?[A-Za-z0-9]+)\.csv$”
replacement = “${gateway}”
result_key = “gateway”
replacement = “${uid}”
result_key = “uid”
replacement = “${channel}”
result_key = “channel”


telegraf.conf

[[inputs.tail]]
files = ["/motescan/scannet03/csv/l*.csv_channel_z.csv"]
name_override = “witap.payload.data.acceleration”
from_beginning = false
pipe = false
watch_method = “inotify”
data_format = “grok”
grok_patterns = ["^%{NUMBER:timestamp:ts-epoch},%{NUMBER:z:float}$"]
[inputs.tail.tags]
applicationName = “telegraf-test-csv”
device_type = “mote”
sensor_type = “acceleration”
country = “ca”
city = “edmonton”
organization = “scanimetrics”

[[processors.regex]]
namepass = “witap.payload.data.*”
order = 1

[[processors.regex.tags]]
key = “path”
pattern = “^/motescan/(?P<gateway>scannet[0-9]+)/csv/l(?P<uid>[0-9A-Fa-f]+)[0-9]+\.csv_channel(?P<channel>[A-Za-z]?[A-Za-z0-9]+)\.csv$”
replacement = “${gateway}”
result_key = “gateway”

[[processors.regex.tags]]
key = “path”
pattern = “^/motescan/(?P<gateway>scannet[0-9]+)/csv/l(?P<uid>[0-9A-Fa-f]+)[0-9]+\.csv_channel(?P<channel>[A-Za-z]?[A-Za-z0-9]+)\.csv$”
replacement = “${uid}”
result_key = “uid”

[[processors.regex.tags]]
key = “path”
pattern = “^/motescan/(?P<gateway>scannet[0-9]+)/csv/l(?P<uid>[0-9A-Fa-f]+)[0-9]+\.csv_channel(?P<channel>[A-Za-z]?[A-Za-z0-9]+)\.csv$”
replacement = “${channel}”
result_key = “channel”

[[processors.enum]]
namepass = “witap.payload.data.*”
order = 2
[[processors.enum.mapping]]
field = “channel”
[processors.enum.mapping.value_mappings]
batt = “battery”
temp = “temperature”
x = “acceleration_x”
y = “acceleration_y”
z = “acceleration_z”
a = “strain_a”
b = “strain_b”
c = “strain_c”
d = “strain_d”

[[outputs.influxdb]]
namepass = “witap.payload.data.*”
urls = [“http://x.x.x.x:8086”] # required
database = “P100” # required
retention_policy = “”
write_consistency = “any”
timeout = “5s”
[outputs.influxdb.tagpass]
gateway = [ “scannet03” ]


#7

@daniel I noticed today that the enum doesn’t appear to work the way I have used it above. Can enum be used to map a string value?


#8

I’ll take a look into the memory use, we may still be referencing something that is no longer needed.

On the http_listener_v2 plugin you can increase the max_body_size option, though this plugin does read the entire body into memory before parsing the data.

The regex processor does not allow the creation of multiple tags/fields, but its a good idea. The example configuration wouldn’t be valid TOML, but I’m sure we could come up with something. Do you think you could open a feature request for this?

You can use the enum processor to remap to string data types, but it only operates fields and it looks like channel is a tag. This would be another good feature request, the only way I can think of to work around this currently would be to use the converter processor to convert it to a field, run the enum processor, then use converter again to move it back to a tag.


#9

Ok, I created a feature request for the regex and enum.

I’ll try your work around for the enum … that sounds like it will work.

Maybe logrotate takes care of the tail issue for me. If I rotate the csv files then they eventually get deleted and I do not have to keep them around.


#10

One issue I can think of though. I noticed that you need to restart telegraf when you update the configuration to load the new configuration. It is likely in my situation this will happen whenever we add a new device. I anticipate putting another file into the conf.d directory and restarting telegraf.

If telegraf is processing a file with the tail plugin at the time the configuration is reloaded will processing continue from where it last was or is there a chance there will be data loss during the restart/reload? That is, if telegraf is, say, half way through the new data at the end of a file and telegraf restarts, does it pick up at the same point in the file or at the end of the file?


#11

It picks up at the end, or if from_beginning = true from the very beginning, but isn’t able to continue at the same file position. If you are sending to InfluxDB reprocessing the data should overwrite with not changes, but it would become a concern if there are many files in the directory that are reprocessed.

I have heard ideas for an input that moves files to another directory after processing, it seems like something like this would be nice here. It seems that this type of processing is quite common especially in industrial use-cases.

You might want to also use the new grok_unique_timestamp = "auto" option in 1.10-rc1 or later to avoid any timestamp adjustments, or switch to the csv parser (also would need 1.10-rc1 for the unix timestamp parsing).


#12

I like the idea of moving the file after processing it.

I’ll look up grok_unique_timestamp and see what it does.

Thank you for your feedback and assistance.


#13

I checked out the memory usage and I didn’t see any problems (tested with the latest development code) after parsing a 1GB file with tail+grok. For keeping an eye on memory, and for other troubleshooting, I usually enable the internal plugin with memstats:

[[inputs.internal]]
  collect_memstats = true

The main metrics I watch for memory are internal_memstats: heap_sys_bytes and heap_alloc_bytes. With top make sure to look at RES for a reasonable figure on memory, VIRT isn’t usually very meaningful.


#14

Ok. I started collecting stats earlier today with using the telegraf mem input plugin. I will also enable the internal one you mentioned and let you know what I find out.

Did your test include a regex processor using subgroups as well?


#15

No, I skipped these, do you think these could be the cause?


#16

I am not sure. I will do some more testing and get back to you.


#17

@daniel

I thought I would give you an update. I’m still trying to figure out what is going on. It is not clear to me yet from the observations. I threw another large batch of data at telegraf today and observed the same behaviour I mentioned before. Free memory dropped dramatically and did not come back. I’ve attached a screen shot.

Here you can see at 10:42 I started sending the data to telegraf. There was an immediate drop in free memory from about 1.5GB to about 670 MB. An hour later, I restarted logstash at 11:42 which reclaimed the memory logstash was consuming. I restarted telegraph at 11:44 but there was no change in the free memory. I expected after the telegraf restart total free memory would be about 2.5G. I’m not sure where it is allocated but what caused its allocation in the first place appears to be telegraf. Perhaps this is an OS related issue? The “dump” of data to telegraf happens when I concatenate a large file onto an existing file that telegraf is watching with the tail plugin. Maybe the OS allocates a bunch of memory when it does the concatenation?

I am not certain this is a telegraf problem.


#18

Maybe it is what’s described here: https://www.linuxatemyram.com/. Take a look at the buffered and cached memory during this period.


#19

@daniel Thank you for the link and apologies for the time to get back to you on this.

I think the issue is the OS is caching files. So, when you throw a large file at telegraf, the OS caches it and eats up a bunch of ram. It is freed again when you need it. To get a better picture I needed to look at free memory and cached memory together (which is available memory).

Thanks again for the assistance.