Multicast communication is not being stored in procstat_socket

To whom it may concern,

I am trying to monitor the network usage of a certain process using Telegraf — specifically, how many bytes a particular process sends and receives over the network. I am using Procstat input plugin (saving to InfluxDB table procstat_socket).

This works fine for regular network traffic, but I am unable to capture multicast communication.
Multicast traffic does not appear at all in the saved data. I am not sure whether this is due to a incorrect configuration in my Telegraf setup, a bug in Telegraf itself, or simply because multicast monitoring is not supported.

I would appreciate any guidance or suggestions on this issue. I am attaching my Telegraf configuration, Python scripts that I use for local testing of this functionality and the SQL query which I use to see the results from procstat_sockets.

Versions

Telegraf: 1.36.2
InfluxDB: 3.8.0

Telegraf configuration

# Configuration for telegraf agent
[agent]
  interval = "1s"               ## default data collection interval for all inputs
  round_interval = true         ## if interval="10s" then always collect on :00, :10, :20, etc.
  metric_batch_size = 1000      ## this controls the size of writes that Telegraf sends to output plugins
  metric_buffer_limit = 10000   ## for failed writes, telegraf will cache metric_buffer_limit metrics for each output, and will flush this buffer on a successful write
  collection_jitter = "0s"      ## collection jitter is used to jitter the collection by a random amount. Each plugin will sleep for a random time within jitter before collecting
  flush_interval = "10s"        ## default flushing interval for all outputs. Maximum flush_interval will be flush_interval + flush_jitter
  flush_jitter = "0s"           ## jitter the flush interval by a random amount. This is primarily to avoid large write spikes for users running a large number of telegraf instances.
  precision = "1ns"

  ## Logging configuration:
  debug = true                  ## Run telegraf with debug log messages.
  quiet = false                 ## Run telegraf in quiet mode (error log messages only).
  logfile = ""                  ## Specify the log file name. The empty string means to log to stderr.

  hostname = ""                 ## Override default hostname, if empty use os.Hostname()
  omit_hostname = false         ## If set to true, do no set the "host" tag in the telegraf agent.
  skip_processors_after_aggregators = true

## this part is input for collecting data from my Application
[[inputs.socket_listener]]
service_address = "udp://:8081"
data_format = "influx"

## this part is for collecting metrics from the OS
[[inputs.cpu]]
  percpu = true            ## whether to report per-cpu stats or not
  totalcpu = true          ## whether to report total system cpu stats or not
  collect_cpu_time = false ## if true, collect raw CPU time metrics
  report_active = false    ## if true, compute and report the sum of all non-idle CPU states.
[[inputs.disk]]
  ## By default stats will be gathered for all mount points.
  ## Set mount_points will restrict the stats to only the specified mount points.
  mount_points = ["/", "/var", "/home", "/home_local", "/boot", "/tmp"]
  ## Ignore mount points by filesystem type.
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.ethtool]]
  normalize_keys = ["snakecase", "trim", "lower", "underscore"]
  interface_exclude = ["docker0", "br-*"]
[[inputs.kernel]]
[[inputs.kernel_vmstat]]
[[inputs.mem]]
[[inputs.net]]
  ignore_protocol_stats=true
[[inputs.processes]]
[[inputs.procstat]]
  ## Pattern as argument for pgrep (ie, pgrep -f <pattern>)
  pattern = "data_sender"

  ## Properties to collect. Available options are cpu, limits, memory, mmap, sockets
  properties = ["cpu", "memory", "sockets"]

  ## Protocol filter for the sockets property. Available options are all, tcp4, tcp6, udp4, udp6, unix.
  socket_protocols = ["tcp4", "udp4", "unix"]

  ## Mode to use when calculating CPU usage. Can be one of 'solaris' or 'irix'.
  mode = "solaris"
[[inputs.swap]]
[[inputs.system]]
[[inputs.temp]]

# output for InfluxDB 3.x (over v2 API)
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8181"]
  token = "-"
  bucket = "example"
  insecure_skip_verify = true

Python data sender

import requests
import time
import socket
import json

ENDPOINT = "https://petstore.swagger.io/v2/pet"
HEADERS = {"Content-Type": "application/json"}

DATA_500_KB = {
    "id": 0,
    "category": {"i": 0, "n": "s"},
    "name": "d",
    "photoUrls": ["A" * 499_882],
    "tags": [{"i": 0, "n": "s"}],
    "status": "a"
}
FILE_PATH = "test_5mb.bin"

MCAST_GRP = '239.255.255.250'
MCAST_PORT = 3000

def send_multicast():
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0)

    try:
        sock.sendto(b"x" * (50 * 1000), (MCAST_GRP, MCAST_PORT))
        print("Uploaded 50 kB to multicast")
    except Exception as ex:
        print(f"Multicast error: {ex}")
    finally:
        sock.close()

if __name__ == "__main__":
    while True:
        try:
            # 1) send 1 MB to unicast
            json_encoded = json.dumps(DATA_500_KB)
            res = requests.post(ENDPOINT, data=json_encoded, headers=HEADERS, timeout=4)
            print(f"Uploaded 500 kB to Unicast ({res.status_code})")

            # 2) send 42 kB to multicast
            send_multicast()

            # 3) download 627 kB
            res = requests.get("https://cdn.eso.org/images/screen/eso1914a.jpg")
            print(f"Downloaded 627 kB ({res.status_code})")
        except Exception as e:
            print(f"Error: {e}")

        time.sleep(2)

Python multicast receiver

import socket
import struct

MCAST_GRP = '239.255.255.250'
MCAST_PORT = 3000

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', MCAST_PORT))

mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

print("Listening for multicast...")
while True:
    data, addr = sock.recvfrom(65536)
    print(f"Received {len(data)} bytes from {addr}")

SQL testing query

WITH counter_diffs AS (
  SELECT
    date_bin(INTERVAL '2s', time) AS time_bucket,
    bytes_sent,
    bytes_received,
    pid,
    GREATEST(bytes_sent - LAG(bytes_sent) OVER (PARTITION BY pid ORDER BY time), 0) AS bytes_sent_increase,
    GREATEST(bytes_received - LAG(bytes_received) OVER (PARTITION BY pid ORDER BY time), 0) AS bytes_received_increase
  FROM
    procstat_socket
  WHERE
    time >= now() - INTERVAL '5 minutes'
)

SELECT
  time_bucket,
  SUM(bytes_sent_increase) AS bytes_sent_rate,
  SUM(bytes_received_increase) AS bytes_received_rate
FROM
  counter_diffs
WHERE
  pid IN (SELECT DISTINCT pid FROM procstat WHERE cmdline ILIKE '%$component%' AND time >= now() - INTERVAL '5 minutes')
  AND (bytes_sent_increase > 0 OR bytes_received_increase > 0)
GROUP BY time_bucket, pid
ORDER BY pid, time_bucket