We have a couple of Airflow jobs that batch-process some datasets. I wanted to use InfluxDB to monitor for sudden drops in processed records. Every 10 minutes the Airflow DAGs save the total number of records processed to an InfluxDB metric, and I use current_value / movingAverage(3) to check for sudden drops.
Currently InfluxDB (version 2.0.0 (c8af0f3)) does not support custom queries for checks, so I had to use the InfluxDB API to create them.
My problem is that almost all the time the check reports the wrong status (ok instead of crit). Here is what it looks like:
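For context, here is a minimal Python sketch of the kind of point the DAG writes. The measurement, tag, and field names are taken from the query filters in this post; the helper name `to_line_protocol`, the integer field type, and the nanosecond timestamp are assumptions made for illustration, not the actual DAG code.

```python
def to_line_protocol(total: int, timestamp_ns: int) -> str:
    """Format one record count as an InfluxDB line protocol point.

    Hypothetical helper: names mirror the filters used in the query.
    """
    measurement = "content_warehouse_suggestions_filtered"
    # The trailing "i" marks an integer field; the real field type is an assumption.
    return f"{measurement},metric=row_counter TOTAL={total}i {timestamp_ns}"


# Example point with an illustrative count and timestamp.
print(to_line_protocol(1250, 1594912008000000000))
```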
[Monitoring results]
Here is my query:
from(bucket: "stage_suggestion_validator")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "content_warehouse_suggestions_filtered")
  |> filter(fn: (r) => r["_field"] == "TOTAL")
  |> filter(fn: (r) => r["metric"] == "row_counter")
  |> map(fn: (r) => ({ r with currentVal: float(v: r._value) }))
  |> movingAverage(n: 3)
  |> map(fn: (r) => ({ r with _value:
      if r.currentVal > 0 then
        if r._value > 0 then r.currentVal / r._value * 100.0
        else float(v: 100)
      else float(v: 0)
  }))
  |> aggregateWindow(every: 5m, fn: last)
  |> yield(name: "last")
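To make the intended arithmetic concrete, here is a rough Python sketch of what the query is meant to compute, on made-up counts. It assumes the moving average is a trailing mean that includes the current point and emits nothing for the first n-1 points, and that a check falls back to "ok" when no threshold matches. The function names and sample values are hypothetical, and the sketch does not model aggregateWindow.

```python
def moving_average(values, n):
    """Trailing mean over n points; nothing is emitted for the first n-1 points."""
    return [sum(values[i - n + 1 : i + 1]) / n for i in range(n - 1, len(values))]


def drop_percentages(values, n=3):
    """Current value as a percentage of its moving average, mirroring the query's if/else."""
    averages = moving_average(values, n)
    currents = values[n - 1 :]
    result = []
    for current, average in zip(currents, averages):
        if current > 0:
            result.append(current / average * 100.0 if average > 0 else 100.0)
        else:
            result.append(0.0)
    return result


def level(percentage, threshold=90.0):
    """'lesser' threshold: crit when the value is below the threshold, otherwise ok."""
    return "crit" if percentage < threshold else "ok"


counts = [1000.0, 1000.0, 1000.0, 400.0]  # illustrative data with a drop at the end
percentages = drop_percentages(counts)
print(percentages)                      # [100.0, 50.0]
print([level(p) for p in percentages])  # ['ok', 'crit']
```

With these numbers the last point is at 50% of its moving average, so a "lesser than 90" threshold should report crit. Note that the average includes the dropped point itself, which softens the ratio somewhat.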
And here is the payload for the API call:
{
  "name": "Postman upload test - changed",
  "ownerID": "06008edf3a770000",
  "orgID": "06008edf53370000",
  "query": {
    "text": "from(bucket: \"stage_suggestion_validator\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r[\"_measurement\"] == \"content_warehouse_suggestions_filtered\") |> filter(fn: (r) => r[\"_field\"] == \"TOTAL\") |> filter(fn: (r) => r[\"metric\"] == \"row_counter\") |> map(fn: (r) => ({ r with currentVal: float(v: r._value) })) |> movingAverage(n: 3) |> map(fn: (r) => ({ r with _value: if r.currentVal > 0 then if r._value >0 then r.currentVal / r._value * 100.0 else float(v: 100) else float(v: 0)})) |> aggregateWindow(every: 5m, fn: last) |> yield(name: \"last\")",
    "editMode": "advanced",
    "name": "",
    "builderConfig": {
      "buckets": [
        "stage_suggestion_validator"
      ],
      "tags": [
        {
          "key": "_measurement",
          "values": [
            "content_warehouse_suggestions_filtered"
          ],
          "aggregateFunctionType": "filter"
        },
        {
          "key": "_field",
          "values": [
            "TOTAL"
          ],
          "aggregateFunctionType": "filter"
        },
        {
          "key": "metric",
          "values": [
            "row_counter"
          ],
          "aggregateFunctionType": "filter"
        },
        {
          "key": "",
          "values": [],
          "aggregateFunctionType": "filter"
        }
      ],
      "functions": [
        {
          "name": "last"
        }
      ],
      "aggregateWindow": {
        "period": "1m"
      }
    }
  },
  "statusMessageTemplate": "Check: ${ r._check_name } is: ${ r._level }",
  "every": "5m",
  "offset": "0s",
  "tags": [],
  "createdAt": "2020-07-16T15:06:48.874169659Z",
  "updatedAt": "2020-07-17T00:20:55.25449393Z",
  "thresholds": [
    {
      "allValues": false,
      "level": "CRIT",
      "value": 90,
      "type": "lesser"
    }
  ],
  "type": "threshold",
  "labels": []
}
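For anyone who wants to reproduce this outside Postman, here is a minimal Python sketch of how such a payload could be sent to the checks endpoint (`POST /api/v2/checks` with a `Token` authorization header). The base URL, the token, and the function name `build_check_request` are placeholders I made up; the request is only built here, not sent.

```python
import json
import urllib.request


def build_check_request(base_url, token, payload):
    """Build (but do not send) a POST request that creates a check."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v2/checks",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Token {token}",
            "Content-Type": "application/json",
        },
    )


# Hypothetical host and token; the payload would be the full JSON document above.
request = build_check_request(
    "http://localhost:8086",
    "my-token",
    {"name": "example", "type": "threshold"},
)
print(request.get_method(), request.full_url)
# To actually create the check: urllib.request.urlopen(request)
```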
Can someone point out what I'm doing wrong?