Flux join() performance

Good afternoon,

I’m facing performance problems with the join() function. First I’ll explain the code, and then we’ll look at the problem itself.

I want to query energy data for consumption and production. Once I have that data in variables, I want to perform some operations on those values, as shown in the example code. Those operations depend on an if condition, so the final aggregation cannot be done in each individual query.

Now, the problem is that the join function is taking too much time, roughly 95% of the query time or more. At the moment I do the operations that the join performs in my JavaScript code instead: where the Flux query takes 10 s to do the whole process, my JavaScript code takes less than 2 s to query consumption and production and then perform the aggregation calculations.

The thing is, I want to move all my calculations into Flux queries, to avoid operating on lots of data in my backend app.

I would be grateful if someone can help me with this issue. Thank you in advance.

import "profiler"

option profiler.enabledProfilers = ["query", "operator"]

start = 2022-01-01T00:00:00Z
stop =  2023-01-01T00:00:00Z
        
consumption = from(bucket: "prodx_test")
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "consumedkWh")
  |> filter(fn: (r) => r["deviceId"] == "GREGAL")
  |> filter(fn: (r) => r["period"] == "1h")

production = from(bucket: "prodx_test")
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "producedkWh")
  |> filter(fn: (r) => r["deviceId"] == "PV GARBI")
  |> filter(fn: (r) => r["period"] == "1h")

test = join(tables: {consumption: consumption, production: production}, on: ["_time"])
  |> map(fn: (r) => {
  
    coefficient =  0.123
    consumption = r._value_consumption
    production = r._value_production * coefficient

    energyGrid = if consumption - production < 0.0 then 0.0 else consumption - production

    //Response format
    return {
      _measurement: "energy",
      _field: "energyGridkWh",
      _value: energyGrid,
      _start: start,
      _stop: stop,
      _time: r._time,
      _period: "period",
    }
  })
  |> aggregateWindow(every: 1mo, fn: sum)
  |> yield(name: "result")



Hi @Roger_Olive_Delgado

I believe the join function is what is causing the hangup. Maybe instead of a join, do a pivot? Something like this?

import "profiler"

option profiler.enabledProfilers = ["query", "operator"]

start = 2022-01-01T00:00:00Z
stop =  2023-01-01T00:00:00Z
        
from(bucket: "prodx_test")
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "consumedkWh")
  |> filter(fn: (r) => r["deviceId"] == "GREGAL") or r["deviceId"] == "PV GARBI")
  |> filter(fn: (r) => r["period"] == "1h")
  |> pivot(rowKey:["_time"], columnKey: ["MeasType"], valueColumn: "_value") // you may need to change this
  |> map(fn: (r) => {
  
    coefficient =  0.123
    consumption = r.GREGAL // you may need to change this
    production = r["PV GARBI"] * coefficient // you may need to change this...I think a tag value without a space (e.g. PV_GARBI, so you can write r.PV_GARBI) is better

    energyGrid = if consumption - production < 0.0 then 0.0 else consumption - production

    //Response format
    return {
      _measurement: "energy",
      _field: "energyGridkWh",
      _value: energyGrid,
      _start: start,
      _stop: stop,
      _time: r._time,
      _period: "period",
    }
  })
  |> aggregateWindow(every: 1mo, fn: sum)
  |> yield(name: "result")

Good morning @grant1.

Thank you for the support. I have finally solved my problem using group() and then the pivot() function, as you suggested. My query performance problem is now solved.

The problem with the join() function is that its execution time grows with the square of the row count. I have found many posts that complain about the performance of join().
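
In case it helps others, this is roughly what the final query looks like now (a sketch based on my schema; the group() columns, field names, and coefficient may need adjusting for other setups):

start = 2022-01-01T00:00:00Z
stop = 2023-01-01T00:00:00Z

from(bucket: "prodx_test")
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "consumedkWh" or r["_field"] == "producedkWh")
  |> filter(fn: (r) => r["deviceId"] == "GREGAL" or r["deviceId"] == "PV GARBI")
  |> filter(fn: (r) => r["period"] == "1h")
  |> group(columns: ["_measurement"]) // merge both series into one table so pivot() can combine them
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => {
    coefficient = 0.123
    consumption = r.consumedkWh
    production = r.producedkWh * coefficient

    energyGrid = if consumption - production < 0.0 then 0.0 else consumption - production

    //Response format
    return {
      _measurement: "energy",
      _field: "energyGridkWh",
      _value: energyGrid,
      _start: start,
      _stop: stop,
      _time: r._time,
    }
  })
  |> aggregateWindow(every: 1mo, fn: sum)
  |> yield(name: "result")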

Have a nice day,

Roger

Great, glad that pivot() worked. Can you please mark the above as Solution so that others may find it in the future?

Although join() is certainly not the best performer, I have found that pivot() is not always better.

I started using InfluxDB about 2 years ago, and since then my data set has become huge. The queries I implemented first eventually became slower and I had performance issues with my server, so I had to invest some time figuring out how to improve every single one of them. Let me share some tweaks that will make your queries blazing fast!

Of course, it also depends on the data schema:

First filter by _field, then by tags. If many of your tags share the same fields, or if you have more fields than tags, that will narrow down the results faster.
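
For example, something like this (just an illustration using the same energy measurement as above; whether it helps depends on your schema):

from(bucket: "prodx_test")
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "consumedkWh") // field filter first...
  |> filter(fn: (r) => r["deviceId"] == "GREGAL") // ...then tag filters
  |> filter(fn: (r) => r["period"] == "1h")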

I saw that your aggregateWindow() is at the end; that is a costly decision! If you don’t care about every single in-between data point, the aggregateWindow() should come immediately after the filters. Basically, aggregateWindow() decides which data points to keep, and then you do your transformations on the data it returns. The way you are doing it pulls all the data points, applies the transformations, and only then downsamples.
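
For example, something like this (just a sketch; note that pivot() and map() then run on the already-aggregated rows, which changes the result if your calculation needs every raw point):

from(bucket: "prodx_test")
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "consumedkWh")
  |> filter(fn: (r) => r["deviceId"] == "GREGAL")
  |> filter(fn: (r) => r["period"] == "1h")
  |> aggregateWindow(every: 1mo, fn: sum) // downsample right after the filters
  // ...then do the pivot()/map() transformations on the (much smaller) aggregated result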

And if you ever get into a situation where you need to drop some columns before doing a pivot, then making 2 different queries and a join will most likely be more performant. But use drop() after the aggregateWindow(); in general, try to do any transformation after aggregateWindow().
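
For example (the dropped column here is hypothetical, just to show the ordering):

from(bucket: "prodx_test")
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == "energy")
  |> filter(fn: (r) => r["_field"] == "consumedkWh")
  |> aggregateWindow(every: 1mo, fn: sum)
  |> drop(columns: ["someTag"]) // drop() and other transformations go after the aggregate window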
:wink:


Thanks a lot for the solution @grant1. I have applied a group() function so that pivot() operates on the results in a single table. I don’t know whether pivot() can be applied to results spread across different tables, but the results of the filters above come back in different tables.

Thanks a lot for your comments @fercasjr .

The thing about the aggregation is that I need to perform the energy-balance calculations for every single row. So first I must calculate energyGrid, for example, and only then do the aggregation. Otherwise the results will not be accurate, because they depend on an if condition: for example, if in one hour consumption minus production is -3 (so energyGrid is 0) and in the next hour it is +8 (energyGrid is 8), summing the per-row energyGrid gives 8, whereas aggregating first and then clamping would give max(0, 5) = 5.

I will take all these recommendations into account in case I have more performance issues.

Thank you both.

Well, you are correct, that depends on your dataset of course. How precise is precise enough? It depends on each use case.

I mean, your aggregation at the end is 1 month, but I don’t know the span between your data points. Your approach is probably the best way if you don’t have a lot of data points.

An example of what I was trying to say: if I pull data every 1 second and I want to analyze something over, let’s say, 2 weeks, it’s better to downsample to something like every 10 s using mean (which also smooths some noise in my data). And if the goal is to look at a specific average, then as long as there are enough data points and the data is split into chunks of the same size, the average of averages will be the same as the average of all the data points.
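
As a sketch (the bucket and measurement names here are made up):

from(bucket: "my_bucket")
  |> range(start: -2w)
  |> filter(fn: (r) => r["_measurement"] == "sensor_data")
  |> aggregateWindow(every: 10s, fn: mean) // downsample the 1 s points to 10 s means (also smooths noise)
  |> mean() // the average of the equal-sized 10 s means matches the average of the raw points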


Btw, pivot() cannot be applied across different tables; you need to do some joins or unions first.
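
For example, reusing the consumption and production streams from the first post (just a sketch):

union(tables: [consumption, production])
  |> group(columns: ["_measurement"]) // put everything into one table...
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") // ...so pivot() can combine the two fields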


You are completely right, I see your point now. The span between points is 1 h, so for now I think it will work for me. If the data span gets smaller, I think we should consider doing the aggregation before all the operations to improve performance.

Thanks a lot for your help again @fercasjr !