Challenge: Is it possible in Flux to create an exact daily aggregate (in my timezone) for a state change event input timeseries?

Hey there,

I’m currently a bit stuck writing a Flux query that builds an exact, time-zone-aware daily aggregate of my sensor state timeseries.

The time series I have is a machine state stream. Roughly once an hour I receive an event reporting the state of my weather station, which could be “CLOUDY”, “RAINING” or whatever. Besides the hourly report, I should also receive an event every time the state changes, but there is no guarantee of that: events can be missing, both the regular ones and the change events.

I can detect missing data via a threshold: if there was no event for more than 90 minutes, I’m sure that something is broken somewhere.

Here is my sample data as CSV:


My goal is to query a daily aggregate of the states of my weather station, so I’d like to know something like “yesterday, there were 4 hours of sun, 12 hours of rain and 8 hours cloudy”.

The following output is expected:

   localDate                  state    eventduration
0  2022-04-06 00:00:00+02:00  CLEAR    0 days 01:58:18
1  2022-04-06 00:00:00+02:00  CLOUDY   0 days 01:50:00
2  2022-04-06 00:00:00+02:00  MISSING  0 days 14:59:42
3  2022-04-06 00:00:00+02:00  RAINING  0 days 02:51:25
4  2022-04-06 00:00:00+02:00  SUNNY    0 days 02:20:35
5  2022-04-06 00:00:00+02:00  SNOW     0 days 00:00:00

The problems are:

  1. I want the daily aggregate to be in my timezone (Europe/Berlin)
  2. I want a precise result, which requires tracking the state from the previous day across midnight. What do I mean by that? It can happen that, e.g., at 23:30 I receive a “RAINING” event and at 00:30 I receive a “SNOW” event. In that case, the daily aggregate should take 30 minutes of rain into account (00:00 - 00:30) along with the other data of that day. In general: a state is valid from its report time until the next event/state is reported. A real “state change” event should arrive, but there is no guarantee of that. The interpolation rule is: always keep a state valid until another one is reported, unless a threshold (in my case 90 minutes) is exceeded. In that case, “change” the event state to MISSING for the sake of the daily aggregate computation and then treat the event like any other.
  3. The daily aggregate should contain all states. Some of the possible states might not occur on a given day; in that case, the state should still be included with a duration of 0 minutes. For the sake of this example, “all states” is [“CLOUDY”, “RAINING”, “CLEAR”, “SUNNY”, “MISSING”, “SNOW”], where SNOW is never reported in my sample data but should be part of the result.
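
To make the three requirements concrete, here is a minimal sketch of the rules in plain Python (standard library only). It is not the code from my notebook, and the names `daily_state_durations`, `ALL_STATES` and `MAX_GAP` are made up for illustration. It rests on three assumptions: when the gap to the next event exceeds 90 minutes, the whole interval counts as MISSING; time before the first known event counts as MISSING; and the last known event is treated as lasting until the end of the day.

```python
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo

# Hypothetical names for illustration; the values come from the post.
ALL_STATES = ["CLOUDY", "RAINING", "CLEAR", "SUNNY", "MISSING", "SNOW"]
MAX_GAP = timedelta(minutes=90)


def daily_state_durations(events, day, tz_name="Europe/Berlin"):
    """Sum the time spent in each state during one local calendar day.

    events: iterable of (timezone-aware datetime, state) tuples, which should
            include the last event before local midnight.
    day:    the local calendar date to aggregate.
    """
    tz = ZoneInfo(tz_name)
    # Local midnight boundaries, converted to UTC so that days with a DST
    # switch (23 or 25 hours long) are measured correctly.
    start = datetime.combine(day, time.min, tzinfo=tz).astimezone(timezone.utc)
    end = datetime.combine(day + timedelta(days=1), time.min, tzinfo=tz).astimezone(timezone.utc)

    # Requirement 3: every state is present, even with zero duration.
    totals = {state: timedelta(0) for state in ALL_STATES}

    def add(state, seg_start, seg_end):
        # Clip the segment to the day and add whatever is left.
        lo, hi = max(seg_start, start), min(seg_end, end)
        if hi > lo:
            totals[state] = totals.get(state, timedelta(0)) + (hi - lo)

    events = sorted((ts.astimezone(timezone.utc), state) for ts, state in events)
    if not events:
        add("MISSING", start, end)
        return totals

    # Assumption: nothing is known before the first event.
    add("MISSING", start, events[0][0])

    for i, (ts, state) in enumerate(events):
        # Assumption: the last event is valid until the end of the day.
        nxt = events[i + 1][0] if i + 1 < len(events) else max(end, ts)
        # Assumption: a gap above the threshold turns the whole interval into MISSING.
        if nxt - ts > MAX_GAP:
            state = "MISSING"
        add(state, ts, nxt)  # requirement 2: clipping handles the midnight carry-over
    return totals
```

Called with a “RAINING” event at 23:30 on the previous day and a “SNOW” event at 00:30, this attributes 30 minutes of RAINING to the day, which is the midnight carry-over described in point 2. If only the part of a gap beyond 90 minutes should count as MISSING, the loop body would need to split the interval instead of relabelling it.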

Here’s an example image of the input specified here:

And here is a graph of the desired output (Doesn’t look too good with only one day…)

Here is a link to a Jupyter notebook where I implemented the logic with Python dataframes and also uploaded the data as a timeseries to a local InfluxDB: timeseriestest/StatusDailyDurationFromChangeEvents.ipynb at main · theodiefenthal/timeseriestest · GitHub

In the end, I’d like to run the “daily aggregate query” once a day as an InfluxDB task for the previous day. If there isn’t a (performant) Flux way of doing it, I’m going to fall back to Python or another language and schedule a periodic aggregation task. But ideally I’d like to stay in the Influx ecosystem and hence formulate my query in Flux.

Can you come up with a Flux script that solves these issues? I’m already stuck at the range call at the beginning of the script, since I can’t get my query to always start at midnight (my timezone) - 2 hours…
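
To spell out which time bounds I mean, here is a small Python sketch (standard library only) that computes them for the previous local day. The function name `previous_day_range` and the `lookback` parameter are hypothetical; the 2-hour lookback is there so the last event before midnight is included in the query.

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def previous_day_range(now, tz_name="Europe/Berlin", lookback=timedelta(hours=2)):
    """Return (query_start, day_start, day_end) as UTC datetimes.

    now must be timezone-aware. day_start and day_end are the local midnights
    that bound the previous local calendar day; query_start lies `lookback`
    before day_start.
    """
    tz = ZoneInfo(tz_name)
    today = now.astimezone(tz).date()
    # Build the midnights in local time first, then convert to UTC.
    # Doing the subtraction in UTC keeps the lookback at exactly 2 hours
    # even when a DST switch is nearby.
    day_start = datetime.combine(today - timedelta(days=1), time.min, tzinfo=tz).astimezone(timezone.utc)
    day_end = datetime.combine(today, time.min, tzinfo=tz).astimezone(timezone.utc)
    return day_start - lookback, day_start, day_end
```

For a run at 2022-04-07 05:00 Europe/Berlin, this gives 2022-04-05T20:00:00Z as the query start and 2022-04-05T22:00:00Z to 2022-04-06T22:00:00Z as the day itself. What I’m missing is a way to derive the same instants inside the Flux script so they can be used as the start and stop of range.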