I have the following Python script, which reads raw data based on filters and then writes (stores) the final aggregated data into a new measurement.
I was wondering whether we can use a stream instead and reuse the output of an existing stream TICKscript for alerting; this would save us from additional, unnecessary writes.
Can we use something like query_api.query_stream(query=query) for this?
```python
from influxdb_client import InfluxDBClient, Point, Dialect, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
# Aggregate raw CPU readings with Flux and store the result in a new
# measurement ("new_flux") of the same bucket.

# Organization passed to the query call.
# NOTE(review): this is empty while the client itself is created with
# org="my-org" -- confirm which one the server expects (the
# "telegraf/autogen" bucket name suggests an InfluxDB 1.8 compatibility
# endpoint, where the organization is presumably ignored).
my_org = ""

# Flux pipeline: take the last two days of "_Total" processor-time samples,
# average them in 5-minute windows, collapse each series to a single mean,
# and reuse the range's "_stop" as the timestamp of the resulting point.
query = '''
import "date"
from(bucket:"telegraf/autogen")
|> range(start:-2d)
|> filter(fn: (r) =>
r._measurement == "Processor" and
r._field == "Percent_Processor_Time" and
r.instance == "_Total")
|> filter(fn: (r) => exists r._value)
|> aggregateWindow(every: 5m, fn: mean)
|> mean()
|> duplicate(column: "_stop", as: "_time")
|> keep(columns: ["_time", "_value", "_field", "_measurement", "host", "instance", "objectname"])
'''

# Plain ASCII quotes are required here: typographic quotes pasted from a
# web page are a syntax error in Python.
# The `with` block closes the client even if the query or the write raises,
# replacing the direct call to the `__del__` finalizer.
with InfluxDBClient(url="http://10.132.44.44:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    # NOTE(review): query_data_frame may return a list of DataFrames when the
    # result tables have different schemas -- verify that this query always
    # yields a single frame, otherwise the calls below will fail.
    data_frame = query_api.query_data_frame(org=my_org, query=query)

    # Drop the Flux bookkeeping columns so they are not written back as data.
    data_frame = data_frame.drop(columns=['result', 'table'])

    # Use the timestamp column as the index and strip its timezone
    # information before handing the frame to the write API.
    data_frame = data_frame.set_index("_time")
    data_frame = data_frame.tz_convert(None)

    write_api.write(bucket="telegraf/autogen", record=data_frame, data_frame_measurement_name='new_flux')
    print(data_frame.to_string())