influxDBOut() only writes the first time TICKscript runs

I have a TICKscript that runs a UDF on the result of a batch query and writes the UDF's stream output to another InfluxDB measurement via influxDBOut(). Everything works as expected the first time the TICKscript runs. Subsequent executions still run the UDF (which we can verify through logging) but do not write to the measurement. Removing the UDF makes the script work as expected (future batches get written to measurement2).

For reference, here’s the TICKscript:

batch
    |query('''SELECT * FROM "database"."autogen"."measurement1"''')
        .every(10s)
        .period(10s)
    @mirror()
    |influxDBOut()
        .database('database')
        .retentionPolicy('autogen')
        .measurement('measurement2')

I’ve tried various modifications to the TICKscript (every(), period(), align(), precision(), etc.), none of which had any effect on this issue. I can reproduce the “first time success” by disabling and re-enabling the TICKscript.

To try to simplify things, I replaced my UDF with one that (I think) should just mirror points from the input batch out to the output stream. Here’s that UDF:

package main

import (
	"log"
	"os"

	"github.com/influxdata/kapacitor/udf/agent"
)

type mirrorHandler struct {
	agent *agent.Agent
	begin *agent.BeginBatch
}

func newMirrorHandler(agent *agent.Agent) *mirrorHandler {
	return &mirrorHandler{agent: agent}
}

func main() {
	a := agent.New(os.Stdin, os.Stdout)
	m := newMirrorHandler(a)
	a.Handler = m

	log.Println("Starting agent")
	a.Start()
	err := a.Wait()
	if err != nil {
		log.Fatal(err)
	}
}

func (m *mirrorHandler) Point(p *agent.Point) error {
	m.agent.Responses <- &agent.Response{
		Message: &agent.Response_Point{
			Point: p,
		},
	}
	return nil
}

// Info returns the InfoResponse describing the properties of this UDF agent.
func (*mirrorHandler) Info() (*agent.InfoResponse, error) {
	info := &agent.InfoResponse{
		Wants:    agent.EdgeType_BATCH,
		Provides: agent.EdgeType_STREAM,
		Options:  map[string]*agent.OptionInfo{},
	}
	return info, nil
}

// Init initializes the handler based on the provided options.
func (*mirrorHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
	init := &agent.InitResponse{
		Success: true,
		Error:   "",
	}
	return init, nil
}

// Create a snapshot of the running state of the process.
func (*mirrorHandler) Snapshot() (*agent.SnapshotResponse, error) {
	return &agent.SnapshotResponse{}, nil
}

// Restore a previous snapshot.
func (*mirrorHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
	return &agent.RestoreResponse{
		Success: true,
	}, nil
}

// Start working with the next batch
func (m *mirrorHandler) BeginBatch(begin *agent.BeginBatch) error {
	m.begin = begin

	return nil
}

func (m *mirrorHandler) EndBatch(end *agent.EndBatch) error {
	m.agent.Responses <- &agent.Response{
		Message: &agent.Response_Begin{
			Begin: m.begin,
		},
	}
	return nil
}

// Stop the handler gracefully.
func (m *mirrorHandler) Stop() {
	close(m.agent.Responses)
}

The output of kapacitor show mirror confirms that later batches/points never reach influxDBOut(): query1 has passed 3 batches to mirror2, but influxdb_out3 has only written 2 points.

DOT:
digraph mirror {
graph [throughput="0.00 batches/s"];

query1 [avg_exec_time_ns="2.013993ms" batches_queried="3" errors="0" points_queried="7" working_cardinality="0" ];
query1 -> mirror2 [processed="3"];

mirror2 [avg_exec_time_ns="914ns" errors="0" working_cardinality="0" ];
mirror2 -> influxdb_out3 [processed="2"];

influxdb_out3 [avg_exec_time_ns="0s" errors="0" points_written="2" working_cardinality="0" write_errors="0" ];
}

Any thoughts or suggestions would be greatly appreciated.

The UDF code has a small error: an EndBatch message is never sent, so Kapacitor never closes out the batch and it's never passed on to the InfluxDBOut node.

To fix this, update both the BeginBatch and EndBatch methods. Have the BeginBatch method immediately respond with the begin message.

Have the EndBatch method immediately respond with the end message.

func (m *mirrorHandler) EndBatch(end *agent.EndBatch) error {
	// This should be sending an end message, not a begin message.
	m.agent.Responses <- &agent.Response{
		Message: &agent.Response_Begin{
			Begin: m.begin,
		},
	}
	return nil
}
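
The message ordering described above (begin forwarded from BeginBatch, points forwarded as they arrive, end forwarded from EndBatch) can be sketched roughly as follows. This is a minimal, self-contained sketch and not the real agent API: BeginBatch, Point, EndBatch and Response are hypothetical stand-ins for the types in github.com/influxdata/kapacitor/udf/agent, the responses channel stands in for m.agent.Responses, and replayBatch is a made-up helper that plays the role of Kapacitor. In the actual UDF, the end message would presumably be wrapped as &agent.Response_End{End: end}, assuming the generated wrapper follows the same naming as the Response_Begin and Response_Point wrappers used in the question.

```go
package main

// Stand-in message types. In the real UDF these come from the
// kapacitor udf/agent package; they are redefined here (as an assumption)
// only so the sketch compiles on its own.
type BeginBatch struct{ Name string }
type Point struct{ Value float64 }
type EndBatch struct{ Name string }

// Response is a simplified stand-in for agent.Response:
// exactly one of the three fields is set per message.
type Response struct {
	Begin *BeginBatch
	Point *Point
	End   *EndBatch
}

type mirrorHandler struct {
	responses chan<- *Response // stand-in for m.agent.Responses
}

// BeginBatch immediately responds with the begin message.
func (m *mirrorHandler) BeginBatch(begin *BeginBatch) error {
	m.responses <- &Response{Begin: begin}
	return nil
}

// Point forwards each point unchanged.
func (m *mirrorHandler) Point(p *Point) error {
	m.responses <- &Response{Point: p}
	return nil
}

// EndBatch immediately responds with the end message (not a begin message).
func (m *mirrorHandler) EndBatch(end *EndBatch) error {
	m.responses <- &Response{End: end}
	return nil
}

// replayBatch is a hypothetical helper: it feeds one batch of n points
// through the handler and returns the kinds of messages emitted, in order.
func replayBatch(n int) []string {
	out := make(chan *Response, n+2) // buffered so the sends never block
	m := &mirrorHandler{responses: out}

	m.BeginBatch(&BeginBatch{Name: "measurement1"})
	for i := 0; i < n; i++ {
		m.Point(&Point{Value: float64(i)})
	}
	m.EndBatch(&EndBatch{Name: "measurement1"})
	close(out)

	var kinds []string
	for r := range out {
		switch {
		case r.Begin != nil:
			kinds = append(kinds, "begin")
		case r.Point != nil:
			kinds = append(kinds, "point")
		case r.End != nil:
			kinds = append(kinds, "end")
		}
	}
	return kinds
}
```

With this shape every batch is bracketed by its own begin and end, and the stored m.begin field is no longer needed. Whether Kapacitor accepts begin/end messages from a UDF that declares Provides: agent.EdgeType_STREAM is not settled by this sketch; it only illustrates the ordering.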

Thanks for the response, Nathaniel. I’m afraid I’m still a bit stuck. I ended up running an InfluxDB client inside my UDF as a very hacky substitute for influxDBOut(). What we’re finding is that the InfluxDB client works as expected for a time, but eventually our UDF gets a keepalive timedout error. Could that be related to the EndBatch() issue?

Is accepting a batch and providing a stream a supported use case for UDFs?

Could you rewrite my EndBatch() function as you describe? What I’ve tried so far is not working.