Poor write performance to influxd - events getting dropped

#1

I am trying to aggregate BIG-IP request logs for a couple of virtual servers into influxd and graph them with Grafana (graphing the data from the retention policies that I write to via continuous queries in influxd). The pipeline looks like this: F5 -> rsyslog (UDP; writes user.info facility entries to files on a tmpfs, log-rotated every minute, so that disk is not a bottleneck) -> telegraf logparser using the config below -> influxd default retention policy (-> continuous queries -> retention policies -> Grafana).

My problem is that write performance in influxd is abysmal: it looks like a lot of events get dropped (see the Chronograf graph). I tried switching the output to a file (the commented-out outputs.file sections below) and compared the number of input file entries per minute with the number of logparser-parsed output entries per minute. My conclusion is that telegraf is not the bottleneck (at least not the logparser and processors.regex).

The load is about 1000 requests/second. Entries are not necessarily in order, but most are (some requests may take longer).

[global_tags]
[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 1000
  metric_buffer_limit = 10000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = false
  quiet = false
  logfile = "/var/log/telegraf/telegraf_error.log"
  hostname = ""
  omit_hostname = false
[[outputs.influxdb]]
  namedrop = ["rqbipsysl", "internal_*"]
  urls = ["http://127.0.0.1:8086"] # required
  database = "telegraf" # required
  retention_policy = ""
  write_consistency = "any"
  timeout = "5s"
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  collect_cpu_time = false
  report_active = false
[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs"]
[[inputs.diskio]]
[[inputs.kernel]]
[[inputs.mem]]
[[inputs.processes]]
[[inputs.swap]]
[[inputs.system]]
[[inputs.logparser]]
  files = ["/srv/accesslog_tmpfs/bigip_access_log"]
  from_beginning = false
  name_override = "rqbipsysl"
  [inputs.logparser.grok]
    patterns = ["%{BIGIPSYSLOG}"]
    custom_patterns = '''
BIGIPSYSLOG %{IP:bigipip:tag} \[%{HTTPDATE:ts:ts-httpd}\] %{IP:clientip} %{IP:virtual_ip:tag} %{DATA:virtual_name:tag} %{DATA:virtual_pool_name:tag} %{DATA:server:tag} %{NUMBER:server_port} "(?:%{WORD:verb:tag} %{NOTSPACE:request:tag}(?: HTTP/%{NUMBER:http_version:float})?|%{DATA})" %{NUMBER:resp_code:tag} %{NUMBER:bytes:int} %{NUMBER:response_ms:int} %{QS:referrer} %{QS:agent}
'''
[[processors.regex]]
  namepass = ["rqbipsysl"]
  [[processors.regex.tags]]
    key = "resp_code"
    pattern = "^(\\d)\\d\\d$"
    replacement = "${1}xx"
  [[processors.regex.tags]]
    key = "request"
    pattern = "/?([[:alnum:]%_=.:~+,-]+)/?([[:alnum:]%_=.:~+,-]+)?/?([[:alnum:]%_=.:~+,-]+)?.*"
    replacement = "${1}"
    result_key = "context"
  [[processors.regex.tags]]
    key = "request"
    pattern = "/?([[:alnum:]%_=.:~+,-]+)/?([[:alnum:]%_=.:~+,-]+)?/?([[:alnum:]%_=.:~+,-]+)?.*"
    replacement = "${2}"
    result_key = "subcontext"
  [[processors.regex.tags]]
    key = "request"
    pattern = "/?([[:alnum:]%_=.:~+,-]+)/?([[:alnum:]%_=.:~+,-]+)?/?([[:alnum:]%_=.:~+,-]+)?.*"
    replacement = "${3}"
    result_key = "subsubcontext"
[[outputs.influxdb]]
  urls = ["http://127.0.0.1:8086"] # required
  database = "bigipsyslog" # required
  write_consistency = "any"
  timeout = "5s"
  namepass = ["rqbipsysl"]
  tagexclude = ["request"]
  retention_policy = "asis"
  [outputs.influxdb.tagdrop]
    virtual_name = [ "/Common/vsname_ssl_vs" ]
#[[outputs.file]]
#  files = ["/srv/accesslog_tmpfs/bubu7.txt"]
#  namepass = ["rqbipsysl"]
#  tagexclude = ["request"]
#[[inputs.internal]]
#  collect_memstats = true
#[[outputs.file]]
#  files = ["/var/log/telegraf/telegraf_internal.log"]
#  namepass = ["internal_*"]
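To make the four processors.regex rules above easier to follow, here is a rough Python sketch of what they appear intended to do to the resp_code and request tags. It is only an illustration, not Telegraf code: `derive_tags` is a made-up helper, the sample request paths are invented, `[[:alnum:]]` is spelled out as `A-Za-z0-9` because Python's `re` module has no POSIX classes, and dropping empty captures is an assumption.

```python
import re

# Python's `re` has no POSIX classes, so [[:alnum:]] is written as A-Za-z0-9.
SEGMENT = r"[A-Za-z0-9%_=.:~+,-]+"
REQUEST_RE = re.compile(rf"/?({SEGMENT})/?({SEGMENT})?/?({SEGMENT})?.*")
RESP_CODE_RE = re.compile(r"^(\d)\d\d$")

# result_key -> capture group, as in the three rules on the "request" tag.
RESULT_KEYS = {"context": 1, "subcontext": 2, "subsubcontext": 3}


def derive_tags(request, resp_code):
    """Return the tags the regex rules would derive for one log entry."""
    # "404" -> "4xx"; values that do not match are left unchanged.
    tags = {"resp_code": RESP_CODE_RE.sub(r"\1xx", resp_code)}
    if REQUEST_RE.search(request):
        for result_key, group in RESULT_KEYS.items():
            # Replace the matched text with the chosen capture group.
            value = REQUEST_RE.sub(lambda m: m.group(group) or "", request)
            if value:  # assumption: empty captures are simply omitted here
                tags[result_key] = value
    return tags


print(derive_tags("/shop/cart/items?id=7", "404"))
print(derive_tags("/shop", "200"))
```

Each derived value becomes a tag, so every distinct first, second and third path segment adds to the series cardinality in influxd.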

#2

It is logparser related. If I decrease the number of tags/fields that logparser produces, influx starts to come under load.

BIGIPSYSLOG %{IP} \[%{HTTPDATE}\] %{IP} %{IP:clientip} %{DATA} %{DATA} %{DATA} %{NUMBER} "(?:%{WORD} %{NOTSPACE}(?: HTTP/%{NUMBER})?|%{DATA})" %{NUMBER} %{NUMBER} %{NUMBER} %{QS} %{QS}

produces this:

#3

After switching to filebeat -> logstash -> influxdb, the write rate is substantially better (orders of magnitude faster, with an orders-of-magnitude smaller CPU footprint than telegraf). Still, the influx output plugin for logstash is not really supported and quite a few errors occur (only around 2/3 of the metrics reach influx). So basically, there is no way to parse large amounts of logs using the TICK platform :(

Kind of a bummer. That should have been one of its star features.

#4

I ended up with the logstash config below, which seems to work excellently. I had to improve the timestamp precision so that log entries don't overwrite each other in influxdb. It should work for up to 1 million requests/sec.

It's really sad that I couldn't do this in telegraf because of performance (besides the lack of functionality).
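The overwrite problem mentioned above comes from how influxdb identifies a point: measurement, tag set and timestamp together. Two writes that agree on all three end up as one point. The sketch below is a toy model of that rule in Python, not influxdb code. It also shows the arithmetic used in the ruby filter of the config that follows (epoch seconds times 1,000,000 plus the filebeat offset modulo 1,000,000). `write_points`, `micro_timestamp`, the epoch second and the three log entries are all made up for illustration.

```python
def write_points(points):
    """Toy store: one point per (measurement, tag set, timestamp)."""
    store = {}
    for measurement, tags, timestamp, fields in points:
        key = (measurement, tuple(sorted(tags.items())), timestamp)
        # Same measurement + tag set + timestamp: field sets are merged and
        # newer values replace older ones, so the earlier entry is lost.
        store.setdefault(key, {}).update(fields)
    return store


def micro_timestamp(epoch_seconds, offset):
    """Same arithmetic as the ruby filter: seconds * 1e6 + (offset mod 1e6)."""
    return epoch_seconds * 1_000_000 + offset % 1_000_000


TAGS = {"verb": "GET", "resp_code": "2xx"}
SECOND = 1546300800  # hypothetical epoch second shared by all three entries
# (filebeat byte offset, response_ms) for three invented log lines
ENTRIES = [(0, 12), (212, 48), (431, 7)]

second_precision = write_points(
    ("rqbipsysl", TAGS, SECOND, {"response_ms": ms}) for _, ms in ENTRIES
)
micro_precision = write_points(
    ("rqbipsysl", TAGS, micro_timestamp(SECOND, off), {"response_ms": ms})
    for off, ms in ENTRIES
)

print(len(second_precision), len(micro_precision))
```

With second precision the three entries collapse into one point; with the offset-derived microseconds all three survive. The offset is a byte position in the log file, so two entries in the same second whose offsets differ by an exact multiple of 1,000,000 would still collide.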

input {

beats {
    port => "5044"
}

}

filter {

grok {
    match => { "message" => "%{IP:bigipip} \[%{HTTPDATE:timestamp}\] %{IP:clientip} %{IP:virtual_ip} %{DATA:virtual_name} %{DATA:virtual_pool_name} %{DATA:server} %{NUMBER:server_port} \"(?:%{WORD:verb} %{NOTSPACE:request_uri}(?: HTTP/%{NUMBER:http_version})?|%{DATA})\" %{NUMBER:resp_code} %{NUMBER:bytes} %{NUMBER:response_ms} %{QS} %{QS}"}
    match => { "request_uri" => "/?(?<context>([[:alnum:]%_=.:~+,-]+)?)/?(?<subcontext>([[:alnum:]%_=.:~+,-]+)?)/?" }
    break_on_match => false
}

date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}

# improve timestamp precision with filebeat offset so that entries don't overwrite each other in influxdb

ruby {
    code => 'event.set("offset", event.get("offset")%1000000);
             event.set("time", event.get("@timestamp").to_i);
             event.set("time", event.get("time")*1000000 + event.get("offset"));
             event.set("time", event.get("time").to_s);
            '
}

if "_grokparsefailure" in [tags] { mutate { add_field => { "grokstatus" => "nok" } }
} else { mutate { add_field => { "grokstatus" => "ok" } } }

# filter the values for context and subcontext because of influxdb limit of unique values per tag

if [context] and [context] not in
    [
    "ztest"
    ]
    {
      mutate {
        replace => { "context" => "zrestnotinabovelistz" }
      }
    }

if [subcontext] and [subcontext] not in
    [
    "ztest"
    ]
    {
      mutate {
        replace => { "subcontext" => "zrestnotinabovelistz" }
      }
    }

mutate {
    gsub => [ "resp_code", "^(\d)\d\d$", '\1xx' ]
    remove_field => [ "prospector", "tags", "beat", "host", "input", "request_uri", "source", "@version", "message", "timestamp", "offset" ]
    convert => { "response_ms" => "integer" }
    convert => { "bytes" => "integer" }
}

}

output {

# stdout { }
# file { path => "/srv/accesslog_tmpfs/logstash_dump" }

influxdb {
    allow_time_override => "true"
    db => "bigipsyslog"
    host => "127.0.0.1"
    measurement => "rqbipsysl"

    # codec => "json"

    use_event_fields_for_data_points => true
    exclude_fields => [ "@timestamp" ]
    send_as_tags => ["bigipip", "virtual_name", "virtual_pool_name", "server", "verb", "context", "subcontext", "subsubcontext", "resp_code" ]
    retention_policy => "asis"
    time_precision => "u"
    flush_size => 500
    idle_flush_time => 10
}

}
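The context/subcontext filtering in the config above keeps tag cardinality bounded: values on a whitelist pass through and every other non-empty value is replaced with one catch-all string. A minimal Python sketch of the same idea follows. `limit_cardinality` and the sample values are hypothetical, and the whitelist contains only the "ztest" placeholder from the config.

```python
CATCH_ALL = "zrestnotinabovelistz"


def limit_cardinality(value, allowed, catch_all=CATCH_ALL):
    """Keep whitelisted values, collapse every other non-empty value."""
    if value and value not in allowed:
        return catch_all
    return value


allowed_contexts = {"ztest"}  # placeholder list, as in the config
seen = ["ztest", "session-8f3a", "session-91bc", "img", ""]

# Distinct tag values that would reach influxdb after filtering.
tag_values = {limit_cardinality(v, allowed_contexts) for v in seen if v}
print(sorted(tag_values))
```

However many distinct path segments show up in the logs, each filtered tag can take at most the whitelist size plus one values. This matters because influxdb 1.x enforces a per-tag limit through its max-values-per-tag setting.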