Flux help required

Hi,

Currently I’m struggling to join two tables. Maybe someone has an idea, as I’m going crazy trying to understand what Flux can do.

Suppose the following two tables:

left = from(bucket: "pon")
   |> range(start: v.timeRangeStart, stop:v.timeRangeStop)
   |> filter(fn: (r) => r._measurement == "zum")
   |> filter(fn: (r) => r._field == "value")
   |> aggregateWindow(every: 1m, fn: mean)
   |> timeShift(duration: -1m)
// don't know if this is needed, but it didn't help anyway
   |> truncateTimeColumn(unit: 1m)

with the following example results

See comment below

and the right table:

right = from(bucket: "pon")
   |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
   |> filter(fn: (r) => r._measurement == "cur")
   |> aggregateWindow(every: 1m, fn: mean)
   |> timeShift(duration: -1m)
   |> truncateTimeColumn(unit: 1m)

with the following example data

I want to join the tables on the tag id and also keep all other columns from both tables.

This is my join statement

join.full(
left: left,
right: right,
on: (l, r) => l.id== r.id,
as: (l, r) => ( {l with id: l.id} )
)

This is the first table

@sparkitny Welcome to the influxdata community!

Looking at your tables and code, there are a couple of issues to address:

Issue 1: Column Name Conflicts

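Your tables have columns with the same names (_time, _measurement, _field, _value). The left and right values of these columns need distinct names in the joined output, so rename them before joining (or map them to new names in the as function).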

Issue 2: Join Syntax

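The as parameter in your join needs to include all columns you want to keep from both tables. Your current as function, {l with id: l.id}, returns only the left record, so every column from the right table is dropped. Also, joining on id alone matches every row of one table with every row of the other table that has the same id; add _time to the on condition to pair the rows minute by minute.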

Here are a few ways to fix your join query:

Method 1: Rename columns before joining to avoid conflicts

import "join"

left_renamed = from(bucket: "pon")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "zum")
    |> filter(fn: (r) => r._field == "value")
    |> aggregateWindow(every: 1m, fn: mean)
    |> timeShift(duration: -1m)
    |> rename(columns: {_value: "zum_value", _field: "zum_field", _measurement: "zum_measurement"})
    // The join functions pair up tables by group key, so group both streams the same way
    |> group(columns: ["id"])

right_renamed = from(bucket: "pon")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "cur")
    |> aggregateWindow(every: 1m, fn: mean)
    |> timeShift(duration: -1m)
    |> rename(columns: {_value: "cur_value", _field: "cur_field", _measurement: "cur_measurement"})
    // Same group key as left_renamed, otherwise the tables are not matched
    |> group(columns: ["id"])

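// Join with proper handling of all columns
joined = join.full(
    left: left_renamed,
    right: right_renamed,
    on: (l, r) => l.id == r.id and l._time == r._time,
    as: (l, r) => {
        // In a full join one side can be missing, so take the key columns from whichever side exists
        id = if exists l.id then l.id else r.id
        time = if exists l._time then l._time else r._time

        return {
            _time: time,
            id: id,
            zum_value: l.zum_value,
            cur_value: r.cur_value,
            cid: r.cid,
            com: r.com,
            d1: r.d1,
            ppid: r.ppid
        }
    }
)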

Method 2: Using join.inner with better column handling

alternative_join = join.inner(
    left: left_renamed,
    right: right_renamed,
    on: (l, r) => l.id == r.id and l._time == r._time,
    // Start with all left columns, then add the ones from the right table
    as: (l, r) => ({l with
        cur_value: r.cur_value,
        cid: r.cid,
        com: r.com,
        d1: r.d1,
        ppid: r.ppid
    })
)

Method 3: If you need to keep all rows from both tables

full_join_all_columns = join.full(
    left: left_renamed,
    right: right_renamed,
    on: (l, r) => l.id == r.id and l._time == r._time,
    as: (l, r) => {
        // Handle cases where one side might be empty
        id = if exists l.id then l.id else r.id
        time = if exists l._time then l._time else r._time
        
        return {
            _time: time,
            id: id,
            zum_value: l.zum_value,
            zum_field: l.zum_field,
            zum_measurement: l.zum_measurement,
            cur_value: r.cur_value,
            cur_field: r.cur_field,
            cur_measurement: r.cur_measurement,
            cid: r.cid,
            com: r.com,
            d1: r.d1,
            ppid: r.ppid
        }
    }
)
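To check the join logic without querying the pon bucket, you can run it against small inline tables. This is a minimal sketch that assumes the renamed column names used above; the ids, timestamps and values are made up for illustration. array.from() returns a single ungrouped table, so both sides share the same (empty) group key and the tables are matched.

```flux
import "array"
import "join"

// Hypothetical sample rows standing in for the "zum" (left) stream
left_sample = array.from(
    rows: [
        {_time: 2024-01-01T00:00:00Z, id: "1", zum_value: 10.0},
        {_time: 2024-01-01T00:01:00Z, id: "1", zum_value: 11.0}
    ]
)

// Hypothetical sample rows standing in for the "cur" (right) stream
right_sample = array.from(
    rows: [
        {_time: 2024-01-01T00:00:00Z, id: "1", cur_value: 0.5},
        {_time: 2024-01-01T00:02:00Z, id: "1", cur_value: 0.7}
    ]
)

join.full(
    left: left_sample,
    right: right_sample,
    on: (l, r) => l.id == r.id and l._time == r._time,
    as: (l, r) => {
        // On unmatched rows one side has no values, so take the keys from whichever side exists
        id = if exists l.id then l.id else r.id
        time = if exists l._time then l._time else r._time

        return {_time: time, id: id, zum_value: l.zum_value, cur_value: r.cur_value}
    }
)
```

With these rows the full join should return three rows: the 00:00 row with both values, the 00:01 row with only zum_value, and the 00:02 row with only cur_value, the missing side being null.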

If you want to filter the result to only show specific columns:

final_result = joined
    |> keep(columns: ["_time", "id", "zum_value", "cur_value", "cid", "com", "d1", "ppid"])

Key points to remember:

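  1. Give conflicting columns distinct names, for example by renaming them before joining
  2. Include all columns you want to keep in the as parameter
  3. Use both id and _time in your join condition; matching on id alone pairs every row of one table with every row of the other table that has the same id
  4. Consider using join.inner() if you don’t need rows where there’s no match in one table
  5. Import the join package (import "join") and make sure both input streams have the same group key, for example with group(columns: ["id"])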

Let me know if you need any clarification on these solutions!
