Telegraf processors.execd

telegraf.conf file
[agent]
debug = true
quiet = false
interval = “10s”
round_interval = true
metric_batch_size = 10000
metric_buffer_limit = 100000
flush_interval = “10s”

[[inputs.snmp_trap]]
service_address = “udp://:1610”
path = [“/usr/smp/snmp/mibs”]

[[processors.execd]]
command = [“/usr/bin/python3”,“/var/telegraf/alarm_script.py”]
restart_delay = “10s”

[[outputs.file]]
files = [“stdout”,“/var/log/telegraf/metrics.json”]

This is my telegraf file and when I receive a trap from any controller it comes in
sample trap:
snmp_trap,community=public,host=slp.com,mib=ALPHA-NOTIFICATION-MIB,name=alphaAlarmClearState,oid=.1.3.6.1.4.1.7.10.2,source=10.413.18.10,version=2c sysUpTime.0=586294866i,alarmActiveModelPointer=“alarmModelEntry.20004.4”,alarmModelDescription.20004.4=“DC System 48V : Output Voltage High : Major”,alarmSeverity=0i,alarmCustomDescription=“—”,alarmAdditionalInformation=“::”,alarmActiveResourceId=“componentListStaticName.1”,alarmModelState.20004.4=3i,componentListStaticName.2.1=“REWCP12WEL-HP (DC System 48V/2259)”,componentListReference.2.1=1i,controllerInfoName=" REWCP12WEL" 1746542789735623327

This is my alarm_script.py file
import sys
import re
from datetime import datetime

def escape_influx_string(value):
“”“Escape special characters for Influx Line Protocol.”“”
if value is None:
return “” # Return an empty string if the value is None
if isinstance(value, str):
return value.replace(‘"’, ‘“”’)
return value

Read metrics from STDIN

for line in sys.stdin:
try:
# Strip whitespace and skip empty lines
line = line.strip()
if not line:
continue

    # Prepare the trap input
    trap_input = line

    # Extract and remove the last number (alarm_time_ns) from the trap input
    trap_parts = trap_input.rsplit(' ', 1)
    trap_body = trap_parts[0]
    alarm_time_ns = trap_parts[1]

    # Remove 'version=2c' from the trap body
    trap_body = trap_body.replace('version=2c ', '')  # Ensure only 'version=2c ' is removed

    # Parse the trap body into a dictionary
    trap = dict(pair.split('=', 1) for pair in trap_body.split(',') if '=' in pair)
    trap["time_ns"] = alarm_time_ns

    # Extract values from the trap
    alarm_state = trap.get("name", "unknown")
    alarm_source_ip = trap.get("source", "unknown")
    alarm_oid = trap.get("oid", "unknown")
    alarm_time_ns = trap.get("time_ns", "0")

    # Determine alarm state
    if alarm_state in ["alphaAlarmActiveState", "dcPwrSysAlarmActiveTrap"]:
        alarm_state = "Active"
    elif alarm_state in ["alphaAlarmClearState", "dcPwrSysAlarmClearedTrap"]:
        alarm_state = "InActive"
    elif alarm_state == "dcPwrSysRelayTrap":
        alarm_state = "RelayTrap"

    # Use regex to find alarm name field
    alarm_name = None
    for key in trap.keys():
        if re.match("alarmModelDescription", key):
            alarm_name = trap[key].strip("'")
            break
        elif re.match("dcPwrSysCurrAlrmStringValue", key):
            alarm_name = trap[key].strip("'")
            break
        elif re.match("dcPwrSysRelayStringValue", key):
            alarm_name = trap[key].strip("'")
            break
        elif re.match("dcPwrSysMiscAlrmStringValue", key):
            alarm_name = trap[key].strip("'")
            break
        elif re.match("dcPwrSysRectAlrmStringValue", key):
            alarm_name = trap[key].strip("'")
            break
        elif re.match("dcPwrSysBattAlrmStringValue", key):
            alarm_name = trap[key].strip("'")
            break

    # Use regex to find alarm source CLLI field
    alarm_source_clli = None
    for key in trap.keys():
        if re.match("controllerInfoName", key):
            alarm_source_clli = trap[key].strip("'")
            break
        elif re.match("dcPwrSysSiteName", key):
            alarm_source_clli = trap[key].strip("'")
            break

    # Create Influx Line Protocol output
    influx_line = (
        f"alarm,source_ip={alarm_source_ip},state={alarm_state} "
        f"name={alarm_name},oid={alarm_oid},source_clli={alarm_source_clli} {alarm_time_ns}"
    )

    # Print the Influx Line Protocol to STDOUT
    print(influx_line)
except Exception as e:
    # Print error message to STDERR
    print(f"Error processing trap: {e}", file=sys.stderr)

The Problem which I’m facing is that the traps can be printed and can also be saved to a file if the processor plugin is not used but when I use this processor plugin, traps does go into the script but it does not print anything or saves anything in the file
My end goal:
I have two types of controller this script will create one common payload regardless of the type of controller.
I also make sure that the output matches the Influx line protocol still
any help

@Prerak_Patel Welcome to the Influxdata Community!

I’ve made some adjustments to your Telegraf config and Python script to help get everything working smoothly. The changes include logging, timeouts, environment setup, and proper formatting for Influx Line Protocol.

Updated Telegraf Config

[agent]
  debug = true
  quiet = false
  interval = "10s"
  round_interval = true
  metric_batch_size = 10000
  metric_buffer_limit = 100000
  flush_interval = "10s"
  # Add this to help with debugging
  logtarget = "file"
  logfile = "/var/log/telegraf/telegraf.log"
  logfile_rotation_interval = "1d"
  logfile_rotation_max_size = "10MB"
  logfile_rotation_max_archives = 5

[[inputs.snmp_trap]]
  service_address = "udp://:1610"
  path = ["/usr/smp/snmp/mibs"]
  # Add timeout to ensure snmp operations don't hang
  timeout = "5s"

[[processors.execd]]
  command = ["/usr/bin/python3", "/var/telegraf/alarm_script.py"]
  restart_delay = "10s"
  # Add data format to ensure proper parsing
  data_format = "influx"
  # Ensure script has time to process data
  timeout = "10s"
  # Ensure signals are sent properly
  signal = "STDIN"
  # Ensure the environment is set up correctly
  environment = [
    "PYTHONUNBUFFERED=1"
  ]

[[outputs.file]]
  files = ["stdout", "/var/log/telegraf/metrics.json"]
  # Ensure data is written immediately
  flush_interval = "2s"
  # Set format to ensure proper formatting
  data_format = "influx"

Python Script

I’ve also added logging and some extra validation to the script to help troubleshoot any issues.

#!/usr/bin/env python3
import sys
import re
import json
import logging

# Set up logging to file for debugging
logging.basicConfig(
    filename='/var/log/telegraf/processor_debug.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def escape_influx_string(value):
    """Escape special characters for Influx Line Protocol."""
    if value is None:
        return ""  # Return an empty string if the value is None
    if isinstance(value, str):
        # Remove surrounding quotes if present
        value = value.strip('"\'')
        # Escape quotes for Influx Line Protocol
        value = f'"{value.replace("\"", "\\\"")}"'
        return value
    return value

def parse_line_protocol(line):
    """Parse Influx Line Protocol line into its components."""
    logging.debug(f"Parsing line: {line}")
    
    # Find the last space to separate the timestamp
    parts = line.rsplit(' ', 1)
    if len(parts) != 2:
        logging.error(f"Invalid line format (missing timestamp): {line}")
        return None
    
    main_part, timestamp = parts
    
    # Split main part into measurement+tags and fields
    measurement_tags, fields = main_part.split(' ', 1)
    
    return {
        "measurement_tags": measurement_tags,
        "fields": fields,
        "timestamp": timestamp
    }

def extract_key_value_pairs(text):
    """Extract key-value pairs from text with proper handling of quoted values."""
    logging.debug(f"Extracting key-value pairs from: {text}")
    
    pairs = {}
    current_pos = 0
    text_length = len(text)
    
    while current_pos < text_length:
        # Find the next key
        key_end = text.find('=', current_pos)
        if key_end == -1:
            break
        
        key = text[current_pos:key_end].strip()
        
        # Find the value
        value_start = key_end + 1
        if value_start >= text_length:
            break
            
        # Check if the value is quoted
        if text[value_start] == '"':
            # Find the matching closing quote
            quote_end = value_start + 1
            while quote_end < text_length:
                if text[quote_end] == '"' and text[quote_end-1] != '\\':
                    break
                quote_end += 1
            
            if quote_end >= text_length:
                # No closing quote found
                value = text[value_start:].strip()
                current_pos = text_length
            else:
                value = text[value_start:quote_end+1].strip()
                # Find the next comma or end
                next_comma = text.find(',', quote_end + 1)
                if next_comma == -1:
                    current_pos = text_length
                else:
                    current_pos = next_comma + 1
        else:
            # Find the next comma or end
            next_comma = text.find(',', value_start)
            if next_comma == -1:
                value = text[value_start:].strip()
                current_pos = text_length
            else:
                value = text[value_start:next_comma].strip()
                current_pos = next_comma + 1
        
        pairs[key] = value
    
    return pairs

# Main processing loop
for line in sys.stdin:
    try:
        # Strip whitespace and skip empty lines
        line = line.strip()
        if not line:
            continue

        logging.debug(f"Received input line: {line}")
        
        # Parse the incoming line protocol
        parsed = parse_line_protocol(line)
        if not parsed:
            continue
            
        # Parse measurement and tags
        measurement_tags = parsed["measurement_tags"]
        measurement_parts = measurement_tags.split(',')
        measurement = measurement_parts[0]
        
        # Extract tags
        tags = {}
        for part in measurement_parts[1:]:
            if '=' in part:
                key, value = part.split('=', 1)
                tags[key] = value
        
        # Extract fields
        fields_str = parsed["fields"]
        fields = extract_key_value_pairs(fields_str)
        
        # Get timestamp
        timestamp = parsed["timestamp"]
        
        logging.debug(f"Parsed measurement: {measurement}")
        logging.debug(f"Parsed tags: {json.dumps(tags)}")
        logging.debug(f"Parsed fields: {json.dumps(fields)}")
        
        # Determine alarm state
        alarm_state = "unknown"
        if tags.get("name") == "alphaAlarmActiveState" or tags.get("name") == "dcPwrSysAlarmActiveTrap":
            alarm_state = "Active"
        elif tags.get("name") == "alphaAlarmClearState" or tags.get("name") == "dcPwrSysAlarmClearedTrap":
            alarm_state = "InActive"
        elif tags.get("name") == "dcPwrSysRelayTrap":
            alarm_state = "RelayTrap"
        
        # Extract source IP
        alarm_source_ip = tags.get("source", "unknown")
        
        # Extract OID
        alarm_oid = tags.get("oid", "unknown")
        
        # Find alarm name using regex patterns
        alarm_name = None
        for key in fields:
            if re.match("alarmModelDescription", key):
                alarm_name = fields[key].strip('"\'')
                break
            elif re.match("dcPwrSysCurrAlrmStringValue", key):
                alarm_name = fields[key].strip('"\'')
                break
            elif re.match("dcPwrSysRelayStringValue", key):
                alarm_name = fields[key].strip('"\'')
                break
            elif re.match("dcPwrSysMiscAlrmStringValue", key):
                alarm_name = fields[key].strip('"\'')
                break
            elif re.match("dcPwrSysRectAlrmStringValue", key):
                alarm_name = fields[key].strip('"\'')
                break
            elif re.match("dcPwrSysBattAlrmStringValue", key):
                alarm_name = fields[key].strip('"\'')
                break
        
        # Use regex to find alarm source CLLI field
        alarm_source_clli = None
        for key in fields:
            if re.match("controllerInfoName", key):
                alarm_source_clli = fields[key].strip('"\'')
                break
            elif re.match("dcPwrSysSiteName", key):
                alarm_source_clli = fields[key].strip('"\'')
                break
        
        # If we couldn't find critical fields, log and continue
        if not alarm_name:
            logging.warning(f"Could not find alarm name in message: {line}")
            alarm_name = "Unknown"
        
        if not alarm_source_clli:
            logging.warning(f"Could not find source CLLI in message: {line}")
            alarm_source_clli = "Unknown"
        
        # Escape values for Influx Line Protocol
        alarm_name_escaped = escape_influx_string(alarm_name)
        alarm_oid_escaped = escape_influx_string(alarm_oid)
        alarm_source_clli_escaped = escape_influx_string(alarm_source_clli)
        
        # Create standardized Influx Line Protocol output
        influx_line = (
            f"alarm,source_ip={alarm_source_ip},state={alarm_state} "
            f"name={alarm_name_escaped},oid={alarm_oid_escaped},source_clli={alarm_source_clli_escaped} {timestamp}"
        )
        
        # Print the Influx Line Protocol to STDOUT
        sys.stdout.write(influx_line + "\n")
        sys.stdout.flush()  # Ensure output is flushed
        
        logging.debug(f"Output line: {influx_line}")
        
    except Exception as e:
        # Log the full exception and continue processing
        logging.error(f"Error processing trap: {str(e)}", exc_info=True)
        # Also write to stderr so Telegraf can see it
        sys.stderr.write(f"Error processing trap: {str(e)}\n")
        sys.stderr.flush()
1 Like