Failing to write joined data to output bucket

Hi, I’m having issues writing joined data to my output bucket. When I run the query without trying to output it to my bucket it works and I see data generated in the data explorer window.

This is my Flux query:

inOctets = from(bucket: "bb_interfaces_test")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "interfaces" and (r._field == "in-octets"))
  |> aggregateWindow(every: 1m, fn: last)
  |> derivative(unit: 1m, nonNegative: true, columns: ["_value"], timeColumn: "_time")
  |> map(fn: (r) => ({
      r with inBPS: int(v:r._value) / 60 * 8
  }))

ifaceDesc = from(bucket: "bb_interfaces_test")
	|> range(start: -5m)
	|> filter(fn: (r) => r._measurement == "ifaceDesc")
  	|> filter(fn: (r) => r._field == "description") 
  	|> filter(fn: (r) => r["interface-name"] =~ /.+/) 
    |> aggregateWindow(every: 1m, fn: last)

join(tables: {inOctets: inOctets, ifaceDesc: ifaceDesc}, on: ["_time", "device", "interface-name"])
	|> to(bucket: "bb_interfaces_test_stats_joined", org: "cde")

This is the error message I’m getting only when the “to” statement is there:

no column with label _measurement exists

FYI, “bb_interfaces_test_stats_joined” is a new bucket that I tried recreating but that did not help.

@mohsin106 In order to write back to InfluxDB, each point must have a _measurement column. Before you write the joined tables back to InfluxDB, I’d drop columns you don’t need, the set a new measurement column/value:

join( tables: {inOctets: inOctets, ifaceDesc: ifaceDesc}, on: ["_time", "device", "interface-name"])
  |> drop(...)
  |> set(key: "_measurement", value: "measurement-name")
  |> to(...)

Hi @scott , this is my new Flux query, I’m dropping everything I don’t need prior to the join.

inOctets = from(bucket: "bb_interfaces_test")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "interfaces" and r._field == "in-octets")
  |> aggregateWindow(every: 1m, fn: last)
  |> derivative(unit: 1m, nonNegative: true, columns: ["_value"], timeColumn: "_time")
  |> drop(columns:["oper-status", "path", "_start", "_stop"])
  |> map(fn: (r) => ({
      _time: r._time,
      device: string(v:r.device),
      host: string(v:r.host),
      "parent-ae-name": string(v:r["parent-ae-name"]),
      "interface-name": string(v:r["interface-name"]),
      "in-bps": int(v:r._value) / 60 * 8
    }))

ifaceDesc = from(bucket: "bb_interfaces_test")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "ifaceDesc")
  |> filter(fn: (r) => r._field == "description") 
  |> filter(fn: (r) => r["interface-name"] =~ /.+/) 
  |> aggregateWindow(every: 1m, fn: last)
  |> drop(columns:["_start", "_stop", "path", "system_id", "_measurement", "_field"])

join(
    tables: {inOctets:inOctets, ifaceDesc:ifaceDesc},
    on: ["device", "interface-name", "_time", "host"]
//     |> set(key: "_measurement", value: "joined")
//     |> to(bucket: "bb_interfaces_test_stats_joined", org: "cde")
)

This runs and joins the data in the explorer window. But as soon as I un-comment the “|>set” line in the join stanza I get the following error:

type error 27:8-27:49: cannot unify object with semantic.array

You need to close your join. It’s missing the right parenthesis ).

Side note, you’re also going to need a _field column to write back to InfluxDB. Here are the requirements for writing: https://v2.docs.influxdata.com/v2.0/reference/flux/stdlib/built-in/outputs/to/#output-data-requirements

Hi @scott,

I think I’m making progress but just not getting the data as I want it in my new bucket.

Here is my Flux query that is joining and then apparently inserting data into my “test” bucket:

inOctets = from(bucket: "bb_interfaces_test")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "interfaces" and r._field == "in-octets")
  |> aggregateWindow(every: 1m, fn: last)
  |> derivative(unit: 1m, nonNegative: true, columns: ["_value"], timeColumn: "_time")
  |> drop(columns:["oper-status", "path", "_start", "_stop"])
  |> map(fn: (r) => ({
      _time: r._time,
      device: string(v:r.device),
      host: string(v:r.host),
      "parent-ae-name": string(v:r["parent-ae-name"]),
      "interface-name": string(v:r["interface-name"]),
      "in-bps": int(v:r._value) / 60 * 8
    }))

ifaceDesc = from(bucket: "bb_interfaces_test")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "ifaceDesc")
  |> filter(fn: (r) => r._field == "description") 
  |> filter(fn: (r) => r["interface-name"] =~ /.+/) 
  |> aggregateWindow(every: 1m, fn: last)
  |> drop(columns:["_start", "_stop", "path", "system_id", "_measurement"])

join(tables: {inOctets:inOctets, ifaceDesc:ifaceDesc}, on: ["device", "interface-name", "_time", "host"])
  |> set(key: "_measurement", value: "interfaces")
  |> to(bucket: "test", org: "cde", timeColumn: "_time",
        fieldFn: (r) => ({ "in-bps": r["in-bps"], "description": r._value}))

When that query runs I see the following in my explorer window, which is the how I’d like for the data to get saved to my “test” bucket:

Image 1

When I query my “test” bucket I see this:

Image 2

How do I get my “test” bucket to display data like the image 1 from above?

I basically want in-bps and description fields to show together when I query the “test” bucket and I should be able to group by device, interface-name, and parent-ae-name.

Here is an error message I get when I try to group by device from my “test” bucket:

expected string cursor type, got *reads.integerMultiShardArrayCursor