I have a TICKscript that runs a UDF on the result of a batch query and writes the UDF's stream output to another InfluxDB measurement via influxDBOut(). It all works as expected the first time the TICKscript runs. Subsequent executions run the UDF (which we can verify through logging) but do not write to the measurement. Removing the UDF makes the script work as expected (future batches get written to measurement2).
For reference, here’s the TICKscript:
batch
    |query('''SELECT * FROM "database"."autogen"."measurement1"''')
        .every(10s)
        .period(10s)
    @mirror()
    |influxDBOut()
        .database('database')
        .retentionPolicy('autogen')
        .measurement('measurement2')
I’ve tried various modifications to the TICKscript (every(), period(), align(), precision(), etc.), none of which had any effect on this issue. I can recreate the “first time success” by disabling and re-enabling the TICKscript.
To try to simplify things, I replaced my UDF with one that (I think) should just mirror points from the input batch out to the output stream. Here’s that UDF:
package main

import (
	"log"
	"os"

	"github.com/influxdata/kapacitor/udf/agent"
)

type mirrorHandler struct {
	agent *agent.Agent
	begin *agent.BeginBatch
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
	return &mirrorHandler{agent: agent}
}

func main() {
	a := agent.New(os.Stdin, os.Stdout)
	m := newMirrorHandler(a)
	a.Handler = m
	log.Println("Starting agent")
	a.Start()
	err := a.Wait()
	if err != nil {
		log.Fatal(err)
	}
}

func (m *mirrorHandler) Point(p *agent.Point) error {
	m.agent.Responses <- &agent.Response{
		Message: &agent.Response_Point{
			Point: p,
		},
	}
	return nil
}

// Return the InfoResponse, describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
	info := &agent.InfoResponse{
		Wants:    agent.EdgeType_BATCH,
		Provides: agent.EdgeType_STREAM,
		Options:  map[string]*agent.OptionInfo{},
	}
	return info, nil
}

// Initialize the handler based on the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
	init := &agent.InitResponse{
		Success: true,
		Error:   "",
	}
	return init, nil
}

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
	return &agent.SnapshotResponse{}, nil
}

// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
	return &agent.RestoreResponse{
		Success: true,
	}, nil
}

// Start working with the next batch.
func (m *mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
	m.begin = begin
	return nil
}

func (m *mirrorHandler) EndBatch(end *agent.EndBatch) error {
	m.agent.Responses <- &agent.Response{
		Message: &agent.Response_Begin{
			Begin: m.begin,
		},
	}
	return nil
}

// Stop the handler gracefully.
func (m *mirrorHandler) Stop() {
	close(m.agent.Responses)
}
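To make the intended behavior concrete, here is a minimal, self-contained sketch of what the mirror is expected to do: forward every point of every batch, including batches after the first. It does not use the Kapacitor agent package; kind, message, and mirror are hypothetical stand-ins, and the choice to emit points only (no batch markers) on the output is an assumption about how a stream output should look, not something confirmed against Kapacitor.

```go
package main

import "fmt"

// kind and message are hypothetical stand-ins for what a batch edge
// delivers to a UDF. They are NOT the kapacitor agent types.
type kind int

const (
	beginBatch kind = iota
	batchPoint
	endBatch
)

type message struct {
	Kind  kind
	Value float64 // only meaningful when Kind == batchPoint
}

// mirror forwards every point of every batch and drops the batch markers,
// on the (unverified) assumption that a stream output carries points only.
func mirror(in []message) []float64 {
	var out []float64
	for _, m := range in {
		if m.Kind == batchPoint {
			out = append(out, m.Value)
		}
	}
	return out
}

func main() {
	// Two batches: the first with two points, the second with one.
	in := []message{
		{Kind: beginBatch}, {Kind: batchPoint, Value: 1}, {Kind: batchPoint, Value: 2}, {Kind: endBatch},
		{Kind: beginBatch}, {Kind: batchPoint, Value: 3}, {Kind: endBatch},
	}
	// Points from both batches should come out, in order.
	fmt.Println(mirror(in))
}
```

With the real UDF, only the points from the first batch arrive downstream, which is the behavior in question.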
kapacitor show mirror confirms that points from later batches do not make it to influxDBOut(). query1 reports 3 batches (7 points) queried, but influxdb_out3 has written only 2 points:
DOT:
digraph mirror {
graph [throughput="0.00 batches/s"];
query1 [avg_exec_time_ns="2.013993ms" batches_queried="3" errors="0" points_queried="7" working_cardinality="0" ];
query1 -> mirror2 [processed="3"];
mirror2 [avg_exec_time_ns="914ns" errors="0" working_cardinality="0" ];
mirror2 -> influxdb_out3 [processed="2"];
influxdb_out3 [avg_exec_time_ns="0s" errors="0" points_written="2" working_cardinality="0" write_errors="0" ];
}
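When repeating the experiment, the edge counters can be pulled out of the DOT text mechanically rather than read by eye. The following is a minimal sketch, assuming the edge lines keep the exact format shown above; edge, edgeRe, and parseEdges are hypothetical names and not part of Kapacitor. Note that the two edges here are of different types (batch into the UDF, stream out of it), so their counters may not be in the same unit.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// edge holds the "processed" counter reported for one edge in the DOT output.
type edge struct {
	From, To  string
	Processed int
}

// edgeRe matches lines such as: query1 -> mirror2 [processed="3"];
var edgeRe = regexp.MustCompile(`(\w+) -> (\w+) \[processed="(\d+)"\]`)

// parseEdges extracts every edge and its processed counter from DOT text.
func parseEdges(dot string) []edge {
	var edges []edge
	for _, m := range edgeRe.FindAllStringSubmatch(dot, -1) {
		n, err := strconv.Atoi(m[3])
		if err != nil {
			continue // skip counters that do not fit in an int
		}
		edges = append(edges, edge{From: m[1], To: m[2], Processed: n})
	}
	return edges
}

func main() {
	dot := `query1 -> mirror2 [processed="3"];
mirror2 -> influxdb_out3 [processed="2"];`
	for _, e := range parseEdges(dot) {
		fmt.Printf("%s -> %s: %d\n", e.From, e.To, e.Processed)
	}
}
```

Running this after each query interval would show whether the counter on the edge into influxdb_out3 ever moves past its value from the first batch.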
Any thoughts or suggestions would be greatly appreciated.