I have continuous data flowing into InfluxDB. To simplify, each point has a "user" and some "traffic":
Name: adata
┃ index ┃ time                           ┃ user ┃ traffic       ┃
┃ 1     ┃ 1695640504251410944.0000000000 ┃ u1   ┃ 20.0000000000 ┃
┃ 2     ┃ 1695640505792459008.0000000000 ┃ u2   ┃ 20.0000000000 ┃
┃ 3     ┃ 1695640507544683776.0000000000 ┃ u2   ┃ 21.0000000000 ┃
┃ 4     ┃ 1695640509311823872.0000000000 ┃ u1   ┃ 24.0000000000 ┃
┃ 5     ┃ 1695640628409903360.0000000000 ┃ u3   ┃ 3.0000000000  ┃
┃ 6     ┃ 1695641781005702912.0000000000 ┃ u1   ┃ 24.0000000000 ┃
What I want in the end is the cumulative sum of "traffic" for each user, stored as a single entry per user and covering all time, like this:
u1 - 68
u2 - 41
u3 - 3
To get this, I am thinking of using real-time stream processing with Flux: a task that takes the data from the last 10 seconds, calculates the sum per user, and then writes it back as a single entry per user. In effect that is an UPDATE if an entry for the user already exists, or an INSERT if the user has not been seen before.
It does not matter to me where this output goes: back to InfluxDB, to SQL, or anywhere else.
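To make the behavior I am after concrete, here is a minimal sketch in Python rather than Flux. It is not a working Flux solution. It uses an in-memory SQLite database as a stand-in for the real target, and the function name `upsert_totals` and the column layout of the `mysum` table are made up for illustration. It also assumes each 10-second window is delivered exactly once, since a window processed twice would be counted twice.

```python
import sqlite3


def upsert_totals(conn, batch):
    """Add each (user, traffic) pair in `batch` to that user's running total.

    Hypothetical helper: inserts a row for an unknown user, otherwise
    adds the new traffic to the existing row (an "upsert").
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS mysum ("
        "user TEXT PRIMARY KEY, traffic REAL NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO mysum (user, traffic) VALUES (?, ?) "
        "ON CONFLICT(user) DO UPDATE "
        "SET traffic = mysum.traffic + excluded.traffic",
        batch,
    )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    # First window: the five early points from the sample data.
    upsert_totals(conn, [("u1", 20.0), ("u2", 20.0), ("u2", 21.0),
                         ("u1", 24.0), ("u3", 3.0)])
    # A later window: only u1 sent traffic.
    upsert_totals(conn, [("u1", 24.0)])
    for user, traffic in conn.execute(
        "SELECT user, traffic FROM mysum ORDER BY user"
    ):
        print(user, traffic)
```

With the sample data this leaves exactly one row per user (u1, u2, u3) holding the totals listed above. On MySQL the equivalent statement would be INSERT ... ON DUPLICATE KEY UPDATE.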
Here is what I tried with sql.to:
import "sql"

from(bucket: "test") // replace "test" with your InfluxDB bucket name
    |> range(start: -10s)
    |> filter(fn: (r) => r["_measurement"] == "adata")
    |> group(columns: ["user"])
    // sum the traffic per user over the 10-second window
    |> aggregateWindow(every: 10s, fn: sum)
    |> map(fn: (r) => ({
        user: r.user,
        traffic: r._value,
    }))
    // sql.to takes the target table via the `table` parameter
    |> sql.to(driverName: "mysql", dataSourceName: "user:pass@tcp(1.2.3.4:3306)/data", table: "mysum")
But this creates new rows in the database every time the Flux script runs, and I need a single entry per user at the end. Sadly, I can't issue a "REPLACE INTO …" statement through "sql.to".
I would expect there to be some kind of UPDATE function among the Flux outputs.
Any ideas?