Task to create a specific metric if there is no new data during specified period

I need to monitor if data are being constantly added to one of my buckets, and if there is no data during last 5 minutes for a particular metric, I need to send a specific metric with all tags from this particular metric into another bucket. I.e. generate metric with tag1=“My Tag1”, error=“No data” and current timestamp.

I plan to create a task for that.

Here if what I have so far, but can’t figure out how to generate a metric if my query returns no data, as well as how to do nothing if query returns some data:

import "array"

isEmpty = (tables) => {
	columnsArray = tables
		|> columns()
		|> findColumn(fn: (key) => true, column: "_value")
	return length(arr: columnsArray) == 0
}

myquery =
from(bucket: "MYBUCKET")
  |> range(start: -5m)
  |> filter(fn: (r) =>
    r._field == "MyField" and r.tag1=="MyTag1" and r._measurement=="MyMeasurement"
  )
|> last()

isEmptyQuery = isEmpty(tables: myquery)

if isEmpty then
// here I need to 'return' new metric that will be sent by task
else
// here I should 'return' nothing

How can I achieve that?

Finally solved it this way:

errorquery = array.from(rows: [{tag1 : "MyTag1", Error : "No data", _measurement : "Meas", _field : "Alarm", _value : 1, _time : now()}]) 
// ...
if not isEmpty(tables: myquery) then 
// return nothing - remove all columns from myquery to receive empty
myquery 
|> drop (columns: ["_field", "_value", "tag1", "_measurement"])
else 
// return query to be sent to another bucket
errorquery
// Send to required bucket
// ...

@ebabeshko I’d create a function that checks if the input tables are empty, and if so, returns a default table that you can provide with array.from(). It’d look something like this:

import "array"

isEmpty = (tables=<-) => {
    columnsArray =
        tables
            |> columns()
            |> findColumn(fn: (key) => true, column: "_value")

    return length(arr: columnsArray) == 0
}

outputToInfluxDB = (tables=<-) => {
    inputIsEmpty = isEmpty(tables: tables)
    defaultTable = array.from(rows: [{_measurement: "Error", _time: now(), tag1: "MyTag1", _field: "Error", _value: "No data"}])
    targetBucket = if inputIsEmpty then "ERROR_BUCKET" else "GOOD_BUCKET"
    outputData = if inputIsEmpty then defaultTable else tables

    return outputData |> to(bucket: targetBucket)
}

from(bucket: "MYBUCKET")
    |> range(start: -5m)
    |> filter(fn: (r) => r._field == "MyField" and r.tag1 == "MyTag1" and r._measurement == "MyMeasurement")
    |> last()
    |> outputToInfluxDB()
1 Like