Join in flux. No results

Hi, I’m new to flux, and I want to do this query, but it doesn’t work. I don’t know what I’m wrong about. The result that comes out is “NO RESULTS”:
importada = from(bucket: “energymeter”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == “IMPORTADA”)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) =>
tables
|> integral(unit: 1h)
|> map(fn: (r) => ({ r with _value: r._value})))

exportada = from(bucket: “energymeter”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == “EXPORTADA”)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) =>
tables
|> integral(unit: 1h)
|> map(fn: (r) => ({ r with _value: r._value})))

precioimportado = from(bucket: “energymeter”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == “PRECIO_IMPORTADO”)
|> aggregateWindow(every: 1h, fn: last, createEmpty: false)

precioexportado = from(bucket: “energymeter”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == “PRECIO_EXPORTADO”)
|> aggregateWindow(every: 1h, fn: last, createEmpty: false)

kWhresultantes = join(tables: {key1: importada, key2: exportada}, on: ["_time", “_field”], method: “inner”)
|> map(fn: (r) => ({
_time: r._time,
_value: ((r._value_key1 - r._value_key2)),
}))

precioimp = join(tables: {key3: precioimportado, key4: precioexportado}, on: ["_time", “_field”], method: “inner”)
|> map(fn: (r) => ({
_time: r._time,
_value: ((r._value_key3)),
}))

resultado = join(tables: {key5: kWhresultantes, key6: precioimp}, on: ["_time", “_field”], method: “inner”)
|> map(fn: (r) => ({
_time: r._time,
_value: ((r._value_key5 * r._value_key6)),
}))
|> yield()

Hello @s118,
I like to use multiple yield() statements at several points in my flux query to makes sure I’m returning results.

I also like to use the limit() function to make looking through the results and ensuring that transformations are operating as expected more manageable.

For example I would do:

importada = from(bucket: “energymeter”)
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == “IMPORTADA”)
|> aggregateWindow(
every: 1h,
fn: (tables=<-, column) =>
tables
|> integral(unit: 1h)
|> map(fn: (r) => ({ r with _value: r._value})))
|> limit(n:5) 
|> yield(name: "importada") 

Hello. I just did the indicated test. Each query gives its result except the last join, which indicates “NO RESULTS”. I attach screenshots of the results of the queries that make up the last join.



Thank you

I am not sure, because i cannot replicate your problem. I think the problem is, that after the first 2 join() you “loose” your “_field” int the map() function. And because you try to join them together on “_field” , which is not present in your Screenshots, you get no result.

Maybe try changing:

to:
resultado = join(tables: {key5: kWhresultantes, key6: precioimp}, on: [“_time”], method: “inner”)

or:
carry your _field in your map() function with: _field = r._field_key(?) //not sure ybout this

As you say, the problem was that it lost the “_field” and tried to join by “_field”. When I stop joining by “_field” the problem has disappeared and I already have results. The problem I have is that I would like to add a “_field” to name the resulting data in grafana. Would this be very difficult?

you can do this with:

map()

|> map(fn: (r) => ({ r with 
        _field: "name",
    }))

set()

|> set(key: "_field", value: "name")

it depends on your use-case i think (performance should be different). But i think with both of these options you get the same name in “_field” in each record

Well, I’m doing something wrong, because in grafana only the fields _value, _field appear, but not “cost_horario”. I attach the full code:

import "math"
importada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "IMPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
  
exportada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "EXPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

precioimportado = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "PRECIO_IMPORTADO")
  |> filter(fn: (r) => r._field == "precio")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

precioexportado = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "PRECIO_EXPORTADO")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

kWhresultantes = join(tables: {key1: importada, key2: exportada}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
    _time: r._time,
    _value: ((r._value_key1 - r._value_key2)),
  })) 
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
  
costekWhimportado = join(tables: {key5: kWhresultantes, key6: precioimportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
   _time: r._time,
   _value: (((r._value_key5 + math.abs(x: r._value_key5))/(2.0 * r._value_key5)) * r._value_key5 * r._value_key6 )
  })) 
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

costekWhexportado = join(tables: {key7: kWhresultantes, key8: precioexportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
   _time: r._time,
   _value: (((r._value_key7 - math.abs(x: r._value_key7))/(2.0 * r._value_key7)) * r._value_key7 * r._value_key8 )
  })) 
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])

costehorario = join(tables: {key9: costekWhimportado, key10: costekWhexportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({r with
    _time: r._time,
    _field: "coste_horario",
    _value: ((r._value_key9 + r._value_key10)),
  }))
  |> sum(column: "_value")
  |> keep(columns: ["_time", "_field" "_value"])
  |> yield()

EDIT: // → comments look bad in the code block :expressionless:

costehorario = join(tables: {key9: costekWhimportado, key10: costekWhexportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({r with
    _time: r._time,
    _field: "coste_horario",
    _value: ((r._value_key9 + r._value_key10)),
  }))
  |> sum(column: "_value")
//if you |> yield() here, then you should see, that there is no "_field" anymore
  |> keep(columns: ["_time", "_field" "_value"])  
//is in your implementation useless, because "_field" doesnt exist here anymore
  |> yield()

sum() is a " aggregate function." it will:

  • Drop all columns that are:
    • not in the group key
    • not the aggregated column

you can change the group key, but i have never done this, so this is a first time for me too:

costehorario = join(tables: {key9: costekWhimportado, key10: costekWhexportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({r with
    _time: r._time,
    _field: "coste_horario",
    _value: ((r._value_key9 + r._value_key10)),
  }))
//INSERT THIS HERE
|> group(columns: ["_time", "_field"], mode: "by")
  |> sum(column: "_value")
....

The “group” function creates the correct “fields” in grafana, but the “sum” function does not create a single value. Create a table for each hour, and I want the sum of all the hours.

my bad. With the suggested |> group() function, each _time plus _field makes one table (you can see this in the table number on the left).

So i think the set() function would work best and should give the desired result:

costehorario = join(tables: {key9: costekWhimportado, key10: costekWhexportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({r with
    _time: r._time,
    _value: ((r._value_key9 + r._value_key10)),
  }))
  |> sum(column: "_value")
|> set(key: "_field", value: "coste_horario")
  |> yield()

I already tried the “set” function before. In the raw data of influxdb it seems that everything works fine with the “set” function, but after grafana only comes “_value”

When the raw data of influx is correct, maybe it is a problem with grafana?
I am currently not working with grafana so i cannot help you with this problem. Good luck!

I already have the solution. In case anyone is interested, here is the full code:

import "math"
importada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "IMPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
exportada = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "EXPORTADA")
  |> aggregateWindow(
    every: 1h,
    fn: (tables=<-, column) =>
      tables
        |> integral(unit: 1h)
        |> map(fn: (r) => ({ r with _value: r._value})), timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
precioimportado = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "PRECIO_IMPORTADO")
  |> filter(fn: (r) => r._field == "precio")
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
precioexportado = from(bucket: "energymeter")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "PRECIO_EXPORTADO")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
kWhresultantes = join(tables: {key1: importada, key2: exportada}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
    _time: r._time,
    _value: ((r._value_key1 - r._value_key2)),
  })) 
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
costekWhimportado = join(tables: {key5: kWhresultantes, key6: precioimportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
   _time: r._time,
   _value: (((r._value_key5 + math.abs(x: r._value_key5))/(2.0 * r._value_key5)) * r._value_key5 * r._value_key6 )
  })) 
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
costekWhexportado = join(tables: {key7: kWhresultantes, key8: precioexportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({
   _time: r._time,
   _value: (((r._value_key7 - math.abs(x: r._value_key7))/(2.0 * r._value_key7)) * r._value_key7 * r._value_key8 )
  })) 
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false, timeSrc: "_start")
  |> keep(columns: ["_time", "_value"])
costehorario = join(tables: {key9: costekWhimportado, key10: costekWhexportado}, on: ["_time"], method: "inner")
  |> map(fn: (r) => ({r with
    _time: r._time,
    _value: ((r._value_key9 + r._value_key10)),
  }))
  |> sum(column: "_value")
  |> map(fn: (r) => ({ r with coste_horario: r._value}))
  |> drop(columns: ["_value"])
  |> yield()

Thank you.

1 Like