Convert to from InfluxQL to FLUX Continuous query

Hello,

I got this query in an old influx that I need to move to FLUX.
It’s used to aggregate the volume on the interfaces.

CREATE CONTINUOUS QUERY CQ_Volume_Day ON Interface BEGIN SELECT sum(Input) AS Input, sum(Output) AS Output INTO Infinity.Volume_Day FROM Infinity.Volume_Hour WHERE time > now() - 1d GROUP BY ID, Port, time(1d) END

Any help to tips is greatly appreciated.

1 Like

Hello @Skywalker,
This would be achieved with a task. Your task would look something like this:

option task = {name: "CQ_Volume_DAy", every: 1d, offset: 5m}

Input =  from(bucket: "Interface")
  |> range(start: -task.every, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "Input")
  |> group(columns: ["Port", "ID"], mode:"by")
  |> sum()

Output =  from(bucket: "Interface")
  |> range(start: -task.every, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "Output")
  |> group(columns: ["Port", "ID"], mode:"by")
  |> sum()

Input |> to(bucket: "Volume_day")
Output|> to(bucket: "Volume_day")

You could also split them into two tasks if you want.
I recommend making sure that the output of the Input and Output variables are correct before you execute the task. You can add a yield() function to do so.

Lmk if something isn’t as expected.

Finally this documentation could be useful to you:

Hello @Anaisdg

Thank you kindly for your answer!
Could you please guide me in the right direction on the below as well?

CREATE CONTINUOUS QUERY CQ__Volume_Hour ON Vessel BEGIN SELECT non_negative_derivative(last(Input), 1h) AS Input, non_negative_derivative(last(Output), 1h) AS Output INTO Infinity_Volume_Hour FROM ID WHERE time > now() - 1h GROUP BY ID, Port, time(1h) END

Thanks!

Hello @Skywalker,
Yes you’ll follow a similar approach as above, but you’ll use the derivative function instead:

Hello,

Got this far but get runtime error: to: engine: unknown field type for Input
(Changed measurement name)

option task = {name: "CQ 1 Hour Devices VLAN", every: 1h, offset: 5m}

Input = from(bucket: "VLAN")
	|> range(start: -task.every, stop: now())
	|> filter(fn: (r) =>
		(r["_measurement"] == "Devices"))
    |> filter(fn: (r) => r["_field"] == "Input")
    |> group(columns: ["Port", "ID"], mode: "by")
    |> derivative(unit: 1h, nonNegative: true, timeColumn: "_time")

Output = from(bucket: "VLAN")
	|> range(start: -task.every, stop: now())
	|> filter(fn: (r) =>
		(r["_measurement"] == "Devices"))
    |> filter(fn: (r) => r["_field"] == "Output")
	|> group(columns: ["Port", "ID"], mode: "by")
	|> derivative(unit: 1h, nonNegative: true, timeColumn: "_time")

Input
	|> to(bucket: "CQ")
Output
	|> to(bucket: "CQ")

Any tips?