Problem with sum()

Hello. In a Flux query I have two joins, and each one ends in a sum() (an aggregate function), so the final result of each join is a single number that does not depend on time. The result of each join is correct.
I want to compute “type A” / “type B”, but since both are results of sum() I don’t know how to do it (I have not managed to do it with the map() function). Here is a copy of the code; it is missing the final operation.

type_A = join(tables: {key1: blacks, key2: whites}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
   _time: r._time,
   _value: (r._value_key1 + r._value_key2)
  })) 
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
  |> sum(column: "_value")

type_B = join(tables: {key3: yelow, key4: green}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
   _time: r._time,
   _value: (r._value_key3 - r._value_key4)
  })) 
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
  |> sum(column: "_value")

How about something like this:

gen1 = from(bucket: "northbound")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "genData")
  |> filter(fn: (r) => r["_field"] == "fuel")
  |> filter(fn: (r) => r["generatorID"] == "generator1")
  |> filter(fn: (r) => r["host"] == "influxdata-roadshow")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false) 
  |> sum()
  |> yield(name: "gen1")


gen2 = from(bucket: "northbound")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "genData")
  |> filter(fn: (r) => r["_field"] == "fuel")
  |> filter(fn: (r) => r["generatorID"] == "generator2")
  |> filter(fn: (r) => r["host"] == "influxdata-roadshow")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  |> sum()
  |> yield(name: "gen2")

join(tables: {gen1: gen1, gen2: gen2}, on: ["_measurement"], method: "inner")
|> map(fn: (r) => ({ r with divide: r._value_gen1 / r._value_gen2 }))

The join you propose returns nothing. The query only returns yield(name: “gen1”) from gen1 and yield(name: “gen2”) from gen2.

Hi @s118,
Not a problem.
Can you send me a screenshot of a yield() on both of the tables you want to join and divide?

Better yet, if you can export some of your data for me, I can have a try myself.

All my data is in one bucket (energymeter). There are several measurements (IMPORTADA, EXPORTADA, PRECIO_EXPORTADO), and every measurement has the same field (potencia). The complete query is attached.

import "math"
importada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "IMPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

exportada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "EXPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

precioexportado = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "PRECIO_EXPORTADO")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

kWhresultantes = join(tables: {key1: importada, key2: exportada}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
    _time: r._time,
    _value: ((r._value_key1 - r._value_key2)),
  })) 
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
 
preciokWhexportados = join(tables: {key3: kWhresultantes, key4: precioexportado}, on: ["_time"], method: "inner")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> map(fn: (r) => ({
   _time: r._time,
   _value: (((r._value_key3 - math.abs(x: r._value_key3))/(2.0 * r._value_key3)) * r._value_key3 * r._value_key4 )
  })) 
  |> sum(column: "_value")
  |> yield(name: "preciokWhexportados") 

kWhexportados = join(tables: {key5: kWhresultantes, key6: precioexportado}, on: ["_time"], method: "inner")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> map(fn: (r) => ({
    _time: r._time,
    _value: (((r._value_key5 - math.abs(x: r._value_key5))/(2.0 * r._value_key5)) * r._value_key5 )
  })) 
  |> sum(column: "_value")
  |> yield(name: "kWhexportados") 

join(tables: {key7: preciokWhexportados, key8: kWhexportados}, on: ["_measurement"], method: "inner")
  |> map(fn: (r) => ({ r with prueba: r._value_key7 / r._value_key8 }))
  |> yield()

If I remove the two previous yield() calls, the last join gives no results (NO RESULTS).

In the screenshot you sent me, at the bottom, there is a button that looks like a download sign and is labelled CSV. If you click it, it will export the result of your query. If you wouldn’t mind, please run the following and then use that button to export it:

from(bucket: "energymeter")
  |> range(start: -1d)
 

I used -10h (with -1d it takes forever).
2022-05-26_17_13_influxdb_data.gz (2.8 MB)

Hi @s118,
Can you try compressing it as a tar.gz? For some reason I can’t extract the .gz.

I upload the compressed file again. Now I think it’s correct.
datos.tar.gz (5.0 MB)

Hi @s118,
Still looking at this. Sorry, I was having issues importing your data, but I am working around it.

Thanks,
Jay

Hi @s118, I must admit I am really struggling with this one too. I have had a good hack at it with your data. If anyone can crack it, though, my colleague @Anaisdg will be able to. There is currently a national holiday in America, so hopefully she will get back to you tomorrow, if not Wednesday :) . I will keep following the thread, as I am also curious about the answer to this one.


Pretty sure your last join() doesn’t work because you no longer have the “_measurement” column: the keep() and map() calls earlier in the query drop it. With no column to join the data on, you get “no results”. You can either set “_measurement” as a group key or carry “_measurement” through your map() functions so that it is still there for the last join.
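
To illustrate the idea, here is a minimal sketch rather than a tested fix for the query above. The two array.from() tables are hypothetical stand-ins for the single-row sum() results, and the column name "key" is an assumption; any column that exists in both tables with the same value would do.

```flux
import "array"

// Hypothetical stand-ins for the two single-row sum() results
total_price = array.from(rows: [{_value: -12.5}])
  // add a constant column so both tables share something to join on
  |> map(fn: (r) => ({r with key: "total"}))

total_kwh = array.from(rows: [{_value: -50.0}])
  |> map(fn: (r) => ({r with key: "total"}))

// join() suffixes the non-joined columns with the table names given here
join(tables: {price: total_price, kwh: total_kwh}, on: ["key"], method: "inner")
  |> map(fn: (r) => ({key: r.key, ratio: r._value_price / r._value_kwh}))
  |> yield(name: "ratio")
```

The point is only that the column listed in on: must be present in both inputs; once it is, the division can be done in an ordinary map().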


Thank you very much, Koma. The solution was much simpler than expected. I’m posting the final code in case it helps someone.

import "math"
importada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "IMPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

exportada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "EXPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

precioexportado = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "PRECIO_EXPORTADO")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

kWhresultantes = join(tables: {key1: importada, key2: exportada}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
    _time: r._time,
    _value: ((r._value_key1 - r._value_key2)),
  })) 
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
 
preciokWhexportados = join(tables: {key3: kWhresultantes, key4: precioexportado}, on: ["_time"], method: "inner")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> map(fn: (r) => ({
   _time: r._time,
   _field: "preciokWhexport",
   _value: if r._value_key3 < 0 then r._value_key3 * r._value_key4 else 0.0
  })) 
  |> sum(column: "_value")
  |> map(fn: (r) => ({_field: "€", _value: r._value}))

kWhexportados = join(tables: {key5: kWhresultantes, key6: precioexportado}, on: ["_time"], method: "inner")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> map(fn: (r) => ({
    _time: r._time,
    _field: "kWhexport",
    _value: if r._value_key5 < 0 then r._value_key5 else 0.0
  })) 
  |> sum(column: "_value")
  |> map(fn: (r) => ({_field: "€", _value: r._value}))

join(tables: {key7: preciokWhexportados, key8: kWhexportados}, on: ["_field"], method: "inner")
  |> map(fn: (r) => ({ r with preciomedio_kWhexportados: r._value_key7 / r._value_key8 }))
  |> drop(columns: ["_value_key7", "_value_key8"])
  |> yield()
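
For anyone who only needs the ratio of two single-row results, a possible alternative to the final join is to pull each value out as a scalar with findRecord() and divide directly. This is a sketch under assumptions, not what was used in this thread: the array.from() tables are hypothetical stand-ins for the two sum() results, and it assumes each stream contains exactly one table with one row.

```flux
import "array"

// Hypothetical stand-ins for the two single-row sum() results
total_price = array.from(rows: [{_value: -12.5}])
total_kwh = array.from(rows: [{_value: -50.0}])

// Take the first row of the first table in each stream as a record
price = total_price |> findRecord(fn: (key) => true, idx: 0)
kwh = total_kwh |> findRecord(fn: (key) => true, idx: 0)

// Plain scalar division, wrapped back into a one-row table so it can be yielded
array.from(rows: [{_value: price._value / kwh._value}])
  |> yield(name: "ratio")
```

This avoids needing a shared column at all, at the cost of leaving the stream-of-tables model for the last step.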