Task works fine when started manually, but not automatically?

Hi,
I’m a newbie, moving from InfluxDB 1.8 to 2.x, so I have to migrate my continuous queries to tasks.
I have now written a Flux task that nicely downsamples my data to an hourly or daily level and writes the result to the appropriate bucket.

This all works fine when I start the task by pressing the “Run Task” button in the GUI, but not when the task runs according to the schedule. The log shows "success" at the scheduled times, so the task does run when it should, but I see no data appearing in the target bucket.

This is the task:

option task = {
    name: "Solar_Hourly",
    every: 1h,
    offset: 5m,
}

from(bucket: "telegraf")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    |> difference(
        nonNegative: true,
        columns: ["_value"],
        keepFirst: false,
    )
    |> map(
        fn: (r) => ({
            _value: r._value,
            _time: r._time,
            _measurement: "solar_hourly",
            _field: "usage",
        }),
    )
    |> to(
        bucket: "meters_hourly",
        org: "my-org",
    )

In the Influx logs I see no errors or anything else that helps me pinpoint why this does not work.

Does anybody have a clue what the issue could be?

Perhaps it helps to show what the result table looks like when I run the same query as the task interactively:

from(bucket: "telegraf")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "telegraf")
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    |> difference(
        nonNegative: true,
        columns: ["_value"],
        keepFirst: false,
    )
    |> map(
        fn: (r) => ({
            _value: r._value,
            _time: r._time,
            _measurement: "solar_hourly",
            _field: "usage",
        }),
    )

This is the result:

Does anybody see something strange here that would explain why this does not write to the bucket when started automatically?

Is there anybody out here who can give me tips on why this task works perfectly when started manually, but not when it runs as scheduled? I have tried all kinds of variations, but I keep running into this.
This is driving me crazy!

How are you querying for your data to ensure it’s working as expected? I mean how are you verifying that it works perfectly when started manually? Do you see data in the bucket from those runs? How are you querying for that newly written data? Through the UI?

This is very odd. Can you show me what the logs look like for the task once it’s scheduled?
Are you sure the task is set to active rather than inactive?

Yes, this is very odd.
To verify that it is working, I’m querying the bucket (meters_hourly) to which the data is written. I do that in the UI, using this simple query:

from(bucket: "meters_hourly")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "solar_hourly")

When I trigger the task manually, I see the data appearing in the bucket immediately. But from the scheduled runs I don’t see any data.
The log shows that the automated task has run with a success result, nicely every hour (+1 minute offset), but only the run I triggered manually (in this example at 22:25) produced data in the bucket. So the task is active and running.

And this is the detail-info if I click the “View Logs” button from an automated run that didn’t result in data written to the bucket (sorry this is a long line):

2022-03-22 22:01:00
Started task from script: "option task = {\n    name: \"SolarHourly\",\n    every: 1h,\n    offset: 1m,\n}\n\ndata = from(bucket: \"telegraf\")\n    |> range(start: -task.every)\n    |> filter(fn: (r) => r[\"measurement\"] == \"solar\")\n    |> filter(fn: (r) => r[\"topic\"] == \"import\")\n\ndata\n    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)\n    |> difference(\n        nonNegative: true,\n        columns: [\"_value\"],\n        keepFirst: false,\n    )\n    |> map(\n        fn: (r) => ({\n            _value: r._value,\n            _field: \"usage\",\n            _time: r._time,\n            _measurement: \"solar_hourly\",\n        }),\n    )\n    |> to(bucket: \"meters_hourly\"\n        //        org: \"my-org\",\n        )"

This is what the task looks like now (I made some small changes compared to the first post):

option task = {
    name: "SolarHourly",
    every: 1h,
    offset: 1m,
}

data = from(bucket: "telegraf")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")

data
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    |> difference(
        nonNegative: true,
        columns: ["_value"],
        keepFirst: false,
    )
    |> map(
        fn: (r) => ({
            _value: r._value,
            _field: "usage",
            _time: r._time,
            _measurement: "solar_hourly",
        }),
    )
    |> to(
        bucket: "meters_hourly",
        org: "my-org",
    )

And this is the Bucket content (using the simple query from the GUI with a time range of the last 6 hours), only showing data from the manually started tasks, not the scheduled hourly results:

I have the same issue. No errors in the log; when I launch the script manually it writes to the bucket, but not when it is scheduled.

Did you have any success?

No, not really.

I did find out, however, that it seems to be related to the use of the difference() function.
If I comment that out in the example above, the data is written again.

So the question is: why is that? Or, how can I rewrite this query without using difference()?

The goal is to calculate the hourly energy production (kWh) of my solar panels from the increase of the kWh meter reading every hour.

Yes, it also seems to me that the issue is with difference().

I use the same function and it doesn’t write to the new bucket. It only writes when launched manually, by copying and pasting the script into the query editor on the source bucket.

We are in the same boat. I suspect a bug…

Interesting! I wonder if more people have the same issue.
In the meantime, would you know an alternative way to solve this?

I would like to know an alternative way too, but I don’t have one.

We can only hope for help from the moderators or staff.

@Anaisdg can you verify whether the difference() function has this weird behavior when it is run automatically by a task? Thanks!

@MarcoB I opened an issue on the GitHub project. I read somewhere that an issue there will probably get the request prioritized (or at least bring it to their attention).

If you like, you can upvote it there: Task with difference doesn't write to destination bucket · Issue #23231 · influxdata/influxdb · GitHub

Hello,

I am quite new to Influx, so I might be off the mark, but I am going to try anyway.

So, from what I understand, you are trying to create a task that computes the hourly kWh production of your solar panels by calculating the difference with respect to the previous hour. Is that right?

To do so, you are getting the last hour of data (|> range(start: -task.every)) and then applying an aggregateWindow() that takes the last record of each 1h window in that range. The problem is that this results in only one record, so applying difference() afterwards yields no results: with a single record in the stream there is nothing to subtract from.

Let me know if this does not solve your problem.
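
The single-record case described above can be reproduced in isolation. This is only a minimal sketch: the timestamps and meter values are made up (they are not data from this thread), and array.from() is used to build the input instead of reading from a bucket.

```flux
import "array"

// Hypothetical input: one aggregated reading, as a 1h range would produce
oneRow = array.from(rows: [{_time: 2022-03-22T22:00:00Z, _value: 120.0}])

// Hypothetical input: two aggregated readings, one per hourly window
twoRows =
    array.from(
        rows: [
            {_time: 2022-03-22T21:00:00Z, _value: 100.0},
            {_time: 2022-03-22T22:00:00Z, _value: 120.0},
        ],
    )

// With a single row there is no previous value, so no row is returned
oneRow
    |> difference(nonNegative: true, keepFirst: false)
    |> yield(name: "one_row")

// With two rows, difference() returns one row for the second timestamp
twoRows
    |> difference(nonNegative: true, keepFirst: false)
    |> yield(name: "two_rows")
```

If the "one_row" result comes back empty, there is nothing for to() to write, which would match a run that reports success but adds no data.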

Hi,
That’s a good point!

I was actually thinking in that direction too. It explains why this works manually: the task is then triggered off the full hour, so the 1h range spans two hourly windows and the task finds 2 results.

Indeed, the idea is to calculate the hourly kWh usage by taking the difference between the last counter value of the “current” hour and that of the previous hour.

The counter measures the total generated energy in kWh (not the instantaneous power in kW). I have the same thing with my water meter, in m³.

So if this is the issue, then the question is how to give the task the proper range to do this every hour?

I think I figured this out. I still have to check whether the values match my expectation, but it does write the values into the bucket now!

import "experimental"

option task = {
    name: "SolarHourly",
    every: 1h,
    offset: 1m,
}

data = from(bucket: "telegraf")
    |> range(start: experimental.subDuration(d: 2h, from: now()))
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")

data
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    |> difference(
        nonNegative: true,
        columns: ["_value"],
        keepFirst: false,
    )
    |> map(
        fn: (r) => ({
            _value: r._value,
            _field: "usage",
            _time: r._time,
            _measurement: "solar_hourly",
        }),
    )
    |> to(
        bucket: "meters_hourly",
        org: "my-org",
    )

What do you think, is this the right way?

Yes. With a range of 2h, the aggregation gives you the last value from each of the last two hours, so you can calculate the difference between them.

Try that out and let me know if the result is what you expected.

Yes, indeed. This could even be simplified by using range(start: -2h) instead of range(start: experimental.subDuration(d: 2h, from: now())), which does the same.

So thanks for the tips!

I have the same problem, but using the sum function instead.

Mmm, after having this running for a couple of weeks I started to notice that sometimes no hourly calculation is written to the database. So I have to reopen this issue :(

It turns out that this happens when my (water) meter didn’t send an update for more than 1 hour, because there was no usage. Unfortunately, the task then also does not calculate a usage for the first hour in which new measurements are registered again.

The reason is that the task looks back 2 hours to calculate the difference with the “previous” hour.
However, if there were no measurements in that previous hour, there is no row for it, so the difference with the previous hour yields an empty result.

As an example, I have my water meter readings in the table below (I have offset them to start at “0” for ease of reading).
As you can see (first column), the total utility water meter counter increases from 0 to 54 liters between 02:00 and 11:00. Note that between 04:00 and 06:00 there were no readings.

In the second column you see that the hourly task records 4 L usage at 03:00 (the difference between the utility meter at 0 L at 02:00 and 4 L at 03:00), and 23 L at 11:00 (the difference between 31 L at 10:00 and 54 L at 11:00).

However, for 07:00 and 10:00 nothing is calculated, because the difference function did not see any readings for 06:00 and 09:00 respectively.

In the last column I have shown the result of applying the difference function to the whole day. It then works fine, as e.g. at 07:00 it finds the previous filled row at 03:00.

One way to solve this would be to make sure that there is a measurement in the table for every hour: if there were no measurements during an hour, just copy the last meter reading from 2 hours ago into the 59th minute of the last hour. This however feels a bit awkward (“inventing” measurements), and I also wouldn’t know how to write a task that could do that.

So now that we know what the issue is, can anybody think of a way to run an hourly task that solves this elegantly?
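
One direction that follows from the whole-day observation above is to give difference() a longer lookback and then keep only the newest row. The following is an untested sketch, not a confirmed fix. The 24h lookback is an assumed value that would need to exceed the longest gap between readings, and tail(n: 1) is a choice made here for illustration; bucket, tag, and org names are copied from the task earlier in the thread.

```flux
option task = {
    name: "SolarHourly",
    every: 1h,
    offset: 1m,
}

from(bucket: "telegraf")
    // Assumption: 24h is longer than the longest gap without readings
    |> range(start: -24h)
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    // Differences are taken between consecutive existing rows,
    // so a gap of several hours is bridged by the previous filled row
    |> difference(nonNegative: true, columns: ["_value"], keepFirst: false)
    // Keep only the most recent difference of each series
    |> tail(n: 1)
    |> map(
        fn: (r) => ({
            _value: r._value,
            _field: "usage",
            _time: r._time,
            _measurement: "solar_hourly",
        }),
    )
    |> to(bucket: "meters_hourly", org: "my-org")
```

In an hour without new readings, this would write the previous result again with the same timestamp, measurement, and field, which overwrites the existing point rather than adding a new one. As in the whole-day column, all usage accumulated during a gap ends up in the first hour that has a reading again.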