Hello,
I got this query in an old influx that I need to move to FLUX.
It’s used to aggregate the volume on the interfaces.
CREATE CONTINUOUS QUERY CQ_Volume_Day ON Interface BEGIN SELECT sum(Input) AS Input, sum(Output) AS Output INTO Infinity.Volume_Day FROM Infinity.Volume_Hour WHERE time > now() - 1d GROUP BY ID, Port, time(1d) END
Any help to tips is greatly appreciated.
1 Like
Hello @Skywalker,
This would be achieved with a task. Your task would look something like this:
option task = {name: "CQ_Volume_DAy", every: 1d, offset: 5m}
Input = from(bucket: "Interface")
|> range(start: -task.every, stop: now())
|> filter(fn: (r) => r["_measurement"] == "Input")
|> group(columns: ["Port", "ID"], mode:"by")
|> sum()
Output = from(bucket: "Interface")
|> range(start: -task.every, stop: now())
|> filter(fn: (r) => r["_measurement"] == "Output")
|> group(columns: ["Port", "ID"], mode:"by")
|> sum()
Input |> to(bucket: "Volume_day")
Output|> to(bucket: "Volume_day")
You could also split them into two tasks if you want.
I recommend making sure that the output of the Input and Output variables are correct before you execute the task. You can add a yield() function to do so.
Lmk if something isn’t as expected.
Finally this documentation could be useful to you:
Hello @Anaisdg
Thank you kindly for your answer!
Could you please guide me in the right direction on the below as well?
CREATE CONTINUOUS QUERY CQ__Volume_Hour ON Vessel BEGIN SELECT non_negative_derivative(last(Input), 1h) AS Input, non_negative_derivative(last(Output), 1h) AS Output INTO Infinity_Volume_Hour FROM ID WHERE time > now() - 1h GROUP BY ID, Port, time(1h) END
Thanks!
Hello @Skywalker,
Yes you’ll follow a similar approach as above, but you’ll use the derivative function instead:
Hello,
Got this far but get runtime error: to: engine: unknown field type for Input
(Changed measurement name)
option task = {name: "CQ 1 Hour Devices VLAN", every: 1h, offset: 5m}
Input = from(bucket: "VLAN")
|> range(start: -task.every, stop: now())
|> filter(fn: (r) =>
(r["_measurement"] == "Devices"))
|> filter(fn: (r) => r["_field"] == "Input")
|> group(columns: ["Port", "ID"], mode: "by")
|> derivative(unit: 1h, nonNegative: true, timeColumn: "_time")
Output = from(bucket: "VLAN")
|> range(start: -task.every, stop: now())
|> filter(fn: (r) =>
(r["_measurement"] == "Devices"))
|> filter(fn: (r) => r["_field"] == "Output")
|> group(columns: ["Port", "ID"], mode: "by")
|> derivative(unit: 1h, nonNegative: true, timeColumn: "_time")
Input
|> to(bucket: "CQ")
Output
|> to(bucket: "CQ")
Any tips?