Found column "_field" in the group key

Hi!

I have the following Flux query running in a task every minute.
When I run the query with |> yield() in “Explorer” it works as desired.
In a scheduled task I get the following error message:
“could not execute task run: runtime error @64:5-71:3: check: found column “_field” in the group key; experimental.to() expects pivoted data”

Can someone explain this behaviour?

import "dict"
import "influxdata/influxdb/monitor"

option task = {name: "StatusTaskDict", every: 1m}

check = {
	_check_id: "statusTask_00002",
	_check_name: "StatusTask",
	_type: "custom",
	tags: {},
}
hysteresis = 10
defaultTH = 0.0
thCritDict = [
	"t11": 200.0,
	"t12": 200.0,
	"t13": 200.0,
	"t14": 200.0,
	"t15": 200.0,
]
thAlarmDict = [
	"t11": 100.0,
	"t12": 100.0,
	"t13": 100.0,
	"t14": 100.0,
	"t15": 100.0,
]
thWarnDict = [
	"t11": 75.0,
	"t12": 75.0,
	"t13": 75.0,
	"t14": 75.0,
	"t15": 75.0,
]
statusData = from(bucket: "oceanTest-short")
	|> range(start: 0)
	|> tail(n: hysteresis, offset: 0)
	|> filter(fn: (r) =>
		(r["_measurement"] == "trends"))
	|> filter(fn: (r) =>
		(r["ptid"] == "331"))
	|> drop(columns: ["_start", "_stop"])
	|> group(columns: ["_field", "ptid", "_measurement"])
	|> stateCount(fn: (r) =>
		(r._value >= dict.get(dict: thCritDict, key: r._field, default: defaultTH)), column: "critCount")
	|> stateCount(fn: (r) =>
		(r._value >= dict.get(dict: thAlarmDict, key: r._field, default: defaultTH)), column: "alarmCount")
	|> stateCount(fn: (r) =>
		(r._value >= dict.get(dict: thWarnDict, key: r._field, default: defaultTH)), column: "warnCount")
	|> stateCount(fn: (r) =>
		(r._value < dict.get(dict: thWarnDict, key: r._field, default: defaultTH)), column: "normalCount")
crit = (r) =>
	(r["critCount"] > hysteresis)
alarm = (r) =>
	(r["alarmCount"] > hysteresis)
warn = (r) =>
	(r["warnCount"] > hysteresis)
normal = (r) =>
	(r["normalCount"] > hysteresis)
messageFn = (r) =>
	(if r._level == "crit" then
		"CRITICAL (${string(v: float(v: r._value))}%)"
	else if r._level == "warn" then
		"ALARM (${string(v: float(v: r._value))}%)"
	else if r._level == "info" then
		"WARNING (${string(v: float(v: r._value))}%)"
	else if r._level == "ok" then
		"NORMAL (${string(v: float(v: r._value))}%)"
	else
		"UNDEFINED")

statusData
	|> monitor.check(
		crit: crit,
		warn: alarm,
		info: warn,
		ok: normal,
		messageFn: messageFn,
		data: check,
	)

Hello @wmm,
Looks like monitor.check uses experimental.to() under the hood.
https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/experimental/to/
It looks like you need to pivot your data, probably with the schema.fieldsAsCols() function, before you use stateCount.
https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/influxdb-schema/fieldsascols/
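For illustration, a minimal sketch of the pivot step (untested, reusing the bucket and filters from your query):

```flux
import "influxdata/influxdb/schema"

from(bucket: "oceanTest-short")
	|> range(start: -24h)
	|> filter(fn: (r) => r._measurement == "trends" and r.ptid == "331")
	// pivot each field into its own column (r.t11, r.t12, ...),
	// so "_field" is no longer part of the group key
	|> schema.fieldsAsCols()
```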

Have you had a chance to look at:

I highly recommend giving it a read if you’re creating custom checks. It outlines how checks generated through the UI use the fieldsAsCols function by default.

Alternatively, you could rename your field from “_field” to something else and it might work? I haven’t tried it.
Let me know if you can pivot your data. If not, I’ll check in with the Flux team about not pivoting your data.
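Untested, but the rename workaround might look something like this (keeping your per-field grouping idea):

```flux
from(bucket: "oceanTest-short")
	|> range(start: 0)
	|> filter(fn: (r) => r._measurement == "trends" and r.ptid == "331")
	// rename the column so the group key no longer contains "_field",
	// which is what experimental.to() complains about
	|> rename(columns: {_field: "f"})
	|> group(columns: ["f", "_measurement"])
```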

Hi @Anaisdg
I can’t pivot the data because I want to count the threshold exceedances of every field.
So I have to group my data by the _field key to get one table per field and execute the stateCount function on each table.
The hint about renaming _field is a good idea and works now.
It’s just strange that I get 2 _monitoring results:

I also changed the dictionary approach to an array, like the following:

import "array"

hysteresis = 10
thData = array.from(rows: [{
	f: "t1",
	thW: 75.1,
	thA: 100.1,
	thC: 200.1,
}, {
	f: "t2",
	thW: 75.2,
	thA: 100.2,
	thC: 200.2,
}, {
	f: "t3",
	thW: 75.3,
	thA: 100.3,
	thC: 200.3,
}, {
	f: "t4",
	thW: 75.4,
	thA: 100.4,
	thC: 200.4,
}, {
	f: "t5",
	thW: 75.5,
	thA: 100.5,
	thC: 200.5,
}])
data = from(bucket: "oceanTest-short")
	|> range(start: -24h)
	|> tail(n: hysteresis, offset: 0)
	|> filter(fn: (r) =>
		(r["_measurement"] == "trends"))
	|> filter(fn: (r) =>
		(r["ptid"] == "401"))
	//|> filter(fn: (r) => (r["_field"] == "t99"))
	|> drop(columns: ["_start", "_stop"])
	|> rename(columns: {_field: "f"})
	|> group(columns: ["f", "_measurement"])
statusData = join(tables: {d1: data, d2: thData}, on: ["f"])
	//|> pivot(
	//	rowKey: ["_time"],
	//	columnKey: ["_measurement", "f"],
	//	valueColumn: "_value"
	//)
	|> stateCount(fn: (r) =>
		(r._value >= r.thC), column: "critCount")
	|> stateCount(fn: (r) =>
		(r._value >= r.thA), column: "alarmCount")
	|> stateCount(fn: (r) =>
		(r._value >= r.thW), column: "warnCount")
	|> stateCount(fn: (r) =>
		(r._value < r.thW), column: "normalCount")
	|> yield()
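To wire this back into monitor.check, a sketch reusing the check record and messageFn from my first post (the thresholds now come from the joined columns, so dict.get is no longer needed):

```flux
statusData
	|> monitor.check(
		crit: (r) => r.critCount > hysteresis,
		warn: (r) => r.alarmCount > hysteresis,
		info: (r) => r.warnCount > hysteresis,
		ok: (r) => r.normalCount > hysteresis,
		messageFn: messageFn,
		data: check,
	)
```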

The performance with this “array/join” approach is better, I think!
I get result tables like this:

Hello @wmm,
I just learned that you can pass options into the monitor.check() function to override the experimental.to() function:

import "influxdata/influxdb/monitor"
option monitor.write = (tables=<-) => tables |> to(bucket: monitor.bucket)

Does that help eliminate the duplicate checks?

Docs on options:

https://docs.influxdata.com/influxdb/v2.0/reference/flux/language/options/