Task works fine when started manually, but not automatically?

And this is the detail shown when I click the “View Logs” button for an automated run that didn't result in data being written to the bucket (sorry, this is a long line):

2022-03-22 22:01:00
Started task from script: "option task = {\n    name: \"SolarHourly\",\n    every: 1h,\n    offset: 1m,\n}\n\ndata = from(bucket: \"telegraf\")\n    |> range(start: -task.every)\n    |> filter(fn: (r) => r[\"measurement\"] == \"solar\")\n    |> filter(fn: (r) => r[\"topic\"] == \"import\")\n\ndata\n    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)\n    |> difference(\n        nonNegative: true,\n        columns: [\"_value\"],\n        keepFirst: false,\n    )\n    |> map(\n        fn: (r) => ({\n            _value: r._value,\n            _field: \"usage\",\n            _time: r._time,\n            _measurement: \"solar_hourly\",\n        }),\n    )\n    |> to(bucket: \"meters_hourly\"\n        //        org: \"my-org\",\n        )"

This is how the task looks now (I made some small changes compared to the first post):

option task = {
    name: "SolarHourly",
    every: 1h,
    offset: 1m,
}

data = from(bucket: "telegraf")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")

data
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    |> difference(
        nonNegative: true,
        columns: ["_value"],
        keepFirst: false,
    )
    |> map(
        fn: (r) => ({
            _value: r._value,
            _field: "usage",
            _time: r._time,
            _measurement: "solar_hourly",
        }),
    )
    |> to(
        bucket: "meters_hourly",
        org: "my-org",
    )

And this is the bucket content (using the simple query from the GUI with a time range of the last 6 hours); it shows only data from the manually started tasks, not the scheduled hourly results:

I have the same issue. No error in the log; when I launch the script manually it writes to the bucket, but not when it is scheduled.

Did you have any success?

Not really.

I did, however, seem to find out that it is related to the use of the difference() function.
If I comment that out in the example above, the data gets written again.

So the question is: why is that? Or, how can I rewrite this query without using difference()?

The goal is to calculate the hourly kWh production of my solar panels by calculating the increase of the kWh meter every hour.

Yes, it also seems to me the issue is with difference().

I use the same function and… it doesn't write to the new bucket. It only writes when launched manually, copy-pasted into the editor window on the source bucket.

We are in the same boat: I can imagine a bug…

Interesting! I wonder if more people have the same issue.
In the meantime, would you know an alternative way to solve this?

I would like to know an alternative way too, but I don't have one.

We can only hope the moderators / staff can help.

@Anaisdg can you verify whether the difference() function indeed has this weird behavior when started automatically from a task? Thanks!

@MarcoB I opened an issue on the GitHub project. I read somewhere that an issue there will probably prioritize the request (or at least bring it to their attention).

If you like, you can upvote it there: Task with difference doesn't write to destination bucket · Issue #23231 · influxdata/influxdb · GitHub


Hello,

I am quite new to Influx so I might be off the mark, but I am going to try anyway.

So, from what I understand, you are trying to create a task that retrieves the hourly kWh production of your solar panels by calculating the difference with respect to the previous hour, is that right?

To do so, you are getting the last hour of data (|> range(start: -task.every)) and then applying an aggregateWindow() that takes the last record of that range. The problem is that this results in only one record per series, so applying difference() after it yields no results, since there is only one record in your stream.
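For illustration (this is just the windowing part of your own query with a count() added at the end, nothing new), running it over the same one-hour range should come back with a single row per series, which is why difference() then has nothing to subtract:

from(bucket: "telegraf")
    |> range(start: -1h)  // same as -task.every when every: 1h
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    // count the rows per series; a count of 1 means difference() cannot produce output
    |> count()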

Let me know if this does not solve your problem.

Hi,
That’s a good point!

I was actually also thinking in that direction. It explains why this works manually: the task is then triggered outside the full hour, so it finds 2 results.

Indeed, the idea is to calculate the hourly kWh usage by taking the difference between the last counter values of the “current” hour and the previous hour.

The counter measures the total generated kWh (not the instantaneous power in kW). I have the same thing with my water meter in m³.

So if this is the issue, then the question is how to give the task the proper range to do this every hour.

I think I figured this out. I still have to check whether the values match my expectations, but it does write the values into the bucket now!

import "experimental"

option task = {
    name: "SolarHourly",
    every: 1h,
    offset: 1m,
}

data = from(bucket: "telegraf")
    |> range(start: experimental.subDuration(d: 2h, from: now()))
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")

data
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    |> difference(
        nonNegative: true,
        columns: ["_value"],
        keepFirst: false,
    )
    |> map(
        fn: (r) => ({
            _value: r._value,
            _field: "usage",
            _time: r._time,
            _measurement: "solar_hourly",
        }),
    )
    |> to(
        bucket: "meters_hourly",
        org: "my-org",
    )

What do you think, is this the right way?

Yes, now that you have chosen a range of 2h, the aggregation gives you the last value of each of the last two hours, so the difference between them can be calculated.

Try that out and let me know if the result is what you expected.

Yes, and indeed this could even be simplified by using
range(start: -2h) instead of
range(start: experimental.subDuration(d: 2h, from: now())), which does the same thing.
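So the data part (same filters as before) simply becomes:

data = from(bucket: "telegraf")
    |> range(start: -2h)
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")

and the import "experimental" line is then no longer needed.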

So thanks for the tips!

I have the same problem, but using the sum() function instead.

Mmm, after having this running for a couple of weeks I started to notice that sometimes I don’t get an hourly calculation written to the database. So I have to re-open this issue :frowning:

It turns out that this happens when my (water) meter didn't send an update for more than 1 hour, because there was no usage. Unfortunately, the task then also does not calculate a usage for the first hour in which new measurements are registered again.

The reason is that the task looks back 2 hours to calculate the difference with the “previous” hour.
However, if there were no measurements in the table 2 hours ago, this results in an empty row for that hour, so the difference with the previous hour yields an empty result.

As an example I have my water meter readings in the table below (I have offset these to start at “0” for ease of reading).
As you can see (first column), the total utility water meter counter increases from 0 to 54 liters between 02:00 and 11:00. Note that between 04:00 and 06:00 there were no readings.

So in the second column you can see that the hourly task records 4 L of usage at 03:00 (the difference between the utility meter at 0 L at 02:00 and 4 L at 03:00), and 23 L at 11:00 (the difference between 31 L at 10:00 and 54 L at 11:00).

However, for 07:00 and 10:00 nothing is calculated, because the difference function did not see any readings for 06:00 and 09:00 respectively.

In the last column I have shown the result of running the difference function over the whole day. That works fine, because e.g. at 07:00 it finds the previous filled row at 03:00.

The way to solve this would be to make sure there is a measurement in the table for every hour. So if there were no measurements during an hour, just copy the last meter reading from 2 hours ago into the 59th minute of that hour. This, however, feels a bit awkward (to “invent” measurements), and I also wouldn't know how to write a task that could do that.
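The closest I can come up with for that “carry the last reading forward” idea is a sketch like the one below (using the solar task from above as the example). It lets aggregateWindow() create the empty hourly windows and then copies the previous reading into them with fill(usePrevious: true) before taking the difference. I haven't verified this, and the range would still need to reach back far enough to contain the last real reading:

data = from(bucket: "telegraf")
    |> range(start: -2h)
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")

data
    // createEmpty: true keeps a (null) row for hours without any readings
    |> aggregateWindow(every: 1h, fn: last, createEmpty: true)
    // copy the previous meter reading into those empty hours
    |> fill(column: "_value", usePrevious: true)
    |> difference(nonNegative: true, columns: ["_value"], keepFirst: false)
    // ... followed by the same map() and to() as in the task above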

So now, knowing what the issue is, can anybody find a way to run an hourly task that solves this elegantly?

Bump: does anybody have a suggestion for how to solve this?

Then, just increase the range of the query:

data = from(bucket: "telegraf")
    |> range(start: experimental.subDuration(d: 1d, from: now()))
    |> filter(fn: (r) => r["measurement"] == "solar")
    |> filter(fn: (r) => r["topic"] == "import")

Using this, you will be able to collect the last day of data. Then, when you aggregate by hour and apply the difference, it will calculate it even if the time elapsed is more than an hour.

If you want to optimize it, you can also apply tail(n: 2) so it only takes into account the last two entries.
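Something like this, with tail() placed right after the hourly aggregation (untested, just to show where it would go):

data
    |> aggregateWindow(every: 1h, fn: last, createEmpty: false)
    // keep only the two most recent hourly values per series
    |> tail(n: 2)
    |> difference(nonNegative: true, columns: ["_value"], keepFirst: false)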

Let me know how this goes.

Hi, thanks for your thoughts.
Yes, this is indeed what I have done now. I have created a task that runs daily to aggregate the last 24 hours. This repairs the “holes”.
That works… until I have a day or a week, e.g. during vacations, when there is no usage for more than 24 hours.

Indeed, querying by time range can be inconvenient for certain applications. Nevertheless, you can always query a larger range of data and keep the last 2 entries using tail(). That way you make sure you are retrieving data even after long holidays, while the operation remains efficient since you are working with only 2 records.

If anyone else has any other option, feel free to comment.

Hi,
I have run into the same problem. When I run the task manually, it inserts data into the “control” bucket. No data is inserted into the “control” bucket when the task runs automatically, but the logs record it as successful. Below is my task; can anyone help?

option task = {
    name: "control_power_forever",
    every: 15m,
}

// last control_power value per 15-minute window
control = from(bucket: "forever")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "T3_5s")
    |> filter(fn: (r) => r["_field"] == "control_power" and r.type == "2")
    |> aggregateWindow(every: 15m, fn: last)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> group(columns: ["_time"], mode: "by")
    |> unique(column: "control_power")

original = () => {
    // 15-minute difference of the meter counter, scaled by 4.0 and summed
    meter = from(bucket: "forever")
        |> range(start: -task.every)
        |> filter(fn: (r) => r._measurement == "T3_5s")
        |> filter(fn: (r) => r["_field"] == "ac_history_negative_power_in_kwh" and r.type == "2")
        |> aggregateWindow(every: 15m, fn: last)
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> difference(columns: ["ac_history_negative_power_in_kwh"])
        |> group(columns: ["_time"], mode: "by")
        |> map(fn: (r) => ({r with ac_history_negative_power_in_kwh: r.ac_history_negative_power_in_kwh * 4.0}))
        |> sum(column: "ac_history_negative_power_in_kwh")

    // same calculation for the EMS output energy
    ems = from(bucket: "forever")
        |> range(start: -task.every)
        |> filter(fn: (r) => r._measurement == "T1_5s")
        |> filter(fn: (r) => r["_field"] == "ems_history_output_energy")
        |> aggregateWindow(every: 15m, fn: last)
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> difference(columns: ["ems_history_output_energy"])
        |> group(columns: ["_time"], mode: "by")
        |> map(fn: (r) => ({r with ems_history_output_energy: r.ems_history_output_energy * 4.0}))
        |> sum(column: "ems_history_output_energy")

    return join(tables: {meter, ems}, on: ["_time"])
        |> group(columns: ["_time"], mode: "by")
        |> map(fn: (r) => ({r with originalDemand: r.ems_history_output_energy + r.ac_history_negative_power_in_kwh}))
}

originalDemand = original()

// combine both streams and write the result to the "control" bucket
join(tables: {originalDemand, control}, on: ["_time"])
    |> map(fn: (r) => ({r with max: if r.control_power > r.ac_history_negative_power_in_kwh then r.originalDemand - r.control_power else r.originalDemand - r.ac_history_negative_power_in_kwh}))
    |> map(fn: (r) => ({r with _value: if r.max > 0 then r.max else 0.0}))
    |> map(fn: (r) => ({r with _field: "real"}))
    |> drop(columns: ["ammeterId", "uuid"])
    |> to(bucket: "control")