Connection error upon attempting to create a kapacitor stream recording

kapacitor
#1

I am currently attempting to backfill data for a stream tickscript using the method found here: https://github.com/influxdata/docs.influxdata.com/issues/925

However, attempting to create the recording produces the following error:
Post http://localhost:9092/kapacitor/v1/recordings/query: read tcp [::1]:59814->[::1]:9092: read: connection reset by peer

I’m pretty sure this isn’t a firewall issue, as I am able to telnet to port 9092 on the machine.
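
A successful telnet only shows that the port accepts TCP connections, not that the HTTP API behind it is answering. The sketch below separates the two checks. It assumes Kapacitor exposes a ping endpoint at `/kapacitor/v1/ping` on the same bind address as in the config; the function name `probe_kapacitor` and the status strings are made up for illustration.

```python
import http.client
import socket


def probe_kapacitor(host="localhost", port=9092, timeout=5.0):
    """Return a short status string describing how far a request gets."""
    # Step 1: plain TCP connect, roughly what the telnet check proves.
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
        sock.close()
    except OSError as exc:
        return f"tcp-failed: {exc}"

    # Step 2: a real HTTP request (assumed ping endpoint, see lead-in).
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("GET", "/kapacitor/v1/ping")
        resp = conn.getresponse()
        resp.read()
        return f"http-ok: {resp.status}"
    except (OSError, http.client.HTTPException) as exc:
        # A reset or timeout here, after step 1 succeeded, points at the
        # server process rather than at the network path.
        return f"http-failed: {type(exc).__name__}: {exc}"
    finally:
        conn.close()
```

Calling `print(probe_kapacitor())` on the Kapacitor host should report `http-ok` when the daemon is healthy; an `http-failed` result after a successful TCP connect would be consistent with a hung process rather than a firewall problem.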

Here’s my Kapacitor config for reference:

# The hostname of this node.
# Must be resolvable by any configured InfluxDB hosts.
hostname = "localhost"
# Directory for storing a small amount of metadata about the server.
data_dir = "/var/lib/kapacitor"

# Do not apply configuration overrides during startup.
# Useful if the configuration overrides cause Kapacitor to fail startup.
# This option is intended as a safe guard and should not be needed in practice.
skip-config-overrides = false

# Default retention-policy, if a write is made to Kapacitor and
# it does not have a retention policy associated with it,
# then the retention policy will be set to this value
default-retention-policy = ""

[http]
  # HTTP API Server for Kapacitor
  # This server is always on,
  # it serves both as a write endpoint
  # and as the API endpoint for all other
  # Kapacitor calls.
  bind-address = ":9092"
  auth-enabled = false
  log-enabled = true
  write-tracing = false
  pprof-enabled = false
  https-enabled = false
  https-certificate = "/etc/ssl/kapacitor.pem"

[config-override]
  # Enable/Disable the service for overriding configuration via the HTTP API.
  enabled = true

[logging]
    # Destination for logs
    # Can be a path to a file or 'STDOUT', 'STDERR'.
    file = "/var/log/kapacitor/kapacitor.log"
    # Logging level can be one of:
    # DEBUG, INFO, WARN, ERROR, or OFF
    level = "INFO"

[replay]
  # Where to store replay files, aka recordings.
  dir = "/var/lib/kapacitor/replay"

[task]
  # Where to store the tasks database
  # DEPRECATED: This option is not needed for new installations.
  # It is only used to determine the location of the task.db file
  # for migrating to the new `storage` service.
  dir = "/var/lib/kapacitor/tasks"
  # How often to snapshot running task state.
  snapshot-interval = "60s"

[storage]
  # Where to store the Kapacitor boltdb database
  boltdb = "/var/lib/kapacitor/kapacitor.db"

[deadman]
  # Configure a deadman's switch
  # Globally configure deadman's switches on all tasks.
  # NOTE: for this to be of use you must also globally configure at least one alerting method.
  global = false
  # Threshold, if globally configured the alert will be triggered if the throughput in points/interval is <= threshold.
  threshold = 0.0
  # Interval, if globally configured the frequency at which to check the throughput.
  interval = "10s"
  # Id -- the alert Id, NODE_NAME will be replaced with the name of the node being monitored.
  id = "node 'NODE_NAME' in task '{{ .TaskName }}'"
  # The message of the alert. INTERVAL will be replaced by the interval.
  message = "{{ .ID }} is {{ if eq .Level \"OK\" }}alive{{ else }}dead{{ end }}: {{ index .Fields \"collected\" | printf \"%0.3f\" }} points/INTERVAL."


# Multiple InfluxDB configurations can be defined.
# Exactly one must be marked as the default.
# Each one will be given a name and can be referenced in batch queries and InfluxDBOut nodes.
[[influxdb]]
  # Connect to an InfluxDB cluster
  # Kapacitor can subscribe, query and write to this cluster.
  # Using InfluxDB is not required and can be disabled.
  enabled = true
  default = true
  name = "localhost"
  urls = ["https://localhost:8086"]
  username = "kapacitor"
  password = "<snip>"
  timeout = 0
  # Absolute path to pem encoded CA file.
  # A CA can be provided without a key/cert pair
  #   ssl-ca = "/etc/kapacitor/ca.pem"
  # Absolute paths to pem encoded key and cert files.
  #   ssl-cert = "/etc/kapacitor/cert.pem"
  #   ssl-key = "/etc/kapacitor/key.pem"

  # Do not verify the TLS/SSL certificate.
  # This is insecure.
  insecure-skip-verify = true

  # Maximum time to try and connect to InfluxDB during startup
  startup-timeout = "5m"

  # Turn off all subscriptions
  disable-subscriptions = false

  # Which protocol to use for subscriptions
  # one of 'udp', 'http', or 'https'.
  subscription-protocol = "http"

  # Subscriptions resync time interval
  # Useful if you want to subscribe to newly created databases
  # without restarting Kapacitord
  subscriptions-sync-interval = "1m0s"

  # Override the global hostname option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to `hostname` if empty.
  kapacitor-hostname = ""

  # Override the global http port option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to the port from `[http] bind-address` if 0.
  http-port = 0

  # Host part of a bind address for UDP listeners.
  # For example if a UDP listener is using port 1234
  # and `udp-bind = "hostname_or_ip"`,
  # then the UDP port will be bound to `hostname_or_ip:1234`
  # The default empty value will bind to all addresses.
  udp-bind = ""
  # Subscriptions use the UDP network protocol.
  # The following options are for the created UDP listeners for each subscription.
  # Number of packets to buffer when reading packets off the socket.
  udp-buffer = 1000
  # The size in bytes of the OS read buffer for the UDP socket.
  # A value of 0 indicates use the OS default.
  udp-read-buffer = 0

  [influxdb.subscriptions]
    # Set of databases and retention policies to subscribe to.
    # If empty will subscribe to all, minus the list in
    # influxdb.excluded-subscriptions
    #
    # Format
    # db_name = <list of retention policies>
    #
    # Example:
    # my_database = [ "default", "longterm" ]
  [influxdb.excluded-subscriptions]
    # Set of databases and retention policies to exclude from the subscriptions.
    # If influxdb.subscriptions is empty it will subscribe to all
    # except databases listed here.
    #
    # Format
    # db_name = <list of retention policies>
    #
    # Example:
    # my_database = [ "default", "longterm" ]

[kubernetes]
  # Enable/Disable the kubernetes service.
  # Needed by the k8sAutoscale TICKscript node.
  enabled = false
  # There are several ways to connect to the kubernetes API servers:
  #
  # Via the proxy, start the proxy via the `kubectl proxy` command:
  #   api-servers = ["http://localhost:8001"]
  #
  # From within the cluster itself, in which case
  # kubernetes secrets and DNS services are used
  # to determine the needed configuration.
  #   in-cluster = true
  #
  # Direct connection, in which case you need to know
  # the URL of the API servers,  the authentication token and
  # the path to the ca cert bundle.
  # These value can be found using the `kubectl config view` command.
  #   api-servers = ["http://192.168.99.100:8443"]
  #   token = "..."
  #   ca-path = "/path/to/kubernetes/ca.crt"



[smtp]
  # Configure an SMTP email server
  # Will use TLS and authentication if possible
  # Only necessary for sending emails from alerts.
  enabled = false
  host = "localhost"
  port = 25
  username = ""
  password = ""
  # From address for outgoing mail
  from = ""
  # List of default To addresses.
  # to = ["oncall@example.com"]

  # Skip TLS certificate verify when connecting to SMTP server
  no-verify = false
  # Close idle connections after timeout
  idle-timeout = "30s"

  # If true then all alerts will be sent via Email
  # without explicitly marking them in the TICKscript.
  global = false
  # Only applies if global is true.
  # Sets all alerts in state-changes-only mode,
  # meaning alerts will only be sent if the alert state changes.
  state-changes-only = false

[snmptrap]
  # Configure an SNMP trap server
  enabled = false
  # The host:port address of the SNMP trap server
  addr = "localhost:162"
  # The community to use for traps
  community = "kapacitor"
  # Number of retries when sending traps
  retries = 1


[opsgenie]
    # Configure OpsGenie with your API key and default routing key.
    enabled = false
    # Your OpsGenie API Key.
    api-key = ""
    # Default OpsGenie teams, can be overridden per alert.
    # teams = ["team1", "team2"]
    # Default OpsGenie recipients, can be overridden per alert.
    # recipients = ["recipient1", "recipient2"]
    # The OpsGenie API URL should not need to be changed.
    url = "https://api.opsgenie.com/v1/json/alert"
    # The OpsGenie Recovery URL, you can change this
    # based on which behavior you want a recovery to
    # trigger (Add Notes, Close Alert, etc.)
    recovery_url = "https://api.opsgenie.com/v1/json/alert/note"
    # If true then all alerts will be sent to OpsGenie
    # without explicitly marking them in the TICKscript.
    # The team and recipients can still be overridden.
    global = false

[victorops]
  # Configure VictorOps with your API key and default routing key.
  enabled = false
  # Your VictorOps API Key.
  api-key = ""
  # Default VictorOps routing key, can be overridden per alert.
  routing-key = ""
  # The VictorOps API URL should not need to be changed.
  url = "https://alert.victorops.com/integrations/generic/20131114/alert"
  # If true then all alerts will be sent to VictorOps
  # without explicitly marking them in the TICKscript.
  # The routing key can still be overridden.
  global = false

[pagerduty]
  # Configure PagerDuty.
  enabled = false
  # Your PagerDuty Service Key.
  service-key = ""
  # The PagerDuty API URL should not need to be changed.
  url = "https://events.pagerduty.com/generic/2010-04-15/create_event.json"
  # If true then all alerts will be sent to PagerDuty
  # without explicitly marking them in the TICKscript.
  global = false

[slack]
  # Configure Slack.
  enabled = false
  # The Slack webhook URL, can be obtained by adding
  # an Incoming Webhook integration.
  # Visit https://slack.com/services/new/incoming-webhook
  # to add new webhook for Kapacitor.
  url = ""
  # Default channel for messages
  channel = ""
  # If true all the alerts will be sent to Slack
  # without explicitly marking them in the TICKscript.
  global = false
  # Only applies if global is true.
  # Sets all alerts in state-changes-only mode,
  # meaning alerts will only be sent if the alert state changes.
  state-changes-only = false

[telegram]
  # Configure Telegram.
  enabled = false
  # The Telegram Bot URL should not need to be changed.
  url = "https://api.telegram.org/bot"
  # Telegram Bot Token, can be obtained From @BotFather.
  token = ""
  # Default recipient for messages, Contact @myidbot on Telegram to get an ID.
  chat-id = ""
  # Send Markdown or HTML, if you want Telegram apps to show bold, italic, fixed-width text or inline URLs in your alert message.
  #parse-mode  = "Markdown"
  # Disable link previews for links in this message
  disable-web-page-preview = false
  # Sends the message silently. iOS users will not receive a notification, Android users will receive a notification with no sound.
  disable-notification = false
  # If true then all alerts will be sent to Telegram
  # without explicitly marking them in the TICKscript.
  global = false
  # Only applies if global is true.
  # Sets all alerts in state-changes-only mode,
  # meaning alerts will only be sent if the alert state changes.
  state-changes-only = false

[hipchat]
  # Configure HipChat.
  enabled = false
  # The HipChat API URL. Replace subdomain with your
  # HipChat subdomain.
  url = "https://subdomain.hipchat.com/v2/room"
  # Visit https://www.hipchat.com/docs/apiv2
  # for information on obtaining your room id and
  # authentication token.
  # Default room for messages
  room = ""
  # Default authentication token
  token = ""
  # If true then all alerts will be sent to HipChat
  # without explicitly marking them in the TICKscript.
  global = false
  # Only applies if global is true.
  # Sets all alerts in state-changes-only mode,
  # meaning alerts will only be sent if the alert state changes.
  state-changes-only = false

[alerta]
  # Configure Alerta.
  enabled = false
  # The Alerta URL.
  url = ""
  # Default authentication token.
  token = ""
  # Default environment.
  environment = ""
  # Default origin.
  origin = "kapacitor"

[sensu]
  # Configure Sensu.
  enabled = false
  # The Sensu Client host:port address.
  addr = "sensu-client:3030"
  # Default JIT source.
  source = "Kapacitor"

[reporting]
  # Send usage statistics
  # every 12 hours to Enterprise.
  enabled = true
  url = "https://usage.influxdata.com"

[stats]
  # Emit internal statistics about Kapacitor.
  # To consume these stats create a stream task
  # that selects data from the configured database
  # and retention policy.
  #
  # Example:
  #  stream.from().database('_kapacitor').retentionPolicy('default')...
  #
  enabled = true
  stats-interval = "10s"
  database = "_kapacitor"
  retention-policy = "autogen"

[udf]
# Configuration for UDFs (User Defined Functions)
[udf.functions]
    # Example go UDF.
    # First compile example:
    #   go build -o avg_udf ./udf/agent/examples/moving_avg.go
    #
    # Use in TICKscript like:
    #   stream.goavg()
    #           .field('value')
    #           .size(10)
    #           .as('m_average')
    #
    # uncomment to enable
    #[udf.functions.goavg]
    #   prog = "./avg_udf"
    #   args = []
    #   timeout = "10s"

    # Example python UDF.
    # Use in TICKscript like:
    #   stream.pyavg()
    #           .field('value')
    #           .size(10)
    #           .as('m_average')
    #
    # uncomment to enable
    #[udf.functions.pyavg]
    #   prog = "/usr/bin/python2"
    #   args = ["-u", "./udf/agent/examples/moving_avg.py"]
    #   timeout = "10s"
    #   [udf.functions.pyavg.env]
    #       PYTHONPATH = "./udf/agent/py"

    # Example UDF over a socket
    #[udf.functions.myCustomUDF]
    #   socket = "/path/to/socket"
    #   timeout = "10s"

[talk]
  # Configure Talk.
  enabled = false
  # The Talk webhook URL.
  url = "https://jianliao.com/v2/services/webhook/uuid"
  # The default authorName.
  author_name = "Kapacitor"

##################################
# Input Methods, same as InfluxDB
#

[collectd]
  enabled = false
  bind-address = ":25826"
  database = "collectd"
  retention-policy = ""
  batch-size = 1000
  batch-pending = 5
  batch-timeout = "10s"
  typesdb = "/usr/share/collectd/types.db"

[opentsdb]
  enabled = false
  bind-address = ":4242"
  database = "opentsdb"
  retention-policy = ""
  consistency-level = "one"
  tls-enabled = false
  certificate = "/etc/ssl/influxdb.pem"
  batch-size = 1000
  batch-pending = 5
  batch-timeout = "1s"
#2

So it seems that the issue isn’t so much creating the recording; it’s that kapacitor itself has hung.

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
24929 kapacit+  20   0  653196  31888   6296 S 102.3  0.8 132:09.41 kapacitord

Attempting to do something like listing all tasks results in the same connection reset by peer message. Based on timing, I suspect my attempt to record a 24-hour period overloaded something…

#3

It seems that the method I’m using to create the recordings is causing Kapacitor to hang.

Even for a window as short as 10 minutes it still locks up and freezes…
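
For anyone trying to reproduce this, here is a minimal sketch of how a recording request for a bounded window could be built. It assumes the `/kapacitor/v1/recordings/query` endpoint accepts a JSON body with `type` and `query` fields, as I understand the Kapacitor HTTP API. The database, retention policy, and measurement names are hypothetical placeholders, not the ones from my setup, and the function only builds the body without sending it.

```python
import json
from datetime import datetime, timedelta, timezone


def build_recording_request(database, retention_policy, measurement, stop, window):
    """Build a JSON-serializable body for a stream recording of one time window.

    `stop` is expected to be a UTC datetime; `window` is a timedelta.
    """
    start = stop - window
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    # Explicit start and stop times keep the query range bounded.
    query = (
        f'SELECT * FROM "{database}"."{retention_policy}"."{measurement}" '
        f"WHERE time >= '{start.strftime(fmt)}' AND time < '{stop.strftime(fmt)}'"
    )
    return {"type": "stream", "query": query}


# Hypothetical names and timestamps, for illustration only.
body = build_recording_request(
    "telegraf", "autogen", "cpu",
    stop=datetime(2017, 1, 1, 0, 10, 0, tzinfo=timezone.utc),
    window=timedelta(minutes=10),
)
print(json.dumps(body, indent=2))
```

Printing the body before posting it makes it easy to confirm that the time range really is the 10-minute window intended, and to run the same query directly against InfluxDB to see how many points it returns.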