I am building a Flux task that executes every second and queries the last 1s to find any new data coming into a bucket. If data exists, the task encodes the records as JSON and sends them out via HTTP POST. It is essentially a task that notifies a listening HTTP server whenever a new data point arrives in the bucket.
Here is the Flux:
```flux
import "json"
import "http"
import "strings"

option task = {name: "Event Monitoring", every: 1s, offset: 1s}

DataArray =
    from(bucket: "MyBucket")
        |> range(start: -1s)
        |> filter(fn: (r) => r._field == "Value" or r._field == "Quality")
        |> drop(columns: ["_start", "_stop"])
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> map(fn: (r) => ({r with jsonString: string(v: json.encode(v: r))}))
        |> keep(columns: ["jsonString"])
        |> yield()
        |> findColumn(fn: (key) => true, column: "jsonString")

DataString = strings.joinStr(arr: DataArray, v: ";;;")

http.post(
    url: "http://HTTPServer:7777/",
    headers: {InfluxServer: "http://MyInfluxServer:8086|MyBucket", y: "b"},
    data: bytes(v: DataString),
)
```
I am running into an issue where the program writing data to InfluxDB gets bogged down and does not write the data until after this task has already executed. As a result, the task misses the data and never sends out the expected updates. I need to handle situations like this, since they are caused by latency and outside factors I cannot control.
Is there a standard way to get updates when a new value is written, even if the point's timestamp is more than one second in the past? I would prefer not to push the query's start time further back, because in most cases that would just resend the same data over and over, which is unnecessary network traffic. Any ideas would be appreciated!
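For concreteness, here is a rough sketch of the kind of workaround I can imagine: widen the query window and keep a high-water mark so rows that were already posted get filtered out. The `task_state` bucket, `cursor` measurement, and `last_time` field are all hypothetical names for state the task would maintain itself, and I have not verified this end to end:

```flux
import "influxdata/influxdb/tasks"

// 1. Read the newest _time that was already posted, falling back to the
//    task's last successful run when no cursor point exists yet.
cursorVals =
    from(bucket: "task_state")
        |> range(start: -1d)
        |> filter(fn: (r) => r._measurement == "cursor" and r._field == "last_time")
        |> last()
        |> findColumn(fn: (key) => true, column: "_value")

lastSeen =
    if length(arr: cursorVals) == 0 then
        tasks.lastSuccess(orTime: -30s)
    else
        time(v: cursorVals[0])

// 2. Query a window wide enough to absorb the writer's latency, but drop
//    rows an earlier run has already sent.
newRows =
    from(bucket: "MyBucket")
        |> range(start: -30s)
        |> filter(fn: (r) => r._time > lastSeen)
        |> filter(fn: (r) => r._field == "Value" or r._field == "Quality")

// ... pivot / json.encode / http.post as in the task above ...

// 3. Advance the cursor to the newest _time that was just sent.
newRows
    |> group()
    |> max(column: "_time")
    |> map(fn: (r) => ({_measurement: "cursor", _field: "last_time", _value: int(v: r._time), _time: now()}))
    |> to(bucket: "task_state")
```

The obvious downside is an extra read and write per run just for bookkeeping, which is why I am asking whether there is a standard mechanism for this instead.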