Hi, I’m having issues writing joined data to my output bucket. When I run the query without the output step, it works and I see data generated in the Data Explorer window.
This is my Flux query:
inOctets = from(bucket: "bb_interfaces_test")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "interfaces" and r._field == "in-octets")
    |> aggregateWindow(every: 1m, fn: last)
    |> derivative(unit: 1m, nonNegative: true, columns: ["_value"], timeColumn: "_time")
    |> map(fn: (r) => ({r with inBPS: int(v: r._value) / 60 * 8}))
ifaceDesc = from(bucket: "bb_interfaces_test")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "ifaceDesc")
    |> filter(fn: (r) => r._field == "description")
    |> filter(fn: (r) => r["interface-name"] =~ /.+/)
    |> aggregateWindow(every: 1m, fn: last)
join(tables: {inOctets: inOctets, ifaceDesc: ifaceDesc}, on: ["_time", "device", "interface-name"])
    |> to(bucket: "bb_interfaces_test_stats_joined", org: "cde")
This is the error message I get, and only when the to() call is present:
no column with label _measurement exists
FYI, “bb_interfaces_test_stats_joined” is a new bucket that I tried recreating but that did not help.
scott
March 12, 2020, 10:34pm
@mohsin106 In order to write back to InfluxDB, each point must have a _measurement column. Before you write the joined tables back to InfluxDB, I’d drop the columns you don’t need, then set a new measurement column/value:
join(tables: {inOctets: inOctets, ifaceDesc: ifaceDesc}, on: ["_time", "device", "interface-name"])
    |> drop(...)
    |> set(key: "_measurement", value: "measurement-name")
    |> to(...)
Hi @scott, this is my new Flux query; I’m dropping everything I don’t need prior to the join.
inOctets = from(bucket: "bb_interfaces_test")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "interfaces" and r._field == "in-octets")
    |> aggregateWindow(every: 1m, fn: last)
    |> derivative(unit: 1m, nonNegative: true, columns: ["_value"], timeColumn: "_time")
    |> drop(columns: ["oper-status", "path", "_start", "_stop"])
    |> map(fn: (r) => ({
        _time: r._time,
        device: string(v: r.device),
        host: string(v: r.host),
        "parent-ae-name": string(v: r["parent-ae-name"]),
        "interface-name": string(v: r["interface-name"]),
        "in-bps": int(v: r._value) / 60 * 8
    }))
ifaceDesc = from(bucket: "bb_interfaces_test")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "ifaceDesc")
    |> filter(fn: (r) => r._field == "description")
    |> filter(fn: (r) => r["interface-name"] =~ /.+/)
    |> aggregateWindow(every: 1m, fn: last)
    |> drop(columns: ["_start", "_stop", "path", "system_id", "_measurement", "_field"])
join(
tables: {inOctets:inOctets, ifaceDesc:ifaceDesc},
on: ["device", "interface-name", "_time", "host"]
// |> set(key: "_measurement", value: "joined")
// |> to(bucket: "bb_interfaces_test_stats_joined", org: "cde")
)
This runs and joins the data in the Explorer window. But as soon as I uncomment the |> set line in the join stanza, I get the following error:
type error 27:8-27:49: cannot unify object with semantic.array
scott
March 17, 2020, 2:24pm
You need to close your join(). It’s missing the right parenthesis ) — the set() and to() calls are currently inside the join’s argument list.
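With the parenthesis moved up after the on: list, the pipeline would read roughly like this ("joined" is just the placeholder measurement name from your commented-out line):

```flux
join(
    tables: {inOctets: inOctets, ifaceDesc: ifaceDesc},
    on: ["device", "interface-name", "_time", "host"]
)
    |> set(key: "_measurement", value: "joined")
    |> to(bucket: "bb_interfaces_test_stats_joined", org: "cde")
```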
scott
March 17, 2020, 2:26pm
Side note: you’re also going to need a _field column to write back to InfluxDB. Here are the requirements for writing: https://docs.influxdata.com/flux/v0.x/stdlib/influxdata/influxdb/to/
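One way to satisfy that requirement is to let to() build the field from explicit columns with fieldFn. This is only a sketch — the "in-bps" column name assumes the map() output from your query above, and the measurement name is a placeholder:

```flux
join(tables: {inOctets: inOctets, ifaceDesc: ifaceDesc}, on: ["device", "interface-name", "_time", "host"])
    |> set(key: "_measurement", value: "joined")
    |> to(
        bucket: "bb_interfaces_test_stats_joined",
        org: "cde",
        fieldFn: (r) => ({"in-bps": r["in-bps"]})
    )
```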
Hi @scott ,
I think I’m making progress, but the data isn’t landing in my new bucket in the form I want.
Here is my Flux query that is joining and then apparently inserting data into my “test” bucket:
inOctets = from(bucket: "bb_interfaces_test")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "interfaces" and r._field == "in-octets")
    |> aggregateWindow(every: 1m, fn: last)
    |> derivative(unit: 1m, nonNegative: true, columns: ["_value"], timeColumn: "_time")
    |> drop(columns: ["oper-status", "path", "_start", "_stop"])
    |> map(fn: (r) => ({
        _time: r._time,
        device: string(v: r.device),
        host: string(v: r.host),
        "parent-ae-name": string(v: r["parent-ae-name"]),
        "interface-name": string(v: r["interface-name"]),
        "in-bps": int(v: r._value) / 60 * 8
    }))
ifaceDesc = from(bucket: "bb_interfaces_test")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "ifaceDesc")
    |> filter(fn: (r) => r._field == "description")
    |> filter(fn: (r) => r["interface-name"] =~ /.+/)
    |> aggregateWindow(every: 1m, fn: last)
    |> drop(columns: ["_start", "_stop", "path", "system_id", "_measurement"])
join(tables: {inOctets: inOctets, ifaceDesc: ifaceDesc}, on: ["device", "interface-name", "_time", "host"])
    |> set(key: "_measurement", value: "interfaces")
    |> to(
        bucket: "test",
        org: "cde",
        timeColumn: "_time",
        fieldFn: (r) => ({"in-bps": r["in-bps"], "description": r._value})
    )
When that query runs I see the following in my Explorer window, which is how I’d like the data to be saved to my "test" bucket:
Image 1
When I query my “test” bucket I see this:
Image 2
How do I get my "test" bucket to return data like image 1 above?
I basically want the in-bps and description fields to appear together when I query the "test" bucket, and I should be able to group by device, interface-name, and parent-ae-name.
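For reference, the kind of read-back query I’m aiming for looks roughly like this (a sketch, not my exact query; "interfaces" matches the measurement set in the write query above):

```flux
from(bucket: "test")
    |> range(start: -5m)
    |> filter(fn: (r) => r._measurement == "interfaces")
    |> group(columns: ["device", "interface-name", "parent-ae-name"])
```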
Here is an error message I get when I try to group by device from my “test” bucket:
expected string cursor type, got *reads.integerMultiShardArrayCursor