InfluxDB3 and InfluxQL, calculation by week starting on Monday in Grafana

Hello,
I am testing InfluxDB3 Enterprise (Alpha version) and I am struggling to get the correct calculation of some IOT device metrics through Grafana (and InfluxQL).
I was previously able to get the right calculation in InfluxDB 2, but unfortunately Flux language has been dropped and now I am learning once again from scratch.
Here is the scenario:
I have some Tasmota devices which store historical Energy data values. The metric increases every day/hour/minute based on the Energy consumption of the connected device. Unfortunately, from time to time the metric resets or decreases to a previous stored value, for instance: today the value stored is 100, tomorrow is 102 (2 kWh consumed in that day), the day after for some reasons it decrease or reset.
In Grafana I use the “non_negative_difference” of last value grouped by 1 minute. In this way I can get the energy consumed every minute. But I would like to aggregate by day, week, month…

This is the query:

SELECT non_negative_difference(last("Total")) FROM "tasmota" WHERE ("room"::tag = 'bathroom' AND "device"::tag = 'washer') AND $timeFilter GROUP BY time(1m) fill(none)

How can I get the total energy consumed by week, with week starting from Monday?

I also tried this query:

SELECT non_negative_difference(last("Total")) FROM "tasmota" WHERE ("room"::tag = 'bathroom' AND "device"::tag = 'lavatrice') AND $timeFilter GROUP BY time(1w, 4d) fill(none)

The latter calculates the total weekly energy from Monday, but if for any reason the IOT metric “Total” decreases its value during the week, no value is being shown because of the “non_negative_difference” takes control of calculation.
This happens this week:

Probably I missed something, but with Flux this task was pretty easy to accomplish.
Thank you.

1 Like

Hello @Ricky99,
Can you please share your Flux query?
You might be able to use abs and diff instead?

SELECT sum(ABS(difference(last("Total")))) 
FROM "tasmota" 
WHERE ("room" = 'bathroom' AND "device" = 'washer') 
AND $timeFilter 
GROUP BY time(1w, 4d) fill(none)

Does that work?

I might actually suggest using the python processing engine to do this calculation for you and either storing the results in a new db or just returning the results. For example you could use the

  • On-request: Triggered on a GET or POST request to the bound HTTP API endpoint at /api/v3/engine/<CUSTOM_PATH>.

and then do that calculation or any calculation in python in python.
For example this is what an on request plugin would look like:

import json
import pandas as pd

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    """
    Processes IoT energy data using InfluxDB v3.
    Queries raw data and calculates weekly energy consumption using Python.
    """

    for k, v in query_parameters.items():
        influxdb3_local.info(f"query_parameters: {k}={v}")
    for k, v in request_headers.items():
        influxdb3_local.info(f"request_headers: {k}={v}")

    request_data = json.loads(request_body)
    influxdb3_local.info("parsed JSON request body:", request_data)

    # Extract parameters
    room = request_data.get("room", "bathroom")
    device = request_data.get("device", "washer")

    # Step 1: Fetch raw energy data (no aggregation yet)
    influxql_query = f"""
        SELECT LAST("Total") 
        FROM "tasmota" 
        WHERE ("room" = '{room}' AND "device" = '{device}') 
        AND time >= now() - 4w 
        GROUP BY time(1d) fill(none)
    """

    influxdb3_local.info(f"Executing InfluxQL query: {influxql_query}")

    # Execute the query
    raw_data = influxdb3_local.query(influxql_query)

    if not raw_data:
        influxdb3_local.info("No data found for query.")
        return {"status": "no_data", "query": influxql_query, "data": []}

    # Step 2: Convert raw data to a Pandas DataFrame
    df = pd.DataFrame(raw_data)

    # Ensure proper column names
    df.rename(columns={"last": "Total"}, inplace=True)
    
    # Convert time column to datetime format
    df["time"] = pd.to_datetime(df["time"])
    df.sort_values("time", inplace=True)

    influxdb3_local.info(f"Raw data:\n{df}")

    # Step 3: Calculate absolute differences to handle resets
    df["diff"] = df["Total"].diff().abs()

    # Remove NaN values (first row has no difference)
    df.dropna(inplace=True)

    # Step 4: Aggregate by week, starting from Monday
    df["week_start"] = df["time"] - pd.to_timedelta(df["time"].dt.weekday, unit="d")  # Align to Monday
    weekly_consumption = df.groupby("week_start")["diff"].sum().reset_index()

    # Rename for clarity
    weekly_consumption.rename(columns={"week_start": "time", "diff": "energy_consumed"}, inplace=True)

    influxdb3_local.info(f"Processed weekly data:\n{weekly_consumption}")

    # Step 5: Convert to JSON for response
    response_data = weekly_consumption.to_dict(orient="records")

    return {
        "status": "ok",
        "query": influxql_query,
        "data": response_data
    }

And you’d call it like

curl -X POST "http://localhost:8086/api/v3/engine/my-plugin" \
     -H "Content-Type: application/json" \
     -H "Authorization: Bearer YOUR_TOKEN" \
     -d '{
           "room": "bathroom",
           "device": "washer"
         }'

Where the the data portion customizes the request.

You could also make the plugin work on a schedule (like once a week on monday) with a schedule trigger:

And execute the same login and then write the data back to influxdb. and then query it that way.

I haven’t tested the scripts or code above though.

Hello @Anaisdg,
thanks for your reply. I appreciate the effort but the suggested solution by using Python is much complex and way far my current skills, because I cannot programming in Python.
The Flux query (on the InfluxDB2 production node) is the following:

from(bucket: "tasmota")
  |> range(start: 2025-01-31T23:00:00Z, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "refoss-p11")
  |> filter(fn: (r) => r["_field"] == "Total")
  |> filter(fn: (r) => r["device"] == "heater")
  |> aggregateWindow(every: 30s, fn: last, createEmpty: false, offset: -1s)
  |> difference(nonNegative: true, columns: ["_value"])
  |> aggregateWindow(every: 1w, fn: sum, createEmpty: false, timeSrc: "_start", offset: 4d)

and it produces this output (please focus on the week starting Monday 10th, do not consider week 24/02 because I don’t have in the test node):

The values have been checked through detailed calculation on an Excel spreadsheet and they are relevant.

In the InfluxDB3 Enterprise testing node, with a partially different code compared the one you had suggested (see below), the result is not correct:

The calculation for week starting from Monday 10th is totally wrong.
This is because on Sunday 16th the metric used to calculate the Energy consumed had this drop in value:

This is reason why I should:

a) Consider an interval of 30s in the metric counter, to reduce as much as possible the impact on the calculation due to some events like one described;
b) Calculate the (non negative) difference between 30s interval data;
c) Sum them.

With Flux I am able to do so, essentially because I do two aggregations. With InfluxQL I don’t know, at the moment, how to get this result.
Thank you.

Hello @Ricky99,
I’m not a SQL pro so I’m trying to understand how this can be accomplished in SQL. For what it’s worth I can’t code in python either really, but I use LLMs to write it for me :wink:

Something like this might work?

WITH binned_data AS (
    SELECT 
        DATE_BIN(INTERVAL '30s', _time, TIMESTAMP '2025-01-31 23:00:00Z') AS window_start,
        LAST_VALUE(_value) AS last_value,
        device
    FROM tasmota
    WHERE 
        _measurement = 'refoss-p11' 
        AND _field = 'Total' 
        AND device = 'heater'
        AND _time >= TIMESTAMP '2025-01-31 23:00:00Z'
    GROUP BY window_start, device
),
diff_data AS (
    SELECT 
        window_start, 
        last_value - LAG(last_value) OVER (PARTITION BY device ORDER BY window_start) AS diff
    FROM binned_data
)
SELECT 
    DATE_BIN(INTERVAL '1 week', window_start, TIMESTAMP '2025-01-31 23:00:00Z') AS week_start,
    SUM(diff) AS weekly_total
FROM diff_data
WHERE diff >= 0
GROUP BY week_start
ORDER BY week_start;

But it would be helpful if you could share some of your data and expected output :slight_smile: so that I can use it. Graphs are great, but actual data is better if thats an option.

@Ricky99,
If you want help learning how to leverage an LLM to write python code to execute all sorts of data transformations for you please let me know! The functionality and power of python far exceeds that of sql and flux (and is easier to use and learn than flux imo) A lot of the python processing engine code there is boilerplate anyways.

This is definitely one of those queries that really makes Flux shine. I think the following SQL (not InfluxQL) query should get you close to what you’re looking for:

SELECT
  DATE_BIN(INTERVAL '1 week', time, ('1970-01-01T00:00:00Z'::TIMESTAMP + INTERVAL '4 days')) as _time,
  SUM(diff) as value
FROM (
  SELECT
    time,
    CASE
      WHEN "Total" - LAG("Total") OVER (ORDER BY time) < 0 THEN 0
      ELSE "Total" - LAG("Total") OVER (ORDER BY time)
    END as diff
  FROM (
    SELECT
      DATE_BIN(INTERVAL '30 seconds', time, ('1970-01-01T00:00:00Z'::TIMESTAMP - INTERVAL '1 second')) as time,
      LAST_VALUE("Total") OVER (
        PARTITION BY DATE_BIN(INTERVAL '30 seconds', time, ('1970-01-01T00:00:00Z'::TIMESTAMP - INTERVAL '1 second'))
        ORDER BY time
        RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
      ) as "Total"
    FROM tasmota
    WHERE
      AND device = 'heater'
      AND time >= '2025-01-31T23:00:00Z'
      AND time <= NOW()
  )
)
GROUP BY 1
ORDER BY 1
1 Like

I appreciate your efforts, but too much complex solutions.
In the last ten days, I have searched everywhere for a straightforward solution and only recently I have found that InfluxQL could/should/might manage subqueries: InfluxDB3 Subqueries

So, reading that, I tried to apply to my case with this code:

SELECT SUM(non_negative_difference) FROM (SELECT non_negative_difference(last("Total")) FROM "tasmota" WHERE ("room"::tag = 'bathroom' AND "device"::tag = 'heater') AND $timeFilter GROUP BY time(1h) fill(none)) GROUP BY time(1d)

Well, I have been really happy until yesterday, when I discovered that the code works no more.
In InfluxDB3 Alpha it has worked flawlessly, but after upgrading to Beta version, I get this:

One solution found, working for some days, no more now.
Incredible, unbelievable.
1 star rating for InfluxQL compared than Flux.

Also here:
https://community.grafana.com/t/query-with-multiple-group-by-time/86345/3

Hello @Ricky99,
Can you please share this feedback in Discord? Its a designated place for v3 users.

Hi @Ricky99 – when we moved from Alpha to Beta, we made some big upgrades to the API that were breaking changes to current datasets as we improved how data was stored.

This was a known possibility during the alpha, as we needed those learning to improve the data storage solution, but going forward from the beta into GA, we’ve committed to keeping the API consistent.

I don’t believe your issue in an InfluxQL issue, as we haven’t made changes here for that. You likely just need to reconnect your database, and that might require starting with a wipe of your current data (as the storage format has changed a bit). Again, this won’t happen going forward, an unfortunate but necessary aspect of being in Alpha.

1 Like