Kapacitor runs with high memory usage and then crashes


#1

Hi,

I am new to Influx and am currently using InfluxDB v1.5.3-1 and Kapacitor v1.5.0-1. InfluxDB is ingesting about 18,000 points per second on average, written in batch mode with a batch size of 10,000. This data is streamed to Kapacitor for processing, and the results are finally written back to InfluxDB. The problem is that once the whole process is started, the memory usage of kapacitord.exe keeps rising and eventually eats up all the available memory until it crashes. Could anyone kindly offer some advice? Thank you.

Below is my TICKscript:

// Stream task: for each (server, exch, pair) series of the 'tick' measurement,
// derive the previous bid/ask/last values and percentage moves relative to
// them, then write the derived fields to the 'result' measurement.
//
// Strings use single quotes and field/tag references use double quotes;
// typographic quotes pasted from a rendered page are not valid TICKscript.
dbrp "market"."autogen"

// Database that receives the computed results.
var database = 'market'

// Window length. It is longer than `every`, so consecutive windows overlap by 20s.
var period = 80s

// How often a window is emitted.
var every = 60s

// Ticks with positive prices and an uncrossed book (bid < ask), windowed and
// then split by server/exch/pair.
// NOTE(review): the window is applied before groupBy, so each window presumably
// holds the points of all series together before they are split; grouping
// before windowing may reduce buffering — confirm against the Kapacitor docs.
var ticks = stream
    |from()
        .measurement('tick')
    |where(lambda: "last" > 0 AND "bid" > 0 AND "ask" > 0 AND "bid" < "ask")
    |window()
        .period(period)
        .every(every)
        .align()
    |groupBy('server', 'exch', 'pair')

// Difference between consecutive changed 'bid' values, exposed as 'value'.
var diff_bid = ticks
    |changeDetect('bid')
    |difference('bid')
        .as('value')
        .usePointTimes()

// Difference between consecutive changed 'ask' values, exposed as 'value'.
var diff_ask = ticks
    |changeDetect('ask')
    |difference('ask')
        .as('value')
        .usePointTimes()

// Difference between consecutive changed 'last' values, exposed as 'value'.
var diff_last = ticks
    |changeDetect('last')
    |difference('last')
        .as('value')
        .usePointTimes()

// Join the raw ticks with the three difference streams. Fields are prefixed
// with 'tick', 'diff_bid', 'diff_ask' and 'diff_last' respectively.
// NOTE(review): the difference streams presumably carry fewer points than
// `ticks` (changeDetect drops unchanged values), so the join may buffer
// unmatched points while waiting for partners; this is a candidate for the
// reported memory growth — verify with the node statistics of the running task.
var result = ticks
    |join(diff_bid, diff_ask, diff_last)
        .as('tick', 'diff_bid', 'diff_ask', 'diff_last')
        .fill('null')
    // A missing difference is treated as "no change".
    |default()
        .field('diff_bid.value', 0.0)
        .field('diff_ask.value', 0.0)
        .field('diff_last.value', 0.0)
    // Expressions are named, in order, by the .as() list below:
    //   bid, ask, last       current prices taken from the joined tick
    //   prev_*               current price minus its difference
    //   long_* / short_*     positive (max) or negative (min) part of the gap
    //                        between a current price and a previous price, as
    //                        a percentage of that previous price
    //   last_delta           percentage change of 'last' against 'prev_last'
    // Later expressions reference the names produced by earlier ones.
    |eval(
        lambda: "tick.bid",
        lambda: "tick.ask",
        lambda: "tick.last",
        lambda: "bid" - "diff_bid.value",
        lambda: "ask" - "diff_ask.value",
        lambda: "last" - "diff_last.value",
        lambda: max("bid" - "prev_ask", 0.0) / "prev_ask" * 100.0,
        lambda: max("last" - "prev_ask", 0.0) / "prev_ask" * 100.0,
        lambda: max("bid" - "prev_last", 0.0) / "prev_last" * 100.0,
        lambda: min("ask" - "prev_bid", 0.0) / "prev_bid" * 100.0,
        lambda: min("last" - "prev_bid", 0.0) / "prev_bid" * 100.0,
        lambda: min("ask" - "prev_last", 0.0) / "prev_last" * 100.0,
        lambda: ("last" - "prev_last") / "prev_last" * 100.0
    )
        .as('bid', 'ask', 'last',
            'prev_bid', 'prev_ask', 'prev_last',
            'long_ask_bid', 'long_ask_last', 'long_last_bid',
            'short_bid_ask', 'short_bid_last', 'short_last_ask',
            'last_delta'
        )
        .keep('bid', 'ask', 'last',
            'prev_bid', 'prev_ask', 'prev_last',
            'long_ask_bid', 'long_ask_last', 'long_last_bid',
            'short_bid_ask', 'short_bid_last', 'short_last_ask',
            'last_delta'
        )

// Write the derived fields to the 'result' measurement of `database`,
// with the output buffer set to 10000 points.
result
    |influxDBOut()
        .database(database)
        .measurement('result')
        .buffer(10000)