Use "markers" to define beg. and end of a period, then do calcs on data within that period

Hi everyone. This question is best explained using a picture…

Below is my hourly electricity usage. The power company does not bill evenly on a monthly basis. For example, the Feb. invoice may run from Jan. 29 to Feb. 27 (instead of Feb. 1 to Feb. 28).

Question 1: What is the best way to tag the beginning of the period and end of the period? I do not think using regular tags is a good idea because of the eventual high cardinality.
Question 2: Is there a way to define / name each period? In my mockup, I called it Period 02-2022, but it could be anything.
Question 3: Once we resolve 1 & 2 above, I presume there is a way to get the Max value for each period and the total (sum) for each period.

Thank you in advance.

No solution yet, but some more food for thought…

If I had just one beginning and one end, then this would work perfectly, as would this approach, which uses a boolean field (called ComprON) and then groups on ComprON. However, since there are 12 period beginnings and 12 period endings every year (representing our 12 bills received), and I plan to store several years’ worth of data, I do not think the above approaches make sense. I should also note that there is a rate per kWh that applies to each period, and this too changes. However, that should be easy enough to store in a new field called “kWh_rate” or whatever.

Hi @grant1 - I’d like to do the same thing. I don’t have a clear solution, but here’s my 2 cents worth . . .

My data is collected by Telegraf, so it would be easy to map the timestamp to one of the 12 periods and append that as a tag to the measurement. Once in Influx, the Flux query could begin with the current year and match on the tag’s value for the current period; this could be set up as a task that runs manually or on a “monthly” schedule.

If done entirely within InfluxDB, variables could be set (manually, perhaps) for, say, _periodStart and _periodStop and used in the range() statement. Then the task fires using those values.
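
A rough sketch of what I mean (bucket, measurement, and field names are all invented here):

option task = {name: "billing-period-rollup", every: 1mo}

// Placeholder period bounds -- update these manually (or look them
// up from a second table) each billing cycle.
_periodStart = 2022-01-29T00:00:00Z
_periodStop = 2022-03-01T23:59:59Z

from(bucket: "electricity")
  |> range(start: _periodStart, stop: _periodStop)
  |> filter(fn: (r) => r._measurement == "usage" and r._field == "kWh")
  |> sum()
  // aggregates drop _time, so restore one before writing out
  |> map(fn: (r) => ({r with _time: r._stop}))
  |> to(bucket: "billing_rollups")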

I’ve tracked usage in a spreadsheet for many, many years and have noticed that the current month’s period may jitter (+/- 1 or 2 days) compared with the prior year’s period. So in my case some amount of manual intervention seems inevitable.

I’ll be interested to know what you implement.

Hi @phill. So after sleeping on this, I think creating a separate table with the period name, the period beginning timestamp, the period ending timestamp, and pricing/rate data would be my first task:

BillingPeriod  BegOfPeriod           EndOfPeriod           DemandChargeCapacityRate  DemandChargeNonCapacityRate  MaxkWDemandRate
01_2022        2022-01-02T00:00:00Z  2022-01-28T23:59:59Z  $14.0700                  $2.9200                      $4.1710
02_2022        2022-01-29T00:00:00Z  2022-03-01T23:59:59Z  $14.0700                  $2.9200                      $4.1710
03_2022        2022-03-02T00:00:00Z  2022-03-31T23:59:59Z  $14.0700                  $2.9200                      $4.1710

Next (and likely with the help of @Anaisdg, @Jay_Clifford and others here), I would do a join or union or pivot or something like that to do some calcs for each billing period, such as the max value, the total (sum) usage, the calculated cost (using the rates from the above table), etc.
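
To make that concrete, here is a rough sketch of the kind of thing I have in mind (bucket, measurement, and field names are invented; the table is built inline with array.from, though it could equally live in its own bucket):

import "array"

// the lookup table from above, built inline for illustration
periods = array.from(rows: [
  {period: "01_2022", start: 2022-01-02T00:00:00Z, stop: 2022-01-28T23:59:59Z, rate: 14.07},
  {period: "02_2022", start: 2022-01-29T00:00:00Z, stop: 2022-03-01T23:59:59Z, rate: 14.07},
])

// pull out one period's row (idx: 1 is 02_2022)
p = periods |> findRecord(fn: (key) => true, idx: 1)

usage = from(bucket: "electricity")
  |> range(start: p.start, stop: p.stop)
  |> filter(fn: (r) => r._measurement == "usage" and r._field == "kWh")

usage |> max() |> yield(name: "period_max")
usage |> sum() |> yield(name: "period_total")
usage
  |> sum()
  // multiply total kWh by the period's rate for a rough cost
  |> map(fn: (r) => ({r with _value: r._value * p.rate}))
  |> yield(name: "period_cost")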

Hello @grant1,
I’m a little confused about how you’re determining the periods.
I would assume that every value below 150 is either the start or stop of a new period.
I’m surprised to see that one valley in the 04-2022 period is missed, or that the next period after it ends with a value of ~240.

I am working on this script:

import "math"
import outer "join"
data = from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "h2o_feet")
  |> filter(fn: (r) => r["_field"] == "water_level")
  |> filter(fn: (r) => r["location"] == "coyote_creek")
  |> limit(n: 400)
//   |> yield(name: "raw")

derr = data 
// unit for the time in between timestamps
|> derivative(unit: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "derr")

derr_shift = derr
|> timeShift(duration: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "derr_shift")


inflection_time = join(tables: {derr: derr, derr_shift: derr_shift}, on: ["_time"])
// change to just one side of the or expression if you want to find a true period in a sin wave
|> filter(fn: (r) => (r._value_derr >= 0.0 and r._value_derr_shift <= 0.0) or (r._value_derr <= 0.0 and r._value_derr_shift >= 0.0))
// to account for when the derivative does == 0 and define inflection point from either side of crossing from pos to neg or neg to pos
// also helps us make sure that local max and min are looking right/adequate time between them
|> elapsed(unit: 6m)
|> filter(fn: (r) => r.elapsed > 1)
// |> yield(name: "inflection points")
// add a counter for the inflection points so we can fill previous after subsequent join to track which values are a part of which period
|> map(fn: (r) => ({r with _value: 1.0}))
|> cumulativeSum()
|> keep(columns: ["_time", "_value"])

outer.time(left: data, right: inflection_time, as: (l, r) => ({l with label: r._value}))
|> fill(usePrevious: true)
|> yield(name: "_result")

Which uses

But unfortunately I get an error which is part of this PR

I’m told I can use

|> debug.pass()

to bypass the errors, but it’s not working.
:(

I’ll update you as soon as I know more.

Okay, this worked:

import "math"
import outer "join" 
import "internal/debug"
data = from(bucket: "noaa")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "h2o_feet")
  |> filter(fn: (r) => r["_field"] == "water_level")
  |> filter(fn: (r) => r["location"] == "coyote_creek")
  |> keep(columns: ["_time", "_value"])  
  |> limit(n: 400)
//   |> yield(name: "raw")

derr = data 
// unit for the time in between timestamps
|> derivative(unit: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "derr")

derr_shift = derr
|> timeShift(duration: 6m)
|> keep(columns: ["_time", "_value"])
// |> yield(name: "derr_shift")


inflection_time = join(tables: {derr: derr |> debug.pass(), derr_shift: derr_shift |> debug.pass()}, on: ["_time"])
// change to just one side of the or expression if you want to find a true period in a sin wave
|> filter(fn: (r) => (r._value_derr >= 0.0 and r._value_derr_shift <= 0.0) or (r._value_derr <= 0.0 and r._value_derr_shift >= 0.0))
// to account for when the derivative does == 0 and define inflection point from either side of crossing from pos to neg or neg to pos
// also helps us make sure that local max and min are looking right/adequate time between them
|> elapsed(unit: 6m)
|> filter(fn: (r) => r.elapsed > 1)
// |> yield(name: "inflection points")
|> map(fn: (r) => ({r with _value: 1.0}))
|> cumulativeSum()
|> keep(columns: ["_time", "_value"])

outer.time(left: data |> debug.pass(), right: inflection_time |> debug.pass(), as: (l, r) => ({l with label: r._value}), method: "full" )
|> fill(column: "label", usePrevious: true)
|> group(columns: ["label"], mode: "by")
// |> yield(name: "_result")

Maybe you can start by working off of that? I’m using this dataset, in case you want to try the example for yourself.

Basically what I’m doing is:

  • calculating the derivative (in “derr”)
  • shifting the results by one timestamp (“derr_shift”) and joining them
  • filtering for when an inflection point happens (defined as when the point before is <= 0 and the point after is >= 0, or the inverse)
  • using the elapsed() function to account for when the derivative actually equals 0 (… hmm, actually I just realized we may not need elapsed() here if we change the expression to exclude = 0, perhaps… I’ll have to play around with it)
  • adding an index to those timestamps where there are inflection points
  • then performing an outer join and filling previous so we can determine which values in our original data are part of which “period”
  • grouping by the label (or index) to verify that our result is as expected

Data before:


Data after:

Hi @grant1 -
A couple more thoughts . . .

I looked at your CSV post - pulling utility co data via Node-RED. Telegraf can do HTTP GETs given a URL and massage the data, as mentioned above, before passing it on to Influx. So if dropping below 150 is indeed a trigger, then Telegraf can test for it and mark the measurements accordingly. But switching horses mid-race may not be something you want to do at this point.

My setup samples the data locally every 30 mins right at the power meter. With solar panels I get their production as well as whether the meter is running forwards or backwards. All that data is window()'ed and sum()'ed up daily. Then a group() and pivot() maneuver the relevant data into position and map() performs the math for several graphics that flow out of all this.

I’m intentionally leaving out actual flux code, instead just outlining the processing flow in case it’s useful to you. Sounds like you’re closing in on a solution!! Good luck.


Hi @phill and @Anaisdg! Thank you both, but let me clarify…

So the beginning and end of the billing period have nothing to do with peaks or valleys in the usage. Those valleys are when our factory was closed. As we are in the USA, you can easily spot the Memorial Day weekend and the Fourth of July weekend. The others are just random weekends when we decided to close. Other than that, the factory runs 24 x 7.

This brings us back to the original question of how to define the beginning and end of a billing period, and my best guess at the moment is to create a second table like this (fake data, just for illustration)…

and then use some join function like Anais suggested (unless that is broken at the moment). Even so, I believe there might be another path to getting this. Just need some more sleep.

@grant1 & @Anaisdg -

With a 2nd table as described above, the start/stop range of a period can be looked up & assigned to variables, say periodStart & periodStop. Then

periodRawData = from(bucket: "RawData")
  |> range(start: periodStart, stop: periodStop)

periodRawData would contain all records within that billing period and can be processed further as need be.
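
For instance, getting the per-period max and total (your Q3) might then be as simple as:

periodRawData
  |> max()
  |> yield(name: "periodMax")

periodRawData
  |> sum()
  |> yield(name: "periodTotal")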

As an aside, this video on join cautions that the timestamps of the raw data sets must match down to the nanosecond. There’s a date.truncate() function that can force that, but I’m now thinking this approach makes it harder than it needs to be.
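
If the join route were needed anyway, forcing that alignment might look something like this (assuming second-level precision is enough):

import "date"

alignedData = periodRawData
  // drop sub-second precision so both sides of a join can match on _time
  |> map(fn: (r) => ({r with _time: date.truncate(t: r._time, unit: 1s)}))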

Seems like a 2nd table answers your original Q1, but maybe I’m missing something.

Hello @grant1,
I was able to suppress the warning in the join function and discover peaks and valleys and use them to define periods, as shown above.
Otherwise if you have the dates predefined in another table, then I’d go with @phill’s suggestion.
You can also extract the start and stop dates with the findRecord() function or findColumn() function and then pass them into the custom function @phill shared with you.
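
For example, if the period table lived in its own bucket (all names below are invented for the sketch, with the period bounds stored as RFC3339 strings), extracting the bounds might look like:

// assumed layout: one point per billing period, BillingPeriod as a tag,
// bounds stored as RFC3339 strings in fields BegOfPeriod / EndOfPeriod
periodRow = from(bucket: "billing_periods")
  |> range(start: 0)
  |> filter(fn: (r) => r.BillingPeriod == "02_2022")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

periodStart = time(v: (periodRow |> findColumn(fn: (key) => true, column: "BegOfPeriod"))[0])
periodStop = time(v: (periodRow |> findColumn(fn: (key) => true, column: "EndOfPeriod"))[0])

from(bucket: "RawData")
  |> range(start: periodStart, stop: periodStop)
  |> yield(name: "periodRawData")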

Thanks @Anaisdg

I installed InfluxDB 2.3 on a 2nd PC where I can mess around with this stuff (vs. potentially messing up our production data). I will start on this tonight and report back. I sense a solution is near…