Task runs quite long and uses a lot of memory

Hi experts,
recently I set up my first task in InfluxDB, and it was also my first time using Flux.
Obviously I lack the experience to set the task up properly.
I noticed the following:

  • The task runs every 5 minutes. Usually it runs for 0.7 … 0.8 s (according to the InfluxDB UI). I feel that’s already quite long, but roughly once per hour it runs for 10 … 13 s (!).
  • The memory consumption of the server, which was consistently around 550 MB before, has climbed to up to 1 GB since the task has been running. And it’s fluctuating.
  • I first installed the same task on a kind of test server, where InfluxDB and some other tools (Grafana, Telegraf, Traefik) run in Docker containers. There the task runs for only 0.02 … 0.04 s, and I can’t see the dramatic hourly increase in run time. There is also no noticeable increase in memory use.
  • The main difference between the two is that the main server has much more historical data.

My idea was that the task would process the historical data only once, when it is started for the first time. Afterwards I hoped it would only process new data that has arrived since the last run. I’m using the tasks.lastSuccess() function to achieve that (actually ChatGPT recommended it :see_no_evil:).

But I assume that didn’t work out for some reason, and the task is processing all historical data in each run. I still don’t know why it sometimes takes even much longer (10 … 20 times longer!).

And here is the task’s code:

import "influxdata/influxdb/tasks"

option task = {name: "calcLevel", every: 5m}

from(bucket: "<bucket-name>")
    |> range(start: tasks.lastSuccess(orTime: 2024-06-01T00:00:00Z))
    |> filter(fn: (r) => r._measurement == "source_measurement" and r._field == "distance")
    |> map(
        fn: (r) =>
            ({r with _time: r._time,
                _measurement: "destination_measurement",
                _field: "level",
                _value:
                    // calculate new "level"-value based on the device that produced the in-value
                    if r.deviceName == "device_1" then
                        163.5 - r._value / 10.0
                    else if r.deviceName == "device_2" then
                        183.5 - r._value / 10.0
                    else if r.deviceName == "device_3" then
                        193.0 - r._value / 10.0
                    else
                        float(v: 0.0),
            }),
    )
    // write new "level"-value to the same bucket
    |> to(bucket: "<bucket-name>")

Can someone tell me what I did wrong?
Thank you very much!
Christian

Update:
I changed the line
|> range(start: tasks.lastSuccess(orTime: 2024-06-01T00:00:00Z))
to
|> range(start: tasks.lastSuccess(orTime: 2024-09-01T00:00:00Z))
Since the old data is already converted, there is no need to process it again.
The execution time went down from 0.7 … 0.8 s to about 0.06 s.
It seems clear to me that tasks.lastSuccess() does not work as I expected and all data is processed in each run of the task.
Though I found kind of a workaround, I have to adjust the orTime value regularly, let’s say once a month. That’s exactly what I wanted to avoid by using the tasks.lastSuccess() function.
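A common alternative pattern, independent of tasks.lastSuccess(), is to bound the query by the task’s own interval via the task option. This is only a sketch and assumes the task actually fires every 5 minutes without skipped runs:

    option task = {name: "calcLevel", every: 5m}

    // Only look back over the task's own interval instead of relying on
    // tasks.lastSuccess(). If a run fails, that window's data would be
    // missed, so a slightly larger fixed lookback (e.g. start: -15m) can
    // add safety; re-processing overlapping points is harmless here,
    // since to() overwrites points with the same series and timestamp.
    from(bucket: "<bucket-name>")
        |> range(start: -task.every)
        |> filter(fn: (r) => r._measurement == "source_measurement" and r._field == "distance")
        // ... same map() and to() as above ...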

Any advice what I did wrong?

Hello @cortlieb,
Yes, that’s what lastSuccess should do.
Very strange.
@scott have you seen this before?

Ok @Anaisdg , thank you for your reply!
And from your post I take it that at least I haven’t used the lastSuccess() function in a completely wrong way and that, theoretically, my script should work.

Maybe I can add a bit more of information. Here is a graph of the server’s memory consumption. The red arrow marks the point in time when I activated the script. At the blue arrow I changed the orTime as described above.
Later memory consumption peaks are caused by other activities.
What looks weird to me is the steady increase in consumed memory and then a sharp reduction, almost back to the level before I introduced the task. Then the cycle starts again.

OK, seems that @scott is on vacation.

Maybe I can rephrase my question: is there any documentation about best practices for debugging Flux scripts? It seems that the lastSuccess() function isn’t doing what I expect it to do. A start would be to somehow print out the result of that function.
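One way to make the value visible (just a sketch; "debug_bucket" and "task_debug" are hypothetical names, and the bucket would have to exist) is to convert the time returned by tasks.lastSuccess() to a string and write it out as a regular point with array.from():

    import "influxdata/influxdb/tasks"
    import "array"

    // Build a one-row table holding the value tasks.lastSuccess() returns
    // in this run and write it to a separate bucket, so the value can be
    // inspected per run in the Data Explorer.
    array.from(
        rows: [
            {
                _time: now(),
                _measurement: "task_debug",
                _field: "lastSuccess",
                _value: string(v: tasks.lastSuccess(orTime: 2024-09-01T00:00:00Z)),
            },
        ],
    )
        |> to(bucket: "debug_bucket")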

Christian

Sorry, I’ve been playing catch-up with a few other projects and haven’t been able to answer sooner. There have been reports of tasks.lastSuccess() not working:

I have also reproduced the behavior locally. I’m guessing this has something to do with the InfluxDB side rather than the Flux side, and since there weren’t any issues about it on the InfluxDB repo yet, I went ahead and created one: Flux tasks.lastSuccess function does not return the last successful runtime of a task · Issue #25384 · influxdata/influxdb · GitHub

Hopefully this will get resolved soon.

Ok, great (well, not that great). I subscribed to that GitHub issue, so I should be informed when something happens.