Consistent Kapacitor Out Of Memory during Influx write bursts

We’re using Docker image kapacitor:1.3.3-alpine.

We’ve written a TICK script (below) to calculate and store VPD derived from air temperature and relative humidity readings stored to Influx. Vapor Pressure Deficit is a well known transform in environmental data.

In typical usage there’s 10 total data points (5 temperature + 5 humidity) coming in staggered across a 15 second span. That is, it’s a pretty low volume of data and not a high data rate. Watching docker stats for the Kapacitor container shows barely any CPU or memory usage in calculating VPD from the incoming data.

We have an unusual setup in that we a have a second sensor system collecting temperature and humidity but unable to immediately send the readings to Influx. Instead, for the time being, we’re forced to parse some CSV files and periodically “batch“ upload this data to Influx. A batch is typically a week’s worth of data collected at a similar sampling rate to our typical case. The only difference is that it’s 24 sensors instead of 5.

Influx ingests the data that our python script parses from the CSVs without issue. A week’s worth of data is uploaded in about 15 minutes. However, Kapacitor consistently and repeatedly dies during these operations. Using docker stats we can watch the Kapacitor container’s CPU usage spike, and we can watch memory usage steadily climb until an out of memory crash event — in just a few minutes time. The TICK script below is the only task running in this scenario.

After a batch upload, we can fill in the missing VPD data using a replay query (example below). This typically takes only a couple minutes to run. CPU usage spikes and RAM usage climbs to consume much of that available but does not generally crash. That said, memory usage seems to remain at its elevated level after the replay query finishes running.

It stands to reason that during the upload Kapacitor might not be able to keep up. The http subscription from Influx is something of a firehose. However, it seems odd that a replay query shows similar symptoms to the OOM crash scenario. We presume Kapacitor can “adjust the valve” on processing replay query results. Maybe not.

Questions:

  1. What can we glean from this in understanding the underlying architecture of how Kapacitor works?
  2. Is there something wrong in either our TICK script or replay query that is causing memory usage to bloom without release like this? Can we reformulate and optimize this somehow?
  3. Are we seeing a true memory leak? We submitted virtually this same issue to the Kapacitor github project weeks ago but have not received any response.

Thanks for any insight.


kapacitor -url http://example.com:9092 replay-live query -task VPD_Calculation -rec-time -query "SELECT Value,Validity FROM \"xxxxx\".\"autogen\"./^(Air Temperature|Relative Humidity)/ WHERE time >= <start> AND time <= <end> GROUP BY *"

var airtemp = stream
  |from()
    .measurement('Air Temperature')
    .groupBy(*)
    
var humidity = stream
  |from()
    .measurement('Relative Humidity')
    .groupBy(*)

airtemp
  |join(humidity)
      // Provide prefix names for the fields of the data points.
      .as('airtemp', 'humidity')
      // points that are within 30 seconds are considered the same time.
      .tolerance(30s)
      // name the resulting stream
      .streamName('vpd')
  // Both the "Value" fields from each parent have been prefixed
  |eval(lambda: ("airtemp.Value"+273.15) * 1.8,
        lambda: exp( -10440.0/"airtemp_Rankine" + -11.29 + -0.02702*"airtemp_Rankine" + 0.00001289*pow("airtemp_Rankine",2.0) + -0.000000002478*pow("airtemp_Rankine",3.0) + 6.546*log("airtemp_Rankine") ),
        lambda: "vpSat" - ("vpSat" * ("humidity.Value" / 100.0)),
        lambda: "vpd_psi" * 6.894757293,
        lambda: 'kPa',
        lambda: if("airtemp.Validity" == 'Good' AND "humidity.Validity" == 'Good', 'Good', 'Bad'))
     .as('airtemp_Rankine',
         'vpSat',
         'vpd_psi',
         'Value',
         'Units',
         'Validity')
           // Do not store intermediary calculations as fields
           .keep('Value', 'Units', 'Validity')
  |influxDBOut()
     .database('xxxxx')
     .measurement('Vapor Pressure Deficit')