Various questions on Influx tasks

I have a simple measurement which contains readings from devices that usually have multiple sensors. For example, there is a weather station which measures temperature and humidity, so I create values where device=“weatherstation”, sensor=“temperature” and value=12.2. I know I could have used the sensor name as the field key instead of the field key “value”. I may change that in the future.

Regardless, I wanted to create a task which calculates daily average, min and max values. This is my task:

option task = {name: "Weatherstation daily average temp", every: 1d}
// Data source
data = from(bucket: "mydb")
    |> range(start: -duration(v: int(v: task.every)))
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    |> filter(fn: (r) => r["sensor"] == "temperature")
data
    // Data transformation
    |> aggregateWindow(every: 1d, fn: mean)
    |> rename(columns: {_value: "average"})
    // Data destination
    |> to(bucket: "aggregates")

When this task runs, I get the error:

could not execute task run: runtime error @15:8-15:32: to: table has no _value column

But if I copy the flux statement to the explorer like this:

from(bucket: "mydb")
    |> range(start: -1d)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    |> filter(fn: (r) => r["sensor"] == "temperature")
    |> aggregateWindow(every: 1d, fn: mean)
    |> rename(columns: {_value: "average"})

I get a record back. And I can see the average field.
Originally I thought that maybe rename cannot be used in the statement in the task, but I don’t understand why not.
Maybe the issue is with my range definition in the task? And is rename failing because nothing is returned by the query above?

OK, I made the task work. After carefully reading the error message, I realized that the issue is in the “to” line: to() needs a _value column, and my rename had removed it, so it no longer knew what the value field was. So I dropped the rename and declared the field in to() instead:

option task = {name: "Weatherstation daily average temp", every: 1d}

// Data source
from(bucket: "mydb")
    |> range(start: -duration(v: int(v: task.every)))
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    |> filter(fn: (r) => r["sensor"] == "temperature")
    |> aggregateWindow(every: 1d, fn: mean)
    // Data destination
    |> to(
        bucket: "aggregates",
        tagColumns: ["device", "sensor"],
        fieldFn: (r) => ({"average": r._value}),
    )
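
For anyone following along, here is a sketch of an alternative that avoids fieldFn. When fieldFn is omitted, to() takes the field key from _field and the field value from _value, so the idea is to leave _value alone and overwrite _field with set(). The same pattern extends to the min and max mentioned at the top. The task name and the variable names are only illustrative, and this has not been run against the bucket above.

```flux
option task = {name: "Weatherstation daily temp stats", every: 1d}

// Data source, shared by the three aggregations below
data = from(bucket: "mydb")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    |> filter(fn: (r) => r["sensor"] == "temperature")

// Keep _value as is and relabel the field instead of renaming the column
average = data
    |> aggregateWindow(every: 1d, fn: mean)
    |> set(key: "_field", value: "average")

minimum = data
    |> aggregateWindow(every: 1d, fn: min)
    |> set(key: "_field", value: "min")

maximum = data
    |> aggregateWindow(every: 1d, fn: max)
    |> set(key: "_field", value: "max")

// Merge the three streams and write them with a single to()
union(tables: [average, minimum, maximum])
    |> to(bucket: "aggregates")
```

With this layout the three results should land in the same measurement under the field keys average, min and max.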

I am not sure if it is only me, but every time I read the documentation on Flux, it feels like it is only explaining half of the story, and the examples are really simple ones that always leave more questions than answers.

Like, what if I have more fields in the source database? Those will all get aggregated, and in that case there would be multiple _value fields. How will I refer to those then?
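
As far as I understand the Flux data model, from() returns one table per series, and _field is part of what defines a series. That would mean aggregateWindow() computes a separate _value for every field, each in its own table, rather than several _value columns in one row. A minimal sketch, where "battery" is a made-up second field key used only for illustration:

```flux
from(bucket: "mydb")
    |> range(start: -1d)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    // Pick fields by key: "value" is the key used in this thread, "battery" is hypothetical
    |> filter(fn: (r) => r["_field"] == "value" or r["_field"] == "battery")
    // Each field sits in its own table, so each one gets its own daily mean
    |> aggregateWindow(every: 1d, fn: mean)
```

Each output row still carries its _field column, which is how a particular field can be referred to further down the pipeline.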

Also the entire triggering is unexplained (or at least I did not find it): if I configure it every 1d, I can see that it appears to trigger at 00:00:00. Is that by accident or by design? Will this ensure that if I want to generate downsampled data and the range is set to range(start: -duration(v: int(v: task.every))), it will only select data for the last day? Otherwise the aggregateWindow function will generate multiple lines and I would have some crap subset record in the bucket. These details are also conveniently skipped in the documentation.

This community is lit, so let me just respond to myself.

I let the task run yesterday on its own. Based on the logs, it ran at 01:00:00 last night:

Only check the first line, rest was me testing and triggering it manually. I am guessing it is 01:00:00 because the system runs in UTC and I am in CET (GMT+1). So is this good? Is this how it is supposed to happen? Looks like it.

There is a comment on https://docs.influxdata.com/flux/v0.x/stdlib/universe/aggregatewindow/ saying that if you schedule every week it will start on Thursday 00:00:00 UTC because Unix time started on a Thursday. Since it mentions 00:00:00 UTC, I assume it always locks into the UTC timezone.

Before running the job, I deleted all the data from the bucket, so at least I can confirm that only one average is generated by each task run. So matching the range duration and the aggregateWindow period to task.every ensures the aggregation runs for precisely 1 day (or week, year, etc).
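
One related knob that might help if readings arrive late: the task options also accept an offset. As I read the task documentation, offset delays when the task executes without shifting the time range the query sees, so range(start: -task.every) would still cover the scheduled day. A sketch, with the 10m value picked arbitrarily:

```flux
option task = {
    name: "Weatherstation daily average temp",
    every: 1d,
    // Assumed value: run 10 minutes after the scheduled time to allow for late writes
    offset: 10m,
}

from(bucket: "mydb")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    |> filter(fn: (r) => r["sensor"] == "temperature")
    // createEmpty: false skips windows that contain no data
    |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
    |> to(
        bucket: "aggregates",
        tagColumns: ["device", "sensor"],
        fieldFn: (r) => ({"average": r._value}),
    )
```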

Just keep in mind the timestamp is for the date when the task executed. So these downsampled values have a timestamp of the end of the period.


Hello @nygma2004,
Yes, you’re correct. You can always elect to “run query” to run it now and test it if you need to.
Good find on the to() function; here’s the documentation for anyone looking in the future:

Sorry for the delay, nice debugging, and thank you for sharing your solution.

Thanks @Anaisdg,

One question remains, actually two:

  1. If I set up the task to run every 1d, I noticed that the jobs start at 00:00:00 UTC. But what should I do if I live in the US and I want my tasks to aggregate exactly by local time? I mean it is a question of both running the jobs at midnight local time and having aggregateWindow aggregate daily by local time. I did not find anything related to this issue in the documentation.
  2. I did not find any way of telling aggregateWindow to use the beginning of the period for the _time value. If I aggregate daily, I would have preferred the time to be the beginning of the period and not the end. In the comments on YouTube it was suggested to me that for the to() function I can specify timeColumn: “_start”, which in fact works fine if the range is the same period as the aggregation period. But what if it is not? Can the time somehow be defined with a function?
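
For reference, two things in the Flux standard library look relevant to these questions, sketched here without having been tested on this setup. aggregateWindow() has a timeSrc parameter that selects which window boundary becomes _time (the default is "_stop"), and the timezone package can set the location option so that calendar windows follow local time. The zone name and the 7d range below are only examples, and the timezone package needs a Flux version that includes it.

```flux
import "timezone"

// Example zone: window boundaries below follow this location instead of UTC
option location = timezone.location(name: "America/New_York")

from(bucket: "mydb")
    |> range(start: -7d)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    |> filter(fn: (r) => r["sensor"] == "temperature")
    // timeSrc: "_start" stamps each daily mean with the start of its window
    |> aggregateWindow(every: 1d, fn: mean, timeSrc: "_start", createEmpty: false)
```

As far as I can tell this only changes how the windows are cut and stamped. It does not change when the task itself is scheduled.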

Best Regards,
Csongor

@nygma2004,
Thanks for clarifying. How are you creating these tasks? In the UI? Is your time set to local or UTC in the UI when you’re creating the task? Do you create the task from the data explorer and then select save as task?

  1. You could use cron scheduling instead which runs on the system time: InfluxDB task configuration options | InfluxDB Cloud Documentation
  2. Please check out these threads; there are various solutions: Month aggregateWindow doesn't work with negative offset
    Query to generate windowed aggregates with timestamp at window start and without partial data
    Out of curiosity which video was that?
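
For anyone following along, a cron-scheduled task replaces every with cron in the task options. A minimal sketch: the expression below (midnight every day) is only an example, and which clock the schedule actually follows is the open question in this thread.

```flux
option task = {name: "Weatherstation daily average temp", cron: "0 0 * * *"}

from(bucket: "mydb")
    // task.every is not defined in this task record, so the range is spelled out
    |> range(start: -1d)
    |> filter(fn: (r) => r["_measurement"] == "sensors")
    |> filter(fn: (r) => r["device"] == "weatherstation")
    |> filter(fn: (r) => r["sensor"] == "temperature")
    |> aggregateWindow(every: 1d, fn: mean)
    |> to(
        bucket: "aggregates",
        tagColumns: ["device", "sensor"],
        fieldFn: (r) => ({"average": r._value}),
    )
```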

@Anaisdg,
Thanks a lot, and I will go through these discussions to better understand.

I don’t remember whether the timezone was set to local or UTC. I assume this is the time zone on the same UI screen (top right), correct? I did not find any such option in the settings. I did not know that it has an influence on how the task is configured.

I usually start in the data explorer and put my query together, then I change to the query view, copy and paste the Flux query, and create the task manually. Of course I replace the v.xxx parameters in the range and aggregateWindow.

I document my process and learning as I go on my channel. The comment I was referring to was under my video: Downsample data in InfluxDB 2.x - YouTube

Merry Christmas! :smiley:

Hi @Anaisdg,
A similar discussion was started on GitHub: Cron-based task doesn't run at system time · Issue #20922 · influxdata/influxdb · GitHub
I am also facing this issue. Cron-based tasks don’t run at system time. The first run is at UTC and then the subsequent ones are at LOCAL TIME. If any changes are made to the task query, again the first run is at UTC and then the subsequent ones are at LOCAL TIME. How can I resolve this?

@vkhemani,
I’m afraid I don’t have an answer for you. Could you try using the every parameter instead of cron?