Logstash to InfluxDB 2: aggregated datapoints issue

Hi,
I have many log lines with exactly the same timestamp. When I use Logstash to parse them and send them to InfluxDB 2, Influx or Logstash aggregates some of the lines!
For example, in the sample below the lines with I[847676] get aggregated:

2023-11-20 14:05:49:787 INFO T[SHR1I7783] APP R[GW] I[675940] U[461301]
2023-11-20 14:05:49:787 INFO T[SHR5I7787] APP R[GW] I[847676] U[432408]
2023-11-20 14:05:49:787 INFO T[SHR5I7787] APP R[GW] I[847676] U[935351]
2023-11-20 14:05:49:787 INFO T[SHR5I7787] APP R[GW] I[847676] U[533755]
2023-11-20 14:05:49:787 INFO T[SHR0I7782] APP R[GW] I[50888] U[600770]
2023-11-20 14:05:49:787 INFO T[SHR5I7787] APP S[GW] I[847676] U[432409]

1- I tried to set I as a field, but the points are still aggregated.
2- I tried to set I as a tag; that works, but it causes a performance issue due to high cardinality.
3- I tried to use uid for each event, but the points are still aggregated.
4- U is a mostly unique field; I tried to use it, but the datapoints are still aggregated.
5- If I were to use Telegraf instead of Logstash, what would be the correct configuration for the pattern I mentioned?

FYI 1: I don't want to use a metric because it is important to group by T, I, R, S.
FYI 2: R means received, S means sent.

Any idea?
Thanks

Can you share the config you have at present?

@FixTestRepeat sure, here it is:

input {
  file {
    path => "/home/app/logs/2023.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    exclude => ["*.gz", "*.bz2", "*.slice"]
    codec => plain { charset => "UTF-8" }
  }
}

filter {

mutate
{
    replace => { "host" => "${HOSTNAME}"}

    replace => { "IP_INFLUX" => "192.168.1.1"}
    replace => { "BUCKET_INFLUX" => "mybucket"}
    replace => { "TOKEN_INFLUX" =>  "mytoken"}
    replace => { "ORG_INFLUX" => "myorg"}
}

mutate
{
    replace => { "URL" => "http://%{[IP_INFLUX]}:8087/api/v2/write?bucket=%{[BUCKET_INFLUX]}&precision=s&org=%{[ORG_INFLUX]}"}
}

grok {
  match => { "message" => "%{NOTSPACE} %{NOTSPACE} %{NOTSPACE} %{WORD:module} %{DATA:direction} %{GREEDYDATA}" }
}

if [direction] == "R[GW]" {

grok {
  match => { "message" => "^%{TIMESTAMP_ISO8601:timestamp} INFO T\[%{DATA:Thread}\] APP R\[%{DATA:R}\] I\[%{NONNEGINT:I}\] U\[%{NONNEGINT:uid}\]" }
}

  mutate {
    add_tag => ["GW_In"]

  }
}
if [direction] == "S[GW]" {

grok {
  match => { "message" => "^%{TIMESTAMP_ISO8601:timestamp} INFO T\[%{DATA:Thread}\] APP S\[%{DATA:S}\] I\[%{NONNEGINT:I}\] U\[%{NONNEGINT:uid}\]" }
}

  mutate {
    add_tag => ["GW_Out"]
  }
}

}

output
{
if "GW_In" in [tags] {

http {
url => "%{[URL]}"
http_method => "post"
format => message
message => 'APP_In,Thread=%{[Thread]},host=%{[host]},R=%{[R]},I=%{[I]} uid="%{[uid]}"'

headers => [
  'Authorization', 'Token %{[TOKEN_INFLUX]}'
]

}

}

if "GW_Out" in [tags]

{

http {
url => "%{[URL]}"
http_method => "post"
format => message
message => 'APP_Out,Thread=%{[Thread]},host=%{[host]},S=%{[S]},I=%{[I]} uid="%{[uid]}"'

headers => [
  'Authorization', 'Token %{[TOKEN_INFLUX]}'
]

}

}
}
Hmm, if you are open to using Telegraf and bypassing Logstash completely, that might be the best way to go. (I can't see any Logstash-to-InfluxDB plugins that appear to be maintained currently.)
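As a side note, the "aggregation" you're seeing is most likely InfluxDB overwriting points: Influx keeps only one point per measurement + tag set + timestamp. Your output writes with precision=s and the line protocol message carries no timestamp at all, so every line posted in the same second with the same Thread/host/R/I tags collapses into a single point. A rough sketch of one possible fix in your existing config (the epoch_ns field name is mine, and this uses @timestamp, i.e. Logstash ingest time, unless you parse the log time with a date filter first):

```
filter {
  ruby {
    # compute an epoch-nanosecond value from the event time
    code => "event.set('epoch_ns', (event.get('@timestamp').to_f * 1_000_000_000).to_i)"
  }
}

output {
  http {
    # write with nanosecond precision and append the timestamp to the line protocol
    url => "http://%{[IP_INFLUX]}:8087/api/v2/write?bucket=%{[BUCKET_INFLUX]}&precision=ns&org=%{[ORG_INFLUX]}"
    http_method => "post"
    format => message
    message => 'APP_In,Thread=%{[Thread]},host=%{[host]},R=%{[R]},I=%{[I]} uid="%{[uid]}" %{[epoch_ns]}'
  }
}
```

Note that lines sharing the same millisecond timestamp *and* the same tag set would still collide, so you'd also need something unique (e.g. uid) in the tag set, or an artificial sub-millisecond offset.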

It looks like the Telegraf tail plugin + grok parser is the current best method in the Influx ecosystem. Take a look here.
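A minimal sketch of what that Telegraf config might look like for the pattern in the original post — the tag/field split and the timestamp layout are my assumptions, so adjust to taste (your timestamps use a colon before the milliseconds, hence the custom Go layout):

```
[[inputs.tail]]
  files = ["/home/app/logs/2023.log"]
  from_beginning = true
  data_format = "grok"
  grok_patterns = ['%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05:000"} INFO T\[%{DATA:thread:tag}\] APP %{WORD:direction:tag}\[GW\] I\[%{NONNEGINT:i}\] U\[%{NONNEGINT:uid}\]']

[[outputs.influxdb_v2]]
  urls = ["http://192.168.1.1:8087"]
  token = "mytoken"
  organization = "myorg"
  bucket = "mybucket"
```

The :tag suffix in Telegraf's grok parser marks a capture as an InfluxDB tag rather than a field. Since your timestamps repeat, the same overwrite caveat applies here: points still need either a distinct timestamp or a unique tag set.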


Thank you @FixTestRepeat