Hello,
I am trying to normalize data per 1-minute interval, using two batch selects: one to get the data, and a second one to compute the mean, min, max, and stddev,
and then a join to normalize the data.
Both queries return results (the data, then the mean, min, max, and stddev), but despite a bunch of tries, the join does not return anything.
The data has two tags; the "stats" has only one (one of the data tags), and the "on" is done on this tag.
Obviously the timestamps do not match, but I thought that joining on a tag was designed specifically for that purpose.
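For reference, the normalization the script is aiming for is min-max scaling, the same expression as in the eval node of the script: abs((value - min) / (max - min)). Below is a minimal sketch of that arithmetic in Python rather than TICKscript. The function name min_max_normalize is hypothetical, and the guard for max equal to min is an assumption that is not present in the script.

```python
def min_max_normalize(values):
    """Scale each value into [0, 1] using the min and max of the whole window.

    Mirrors abs((value - min) / (max - min)) from the eval node.
    Assumption (not in the TICKscript): a constant window maps to 0.0
    instead of dividing by zero.
    """
    lo = min(values)
    hi = max(values)
    span = hi - lo
    if span == 0:
        return [0.0 for _ in values]
    return [abs((v - lo) / span) for v in values]


if __name__ == "__main__":
    # Hypothetical rssi readings for one devaddr over one period.
    window = [-100.0, -90.0, -80.0]
    print(min_max_normalize(window))
```

In the TICKscript, the min and max come from the stats query and the values from the data query, which is why the two batches have to be joined before the eval.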
Here is the "show" output
(the script is written so that it can be turned into a template afterward).
influx@EQLFRCL1PDL201:~$ kapacitor show compute-normalized_data-15m
ID: compute-normalized_data-15m
Error:
Template:
Type: batch
Status: enabled
Executing: true
Created: 13 Apr 18 17:50 CEST
Modified: 13 Apr 18 17:51 CEST
LastEnabled: 13 Apr 18 17:51 CEST
Databases Retention Policies: ["telegraf"."autogen"]
TICKscript:
//
// TICKscript to compute the signal signature every minute
// var CheckName string
var CheckName = 'compute_signatures'
// set the triggerType variable, in order to tag alerts
// var triggerType string
// var triggerType = 'st_dev_exceeded vs past period'
// set the db; does not seem to work at present when used with a template
// var db string
var db = 'telegraf'
// set the retention policy of the source; does not seem to work at present when used with a template
// var rp string
var rp = 'autogen'
// set the source measurement; does not seem to work at present when used with a template
// var measurement string
var measurement = 'mqtt_consumer'
// define the first tag, likely the customer
// var CustTag string
var CustTag = 'app'
// define the Object tag (which contains the indicator), likely something like Location-Component-Subcomponent, e.g. Beijing-ALRT001-ifETH0
// var Object string
var Object = 'devaddr'
// define the groupBy clause, generally customer / application
// var groupBy string
var groupBy = [CustTag, Object]
// define the indicator to be used
// var indicator string
var indicator = 'rssi'
// define the duration of the period to observe, currently in the past
// var period duration
var period = 15m
var periodstr = '15m'
// define how often this batch runs; warning: the more frequent it is, the higher the load, hence the cost, it generates
// var every duration
var every = 1m
// define the log file
// var LogFile string
var LogFile = '/var/log/kapacitor/compute_signature'
var whereFilter = lambda: TRUE
var idVar = CheckName
var idTag = 'alertID'
var outputDB = 'chronograf'
var outputRP = 'autogen'
var indicator_normed = indicator + '-normalized'
var last_period_stat = batch
    |query('SELECT mean(' + indicator + ') AS indicator_mean, stddev(' + indicator + ') AS indicator_stddev, max(' + indicator + ') AS indicator_max, min(' + indicator + ') AS indicator_min from ' + db + '.' + rp + '.' + measurement)
        .groupBy(Object)
        .period(period)
        .every(every)
        .align()
        .offset(-1m)
    |log()
var last_period = batch
    |query('SELECT (' + indicator + ') as value from ' + db + '.' + rp + '.' + measurement)
        .groupBy(groupBy)
        .period(period)
        .every(every)
        .align()
    |log()
// normalize the data
last_period_stat
    |join(last_period)
        .as('last_period_stat', 'last_period')
        .on(Object)
    |log()
    |eval(lambda: abs((float("last_period.value") - float("last_period_stat.indicator_min")) / (float("last_period_stat.indicator_max") - float("last_period_stat.indicator_min"))))
        .as(indicator_normed)
    // for debugging purpose, log into /var/log/kapacitor/kapacitor.log
    |log()
    |influxDBOut()
        .create()
        .database(db)
        .retentionPolicy(rp)
        .measurement(measurement)
        .tag('normalized-period', periodstr)
DOT:
digraph compute-normalized_data-15m {
graph [throughput="0.00 batches/s"];
query3 [avg_exec_time_ns="0s" batches_queried="1" errors="0" points_queried="15" working_cardinality="0" ];
query3 -> log4 [processed="1"];
log4 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log4 -> join6 [processed="1"];
query1 [avg_exec_time_ns="8.668621ms" batches_queried="1" errors="0" points_queried="1" working_cardinality="0" ];
query1 -> log2 [processed="1"];
log2 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log2 -> join6 [processed="1"];
join6 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
join6 -> log7 [processed="0"];
log7 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log7 -> eval8 [processed="0"];
eval8 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval8 -> log9 [processed="0"];
log9 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
log9 -> influxdb_out10 [processed="0"];
influxdb_out10 [avg_exec_time_ns="0s" errors="0" points_written="0" working_cardinality="0" write_errors="0" ];
}
influx@EQLFRCL1PDL201:~$
Any thoughts on where I've missed something?
Thanks in advance.
Regards,
Philippe