Hello @bjbezzi,
I’m sorry for the delay.
Unfortunately, `stateDuration()` and `stateCount()` don’t pick up where they left off in the previous task run; each run starts counting from scratch. I’m not sure how to accomplish this elegantly.
Here is a solution from one of my coworkers for carrying the state count across task executions. It calculates the Nth time level == “crit” occurs consecutively, even when the streak spans several task runs, i.e. the “true” Nth time the state is reached. It does this by joining the state count stored by the last run with the counts from the new run, and then finding the first point in the new run where the state count is reset (rows after that point no longer inherit the previous count). I don’t know if there’s something from this that can be repurposed.
import "influxdata/influxdb/tasks"
import "array"
import "join"
import "date"
// Start of the task's window: the last successful run of this task, or 6 minutes
// ago when there is no successful run to fall back on.
cutoff = tasks.lastSuccess(orTime: -6m)
// now() truncated to a whole day, i.e. the start of the current day.
today = date.truncate(t: now(), unit: 1d)
// Most recent record per (_type, device_id, topic) series that earlier runs wrote
// to "status-count", whatever its level. Only the counter, device_id and _level
// columns are kept so the result can be joined with the new statuses.
// NOTE(review): series are grouped by _type/device_id/topic but only device_id and
// _level survive, so a device reporting several topics/types produces several rows
// here that are indistinguishable on the join keys — confirm this is intended.
fetch =
    from(bucket: "status-count")
        |> range(start: -12m)
        |> filter(
            fn: (r) =>
                r._measurement == "statuses_counter"
                    and r._check_id == "09952d758d3d6000"
                    and r._field == "_message",
        )
        // _level is intentionally left out of the grouping so the latest record is
        // picked regardless of status.
        |> group(columns: ["_type", "device_id", "topic"])
        |> sort(columns: ["_time"])
        |> last()
        |> keep(columns: ["crit_query_counter", "device_id", "_level"])
        |> group()
// Placeholder row with a counter of "0" for a device id ("xxxx") that presumably
// never occurs. It guarantees `right` always contains a table, because join.left()
// fails when one side has nothing to join.
sampleData =
    array.from(
        rows: [{_level: "xxxx", crit_query_counter: "0", device_id: "xxxx"}],
    )
right = union(tables: [sampleData, fetch])
// Statuses written since `cutoff`, counted per (_type, device_id, topic) series.
// stateCount() stores the running number of consecutive "crit" rows in
// crit_query_counter, and -1 for a row with any other level.
left =
    from(bucket: "_monitoring")
        |> range(start: cutoff)
        |> filter(
            fn: (r) =>
                r._measurement == "statuses"
                    and r._check_id == "09952d758d3d6000"
                    and r._field == "_message",
        )
        |> keep(
            columns: [
                "_measurement", "_field", "_value",
                "_check_id", "_check_name", "_level", "_source_measurement",
                "_start", "_stop", "_time", "_type",
                "device_id", "topic",
            ],
        )
        |> group()
        |> group(columns: ["_type", "device_id", "topic"])
        // stateCount() depends on row order, so put each series in time order first.
        |> sort(columns: ["_time"])
        |> stateCount(fn: (r) => r._level == "crit", column: "crit_query_counter")
        |> group()
// Pair every new status with the counter saved by the previous run for the same
// device_id and _level. Unmatched rows get a null previous counter from
// join.left(), which fill() then replaces with 0. The result is regrouped per
// (_type, device_id, topic) series in time order.
merge =
    join.left(
        left: left,
        right: right,
        on: (l, r) => l.device_id == r.device_id and l._level == r._level,
        as: (l, r) =>
            ({
                _measurement: l._measurement,
                _field: l._field,
                _value: l._value,
                _check_id: l._check_id,
                _check_name: l._check_name,
                _level: l._level,
                _source_measurement: l._source_measurement,
                _start: l._start,
                _stop: l._stop,
                _time: l._time,
                _type: l._type,
                // count produced by stateCount() in this run
                crit_query_counter_new: l.crit_query_counter,
                // the saved counter is a string, so convert it back to an integer
                crit_query_counter_prev: int(v: r.crit_query_counter),
                device_id: l.device_id,
                topic: l.topic,
            }),
    )
        |> fill(column: "crit_query_counter_prev", value: 0)
        |> group(columns: ["_type", "device_id", "topic"])
        |> sort(columns: ["_time"])
// Earliest row of this run, per (_type, device_id, topic) series, whose fresh
// counter is -1 (a non-"crit" level resets stateCount()). Only the check id,
// device id and the reset time are kept, as one ungrouped table.
right05 =
    merge
        |> filter(fn: (r) => r.crit_query_counter_new == -1)
        |> sort(columns: ["_time"])
        |> first()
        |> keep(columns: ["_check_id", "device_id", "_time"])
        |> group()
// Placeholder reset row for a device id ("xxxx") that presumably never occurs, so
// that right2 always contains a table for join.left().
sampleData2 =
    array.from(rows: [{_check_id: "xxxx", device_id: "xxxx", _time: today}])
right2 = union(tables: [sampleData2, right05])
// All merged statuses as a single ungrouped table.
left2 = merge |> group()
// Attach to every merged status the time its series' counter was first reset in
// this run (break_date), then compute the counter that gets written back:
//   - rows at or after the reset use only this run's count, because the streak
//     carried over from the previous run ended at the reset;
//   - earlier rows, and every row when there was no reset (break_date is null),
//     continue the previous run's count.
// The counter is written as a string; to() writes string columns as tags by default.
// NOTE(review): the join matches on device_id and _check_id only, while reset
// times are computed per (_type, device_id, topic). A device with several
// topics/types would match several reset rows and duplicate output rows — confirm.
join.left(
    left: left2,
    right: right2,
    on: (l, r) => l.device_id == r.device_id and l._check_id == r._check_id,
    as: (l, r) =>
        ({
            _measurement: l._measurement,
            _field: l._field,
            _value: l._value,
            _check_id: l._check_id,
            _check_name: l._check_name,
            _level: l._level,
            _source_measurement: l._source_measurement,
            _start: l._start,
            _stop: l._stop,
            _time: l._time,
            _type: l._type,
            crit_query_counter_new: l.crit_query_counter_new,
            crit_query_counter_prev: l.crit_query_counter_prev,
            // null when this device/check had no reset in this run
            break_date: r._time,
            device_id: l.device_id,
            topic: l.topic,
        }),
)
    |> group(columns: ["_type", "device_id", "topic"])
    |> map(
        fn: (r) =>
            ({r with crit_query_counter:
                    // Check for a missing break_date explicitly instead of relying on
                    // how a comparison against null is evaluated.
                    if exists r.break_date and r._time >= r.break_date then
                        string(v: r.crit_query_counter_new)
                    else
                        string(v: r.crit_query_counter_new + r.crit_query_counter_prev),
            }),
    )
    |> drop(columns: ["crit_query_counter_new", "crit_query_counter_prev", "break_date"])
    |> set(key: "_measurement", value: "statuses_counter")
    |> to(bucket: "status-count")
The following Flux query notifies on every Nth time (every 10th in this example) that level == “crit” within one day (or another specified time period):
import "date"
import "influxdata/influxdb/tasks"
// Writes a "statuses_counter" record for every Nth "crit" status of the
// "Query Rate Limit Check" since the start of the current day.
// N is configurable through notifyEvery.
notifyEvery = 10
today = date.truncate(t: now(), unit: 1d)
cutoff = tasks.lastSuccess(orTime: -1m) // runs every minute

from(bucket: "_monitoring")
    |> range(start: today)
    |> filter(fn: (r) => r["_measurement"] == "statuses")
    |> filter(fn: (r) => r["_check_name"] == "Query Rate Limit Check")
    |> filter(fn: (r) => r["_field"] == "_message")
    // Only "crit" rows remain, so stateCount() numbers every crit status since
    // `today` within each series, whether or not they were consecutive.
    |> filter(fn: (r) => r["_level"] == "crit" and exists r._value)
    |> stateCount(fn: (r) => r._level == "crit", column: "crit_query_counter")
    // Keep only the multiples of notifyEvery that arrived since the last
    // successful run, skipping milestones an earlier run already handled.
    |> filter(fn: (r) => r.crit_query_counter % notifyEvery == 0 and r._time >= cutoff)
    // Store the counter as a string (to() writes string columns as tags by
    // default) and rename the measurement in a single pass.
    |> map(
        fn: (r) =>
            ({r with
                crit_query_counter: string(v: r.crit_query_counter),
                _measurement: "statuses_counter",
            }),
    )
    |> to(bucket: "_monitoring")
Perhaps this last Flux script can be amended to fit your needs. I realize you’re asking for `stateDuration()` rather than `stateCount()`, but I think the challenges overlap: neither `stateCount()` nor `stateDuration()` lets you specify a column from which to pick up where it left off.