To whom it may concern,
I am trying to monitor the network usage of a specific process with Telegraf: specifically, how many bytes that process sends and receives over the network. I am using the Procstat input plugin, and the socket metrics are saved to the InfluxDB table procstat_socket.
This works fine for regular network traffic, but I am unable to capture multicast communication.
Multicast traffic does not appear at all in the saved data. I am not sure whether this is due to an incorrect configuration in my Telegraf setup, a bug in Telegraf itself, or simply because multicast monitoring is not supported.
I would appreciate any guidance or suggestions on this issue. I am attaching my Telegraf configuration, the Python scripts I use for local testing, and the SQL query I use to view the results from procstat_socket.
Versions
Telegraf: 1.36.2
InfluxDB: 3.8.0
Telegraf configuration
# Configuration for telegraf agent
[agent]
interval = "1s" ## default data collection interval for all inputs
round_interval = true ## if interval="10s" then always collect on :00, :10, :20, etc.
metric_batch_size = 1000 ## this controls the size of writes that Telegraf sends to output plugins
metric_buffer_limit = 10000 ## for failed writes, telegraf will cache metric_buffer_limit metrics for each output, and will flush this buffer on a successful write
collection_jitter = "0s" ## collection jitter is used to jitter the collection by a random amount. Each plugin will sleep for a random time within jitter before collecting
flush_interval = "10s" ## default flushing interval for all outputs. Maximum flush_interval will be flush_interval + flush_jitter
flush_jitter = "0s" ## jitter the flush interval by a random amount. This is primarily to avoid large write spikes for users running a large number of telegraf instances.
precision = "1ns"
## Logging configuration:
debug = true ## Run telegraf with debug log messages.
quiet = false ## Run telegraf in quiet mode (error log messages only).
logfile = "" ## Specify the log file name. The empty string means to log to stderr.
hostname = "" ## Override default hostname, if empty use os.Hostname()
omit_hostname = false ## If set to true, do not set the "host" tag in the telegraf agent.
skip_processors_after_aggregators = true
## this part is input for collecting data from my Application
[[inputs.socket_listener]]
service_address = "udp://:8081"
data_format = "influx"
## this part is for collecting metrics from the OS
[[inputs.cpu]]
percpu = true ## whether to report per-cpu stats or not
totalcpu = true ## whether to report total system cpu stats or not
collect_cpu_time = false ## if true, collect raw CPU time metrics
report_active = false ## if true, compute and report the sum of all non-idle CPU states.
[[inputs.disk]]
## By default stats will be gathered for all mount points.
## Setting mount_points restricts the stats to only the specified mount points.
mount_points = ["/", "/var", "/home", "/home_local", "/boot", "/tmp"]
## Ignore mount points by filesystem type.
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.ethtool]]
normalize_keys = ["snakecase", "trim", "lower", "underscore"]
interface_exclude = ["docker0", "br-*"]
[[inputs.kernel]]
[[inputs.kernel_vmstat]]
[[inputs.mem]]
[[inputs.net]]
ignore_protocol_stats=true
[[inputs.processes]]
[[inputs.procstat]]
## Pattern as argument for pgrep (ie, pgrep -f <pattern>)
pattern = "data_sender"
## Properties to collect. Available options are cpu, limits, memory, mmap, sockets
properties = ["cpu", "memory", "sockets"]
## Protocol filter for the sockets property. Available options are all, tcp4, tcp6, udp4, udp6, unix.
socket_protocols = ["tcp4", "udp4", "unix"]
## Mode to use when calculating CPU usage. Can be one of 'solaris' or 'irix'.
mode = "solaris"
[[inputs.swap]]
[[inputs.system]]
[[inputs.temp]]
# output for InfluxDB 3.x (over v2 API)
[[outputs.influxdb_v2]]
urls = ["http://localhost:8181"]
token = "-"
bucket = "example"
insecure_skip_verify = true
Python data sender
import requests
import time
import socket
import json

ENDPOINT = "https://petstore.swagger.io/v2/pet"
HEADERS = {"Content-Type": "application/json"}
DATA_500_KB = {
    "id": 0,
    "category": {"i": 0, "n": "s"},
    "name": "d",
    "photoUrls": ["A" * 499_882],
    "tags": [{"i": 0, "n": "s"}],
    "status": "a"
}
FILE_PATH = "test_5mb.bin"  # not used in this script
MCAST_GRP = '239.255.255.250'
MCAST_PORT = 3000


def send_multicast():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    # TTL 1 keeps the datagram on the local network segment
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
    # Do not loop the datagram back to sockets on the sending host
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)
    try:
        sock.sendto(b"x" * (50 * 1000), (MCAST_GRP, MCAST_PORT))
        print("Uploaded 50 kB to multicast")
    except Exception as ex:
        print(f"Multicast error: {ex}")
    finally:
        sock.close()


if __name__ == "__main__":
    while True:
        try:
            # 1) send 500 kB to unicast
            json_encoded = json.dumps(DATA_500_KB)
            res = requests.post(ENDPOINT, data=json_encoded, headers=HEADERS, timeout=4)
            print(f"Uploaded 500 kB to Unicast ({res.status_code})")
            # 2) send 50 kB to multicast
            send_multicast()
            # 3) download 627 kB
            res = requests.get("https://cdn.eso.org/images/screen/eso1914a.jpg")
            print(f"Downloaded 627 kB ({res.status_code})")
        except Exception as e:
            print(f"Error: {e}")
        time.sleep(2)
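When comparing against the procstat_socket counters, it helps to know how many payload bytes one loop iteration is expected to send. The following is a minimal sketch, assuming the same DATA_500_KB dictionary and the 50 kB multicast payload used in the sender script; the helper name expected_payload_sizes is hypothetical, and the figures exclude HTTP, TLS, UDP, and IP overhead:

```python
import json

# Same request body as in the sender script.
DATA_500_KB = {
    "id": 0,
    "category": {"i": 0, "n": "s"},
    "name": "d",
    "photoUrls": ["A" * 499_882],
    "tags": [{"i": 0, "n": "s"}],
    "status": "a"
}


def expected_payload_sizes():
    """Return application-level payload sizes in bytes for one loop iteration."""
    unicast_body = json.dumps(DATA_500_KB).encode("utf-8")
    multicast_datagram = b"x" * (50 * 1000)
    return {
        "unicast_post_body": len(unicast_body),
        "multicast_datagram": len(multicast_datagram),
    }


if __name__ == "__main__":
    for name, size in expected_payload_sizes().items():
        print(f"{name}: {size} bytes")
```

The byte counts actually seen on the wire will be somewhat larger because of protocol headers, so these numbers are only a lower bound for what the counters should show.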
Python multicast receiver
import socket
import struct
MCAST_GRP = '239.255.255.250'
MCAST_PORT = 3000
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))
# "=4sl" packs exactly 8 bytes (struct ip_mreq); native "4sl" pads to 16 bytes on 64-bit Linux
mreq = struct.pack("=4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
print("Listening for multicast...")
while True:
    data, addr = sock.recvfrom(65536)
    print(f"Received {len(data)} bytes from {addr}")
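To check independently of Telegraf whether the datagrams leave the host at all, the kernel's host-wide UDP counters can be sampled before and after a send. The following is a minimal sketch, assuming Linux and the usual header-line/value-line layout of /proc/net/snmp; the helper name parse_udp_counters is hypothetical. These counters cover the whole host and count datagrams rather than bytes, so they can only confirm that something was sent:

```python
def parse_udp_counters(snmp_text):
    """Parse the 'Udp:' header/value line pair from /proc/net/snmp text."""
    # "UdpLite:" lines do not start with "Udp:" and are therefore skipped.
    udp_lines = [line.split() for line in snmp_text.splitlines()
                 if line.startswith("Udp:")]
    if len(udp_lines) < 2:
        raise ValueError("no Udp: section found")
    names, values = udp_lines[0][1:], udp_lines[1][1:]
    return {name: int(value) for name, value in zip(names, values)}


if __name__ == "__main__":
    with open("/proc/net/snmp") as f:
        counters = parse_udp_counters(f.read())
    print("OutDatagrams:", counters.get("OutDatagrams"))
    print("InDatagrams:", counters.get("InDatagrams"))
```

Running this on the sending host before and after one loop iteration should show OutDatagrams growing, although other processes sending UDP on the same host will also move the counter.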
SQL testing query
WITH counter_diffs AS (
    SELECT
        date_bin(INTERVAL '2s', time) AS time_bucket,
        bytes_sent,
        bytes_received,
        pid,
        GREATEST(bytes_sent - LAG(bytes_sent) OVER (PARTITION BY pid ORDER BY time), 0) AS bytes_sent_increase,
        GREATEST(bytes_received - LAG(bytes_received) OVER (PARTITION BY pid ORDER BY time), 0) AS bytes_received_increase
    FROM
        procstat_socket
    WHERE
        time >= now() - INTERVAL '5 minutes'
)
SELECT
    time_bucket,
    SUM(bytes_sent_increase) AS bytes_sent_rate,
    SUM(bytes_received_increase) AS bytes_received_rate
FROM
    counter_diffs
WHERE
    pid IN (SELECT DISTINCT pid FROM procstat WHERE cmdline ILIKE '%$component%' AND time >= now() - INTERVAL '5 minutes')
    AND (bytes_sent_increase > 0 OR bytes_received_increase > 0)
GROUP BY time_bucket, pid
ORDER BY pid, time_bucket
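The per-sample delta step in counter_diffs (previous value per pid via LAG, clamped at zero via GREATEST) can be checked against hand-made numbers outside the database. The following is a minimal sketch with made-up sample rows; the function name counter_increases and the sample values are hypothetical, and it covers only the delta step, not the 2-second bucketing or the pid filter:

```python
def counter_increases(rows):
    """Compute clamped per-pid counter increases.

    rows: iterable of (time, pid, bytes_sent) tuples in any order.
    Returns a list of (time, pid, increase) ordered by pid, then time.
    """
    last_seen = {}
    result = []
    for time, pid, bytes_sent in sorted(rows, key=lambda r: (r[1], r[0])):
        previous = last_seen.get(pid)
        # The first sample per pid has no predecessor; it is treated as 0 here,
        # which the query's outer "> 0" filter would discard anyway.
        increase = 0 if previous is None else max(bytes_sent - previous, 0)
        result.append((time, pid, increase))
        last_seen[pid] = bytes_sent
    return result


if __name__ == "__main__":
    sample_rows = [
        (0, 100, 1000), (1, 100, 1500), (2, 100, 200), (3, 100, 700),
        (0, 200, 50), (1, 200, 50),
    ]
    for row in counter_increases(sample_rows):
        print(row)
```

In the sample, the drop from 1500 to 200 for pid 100 stands for a counter reset and is clamped to zero rather than producing a negative increase.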
